Amazon is promoting boto3 for future development, but does not provide enough documentation for it.
Does anybody have any example code of using SWF with boto3 that they would care to share?
Amazon SWF is a fully managed workflow service for building scalable, resilient applications. Amazon SWF provides simple API calls that can be executed from code written in any language and run on your EC2 instances, or any of your machines located anywhere in the world that can access the Internet.
Boto3 is a ground-up rewrite of Boto. It uses a data-driven approach to generate classes at runtime from JSON description files that are shared between SDKs in various languages. This includes descriptions for a high-level, object-oriented interface similar to those available in previous versions of Boto.
Boto3 makes it easy to integrate your Python application, library, or script with AWS services including Amazon S3, Amazon EC2, Amazon DynamoDB, and more.
Each workflow execution can run for a maximum of 1 year. Each workflow execution history can grow up to 25,000 events. If your use case requires you to go beyond these limits, you can use features Amazon SWF provides to continue executions and structure your applications using child workflow executions.
This is the only example I have found so far:
https://github.com/jhludwig/aws-swf-boto3
So the process overview looks like this (note that this is pulled directly from the link above, but with some additional notes added and arranged more as a flow).
It should be noted that SWF operates on the names of things; it's up to your code to give those names an execution meaning. For example, your Decider polls for tasks and uses the task names to decide what happens next.
There are some things I am not exactly certain about. TASKLIST references are, I believe, a kind of namespacing: a task list is not really a list of things, it's more about isolating things by name. I could be totally wrong about that, but from my basic understanding, that's what I think it's saying.
You can run your Decider and Workers from ANYWHERE. Since they make outbound calls to AWS, if your firewall allows egress to 0.0.0.0/0 you will have access.
The AWS docs also mention that you can run a Lambda function, but I have not found out how to trigger that.
```python
import boto3
from botocore.exceptions import ClientError

# DOMAIN, WORKFLOW, VERSION and TASKLIST used below are strings you choose.
swf = boto3.client('swf')

try:
    swf.register_domain(
        name=DOMAIN,  # string
        description="Test SWF domain",
        workflowExecutionRetentionPeriodInDays="10"  # keep history for this long
    )
except ClientError as e:
    print("Domain already exists: ", e.response.get("Error", {}).get("Code"))
```
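If you would rather not treat every `ClientError` as "already exists", here is a minimal sketch that catches only the modeled `DomainAlreadyExistsFault`, which boto3 exposes on the client's `exceptions` attribute, and lets anything else (for example `LimitExceededFault`) propagate. The helper name `ensure_domain` is hypothetical, and the 10-day default simply mirrors the value above.

```python
def ensure_domain(swf, name, retention_days=10, description="Test SWF domain"):
    """Register an SWF domain unless it already exists.

    Returns True if the domain was newly registered, False if SWF reports
    that it already exists. Any other error is re-raised unchanged.
    """
    try:
        swf.register_domain(
            name=name,
            description=description,
            # The API expects the retention period as a string.
            workflowExecutionRetentionPeriodInDays=str(retention_days),
        )
    except swf.exceptions.DomainAlreadyExistsFault:
        return False
    return True
```

Usage, assuming credentials and a region are configured: `ensure_domain(boto3.client('swf'), DOMAIN)`.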
With the domain created, we now register the workflow type:
```python
try:
    swf.register_workflow_type(
        domain=DOMAIN,  # string
        name=WORKFLOW,  # string
        version=VERSION,  # string
        description="Test workflow",
        defaultExecutionStartToCloseTimeout="250",
        defaultTaskStartToCloseTimeout="NONE",
        defaultChildPolicy="TERMINATE",
        defaultTaskList={"name": TASKLIST}  # TASKLIST is a string
    )
    print("Test workflow created!")
except ClientError as e:
    print("Workflow already exists: ", e.response.get("Error", {}).get("Code"))
```
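One thing worth checking: SWF raises `TypeAlreadyExistsFault` both for a type that is registered and for one that has been deprecated, so "already exists" does not guarantee you can start executions of it. A small sketch, using the hypothetical helper name `workflow_type_status`, that asks `describe_workflow_type` for the actual status and maps `UnknownResourceFault` to `None`:

```python
def workflow_type_status(swf, domain, name, version):
    """Return the status of a workflow type: 'REGISTERED', 'DEPRECATED', or None.

    None means SWF does not know the type (UnknownResourceFault), for example
    because it was never registered in this domain.
    """
    try:
        resp = swf.describe_workflow_type(
            domain=domain,
            workflowType={"name": name, "version": version},
        )
    except swf.exceptions.UnknownResourceFault:
        return None
    return resp["typeInfo"]["status"]
```

For example, you might check `workflow_type_status(swf, DOMAIN, WORKFLOW, VERSION) == "REGISTERED"` before calling `start_workflow_execution`.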
With our workflow type registered, we can now begin registering activity types (tasks).
You can register N activity types. Remember, these are mainly strings; your code gives them execution meaning.
```python
try:
    swf.register_activity_type(
        domain=DOMAIN,
        name="DoSomething",
        version=VERSION,  # string
        description="This is a worker that does something",
        defaultTaskStartToCloseTimeout="NONE",
        defaultTaskList={"name": TASKLIST}  # TASKLIST is a string
    )
    print("Activity type registered!")
except ClientError as e:
    print("Activity already exists: ", e.response.get("Error", {}).get("Code"))
```
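Since you can register as many activity types as you need, a loop is handy. This is a sketch under the same placeholder assumptions as above (`DOMAIN`, `VERSION`, `TASKLIST`); the helper name `register_activity_types` and a second activity name such as `DoSomethingElse` are hypothetical. It catches only `TypeAlreadyExistsFault` and returns the names that were newly registered.

```python
def register_activity_types(swf, domain, names, version, task_list):
    """Register each activity type name; return the names newly registered.

    Names SWF reports as existing (TypeAlreadyExistsFault, which also covers
    deprecated types) are skipped. Other errors propagate.
    """
    created = []
    for name in names:
        try:
            swf.register_activity_type(
                domain=domain,
                name=name,
                version=version,
                description="Activity type " + name,
                defaultTaskStartToCloseTimeout="NONE",
                defaultTaskList={"name": task_list},
            )
        except swf.exceptions.TypeAlreadyExistsFault:
            continue
        created.append(name)
    return created
```

Usage: `register_activity_types(swf, DOMAIN, ["DoSomething", "DoSomethingElse"], VERSION, TASKLIST)`.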
With our domain, workflow type, and activity type registered, we can now start a workflow execution.
```python
import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
    domain=DOMAIN,  # string
    workflowId='test-1001',
    workflowType={
        "name": WORKFLOW,  # string
        "version": VERSION  # string
    },
    taskList={
        'name': TASKLIST
    },
    input=''
)

print("Workflow requested: ", response)
```
Note the `workflowId`: this is a custom identifier, for example `str(uuid.uuid4())`. From the docs:
The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution
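A sketch of starting an execution with a generated `workflowId` and keeping the `runId` that `start_workflow_execution` returns, since you need both to refer to the execution later (for example with `get_workflow_execution_history`). The helper name `start_execution` and the `test-` prefix are hypothetical.

```python
import uuid


def start_execution(swf, domain, workflow, version, task_list, input_text=""):
    """Start one workflow execution with a freshly generated workflowId.

    Returns (workflowId, runId). SWF rejects a second *open* execution with the
    same workflowId, so a UUID keeps concurrent test runs from colliding.
    """
    workflow_id = "test-" + str(uuid.uuid4())  # hypothetical naming scheme
    resp = swf.start_workflow_execution(
        domain=domain,
        workflowId=workflow_id,
        workflowType={"name": workflow, "version": version},
        taskList={"name": task_list},
        input=input_text,
    )
    return workflow_id, resp["runId"]
```

Usage: `workflow_id, run_id = start_execution(swf, DOMAIN, WORKFLOW, VERSION, TASKLIST)`.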
At this point nothing will happen, because we have neither a Decider nor any Workers running. Let's see what those look like.
Our decider polls for a decision task and then makes a decision about it:
```python
import uuid

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
```
Note the timeout settings above. You can reference this PR to see the rationale behind it:
https://github.com/boto/botocore/pull/634
From the Boto3 SWF docs:
Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).
That PR is what made it possible to configure these timeouts in boto3.
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task
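Two details of `poll_for_decision_task` are worth handling explicitly. Per the API reference, a poll that times out returns a result whose `taskToken` is an empty string, and a long history arrives in pages: calling the operation again with the returned `nextPageToken` (and otherwise identical arguments) yields the next page of the same decision task rather than a new task. Here is a minimal sketch of a helper, hypothetically named `poll_decision_task`, that treats a missing or empty token as "no task" and merges every page into `events`; it assumes `swf` is the client configured above.

```python
def poll_decision_task(swf, domain, task_list, identity="decider-1"):
    """Long-poll for one decision task and return it with every history page merged.

    Returns None when the poll times out. The API signals a timeout with an
    empty taskToken; a missing key is treated the same way.
    """
    params = {
        "domain": domain,
        "taskList": {"name": task_list},
        "identity": identity,
        "reverseOrder": False,  # oldest event first, so events[-1] is the newest
    }
    task = swf.poll_for_decision_task(**params)
    if not task.get("taskToken"):
        return None
    events = list(task.get("events", []))
    token = task.get("nextPageToken")
    while token:
        # Later pages use the same parameters plus the page token.
        page = swf.poll_for_decision_task(nextPageToken=token, **params)
        events.extend(page.get("events", []))
        token = page.get("nextPageToken")
    task = dict(task, events=events)
    task.pop("nextPageToken", None)
    return task
```

Inside a decider loop you could call `newTask = poll_decision_task(swf, DOMAIN, TASKLIST)` and simply poll again when it returns `None`.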
```python
print("Listening for Decision Tasks")

while True:
    newTask = swf.poll_for_decision_task(
        domain=DOMAIN,
        taskList={'name': TASKLIST},  # TASKLIST is a string
        identity='decider-1',  # any identity you like; it's recorded in the history
        reverseOrder=False)

    if not newTask.get('taskToken'):
        print("Poll timed out, no new task. Repoll")

    elif 'events' in newTask:
        # Ignore the Decision* bookkeeping events; the last remaining event is
        # the one that triggered this decision task (assuming the whole
        # history fits in one page).
        eventHistory = [evt for evt in newTask['events']
                        if not evt['eventType'].startswith('Decision')]
        lastEvent = eventHistory[-1]

        if lastEvent['eventType'] == 'WorkflowExecutionStarted':
            print("Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType'])
            swf.respond_decision_task_completed(
                taskToken=newTask['taskToken'],
                decisions=[
                    {
                        'decisionType': 'ScheduleActivityTask',
                        'scheduleActivityTaskDecisionAttributes': {
                            'activityType': {
                                'name': TASKNAME,  # string, e.g. "DoSomething" as registered above
                                'version': VERSION  # string
                            },
                            'activityId': 'activityid-' + str(uuid.uuid4()),
                            'input': '',
                            'scheduleToCloseTimeout': 'NONE',
                            'scheduleToStartTimeout': 'NONE',
                            'startToCloseTimeout': 'NONE',
                            'heartbeatTimeout': 'NONE',
                            'taskList': {'name': TASKLIST},  # TASKLIST is a string
                        }
                    }
                ]
            )
            print("Task Dispatched:", newTask['taskToken'])

        elif lastEvent['eventType'] == 'ActivityTaskCompleted':
            swf.respond_decision_task_completed(
                taskToken=newTask['taskToken'],
                decisions=[
                    {
                        'decisionType': 'CompleteWorkflowExecution',
                        'completeWorkflowExecutionDecisionAttributes': {
                            'result': 'success'
                        }
                    }
                ]
            )
            print("Task Completed!")
```
Note that at the end of this snippet we check whether the last event is `ActivityTaskCompleted` and, if so, respond with the `CompleteWorkflowExecution` decision to let SWF know we are done.
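The decider above only handles a single activity. Because SWF works on names, a decider for a chain of activities has to recover which activity just finished: an `ActivityTaskCompleted` event carries only a `scheduledEventId`, and the activity type name lives on the matching `ActivityTaskScheduled` event. Below is a pure-Python sketch of that lookup. The function names `next_decisions` and `_schedule`, the `chain` list, and failing the workflow on `ActivityTaskFailed`/`ActivityTaskTimedOut` are illustrative choices, not part of the original example.

```python
def _schedule(name, version, task_list, activity_id):
    """Build a ScheduleActivityTask decision with all timeouts set to NONE."""
    return {
        "decisionType": "ScheduleActivityTask",
        "scheduleActivityTaskDecisionAttributes": {
            "activityType": {"name": name, "version": version},
            "activityId": activity_id,
            "input": "",
            "scheduleToCloseTimeout": "NONE",
            "scheduleToStartTimeout": "NONE",
            "startToCloseTimeout": "NONE",
            "heartbeatTimeout": "NONE",
            "taskList": {"name": task_list},
        },
    }


def next_decisions(events, chain, version, task_list, new_id):
    """Return the decisions for a sequential chain of activity type names.

    events: the decision task's full history, oldest first.
    chain:  activity type names to run in order.
    new_id: zero-argument callable producing a fresh activityId.
    """
    by_id = {e["eventId"]: e for e in events}
    relevant = [e for e in events if not e["eventType"].startswith("Decision")]
    last = relevant[-1]
    kind = last["eventType"]

    if kind == "WorkflowExecutionStarted":
        return [_schedule(chain[0], version, task_list, new_id())]

    if kind == "ActivityTaskCompleted":
        # The completion event only references the scheduling event by id;
        # the activity type name is recorded on that earlier event.
        sched_id = last["activityTaskCompletedEventAttributes"]["scheduledEventId"]
        sched = by_id[sched_id]["activityTaskScheduledEventAttributes"]
        position = chain.index(sched["activityType"]["name"])
        if position + 1 < len(chain):
            return [_schedule(chain[position + 1], version, task_list, new_id())]
        return [{
            "decisionType": "CompleteWorkflowExecution",
            "completeWorkflowExecutionDecisionAttributes": {"result": "success"},
        }]

    if kind in ("ActivityTaskFailed", "ActivityTaskTimedOut"):
        return [{
            "decisionType": "FailWorkflowExecution",
            "failWorkflowExecutionDecisionAttributes": {"reason": kind},
        }]

    return []  # other events: make no decision in this sketch
```

You would pass the result as `decisions=` to `respond_decision_task_completed`, together with the task's `taskToken`.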
That's our decider. What does the worker look like?
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task
Note that, again, we set the `read_timeout`:
```python
import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)
```
Now we start our worker polling:
```python
print("Listening for Worker Tasks")

while True:
    task = swf.poll_for_activity_task(
        domain=DOMAIN,  # string
        taskList={'name': TASKLIST},  # TASKLIST is a string
        identity='worker-1')  # identity is recorded in the history

    if not task.get('taskToken'):
        print("Poll timed out, no new task. Repoll")
    else:
        print("New task arrived")
        # Do the actual work for task['activityType']['name'] here.
        swf.respond_activity_task_completed(
            taskToken=task['taskToken'],
            result='success'
        )
        print("Task Done")
```
Again we signal SWF that we have completed our work.
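Real work can fail, and an activity that fails without reporting back stays open until a timeout fires, while the activity timeouts above are all `NONE`. A sketch of a worker step that dispatches on `task['activityType']['name']` to a handler and reports the outcome with `respond_activity_task_completed` or `respond_activity_task_failed`. The `handlers` mapping and the function name `handle_activity_task` are hypothetical; the truncation lengths follow the API's documented limits of 256 characters for `reason` and 32768 for `details`.

```python
import traceback


def handle_activity_task(swf, task, handlers):
    """Run the handler registered for this activity type and report the outcome.

    handlers maps activity type names to functions that take the task input
    string and return a result string. Returns True on success, False on failure.
    """
    name = task["activityType"]["name"]
    try:
        result = handlers[name](task.get("input", ""))
    except Exception as exc:  # unknown activity names raise KeyError here too
        swf.respond_activity_task_failed(
            taskToken=task["taskToken"],
            reason=("%s: %s" % (type(exc).__name__, exc))[:256],
            details=traceback.format_exc()[-32768:],
        )
        return False
    swf.respond_activity_task_completed(taskToken=task["taskToken"], result=result)
    return True
```

In the worker loop, the `else` branch could call `handle_activity_task(swf, task, {"DoSomething": do_something})`, where `do_something` is your own function.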
The link to the official documentation is [here][1].
There are a lot of code samples out there; just follow that link or [this][2] one. The available services section lists all the services that boto3 now supports, along with detailed examples.
One example is using boto3 to get the execution count of an SWF workflow:
```python
import datetime

import boto3
import dateutil.tz

domainName = 'YOUR_SWF_DOMAIN_NAME'  # placeholder: your SWF domain


def lambda_handler(event, context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(days=1)  # look back one day

    fullTextResponse = swfClient.count_open_workflow_executions(
        domain=domainName,
        startTimeFilter={
            'oldestDate': oldestDate,
            'latestDate': latestDate
        },
        typeFilter={
            'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
            'version': 'VERSION_NUMBER'
        }
    )
    print("the count is " + str(fullTextResponse['count']))
    print(fullTextResponse)
```
This is what I have used in my case to get the count of the running SWF Workflow type. The format that I have used is well defined in the documentation mentioned above.
To use boto3 and SWF together, start by importing boto3 (along with Python's datetime module) in the Python Lambda function. Then `boto3.client('swf')` creates the client through which we interact with SWF.
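The same count can be wrapped in a small helper that also returns the response's `truncated` flag, which is true when the real number exceeded what the API reports. This sketch swaps `dateutil` for the standard library's `datetime.timezone.utc`; a timezone-aware UTC datetime marks the same instant as the Brisbane one above. The helper name `count_open_executions` and the 24-hour default are hypothetical.

```python
import datetime


def count_open_executions(swf, domain, name, version, hours=24, now=None):
    """Count open executions of one workflow type started in the last `hours` hours.

    Returns (count, truncated). truncated=True means the real number exceeded
    what the API reports, so count is only a lower bound.
    """
    latest = now or datetime.datetime.now(datetime.timezone.utc)
    oldest = latest - datetime.timedelta(hours=hours)
    resp = swf.count_open_workflow_executions(
        domain=domain,
        startTimeFilter={"oldestDate": oldest, "latestDate": latest},
        typeFilter={"name": name, "version": version},
    )
    return resp["count"], resp.get("truncated", False)
```

Inside `lambda_handler` this would be `count, truncated = count_open_executions(boto3.client('swf'), domainName, 'NAME_OF_YOUR_SWF_WORKFLOW_NAME', 'VERSION_NUMBER')`.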
Another example is fetching the history of a workflow execution:
```python
# swf is a boto3 SWF client; domainName, workflowId and runId identify the execution
history = swf.get_workflow_execution_history(
    domain=domainName,
    execution={
        'workflowId': workflowId,
        'runId': runId
    },
)
```
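A single `get_workflow_execution_history` call returns one page of events, with a `nextPageToken` when there are more. A sketch that walks every page with boto3's built-in paginator for this operation and tallies event types with `collections.Counter`; the helper name `history_event_counts` is hypothetical, and `workflow_id`/`run_id` would be the values from `start_workflow_execution`.

```python
from collections import Counter


def history_event_counts(swf, domain, workflow_id, run_id):
    """Count event types across the full execution history, following all pages."""
    paginator = swf.get_paginator("get_workflow_execution_history")
    counts = Counter()
    for page in paginator.paginate(
        domain=domain,
        execution={"workflowId": workflow_id, "runId": run_id},
    ):
        counts.update(event["eventType"] for event in page["events"])
    return counts
```

For example, `history_event_counts(swf, domainName, workflowId, runId)["ActivityTaskCompleted"]` tells you how many activities have finished so far.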
Hope this one helps you!

[1]: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
[2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html