I'm trying to use mrjob to run Hadoop on EMR, and can't figure out how to set up logging (user-generated logs in the map/reduce steps) so that I can access them after the cluster is terminated.
I have tried to set up logging using the logging module, print, and sys.stderr.write(), but without luck so far. The only option that works for me is to write the logs to a file, then SSH into the machine and read them, but that's cumbersome. I would like my logs to go to stderr/stdout/syslog and be collected automatically to S3, so I can view them after the cluster is terminated.
Here is the word_freq example with logging:
"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
import logging
import logging.handlers
import sys
WORD_RE = re.compile(r"[\w']+")
class MRWordFreqCount(MRJob):
def mapper_init(self):
self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
self.logger.addHandler(logging.StreamHandler())
self.logger.addHandler(logging.StreamHandler(sys.stdout))
self.logger.addHandler(logging.handlers.SysLogHandler())
def mapper(self, _, line):
self.logger.info("Test logging: %s", line)
sys.stderr.write("Test stderr: %s\n" % line)
print "Test print: %s" % line
for word in WORD_RE.findall(line):
yield (word.lower(), 1)
def combiner(self, word, counts):
yield (word, sum(counts))
def reducer(self, word, counts):
yield (word, sum(counts))
if __name__ == '__main__':
MRWordFreqCount.run()
Out of all the options, the only one that really works is writing directly to stderr (sys.stderr.write) or using a logger with a StreamHandler pointed at stderr.
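For reference, a minimal mapper_init that wires a logger to stderr only (a sketch; the logger name, level and format are up to you):

import logging
import sys

def mapper_init(self):
    # A StreamHandler on sys.stderr writes to the task's stderr stream,
    # which EMR collects per task attempt.
    self.logger = logging.getLogger(__name__)
    self.logger.setLevel(logging.INFO)
    self.logger.addHandler(logging.StreamHandler(sys.stderr))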
The logs can later be retrieved after the job is finished (successfully or with an error) from:
[s3_log_uri]/[jobflow-id]/task-attempts/[job-id]/[attempt-id]/stderr
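For instance, here is a minimal sketch for pulling those stderr files down from S3 with boto3. The bucket name and jobflow id below are placeholders for your own s3_log_uri, and depending on the EMR version the files may be gzipped (stderr.gz):

import boto3

s3 = boto3.client('s3')
bucket = 'my-log-bucket'                        # placeholder: bucket from your s3_log_uri
prefix = 'logs/j-XXXXXXXXXXXXX/task-attempts/'  # placeholder jobflow id

# List every object under the task-attempts prefix and print the stderr files.
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp.get('Contents', []):
    if obj['Key'].endswith('stderr'):
        body = s3.get_object(Bucket=bucket, Key=obj['Key'])['Body'].read()
        print(obj['Key'])
        print(body.decode('utf-8', errors='replace'))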
Be sure that your cleanup setting (the runners.emr.cleanup option in mrjob.conf) is not configured to delete the logs when the job finishes.
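For example, a minimal mrjob.conf sketch that keeps everything around after the job (check the mrjob docs for the full list of valid cleanup values):

runners:
  emr:
    cleanup: NONE   # don't delete anything after the job, so the logs on S3 survive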
Here is an example that gets logging onto stdout (Python 3):
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream
import re
import logging

log = logging.getLogger(__name__)

WORD_RE = re.compile(r'[\w]+')


class MostUsedWords(MRJob):

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        # Override mrjob's default logging setup so records from our own
        # '__main__' logger are streamed alongside mrjob's.
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_get_words,
                   reducer=self.reduce_get_words),
            MRStep(reducer=self.reducer_find_max)
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_get_words(self, word, counts):
        yield (word, sum(counts))

    def reduce_get_words(self, word, counts):
        # Materialize the generator first; otherwise logging it would
        # consume it and sum(counts) below would be 0.
        counts = list(counts)
        log.info(word + "\t" + str(counts))
        yield None, (sum(counts), word)

    def reducer_find_max(self, key, value):
        # value is pairs, i.e. (count, word) tuples
        yield max(value)


if __name__ == '__main__':
    MostUsedWords.run()
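To try it, run the script on some input (the file names here are placeholders); the log.info records from reduce_get_words show up alongside mrjob's own messages:

python most_used_words.py input.txt > output.txt
python most_used_words.py -r emr s3://my-bucket/input.txt   # same job on EMR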