
Is there a way to get Step Functions input values into EMR step Args

We run batch Spark jobs on AWS EMR clusters. Those jobs run periodically, and we would like to orchestrate them via AWS Step Functions.

As of November 2019, Step Functions supports EMR natively. When adding a step to the cluster, we can use the following config:

"Some Step": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
    "Parameters": {
        "ClusterId.$": "$.cluster.ClusterId",
        "Step": {
            "Name": "FirstStep",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--class",
                    "com.some.package.Class",
                    "JarUri",
                    "--startDate",
                    "$.time",
                    "--daysToLookBack",
                    "$.daysToLookBack"
                ]
            }
        }
    },
    "Retry": [
        {
            "ErrorEquals": [ "States.ALL" ],
            "IntervalSeconds": 1,
            "MaxAttempts": 1,
            "BackoffRate": 2.0
        }
    ],
    "ResultPath": "$.firstStep",
    "End": true
}

Within the Args list of the HadoopJarStep we would like to set arguments dynamically. For example, if the input of the state machine execution is:

{
    "time": "2020-01-08",
    "daysToLookBack": 2
}

When the state machine executes, the strings in the config starting with "$." should be replaced with the corresponding input values, so the step on the EMR cluster should run command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate 2020-01-08 --daysToLookBack 2. Instead it runs command-runner.jar spark-submit --class com.some.package.Class JarUri --startDate $.time --daysToLookBack $.daysToLookBack, i.e. the placeholders are passed through literally.

Does anyone know if there is a way to do this?

asked Jan 09 '20 by d.a.d.a


2 Answers

You can use the States.Array() intrinsic function. Your Parameters block becomes:

  "Parameters": {
    "ClusterId.$": "$.cluster.ClusterId",
    "Step": {
      "Name": "FirstStep",
      "ActionOnFailure": "CONTINUE",
      "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args.$": "States.Array('spark-submit', '--class', 'com.some.package.Class', 'JarUri', '--startDate', $.time, '--daysToLookBack', $.daysToLookBack)"
      }
    }
  }

Intrinsic functions are covered in the Amazon States Language documentation, but the docs don't explain their usage very well. The code snippets provided in the Step Functions console are more useful.

Note that you can also do string formatting on the args using States.Format(). For example, you could construct a path whose final segment comes from an input variable:

"Args.$": "States.Array('mycommand', '--path', States.Format('my/base/path/{}', $.someInputVariable))"
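
States.Format fills each '{}' placeholder from its arguments in order. As a rough Python analogue (an illustration of the behavior only, not the service's implementation):

```python
def states_format(template: str, *args) -> str:
    # Rough analogue of the States.Format intrinsic: each '{}' is
    # replaced, left to right, by the next argument.
    for arg in args:
        template = template.replace("{}", str(arg), 1)
    return template

path = states_format("my/base/path/{}", "2020-01-08")
# path == "my/base/path/2020-01-08"
```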

answered Oct 21 '22 by Emma

Parameters let you define key-value pairs. Because the value of the "Args" key is an array, you can't dynamically reference a single element inside it; you have to reference the whole array instead, for example "Args.$": "$.Input.ArgsArray".
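
For instance, the caller could pre-build the whole argument array and pass it in the execution input. A sketch, assuming the input field names and example values shown here; the state-machine ARN in the commented-out call is hypothetical:

```python
import json

# Hypothetical execution input: the caller assembles the entire Args
# array up front, so the state can reference it as
# "Args.$": "$.Input.ArgsArray". EMR step args must all be strings,
# hence "2" rather than 2.
execution_input = {
    "Input": {
        "ArgsArray": [
            "spark-submit",
            "--class", "com.some.package.Class",
            "JarUri",
            "--startDate", "2020-01-08",
            "--daysToLookBack", "2",
        ]
    }
}

payload = json.dumps(execution_input)
# boto3.client("stepfunctions").start_execution(
#     stateMachineArn="arn:aws:states:us-east-1:111111111111:stateMachine:Hypothetical",
#     input=payload,
# )
```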

So for your use case the best way to achieve this is to add a pre-processing state before this one. In the pre-processing state you can either call a Lambda function and format your input/output in code, or, for something as simple as adding dynamic values to an array, use a Pass state to reshape the data. Inside your task state's Parameters you can then use JSONPath to pick up the array you defined in the pre-processor. Here's an example:

{
    "Comment": "A Hello World example of the Amazon States Language using Pass states",
    "StartAt": "HardCodedInputs",
    "States": {
        "HardCodedInputs": {
            "Type": "Pass",
            "Parameters": {
                "cluster": {
                    "ClusterId": "ValueForClusterIdVariable"
                },
                "time": "ValueForTimeVariable",
                "daysToLookBack": "ValueFordaysToLookBackVariable"
            },
            "Next": "Pre-Process"
        },
        "Pre-Process": {
            "Type": "Pass",
            "Parameters": {
                "FormattedInputsForEmr": {
                    "ClusterId.$": "$.cluster.ClusterId",
                    "Args": [
                        { "Arg1": "spark-submit" },
                        { "Arg2": "--class" },
                        { "Arg3": "com.some.package.Class" },
                        { "Arg4": "JarUri" },
                        { "Arg5": "--startDate" },
                        { "Arg6.$": "$.time" },
                        { "Arg7": "--daysToLookBack" },
                        { "Arg8.$": "$.daysToLookBack" }
                    ]
                }
            },
            "Next": "Some Step"
        },
        "Some Step": {
            "Type": "Pass",
            "Parameters": {
                "ClusterId.$": "$.FormattedInputsForEmr.ClusterId",
                "Step": {
                    "Name": "FirstStep",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args.$": "$.FormattedInputsForEmr.Args[*][*]"
                    }
                }
            },
            "End": true
        }
    }
}
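
To see why the "[*][*]" JSONPath works, here is a rough Python recreation of the flattening it performs on the Pre-Process output (illustrative only; Step Functions evaluates the path itself):

```python
# Each element of the Pre-Process "Args" array is a single-key wrapper
# object. The "[*][*]" path selects every value of every wrapper,
# yielding a flat list of argument strings.
args_wrapped = [
    {"Arg1": "spark-submit"},
    {"Arg2": "--class"},
    {"Arg3": "com.some.package.Class"},
    {"Arg4": "JarUri"},
    {"Arg5": "--startDate"},
    {"Arg6": "2020-01-08"},
    {"Arg7": "--daysToLookBack"},
    {"Arg8": "2"},
]

flattened = [value for wrapper in args_wrapped for value in wrapper.values()]
# flattened is the plain list handed to HadoopJarStep as Args
```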

answered Oct 21 '22 by Joe