
Persisting data to DynamoDB using Apache Spark

I have an application where:

1. I read JSON files from S3 into a DataFrame using SqlContext.read.json.
2. I then run some transformations on the DataFrame.
3. Finally, I want to persist the records to DynamoDB, using one of the record values as the key and the remaining JSON parameters as values/columns.

I am trying something like this:

JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", "my-dynamo-table");   // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com");
jobConf.set("dynamodb.regionid", "us-east-1");
jobConf.set("dynamodb.throughput.read", "1");
jobConf.set("dynamodb.throughput.read.percent", "1");
jobConf.set("dynamodb.version", "2011-12-05");

jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

DataFrame df = sqlContext.read().json("s3n://mybucket/abc.json");
RDD<String> jsonRDD = df.toJSON();
JavaRDD<String> jsonJavaRDD = jsonRDD.toJavaRDD();
PairFunction<String, Text, DynamoDBItemWritable> keyData = new PairFunction<String, Text, DynamoDBItemWritable>() {
    public Tuple2<Text, DynamoDBItemWritable> call(String row) {
        DynamoDBItemWritable writeable = new DynamoDBItemWritable();
        try {
            System.out.println("JSON : " + row);
            JSONObject jsonObject = new JSONObject(row);

            System.out.println("JSON Object: " + jsonObject);

            Map<String, AttributeValue> attributes = new HashMap<String, AttributeValue>();
            AttributeValue attributeValue = new AttributeValue();
            attributeValue.setS(row);
            attributes.put("values", attributeValue);

            AttributeValue attributeKeyValue = new AttributeValue();
            attributeValue.setS(jsonObject.getString("external_id"));
            attributes.put("primary_key", attributeKeyValue);

            AttributeValue attributeSecValue = new AttributeValue();
            attributeValue.setS(jsonObject.getString("123434335"));
            attributes.put("creation_date", attributeSecValue);
            writeable.setItem(attributes);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return new Tuple2(new Text(row), writeable);
    }
};

JavaPairRDD<Text, DynamoDBItemWritable> pairs = jsonJavaRDD
        .mapToPair(keyData);

Map<Text, DynamoDBItemWritable> map = pairs.collectAsMap();
System.out.println("Results : " + map);
pairs.saveAsHadoopDataset(jobConf);    

However, I do not see any data being written to DynamoDB, nor do I get any error messages.

asked Mar 09 '16 by lazywiz

People also ask

Is DynamoDB persistent?

Yes: data written to DynamoDB is stored durably. Services built on it can persist state updates to the Amazon DynamoDB database, and query functionality is fully supported.

Which API is used to write data to DynamoDB?

In Amazon DynamoDB, you can use either the DynamoDB API, or PartiQL, a SQL-compatible query language, to add an item to a table. With the DynamoDB API, you use the PutItem operation to add an item to a table. The primary key for this table consists of Artist and SongTitle. You must specify values for these attributes.
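
For reference, a minimal PutItem call with the AWS SDK for Java v1 (the same AttributeValue API used in the question) might look like the sketch below; the "Music" table and its sample attribute values are assumptions for illustration, not something from this question:

import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;

// Credentials and region are resolved from the default provider chain.
AmazonDynamoDBClient client = new AmazonDynamoDBClient();

// Every attribute of the table's primary key must be present in the item.
Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
item.put("Artist", new AttributeValue().withS("No One You Know"));
item.put("SongTitle", new AttributeValue().withS("Call Me Today"));

// "Music" is a hypothetical table keyed on Artist (hash) + SongTitle (range).
client.putItem(new PutItemRequest("Music", item));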

How is data stored in DynamoDB?

Amazon DynamoDB stores data in partitions. A partition is an allocation of storage for a table, backed by solid state drives (SSDs) and automatically replicated across multiple Availability Zones within an AWS Region.


1 Answer

I'm not sure, but yours seems more complex than it needs to be.

I've used the following to write an RDD to DynamoDB successfully:

import java.util

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf

val ddbInsertFormattedRDD = inputRDD.map { case (skey, svalue) =>
    val ddbMap = new util.HashMap[String, AttributeValue]()

    // Key attribute: the name and type must match the table's hash key.
    val key = new AttributeValue()
    key.setS(skey.toString)
    ddbMap.put("DynamoDbKey", key)

    // Payload attribute, stored under its own name ("DynamoDbValue" here).
    val value = new AttributeValue()
    value.setS(svalue.toString)
    ddbMap.put("DynamoDbValue", value)

    val item = new DynamoDBItemWritable()
    item.setItem(ddbMap)

    // DynamoDBOutputFormat ignores the key, so an empty Text will do.
    (new Text(""), item)
}

val ddbConf = new JobConf(sc.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "my-dynamo-table")   // note: output.tableName, not input.tableName
ddbConf.set("dynamodb.throughput.write.percent", "0.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)

Also, have you checked that the table has enough provisioned write capacity? With dynamodb.throughput.write.percent set to 0.5, the job only targets half of whatever write throughput the table has provisioned.
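
If you want to verify that programmatically, a quick sketch using DescribeTable with the AWS SDK for Java v1 (table name taken from the config above) could look like this:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
import com.amazonaws.services.dynamodbv2.model.TableDescription;

AmazonDynamoDBClient client = new AmazonDynamoDBClient();
TableDescription table = client.describeTable(
        new DescribeTableRequest().withTableName("my-dynamo-table")).getTable();

// With dynamodb.throughput.write.percent = 0.5, the job targets half of this value.
System.out.println("Provisioned write capacity units: "
        + table.getProvisionedThroughput().getWriteCapacityUnits());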

answered Nov 03 '22 by Timvw74