Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Map step function result and extract certain keys

I have the following output coming from a step function task: ListObjectsV2

{
  "Contents": [
    {
      "ETag": "\"86c12c034bc6c30cb89b500b954c188f\"",
      "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_1.csv",
      "LastModified": "2023-02-09T13:46:20Z",
      "Size": 796014,
      "StorageClass": "STANDARD"
    },
    {
      "ETag": "\"58e4a770e0f66073b00d185df500f07f\"",
      "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_2.csv",
      "LastModified": "2023-02-09T13:47:20Z",
      "Size": 934038,
      "StorageClass": "STANDARD"
    },
    {
      "ETag": "\"460abd0de64d5cb67e8f0d46878cb1ef\"",
      "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_3.csv",
      "LastModified": "2023-02-09T13:46:57Z",
      "Size": 794264,
      "StorageClass": "STANDARD"
    },
    {
      "ETag": "\"1bfedc3dc92e4ba8d04e24b9b5a0ed58\"",
      "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_4.csv",
      "LastModified": "2023-02-09T13:46:24Z",
      "Size": 788756,
      "StorageClass": "STANDARD"
    },
    {
      "ETag": "\"9d6c434ce5ebdf203a790fbcf19338dc\"",
      "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_5.csv",
      "LastModified": "2023-02-09T13:47:07Z",
      "Size": 831156,
      "StorageClass": "STANDARD"
    }
  ],
  "IsTruncated": false,
  "KeyCount": 5,
  "MaxKeys": 1000,
  "Name": "vita-internal-text-classification-dev-183576513728",
  "Prefix": "55271f52fffe4461a2ee3228ebb97157"
}

I want to have an array containing only the Key key, to pass to the next state, like so:

[
    {
        "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_1.csv",
    },
    {
        "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_2.csv",
    },
    {
        "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_3.csv",
    },
    {
        "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_4.csv",
    },
    {
        "Key": "55271f52fffe4461a2ee3228ebb97157/input/batch_5.csv",
    }
]

So far I've tried setting the ResultPath to:

$.Contents[*].Key

$.Contents[*].['Key']

What I get is:

  [
    "55271f52fffe4461a2ee3228ebb97157/input/batch_1.csv",
    "55271f52fffe4461a2ee3228ebb97157/input/batch_2.csv",
    "55271f52fffe4461a2ee3228ebb97157/input/batch_3.csv",
    "55271f52fffe4461a2ee3228ebb97157/input/batch_4.csv",
    "55271f52fffe4461a2ee3228ebb97157/input/batch_5.csv",
  ]

But I've gotten bad output from that, any help?

like image 232
Or Shemtov Avatar asked Sep 02 '25 10:09

Or Shemtov


2 Answers

The way I've solved this is to use an Inline Map state with a Pass state to build the necessary format. You can see this pattern in an example here for how to use Step Functions Distributed Map to bulk delete objects from S3. You can see this in the inner Create Object Identifier Array Map state. If you were doing this in Standard Workflows, this could be a cost concern given the number of state transitions involved. But since in the Item Processor I'm using Express Workflows, which are billed by duration (and these are super fast), it works pretty well.

enter image description here


{
  "Comment": "A state machine to bulk delete objects from S3 using Distributed Map",
  "StartAt": "Confirm Bucket Provided",
  "States": {
    "Confirm Bucket Provided": {
      "Type": "Choice",
      "Choices": [
        {
          "Not": {
            "Variable": "$.bucket",
            "IsPresent": true
          },
          "Next": "Fail - No Bucket"
        }
      ],
      "Default": "Check for Prefix"
    },
    "Check for Prefix": {
      "Type": "Choice",
      "Choices": [
        {
          "Not": {
            "Variable": "$.prefix",
            "IsPresent": true
          },
          "Next": "Generate Parameters - Without Prefix"
        }
      ],
      "Default": "Generate Parameters - With Prefix"
    },
    "Generate Parameters - Without Prefix": {
      "Type": "Pass",
      "Parameters": {
        "Bucket.$": "$.bucket",
        "Prefix": ""
      },
      "ResultPath": "$.list_parameters",
      "Next": "Delete Objects from S3 Bucket"
    },
    "Fail - No Bucket": {
      "Type": "Fail",
      "Error": "InsuffcientArguments",
      "Cause": "No Bucket was provided"
    },
    "Generate Parameters - With Prefix": {
      "Type": "Pass",
      "Next": "Delete Objects from S3 Bucket",
      "Parameters": {
        "Bucket.$": "$.bucket",
        "Prefix.$": "$.prefix"
      },
      "ResultPath": "$.list_parameters"
    },
    "Delete Objects from S3 Bucket": {
      "Type": "Map",
      "ItemProcessor": {
        "ProcessorConfig": {
          "Mode": "DISTRIBUTED",
          "ExecutionType": "EXPRESS"
        },
        "StartAt": "Create Object Identifier Array",
        "States": {
          "Create Object Identifier Array": {
            "Type": "Map",
            "ItemProcessor": {
              "ProcessorConfig": {
                "Mode": "INLINE"
              },
              "StartAt": "Create Object Identifier",
              "States": {
                "Create Object Identifier": {
                  "Type": "Pass",
                  "End": true,
                  "Parameters": {
                    "Key.$": "$.Key"
                  }
                }
              }
            },
            "ItemsPath": "$.Items",
            "ResultPath": "$.object_identifiers",
            "Next": "Delete Objects"
          },
          "Delete Objects": {
            "Type": "Task",
            "Next": "Clear Output",
            "Parameters": {
              "Bucket.$": "$.BatchInput.bucket",
              "Delete": {
                "Objects.$": "$.object_identifiers"
              }
            },
            "Resource": "arn:aws:states:::aws-sdk:s3:deleteObjects",
            "Retry": [
              {
                "ErrorEquals": [
                  "States.ALL"
                ],
                "BackoffRate": 2,
                "IntervalSeconds": 1,
                "MaxAttempts": 6
              }
            ],
            "ResultSelector": {
              "Deleted.$": "$.Deleted",
              "RetryCount.$": "$$.State.RetryCount"
            }
          },
          "Clear Output": {
            "Type": "Pass",
            "End": true,
            "Result": {}
          }
        }
      },
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:listObjectsV2",
        "Parameters": {
          "Bucket.$": "$.list_parameters.Bucket",
          "Prefix.$": "$.list_parameters.Prefix"
        }
      },
      "MaxConcurrency": 5,
      "Label": "S3objectkeys",
      "ItemBatcher": {
        "MaxInputBytesPerBatch": 204800,
        "MaxItemsPerBatch": 1000,
        "BatchInput": {
          "bucket.$": "$.list_parameters.Bucket"
        }
      },
      "ResultSelector": {},
      "End": true
    }
  }
}
like image 136
Justin Callison Avatar answered Sep 04 '25 00:09

Justin Callison


Recently, we had the same issue in AWS Step functions while trying to list objects of a bucket and using the list to delete them in the next step.

After fetching the list of objects running an AWS API call step:

tasks.CallAwsService(
            self,
            f"list-s3-objects-{bucket}-{prefix}",
            service="s3",
            action="listObjects",
            parameters={
                "Bucket": bucket,
                "Prefix": prefix,
            },
            iam_resources=["*"],
            result_path="$.s3_object_list",
        )

The following JsonPath expression worked to feed the object list for deletion:

tasks.CallAwsService(
            self,
            f"delete-s3-objects-{bucket}",
            service="s3",
            action="deleteObjects",
            parameters={
                "Bucket": bucket,
                "Delete": {"Objects": sfn.JsonPath.list_at("$.s3_object_list.Contents[*].['Key','VersionId']"
        )Ï},
            },
            iam_resources=["*"],
            result_path=sfn.JsonPath.DISCARD,
        )

The key part is the following JsonPath:

Contents[*].['Key','VersionId']"
like image 33
F. Santiago Avatar answered Sep 04 '25 01:09

F. Santiago