Low parallelism when running Apache Beam wordcount pipeline on Spark with Python SDK

I am quite experienced with Spark cluster configuration and running PySpark pipelines, but I'm just starting with Beam. So, I am trying to do an apples-to-apples comparison between PySpark and the Beam Python SDK on the Spark PortableRunner (running on top of the same small Spark cluster: 4 workers, each with 4 cores and 8GB RAM), and I've settled on a wordcount job over a reasonably large dataset, storing the results in a Parquet table.

I have thus downloaded 50GB of Wikipedia text, split across about 100 uncompressed files, and stored them in the directory /mnt/nfs_drive/wiki_files/ (/mnt/nfs_drive is an NFS drive mounted on all workers).

First, I am running the following Pyspark wordcount script:

from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'

spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()

spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))

spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')

The script runs perfectly well and writes the Parquet files to the desired location in about 8 minutes. The main stage (reading and splitting tokens) is divided into a reasonable number of tasks, so the cluster is used efficiently:

[Spark UI screenshot: the main stage split across many tasks]

I am now trying to achieve the same with Beam and the portable runner. First, I have started the Spark job server (on the Spark master node) with the following command:

docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077

Then, on the master and worker nodes, I am running the SDK Harness as follows:

docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool

Now that the Spark cluster is set up to run Beam pipelines, I can submit the following script:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])


wiki_files = '/mnt/nfs_drive/wiki_files/*'

p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)


_ = beam_counts | 'Write' >> beam.io.WriteToParquet('/mnt/nfs_drive/beam_output',
      pyarrow.schema(
          [('word', pyarrow.binary()), ('count', pyarrow.int64())]
      )
)

result = p.run().wait_until_finish()

The code is submitted successfully: I can see the job in the Spark UI and the workers are executing it. However, even when left running for more than an hour, it does not produce any output!

To make sure there was no problem with my setup, I ran the exact same script on a smaller dataset (just one Wiki file). This completes successfully in about 3.5 minutes (the PySpark wordcount on the same dataset takes 16s!).

I wondered how Beam could be that much slower, so I started looking at the DAG that the job server submits to Spark for the Beam pipeline. I noticed that the Spark job spends most of its time in the following stage:

[Spark UI screenshot: DAG of the stage where most of the time is spent]

This stage is split into just 2 tasks, as shown here:

[Spark UI screenshot: task list for the stage, showing only 2 tasks]

Printing debug lines shows that this stage is where the heavy lifting (i.e. reading lines from the wiki files and splitting tokens) is performed. However, since it runs in only 2 tasks, the work is distributed across at most 2 workers. What's also interesting is that running on the large 50GB dataset results in exactly the same DAG with exactly the same number of tasks.

I am unsure how to proceed. It seems like the Beam pipeline has drastically reduced parallelism, but I don't know whether this is due to a sub-optimal translation of the pipeline by the job server, or whether I should specify my PTransforms differently to increase the parallelism on Spark.

Any suggestion appreciated!

asked Nov 17 '20 by hiryu


1 Answer

The file IO part of the pipeline can be simplified by using apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*').
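For example, a minimal sketch of that change against the pipeline in the question (the 'ReadWiki' label is illustrative; the rest of the pipeline stays as posted):

import apache_beam as beam

with beam.Pipeline() as p:
    # Single transform replacing the MatchFiles -> Map(path) -> ReadAllFromText chain
    lines = p | 'ReadWiki' >> beam.io.ReadFromText('/mnt/nfs_drive/wiki_files/*')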

Fusion is another reason that could prevent parallelism: the runner fuses the read with the downstream steps, so they all run in the same few tasks. The solution is to throw in an apache_beam.transforms.util.Reshuffle after reading in all the files, as sketched below.
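Putting both suggestions together, here is a hedged sketch of the reworked pipeline (transform labels such as 'BreakFusion' are illustrative, and the pipeline options are simply those from the question). Whether the Reshuffle alone restores full parallelism on the Spark portable runner may still depend on how the runner splits the read, but it is the standard way to break fusion:

import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])

with beam.Pipeline(options=options) as p:
    beam_counts = (
        p
        # Single transform instead of MatchFiles + Map + ReadAllFromText
        | 'Read' >> beam.io.ReadFromText('/mnt/nfs_drive/wiki_files/*')
        # Breaks fusion so the steps below are not glued to the read tasks
        | 'BreakFusion' >> beam.Reshuffle()
        | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
        | beam.combiners.Count.PerElement()
        | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
    )

    _ = beam_counts | 'Write' >> beam.io.WriteToParquet(
        '/mnt/nfs_drive/beam_output',
        pyarrow.schema([('word', pyarrow.binary()), ('count', pyarrow.int64())])
    )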

answered Oct 25 '22 by Jiayuan Ma