I want to log to the standard logger inside an executor during a transformation, with log levels and formatting respected. Unfortunately I can't get access to the log4j logger object inside the method, as it isn't serializable, and the Spark context isn't available inside the transformation. I could log all of the objects I'm going to touch outside of the transformation, but that doesn't really help with debugging or monitoring code execution.
def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row)  # API fetch, DB fetch, etc
    # This shows up, but not controllable by log level
    print "Processed slow row with {} results".format(len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)
Outside of the transformation I can get the logger via:
logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')
But sc isn't available inside the transformation, for good reasons. You see the following message if you try to call sc directly inside the transformation:
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
I can just print, but that isn't easily filterable, and the lines just get tracked as unformatted error messages by the log4j logger.
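A fallback that can at least be sketched (it still doesn't integrate with log4j, and the output still lands in the executor's stderr just like print does) is configuring Python's own logging module inside the function, which gives level filtering and formatting on the Python side; the logger name and format string here are arbitrary:

import logging
import sys

def slow_row_contents_fetch(row):
    # Configure a plain Python logger once per worker process; its output goes
    # to stderr, which Spark redirects into the executor's stderr log
    log = logging.getLogger('row_fetcher')
    if not log.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
        log.addHandler(handler)
        log.setLevel(logging.INFO)
    rows = fetch_id_row_contents(row)
    log.info('Processed slow row with %d results', len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)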
Serializing the logger itself fails, as expected, when the logger is called within the transform function.
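For illustration, an attempt along these lines (fetch_and_log is a made-up name; fetch_id_row_contents is the same helper as above) captures the py4j logger object in the closure:

logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()

def fetch_and_log(row):
    # The closure captures `logger`, a py4j JavaObject that only exists on the
    # driver, so the closure cannot be pickled for shipping to the executors
    logger.warn('Processing row {}'.format(row))
    return fetch_id_row_contents(row)

sc.parallelize(fetchable_ids).flatMap(fetch_and_log)

and pickling that closure fails with: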
...
File "/usr/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Is there a way to get access to the executor logger during transformations in pyspark?
After a few hours of digging into the Spark repository, it seems this is currently impossible to achieve. The Python worker that runs your code isn't attached to a JVM instance; the data is just streamed over the socket, with no native JVM binding to use.
Here's the worker-creation code, which redirects the worker's stdout and stderr to the executor's stderr:
private def createSimpleWorker(): Socket = {
  ...
  val worker = pb.start()

  // Redirect worker stdout and stderr
  redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
  ...
}

/**
 * Redirect the given streams to our stderr in separate threads.
 */
private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
  try {
    new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
  } catch {
    case e: Exception =>
      logError("Exception in redirecting streams", e)
  }
}
And here's the worker.py code that handles job processing over the socket. There's no place to emit log messages, and no message type that indicates a log event.
try:
    ...
    command = pickleSer._read_with_length(infile)
    if isinstance(command, Broadcast):
        command = pickleSer.loads(command.value)
    func, profiler, deserializer, serializer = command
    init_time = time.time()

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)

    if profiler:
        profiler.profile(process)
    else:
        process()
except Exception:
    try:
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    except IOError:
        # JVM close the socket
        pass
    except Exception:
        # Write the error to stderr if it happened while serializing
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
    exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
write_long(shuffle.MemoryBytesSpilled, outfile)
write_long(shuffle.DiskBytesSpilled, outfile)

# Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
    pickleSer._write_with_length((aid, accum._value), outfile)
...
And finally the message types available:
class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
Have a look at this question; it's a similar situation.
You can have your map function return an object that contains either a real result or a stack-trace string, along with a bool flag stating whether there was an error. This can be useful for debugging a task that has side effects, or when specific data conditions cause failures.
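A minimal sketch of that pattern (the wrapper name and record layout are arbitrary), reusing fetch_id_row_contents from the question:

import traceback

def safe_fetch(row):
    # Wrap the real work so every output record carries either the result
    # or the error details, which can then be inspected on the driver
    try:
        return {'ok': True, 'row': row, 'result': fetch_id_row_contents(row)}
    except Exception:
        return {'ok': False, 'row': row, 'trace': traceback.format_exc()}

results = sc.parallelize(fetchable_ids).map(safe_fetch).collect()
for r in results:
    if not r['ok']:
        print "Row {} failed:\n{}".format(r['row'], r['trace'])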