How to solve an assignment problem (like Hungarian/linear_sum_assignment) with an edge case in PySpark UDF

I have an assignment problem, and I wanted to ask the SO community the best way to implement this for my Spark dataframe (using Spark 3.1+). I will first describe the problem and then move to implementation.

Here is the problem: I have up to N tasks and up to N individuals (in the case of this problem, N=10). Each individual has a cost of performing each task, where the min cost is $0 and the max cost is $10. It is sort of a Hungarian algorithm problem with some caveats.

  1. There will be some instances where there are fewer than 10 tasks and/or fewer than 10 individuals, and it is okay for someone not to be assigned a task (or for a task not to be assigned an individual).
  2. [The more complex edge case/the one I am having trouble with] - There may be one task in the list that has the flag multiTask=True (there cannot be more than 1 multiTask, and it is possible there are none). If a worker has a cost less than x for the multiTask, he is automatically assigned to the multiTask and the multiTask is considered taken during the optimization.
    • I will share a few examples (see the sketch after this list). In these examples, the threshold x for the multiTask is 1.
      • If 1 worker out of the 10 has a cost of 0.25 on the multiTask, he is assigned to the multiTask and the other 9 workers are assigned to the other 9 tasks.
      • If 2 workers out of the 10 have a cost < 1 on the multiTask, both of them are assigned to the multiTask and the other 8 workers are assigned to 8 of the remaining 9 tasks; 1 task goes unassigned.
      • If all 10 workers have a cost < 1 on the multiTask, all of them are assigned to the multiTask. This is very rare but possible.
      • If no worker has a cost < 1 on the multiTask, the multiTask is treated like any other task and assigned to exactly one person during the optimization.
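
To make rule 2 concrete, here is a tiny standalone sketch of the threshold check (the helper name and signature are mine, purely illustrative):

# Given each worker's cost on the multiTask and the threshold x, return
# the workers that are forced onto the multiTask (rule 2 above)
def forced_multitask_workers(multitask_costs: dict, x: float) -> list:
    return [worker for worker, cost in multitask_costs.items() if cost < x]

# Using the costs for taskId=110 from the example data below:
forced_multitask_workers({129: 2.90, 990: 0.90, 433: 0.25}, x=1)  # -> [990, 433]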

Here is what the Spark dataframe looks like. Note: I am showing an example where N=3 (3 tasks, 3 individuals) for simplicity's sake.

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
  Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])

df = spark.createDataFrame(rdd)

You will see there are date and location columns, as I need to solve this assignment problem for every date/location grouping. My plan was to assign each worker and task an "index" based on their IDs using dense_rank(), then, inside a pandas UDF, populate an N x N numpy array from those indices and invoke the linear_sum_assignment function. However, I don't believe this plan will work due to the 2nd edge case I laid out with the multiTask.

import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment
from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, when
from pyspark.sql.types import StructType

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")

# dense_rank gives each worker/task an index into the np array for
# linear_sum_assignment; subtract 1 because arrays are 0-indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1)
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)


def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
  df_dict = pandas_df.to_dict('records')
  # pad to a square N x N matrix in case there are fewer workers than
  # tasks (or vice versa); N is the largest index in the group plus 1
  N = max(pandas_df['worker_idx'].max(), pandas_df['task_idx'].max()) + 1
  arr = np.zeros((N, N))
  for row in df_dict:
    # worker_idx is the row number, task_idx is the column number
    arr[row['worker_idx']][row['task_idx']] = row['cost']
  rids, cids = linear_sum_assignment(arr)

  # now return a dataframe that says which task_idx each worker has
  return_list = []
  for r, c in zip(rids, cids):
    for d in df_dict:
      if d.get('worker_idx') == r:
        d['task_assignment'] = c
        return_list.append(d)
  return pd.DataFrame(return_list)
      
  
  
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)

df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))

As you can see, this code does not handle the multiTask at all. I would like to solve this in the most efficient way possible, so I am not tied to a pandas UDF or scipy.

Asked Sep 22 '21 by Lauren Leder



1 Answer

I don't know anything about the libraries you're using, so I can't help you with the code, but I think you should do this in two steps:

  1. Assign workers to the multitask if they are required to be assigned to it. If anyone is assigned this way, leave those workers (and the multitask) out of your cost matrix.
  2. Assign the remaining workers to tasks using the Hungarian Algorithm (or some alternative) as normal.

The basic Hungarian Algorithm only works on square cost matrices, and it looks like you've handled that correctly by padding your cost matrix with 0's, but there are modifications of the algorithm that work with rectangular matrices. You may want to see if you have access to one of those alternatives as it could be significantly faster.
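
Building on this, here is a minimal sketch of the two-step approach in the same pandas-UDF style as the question. The threshold X and all names here are assumptions for illustration, not from the original post. Note that scipy's linear_sum_assignment accepts rectangular cost matrices directly (SciPy 1.4+), so no zero padding is needed for the unbalanced case.

import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment

X = 1.0  # hypothetical multiTask threshold (the "x" from the question)

def assign_group(pandas_df: pd.DataFrame) -> pd.DataFrame:
    pdf = pandas_df.copy()

    # Step 1: any worker whose cost on the multiTask is below the
    # threshold is forced onto the multiTask
    multi = pdf[pdf['isMultiTask']]
    forced = set(multi.loc[multi['cost'] < X, 'workerId'])

    # Step 2: solve the remaining problem without the forced workers;
    # if anyone was forced, the multiTask is taken, so drop it too
    rest = pdf[~pdf['workerId'].isin(forced)]
    if forced:
        rest = rest[~rest['isMultiTask']]

    # build a (possibly rectangular) cost matrix; this assumes every
    # remaining worker has a cost row for every remaining task
    workers = sorted(rest['workerId'].unique())
    tasks = sorted(rest['taskId'].unique())
    widx = {w: i for i, w in enumerate(workers)}
    tidx = {t: j for j, t in enumerate(tasks)}
    cost = np.zeros((len(workers), len(tasks)))
    for _, r in rest.iterrows():
        cost[widx[r['workerId']], tidx[r['taskId']]] = r['cost']

    # linear_sum_assignment picks min(len(workers), len(tasks)) pairs;
    # leftover workers or tasks simply stay unassigned
    rids, cids = linear_sum_assignment(cost)

    assigned = {workers[r]: tasks[c] for r, c in zip(rids, cids)}
    for w in forced:
        assigned[w] = multi['taskId'].iloc[0]
    # nullable Int64 so unassigned workers come back as null
    pdf['assignedTaskId'] = pdf['workerId'].map(assigned).astype('Int64')
    return pdf

This would be invoked exactly like the UDF in the question, with an assignedTaskId column added to the schema:

schema = StructType.fromJson(df.schema.jsonValue()).add('assignedTaskId', 'long')
result = df.groupBy("date", "locationId").applyInPandas(assign_group, schema)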

Answered Oct 23 '22 by Yay295