Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark Structured Streaming program that reads from non-empty Kafka topic (starting from earliest) triggers batches locally, but not on EMR cluster

We are running into a problem where -- for one of our applications -- we don't see any evidences of batches being processed in the Structured Streaming tab of the Spark UI.

I have written a small program (below) to reproduce the issue. A self-contained project that allows you to build the app, along with scripts that facilitate upload to AWS, and details on how to run and reproduce the issue can be found here: https://github.com/buildlackey/spark-struct-streaming-metrics-missing-on-aws (The github version of the app is a slightly evolved version of what is presented below, but it illustrates the problem of Spark streaming metrics not showing up.)

The program can be run 'locally' -- on someones' laptop in local[*] mode (say with a dockerized Kafka instance), or on an EMR cluster. For local mode operation you invoke the main method with 'localTest' as the first argument.

In our case, when we run on the EMR cluster, pointing to a topic where we know there are many data records (we read from 'earliest'), we see that THERE ARE INDEED NO BATCHES PROCESSED -- on the cluster for some reason...

In the local[*] case we CAN see batches processed. To capture evidence of this i wrote a forEachBatch handler that simply does a toLocalIterator.asScala.toList.mkString("\n") on the Dataset of each batch, and then dumps the resultant string to a file. Running locally.. i see evidence of the captured records in the temporary file. HOWEVER, when I run on the cluster and i ssh into one of the executors i see NO SUCH files. I also checked the master node.... no files matching the pattern 'Missing' So... batches are not triggering on the cluster. Our kakfa has plenty of data and when running on the cluster the logs show we are churning through messages at increasing offsets:

21/12/16 05:15:21 DEBUG KafkaDataConsumer: Get spark-kafka-source-blah topic.foo.event-18 nextOffset 4596542913 requested 4596542913
21/12/16 05:15:21 DEBUG KafkaDataConsumer: Get spark-kafka-source-blah topic.foo.event-18 nextOffset 4596542914 requested 4596542914

Note to get the logs we are using:

yarn yarn logs --applicationId <appId>

which should get both driver and executor logs for the entire run (when app terminates)

Now, in the local[*] case we CAN see batches processed. The evidence is that we see a file whose name is matching the pattern 'Missing' in our tmp folder.

I am including my simple demo program below. If you can spot the issue and clue us in, I'd be very grateful !

// Please forgive the busy code.. i stripped this down from a much larger system....
import com.typesafe.scalalogging.StrictLogging
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.{Dataset, SparkSession}
import java.io.File
import java.util
import scala.collection.JavaConverters.asScalaIteratorConverter
import scala.concurrent.duration.Duration

object AwsSupportCaseFailsToYieldLogs extends StrictLogging {
  case class KafkaEvent(fooMsgKey: Array[Byte],
                        fooMsg: Array[Byte],
                        topic: String,
                        partition: String,
                        offset: String) extends Serializable

  case class SparkSessionConfig(appName: String, master: String) {
    def sessionBuilder(): SparkSession.Builder = {
      val builder = SparkSession.builder
      builder.master(master)
      builder
    }
  }

  case class KafkaConfig(kafkaBootstrapServers: String, kafkaTopic: String, kafkaStartingOffsets: String)
    def sessionFactory: (SparkSessionConfig) => SparkSession = {
      (sparkSessionConfig) => {
        sparkSessionConfig.sessionBuilder().getOrCreate()
      }
    }

    def main(args: Array[String]): Unit = {
      val (sparkSessionConfig, kafkaConfig) =
        if (args.length >= 1 && args(0) == "localTest") {
          getLocalTestConfiguration
        } else {
          getRunOnClusterConfiguration
        }

      val spark: SparkSession = sessionFactory(sparkSessionConfig)

      spark.sparkContext.setLogLevel("ERROR")

      import spark.implicits._

      val dataSetOfKafkaEvent: Dataset[KafkaEvent] = spark.readStream.
        format("kafka").
        option("subscribe", kafkaConfig.kafkaTopic).
        option("kafka.bootstrap.servers", kafkaConfig.kafkaBootstrapServers).
        option("startingOffsets", kafkaConfig.kafkaStartingOffsets).
        load.
        select(
          $"key" cast "binary",
          $"value" cast "binary",
          $"topic",
          $"partition" cast "string",
          $"offset" cast "string").map { row =>

        KafkaEvent(
          row.getAs[Array[Byte]](0),
          row.getAs[Array[Byte]](1),
          row.getAs[String](2),
          row.getAs[String](3),
          row.getAs[String](4))
      }

      val initDF = dataSetOfKafkaEvent.map { item: KafkaEvent => item.toString }
      val function: (Dataset[String], Long) => Unit =
        (dataSetOfString, batchId) => {
          val iter: util.Iterator[String] = dataSetOfString.toLocalIterator()

          val lines  = iter.asScala.toList.mkString("\n")
          val outfile = writeStringToTmpFile(lines)
          println(s"writing to file: ${outfile.getAbsolutePath}")
          logger.error(s"writing to file: ${outfile.getAbsolutePath} /  $lines")
        }
      val trigger = Trigger.ProcessingTime(Duration("1 second"))

      initDF.writeStream
        .foreachBatch(function)
        .trigger(trigger)
        .outputMode("append")
        .start
        .awaitTermination()
    }

    private def getLocalTestConfiguration: (SparkSessionConfig, KafkaConfig) = {
      val sparkSessionConfig: SparkSessionConfig =
        SparkSessionConfig(master = "local[*]", appName = "dummy2")
      val kafkaConfig: KafkaConfig =
        KafkaConfig(
          kafkaBootstrapServers = "localhost:9092",
          kafkaTopic = "test-topic",
          kafkaStartingOffsets = "earliest")
      (sparkSessionConfig, kafkaConfig)
    }

    private def getRunOnClusterConfiguration = {
      val sparkSessionConfig: SparkSessionConfig = SparkSessionConfig(master = "yarn", appName = "AwsSupportCase")
      val kafkaConfig: KafkaConfig =
        KafkaConfig(
          kafkaBootstrapServers= "kafka.foo.bar.broker:9092",         //  TODO - change this for kafka on your EMR cluster.
          kafkaTopic= "mongo.bongo.event",                            //  TODO - change this for kafka on your EMR cluster.
          kafkaStartingOffsets = "earliest")
      (sparkSessionConfig, kafkaConfig)
    }

  def writeStringFile(string: String, file: File): File = {
    java.nio.file.Files.write(java.nio.file.Paths.get(file.getAbsolutePath), string.getBytes).toFile
  }

  def writeStringToTmpFile(string: String, deleteOnExit: Boolean = false): File = {
    val file: File = File.createTempFile("streamingConsoleMissing", "sad")
    if (deleteOnExit) {
      file.delete()
    }
    writeStringFile(string, file)
  }
}
like image 266
Chris Bedford Avatar asked Nov 06 '22 23:11

Chris Bedford


1 Answers

I have encountered similar issue, maxOffsetsPerTrigger would fix the issue. Actually, it's not issue.

  1. All logs and metrics per batch are only printed or showing after finish of this batch. That's the reason why you can't see the job make progress.
  2. If maxOffsetsPerTrigger can't solve the issue, you could try to consume from latest offset to confirm the procssing logic is correct.
like image 108
Warren Zhu Avatar answered Nov 15 '22 09:11

Warren Zhu