
python luigi localTarget pickle

I am running Windows 7, Python 2.7 via Anaconda 4.3.17, Luigi 2.4.0, Pandas 0.18, and sklearn 0.18. As shown in the code below, I am trying to have a luigi.LocalTarget output be a pickle that stores a few different objects (written by firstJob), and then read that pickle in a dependent job (secondJob). firstJob completes successfully if I run the following from the command line:

"python -m luigi --module luigiPickle firstJob --date 2017-06-07 --local-scheduler"

However, if I try running secondJob, i.e.,

"python -m luigi --module luigiPickle secondJob --date 2017-06-07 --local-scheduler"

I get

Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", line 191, in run
    new_deps = self._run_get_new_deps()
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", line 129, in _run_get_new_deps
    task_gen = self.task.run()
  File "luigiPickle.py", line 41, in run
    ret2 = pickle.load(inFile)
  File "C:\Anaconda2\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "C:\Anaconda2\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "C:\Anaconda2\lib\pickle.py", line 1096, in load_global
    klass = self.find_class(module, name)
  File "C:\Anaconda2\lib\pickle.py", line 1130, in find_class
    __import__(module)
ImportError: No module named frame

It appears that luigi is having trouble reading the pickle due to not recognizing the pandas.DataFrame() object (perhaps a scope issue?).

import luigi
import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression

class firstJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('%s_first.pickle' % self.date)

    def run(self):
        ret = {}
        ret['a'] = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        ret['b'] = pd.DataFrame({'a': [3, 4], 'd': [0, 0]})
        ret['c'] = LinearRegression()
        outFile = self.output().open('wb')
        pickle.dump(ret, outFile, protocol=pickle.HIGHEST_PROTOCOL)
        outFile.close()

class secondJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def output(self):
        return luigi.LocalTarget('%s_second.pickle' % self.date)

    def run(self):
        inFile = self.input().open('rb')
        ret2 = pickle.load(inFile)
        inFile.close()

if __name__ == '__main__':
    luigi.run()
user975 asked Jun 07 '17 15:06



1 Answer

The open() method on a Luigi target doesn't honor the b (binary) flag: it strips it out of the mode string (I'm not sure why), so the pickle is not read and written as binary data. It is better to use the standard built-in open() with the target's path attribute:

open(self.input().path, 'rb') and open(self.output().path, 'wb').
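
As a minimal sketch of that suggestion (not tested against Luigi 2.4.0 itself), the two run() methods could delegate to small helpers like the ones below. The names dump_pickle and load_pickle are hypothetical, introduced only for illustration, and the temporary file stands in for the path of the LocalTarget so the snippet runs without Luigi installed.

```python
import os
import pickle
import tempfile


def dump_pickle(path, obj):
    """Write obj to path as a binary pickle (hypothetical helper)."""
    with open(path, 'wb') as out_file:
        pickle.dump(obj, out_file, protocol=pickle.HIGHEST_PROTOCOL)


def load_pickle(path):
    """Read a binary pickle back from path (hypothetical helper)."""
    with open(path, 'rb') as in_file:
        return pickle.load(in_file)


# Inside the tasks from the question, the calls would be roughly:
#   firstJob.run():   dump_pickle(self.output().path, ret)
#   secondJob.run():  ret2 = load_pickle(self.input().path)

# Stand-alone round trip, using a temporary file in place of the target path.
tmp_dir = tempfile.mkdtemp()
demo_path = os.path.join(tmp_dir, 'demo_first.pickle')
payload = {'a': [1, 2], 'b': [3, 4]}
dump_pickle(demo_path, payload)
restored = load_pickle(demo_path)
```

One trade-off to be aware of: writing straight to the path skips the temporary-file-then-move step that the target's own open('w') performs, so a run() that fails midway can leave a partial file at the output path.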

MattMcKnight answered Sep 28 '22 07:09