Producer-Consumer Queue in AngularJS

I have worked with Python and databases for several years.

But I want to improve my limited JavaScript knowledge. For my toy project I want to use an asynchronous queue in the web browser and use AngularJS for this.

In Python there is a nice class called multiprocessing.Queue, which I have used in the past.

Now I am looking for something like this, but in AngularJS.

[Image: async-queue-js]

  • Step 1: The in-queue pulls work-items (pink circles). Each is just a few bytes of JSON.

  • Step 2: The User processes the data.

  • Step 3: The out-queue takes care of sending the result to the server.

Why this "complicated" setup? Because I want the application to be as responsive as possible. The in-queue should pre-load some data and the out-queue should handle response communication.

Another benefit is that with this setup the application can tolerate a server or network outage lasting a few minutes.

The two-way data binding of AngularJS, which immediately updates data the user has edited, does not really fit my use case. Or I have missed something. I am new to AngularJS.

The pink circles in the picture represent JSON data structures. I want to push each of them with one request to the browser.

Example:

The user sees a question, then he needs to fill out three fields. For example:

  • answer: Type text
  • like-this-question: integer from 1..5
  • difficulty: integer from 1..5

The data should be put into the queue after the user has pressed "submit". He should get the next question immediately.

Question:

Is there already a producer-consumer queue available for AngularJS? If not, how could I implement it?

Update

Sending the data from the client could be implemented with plain AJAX. The in-queue, which pre-fetches the data, is the more complicated part, although both could use the same implementation. It is important that the client gets new data with very low latency. The in-queue should always be kept filled with up to 5 items so that the client never has to wait for data.

In my case it does not matter if the browser gets closed and the items in the in-queue get lost. Filling the in-queue is read-only on the server part.

I am not fixed on AngularJS. I am happy to change the framework if there are good reasons.

Preserving the in-queue between browser reloads could be done with localStorage (HTML5).

asked Nov 04 '15 18:11 by guettli


2 Answers

On reflection, do you really need a producer-consumer queue in your frontend? For your example, I think simple pub-sub or $q is good enough.

Example:

Create a subscriber service, in your example questionHandlerService, which subscribes to the event question-submitted. In your questionService, when the user submits the question, just publish the event question-submitted with the data: fire and forget. You don't need to wait for a response from questionHandlerService.
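
A minimal sketch of that fire-and-forget idea, written in plain JavaScript so it runs outside AngularJS. The `createEventBus` helper, the payload fields, and the `handled` array are assumptions made for illustration; in an AngularJS app the same roles could be played by `$rootScope.$emit` and `$rootScope.$on`.

```javascript
// Hypothetical minimal event bus (not part of AngularJS).
function createEventBus() {
  var handlers = {}; // event name -> array of subscriber callbacks

  return {
    subscribe: function (eventName, handler) {
      (handlers[eventName] = handlers[eventName] || []).push(handler);
      // Return an unsubscribe function so listeners can be cleaned up.
      return function unsubscribe() {
        handlers[eventName] = handlers[eventName].filter(function (h) {
          return h !== handler;
        });
      };
    },
    publish: function (eventName, data) {
      // Copy the list so a handler may unsubscribe while we iterate.
      (handlers[eventName] || []).slice().forEach(function (handler) {
        handler(data);
      });
    }
  };
}

var bus = createEventBus();
var handled = []; // stands in for "send the answer to the server"

// questionHandlerService: subscribes once and receives every submission.
bus.subscribe('question-submitted', function (answer) {
  handled.push(answer);
});

// questionService: publishes the event and ignores what subscribers do with it.
function submitAnswer(answer) {
  bus.publish('question-submitted', answer);
}

submitAnswer({ answer: 'free text', likeThisQuestion: 4, difficulty: 2 });
```

Note that in this sketch the subscribers run synchronously inside `publish`, so a slow handler would still hold up the caller.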

Keep in mind that JavaScript has only one main thread. If your method blocks the UI, for example by looping through 1000 items in an array and processing them synchronously, putting it in another "queue" does not help: it will still block the UI while it executes, unless you run it in a web worker. And what if the user refreshes the browser? Your unprocessed requests are simply lost.

Firing an XHR call does not block the UI, so it doesn't make sense to implement producer-consumer in the frontend. You can just validate the input, fire the XHR, and let the backend do the heavy lifting. In your backend controller, you can use a queue to save the request and respond immediately, then process the queue from another thread or process.

answered Oct 23 '22 07:10 by Sean


Working Plunker: the backend service there is used only to mock the REST interface. I fixed some bugs in it, such as the error limit, so treat the Plunker as the latest version.

I don't have enough time to improve what follows, but you can use it as a starting point.

I think what you need is:

  1. A service that wraps $http.
  2. A method push that we will use when we'll need to register a task.
  3. A recursive private method _each that, step by step, reduces the registered queue.
  4. ...everything else that you think is important (getCurrentTask, removeTask, etc.).

Usage:

angular
  .module('myApp', ['Queue'])
  .controller('MyAppCtrl', function($httpQueue, $scope) { 
    var vm = $scope;
    
    // using a route.resolve could be better!
    $httpQueue
      .pull()
      .then(function(tasks) { vm.tasks = tasks;  })
      .catch(function() { vm.tasks = [{ name: '', description: '' }]; })
    ;
  
    vm.onTaskEdited = function(event, task, form) {
      event.preventDefault();
      if(form.$invalid || form.$pristine ) { return; }
      
      
      return $httpQueue.push(task);
      
    };
  })
;
<article ng-app="myApp">
  <div ng-controller="MyAppCtrl">
    
    
    <form ng-repeat="task in tasks" name="taskForm" ng-submit="onTaskEdited($event, task, taskForm)">
      <input ng-model="task.name" placeholder="Task Name" />
      <textarea ng-model="task.description"></textarea>
    </form>
    
    
  </div>
</article>

Module Definition

(function(window, angular, APP) {
  'use strict';

  function $httpQueueFactory($q, $http) {
    var self = this;

    var api = '/api/v1/tasks';

    self.queue = [];
    var processing = false;


    // Treat this as a private method; never call it directly
    self._each = function() {
      var configs = { cache: false };
			
			
      return self
        .isQueueEmpty()
        .then(function(count) {
          processing = false;
          return count;
        })
        .catch(function() {
          if(processing) {
            return;
          }
        
          processing = true;
          var payload = self.queue.shift();
     
          var route = api;
          var task = 'post';
          if(payload.id) {
            task = 'put';
            route = api + '/' + payload.id;
          }
        
          return $http
            [task](route, payload, configs)
            .catch(function(error) {
              console.error('$httpQueue._each:error', error, payload);
              //because of the error we re-append this task to the queue;
              return self.push(payload);
            })
            .finally(function() {
              processing = false;
              return self._each();
            })
          ;
        })
      ;
    };


    self.isQueueEmpty = function() {
      var length = self.queue.length;
      var task = length > 0 ? 'reject' : 'when';
      
      return $q[task](length);
    };

    self.push = function(data) {
      self.queue.push(data);
      self._each();

      return self;
    };

    self.pull = function(params) {
      var configs = { cache: false };
      
      configs.params = angular.extend({}, params || {});

      return $http
        .get(api, configs)
        .then(function(result) {
          console.info('$httpQueue.pull:success', result);

          return result.data;
        })
        .catch(function(error) {
          console.error('$httpQueue.pull:error', error);
        
          return $q.reject(error);
        })
      ;
    };
  }



  APP
    .service('$httpQueue', ['$q', '$http', $httpQueueFactory])
  ;	

})(window, window.angular, window.angular.module('Queue', []));
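
The `_each` method above re-queues a failed payload indefinitely, so a request that can never succeed would be retried forever. The following is a sketch of one way to cap the retries, written without AngularJS so it can run on its own. `createOutQueue`, `send`, `maxAttempts`, and `failed` are hypothetical names; `send` stands in for the `$http` call and is assumed to return a promise.

```javascript
// Hypothetical out-queue with a retry cap; `send(payload)` must return a promise.
function createOutQueue(send, maxAttempts) {
  var queue = [];      // entries waiting to be sent: { payload, attempts }
  var failed = [];     // payloads given up on after maxAttempts tries
  var running = null;  // promise of the current drain loop, or null when idle

  function drain() {
    if (queue.length === 0) {
      running = null;
      return Promise.resolve();
    }
    var entry = queue.shift();
    entry.attempts += 1;
    return send(entry.payload)
      .catch(function () {
        if (entry.attempts < maxAttempts) {
          queue.push(entry);          // retry later, behind newer items
        } else {
          failed.push(entry.payload); // stop retrying this payload
        }
      })
      .then(drain);                   // continue with the next entry
  }

  return {
    push: function (payload) {
      queue.push({ payload: payload, attempts: 0 });
      if (!running) { running = drain(); }
      return running; // resolves once the queue has been emptied
    },
    failed: failed
  };
}
```

This sketch retries immediately; during a real outage it would probably be better to wait between attempts (for example with `$timeout`), which is left out here to keep the example short.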

Handle DataLayer changes

Handling changes on the data layer (what you called the in-queue) is a bit more difficult, because we need to keep the last pull in sync with the current pull.

If you are looking for real-time notifications, you should probably have a look at a socket layer. I suggest Socket.io because it is a well-tested, industry-approved solution.

If you cannot implement a socket layer, another solution could be a long polling implementation, well described here.

For simplicity, in this post we are going to implement a simple interval that updates the current task list. So, the previous example becomes:

angular
  .module('myApp', ['Queue'])
  .controller('MyAppCtrl', function($httpQueue, $scope, $interval) { 
    var vm = $scope;
    var 
      pollingCount = 0, // infinite polling
      pollingDelay = 1000
    ;
    
    // using a route.resolve could be better!
    $httpQueue
      .pull()
      .then(function(tasks) { vm.tasks = tasks;  })
      .catch(function() { vm.tasks = [{ name: '', description: '' }]; })
      .finally(function() { return $interval(vm.updateViewModel.bind(vm), pollingDelay, pollingCount, true); })
    ;
  
    var isLastPullFinished = true; // no refresh is in flight yet
    vm.updateViewModel = function() {
      // skip this tick while the previous refresh is still running
      if(!isLastPullFinished) { return; }
      isLastPullFinished = false;
      
      return $httpQueue
        .pull()
        .then(function(tasks) {
          for(var i = 0, len = tasks.length; i < len; i++) {
            
            for(var j = 0, jLen = vm.tasks.length; j < jLen; j++) {
              if(tasks[i].id !== vm.tasks[j].id) { continue; }
              
              // todo: manage recursively merging, in angular 1.4+ there is a
              // merge method https://docs.angularjs.org/api/ng/function/angular.merge
              // todo: control if the task model is $dirty (if the user is editing it)
              angular.extend(vm.tasks[j], tasks[i]);
            }
            
          }
          
          return vm.tasks;
        })
        .finally(function() {
          isLastPullFinished = true;
        })
      ;
    };
  
    
    
    vm.onTaskEdited = function(event, task, form) {
      event.preventDefault();
      if(form.$invalid || form.$pristine ) { return; }
      
      
      return $httpQueue.push(task);
      
    };
  })
;
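
The polling controller above keeps an existing list fresh, but the question asks for an in-queue that always holds up to 5 pre-loaded items. A minimal sketch of such a buffer follows, again in plain JavaScript. `createInQueue`, `fetchOne`, and `capacity` are hypothetical names; `fetchOne` is assumed to return a promise for a single work item (note that `$httpQueue.pull()` above returns a whole list, so it would need adapting).

```javascript
// Hypothetical prefetching in-queue; `fetchOne()` must return a promise for one item.
function createInQueue(fetchOne, capacity) {
  var items = [];    // work items already loaded and ready for the user
  var inFlight = 0;  // requests started but not yet answered

  // Start as many requests as needed to reach `capacity` items.
  function refill() {
    var started = [];
    while (items.length + inFlight < capacity) {
      inFlight += 1;
      started.push(
        fetchOne()
          .then(function (item) { items.push(item); })
          .catch(function () { /* network error: a later refill tries again */ })
          .then(function () { inFlight -= 1; })
      );
    }
    return Promise.all(started);
  }

  return {
    refill: refill,
    // Hand out the next item immediately (undefined if none is loaded yet)
    // and top the buffer up in the background.
    next: function () {
      var item = items.shift();
      refill();
      return item;
    },
    size: function () { return items.length; }
  };
}
```

A controller could call `refill()` once at start-up and `next()` each time the user submits an answer, so the next question is usually available without a network round trip.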

Hope it helps. I haven't tested it, so there could be some bugs!

answered Oct 23 '22 08:10 by Hitmands