I would like to know if there are any Java libraries that support reading from and writing to DynamoDB (AWS) from Apache Spark running on Mesos. I know there are libraries that support Spark on EMR, per this article: https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/. Please advise.
Thanks, Pradeep
You can increase your DynamoDB throughput several times over by parallelizing reads/writes across multiple partitions. Use DynamoDB as an attribute store rather than as a document store; this will not only reduce your read/write costs but also considerably improve the performance of your operations.
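As an illustration of parallelizing reads across partitions, below is a minimal sketch of a segmented (parallel) Scan with the AWS SDK for Java v1. The table name and segment count are placeholders, and paging via LastEvaluatedKey is omitted for brevity.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

public class ParallelScanSketch {
    public static void main(String[] args) {
        final String tableName = "MyTable";   // placeholder table name
        final int totalSegments = 4;          // one worker thread per segment

        final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.defaultClient();
        ExecutorService pool = Executors.newFixedThreadPool(totalSegments);

        for (int segment = 0; segment < totalSegments; segment++) {
            final int seg = segment;
            pool.submit(() -> {
                // each worker scans its own segment, spreading reads across partitions
                ScanRequest request = new ScanRequest()
                        .withTableName(tableName)
                        .withSegment(seg)
                        .withTotalSegments(totalSegments);
                ScanResult result = client.scan(request);
                System.out.println("segment " + seg + ": " + result.getCount() + " items");
            });
        }
        pool.shutdown();
    }
}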
DynamoDB is a web service, and interactions with it are stateless.
You can use Hibernate to map object-oriented domain models to a traditional relational database. The tutorial below shows how to use the CData JDBC Driver for Amazon DynamoDB to generate an ORM of your Amazon DynamoDB repository with Hibernate.
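Before mapping entities with Hibernate, a quick way to verify the driver connection is plain JDBC. Here is a minimal sketch, assuming the CData connection URL format shown below; the URL properties and table name are placeholders, so check the driver's documentation for the exact values.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CDataDynamoDbJdbcSketch {
    public static void main(String[] args) throws Exception {
        // assumption: the CData JDBC URL roughly follows this format
        String url = "jdbc:amazondynamodb:Access Key=YOUR_AWS_ACCESS_KEY;"
                + "Secret Key=YOUR_AWS_SECRET_KEY;Domain=amazonaws.com;Region=USWEST1;";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM MyTable")) { // placeholder table
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}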
You can read items from and write items to DynamoDB tables using Apache Spark and the emr-dynamodb-connector library. For reading data you can use
javaSparkContext.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);
and for writing data to DynamoDB:
javaPairRDD.saveAsHadoopDataset(jobConf);
Below is an example (it works in both EMR and non-EMR environments):
import java.util.Map;

import org.apache.hadoop.dynamodb.DynamoDBItemWritable;
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;

import scala.Tuple2;

public class DynamoDBApplication {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("DynamoDBApplication")
                .setMaster("local[4]")
                .registerKryoClasses(new Class<?>[]{
                        Class.forName("org.apache.hadoop.io.Text"),
                        Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
                });

        JavaSparkContext sc = new JavaSparkContext(conf);

        JobConf jobConf = getDynamoDbJobConf(sc, "TableNameForRead", "TableNameForWrite");

        // read all items from the DynamoDB table with name TableNameForRead
        JavaPairRDD<Text, DynamoDBItemWritable> javaPairRdd =
                sc.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);
        System.out.println("count: " + javaPairRdd.count());

        // process the data in any way; below is just a simple example
        JavaRDD<Map<String, AttributeValue>> javaRDD = javaPairRdd.map(t -> {
            DynamoDBItemWritable item = t._2();
            Map<String, AttributeValue> attrs = item.getItem();
            String hashKey = attrs.get("key").getS();
            Long result = Long.valueOf(attrs.get("resultAttribute").getN());
            System.out.println(String.format("hashKey=%s, result=%d", hashKey, result));
            return attrs;
        });
        System.out.println("count: " + javaRDD.count());

        // update the JavaPairRDD in order to store it to DynamoDB;
        // below is just a simple example that rewrites the hash key
        JavaPairRDD<Text, DynamoDBItemWritable> updatedJavaPairRDD = javaPairRdd.mapToPair(t -> {
            DynamoDBItemWritable item = t._2();
            Map<String, AttributeValue> attrs = item.getItem();
            String hashKey = attrs.get("key").getS();
            String updatedHashKey = hashKey + "_new";
            attrs.get("key").setS(updatedHashKey);
            return new Tuple2<>(t._1(), item);
        });

        // write the items to the DynamoDB table with name TableNameForWrite
        updatedJavaPairRDD.saveAsHadoopDataset(jobConf);

        sc.stop();
    }

    private static JobConf getDynamoDbJobConf(JavaSparkContext sc, String tableNameForRead, String tableNameForWrite) {
        final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
        jobConf.set("dynamodb.servicename", "dynamodb");
        jobConf.set("dynamodb.input.tableName", tableNameForRead);
        jobConf.set("dynamodb.output.tableName", tableNameForWrite);
        jobConf.set("dynamodb.awsAccessKeyId", "YOUR_AWS_ACCESS_KEY");
        jobConf.set("dynamodb.awsSecretAccessKey", "YOUR_AWS_SECRET_KEY");
        jobConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
        jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
        jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
        return jobConf;
    }
}
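When running outside of EMR you may also want to limit how much of the table's provisioned throughput the job consumes. Here is a hedged addition to getDynamoDbJobConf, assuming your emr-dynamodb-connector version supports these settings:

// assumption: these connector settings are supported by your emr-dynamodb-connector version;
// they cap the fraction of provisioned read/write capacity the job may use
jobConf.set("dynamodb.throughput.read.percent", "0.5");
jobConf.set("dynamodb.throughput.write.percent", "0.5");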
To run this code you need the following Maven dependencies:
<dependencies>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.10</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-emr</artifactId>
        <version>1.11.113</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
        <version>1.11.113</version>
    </dependency>
    <!-- https://github.com/awslabs/emr-dynamodb-connector -->
    <dependency>
        <groupId>com.amazon.emr</groupId>
        <artifactId>emr-dynamodb-hadoop</artifactId>
        <version>4.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>
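If you are submitting to a Mesos cluster (as in the original question), replace setMaster("local[4]") with your Mesos master URL, e.g. mesos://host:5050, or pass it via spark-submit --master, and make sure the emr-dynamodb-hadoop and AWS SDK jars are shipped with the application (for example via --jars or by building an uber-jar).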