Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to share data in `AWS Step Functions` without passing it between the steps

I use AWS Step Functions and have the following workflow

AWS Step Functions workflow

initStep - It's a lambda function handler, that gets some data and sends it to SQS for external service.

activity = os.getenv('ACTIVITY')
queue_name = os.getenv('QUEUE_NAME')

def lambda_handler(event, context):
  event['my_activity'] = activity
  data = json.dumps(event)

  # Retrieving a queue by its name
  sqs = boto3.resource('sqs')
  queue = sqs.get_queue_by_name(QueueName=queue_name)

  queue.send_message(MessageBody=data, MessageGroupId='messageGroup1' + str(datetime.time(datetime.now())))

  return event

validationWaiting - It's an activity that waits for an answer from the external service that include the data.

complete - It's a lambda function handler, that uses the data from the initStep.

def lambda_handler(event, context):
  email = event['email'] if 'email' in event else None
  data = event['data'] if 'data' in event else None

  client = boto3.client(service_name='ses')
  to = email.split(', ')
  message_conrainer = {'Subject': {'Data': 'Email from step functions'},
           'Body': {'Html': {
               'Charset': "UTF-8",
               'Data': """<html><body>
                            <p>""" + data """</p>
                            </body> </html> """
           }}}

  destination = {'ToAddresses': to,
               'CcAddresses': [],
               'BccAddresses': []}

  return client.send_email(Source=from_addresses,
                         Destination=destination,
                         Message=message_container)

It does work, but the problem is that I'm sending full data from the initStep to external service, just to pass it later to complete. Potentially more steps can be added.

I believe it would be better to share it as some sort of global data (of current step function), that way I could add or remove steps and data would still be available for all.

like image 395
Marina Avatar asked Feb 23 '19 19:02

Marina


3 Answers

You can make use of InputPath and ResultPath. In initStep you would only send necessary data to external service (probably along with some unique identifier of Execution). In the ValidaitonWaiting step you can set following properties (in State Machine definition):

  • InputPath: What data will be provided to GetActivityTask. Probably you want to set it to something like $.execution_unique_id where execution_unique_id is field in your data that external service uses to identify Execution (to match it with specific request during initStep).
  • ResultPath: Where output of ValidationWaiting Activity will be saved in data. You can set it to $.validation_output and json result from external service will be present there.

This way you can send to external service only data that is actually needed by it and you won't lose access to any data that was previously (before ValidationWaiting step) in the input.

For example, you could have following definition of the State Machine:

{
  "StartAt": "initStep",
  "States": {
    "initStep": {
      "Type": "Pass",
      "Result": {
        "executionId": "some:special:id",
        "data": {},
        "someOtherData": {"value": "key"}
      },
      "Next": "ValidationWaiting"
    },
    "ValidationWaiting": {
      "Type": "Pass",
      "InputPath": "$.executionId",
      "ResultPath": "$.validationOutput",
      "Result": {
        "validationMessages": ["a", "b"]
      },
      "Next": "Complete"
    },
    "Complete": {
      "Type": "Pass",
      "End": true
    }
  }
}

I've used Pass states for initStep and ValidationWaiting to simplify the example (I haven't run it, but it should work). Result field is specific to Pass task and it is equivalent to the result of your Lambda functions or Activity.

In this scenario Complete step would get following input:

{
  "executionId": "some:special:id",
  "data": {},
  "someOtherData": {"value": key"},
  "validationOutput": {
    "validationMessages": ["a", "b"]
  }
}

So the result of ValidationWaiting step has been saved into validationOutput field.

like image 133
Marcin Sucharski Avatar answered Oct 13 '22 02:10

Marcin Sucharski


Based on the answer of Marcin Sucharski I've came up with my own solution.

I needed to use Type: Task since initStep is a lambda, which sends SQS.

I didn't needed InputPath in ValidationWaiting, but only ResultPath, which store the data received in activity.

I work with Serverless framework, here is my final solution:

StartAt: initStep
States: 
  initStep:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:init-step
    Next: ValidationWaiting
  ValidationWaiting:
    Type: Task
    ResultPath: $.validationOutput
    Resource: arn:aws:states:#{AWS::Region}:#{AWS::AccountId}:activity:validationActivity
    Next: Complete
    Catch:
      - ErrorEquals:
        - States.ALL
      ResultPath: $.validationOutput
      Next: Complete
  Complete:
    Type: Task
    Resource: arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:complete-step
    End: true
like image 21
Marina Avatar answered Oct 13 '22 01:10

Marina


Here a short and simple solution with InputPath and ResultPath. My Lambda Check_Ubuntu_Updates return a list of instance ready to be updated. This list of instances is received by the step Notify_Results, then it use this data. Remember that if you have several ResultPath in your Step Function and you need more than 1 input in a step you can use InputPath only with $.

{
  "Comment": "A state machine that check some updates systems available.",
  "StartAt": "Check_Ubuntu_Updates",
  "States": {
    "Check_Ubuntu_Updates": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:#############:function:Check_Ubuntu_Updates",
      "ResultPath": "$.instances",
      "Next": "Notify_Results"
    },
    "Notify_Results": {
      "Type": "Task",
      "InputPath": "$.instances",
      "Resource": "arn:aws:lambda:us-east-1:#############:function:Notify_Results",
      "End": true
    }
  }
}
like image 44
Miguel Conde Avatar answered Oct 13 '22 00:10

Miguel Conde