I am trying to transfer CSV data from an S3 bucket to DynamoDB using AWS Data Pipeline. My pipeline definition is below; it is not working properly.
CSV file structure:
Name, Designation,Company
A,TL,C1
B,Prog, C2
DynamoDB table: N_Table, with Name as the hash key
{
  "objects": [
    {
      "id": "Default",
      "scheduleType": "cron",
      "name": "Default",
      "role": "DataPipelineDefaultRole",
      "resourceRole": "DataPipelineDefaultResourceRole"
    },
    {
      "id": "DynamoDBDataNodeId635",
      "schedule": {
        "ref": "ScheduleId639"
      },
      "tableName": "N_Table",
      "name": "MyDynamoDBData",
      "type": "DynamoDBDataNode"
    },
    {
      "emrLogUri": "s3://onlycsv/error",
      "id": "EmrClusterId636",
      "schedule": {
        "ref": "ScheduleId639"
      },
      "masterInstanceType": "m1.small",
      "coreInstanceType": "m1.xlarge",
      "enableDebugging": "true",
      "installHive": "latest",
      "name": "ImportCluster",
      "coreInstanceCount": "1",
      "logUri": "s3://onlycsv/error1",
      "type": "EmrCluster"
    },
    {
      "id": "S3DataNodeId643",
      "schedule": {
        "ref": "ScheduleId639"
      },
      "directoryPath": "s3://onlycsv/data.csv",
      "name": "MyS3Data",
      "dataFormat": {
        "ref": "DataFormatId1"
      },
      "type": "S3DataNode"
    },
    {
      "id": "ScheduleId639",
      "startDateTime": "2013-08-03T00:00:00",
      "name": "ImportSchedule",
      "period": "1 Hours",
      "type": "Schedule",
      "endDateTime": "2013-08-04T00:00:00"
    },
    {
      "id": "EmrActivityId637",
      "input": {
        "ref": "S3DataNodeId643"
      },
      "schedule": {
        "ref": "ScheduleId639"
      },
      "name": "MyImportJob",
      "runsOn": {
        "ref": "EmrClusterId636"
      },
      "maximumRetries": "0",
      "myDynamoDBWriteThroughputRatio": "0.25",
      "attemptTimeout": "24 hours",
      "type": "EmrActivity",
      "output": {
        "ref": "DynamoDBDataNodeId635"
      },
      "step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
    },
    {
      "id": "DataFormatId1",
      "name": "DefaultDataFormat1",
      "column": [
        "Name",
        "Designation",
        "Company"
      ],
      "columnSeparator": ",",
      "recordSeparator": "\n",
      "type": "Custom"
    }
  ]
}
Of the four steps the pipeline runs, two finish, but the pipeline does not execute to completion.
Currently (2015-04) the default import pipeline template does not support importing CSV files.
If your CSV file is not too big (under 1 GB or so), you can create a ShellCommandActivity to convert the CSV to the DynamoDB JSON format first, and then feed that to the EmrActivity that imports the resulting JSON file into your table.
As a first step, create a sample DynamoDB table that includes all the field types you need, populate it with dummy values, and then export the records using a pipeline (the Export/Import button in the DynamoDB console). This will give you an idea of the format the Import pipeline expects. The type names are not obvious, and the Import activity is very sensitive to case (e.g. you should have bOOL for a boolean field).
After that, it should be easy to create an awk script (or any other text converter; with awk, at least, you can use the default AMI image for your shell activity) to feed to your ShellCommandActivity. Don't forget to enable the "staging" flag so that your output is uploaded back to S3 for the Import activity to pick up.
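The conversion step described above could be sketched roughly as follows. This uses Python instead of the awk script suggested here, purely for illustration. The function name csv_to_ddb_json_lines is hypothetical, and the output layout (one JSON object per line, every attribute wrapped in a lowercase "s" string type tag) is an assumption; check it against an export of your own sample table before relying on it.

```python
import csv
import io
import json


def csv_to_ddb_json_lines(csv_text, type_tag="s"):
    """Turn CSV text (with a header row) into one JSON object per line.

    Each value is wrapped as {type_tag: value}. The "s" tag and the
    one-object-per-line layout are assumptions, not a confirmed spec.
    """
    # skipinitialspace drops the blank after a comma, as in "Name, Designation"
    reader = csv.DictReader(io.StringIO(csv_text), skipinitialspace=True)
    lines = []
    for row in reader:
        item = {}
        for name, value in row.items():
            # Skip cells missing from short rows and extra unnamed cells
            if name is None or value is None:
                continue
            item[name.strip()] = {type_tag: value.strip()}
        lines.append(json.dumps(item))
    return "\n".join(lines)


if __name__ == "__main__":
    sample = "Name, Designation,Company\nA,TL,C1\nB,Prog, C2\n"
    print(csv_to_ddb_json_lines(sample))
```

All columns are treated as strings here; numeric or boolean fields would need their own type tags, taken from what your exported sample actually contains.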
If you are using the template data pipeline for importing data from S3 to DynamoDB, these data formats won't work. Instead, store the input S3 data file in the format described at the link below: http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-pipelinejson-verifydata2.html
This is the format of the output file generated by the template data pipeline that exports data from DynamoDB to S3.
Hope that helps.