I am running on Windows 7, Python 2.7 via Anaconda 4.3.17, Luigi 2.4.0, Pandas 0.18, sklearn version 0.18. Per below, I am trying to have a luigi.LocalTarget output be a pickle to store a few different objects (using firstJob) and then read from that pickle in a dependent job (secondJob). firstJob completes successfully if I run the following from the command line:
"python -m luigi --module luigiPickle firstJob --date 2017-06-07 --local-scheduler"
However, if I try running secondJob i.e.,
"python -m luigi --module luigiPickle secondJob --date 2017-06-07 --local-scheduler"
I get
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", line 191, in run
    new_deps = self._run_get_new_deps()
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", line 129, in _run_get_new_deps
    task_gen = self.task.run()
  File "luigiPickle.py", line 41, in run
    ret2 = pickle.load(inFile)
  File "C:\Anaconda2\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "C:\Anaconda2\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "C:\Anaconda2\lib\pickle.py", line 1096, in load_global
    klass = self.find_class(module, name)
  File "C:\Anaconda2\lib\pickle.py", line 1130, in find_class
    __import__(module)
ImportError: No module named frame
It appears that luigi is having trouble reading the pickle due to not recognizing the pandas.DataFrame() object (perhaps a scope issue?).
import luigi
import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression

class firstJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('%s_first.pickle' % self.date)

    def run(self):
        ret = {}
        ret['a'] = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        ret['b'] = pd.DataFrame({'a': [3, 4], 'd': [0, 0]})
        ret['c'] = LinearRegression()
        outFile = self.output().open('wb')
        pickle.dump(ret, outFile, protocol=pickle.HIGHEST_PROTOCOL)
        outFile.close()

class secondJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def output(self):
        return luigi.LocalTarget('%s_second.pickle' % self.date)

    def run(self):
        inFile = self.input().open('rb')
        ret2 = pickle.load(inFile)
        inFile.close()

if __name__ == '__main__':
    luigi.run()
Luigi is a workflow management system for efficiently launching a group of tasks with defined dependencies between them. It is a Python-based API developed by Spotify to build and execute pipelines of Hadoop jobs, but it can also be used to create workflows around external jobs written in R or Scala, or run on Spark.
Luigi's open() does not honor the b flag for binary mode: it strips it out of the mode string (I'm not sure why). It is better to just use the standard open() with the target's path attribute:
open(self.input().path, 'rb')
and
open(self.output().path, 'wb')
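As a rough illustration of why a dropped b flag matters, here is a minimal sketch that uses only the standard library. It rests on assumptions that are not confirmed by the question: that the pickle was written through a text-mode handle on Windows (where every LF is expanded to CR LF) and read back as raw bytes. FakeTarget is a hypothetical stand-in for luigi.LocalTarget that exposes nothing but a path attribute, and fractions.Fraction stands in for the DataFrame, since both are pickled by module and class name.

```python
import os
import pickle
import tempfile
from fractions import Fraction


class FakeTarget(object):
    """Hypothetical stand-in for luigi.LocalTarget; it only exposes .path."""

    def __init__(self, path):
        self.path = path


def dump_binary(target, obj):
    # Open the target's path directly so the 'b' flag is actually honoured.
    with open(target.path, 'wb') as out_file:
        pickle.dump(obj, out_file, protocol=2)


def load_binary(target):
    # Read the raw bytes back, again bypassing any mode rewriting.
    with open(target.path, 'rb') as in_file:
        return pickle.load(in_file)


def simulate_text_mode_write(payload):
    # Assumption: mimic a Windows text-mode write, which turns LF into CR LF.
    return payload.replace(b'\n', b'\r\n')


obj = {'model': Fraction(1, 3)}

# 1) Corrupted case: the module name stored in the pickle gains a trailing
#    carriage return, so the unpickler cannot import it.
corrupted = simulate_text_mode_write(pickle.dumps(obj, protocol=2))
try:
    pickle.loads(corrupted)
    load_error = None
except ImportError as exc:
    load_error = exc

# 2) Working case: write and read through the path in binary mode.
tmp_dir = tempfile.mkdtemp()
target = FakeTarget(os.path.join(tmp_dir, 'first.pickle'))
dump_binary(target, obj)
restored = load_binary(target)
```

In the tasks from the question, the same pattern would mean calling open(self.output().path, 'wb') in firstJob.run() and open(self.input().path, 'rb') in secondJob.run(). One trade-off to be aware of: writing straight to the path bypasses whatever temporary-file handling the target's own open() provides, so a task that crashes mid-write may leave a partial file behind. Luigi also ships a luigi.format.Nop format meant for binary targets, which can be passed as format= to LocalTarget; I have not checked how it behaves on this particular version and platform.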