 

How to log/print message in pyspark pandas_udf?

I have tested that neither logger nor print can output a message from inside a pandas_udf, in either cluster mode or client mode.

Test code:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id' : ['a'] * 10 + ['b'] *7 + ['q']*3,
    'product_id' : ['c'] * 5 + ['d'] *12 + ['e']*3,
    })
)


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
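
Note that apply here is lazy; nothing inside train_predict runs until an action is triggered, so when testing I force execution with an action (show() is just one convenient choice on my side):

# the grouped-map UDF body only runs once an action is triggered on df1
df1.show()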

Also note:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

You can't use this inside a pandas_udf either, because this logger belongs to the Spark context object, and you can't refer to the Spark session/context inside a UDF.
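
For reference, the only logger available inside the UDF body is a plain Python one configured on the worker itself. A minimal sketch of what I mean (the StreamHandler setup is my own assumption, and any output it produces should end up in the executor's stderr log, e.g. via the Spark UI, rather than in the driver console, which is exactly the problem):

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict_worker_log(pdf):
    # configure a logger on the worker process; it cannot reach the driver
    worker_logger = logging.getLogger('train_predict_worker')
    if not worker_logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        worker_logger.addHandler(handler)
        worker_logger.setLevel(logging.INFO)
    # if this shows up anywhere, it is in the executor logs, not the driver console
    worker_logger.info('processing group with %d rows', len(pdf))
    return pd.DataFrame([], columns=['y', 'ds', 'store_id', 'product_id'])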

The only way I know is to raise an Exception, as in the answer I wrote below. But it is hacky and has a drawback. I want to know if there is any way to simply print a message from inside a pandas_udf.

Mithril asked Jul 24 '19


1 Answer

I have tried every approach I could find in Spark 2.4.

Without logging, it is hard to debug a faulty pandas_udf. The only workable way I know to surface a message from inside a pandas_udf is to raise an Exception. Debugging this way really costs time, but I don't know a better one.

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    raise Exception('@'*100)  # the only way I know to surface a message, but it breaks execution
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])

The drawback is that you can't keep Spark running after printing a message this way: the raised Exception aborts the job.
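
To actually read the message on the driver, I trigger an action and inspect the error text. A minimal sketch (using show() as the action is just my choice; the '@'*100 marker ends up inside the Python worker traceback that reaches the driver):

df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)
try:
    df1.show()  # forces the grouped-map UDF to actually run on the executors
except Exception as e:
    # the message packed into the raised Exception appears in this error text
    print(str(e))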

Mithril answered Nov 10 '22