 

Scheduling jobs in a DAG manner

We have a system with different types of jobs. Let's call them for example:

job_1
job_2
job_3

They all require different sets of parameters (and optional parameters). That is, we run job_1(x) for different x = A, B, C, .... job_2 runs for a set of parameters that depends on the results of job_1(x), and job_2 also loads data that job_1(x) stored. And so on.

The result is a tree structure of dependencies. Now, these jobs occasionally fail for one reason or another. So, if job_1 fails for x=B, that branch of the tree fails completely and shouldn't run any further. All the other branches should still run, though.

All the jobs are written in Python and use parallelism (based on spawning SLURM jobs). They are scheduled with cron. This is obviously not very robust and has two major drawbacks:

  • It is very hard to debug. All the jobs run whether or not a job higher in the tree failed, and it is hard to see where the problem is without a deep understanding of the dependencies.
  • If a job higher in the tree (for example job_1) has not finished, job_2 might be scheduled to run anyway, and it either fails or runs on stale data.

To tackle this problem we have been looking at airflow for scheduling and visualization, because it is written in Python and seems to roughly fit our needs. I see several challenges though:

  • The dependency tree of jobs is either very general (i.e. job_2 depends on job_1) or very wide (i.e. job_2(y) for 100 parameters depends on job_1(x=A)). The visualized tree in the first case would have roughly 10 leaves but would make debugging very hard, because a job might have failed only for a certain parameter. The visualized tree in the latter case would be more accurate but very wide, with roughly 300 leaves, and might be hard to read. Can we filter failed jobs and just look at their dependencies?
  • We have parallelism within the jobs (and we need it, otherwise they run for more than a day, and we want to rerun the whole lot every day). Does that screw up our scheduling?
  • We want to change our jobs and data management as little as possible.
  • Can we implement the rule system of which jobs to spawn next in an easily understandable way?

Is airflow a good choice for this? I understand there are a few others out there (luigi, Azkaban, etc.) which are somewhat related to the Hadoop stack (which we are not using, because our data is not Big Data). How much hacking is required? How much hacking is sensible?

asked Oct 31 '22 by Joachim


1 Answer

I can't really speak for airflow, but I can speak for luigi.

A brief overview of luigi: luigi is designed for data flow and data dependencies, just like airflow, but was developed at Spotify. Every step in the data flow is represented as a class that inherits from luigi.Task, and we call each step a task. Each task declares its parameters and is made up of three primary methods:

  1. requires: in this method you specify which tasks the task at hand depends on, by returning those tasks.
  2. output: in this method you specify where the results of this task will be saved, by returning an object of class luigi.LocalTarget (or similar, but for remote storage).
  3. run: in this method you specify what actually happens when the task is run.

Note: the luigi central scheduler knows whether a task is complete by checking whether a file exists - specifically, the file specified in the output method of each task returned by the requires method of the task to be run.
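
To make that concrete, here is a minimal sketch of two such tasks. The class names Job1/Job2, the x parameter, and the file paths are just placeholders for your actual job_1/job_2 and data locations:

    import luigi

    class Job1(luigi.Task):
        # One task instance per parameter value: Job1(x="A"), Job1(x="B"), ...
        x = luigi.Parameter()

        def output(self):
            # The task counts as complete once this file exists.
            return luigi.LocalTarget("data/job_1_%s.csv" % self.x)

        def run(self):
            # The actual work of job_1 would go here.
            with self.output().open("w") as f:
                f.write("result for %s\n" % self.x)

    class Job2(luigi.Task):
        x = luigi.Parameter()

        def requires(self):
            # Job2(x) is only run once Job1(x) has produced its output.
            return Job1(x=self.x)

        def output(self):
            return luigi.LocalTarget("data/job_2_%s.csv" % self.x)

        def run(self):
            with self.input().open("r") as inp, self.output().open("w") as out:
                out.write("processed: " + inp.read())

You would then invoke it with something like python -m luigi --module my_pipeline Job2 --x A (assuming the file is importable as my_pipeline), and luigi skips any task whose output already exists.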

Can we filter failed jobs, and just look at their dependencies?

Luigi logs all parameters passed to each task and every attempt to run each task. By default luigi doesn't save the logs, but you can easily set it up to do so. I built a big luigi pipeline last summer and had it save the logs. It then used fuzzy string comparisons (with the Levenshtein library) to remove duplicate lines and heavily compress the log, and then basically searched for the word "error"; if it came up, it sent me an email with the compressed log.
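
A rough sketch of that post-processing idea might look like this (the addresses, the similarity threshold, and the local SMTP server are placeholders, not anything luigi provides):

    import smtplib
    from email.message import EmailMessage

    import Levenshtein  # the python-Levenshtein package

    def compress_log(lines, threshold=0.9):
        # Keep a line only if it is not a near-duplicate of the last kept line.
        kept = []
        for line in lines:
            if not kept or Levenshtein.ratio(kept[-1], line) < threshold:
                kept.append(line)
        return kept

    def email_if_errors(log_path, to_addr="you@example.com"):
        with open(log_path) as f:
            compressed = compress_log(f.read().splitlines())
        if any("error" in line.lower() for line in compressed):
            msg = EmailMessage()
            msg["Subject"] = "luigi pipeline reported errors"
            msg["From"] = "pipeline@example.com"
            msg["To"] = to_addr
            msg.set_content("\n".join(compressed))
            with smtplib.SMTP("localhost") as server:
                server.send_message(msg)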

We have parallelism within the job (and we need it otherwise the jobs run for more than a day, and we want to rerun the whole lot every day) does that screw up our scheduling?

I ran tasks with parallelism inside of them with no problems. Some libraries can cause issues though, e.g. gensim.
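
For illustration, a task that parallelizes its own work inside run() might look like the sketch below. Plain multiprocessing stands in for whatever mechanism you use; the same idea applies if run() submits SLURM jobs instead:

    import luigi
    from multiprocessing import Pool

    def one_unit_of_work(i):
        # Placeholder for one chunk of the job's computation.
        return i * i

    class ParallelJob(luigi.Task):
        def output(self):
            return luigi.LocalTarget("data/parallel_job.txt")

        def run(self):
            # Luigi only checks that output() exists afterwards; how the work
            # is parallelized inside run() is entirely up to the task.
            with Pool(processes=4) as pool:
                results = pool.map(one_unit_of_work, range(1000))
            with self.output().open("w") as f:
                f.write("\n".join(str(r) for r in results))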

We want to change our jobs and data management as little as possible.

You can generally just paste the bulk of your existing computation into the run methods of luigi tasks.
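
For example, if job_1 is currently a standalone script, a thin wrapper can be enough. The script name, its command-line flag, and the output path below are made up:

    import subprocess
    import luigi

    class WrappedJob1(luigi.Task):
        x = luigi.Parameter()

        def output(self):
            # Point this at whatever file the existing script already writes.
            return luigi.LocalTarget("results/job_1_%s.out" % self.x)

        def run(self):
            # The existing code stays untouched; luigi only adds dependency
            # and completion tracking around it.
            subprocess.run(["python", "job_1.py", "--x", str(self.x)], check=True)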

Can we implement the rule system of what jobs to spawn next in a easily understandable way?

I believe so, yes. For each task, you specify which task(s) it depends on in its requires method.
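
One way to express a rule like "job_2's parameter set depends on job_1's results" is luigi's dynamic dependencies, where run() yields the tasks it discovers it needs. A sketch with made-up task and file names:

    import luigi

    class DiscoverParams(luigi.Task):
        # Stands in for job_1: it writes out the parameters job_2 should run with.
        def output(self):
            return luigi.LocalTarget("data/params.txt")

        def run(self):
            with self.output().open("w") as f:
                f.write("A\nB\nC\n")

    class ChildJob(luigi.Task):
        # Stands in for job_2(y).
        y = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget("data/child_%s.txt" % self.y)

        def run(self):
            with self.output().open("w") as f:
                f.write("done %s\n" % self.y)

    class RunBranch(luigi.Task):
        def output(self):
            return luigi.LocalTarget("data/branch.done")

        def run(self):
            parent = DiscoverParams()
            # Dynamic dependency: wait for the parent, then decide what to spawn.
            yield parent
            with parent.output().open("r") as f:
                params = [line.strip() for line in f if line.strip()]
            yield [ChildJob(y=p) for p in params]
            with self.output().open("w") as f:
                f.write("ok\n")

If the parent task fails, the tasks yielded after it are never scheduled, so only that branch of the tree stops, which matches the failure behaviour you describe.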

Something else to think about is documentation. Luigi's documentation is pretty good, but the project hasn't caught on as widely as it could have. Its community is not very large and not incredibly active either. Airflow is pretty comparable as far as I know, but it is newer, so it probably has a more active community for the time being.

Here's a blog post by the guy who wrote luigi with some brief comparisons between luigi and newer alternatives. His conclusion: they all kind of suck. Including Luigi.

answered Nov 02 '22 by Charlie Haley