Is there a way to assure FIFO (first in, first out) behavior with Task Queues on GAE?
GAE Documentation says that FIFO is one of the factors that affect task execution order, but the same documentation says that “the system's scheduling may 'jump' new tasks to the head of the queue” and I have confirmed this behavior with a test. The effect: my events are being processed out of order.
The docs say:
https://developers.google.com/appengine/docs/java/taskqueue/overview-push
The order in which tasks are executed depends on several factors:
The position of the task in the queue. App Engine attempts to process tasks based on FIFO (first in, first out) order. In general, tasks are inserted into the end of a queue, and executed from the head of the queue.
The backlog of tasks in the queue. The system attempts to deliver the lowest latency possible for any given task via specially optimized notifications to the scheduler. Thus, in the case that a queue has a large backlog of tasks, the system's scheduling may "jump" new tasks to the head of the queue.
The value of the task's etaMillis property. This property specifies the earliest time that a task can execute. App Engine always waits until after the specified ETA to process push tasks.
The value of the task's countdownMillis property. This property specifies the minimum number of seconds to wait before executing a task. Countdown and eta are mutually exclusive; if you specify one, do not specify the other.
What do I need to do? In my use case, I'll process 1-2 million events/day coming from vehicles. These events can be sent at any interval (1 second, 1 minute or 1 hour). The order of event processing has to be guaranteed: I need to process events in timestamp order, and the timestamp is generated on an embedded device inside the vehicle.
What do I have now?
A Rest servlet that is called by the consumer and creates a Task (Event data is on payload).
After this, a worker servlet gets this Task and:
Deserializes the Event data;
Puts the Event on the Datastore;
Updates the Vehicle on the Datastore.
So, again, is there any way to assure just FIFO behavior? Or how can I improve this solution to get this?
You need to approach this with three separate steps:
1) Implement a Sharding Counter to generate a monotonically increasing ID. As much as I would like to use the timestamp from Google's server to indicate task ordering, it appears that timestamps between GAE servers might vary by more than your requirement allows.
2) Add your tasks to a Pull Queue instead of a Push Queue. When constructing your TaskOptions, add the ID obtained from Step #1 as a tag. After adding the task, store the ID somewhere on your datastore.
3) Have your worker servlet lease Tasks by a certain tag from the Pull Queue. Query the datastore to get the earliest ID that you need to fetch, and use the ID as the lease tag. In this way, you can simulate FIFO behavior for your task queue. After you finish processing, delete the ID from your datastore, and don't forget to delete the Task from your Pull Queue too.
Also, I would recommend you run your task consumption on the Backend.
UPDATE: As noted by Nick Johnson and mjaggard, sharding in step #1 doesn't seem to be a viable way to generate monotonically increasing IDs, so another source of IDs would be needed. I seem to recall you were using timestamps generated by your vehicles; would it be possible to use those in lieu of a monotonically increasing ID?
Regardless of the way to generate the IDs, the basic idea is to use datastore's query mechanism to produce a FIFO ordering of Tasks, and use the task Tag to pull a specific task from the TaskQueue.
There is a caveat, though. Due to the eventual consistency read policy on high-replication datastores, if you choose HRD as your datastore (and you should, the M/S is deprecated as of April 4th, 2012), there might be some stale data returned by the query on step #2.
I think the simple answer is "no". However, partly to improve the situation, I am using a pull queue: pulling 1000 tasks at a time and then sorting them. If timing isn't important, you could sort them, put them into the datastore and then complete a batch at a time. You've still got to work out what to do with the tasks at the beginning and end of each batch, because they might be out of order relative to interleaving tasks in other batches.
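As a rough illustration of the "pull a batch, then sort" idea, here is a small plain-Java sketch. The Event class and its fields are hypothetical (the question only says the device generates a timestamp), and the list passed in stands for whatever payloads you got back from one lease call. It does nothing about the batch-boundary problem mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class BatchSortSketch {
    // Hypothetical event shape: only the fields needed for ordering.
    static final class Event {
        final String vehicleId;
        final long deviceTimestampMillis;

        Event(String vehicleId, long deviceTimestampMillis) {
            this.vehicleId = vehicleId;
            this.deviceTimestampMillis = deviceTimestampMillis;
        }
    }

    // Returns a new list ordered by device timestamp. List.sort is stable,
    // so events with equal timestamps keep the order they were leased in.
    static List<Event> sortBatch(List<Event> leased) {
        List<Event> sorted = new ArrayList<>(leased);
        sorted.sort(Comparator.comparingLong((Event e) -> e.deviceTimestampMillis));
        return sorted;
    }

    public static void main(String[] args) {
        List<Event> leased = Arrays.asList(
                new Event("v1", 3000L),
                new Event("v1", 1000L),
                new Event("v2", 2000L));
        for (Event e : sortBatch(leased)) {
            System.out.println(e.deviceTimestampMillis + " " + e.vehicleId);
        }
    }
}
```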
Ok. This is how I've done it.
1) Rest servlet that is called from the consumer:
    If Event sequence doesn't match Vehicle sequence (from datastore)
        Creates a task on a "wait" queue to call me again
    else
        State validation
        Creates a task on the "regular" queue (Event data is on payload).
2) A worker servlet gets the task from the "regular" queue, and so on... (same pseudo code)
This way I can pause the "regular" queue to do data maintenance without losing events.
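For anyone who wants the routing decision from 1) spelled out, a sketch in plain Java could look like the following. Several things here are assumptions rather than part of the setup described above: that each vehicle's events carry a sequence number starting at 1 and increasing by one, that "match" means "equals the next expected sequence", and that the expected sequence is advanced at routing time. The Map stands in for the Vehicle entity in the datastore, and the names (SequenceGateSketch, Route, route) are invented.

```java
import java.util.HashMap;
import java.util.Map;

class SequenceGateSketch {
    // Which queue the REST servlet would put the task on.
    enum Route { WAIT, REGULAR }

    // Stand-in for the per-vehicle sequence kept in the datastore.
    private final Map<String, Long> nextExpected = new HashMap<>();

    // Assumption: sequences start at 1 and increase by exactly one per event.
    Route route(String vehicleId, long eventSequence) {
        long expected = nextExpected.getOrDefault(vehicleId, 1L);
        if (eventSequence != expected) {
            // Out of order: park it on the "wait" queue to be retried later.
            return Route.WAIT;
        }
        // In order: advance the vehicle's sequence and use the "regular" queue.
        nextExpected.put(vehicleId, expected + 1);
        return Route.REGULAR;
    }

    public static void main(String[] args) {
        SequenceGateSketch gate = new SequenceGateSketch();
        System.out.println(gate.route("v1", 2)); // prints WAIT
        System.out.println(gate.route("v1", 1)); // prints REGULAR
        System.out.println(gate.route("v1", 2)); // prints REGULAR
    }
}
```

In a real deployment the read and update of the vehicle's sequence would need to happen in a datastore transaction, otherwise two concurrent requests for the same vehicle could both be routed as in order.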
Thank you for your answers. My solution is a mix of them.