 

When to cache a DataFrame?

My question is: when should I call dataframe.cache(), and when is it useful?

Also, in my code should I cache the dataframes in the commented lines?

Note: My dataframes are loaded from a Redshift DB.

Many thanks

Here is my code:

from pyspark.sql import functions as func  # needed for func.when / func.lit below


def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    # SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta,
                                        ]).drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
                                                "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa,
                       ]).drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
                               "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal", "ind_tipo_regimen_con",
                "imp_coste", "imp_margen_canco", "imp_venta", "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################
    # SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn("amount1",
                                     func.when(dataframe.ind_tipo_regimen_fac == 'E',
                                               dataframe.imp_margen_canal * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))))
                                     .otherwise(dataframe.imp_venta * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_venta - dataframe.imp_margen_canal) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn("amount2",
                                     func.when(dataframe.ind_tipo_regimen_con == 'E',
                                               dataframe.imp_margen_canco * (
                                                   1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))))
                                     .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) * (
                                         1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) - (
                                                    dataframe.imp_coste) * (
                                                    1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1", udf_currency_exchange(dataframe.booking_currency,
                                                                              func.lit(EUR),
                                                                              dataframe.creation_date,
                                                                              dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2", udf_currency_exchange(dataframe.booking_currency,
                                                                              func.lit(EUR),
                                                                              dataframe.creation_date,
                                                                              dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco", dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")

    ######################################
    # SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.groupBy("operative_incoming", "booking_id").agg({'impuesto_canco': 'sum'}) \
        .withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe
asked May 24 '17 by Alezis


People also ask

When should you cache Pyspark?

Caching is recommended in the following situations: for RDD re-use in iterative machine learning applications; for RDD re-use in standalone Spark applications; and when RDD computation is expensive, where caching can help reduce the cost of recovery in case an executor fails.
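For example, a minimal sketch of the iterative re-use case (the file path and the parsing logic are illustrative placeholders, not from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Cache the parsed RDD because every iteration below re-uses it;
# without cache() the file would be re-read and re-parsed on each pass.
points = spark.sparkContext.textFile("points.txt") \
    .map(lambda line: [float(x) for x in line.split(",")]) \
    .cache()

for i in range(10):
    total = points.map(sum).reduce(lambda a, b: a + b)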

What does cache () do in Pyspark?

cache() is an Apache Spark transformation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action. cache() caches the specified DataFrame, Dataset, or RDD in the memory of your cluster's workers.
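A minimal sketch of that "more than one action" case (assuming an active SparkSession named spark; the source path and the amount column are placeholders):

df = spark.read.parquet("/data/bookings")  # placeholder source
df.cache()                                 # lazy: nothing is materialized yet

df.count()                                 # first action computes and caches the rows
df.filter(df.amount > 0).count()           # later actions read from the workers' memory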

How do I know if a data frame is cached?

You can call getStorageLevel.useMemory on the DataFrame or the RDD to find out whether the dataset is in memory.
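In PySpark that looks roughly like this (assuming an active SparkSession named spark):

df = spark.range(10)
print(df.storageLevel.useMemory)   # False: not marked for caching

df.cache()
print(df.storageLevel.useMemory)   # True: registered with the cache manager

rdd = spark.sparkContext.parallelize([1, 2, 3]).cache()
print(rdd.getStorageLevel().useMemory)   # True for the RDD as well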

Which is better cache or persist?

The only difference between cache() and persist() is that cache() saves intermediate results in memory only (the default storage level), while persist() lets you save them at any of five storage levels: MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, or DISK_ONLY.
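A short sketch of the difference in PySpark (a minimal example, not from the question):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

df.cache()        # always the default level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames)
df.unpersist()    # drop the cached data before re-persisting

df.persist(StorageLevel.DISK_ONLY)   # persist() lets you choose the level explicitly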


1 Answer

when should I call dataframe.cache(), and when is it useful?

Cache what you are going to use across queries (and do it early and often, up to the available memory). It does not really matter which programming language you use (Python, Scala, Java, SQL, or R), as the underlying mechanics are the same.

You can see whether a DataFrame was cached in your physical plan using the explain operator (where InMemoryRelation entities reflect cached datasets with their storage level):

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))
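A minimal PySpark sketch that produces a plan of this shape (assuming an active SparkSession named spark):

df = spark.range(1)
df.cache()   # register the range for caching

# The InMemoryRelation / InMemoryTableScan nodes show up in the printed plan
df.select(df.id, df.id.alias("newId")).explain()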

After you cache (or persist) your DataFrame, the first query may get slower, but it is going to pay off for the following queries.
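A rough way to see that effect yourself (timings are indicative only; assumes an active SparkSession named spark):

import time

df = spark.range(50_000_000).selectExpr("id", "id % 7 AS bucket")
df.cache()

start = time.time()
df.count()          # first action: computes the rows and fills the cache
print("first:", time.time() - start)

start = time.time()
df.count()          # second action: served from the cache, typically faster
print("second:", time.time() - start)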

You can check whether a Dataset was cached or not using the following code:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager

scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false

Also, in my code should I cache the dataframes in the commented lines?

Yes and no. Cache what represents external datasets, so you don't pay the extra price of transmitting data across the network (while accessing the external storage) every time you query them.
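Applied to the question's code, that points at the two lookups loaded from Redshift. A sketch (manager and tables come from the question's code):

df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta']).cache()
df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa']).cache()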

Don't cache what you use only once or what is easy to compute. Otherwise, cache it.


Be careful what you cache, i.e. which Dataset is cached, since caching at different points caches different queries:

// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")

scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache

scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
   +- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Filter ((id#17L % 2) = 0)
            +- *Range (0, 1, step=1, splits=8)

There's one surprise with caching in Spark SQL. Caching is lazy, which is why you pay the extra price of having the rows cached on the very first action, but that only happens with the DataFrame API. In SQL, caching is eager, which makes a big difference in query performance, as you do not have to call an action to trigger caching.
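A sketch of the two behaviours side by side (assumes a temporary view named my_table already exists):

# DataFrame API: lazy -- an action is needed to materialize the cache
df = spark.table("my_table")
df.cache()
df.count()

# SQL: eager -- rows are cached as part of running the statement itself
spark.sql("CACHE TABLE my_table")

# SQL can opt back into lazy caching if desired
spark.sql("CACHE LAZY TABLE my_table")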

answered Oct 14 '22 by Jacek Laskowski