How to run a function on all Spark workers before processing data in PySpark?


I'm running a Spark Streaming task in a cluster using YARN. Each node in the cluster runs multiple Spark workers. Before the streaming starts, I want to execute a "setup" function on all workers on all nodes in the cluster.

The streaming task classifies incoming messages as spam or not spam, but before it can do that it needs to download the latest pre-trained models from HDFS to local disk, like this pseudocode example:

def fetch_models():
    if hadoop.version > local.version:
        hadoop.download()
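For illustration, here's a fleshed-out sketch of roughly what fetch_models does, shelling out to the hdfs CLI; the paths and the missing-file check are made-up stand-ins, not my real code:

import subprocess
from pathlib import Path

MODEL_HDFS_PATH = "/models/spam_clf.pkl"       # assumed HDFS location
MODEL_LOCAL_PATH = Path("/tmp/spam_clf.pkl")   # assumed local cache

def fetch_models():
    # Download only if the local copy is missing; the real version
    # comparison would check a version marker or timestamps instead.
    if not MODEL_LOCAL_PATH.exists():
        subprocess.check_call(
            ["hdfs", "dfs", "-get", MODEL_HDFS_PATH, str(MODEL_LOCAL_PATH)])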

I've seen the following examples here on SO:

sc.parallelize().map(fetch_models) 

But in Spark 1.6 parallelize() requires some data to operate on, which leads to this shitty work-around I'm using now:

sc.parallelize(range(1, 1000)).map(fetch_models) 

Just to be fairly sure that the function is run on ALL workers, I set the range to 1000. I also don't know exactly how many workers are in the cluster at runtime.

I've read the programming documentation and googled relentlessly, but I can't seem to find any way to just distribute something to all workers without operating on data.
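For what it's worth, the fuller shape of that work-around needs an action to actually trigger execution, since map alone is lazy; something like this sketch (num_slots is just a guess at an upper bound on concurrent tasks):

num_slots = 1000  # assumed upper bound on executor slots; not known at runtime
sc.parallelize(range(num_slots), num_slots) \
    .foreachPartition(lambda _: fetch_models())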

After this initialization phase is done, the streaming task proceeds as usual, operating on incoming data from Kafka.

The way I'm using the models is by running a function similar to this:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(
        lambda partition: spam.on_partition(config, partition)))

Theoretically I could check whether the models are up to date in the on_partition function, though it would be really wasteful to do this on each batch. I'd like to do it before Spark starts retrieving batches from Kafka, since the download from HDFS can take a couple of minutes...
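If I did fall back to checking inside on_partition, a time-based guard would at least cap the cost; a sketch, not what I'm running, and CHECK_INTERVAL is an assumed cadence:

import time

_last_check = 0.0
CHECK_INTERVAL = 600  # seconds; assumed refresh cadence

def maybe_refresh_models(config):
    global _last_check
    now = time.time()
    if now - _last_check > CHECK_INTERVAL:
        fetch_models()      # the HDFS download sketched above
        _last_check = now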

UPDATE:

To be clear: it's not a question of how to distribute the files or how to load them; it's about how to run an arbitrary method on all workers without operating on any data.

To clarify what loading the models currently looks like:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

While MyClassifier is something like this:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

I use static methods since PySpark doesn't seem to be able to serialize classes with instance methods (the state of the class is irrelevant to other workers). This way we only have to call load_models() once, and MyClassifier.clf stays set for all future batches. This is something that should really not be done for each batch; it's a one-time thing. Same with downloading the files from HDFS via fetch_models().

asked May 20 '16 by Oscar




2 Answers

If all you want is to distribute a file between worker machines, the simplest approach is to use the SparkFiles mechanism:

some_path = ...  # local file, a file in DFS, or an HTTP, HTTPS or FTP URI
sc.addFile(some_path)

and retrieve it on the workers using SparkFiles.get and standard IO tools:

from pyspark import SparkFiles

with open(SparkFiles.get(some_path)) as fw:
    ...  # Do something

If you want to make sure that the model is actually loaded, the simplest approach is to load it on module import. Assuming config can be used to retrieve the model path:

  • model.py:

    from pyspark import SparkFiles

    config = ...

    class MyClassifier:
        clf = None

        @staticmethod
        def is_loaded():
            return MyClassifier.clf is not None

        @staticmethod
        def load_models(config):
            path = SparkFiles.get(config.get("model_file"))
            MyClassifier.clf = load_from_file(path)

    # Executed once per interpreter
    MyClassifier.load_models(config)
  • main.py:

    from pyspark import SparkContext

    config = ...

    sc = SparkContext("local", "foo")

    # Executed before StreamingContext starts
    sc.addFile(config.get("model_file"))
    sc.addPyFile("model.py")

    import model

    ssc = ...
    stream = ...
    stream.map(model.MyClassifier.do_something).pprint()

    ssc.start()
    ssc.awaitTermination()
answered Sep 19 '22 by zero323


This is a typical use case for Spark's broadcast variables. Assuming fetch_models returns the models rather than saving them locally, you would do something like:

bc_models = sc.broadcast(fetch_models())

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(
        lambda partition: spam.on_partition(config, partition, bc_models.value)))

This does assume that your models fit in memory, on the driver and the executors.
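For completeness, a sketch of how on_partition might consume the broadcast value; the question doesn't show spam.on_partition's internals, so the message handling and helpers here (extract_features, handle_result) are hypothetical:

def on_partition(config, partition, models):
    # models is bc_models.value, already materialized on this executor
    clf = models
    for message in partition:
        features = extract_features(message)   # hypothetical helper
        is_spam = clf.predict([features])[0]
        handle_result(message, is_spam)        # hypothetical sink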

You may be worried that broadcasting the models from the single driver to all the executors is inefficient, but Spark uses 'efficient broadcast algorithms' that can significantly outperform distribution through HDFS, according to this analysis.

answered Sep 21 '22 by sgvd