I'm trying to use a Python driver to run an iterative MRjob program. The exit criteria depend on a counter.
The job itself seems to run. If I run a single iteration from the command line, I can then run hadoop fs -cat /user/myname/myhdfsdir/part-00000 and see the expected results from that single iteration.

However, I need to use a Python driver to run the code and access the counters from the runner, because the algorithm is iterative and the value of a counter determines the exit criteria.
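For context, the counter is incremented inside the job itself. This is only a minimal sketch with made-up group and counter names ('my_group', 'rows_changed') and invented mapper/reducer logic; my actual my_custom_MRjob does something equivalent:

    from mrjob.job import MRJob

    class MyCustomMRJob(MRJob):
        # Stand-in for my_custom_MRjob: only the increment_counter() call
        # matters here; the rest is placeholder logic for illustration.

        def mapper(self, _, line):
            key = line.split('\t')[0]
            yield key, 1

        def reducer(self, key, values):
            total = sum(values)
            if total > 1:
                # this is what the driver later reads via runner.counters()
                self.increment_counter('my_group', 'rows_changed', 1)
            yield key, total

The driver that runs this job and reads the counters looks like this: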
    OUTPUT_PATH = '/user/myname/myhdfsdir'
    !hadoop fs -rm -r {OUTPUT_PATH}

    from my_custom_MRjob import my_custom_MRjob

    mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt",
                                   "-r", "hadoop",
                                   "--output-dir=hdfs://" + OUTPUT_PATH,
                                   "--no-output"])

    while True:
        with mr_job.make_runner() as runner:
            print runner.get_opts()
            runner.run()
            with open('localDir/localTextFile.txt', 'w') as f:
                for line in runner.stream_output():
                    key, value = mr_job.parse_output_line(line)
                    # f.write(key + '\t' + value + '\n')
            print "End of MRjob iteration. Counters: {}".format(runner.counters())
            # read a particular counter
            # use counter value to evaluate exit criteria
            if exit_criteria_met:
                break
This produces the following error:
    IOErrorTraceback (most recent call last)
    <ipython-input-136-aded8ecaa727> in <module>()
         25             runner.run()
         26             with open('localDir/localTextFile.txt', 'w') as f:
    ---> 27                 for line in runner.stream_output():
         28                     key,value = mr_job.parse_output_line(line)
         29                     #

    /home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/util.pyc in _to_lines(chunks)
        391     leftovers = []
        392
    --> 393     for chunk in chunks:
        394         # special case for b'' standing for EOF
        395         if chunk == b'':

    /home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/runner.pyc in cat_output(self)
        555                 yield b''  # EOF of previous file
        556
    --> 557             for chunk in self.fs._cat_file(filename):
        558                 yield chunk
        559

    /home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/composite.pyc in _cat_file(self, path)
         75
         76     def _cat_file(self, path):
    ---> 77         for line in self._do_action('_cat_file', path):
         78             yield line
         79

    /home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/hadoop.pyc in _cat_file(self, filename)
        272
        273         if returncode != 0:
    --> 274             raise IOError("Could not stream %s" % filename)
        275
        276     def mkdir(self, path):

    IOError: Could not stream hdfs://hdfs:/user/myname/myhdfsdir/part-00000
What is especially baffling and frustrating is this path: hdfs://hdfs:/user/myname/myhdfsdir/part-00000. Note that the URL contains the hdfs scheme twice, and the second instance has only a single forward slash. I have tried both adding and removing the literal hdfs:// in the mrjob args ("--output-dir=hdfs://"+OUTPUT_PATH), and I get the same error signature in both cases.
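Concretely, these are the two variants of the output-dir argument I have tried; both end in the same doubled-scheme error:

    # variant 1: prepend the scheme myself
    "--output-dir=hdfs://" + OUTPUT_PATH

    # variant 2: pass the bare HDFS path and let mrjob/hadoop qualify it
    "--output-dir=" + OUTPUT_PATH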
If I run the driver in "local" mode instead of Hadoop, I have no problem, with the obvious and critical exception that I do not have access to the Hadoop engine. This works fine:
    mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt"])
I need to read the initial input file, always from the local filesystem (even in Hadoop mode). Then I run one MRjob iteration, whose output overwrites that local input file. Then I access the counters from the runner and evaluate the exit criteria. If the exit criteria are not met, I run the job again, this time with the local input file updated by the previous run.
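For the "read a particular counter" step, this is roughly what I have in mind (a sketch only; the group/counter names 'my_group' and 'rows_changed' and the exit test are placeholders for whatever my job actually emits):

    # runner.counters() returns one dict per step: {group: {counter: value}}
    counters = runner.counters()
    last_step = counters[-1] if counters else {}
    rows_changed = last_step.get('my_group', {}).get('rows_changed', 0)

    # placeholder exit criterion: stop once an iteration changes nothing
    exit_criteria_met = (rows_changed == 0)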
As long as you have a path that contains hdfs:/ (with a single slash), you will not succeed, because that is never going to be valid.

In the comments you mentioned that you tried to add hdfs:// manually, which may be a nice hack, but in your code I don't see you clean up the wrong hdfs:/. So even if you add the right prefix, the malformed one is still next in line, and the code still has no chance of succeeding.

So, please clean it up first, for example along the lines of the sketch below.
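This is only a sketch: clean_hdfs_path is a helper name I made up, I am assuming the stray hdfs: in your error is junk rather than a real namenode host, and I am using the hdfs:/// form, which relies on the default namenode from your Hadoop configuration.

    def clean_hdfs_path(path):
        # strip any existing scheme, including a malformed 'hdfs:/',
        # then rebuild exactly one well-formed URI
        while path.startswith('hdfs:/'):
            path = path[len('hdfs:/'):].lstrip('/')
        return 'hdfs:///' + path.lstrip('/')

    # both of these come out as 'hdfs:///user/myname/myhdfsdir'
    print(clean_hdfs_path('/user/myname/myhdfsdir'))
    print(clean_hdfs_path('hdfs://hdfs:/user/myname/myhdfsdir'))

You would then build the argument as "--output-dir=" + clean_hdfs_path(OUTPUT_PATH) instead of concatenating hdfs:// onto a path that may already carry a (broken) scheme.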
Practical note: this question is from a while ago; if there was a problem in the software itself, it has most likely been resolved by now. If the problem persists, it is probably something odd in the code you are using. Perhaps start with a trivial example from a reliable source to rule that out.