I have an assignment problem, and I wanted to ask the SO community about the best way to implement this for my Spark dataframe (using Spark 3.1+). I will first describe the problem and then move to the implementation.
Here is the problem: I have up to N tasks and up to N individuals (in the case of this problem, N=10). Each individual has a cost of performing each task, where the min cost is $0 and the max cost is $10. It is sort of a Hungarian algorithm problem with some caveats:

1. There may be fewer than N tasks and/or fewer than N individuals in a given grouping.
2. One of the tasks can be a multiTask (isMultiTask=True). There cannot be more than 1 multiTask, and it is possible there are none. If a worker has a cost less than x for the multiTask, he is automatically assigned to the multiTask and the multiTask is considered taken during the optimization (a short sketch of what I mean follows this list).
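Roughly, in plain Python, the multiTask rule looks like this (the threshold x, the cost values, and the tie-break of picking the cheapest qualifying worker are just placeholders for illustration):

```python
# Sketch of the multiTask rule with made-up numbers; x is a placeholder threshold.
x = 1.00

# cost of the single multiTask for each worker (illustrative values only)
multi_task_costs = {129: 2.90, 990: 0.90, 433: 0.25}

# any worker with cost < x qualifies; that worker is auto-assigned and the
# multiTask is treated as taken before the main optimization runs
eligible = {worker: cost for worker, cost in multi_task_costs.items() if cost < x}
if eligible:
    assigned_worker = min(eligible, key=eligible.get)  # tie-break: cheapest worker
    print(f"worker {assigned_worker} is auto-assigned the multiTask")
```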
Here is what the Spark dataframe looks like. Note: I am showing an example where N=3 (3 tasks, 3 individuals) for simplicity's sake.
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
    Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
    Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
    Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
    Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
    Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
    Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
    Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
    Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
    Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])
df = spark.createDataFrame(rdd)
You will see there are date and location columns, as I need to solve this assignment problem for every date/location grouping.
I was planning to solve this by assigning each worker and task an "index" based on their IDs using dense_rank(), then using a pandas UDF to populate an N x N numpy array from those indices and invoke the linear_sum_assignment function. However, I don't believe this plan will work because of the 2nd edge case I laid out with the multiTask.
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, when, col
from pyspark.sql.types import StructType

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")

# use dense_rank to map each worker/task ID to an index in the np array for linear_sum_assignment
# dense_rank - 1 because arrays are 0-indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1)
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)
import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment


def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
    df_dict = pandas_df.to_dict('records')
    # in case there are fewer than N workers or tasks, size the square matrix
    # from the largest index seen (+1 because the indices are 0-based)
    N = int(max(pandas_df['worker_idx'].max(), pandas_df['task_idx'].max())) + 1
    arr = np.zeros((N, N))
    for row in df_dict:
        # worker_idx is the row number, task_idx is the column number
        worker_idx = row.get('worker_idx')
        task_idx = row.get('task_idx')
        arr[worker_idx][task_idx] = row.get('cost')
    rids, cids = linear_sum_assignment(arr)
    return_list = []
    # now return a dataframe that says which task_idx each worker has
    for r, c in zip(rids, cids):
        for d in df_dict:
            if d.get('worker_idx') == r:
                d['task_assignment'] = c
                return_list.append(d)
    return pd.DataFrame(return_list)
schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)
df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))
As you can see, this code does not cover the multiTask at all. I would like to solve this in the most efficient way possible, so I am not tied to a pandas UDF or scipy.
Some background on the underlying problem: the linear sum assignment problem (LSAP) is also known as minimum weight matching in bipartite graphs. A problem instance is described by an n × n cost matrix C, where each C[i, j] is the cost of matching vertex i of the first partite set (a "worker") and vertex j of the second set (a "job"), and the goal is to match each row to a different column so that the sum of the corresponding entries is minimized. Equivalently, the task is to find a minimum-weight matching in a complete bipartite graph, and the matching consists of N edges. The assignment problem is a special case of the transportation problem, which in turn is a special case of the min-cost flow problem, so it can be solved using algorithms for those more general cases; it is also a special case of binary integer linear programming (which is NP-hard in general). When the Hungarian method is applied to an unbalanced instance in which there are more jobs than machines, some jobs are assigned to dummy machines, which effectively means those jobs are ignored. If a perfect matching cannot be found at the current step, the Hungarian algorithm re-weights the available edges so that new 0-weight edges appear without affecting the optimal solution.
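As a concrete illustration, here is what scipy.optimize.linear_sum_assignment produces for the 3×3 cost matrix built from the example data above (rows ordered by workerId, columns by taskId, i.e. the same ordering dense_rank would give):

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

# cost matrix from the example data above:
# rows = workers ordered by workerId (129, 433, 990)
# cols = tasks ordered by taskId (110, 190, 220)
cost = np.array([
    [2.90, 0.80, 1.50],  # worker 129
    [0.25, 4.99, 1.20],  # worker 433
    [0.90, 9.99, 1.80],  # worker 990
])

row_ind, col_ind = linear_sum_assignment(cost)      # minimizes total cost
for r, c in zip(row_ind, col_ind):
    print(f"worker row {r} -> task column {c} (cost {cost[r, c]})")
print("total cost:", cost[row_ind, col_ind].sum())
```

Note that linear_sum_assignment also accepts rectangular cost matrices, in which case it matches min(n_workers, n_tasks) pairs.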
I don't know anything about the libraries you're using so I can't help you with the code, but I think you should do this in two steps: first assign the multiTask to a qualifying worker and remove that worker and the task from the pool, then run the normal assignment optimization on the remaining workers and tasks.
The basic Hungarian Algorithm only works on square cost matrices, and it looks like you've handled that correctly by padding your cost matrix with 0's, but there are modifications of the algorithm that work with rectangular matrices. You may want to see if you have access to one of those alternatives as it could be significantly faster.
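For what it's worth, here is a rough sketch of those two steps in plain pandas/scipy terms. The threshold X_THRESHOLD, the assign_group name, and the choice of the cheapest qualifying worker as the tie-break are placeholders, not anything specified in the question:

```python
import numpy as np
import pandas as pd
from scipy.optimize import linear_sum_assignment

X_THRESHOLD = 1.00  # placeholder; the real threshold comes from the business rule


def assign_group(pdf: pd.DataFrame, x: float = X_THRESHOLD) -> pd.DataFrame:
    """Sketch: two-step assignment for one date/location group."""
    pdf = pdf.copy()
    pdf["assigned_task"] = pd.NA

    # Step 1: pre-assign the multiTask if some worker's cost is below the threshold.
    multi = pdf[pdf["isMultiTask"]]
    remaining = pdf
    if not multi.empty:
        eligible = multi[multi["cost"] < x]
        if not eligible.empty:
            winner = eligible.loc[eligible["cost"].idxmin()]  # cheapest qualifying worker (assumed tie-break)
            pdf.loc[(pdf["workerId"] == winner["workerId"])
                    & (pdf["taskId"] == winner["taskId"]), "assigned_task"] = winner["taskId"]
            # that worker and the multiTask are out of the pool for step 2
            remaining = pdf[(pdf["workerId"] != winner["workerId"])
                            & (pdf["taskId"] != winner["taskId"])]

    # Step 2: ordinary linear sum assignment on whatever is left
    # (assumes every remaining worker/task pair has a cost row, as in the example data).
    workers = sorted(remaining["workerId"].unique())
    tasks = sorted(remaining["taskId"].unique())
    cost = np.zeros((len(workers), len(tasks)))
    for _, row in remaining.iterrows():
        cost[workers.index(row["workerId"]), tasks.index(row["taskId"])] = row["cost"]
    rows, cols = linear_sum_assignment(cost)  # scipy handles rectangular matrices too
    for r, c in zip(rows, cols):
        pdf.loc[(pdf["workerId"] == workers[r])
                & (pdf["taskId"] == tasks[c]), "assigned_task"] = tasks[c]
    return pdf
```

A function along these lines could then be dropped into the existing applyInPandas call with an extra column added to the schema, or step 1 could be done in Spark before the UDF runs.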