
In PySpark, how can I log to log4j from inside a transformation

I want to log to the standard logger inside an executor during a transformation, with log levels and formatting respected. Unfortunately I can't get access to the log4j logger object inside the method, as it's not serializable, and the Spark context isn't available inside the transformation. I could log all of the objects I'm going to touch outside of the transformation, but that doesn't really help with debugging or monitoring code execution.

def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row) # API fetch, DB fetch, etc
    # This shows up, but not controllable by log level
    print "Processed slow row with {} results".format(len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)

Outside of the transformation I can get the logger via:

logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')
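
The same py4j handle also works for named loggers and level changes, still driver-side only (a quick sketch; the logger name 'MyPySparkApp' is just an illustration, not anything Spark defines):

log4j = sc._jvm.org.apache.log4j
named_logger = log4j.LogManager.getLogger('MyPySparkApp')
named_logger.setLevel(log4j.Level.INFO)
named_logger.info('Driver-side message, with level and formatting respected')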

But the sc isn't available inside the transformation, for good reasons. You see the following message if you try to call the sc directly inside the transformation:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I can just print, but that isn't easily filterable, and the printed lines just get tracked as unformatted error messages by the log4j logger.
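
For comparison, a fallback that configures Python's standard logging module inside the task looks roughly like this (a sketch only; it still doesn't reach log4j, and the output only lands in the worker's stderr, but level filtering and formatting at least work, and the 'slow_fetch' name is arbitrary):

import logging

def slow_row_contents_fetch(row):
    # Sketch: per-executor-process stdlib logging. This does NOT go through
    # log4j; it only writes formatted, level-filtered lines to the worker's stderr.
    log = logging.getLogger('slow_fetch')
    if not log.handlers:
        handler = logging.StreamHandler()  # executor stderr
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        log.addHandler(handler)
        log.setLevel(logging.INFO)
    rows = fetch_id_row_contents(row)
    log.info('Processed slow row with %d results', len(rows))
    return rows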

Serializing the logger itself, as expected, fails when calling the logger within the transform function:

...
File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

Is there a way to get access to the executor logger during transformations in pyspark?

asked Mar 15 '16 by Pyrce

2 Answers

After a few hours of digging into the Spark repository, it seems this is currently impossible to achieve. The executor's Python worker doesn't actually have a JVM instance it's attached to; the data is just streamed over a socket, without a native JVM binding to utilize.

Here's the worker-creation code that redirects the worker's output streams to stderr:

private def createSimpleWorker(): Socket = {
  ...
  val worker = pb.start()

  // Redirect worker stdout and stderr
  redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

  ...
}

/**
 * Redirect the given streams to our stderr in separate threads.
 */
private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
  try {
    new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
  } catch {
    case e: Exception =>
      logError("Exception in redirecting streams", e)
  }
}

And here's the worker.py code that handles job processing and communication back to the JVM. There's no place to emit log messages, and no message type that indicates a log event.

try:
    ...
    command = pickleSer._read_with_length(infile)
    if isinstance(command, Broadcast):
        command = pickleSer.loads(command.value)
    func, profiler, deserializer, serializer = command
    init_time = time.time()

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)

    if profiler:
        profiler.profile(process)
    else:
        process()
except Exception:
    try:
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    except IOError:
        # JVM close the socket
        pass
    except Exception:
        # Write the error to stderr if it happened while serializing
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
    exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
write_long(shuffle.MemoryBytesSpilled, outfile)
write_long(shuffle.DiskBytesSpilled, outfile)

# Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
    pickleSer._write_with_length((aid, accum._value), outfile)
...

And finally the message types available:

class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5

answered by Pyrce


Have a look at this question, which covers a similar situation.

You can have your map function return an object that contains either the real result or a stack trace string, plus a bool flag stating whether there was an error. This can be useful for debugging a task that has side effects, or when specific data conditions cause failures.
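
A minimal sketch of that pattern, reusing fetch_id_row_contents, fetchable_ids, and the driver-side logger from the question (the rest of the names are illustrative):

import traceback

def safe_fetch(row):
    # Return (True, rows) on success or (False, stack trace string) on failure,
    # so errors travel back to the driver as data instead of being lost.
    try:
        return (True, fetch_id_row_contents(row))
    except Exception:
        return (False, traceback.format_exc())

for ok, payload in sc.parallelize(fetchable_ids).map(safe_fetch).collect():
    if not ok:
        logger.warn(payload)  # log the captured stack trace on the driver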

answered by ThatDataGuy