
How do I ensure that my Apache Spark setup code runs only once?

I'm writing a Spark job in Scala that reads in parquet files on S3, does some simple transforms, and then saves them to a DynamoDB instance. Each time it runs we need to create a new table in Dynamo, so I've written a Lambda function which is responsible for table creation. The first thing my Spark job does is generate a table name, invoke my Lambda function (passing the new table name to it), wait for the table to be created, and then proceed normally with the ETL steps.

However it looks as though my Lambda function is consistently being invoked twice. I cannot explain that. Here's a sample of the code:

def main(spark: SparkSession, pathToParquet: String) {

  // generate a unique table name
  val tableName = generateTableName()

  // call the lambda function
  val result = callLambdaFunction(tableName)

  // wait for the table to be created
  waitForTableCreation(tableName)

  // normal ETL pipeline
  var parquetRDD = spark.read.parquet(pathToParquet)
  val transformedRDD = parquetRDD.map((row: Row) => transformData(row), encoder=kryo[(Text, DynamoDBItemWritable)])
  // ... (rest of the pipeline not shown)
}

The code to wait for table creation is pretty straightforward, as you can see:

def waitForTableCreation(tableName: String) {
  val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder.defaultClient()
  val waiter: Waiter[DescribeTableRequest] = client.waiters().tableExists()
  try {
    waiter.run(new WaiterParameters[DescribeTableRequest](new DescribeTableRequest(tableName)))
  } catch {
    case ex: WaiterTimedOutException =>
      LOGGER.error("Timed out waiting to create table: " + tableName)
      throw ex
    case t: Throwable => throw t
  }
}

And the lambda invocation is equally simple:

def callLambdaFunction(tableName: String) {
  val myLambda = LambdaInvokerFactory.builder()
    .lambdaFunctionNameResolver(new LambdaByName(LAMBDA_FUNCTION_NAME))
  myLambda.invoke(new MyLambdaInput(tableName))
}

Like I said, when I run spark-submit on this code, it definitely does hit the Lambda function. But I can't explain why it hits it twice. The result is that I get two tables provisioned in DynamoDB.

The waiting step also seems to fail within the context of running this as a Spark job. But when I unit-test my waiting code it seems to work fine on its own. It successfully blocks until the table is ready.

At first I theorized that perhaps spark-submit was sending this code to all of the worker nodes and they were independently running the whole thing. Initially I had a Spark cluster with 1 master and 2 workers. However I tested this out on another cluster with 1 master and 5 workers, and there again it hit the Lambda function exactly twice, and then apparently failed to wait for table creation because it dies shortly after invoking the Lambdas.

Does anyone have any clues as to what Spark might be doing? Am I missing something obvious?

UPDATE: Here's my spark-submit args which are visible on the Steps tab of EMR.

spark-submit --deploy-mode cluster --class com.mypackage.spark.MyMainClass s3://my-bucket/my-spark-job.jar

And here's the code for my getConfiguration function:

def getConfiguration(tableName: String) : JobConf = {
  val conf = new Configuration()
  conf.set("dynamodb.servicename", "dynamodb")
  conf.set("dynamodb.input.tableName", tableName)
  conf.set("dynamodb.output.tableName", tableName)
  conf.set("dynamodb.endpoint", "https://dynamodb.us-east-1.amazonaws.com")
  conf.set("dynamodb.regionid", "us-east-1")
  conf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  conf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
  new JobConf(conf)
}

Also here is a Gist containing some of the exception logs I see when I try to run this.

soapergem asked Jan 27 '23 03:01


2 Answers

Thanks @soapergem for adding the logs and options. I'm posting this as an answer (a tentative one) since it is a bit too long for a comment :)

To sum up:

  • nothing strange with spark-submit and configuration options
  • in https://gist.github.com/soapergem/6b379b5a9092dcd43777bdec8dee65a8#file-stderr-log you can see that the application is executed twice: it goes from the ACCEPTED to the RUNNING state twice. That is consistent with EMR defaults (How to prevent EMR Spark step from retrying?). To confirm it, check whether you have 2 tables after the step has run (I assume here that you generate table names dynamically, one name per execution, so a retry should produce 2 different names)
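
The effect of a retry on dynamically generated names can be illustrated without Spark. The sketch below is a minimal, hypothetical model and not the asker's code: `timestampName` mimics a name built from the clock at start-up, while `stableName` derives the name from a run identifier supplied by the caller, which stays the same across attempts. The `EmrTest_` prefix is only borrowed from the table name that appears in the gist. On YARN the number of attempts is controlled by `spark.yarn.maxAppAttempts` (capped by YARN's own `yarn.resourcemanager.am.max-attempts`), so setting it to 1 is the other usual way to avoid a second run.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object TableNames {
  private val fmt =
    DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC)

  // Clock-based name: an attempt that starts at a different second gets a new name.
  def timestampName(now: Instant): String = "EmrTest_" + fmt.format(now)

  // Name derived from a caller-supplied run id (hypothetical): identical for
  // every attempt of the same run. Unsupported characters become underscores.
  def stableName(runId: String): String =
    "EmrTest_" + runId.replaceAll("[^A-Za-z0-9_.-]", "_")
}

object RetryDemo {
  def main(args: Array[String]): Unit = {
    val firstAttempt  = Instant.parse("2019-07-08T14:39:02Z")
    val secondAttempt = firstAttempt.plusSeconds(45) // assumed delay before the retry

    // Two attempts, two different clock-based names, hence two tables.
    println(TableNames.timestampName(firstAttempt))
    println(TableNames.timestampName(secondAttempt))

    // Both attempts of one run resolve to the same table name.
    println(TableNames.stableName("run-2019-07-08"))
  }
}
```

With a stable name, the second attempt would ask for a table that already exists, so the table-creation code still has to tolerate that case.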

For your last question:

It looks like my code might work if I run it in "client" deploy mode, instead of "cluster" deploy mode? Does that offer any hints to anyone here?

For more information about the difference, please check https://community.hortonworks.com/questions/89263/difference-between-local-vs-yarn-cluster-vs-yarn-c.html

In your case, it looks like the machine executing spark-submit in client mode has different IAM policies than the EMR jobflow. My guess is that your jobflow role is not allowed to call dynamodb:Describe*, and that's why you're getting the exception with the 400 status code (from your gist):

Caused by: com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException: Requested resource not found: Table: EmrTest_20190708143902 not found (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ResourceNotFoundException; Request ID: V0M91J7KEUVR4VM78MF5TKHLEBVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.doInvoke(AmazonDynamoDBClient.java:4243)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:4210)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.executeDescribeTable(AmazonDynamoDBClient.java:1890)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.describeTable(AmazonDynamoDBClient.java:1857)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:129)
    at org.apache.hadoop.dynamodb.DynamoDBClient$1.call(DynamoDBClient.java:126)
    at org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryer.runWithRetry(DynamoDBFibonacciRetryer.java:80)

To confirm this hypothesis, you can run the part that creates the table and waits for it locally (no Spark code here, just a plain java command invoking your main function) and:

  • for the first execution, make sure that you have all the permissions. IMO that means dynamodb:Describe* on Resources: * (if this turns out to be the cause, AFAIK you should use something like Resources: Test_Emr* in production, following the principle of least privilege)
  • for the 2nd execution, remove dynamodb:Describe* and check whether you get the same stack trace as in the gist
Bartosz Konieczny answered Jan 29 '23 21:01


I encountered the same problem in cluster mode too (v2.4.0). I worked around it by launching my apps programmatically with SparkLauncher instead of using spark-submit.sh. You could move your lambda logic into the main method that starts your Spark app, like this:

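import java.util.concurrent.CountDownLatch
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

def main(args: Array[String]): Unit = {
    // generate a unique table name
    val tableName = generateTableName()

    // call the lambda function
    val result = callLambdaFunction(tableName)

    // wait for the table to be created
    waitForTableCreation(tableName)

    // released once the launched application reaches a final state
    val latch = new CountDownLatch(1)

    // env: environment variables for the child process (a java.util.Map[String, String])
    val handle = new SparkLauncher(env)
        .setConf("spark.executor.instances", "2")
        .setConf("spark.executor.cores", "2")
        // other conf ...
        .startApplication(new SparkAppHandle.Listener {
            override def stateChanged(sparkAppHandle: SparkAppHandle): Unit = {
                // FINISHED, FAILED, KILLED and LOST are final states
                if (sparkAppHandle.getState.isFinal) latch.countDown()
            }

            override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
        })

    println("app is launching...")
    latch.await()
    println("app exited")
}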
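
The way the latch declared in the launcher code is meant to be used can be tried without a cluster. The following is a minimal sketch, assuming the launcher should block until the application reaches a terminal state. `AppState`, `Submitted`, `Running`, `Finished` and `awaitFinal` are hypothetical stand-ins for illustration and are not part of Spark's API.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for the launcher's application states.
sealed abstract class AppState(val isFinal: Boolean)
case object Submitted extends AppState(false)
case object Running   extends AppState(false)
case object Finished  extends AppState(true)

object LatchDemo {
  // Replays the given states to a listener on another thread and blocks
  // until a final state arrives or the timeout expires.
  // Returns true if a final state was seen in time.
  def awaitFinal(states: Seq[AppState], timeoutSeconds: Long): Boolean = {
    val latch = new CountDownLatch(1)

    // Count down only on a final state; intermediate changes are ignored.
    val listener: AppState => Unit =
      state => if (state.isFinal) latch.countDown()

    val worker = new Thread(new Runnable {
      override def run(): Unit = states.foreach(listener)
    })
    worker.start()

    latch.await(timeoutSeconds, TimeUnit.SECONDS)
  }

  def main(args: Array[String]): Unit = {
    // The application finishes: the wait ends as soon as Finished is delivered.
    println(awaitFinal(Seq(Submitted, Running, Finished), 5))
    // No final state is ever delivered: the wait gives up after the timeout.
    println(awaitFinal(Seq(Submitted, Running), 1))
  }
}
```

Counting down on every state change instead would release the latch at the first transition, well before the application has exited.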
Lim Yow Cheng answered Jan 29 '23 21:01
