My question is: when should I do dataframe.cache(), and when is it useful?
Also, in my code, should I cache the DataFrames at the commented lines?
Note: My dataframes are loaded from a Redshift DB.
Many thanks
Here is my code:
def sub_tax_transfer_pricing_eur_aux(manager, dataframe, seq_recs, seq_reservas, df_aux):
    df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta'])
    df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa'])

    dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
        .filter(dataframe.seq_reserva.isin(seq_reservas))

    ##################################################
    # SHOULD I CACHE HERE df_vta, df_cpa and dataframe
    ##################################################

    dataframe = dataframe.join(df_vta, [dataframe.ind_tipo_imp_vta_fac == df_vta.ind_tipo_imp_vta,
                                        dataframe.cod_impuesto_vta_fac == df_vta.cod_impuesto_vta,
                                        dataframe.cod_clasif_vta_fac == df_vta.cod_clasif_vta,
                                        dataframe.cod_esquema_vta_fac == df_vta.cod_esquema_vta,
                                        dataframe.cod_empresa_vta_fac == df_vta.cod_emp_atlas_vta]) \
        .drop("ind_tipo_imp_vta", "cod_impuesto_vta", "cod_clasif_vta",
              "cod_esquema_vta", "cod_emp_atlas_vta") \
        .join(df_cpa, [dataframe.ind_tipo_imp_vta_fac == df_cpa.ind_tipo_imp_cpa,
                       dataframe.cod_impuesto_vta_fac == df_cpa.cod_impuesto_cpa,
                       dataframe.cod_clasif_vta_fac == df_cpa.cod_clasif_cpa,
                       dataframe.cod_esquema_vta_fac == df_cpa.cod_esquema_cpa,
                       dataframe.cod_empresa_vta_fac == df_cpa.cod_emp_atlas_cpa]) \
        .drop("ind_tipo_imp_cpa", "cod_impuesto_cpa", "cod_clasif_cpa",
              "cod_esquema_cpa", "cod_emp_atlas_cpa") \
        .select("seq_rec", "seq_reserva", "ind_tipo_regimen_fac", "imp_margen_canal",
                "ind_tipo_regimen_con", "imp_coste", "imp_margen_canco", "imp_venta",
                "pct_impuesto_vta", "pct_impuesto_cpa")

    ######################################
    # SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.withColumn(
        "amount1",
        func.when(dataframe.ind_tipo_regimen_fac == 'E',
                  dataframe.imp_margen_canal * (1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))))
        .otherwise(dataframe.imp_venta * (1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) -
                   (dataframe.imp_venta - dataframe.imp_margen_canal) *
                   (1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.withColumn(
        "amount2",
        func.when(dataframe.ind_tipo_regimen_con == 'E',
                  dataframe.imp_margen_canco * (1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))))
        .otherwise((dataframe.imp_coste + dataframe.imp_margen_canco) *
                   (1 - (1 / (1 + (dataframe.pct_impuesto_vta / 100)))) -
                   dataframe.imp_coste *
                   (1 - (1 / (1 + (dataframe.pct_impuesto_cpa / 100))))))

    dataframe = dataframe.na.fill({'amount1': 0})
    dataframe = dataframe.na.fill({'amount2': 0})

    dataframe = dataframe.join(df_aux, [dataframe.seq_rec == df_aux.operative_incoming,
                                        dataframe.seq_reserva == df_aux.booking_id])

    dataframe = dataframe.withColumn("impuesto_canco1",
                                     udf_currency_exchange(dataframe.booking_currency,
                                                           func.lit(EUR),
                                                           dataframe.creation_date,
                                                           dataframe.amount1))

    dataframe = dataframe.withColumn("impuesto_canco2",
                                     udf_currency_exchange(dataframe.booking_currency,
                                                           func.lit(EUR),
                                                           dataframe.creation_date,
                                                           dataframe.amount2))

    dataframe = dataframe.withColumn("impuesto_canco",
                                     dataframe.impuesto_canco1 + dataframe.impuesto_canco2)

    dataframe = dataframe.na.fill({'impuesto_canco': 0})

    dataframe = dataframe.select("operative_incoming", "booking_id", "impuesto_canco")

    ######################################
    # SHOULD I CACHE HERE dataframe AGAIN ?
    ######################################

    dataframe = dataframe.groupBy("operative_incoming", "booking_id") \
        .agg({'impuesto_canco': 'sum'}) \
        .withColumnRenamed("SUM(impuesto_canco)", "impuesto_canco")

    return dataframe
Caching is recommended in the following situations:

- For RDD re-use in iterative machine learning applications.
- For RDD re-use in standalone Spark applications.
- When RDD computation is expensive, caching can help reduce the cost of recovery if an executor fails.
cache() is a lazy Apache Spark operation that you can call on a DataFrame, Dataset, or RDD when you want to perform more than one action on it. cache() stores the specified DataFrame, Dataset, or RDD in the memory of your cluster's workers, so later actions can read it from memory instead of recomputing it.
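For example, a minimal PySpark sketch of the multiple-actions case (assuming an existing SparkSession named spark; the extra column and the filter threshold are made up for illustration):

from pyspark.sql import functions as func

df = spark.range(0, 1000000).withColumn("x", func.rand())

df.cache()                              # mark the DataFrame for caching (nothing is computed yet)

print(df.count())                       # first action: computes df and fills the cache
print(df.filter(df.x > 0.5).count())    # second action: served from the cached data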
You can check getStorageLevel().useMemory on an RDD (or storageLevel.useMemory on a DataFrame) to find out whether the dataset is held in memory.
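A minimal sketch of that check in PySpark (assuming a SparkSession named spark; DataFrame.storageLevel is available in Spark 2.1+):

df = spark.range(10)

print(df.storageLevel.useMemory)    # False: nothing is cached yet

df.cache()
df.count()                          # the first action materializes the cache

print(df.is_cached)                 # True
print(df.storageLevel.useMemory)    # True: rows are held in executor memory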
The difference between cache() and persist() is that cache() always uses the default storage level (MEMORY_ONLY for RDDs, MEMORY_AND_DISK for DataFrames and Datasets), while persist() lets you choose the storage level for the intermediate results explicitly (MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, and their replicated variants).
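For instance, a short PySpark sketch of the two calls (the DISK_ONLY level is chosen only for illustration):

from pyspark import StorageLevel

df = spark.range(0, 100)

df.cache()                          # default level (MEMORY_AND_DISK for DataFrames)
df.unpersist()                      # drop the cached data again

df.persist(StorageLevel.DISK_ONLY)  # persist() lets you pick the storage level explicitly
df.count()                          # the first action materializes the persisted data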
When should I do dataframe.cache() and when is it useful?

cache what you are going to use across queries (and early and often, up to available memory). It does not really matter which programming language you use (Python, Scala, Java, SQL, or R), as the underlying mechanics are the same.
You can see whether a DataFrame was cached in your physical plan using the explain operator (where InMemoryRelation entities reflect cached datasets with their storage level):
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))
After you cache (or persist) your DataFrame, the first query may get slower, but it will pay off for the following queries.
You can check whether a Dataset was cached or not using the following code:
scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager

scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
Also, in my code, should I cache the DataFrames at the commented lines?
Yes and no. Cache what represents external datasets so you don't pay the extra price of transmitting the data across the network (while accessing the external storage) every time you query over them.

Don't cache what you use only once or what is easy to compute. Otherwise, cache.
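Applied to your function, the first commented block might look like the sketch below (assuming the manager, tables and seq_* objects from your question). This is only a sketch: caching pays off only if each cached DataFrame is actually read by more than one action, so measure before keeping it.

# the two Redshift-backed lookup tables represent external datasets
df_vta = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_vta']).cache()
df_cpa = manager.get_dataframe(tables['dwc_oth_v_re_v_impuesto_sap_cpa']).cache()

# cache the filtered input only if it feeds more than one action
dataframe = dataframe.filter(dataframe.seq_rec.isin(seq_recs)) \
    .filter(dataframe.seq_reserva.isin(seq_reservas)) \
    .cache()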
Be careful where you put the cache call, i.e. which Dataset gets cached, as different placements result in different cached queries:
// cache after range(5)
val q1 = spark.range(5).cache.filter($"id" % 2 === 0).select("id")

scala> q1.explain
== Physical Plan ==
*Filter ((id#0L % 2) = 0)
+- InMemoryTableScan [id#0L], [((id#0L % 2) = 0)]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 5, step=1, splits=8)

// cache at the end
val q2 = spark.range(1).filter($"id" % 2 === 0).select("id").cache

scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#17L]
+- InMemoryRelation [id#17L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Filter ((id#17L % 2) = 0)
         +- *Range (0, 1, step=1, splits=8)
There's one surprise with caching in Spark SQL. Caching is lazy, which is why you pay the extra price of materializing the rows on the very first action, but that only happens with the DataFrame API. In SQL, caching is eager, which makes a big difference in query performance as you don't have to call an action to trigger the caching.
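A short PySpark illustration of the difference (mytable is a placeholder table or temp view name):

# DataFrame API: caching is lazy
df = spark.table("mytable")
df.cache()                             # nothing is materialized yet
df.count()                             # the first action fills the cache

# SQL: CACHE TABLE is eager, no action needed
spark.sql("CACHE TABLE mytable")
# SQL can opt back into lazy behaviour explicitly:
# spark.sql("CACHE LAZY TABLE mytable")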