I want to modify the bus scheduling problem from ortools so as each driver's shift to be consecutive in terms of slots and drivers can share a shift at the same time if needed.
For example, assuming that we have the following half-hour shifts (format similar to the original bus_scheduling_problem from ortools):
shifts = [
[0, '07:00', '07:30', 420, 450, 30],
[1, '07:30', '08:00', 450, 480, 30],
[2, '08:00', '08:30', 480, 510, 30],
[3, '08:30', '09:00', 510, 540, 30],
[4, '09:00', '09:30', 540, 570, 30],
[5, '09:30', '10:00', 570, 600, 30],
[6, '10:00', '10:30', 600, 630, 30],
[7, '10:30', '11:00', 630, 660, 30],
[8, '11:00', '11:30', 660, 690, 30],
[9, '11:30', '12:00', 690, 720, 30],
[10, '12:00', '12:30', 720, 750, 30],
[11, '12:30', '13:00', 750, 780, 30],
[12, '13:00', '13:30', 780, 810, 30],
[13, '13:30', '14:00', 810, 840, 30],
[14, '14:00', '14:30', 840, 870, 30],
[15, '14:30', '15:00', 870, 900, 30],
[16, '15:00', '15:30', 900, 930, 30],
[17, '15:30', '16:00', 930, 960, 30],
[18, '16:00', '16:30', 960, 990, 30],
[19, '16:30', '17:00', 990, 1020, 30],
[20, '17:00', '17:30', 1020, 1050, 30],
[21, '17:30', '18:00', 1050, 1080, 30],
[22, '18:00', '18:30', 1080, 1110, 30],
[23, '18:30', '19:00', 1110, 1140, 30],
[24, '19:00', '19:30', 1140, 1170, 30],
[25, '19:30', '20:00', 1170, 1200, 30],
[26, '20:00', '20:30', 1200, 1230, 30],
[27, '20:30', '21:00', 1230, 1260, 30],
[28, '21:00', '21:30', 1260, 1290, 30],
[29, '21:30', '22:00', 1290, 1320, 30],
[30, '22:00', '22:30', 1320, 1350, 30],
[31, '22:30', '23:00', 1350, 1380, 30],
[32, '23:00', '23:30', 1380, 1410, 30],
[33, '23:30', '24:00', 1410, 1440, 30]
]
I successfully execute the this version of the bus_scheduling code and I find that I need 2 drivers to satisfy the needs for the above mentioned schedule. The range of working hours is from 07:00 am to 24:00 (midnight)
.
As a result, if we have 2 bus drivers for this schedule, I would prefer an allocation that covers the daily duty based on 12-h driver shift as following:
Driver 1: 07:00 - 19:00 with a break at 13:00
Driver 2: 12:00 - 24:00 with a break at 14:00 (basically no overlap with Driver 1's break)
What I mean by consecutive hours is that solutions that satisfy a 12-h driver shift solution in the form of 07:00-11:00 + 14:00-15:00 + 17:00-24:00
should not be acceptable. Solutions with more drivers should also make sure that breaks are not overlapping so as not all drivers are on break. Moreover, slots for breaks can be blocked due to high work load.
I got an answer at the or-tools discussion saying that I need to maintain at each node the total time since the start of the shift, but I'm having difficulty in coding that assuming it solves my question.
To me, bus scheduling problem from ortools is an overkill for your task since you mentioned that shift durations are always 30
minutes, and setup/cleanup time is not needed. Moreover, drivers must work exactly 11
hours and have a contiguous break. Instead, we could write a script similar to nurse scheduling problem that is probably easier to understand (for me, it was the first time writing something with or-tools and it was clear).
First, the total number of shifts can be calculated as below:
num_shifts = len(shifts)
Number of drivers needed:
num_drivers = ceil(float(num_shifts) / working_time)
In your case, driver must drive exactly 11
hours, and so it's 22
shifts (each shift is fixed at 30
minutes):
working_time = 22
Break is 1
hour so:
break_time = 2
As you mentioned in the comment, each driver must take a break after 4
hours of driving, but not later than after 8
hours:
break_interval = [8, 16]
The latest shift when a driver can start working:
latest_start_shift = num_shifts - working_time - break_time
Really, if he/she starts working later, then the driver won't work off the full working time.
Let's define an array of shifts for drivers:
driver_shifts = {}
for driver_id in range(num_drivers):
for shift_id in range(num_shifts):
driver_shifts[(driver_id, shift_id)] = model.NewBoolVar('driver%ishift%i' % (driver_id, shift_id))
driver_shifts[(d, s)]
equals 1
if shift s
is assigned to driver d
, and 0
otherwise.
Additionally, create an array of start shifts for drivers:
start_time = {}
for driver_id in range(num_drivers):
for shift_id in range(latest_start_shift + 1):
start_time[(driver_id, shift_id)] = model.NewBoolVar('driver%istart%i' % (driver_id, shift_id))
start_time[(d, s)]
equals 1
if driver d
starts a working day at shift s
, and 0
otherwise.
Each driver must drive exactly the required driving time within the day:
for driver_id in range(num_drivers):
model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in range(num_shifts)) == working_time)
However, it's not enough since the driver must do it contiguously with one break in between. We will see how to do this later.
Each shift must be covered by at least one driving driver:
for shift_id in range(num_shifts):
model.Add(sum(driver_shifts[(driver_id, shift_id)] for driver_id in range(num_drivers)) >= 1)
Here where start_time
comes into play. The basic idea is that for each possible start time of driver, we enforce that the driver works off the time (physically, a driver can start working only once day!).
So, driver can start working only once per day:
for driver_id in range(num_drivers):
model.Add(sum(start_time[(driver_id, start_shift_id)] for start_shift_id in range(latest_start_shift + 1)) == 1)
For each start time of the driver, the working time within consecutive working_time + break_time
is working_time
for driver_id in range(num_drivers):
for start_shift_id in range(latest_start_shift + 1):
model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in
range(start_shift_id, start_shift_id + working_time + break_time)) == working_time) \
.OnlyEnforceIf(start_time[(driver_id, start_shift_id)])
For this, we need an additional array break_ind[(d, s, b)]
that denotes whether a given driver d
with a given working shift start s
takes a break at shift b
. So, in this case, driver_shifts
values should be 0
for the time of the break:
l = start_shift_id + break_interval[0]
r = start_shift_id + break_interval[1]
for s in range(l, r):
break_ind[(driver_id, start_shift_id, s)] = model.NewBoolVar("d%is%is%i"%(driver_id, start_shift_id, s))
model.Add(sum(driver_shifts[(driver_id, s1)] for s1 in range(s, s + break_time)) == 0)\
.OnlyEnforceIf(start_time[(driver_id, start_shift_id)])\
.OnlyEnforceIf(break_ind[(driver_id, start_shift_id, s)])
Also, a driver can take a break only once a day:
model.Add(sum(break_ind[(driver_id, start_shift_id, s)] for s in range(l, r)) == 1)
You can check the full code below or here (I added it for reference in the future). There you can also find a simplified version for a case when drivers don't take a break.
from ortools.sat.python import cp_model
from math import ceil
shifts = [
[0, '07:00', '07:30', 420, 450, 30],
[1, '07:30', '08:00', 450, 480, 30],
[2, '08:00', '08:30', 480, 510, 30],
[3, '08:30', '09:00', 510, 540, 30],
[4, '09:00', '09:30', 540, 570, 30],
[5, '09:30', '10:00', 570, 600, 30],
[6, '10:00', '10:30', 600, 630, 30],
[7, '10:30', '11:00', 630, 660, 30],
[8, '11:00', '11:30', 660, 690, 30],
[9, '11:30', '12:00', 690, 720, 30],
[10, '12:00', '12:30', 720, 750, 30],
[11, '12:30', '13:00', 750, 780, 30],
[12, '13:00', '13:30', 780, 810, 30],
[13, '13:30', '14:00', 810, 840, 30],
[14, '14:00', '14:30', 840, 870, 30],
[15, '14:30', '15:00', 870, 900, 30],
[16, '15:00', '15:30', 900, 930, 30],
[17, '15:30', '16:00', 930, 960, 30],
[18, '16:00', '16:30', 960, 990, 30],
[19, '16:30', '17:00', 990, 1020, 30],
[20, '17:00', '17:30', 1020, 1050, 30],
[21, '17:30', '18:00', 1050, 1080, 30],
[22, '18:00', '18:30', 1080, 1110, 30],
[23, '18:30', '19:00', 1110, 1140, 30],
[24, '19:00', '19:30', 1140, 1170, 30],
[25, '19:30', '20:00', 1170, 1200, 30],
[26, '20:00', '20:30', 1200, 1230, 30],
[27, '20:30', '21:00', 1230, 1260, 30],
[28, '21:00', '21:30', 1260, 1290, 30],
[29, '21:30', '22:00', 1290, 1320, 30],
[30, '22:00', '22:30', 1320, 1350, 30],
[31, '22:30', '23:00', 1350, 1380, 30],
[32, '23:00', '23:30', 1380, 1410, 30],
[33, '23:30', '24:00', 1410, 1440, 30]
]
class VarArraySolutionPrinter(cp_model.CpSolverSolutionCallback):
def __init__(self, driver_shifts, num_drivers, num_shifts, solutions):
cp_model.CpSolverSolutionCallback.__init__(self)
self.driver_shifts = driver_shifts
self.num_drivers = num_drivers
self.num_shifts = num_shifts
self.solutions = solutions
self.solution_id = 0
def on_solution_callback(self):
if self.solution_id in self.solutions:
self.solution_id += 1
print ("Solution found!")
for driver_id in range(self.num_drivers):
print ("*************Driver#%s*************" % driver_id)
for shift_id in range(self.num_shifts):
if (self.Value(self.driver_shifts[(driver_id, shift_id)])):
print('Shift from %s to %s' %
(shifts[shift_id][1],
shifts[shift_id][2]))
print()
def solution_count(self):
return self.solution_id
solver = cp_model.CpSolver()
model = cp_model.CpModel()
num_shifts = len(shifts)
working_time = 22
break_time = 2
# when take a break within the working time
break_interval = [8, 16]
latest_start_shift = num_shifts - working_time - break_time
num_drivers = ceil(float(num_shifts) / working_time)
# create an array of assignments of drivers
driver_shifts = {}
for driver_id in range(num_drivers):
for shift_id in range(num_shifts):
driver_shifts[(driver_id, shift_id)] = model.NewBoolVar('driver%ishift%i' % (driver_id, shift_id))
# driver must work exactly {working_time} shifts
for driver_id in range(num_drivers):
model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in range(num_shifts)) == working_time)
# each shift must be covered by at least one driver
for shift_id in range(num_shifts):
model.Add(sum(driver_shifts[(driver_id, shift_id)] for driver_id in range(num_drivers)) >= 1)
# create an array of start times for drivers
start_time = {}
for driver_id in range(num_drivers):
for shift_id in range(latest_start_shift + 1):
start_time[(driver_id, shift_id)] = model.NewBoolVar('driver%istart%i' % (driver_id, shift_id))
break_ind = {}
for driver_id in range(num_drivers):
for start_shift_id in range(latest_start_shift + 1):
model.Add(sum(driver_shifts[(driver_id, shift_id)] for shift_id in
range(start_shift_id, start_shift_id + working_time + break_time)) == working_time) \
.OnlyEnforceIf(start_time[(driver_id, start_shift_id)])
l = start_shift_id + break_interval[0]
r = start_shift_id + break_interval[1]
for s in range(l, r):
break_ind[(driver_id, start_shift_id, s)] = model.NewBoolVar("d%is%is%i"%(driver_id, start_shift_id, s))
model.Add(sum(driver_shifts[(driver_id, s1)] for s1 in range(s, s + break_time)) == 0)\
.OnlyEnforceIf(start_time[(driver_id, start_shift_id)])\
.OnlyEnforceIf(break_ind[(driver_id, start_shift_id, s)])
model.Add(sum(break_ind[(driver_id, start_shift_id, s)] for s in range(l, r)) == 1)
for driver_id in range(num_drivers):
model.Add(sum(start_time[(driver_id, start_shift_id)] for start_shift_id in range(latest_start_shift + 1)) == 1)
solution_printer = VarArraySolutionPrinter(driver_shifts, num_drivers, num_shifts, range(2))
status = solver.SearchForAllSolutions(model, solution_printer)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With