
mrjob: setup logging on EMR

I'm trying to use mrjob for running hadoop on EMR, and can't figure out how to setup logging (user generated logs in map/reduce steps) so I will be able to access them after the cluster is terminated.

I have tried to setup logging using the logging module, print and sys.stderr.write() but without luck so far. The only option which works for me is to write the logs to a file then SSH the machine and read it, but its cumbersome. I would like my logs to go to stderr/stdout/syslog and be automatically collected to S3, so I can view them after the cluster is terminated.

Here is the word_freq example with logging:

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re
import logging
import logging.handlers
import sys

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper_init(self):
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(logging.FileHandler("/tmp/mr.log"))
        self.logger.addHandler(logging.StreamHandler())
        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.addHandler(logging.handlers.SysLogHandler())

    def mapper(self, _, line):
        self.logger.info("Test logging: %s", line)
        sys.stderr.write("Test stderr: %s\n" % line)
        print("Test print: %s" % line)
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
Beka asked Sep 30 '14 14:09

2 Answers

Out of all the options, the only one that really works is writing directly to stderr (sys.stderr.write) or using a logger with a StreamHandler pointed at stderr. Anything written to stdout would be mixed into the job's output stream, since Hadoop streaming reads records from stdout.
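A minimal sketch of that stderr-only pattern (make_task_logger is a hypothetical helper name, not part of mrjob; the formatter string is just an example):

```python
import logging
import sys


def make_task_logger(name, stream=None):
    """Build a logger that writes only to stderr (or an explicit stream).

    On EMR each task attempt's stderr is captured and uploaded to S3,
    so these lines survive cluster termination; stdout is left alone
    because Hadoop streaming reads the job's records from it.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep lines from also reaching the root logger
    handler = logging.StreamHandler(stream if stream is not None else sys.stderr)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.handlers = [handler]  # replace, so repeated mapper_init calls stay idempotent
    return logger
```

In mapper_init you would then do `self.logger = make_task_logger('my_job')` and log with `self.logger.info(...)`.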

The logs can later be retrieved after the job is finished (successfully or with an error) from:

[s3_log_uri]/[jobflow-id]/task-attempts/[job-id]/[attempt-id]/stderr

Be sure your runners.emr.cleanup configuration is set to keep the logs, otherwise they may be deleted when the job finishes.
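For reference, a sketch of the relevant mrjob.conf fragment (the bucket path is hypothetical, and option names may differ between mrjob versions, so check your version's docs):

```yaml
# ~/.mrjob.conf
runners:
  emr:
    s3_log_uri: s3://my-bucket/mrjob-logs/   # where EMR uploads task logs
    cleanup: NONE                            # keep logs and temp data after the job
```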

Beka answered Nov 11 '22 04:11


Here is an example that gets logging output on stdout (Python 3):

from mrjob.job import MRJob
from mrjob.job import MRStep
from mrjob.util import log_to_stream
import re
import logging

log = logging.getLogger(__name__)

WORD_RE = re.compile(r'[\w]+')

class MostUsedWords(MRJob):

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        # Override MRJob.set_up_logging (a classmethod) so our own
        # logger's output is streamed instead of being swallowed.
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_get_words,
                   reducer=self.reduce_get_words),
            MRStep(reducer=self.reducer_find_max)
        ]

    def mapper_get_words(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_get_words(self, word, counts):
        yield (word, sum(counts))

    def reduce_get_words(self, word, counts):
        counts = list(counts)  # materialize first: the generator can only be consumed once
        log.info(word + "\t" + str(counts))
        yield None, (sum(counts), word)

    def reducer_find_max(self, key, value):
        # value is an iterator of (count, word) tuples; max compares counts first
        yield max(value)


if __name__ == '__main__':
    MostUsedWords.run()
Hafiz Muhammad Shafiq answered Nov 11 '22 04:11