
What does load() do in Spark?

Spark is lazy, right? So what does load() do?

import timeit

# sqlContext and indexes are defined earlier (e.g. in a PySpark session)
start = timeit.default_timer()
df = sqlContext.read.option(
    "es.resource", indexes
).format("org.elasticsearch.spark.sql")  # df is still a DataFrameReader here
end = timeit.default_timer()
print('without load: ', end - start)  # almost instant

start = timeit.default_timer()
df = df.load()  # now df is a DataFrame
end = timeit.default_timer()
print('load: ', end - start)  # takes 1 sec

start = timeit.default_timer()
df.show()
end = timeit.default_timer()
print('show: ', end - start)  # takes 4 sec
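
The repeated start/end bookkeeping above can be factored into a small context manager. This is a convenience sketch, not part of the original question; `timed` and the `timings` dict are hypothetical names. It uses `time.perf_counter`, which is what `timeit.default_timer` refers to in Python 3.3+.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Time the enclosed block and store the elapsed seconds under label."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so a failed load() is still timed
        elapsed = time.perf_counter() - start
        results[label] = elapsed
        print(f"{label}: {elapsed:.3f}s")


# Usage with the question's code would look like:
#   timings = {}
#   with timed("load", timings):
#       df = reader.load()
#   with timed("show", timings):
#       df.show()
```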

If show() were the only action, I would expect load() to take much less than 1 sec. So I'm concluding that load() is an action (as opposed to a transformation) in Spark.

Does load() actually load the whole dataset into memory? I don't think so, but then what does it do?

I've searched and looked at the docs (https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html), but they don't help.

asked Apr 13 '26 at 01:04 by eugene



2 Answers

tl;dr: load() is a DataFrameReader API (org.apache.spark.sql.DataFrameReader#load), as seen in the code below. It returns a DataFrame, on top of which Spark transformations can be applied.

/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame

One needs to create a DataFrame to perform a transformation.
To create a DataFrame from a path (HDFS, S3, etc.), users can call spark.read.format("<format>").load(<path>). (There are also data-source-specific APIs that load the files directly, such as spark.read.parquet(<path>).)
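
The builder pattern described above can be illustrated with a toy model. This is a hypothetical stand-in written in plain Python, not Spark's DataFrameReader: `ToyReader`, its plan dictionary, and the path `/data/events` are all made up. It shows that format() and option() only record settings, and that a format-specific shortcut such as parquet(path) amounts to format("parquet").load(path). The toy deliberately leaves out any metadata or schema work a real load() may perform.

```python
from dataclasses import dataclass, field


@dataclass
class ToyReader:
    """Hypothetical stand-in for a DataFrameReader: it only records settings."""

    source: str = "parquet"
    options: dict = field(default_factory=dict)

    def format(self, source):
        self.source = source
        return self  # return self so calls can be chained builder-style

    def option(self, key, value):
        self.options[key] = value
        return self

    def load(self, path=None):
        # In this toy, load() only packages the settings into a plan
        # description; no data is read here.
        opts = dict(self.options)
        if path is not None:
            opts["path"] = path
        return {"source": self.source, "options": opts}

    def parquet(self, path):
        # Format-specific shortcut: same result as format("parquet").load(path)
        return self.format("parquet").load(path)


generic = ToyReader().format("parquet").load("/data/events")
shortcut = ToyReader().parquet("/data/events")
print(generic == shortcut)  # True
```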

Why does it take a whole second?

In file-based sources, this time can be attributed to listing the files. In HDFS this listing is not expensive, whereas in cloud storage like S3 it is very expensive and takes time proportional to the number of files.
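
Why listing on S3 scales with the number of files can be sketched with a rough cost model. The one hard fact used is that S3's ListObjectsV2 returns at most 1,000 keys per response; the 50 ms per-request latency is an assumed number, and the model assumes a single flat prefix listed sequentially. Hadoop's S3A connector may list differently, so treat this as an order-of-magnitude illustration only.

```python
import math

# S3's ListObjectsV2 returns at most 1,000 keys per response.
S3_MAX_KEYS_PER_PAGE = 1000


def list_requests_needed(num_files, page_size=S3_MAX_KEYS_PER_PAGE):
    """Sequential LIST round trips needed to enumerate num_files keys."""
    if num_files <= 0:
        return 1  # one request is still needed to see that the prefix is empty
    return math.ceil(num_files / page_size)


def estimated_listing_seconds(num_files, latency_s=0.05):
    # latency_s is an assumed per-request round-trip time, not a measured value
    return list_requests_needed(num_files) * latency_s


for n in (500, 10_000, 1_000_000):
    print(n, list_requests_needed(n), estimated_listing_seconds(n))
```

With these assumptions, a million small files means about a thousand sequential round trips before Spark can even plan the read.
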
In your case the data source is Elasticsearch, so the time can be attributed to establishing a connection, collecting the metadata needed to plan a distributed scan, etc., which depends on the Elasticsearch connector implementation. A DataFrame needs a schema as soon as it is created, so load() has to resolve the source eagerly (for Elasticsearch this typically means reading the index mapping), even though no documents are read yet. We can enable debug logs to get more information. If Elasticsearch can log the requests it receives, we could also check its logs for the requests made after load() was called.
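The split between eager metadata work in load() and lazy row reading in an action can be sketched with a toy model. Everything here is hypothetical (`FakeSource`, `LazyFrame`, `fetch_schema`, `take`); it is plain Python standing in for the Elasticsearch connector and a DataFrame, not their actual implementation. It counts how often each kind of work happens.

```python
from itertools import islice


class FakeSource:
    """Hypothetical data source that counts the work it is asked to do."""

    def __init__(self, rows):
        self._rows = rows
        self.metadata_calls = 0
        self.rows_read = 0

    def fetch_schema(self):
        # Stands in for connecting and reading metadata (e.g. an index mapping)
        self.metadata_calls += 1
        return sorted(self._rows[0]) if self._rows else []

    def scan(self):
        # Stands in for the actual scan; rows are pulled one at a time
        for row in self._rows:
            self.rows_read += 1
            yield row


class LazyFrame:
    """Holds a source and a known schema, but no rows."""

    def __init__(self, source, columns):
        self.source = source
        self.columns = columns

    def take(self, n):
        # The "action": only now are rows pulled from the source
        return list(islice(self.source.scan(), n))


def load(source):
    # Eager part: resolve the schema so the frame can exist at all
    return LazyFrame(source, source.fetch_schema())


src = FakeSource([{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}])
df = load(src)
print(df.columns, src.metadata_calls, src.rows_read)  # ['a', 'b'] 1 0
print(df.take(2), src.rows_read)  # [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}] 2
```

In this model load() costs one metadata round trip but reads no rows, which matches the pattern in the question: load() is not free, yet it is not an action either.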

answered Apr 17 '26 at 16:04 by DaRkMaN



It does nothing. It is just part of sqlContext.read, acting as a parameter that you did not set directly on read. read lets you specify the data format. The DataFrame, or the underlying RDD, is evaluated lazily, as they say.
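
Lazy evaluation can be illustrated with a plain-Python analogy using generator expressions. This is not Spark code; the `parse` function, the `calls` counter, and the input list are made up for illustration. Building the pipeline ("transformations") runs nothing; consuming it ("action") does all the work.

```python
calls = {"parse": 0}


def parse(line):
    calls["parse"] += 1
    return int(line)


lines = ["3", "1", "4", "1", "5"]

# "Transformations": generator expressions only describe the work
parsed = (parse(x) for x in lines)
evens = (n for n in parsed if n % 2 == 0)
print(calls["parse"])  # 0, nothing has run yet

# "Action": consuming the pipeline finally triggers the parsing
result = list(evens)
print(result, calls["parse"])  # [4] 5
```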

answered Apr 17 '26 at 18:04 by thebluephantom
