SparkR DataFrame partitioning issue

In my R script, I have a SparkDataFrame with two columns (time, value) containing data for four different months. Because I need to apply my function to each month separately, I figured I would repartition it into four partitions, each holding the data for one month.

I created an additional column called partition, holding an integer value from 0 to 3, and then called the repartition method on this column.

Sadly, as described in Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?, the repartition method only guarantees that all data with the same key ends up in the same partition; data with different keys can end up in the same partition as well.

In my case, executing the code below results in creating 4 partitions but populating only 2 of them with data.

I guess I should be using the partitionBy method; however, in the case of SparkR I have no idea how to do that. The official documentation states that this method is applied to something called a WindowSpec and not to a DataFrame.

I would really appreciate some help with this matter as I have no idea how to incorporate this method into my code.

library(SparkR)

sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.sql.shuffle.partitions = "4"))

# inputDat is an R data.frame with the added partition column (integers 0-3)
df <- as.DataFrame(inputDat)
repartitionedDf <- repartition(df, col = df$partition)

# output schema of the function applied by dapply
schema <- structType(
  structField("time", "timestamp"),
  structField("value", "double"),
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf,
  function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
  schema)
Kamil Potoczny asked Jan 26 '18


1 Answer

You are using the wrong method. If you

need to apply my function to each month separately

you should use gapply, which

Groups the SparkDataFrame using the specified columns and applies the R function to each group.

df %>% group_by("month") %>% gapply(fun, schema)

or

df %>% gapply("month", fun, schema)
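
For a self-contained illustration, here is a minimal sketch. The inputDat below is made up (the question's produceHourlyResults is not shown), and the mean-per-month aggregation is just a stand-in for the real per-month function:

library(SparkR)
sparkR.session(master = "local[*]")

# hypothetical input mirroring the question: (time, value) spanning four months
inputDat <- data.frame(
  time  = as.POSIXct("2018-01-01", tz = "UTC") + (0:99) * 86400,
  value = rnorm(100))
inputDat$month <- as.integer(format(inputDat$time, "%m"))
df <- as.DataFrame(inputDat)

# output schema of the applied function; adjust it to whatever
# your real per-month function returns
schema <- structType(
  structField("month", "integer"),
  structField("meanValue", "double"))

# func receives the grouping key and an R data.frame for that group
processedDf <- gapply(df, "month",
  function(key, x) {
    data.frame(month = key[[1]], meanValue = mean(x$value),
               stringsAsFactors = FALSE)
  },
  schema)
head(processedDf)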

In my case, executing the code below results in creating 4 partitions but populating only 2 of them with data.

This suggests hash collisions. Increasing the number of partitions well above the number of unique keys should resolve the problem:

spark.sql.shuffle.partitions 17
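
In SparkR this can be set when the session is created, or changed on a running session; 17 here is just an arbitrary value comfortably above the four keys:

# at session start
sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.sql.shuffle.partitions = "17"))

# or on an already running session
sql("SET spark.sql.shuffle.partitions=17")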

I guess I should be using the partitionBy method, however

No. partitionBy is used with window functions (SparkR window function).
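
For completeness, a minimal sketch of what that partitionBy is meant for, assuming the df from the question with its partition and time columns: it defines a WindowSpec for window functions, not a physical data layout.

ws <- orderBy(windowPartitionBy("partition"), "time")
# ranks rows within each partition value by time; no repartitioning happens
ranked <- withColumn(df, "rowNum", over(row_number(), ws))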


To address your comment:

I decided to use dapply with separate partitions in order to be able to easily save each month into a separate CSV file

A hash partitioner doesn't work like that (see How does HashPartitioner work?).
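
To see why four keys can land in only two partitions, consider this toy illustration in plain R (a contrived hash, not Spark's actual Murmur3, but the mechanism is the same: partition = hash(key) mod numPartitions):

toy_hash <- function(k) k * 2L        # contrived hash that produces collisions
sapply(0:3, function(k) toy_hash(k) %% 4L)
# [1] 0 2 0 2  -> the four keys occupy only two of the four partitions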

You can try partitionBy in the writer, but I am not sure if it is directly supported in SparkR. It is supported in Structured Streaming; for batch writes you may have to call Java methods or use tables with a metastore:

createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
sql(
    "CREATE TABLE iris 
    USING csv PARTITIONED BY(species)
    LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
)
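
This lays out one species=<value>/ subdirectory per key under /tmp/iris, so each group ends up in its own set of files. As a quick check, querying the metastore table with a filter on the partition column benefits from partition pruning:

head(sql("SELECT * FROM iris WHERE species = 'setosa'"))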
Alper t. Turker answered Sep 23 '22