
boto3 and SWF example needed

Amazon are promoting boto3 for future development but do not provide enough documentation for the new boto3.

Does anybody have any example code of using SWF with boto3 that they would care to share?

asked Sep 22 '15 16:09 by user3066737




2 Answers

This is the only example I have found so far:

https://github.com/jhludwig/aws-swf-boto3

So the process overview looks like this (note this is pulled directly from the link above, but with some additional notes added and more of a flow).

It should be noted that SWF operates on the names of things. It's up to your code to give those names an execution meaning. For example, your Decider will poll and, using the Task names, decide what's next.

There are some things I am not exactly certain about. I believe TASKLIST references are a kind of namespacing: not really a list of things, but a way of isolating things by name. I could be totally wrong about that, but from my basic understanding that's what I think the docs are saying.

You can run your Decider and Workers from ANYWHERE. Since they reach out and up to AWS, if your firewall allows 0.0.0.0/0 egress you will have access.

The AWS Docs also mention you can run a lambda, but I have not found out how to trigger that.

Create the boto3 swf client:

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')
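The snippets below refer to DOMAIN, WORKFLOW, VERSION, TASKLIST and TASKNAME without defining them. Here is a minimal sketch of those constants, assuming placeholder values chosen purely for illustration (none of them come from AWS or from the linked repository):

```python
# Hypothetical placeholder values; substitute your own names.
DOMAIN = "test-domain"       # SWF domain the workflow is registered in
WORKFLOW = "test-workflow"   # workflow type name
VERSION = "0.1"              # SWF versions are free-form strings, not numbers
TASKLIST = "test-tasklist"   # task list name the decider and workers poll
TASKNAME = "DoSomething"     # should match the name given to register_activity_type
```

TASKNAME is set to "DoSomething" here because that is the activity name registered further down, and the decider schedules activities by that same name.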

Create a domain

try:
  swf.register_domain(
    name=DOMAIN, # string
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this long
  )
except ClientError as e:
  # Reaching this branch usually means the domain is already registered
  print("Domain already exists: ", e.response.get("Error", {}).get("Code"))

With the domain created we now register the workflow:

Register Workflow

try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print("Test workflow created!")
except ClientError as e:
  print("Workflow already exists: ", e.response.get("Error", {}).get("Code"))

With our Workflow registered, we can now begin registering tasks.

Assign Tasks to the Workflow.

You can register N tasks (SWF calls them activity types). Remember, these are mainly strings; your code will give them execution meaning.

try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print("Worker created!")
except ClientError as e:
  print("Activity already exists: ", e.response.get("Error", {}).get("Code"))
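The three registration snippets above report "already exists" for any ClientError, including permission or throttling errors. A minimal sketch of a stricter check, assuming the SWF error codes DomainAlreadyExistsFault and TypeAlreadyExistsFault; the helper names are hypothetical and not part of boto3:

```python
# Error codes SWF uses when a domain or a workflow/activity type is already registered.
ALREADY_EXISTS_CODES = {"DomainAlreadyExistsFault", "TypeAlreadyExistsFault"}


def is_already_registered(error_response):
    """Return True if a ClientError response dict means 'already exists'."""
    code = error_response.get("Error", {}).get("Code")
    return code in ALREADY_EXISTS_CODES


def register_quietly(register_call, **kwargs):
    """Call a register_* method and swallow only 'already exists' errors.

    Returns True if something was registered, False if it already existed.
    Any other exception is re-raised.
    """
    try:
        register_call(**kwargs)
        return True
    except Exception as e:  # botocore's ClientError carries a .response dict
        response = getattr(e, "response", None)
        if isinstance(response, dict) and is_already_registered(response):
            return False
        raise
```

With a real client this might be called as register_quietly(swf.register_domain, name=DOMAIN, workflowExecutionRetentionPeriodInDays="10"), so that unexpected failures surface instead of being printed as duplicates.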

Start the Workflow

With our Domain, Workflow, and Task created, we can now begin a workflow.

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN, # string
  workflowId='test-1001',
  workflowType={
    "name": WORKFLOW, # string
    "version": VERSION # string
  },
  taskList={
      'name': TASKLIST
  },
  input=''
)

print("Workflow requested: ", response)

Note the workflowId: this is a custom identifier, for example str(uuid.uuid4()). From the docs:

The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution
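Since two open executions cannot share a workflowId, one option is to generate a fresh id per run. A minimal sketch, assuming a readable prefix is wanted; the function name and prefix are hypothetical:

```python
import uuid


def new_workflow_id(prefix="test"):
    """Build a workflowId that is unique per call, e.g. 'test-<uuid4>'."""
    return "{}-{}".format(prefix, uuid.uuid4())
```

The result can be passed as workflowId to start_workflow_execution in place of the hard-coded 'test-1001'. If you deliberately reuse an id instead, expect SWF to reject the call while an execution with that id is still open.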

At this point, nothing will happen because we don't have a Decider running nor any Workers. Let's see what those look like.

Decider

Our decider polls for a decision task and then decides what should happen next:

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

Note the timeout settings above. You can reference this PR to see the rationale behind it:

https://github.com/boto/botocore/pull/634

From the Boto3 SWF docs:

Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).

That PR is what made it possible to set these timeouts in boto3.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print("Listening for Decision Tasks")

while True:

  newTask = swf.poll_for_decision_task(
    domain=DOMAIN,
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  if 'taskToken' not in newTask:
    print("Poll timed out, no new task.  Repoll")

  elif 'events' in newTask:

    # Ignore the Decision* bookkeeping events and look at the latest real event
    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print("Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType'])
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': TASKNAME, # string
                    'version': VERSION # string
                    },
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': TASKLIST}, # TASKLIST is a string
            }
          }
        ]
      )
      print("Task Dispatched:", newTask['taskToken'])

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print("Task Completed!")

Note that at the end of this snippet, we check if we have ActivityTaskCompleted and we respond with the decision CompleteWorkflowExecution to let SWF know we are done.
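The decision logic can also be pulled out of the polling loop into a plain function, which makes it testable without touching AWS. A minimal sketch under that assumption; the function name decide is hypothetical, and the extra branch that fails the workflow on ActivityTaskFailed or ActivityTaskTimedOut is my own addition rather than something from the linked repository:

```python
import uuid


def decide(events, activity_name, activity_version, task_list):
    """Return the list of decisions for one decision task's event history."""
    # Skip the Decision* bookkeeping events produced by the decider itself.
    history = [e for e in events if not e["eventType"].startswith("Decision")]
    if not history:
        return []
    last = history[-1]["eventType"]

    if last == "WorkflowExecutionStarted":
        return [{
            "decisionType": "ScheduleActivityTask",
            "scheduleActivityTaskDecisionAttributes": {
                "activityType": {"name": activity_name, "version": activity_version},
                "activityId": "activityid-" + str(uuid.uuid4()),
                "input": "",
                "scheduleToCloseTimeout": "NONE",
                "scheduleToStartTimeout": "NONE",
                "startToCloseTimeout": "NONE",
                "heartbeatTimeout": "NONE",
                "taskList": {"name": task_list},
            },
        }]
    if last == "ActivityTaskCompleted":
        return [{
            "decisionType": "CompleteWorkflowExecution",
            "completeWorkflowExecutionDecisionAttributes": {"result": "success"},
        }]
    if last in ("ActivityTaskFailed", "ActivityTaskTimedOut"):
        # Assumption: give up on the whole workflow when the activity fails.
        return [{
            "decisionType": "FailWorkflowExecution",
            "failWorkflowExecutionDecisionAttributes": {"reason": last},
        }]
    return []
```

Inside the loop, the result would be passed along as swf.respond_decision_task_completed(taskToken=newTask['taskToken'], decisions=decide(newTask['events'], TASKNAME, VERSION, TASKLIST)).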

That's our decider. What does the worker look like?

Worker

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

Note again that we set the read_timeout:

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

Now we start our worker polling:

print("Listening for Worker Tasks")

while True:

  task = swf.poll_for_activity_task(
    domain=DOMAIN, # string
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  if 'taskToken' not in task:
    print("Poll timed out, no new task.  Repoll")

  else:
    print("New task arrived")

    # The actual work for the activity would happen here
    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print("Task Done")

Again we signal SWF that we have completed our work.
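Because SWF only hands the worker a name, the worker has to map that name to real code. A minimal sketch of such a mapping, assuming the activity name "DoSomething" registered earlier; the handler, the HANDLERS table, and run_activity are hypothetical names:

```python
def do_something(task_input):
    """Hypothetical activity body; replace with the real work."""
    return "processed:" + task_input


# Maps the activity type name reported by SWF to the function that implements it.
HANDLERS = {"DoSomething": do_something}


def run_activity(task, handlers=HANDLERS):
    """Run the handler for a polled activity task.

    Returns ("completed", result) or ("failed", reason).
    """
    name = task["activityType"]["name"]
    handler = handlers.get(name)
    if handler is None:
        return ("failed", "no handler for " + name)
    try:
        return ("completed", handler(task.get("input", "")))
    except Exception as e:
        return ("failed", str(e))
```

In the polling loop, a "completed" outcome would go to swf.respond_activity_task_completed(taskToken=..., result=...) and a "failed" one to swf.respond_activity_task_failed(taskToken=..., reason=...), so the decider sees the failure in the history rather than waiting on a timeout.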

answered Oct 27 '22 01:10 by AJ Venturella


The link to the official documentation is [here][1].

There are a lot of code samples out there; just follow that link or [this][2] one. The available services section lists all the services that boto3 now supports, along with detailed examples.

One example is using boto3 to get the count of open SWF workflow executions:

import boto3
import datetime
import dateutil.tz

domainName = 'NAME_OF_YOUR_SWF_DOMAIN'  # placeholder; not defined in the original snippet

def lambda_handler(event, context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(1)  # one day earlier

    # Count open executions of one workflow type started in the last day
    fullTextResponse = swfClient.count_open_workflow_executions(
        domain=domainName,
        startTimeFilter={
            'oldestDate': oldestDate,
            'latestDate': latestDate
        },
        typeFilter={
            'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
            'version': 'VERSION_NUMBER'
        }
    )
    print("the count is " + str(fullTextResponse['count']))
    print(fullTextResponse)

This is what I used in my case to get the count of running executions of an SWF workflow type. The request format is well defined in the documentation mentioned above.

To use boto3 and SWF together, start by importing boto3 in the Python Lambda function. Then import datetime to build the time filter. Finally, boto3.client('swf') creates the client through which we interact with SWF.
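The time window passed as startTimeFilter can be built by a small helper. A minimal sketch, assuming UTC timestamps are acceptable in place of the Brisbane time zone used above; the function name is hypothetical:

```python
import datetime


def last_hours_filter(hours=24, now=None):
    """Build a startTimeFilter dict covering the last `hours` hours."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return {
        "oldestDate": now - datetime.timedelta(hours=hours),
        "latestDate": now,
    }
```

The returned dict has the same two keys as the startTimeFilter argument in the snippet above, so it can be passed straight to count_open_workflow_executions.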

Other examples would be:

history = swf.get_workflow_execution_history(
    domain=domainName,
    execution={
        'workflowId': workflowId,
        'runId': runId
    },
)
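Once you have the history response, a quick way to see what happened is to tally its events by type. A minimal sketch, assuming the response has the usual 'events' list where each entry carries an 'eventType'; the function name is hypothetical:

```python
from collections import Counter


def summarize_history(history):
    """Count the events in a get_workflow_execution_history response by type."""
    return Counter(e["eventType"] for e in history.get("events", []))
```

Note that this only covers the events in the single response it is given; long histories are returned in pages, so a full tally would need every page.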

Hope this one helps you!

[1]: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
[2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

answered Oct 27 '22 01:10 by Sajjan Mishra