I need to get a work-item from a work-queue and then sequentially run a series of containers to process each work-item. This can be done using initContainers (https://stackoverflow.com/a/46880653/94078)
What would be the recommended way of restarting the process to get the next work-item?
Thanks!
Jobs should be used for working with work queues. When using work queues you should not set .spec.completions (or set it to null). In that case Pods will keep getting created until one of the Pods exits successfully. It is a little awkward to exit from the (main) container with a failure state on purpose, but this is the specification. You may set .spec.parallelism to your liking irrespective of this setting; I've set it to 1 as it appears you do not want any parallelism.
In your question you did not specify what you want to do if the work queue becomes empty, so I will give two solutions: one if you want to wait for new items (infinite) and one if you want to end the Job when the work queue becomes empty (finite, but with an indefinite number of items).
Both examples use redis, but you can apply this pattern to your favorite queue. Note that the part that pops an item from the queue is not safe; if your Pod dies for some reason after having popped an item, that item will remain unprocessed or not fully processed. See the reliable-queue pattern for a proper solution.
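As a rough illustration of the reliable-queue idea mentioned above, the pop can move the item into a second list instead of deleting it, so that an item held by a crashed Pod is still recoverable. This is only a sketch: the `job:processing` list, the `REDIS_CLI` variable, and the function names are assumptions made for this example, and `LMOVE` requires Redis 6.2 or newer (on older servers `RPOPLPUSH` is the closest equivalent, but it pops from the tail of the list).

```shell
# Sketch of the reliable-queue pattern with redis-cli.
# REDIS_CLI, the "job:processing" list and the function names are
# hypothetical; they are not part of the manifests in this answer.

# Atomically move the oldest item from the work queue to a processing
# list and print it. Prints an empty line when the queue is empty.
claim_item() {
  ${REDIS_CLI:-redis-cli -h redis} lmove job job:processing LEFT RIGHT
}

# Remove one occurrence of the item from the processing list.
# Call this only after every step has finished successfully.
ack_item() {
  ${REDIS_CLI:-redis-cli -h redis} lrem job:processing 1 "$1"
}
```

In the manifests below, `claim_item` would take the place of the plain pop, and `ack_item` would have to run in the last container, which would then need an image that ships `redis-cli`. Items left behind in `job:processing` still need a separate process that pushes them back onto the queue.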
To implement the sequential steps on each work item I've used init containers. Note that this really is a primitive solution, but you have limited options if you don't want to use some framework to implement a proper pipeline.
There is an asciinema if anyone would like to see this at work without deploying redis, etc.
To test this you'll need to create, at a minimum, a redis Pod and a Service. I am using the example from fine parallel processing work queue. You can deploy that with:
kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-pod.yaml
kubectl apply -f https://rawgit.com/kubernetes/website/master/docs/tasks/job/fine-parallel-processing-work-queue/redis-service.yaml
The rest of this solution expects that you have a Service named redis in the same namespace as your Job that does not require authentication, and a Pod called redis-master.
To insert some items in the work queue use this command (you will need bash for this to work):
echo -ne "rpush job "{1..10}"\n" | kubectl exec -it redis-master -- redis-cli
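If bash brace expansion is not available, the same redis commands can be generated with a plain POSIX loop. This is a minimal sketch; the helper name `gen_push_commands` is made up for this example.

```shell
# Print one "rpush <queue> <n>" command per line for n = first..last.
# Uses only POSIX sh features, so it does not rely on bash brace expansion.
gen_push_commands() {
  queue=$1 i=$2 last=$3
  while [ "$i" -le "$last" ]; do
    printf 'rpush %s %s\n' "$queue" "$i"
    i=$((i + 1))
  done
}

# Usage (needs the redis-master Pod); -i alone is enough when stdin is a pipe:
#   gen_push_commands job 1 10 | kubectl exec -i redis-master -- redis-cli
# Inspect what is currently queued:
#   kubectl exec redis-master -- redis-cli lrange job 0 -1
```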
This version waits if the queue is empty, so the Job never completes.
apiVersion: batch/v1
kind: Job
metadata:
  name: primitive-pipeline-infinite
spec:
  parallelism: 1
  completions: null
  template:
    metadata:
      name: primitive-pipeline-infinite
    spec:
      volumes: [{name: shared, emptyDir: {}}]
      initContainers:
      - name: pop-from-queue-unsafe
        image: redis
        command: ["sh","-c","redis-cli -h redis blpop job 0 >/shared/item.txt"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-1
        image: busybox
        command: ["sh","-c","echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-2
        image: busybox
        command: ["sh","-c","echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-3
        image: busybox
        command: ["sh","-c","echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      containers:
      - name: done
        image: busybox
        command: ["sh","-c","echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      restartPolicy: Never
This version stops the job if the queue is empty. Note the trick that the pop init container checks if the queue is empty and all the subsequent init containers and the main container immediately exit if it is indeed empty - this is the mechanism that signals Kubernetes that the Job is completed and there is no need to create new Pods for it.
apiVersion: batch/v1
kind: Job
metadata:
  name: primitive-pipeline-finite
spec:
  parallelism: 1
  completions: null
  template:
    metadata:
      name: primitive-pipeline-finite
    spec:
      volumes: [{name: shared, emptyDir: {}}]
      initContainers:
      - name: pop-from-queue-unsafe
        image: redis
        command: ["sh","-c","redis-cli -h redis lpop job >/shared/item.txt; grep -q . /shared/item.txt || :>/shared/done.txt"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-1
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-1 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-2
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-2 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      - name: step-3
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo step-3 working on `cat /shared/item.txt` ...; sleep 5"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      containers:
      - name: done
        image: busybox
        command: ["sh","-c","[ -f /shared/done.txt ] && exit 0; echo all done with `cat /shared/item.txt`; sleep 1; exit 1"]
        volumeMounts: [{name: shared, mountPath: /shared}]
      restartPolicy: Never
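The empty-queue guard used in the finite manifest can be tried locally without a cluster or redis. The sketch below mimics it with a temporary directory standing in for the shared emptyDir volume; `pop_item` and `main_container` are hypothetical stand-ins for the pop init container and the main container, not part of the manifest.

```shell
# Local simulation of the done-marker guard; no cluster or redis needed.
# pop_item and main_container are hypothetical stand-ins for the containers.

# Stand-in for the pop init container: store the popped value and create
# done.txt when the value is empty (which is what an empty queue yields).
pop_item() {
  shared=$1 item=$2
  printf '%s\n' "$item" > "$shared/item.txt"
  grep -q . "$shared/item.txt" || : > "$shared/done.txt"
}

# Stand-in for the main container: returns 0 when the marker exists (the
# Pod succeeds, so the Job completes) and 1 after an item was processed
# (the Pod fails on purpose, so the Job creates another Pod).
main_container() {
  shared=$1
  [ -f "$shared/done.txt" ] && return 0
  echo "all done with $(cat "$shared/item.txt")"
  return 1
}

# Usage:
#   dir=$(mktemp -d); pop_item "$dir" 5;  main_container "$dir"; echo "exit code: $?"
#   dir=$(mktemp -d); pop_item "$dir" ""; main_container "$dir"; echo "exit code: $?"
```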