 

read/write dynamo db from apache spark [closed]

I would like to know whether there are any Java libraries that support reading from and writing to DynamoDB (AWS) from Apache Spark running on Mesos. I know there are libraries that support Spark on EMR, per this article: https://aws.amazon.com/blogs/big-data/analyze-your-data-on-amazon-dynamodb-with-apache-spark/. Please advise.

thanks Pradeep

asked Oct 17 '17 by pradeep aru


People also ask

How do you improve reads and writes for DynamoDB?

You can increase your DynamoDB throughput several times over by parallelizing reads/writes across multiple partitions. Use DynamoDB as an attribute store rather than as a document store. This not only reduces read/write costs but also considerably improves the performance of your operations.
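
As a rough illustration of the "parallelize reads across partitions" idea, here is a minimal sketch using the parallel Scan feature of the AWS SDK for Java v1 (the client setup, table name, and segment count are assumptions for the example; in practice each segment would run on its own thread or worker):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

public class ParallelScanSketch {
    public static void main(String[] args) {
        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
        int totalSegments = 4; // hypothetical; one segment per parallel worker

        for (int segment = 0; segment < totalSegments; segment++) {
            // each segment scans a disjoint portion of the table, so segments can run in parallel
            ScanRequest request = new ScanRequest()
                    .withTableName("TableNameForRead")
                    .withTotalSegments(totalSegments)
                    .withSegment(segment);
            ScanResult result = client.scan(request);
            System.out.println("segment " + segment + " items: " + result.getCount());
        }
    }
}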

Is DynamoDB stateless or stateful?

DynamoDB is a web service, and interactions with it are stateless.

Can we use hibernate with DynamoDB?

You can use Hibernate to map object-oriented domain models to a traditional relational database, and the CData JDBC Driver for Amazon DynamoDB can be used to generate an ORM of your Amazon DynamoDB repository with Hibernate.


1 Answer

You can read items from and write items to DynamoDB tables using Apache Spark and the emr-dynamodb-connector library. To read data, use javaSparkContext.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class); to write data to DynamoDB, use javaPairRDD.saveAsHadoopDataset(jobConf). Below is an example (it works in both EMR and non-EMR environments):

// Required imports (the two methods below live in a single driver class):
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable;
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Map;

public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
            .setAppName("DynamoDBApplication")
            // local master for testing; on a cluster, pass the master (e.g. a mesos:// URL) via spark-submit instead
            .setMaster("local[4]")
            .registerKryoClasses(new Class<?>[]{
                    Class.forName("org.apache.hadoop.io.Text"),
                    Class.forName("org.apache.hadoop.dynamodb.DynamoDBItemWritable")
            });

    JavaSparkContext sc = new JavaSparkContext(conf);

    JobConf jobConf = getDynamoDbJobConf(sc, "TableNameForRead", "TableNameForWrite");

    // read all items from DynamoDB table with name TableNameForRead
    JavaPairRDD<Text, DynamoDBItemWritable> javaPairRdd = sc.hadoopRDD(jobConf, DynamoDBInputFormat.class, Text.class, DynamoDBItemWritable.class);
    System.out.println("count: " + javaPairRdd.count());

    // process data in any way, below is just a simple example
    JavaRDD<Map<String, AttributeValue>> javaRDD = javaPairRdd.map(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        Long result = Long.valueOf(attrs.get("resultAttribute").getN());
        System.out.println(String.format("hashKey=%s, result=%d", hashKey, result));
        return attrs;
    });
    System.out.println("count: " + javaRDD.count());

    // update JavaPairRdd in order to store it to DynamoDB, below is just a simple example with updating hashKey
    JavaPairRDD<Text, DynamoDBItemWritable> updatedJavaPairRDD = javaPairRdd.mapToPair(t -> {
        DynamoDBItemWritable item = t._2();
        Map<String, AttributeValue> attrs = item.getItem();
        String hashKey = attrs.get("key").getS();
        String updatedHashKey = hashKey + "_new";
        attrs.get("key").setS(updatedHashKey);
        return new Tuple2<>(t._1(), item);
    });

    // write items to DynamoDB table with name TableNameForWrite
    updatedJavaPairRDD.saveAsHadoopDataset(jobConf);

    sc.stop();
}


private static JobConf getDynamoDbJobConf(JavaSparkContext sc, String tableNameForRead, String tableNameForWrite) {
    final JobConf jobConf = new JobConf(sc.hadoopConfiguration());
    jobConf.set("dynamodb.servicename", "dynamodb");

    jobConf.set("dynamodb.input.tableName", tableNameForRead);
    jobConf.set("dynamodb.output.tableName", tableNameForWrite);

    jobConf.set("dynamodb.awsAccessKeyId", "YOUR_AWS_ACCESS_KEY");
    jobConf.set("dynamodb.awsSecretAccessKey", "YOUR_AWS_SECRET_KEY");
    jobConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

    return jobConf;
}
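
If the job is throttled by the table's provisioned throughput, the connector can also be tuned through additional JobConf properties. As a small optional addition to getDynamoDbJobConf, here is a sketch using the throughput settings documented for the emr-dynamodb-connector (the values below are only illustrative):

    // optional: fraction of the table's provisioned throughput this job may consume
    // (the connector's default is typically 0.5 for each)
    jobConf.set("dynamodb.throughput.read.percent", "1.0");
    jobConf.set("dynamodb.throughput.write.percent", "1.0");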

To run this code, you need the following Maven dependencies:

<dependencies>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.10</artifactId>
        <version>2.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-emr</artifactId>
        <version>1.11.113</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
        <version>1.11.113</version>
    </dependency>

    <!-- https://github.com/awslabs/emr-dynamodb-connector -->
    <dependency>
        <groupId>com.amazon.emr</groupId>
        <artifactId>emr-dynamodb-hadoop</artifactId>
        <version>4.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.8.0</version>
    </dependency>

</dependencies>
answered Oct 27 '22 by Vasyl Sarzhynskyi