I am quite experienced with Spark cluster configuration and running Pyspark pipelines, but I'm just starting with Beam. I am trying to do an apples-to-apples comparison between Pyspark and the Beam Python SDK on a Spark PortableRunner (both running on the same small Spark cluster: 4 workers, each with 4 cores and 8GB RAM). I've settled on a wordcount job over a reasonably large dataset, storing the results in a Parquet table.
I have thus downloaded 50GB of Wikipedia text files, split across about 100 uncompressed files, and stored them in the directory /mnt/nfs_drive/wiki_files/ (/mnt/nfs_drive is an NFS drive mounted on all workers).
First, I am running the following Pyspark wordcount script:
from pyspark.sql import SparkSession, Row
from operator import add
wiki_files = '/mnt/nfs_drive/wiki_files/*'
spark = SparkSession.builder.appName("WordCountSpark").getOrCreate()
spark_counts = spark.read.text(wiki_files).rdd.map(lambda r: r['value']) \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: Row(word=x[0], count=x[1]))
spark.createDataFrame(spark_counts).write.parquet(path='/mnt/nfs_drive/spark_output', mode='overwrite')
The script runs perfectly well and outputs the parquet files at the desired location in about 8 minutes. The main stage (reading and splitting tokens) is divided into a reasonable number of tasks, so the cluster is used efficiently.
I am now trying to achieve the same with Beam and the portable runner. First, I have started the Spark job server (on the Spark master node) with the following command:
docker run --rm --net=host -e SPARK_EXECUTOR_MEMORY=8g apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://localhost:7077
Then, on the master and worker nodes, I am running the SDK Harness as follows:
docker run --net=host -d --rm -v /mnt/nfs_drive:/mnt/nfs_drive apache/beam_python3.6_sdk:2.25.0 --worker_pool
Now that the Spark cluster is set up to run Beam pipelines, I can submit the following script:
import apache_beam as beam
import pyarrow
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import fileio
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--job_name=WordCountBeam"
])
wiki_files = '/mnt/nfs_drive/wiki_files/*'
p = beam.Pipeline(options=options)
beam_counts = (
    p
    | fileio.MatchFiles(wiki_files)
    | beam.Map(lambda x: x.path)
    | beam.io.ReadAllFromText()
    | 'ExtractWords' >> beam.FlatMap(lambda x: x.split(' '))
    | beam.combiners.Count.PerElement()
    | beam.Map(lambda x: {'word': x[0], 'count': x[1]})
)
_ = beam_counts | 'Write' >> beam.io.WriteToParquet(
    '/mnt/nfs_drive/beam_output',
    pyarrow.schema(
        [('word', pyarrow.binary()), ('count', pyarrow.int64())]
    )
)
result = p.run().wait_until_finish()
The code is submitted successfully: I can see the job in the Spark UI and the workers are executing it. However, even when left running for more than 1 hour, it does not produce any output!
I thus wanted to make sure that there is not a problem with my setup, so I've run the exact same script on a smaller dataset (just 1 Wiki file). This completes successfully in about 3.5 minutes (Spark wordcount on the same dataset takes 16s!).
I wondered how Beam could be that much slower, so I started looking at the DAG submitted to Spark by the Beam pipeline via the job server. I noticed that the Spark job spends most of its time in a single stage.
That stage is split into only 2 tasks.
Printing debugging lines shows that this stage is where the "heavy lifting" (i.e. reading lines from the wiki files and splitting tokens) is performed. However, since it runs in only 2 tasks, the work is distributed across 2 workers at most. What's also interesting is that running on the large 50GB dataset results in exactly the same DAG with exactly the same number of tasks.
I am quite unsure how to proceed further. It seems like the Beam pipeline has reduced parallelism, but I'm not sure if this is due to sub-optimal translation of the pipeline by the job server, or whether I should specify my PTransforms in some other way to increase the parallelism on Spark.
Any suggestion appreciated!
The file IO part of the pipeline can be simplified by using apache_beam.io.textio.ReadFromText(file_pattern='/mnt/nfs_drive/wiki_files/*').
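Fusion is another possible cause of the reduced parallelism: the runner may fuse the read and the downstream steps into a single stage, so the work stays on the few tasks that did the reading. The solution is to add an apache_beam.transforms.util.Reshuffle after reading in all the files.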