
AWS Data pipeline CSV data from S3 to DynamoDB

I am trying to transfer CSV data from an S3 bucket to DynamoDB using AWS Data Pipeline. My pipeline definition is below; it is not working properly.

CSV file structure:

Name, Designation,Company
A,TL,C1
B,Prog, C2

DynamoDB table: N_Table, with Name as the hash key

{
"objects": [
    {
        "id": "Default",
        "scheduleType": "cron",
        "name": "Default",
        "role": "DataPipelineDefaultRole",
        "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
        "id": "DynamoDBDataNodeId635",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "tableName": "N_Table",
        "name": "MyDynamoDBData",
        "type": "DynamoDBDataNode"
    },
    {
        "emrLogUri": "s3://onlycsv/error",
        "id": "EmrClusterId636",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "masterInstanceType": "m1.small",
        "coreInstanceType": "m1.xlarge",
        "enableDebugging": "true",
        "installHive": "latest",
        "name": "ImportCluster",
        "coreInstanceCount": "1",
        "logUri": "s3://onlycsv/error1",
        "type": "EmrCluster"
    },
    {
        "id": "S3DataNodeId643",
        "schedule": {
            "ref": "ScheduleId639"
        },
        "directoryPath": "s3://onlycsv/data.csv",
        "name": "MyS3Data",
        "dataFormat": {
            "ref": "DataFormatId1"
        },
        "type": "S3DataNode"
    },
    {
        "id": "ScheduleId639",
        "startDateTime": "2013-08-03T00:00:00",
        "name": "ImportSchedule",
        "period": "1 Hours",
        "type": "Schedule",
        "endDateTime": "2013-08-04T00:00:00"
    },
    {
        "id": "EmrActivityId637",
        "input": {
            "ref": "S3DataNodeId643"
        },
        "schedule": {
            "ref": "ScheduleId639"
        },
        "name": "MyImportJob",
        "runsOn": {
            "ref": "EmrClusterId636"
        },
        "maximumRetries": "0",
        "myDynamoDBWriteThroughputRatio": "0.25",
        "attemptTimeout": "24 hours",
        "type": "EmrActivity",
        "output": {
            "ref": "DynamoDBDataNodeId635"
        },
        "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
    },
    {
        "id": "DataFormatId1",
        "name": "DefaultDataFormat1",
        "column": [
            "Name",
            "Designation",
            "Company"
        ],
        "columnSeparator": ",",
        "recordSeparator": "\n",
        "type": "Custom"
    }
]

}

Of the four steps in the pipeline run, two finish, but the pipeline does not run to completion.

asked Aug 03 '13 16:08 by NKS


2 Answers

Currently (2015-04) the default import pipeline template does not support importing CSV files.

If your CSV file is not too big (under 1 GB or so), you can create a ShellCommandActivity that first converts the CSV to the DynamoDB JSON format, and then feed the result to an EmrActivity that imports the JSON file into your table.

As a first step, create a sample DynamoDB table that includes all the field types you need, populate it with dummy values, and then export the records using a pipeline (Export/Import button in the DynamoDB console). This will show you the format that the Import pipeline expects. The type names are not obvious, and the Import activity is very sensitive to case (e.g. you need bOOL for a boolean field).

After that it should be easy to write an awk script (or any other text converter; the advantage of awk is that it works on the default AMI image for your shell activity) and run it from your ShellCommandActivity. Don't forget to enable the "staging" flag so that your output is uploaded back to S3, where the Import activity can pick it up.
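To make the conversion step above more concrete, here is a minimal sketch in Python (used instead of awk purely for readability). It assumes the import expects one JSON object per line, with every attribute wrapped in a lowercase type tag such as {"s": "..."}. That layout is an assumption on my part, so check it against a real export of a sample table before relying on it. The function name csv_to_ddb_json_lines and the type_tag parameter are hypothetical, made up for this example.

```python
import csv
import io
import json


def csv_to_ddb_json_lines(csv_text, type_tag="s"):
    """Convert CSV text (header row first) into one JSON object per line.

    Every value is wrapped as {type_tag: value}. The default "s" (string)
    tag is an assumption; verify it against an actual export file.
    """
    # skipinitialspace drops the blank that follows a comma, as in "B,Prog, C2"
    reader = csv.reader(io.StringIO(csv_text), skipinitialspace=True)
    header = [name.strip() for name in next(reader)]
    lines = []
    for row in reader:
        if not row:  # csv.reader yields an empty list for blank lines
            continue
        item = {name: {type_tag: value.strip()} for name, value in zip(header, row)}
        lines.append(json.dumps(item, separators=(",", ":")))
    return lines


if __name__ == "__main__":
    sample = "Name, Designation,Company\nA,TL,C1\nB,Prog, C2\n"
    for line in csv_to_ddb_json_lines(sample):
        print(line)
```

This treats every column as a string. Numeric or boolean columns would need their own type tags, which is exactly what the sample export is for.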

answered Nov 02 '22 04:11 by Altair7852


If you are using the template data pipeline for importing data from S3 to DynamoDB, these data formats won't work. Instead, store the input S3 data file in the format described at the link below: http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-pipelinejson-verifydata2.html

This is the format of the output file generated by the template data pipeline that exports data from DynamoDB to S3.

Hope that helps.

answered Nov 02 '22 04:11 by HarshitGupta