Using Amazon SWF To communicate between servers

How can I use Amazon SWF to pass messages between servers?

  1. On server A, I want to run script A.
  2. When that is finished, I want to send a message to server B to run script B.
  3. If script B completes successfully, I want to clear the job from the workflow queue.

I’m having a really hard time working out how to use Boto and SWF together to do this. I’m not after complete code; I’d just like someone to explain a little more about what is involved.

  • How do I actually tell server B to check for the completion of script A?
  • How do I make sure server A won’t pick up the completion of script A and try to run script B (since server B should run this)?
  • How do I actually notify SWF of script A’s completion? Is there a flag, a message, or what?

I’m pretty confused about all of this. What design should I use?

Jimmy asked Feb 12 '13 09:02


2 Answers

I think you ask some very good questions, which highlight how helpful SWF can be as a service. In short, you don't tell your servers to coordinate the work between themselves; your decider orchestrates all of this for you, with the help of the SWF service.

The implementation of your workflow goes as follows:

  1. Register the workflow and its activities with the service (a one-off).
  2. Implement the decider and the workers.
  3. Run your workers and deciders.
  4. Start a new workflow.

There are a number of ways to feed credentials to boto.swf. For the purposes of this exercise, I recommend exporting them to the environment before running the code below:

export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>

1) To register the domain, workflow and activities execute the following:

# ab_setup.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

2) Implement and run deciders and workers.

# ab_decider.py
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class ABDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        # Print history to familiarize yourself with its format.
        print(history)
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]
            decisions = swf.Layer1Decisions()
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                   ACTIVITY1, VERSION, task_list='a_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1) Find the event that scheduled this activity.
                #    SWF eventIds are 1-based; the history list is 0-based.
                last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                scheduled_event_index = last_event_attrs['scheduledEventId'] - 1
                # 2) Extract the activity's name.
                activity_data = history['events'][scheduled_event_index]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 3) Optionally, get the result from the activity.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')

                # Take the decision.
                if activity_name == ACTIVITY1:
                    # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                    decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                        ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                elif activity_name == ACTIVITY2:
                    # Server B completed activity. We're done.
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True
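A note on the index arithmetic above: SWF event IDs are 1-based while the history list is 0-based, which is why the decider subtracts one. A tiny pure helper (hypothetical, not part of boto) makes the mapping explicit:

```python
def event_by_id(events, event_id):
    """Look up an event in an SWF history list by its 1-based eventId.

    Assumes the list is ordered by eventId, as SWF returns it.
    """
    event = events[event_id - 1]
    if event['eventId'] != event_id:
        raise ValueError('history list is not ordered by eventId')
    return event

# Minimal fake history to illustrate the mapping:
fake_history = [
    {'eventId': 1, 'eventType': 'WorkflowExecutionStarted'},
    {'eventId': 2, 'eventType': 'DecisionTaskScheduled'},
    {'eventId': 3, 'eventType': 'ActivityTaskScheduled'},
]
print(event_by_id(fake_history, 3)['eventType'])  # ActivityTaskScheduled
```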

The workers are much simpler; you don't need to use inheritance if you don't want to.

# ab_worker.py
import os
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class MyBaseWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION
    task_list = None

    def run(self):
        activity_task = self.poll()
        print(activity_task)
        if 'activityId' in activity_task:
            # Get input.
            # Get the method for the requested activity.
            try:
                self.activity(activity_task.get('input'))
            except Exception as error:
                self.fail(reason=str(error))
                raise error

            return True

    def activity(self, activity_input):
        raise NotImplementedError

class WorkerA(MyBaseWorker):
    task_list = 'a_tasks'

    def activity(self, activity_input):
        result = str(time.time())
        print('worker a reporting time: %s' % result)
        self.complete(result=result)

class WorkerB(MyBaseWorker):
    task_list = 'b_tasks'

    def activity(self, activity_input):
        result = str(os.getpid())
        print('worker b returning pid: %s' % result)
        self.complete(result=result)

3) Run your deciders and workers. The decider and workers may run on separate hosts, or all on the same machine. Open four terminals and run your actors:

First, the decider:

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass
... 

Then worker A; you could do this from server A:

$ python -i ab_worker.py
>>> while WorkerA().run(): pass

Then worker B, possibly from server B, though if you run them all from one laptop it will work just as well:

$ python -i ab_worker.py
>>> while WorkerB().run(): pass
... 

4) Finally, kick off the workflow.

$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>> 

Switch back to see what happens with your actors. They might disconnect from the service after one minute of inactivity. If that happens, press arrow-up+enter to re-enter the polling loop.
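If re-entering the loop by hand gets tedious, you can wrap any actor in a small supervising loop that simply keeps polling (a sketch; run_forever is a hypothetical helper, and the actor is anything with a run() method like the classes above):

```python
def run_forever(actor):
    """Poll indefinitely; a timed-out poll (run() returning a falsy
    value) just triggers another poll instead of exiting the loop."""
    while True:
        try:
            actor.run()
        except KeyboardInterrupt:
            return  # Ctrl-C stops the actor cleanly.

# Usage, with the classes above: run_forever(ABDecider()) or run_forever(WorkerA())
```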

You can now go to the SWF panel of your AWS Management Console to check how the executions are doing and view their history. Alternatively, you can query for it from the Python shell.

>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ...
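Since the history is just a list of dicts like the one above, following progress programmatically is plain list processing; for instance, reducing it to the sequence of event types (a throwaway helper, not a boto API):

```python
def event_types(history_events):
    """Reduce a raw SWF history to the ordered list of event types."""
    return [e['eventType'] for e in history_events]

sample = [
    {'eventId': 1, 'eventType': 'WorkflowExecutionStarted'},
    {'eventId': 2, 'eventType': 'DecisionTaskScheduled'},
]
print(event_types(sample))  # ['WorkflowExecutionStarted', 'DecisionTaskScheduled']
```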

That's just an example of a workflow with serial execution of activities, but the decider can also schedule and coordinate parallel execution of activities.

I hope this will at least get you started. For a slightly more complex example of a serial workflow, I recommend looking at this.

oozie _at_ concourse.farm answered Sep 26 '22 11:09


I don't have any example code to share, but you can definitely use SWF to coordinate the execution of scripts across two servers. The main idea is to create three pieces of code that talk to SWF:

  • A component that knows which script to execute first and what to do once that first script is done executing. This is called the "decider" in SWF terms.
  • Two components that each understand how to execute the specific script you want to run on each machine. These are called "activity workers" in SWF terms.

The first component, the decider, calls two SWF APIs: PollForDecisionTask and RespondDecisionTaskCompleted. The poll request gives the decider the current history of an executing workflow, basically the "where am I" state information for your script runner. You write code that looks at these events and figures out which script should execute. These "commands" to execute a script take the form of scheduling an activity task, which is returned as part of the call to RespondDecisionTaskCompleted.
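The "look at the events and figure out which script should execute" part can be kept as a pure function, separate from the SWF API calls. A sketch for this question's two-script scenario (the names and tuple shape are illustrative, not a real API):

```python
def next_decision(last_event_type, completed_activity=None):
    """Map the latest workflow state to the decider's next command."""
    if last_event_type == 'WorkflowExecutionStarted':
        return ('schedule', 'script_a')         # nothing has run yet
    if last_event_type == 'ActivityTaskCompleted':
        if completed_activity == 'script_a':
            return ('schedule', 'script_b')     # server B's turn
        if completed_activity == 'script_b':
            return ('complete_workflow', None)  # both scripts are done
    return ('no_op', None)                      # nothing to decide on

print(next_decision('WorkflowExecutionStarted'))  # ('schedule', 'script_a')
```

Keeping this logic pure makes the decider easy to test without ever touching the service.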

The other two components you write, the activity workers, each call two SWF APIs: PollForActivityTask and RespondActivityTaskCompleted. The poll request gives an activity worker an indication that it should execute the script it knows about, what SWF calls an activity task. The information returned from the poll can include data specific to a single execution, sent to SWF when the activity task was scheduled. Each of your servers independently polls SWF for activity tasks that trigger execution of the local script on that host. Once the worker is done executing the script, it calls back to SWF through the RespondActivityTaskCompleted API.
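On each host, "execute the script it knows about" typically boils down to a subprocess call whose output becomes the activity result reported back through RespondActivityTaskCompleted (a sketch; the command would be whatever your script A or script B actually is):

```python
import subprocess

def run_local_script(command):
    """Run a local script; return its stdout as the activity result.

    check_output raises CalledProcessError on a non-zero exit, which the
    worker can translate into a RespondActivityTaskFailed call.
    """
    return subprocess.check_output(command, shell=True).decode().strip()

print(run_local_script('echo script A finished'))  # script A finished
```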

The callback from your activity worker to SWF results in a new history being handed to the decider component mentioned above. It looks at the history, sees that the first script is done, and schedules the second one to execute. Once it sees that the second one is done, it can "close" the workflow using another type of decision.

You kick off the whole process of executing the scripts on each host by calling the StartWorkflowExecution API. This creates the record of the overall process in SWF and kicks out the first history to the decider process to schedule the execution of the first script on the first host.

Hopefully this gives a bit more context on how to accomplish this type of workflow using SWF. If you haven't already, I would take a look at the dev guide on the SWF page for additional info.

risears answered Sep 22 '22 11:09