I have tested that neither logger nor print can print a message from inside a pandas_udf, in either cluster mode or client mode.
Test code:
import sys
import logging

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

logger = logging.getLogger('test')

spark = (SparkSession
         .builder
         .appName('test')
         .getOrCreate())

df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id': ['a'] * 10 + ['b'] * 7 + ['q'] * 3,
    'product_id': ['c'] * 5 + ['d'] * 12 + ['e'] * 3,
}))
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#' * 100)
    logger.info('$' * 100)
    logger.error('&' * 100)
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])

df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
# note: nothing executes until an action (e.g. df1.count()) is triggered
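One workaround that avoids logging altogether (a sketch of my own, reusing the imports and the df from the test code above; the msg column and the train_predict_debug name are illustrative, not part of the original test) is to carry diagnostic text back to the driver as ordinary data, by declaring an extra message column in the UDF's return schema:

# Sketch: smuggle log text back to the driver as an extra column.
# The returned DataFrame must match the declared schema, so 'msg'
# has to be declared there too.
@pandas_udf('store_id string, product_id string, msg string',
            PandasUDFType.GROUPED_MAP)
def train_predict_debug(pdf):
    msg = 'rows in group: %d' % len(pdf)
    return pd.DataFrame({
        'store_id': [pdf['store_id'].iloc[0]],
        'product_id': [pdf['product_id'].iloc[0]],
        'msg': [msg],
    })

df.groupby(['store_id', 'product_id']).apply(train_predict_debug).show(truncate=False)

Since the messages travel with the result rows, they show up wherever the data does, at the cost of changing the output schema.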
Also note:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info('#' * 50)

You can't use this in a pandas_udf either, because the log4j logger hangs off the SparkContext, which lives in the driver JVM, and you can't reference the Spark session/context inside a UDF.
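It is worth spelling out why print and logger appear silent: the body of a pandas_udf runs in a Python worker on an executor, so anything it writes goes to that executor's stdout/stderr, which you only see in the executor logs (for example via the Spark UI), never on the driver console. Handlers configured on the driver are not shipped to the workers either, so the logger inside the UDF has no handler at all. Here is a sketch of configuring logging inside the UDF itself, reusing the script's imports; whether you can conveniently read the executor logs depends on your deployment:

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    # This code runs in the Python worker on the executor, so the
    # handler must be attached here, not on the driver.
    worker_logger = logging.getLogger('test')
    if not worker_logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        worker_logger.addHandler(handler)
        worker_logger.setLevel(logging.INFO)
    worker_logger.info('&' * 100)  # lands in the executor's stderr log
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])

df.groupby(['store_id', 'product_id']).apply(train_predict).count()  # force execution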
The only way I know of is to raise an Exception, as in the answer I wrote below. But it is hacky and has a serious drawback. I want to know whether there is any way to simply print messages from inside a pandas_udf.
A pandas user-defined function (UDF), also known as a vectorized UDF, is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. Pandas UDFs allow vectorized operations that can improve performance by up to 100x compared to row-at-a-time Python UDFs, and are defined by using pandas_udf as a decorator or to wrap the function; no additional configuration is required.
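For contrast with a row-at-a-time UDF, here is a minimal scalar pandas UDF in Spark 2.4 syntax; the toy plus_one function is my own illustration, not part of the question:

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

# The function receives a whole Arrow batch as a pandas Series and
# returns a Series: one Python call per batch instead of one per row.
@pandas_udf('long', PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1

spark.range(5).select(plus_one(col('id'))).show()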
I have tried every approach I could find in Spark 2.4. Without logs it is hard to debug a faulty pandas_udf. The only workable way I know to get a message out of a pandas_udf is to raise an Exception, so debugging this way costs a lot of time, but I don't know a better one.
@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#' * 100)
    logger.info('$' * 100)
    logger.error('&' * 100)
    raise Exception('@' * 100)  # the only way I know to print a message, but it breaks execution
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])
The drawback is that you can't keep Spark running after the message is printed: raising the exception aborts the job.
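If you do fall back on the Exception trick, one refinement (my own sketch, not from the original answer) is to collect messages in a list and only raise on a real failure, attaching the collected messages and the full traceback to the exception, so a single failed run carries all the context:

import traceback

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    messages = []
    try:
        messages.append('group %s/%s: %d rows'
                        % (pdf['store_id'].iloc[0], pdf['product_id'].iloc[0], len(pdf)))
        # ... the real training/prediction code would go here ...
        return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])
    except Exception:
        # Re-raise with everything collected so far plus the original
        # traceback; one failure then reports full context, though the
        # job still aborts.
        raise Exception('\n'.join(messages) + '\n' + traceback.format_exc())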