I'm looking for a data type to help me model resource availability over fluid time.
I've come at this problem from many directions but always come back to the fundamental problem of not knowing a data type to model something as simple as an integer over time.
I could convert my appointments into time series events (eg appointment arrives means -1 availability, appointment leaves means +1) but I still don't know how to manipulate that data so that I can distil out periods where the availability is greater than zero.
Somebody's left a close-vote citing lack of focus, but my goal here seems pretty singular so I'll try to explain the problem graphically. I'm trying to infer the periods of time where the number of active jobs falls below a given capacity.
In other words, I want to turn a range of known parallel capacity (e.g. 3 between 9 and 6) and a list of jobs with variable start and end times into a list of time ranges where capacity is still available.
My approach would be to build the time series, but include the availability object with a value set to the availability in that period.
availability:
[
    {
        "start": "09:00",
        "end": "12:00",
        "value": 4
    },
    {
        "start": "12:00",
        "end": "13:00",
        "value": 3
    }
]
data:
[
    {
        "start": "10:00",
        "end": "10:30"
    }
]
Build the time series by indexing on the start and end times, with the change in availability as the value. For an availability period, the start time contributes +value and the end time contributes -value. For an event, it'd be -1 at the start and +1 at the end, as you said.
"09:00" 4
"10:00" -1
"10:30" 1
"12:00" -4
"12:00" 3
"13:00" -3
Then group by index, sum each group, and take the cumulative sum, which gives:
"09:00" 4
"10:00" 3
"10:30" 4
"12:00" 3
"13:00" 0
Example code in pandas:
import numpy as np
import pandas as pd
data = [
    {"start": "10:00", "end": "10:30"},
]
breakpoints = [
    {"start": "00:00", "end": "09:00", "value": 0},
    {"start": "09:00", "end": "12:00", "value": 4},
    {"start": "12:00", "end": "12:30", "value": 4},
    {"start": "12:30", "end": "13:00", "value": 3},
    {"start": "13:00", "end": "00:00", "value": 0},
]
df = pd.DataFrame(data, columns=['start', 'end'])
print(df.head(5))
starts = pd.DataFrame(data, columns=['start'])
starts["value"] = -1
starts = starts.set_index("start")
ends = pd.DataFrame(data, columns=['end'])
ends["value"] = 1
ends = ends.set_index("end")
breakpointsStarts = pd.DataFrame(breakpoints, columns=['start', 'value']).set_index("start")
breakpointsEnds = pd.DataFrame(breakpoints, columns=['end', 'value'])
breakpointsEnds["value"] = breakpointsEnds["value"].transform(lambda x: -x)
breakpointsEnds = breakpointsEnds.set_index("end")
countsDf = pd.concat([starts, ends, breakpointsEnds, breakpointsStarts]).sort_index()
countsDf = countsDf.groupby(countsDf.index).sum().cumsum()
print(countsDf)
# Periods that are available
df = countsDf
df["available"] = df["value"] > 0
# Indexes where the value of available changes
# Alternatively swap out available for the value.
time_changes = df["available"].diff()[df["available"].diff() != 0].index.values
newDf = pd.DataFrame(time_changes, columns= ["start"])
# Setting the end column to the value of the next start
newDf['end'] = newDf.transform(np.roll, shift=-1)
print(newDf)
# Join this back in to get the actual value of available
mergedDf = newDf.merge(df, left_on="start", right_index=True)
print(mergedDf)
returning at the end:
   start    end  value  available
0  00:00  09:00      0      False
1  09:00  13:00      4       True
2  13:00  00:00      0      False
I'd approach it the same way you did with the appointments. Model the free time as appointments of its own. For each ending appointment, check whether there's another one still ongoing; if so, skip it. If not, find the next starting appointment (one with a start date greater than this one's end date). The gap between the two is free time.
After you've iterated over all of your appointments, you should have an inverted mask of them.
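A minimal sketch of that inversion, assuming a single resource (any ongoing appointment counts as busy) and appointments that are plain `(start, end)` tuples of `datetime.time` values. The function name `free_periods` and the `opens`/`closes` parameters are made up for illustration and are not part of the original suggestion:

```python
from datetime import time


def free_periods(appointments, opens, closes):
    """Return the gaps between appointments inside [opens, closes]."""
    free = []
    cursor = opens  # earliest moment not yet known to be busy
    for start, end in sorted(appointments):
        # A gap exists only if the next appointment starts after
        # everything seen so far has ended.
        if start > cursor and cursor < closes:
            free.append((cursor, min(start, closes)))
        cursor = max(cursor, end)
    if cursor < closes:
        free.append((cursor, closes))
    return free


appointments = [
    (time(9, 30), time(10, 30)),
    (time(10), time(11)),  # overlaps the first appointment
    (time(13), time(14)),
]
free = free_periods(appointments, opens=time(9), closes=time(18))
for start, end in free:
    print(start, "-", end)
```

Sorting by start time and tracking the latest end seen so far is what implements the "is another appointment still ongoing?" check without comparing every pair.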
To me, this problem would be well-represented by a list of boolean values. For ease of explanation, let's assume the length of every potential job is a multiple of 15 minutes. So, from 9 to 6, we have 36 "time slots" (9 hours times 4 slots per hour) that we want to track availability for. We represent a queue's availability in a time slot with boolean variables: False if the queue is processing a job, True if the queue is available.
First, we create a list of time slots for every queue as well as the output. So, every queue and the output has time slots tk, 0 <= k <= 35.
Then, given five job queues, qj, 1 <= j <= 5, we say that tk is "open" at time k if there exists at least one qj where the time slot list at index k is True.
We can implement this in standalone Python as follows:
NUM_SLOTS = 36
# Build each queue as its own list; [slots] * 5 would alias one list five times.
queues = [[True] * NUM_SLOTS for _ in range(5)]
output = [False] * NUM_SLOTS
def available(k):
    # A slot is open if at least one queue is free at index k.
    for q in queues:
        if q[k]:
            return True
    return False
We can then assume there exists some function dispatch(length) that assigns a job to an available queue q, setting the appropriate slots in queues[q] to False.
Finally, to update the output, we simply call:
def update():
    for k in range(NUM_SLOTS):
        output[k] = available(k)
Or, for increased efficiency:
def update(i, j):
    for k in range(i, j):
        output[k] = available(k)
Then, you could simply call update(i, j) whenever dispatch() updates time slots i through j for a new job. In this way, dispatching and updating is an O(n) operation for a fixed number of queues, where n is how many time slots are being changed, regardless of how many time slots there are in total.
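Since dispatch() is only assumed here, this is one hedged sketch of what it might look like. Everything in it is an assumption made for illustration: the signature takes a start slot as well as a length (the text only mentions a length), the first queue that is free for the whole range wins, and the tiny `NUM_SLOTS` and `NUM_QUEUES` values are chosen just to keep the example readable:

```python
NUM_SLOTS = 8    # illustrative only; the real count depends on the slot length
NUM_QUEUES = 2   # illustrative only

# One independent list per queue, so booking one queue leaves the others alone.
queues = [[True] * NUM_SLOTS for _ in range(NUM_QUEUES)]
output = [True] * NUM_SLOTS


def available(k):
    """A slot is open if at least one queue is free at index k."""
    return any(q[k] for q in queues)


def update(i, j):
    """Refresh the output only for the slots that may have changed."""
    for k in range(i, j):
        output[k] = available(k)


def dispatch(start, length):
    """Hypothetical dispatcher: book slots [start, start + length) on the
    first queue that is free for the whole range. Returns the queue index,
    or None if the job does not fit anywhere."""
    stop = start + length
    if length <= 0 or start < 0 or stop > NUM_SLOTS:
        return None
    for index, q in enumerate(queues):
        if all(q[start:stop]):
            q[start:stop] = [False] * length
            update(start, stop)
            return index
    return None


first = dispatch(2, 3)   # fits on queue 0
second = dispatch(2, 3)  # queue 0 is busy there, so it goes to queue 1
third = dispatch(3, 1)   # both queues are busy at slot 3
print(first, second, third)
print(output)
```

A slot in the output only flips to False once every queue is busy there, which is why the first booking leaves the output unchanged.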
This would allow you to make a simple function that maps human-readable time onto the range of time slot values, which would allow for making time slots larger or smaller as you wish.
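As a rough sketch of such a mapping, assuming the day opens at 9:00 and slots are 15 minutes long; the names `time_to_slot`, `slot_to_time`, `OPENS` and `SLOT_MINUTES` are invented for this example:

```python
from datetime import time

OPENS = time(9)     # assumed opening time
SLOT_MINUTES = 15   # assumed slot length; change it to resize the slots


def time_to_slot(t, opens=OPENS, slot_minutes=SLOT_MINUTES):
    """Map a wall-clock time to the index of the slot that contains it."""
    minutes = (t.hour - opens.hour) * 60 + (t.minute - opens.minute)
    if minutes < 0:
        raise ValueError("time is before opening")
    return minutes // slot_minutes


def slot_to_time(k, opens=OPENS, slot_minutes=SLOT_MINUTES):
    """Map a slot index back to the wall-clock time at which it starts."""
    minutes = opens.hour * 60 + opens.minute + k * slot_minutes
    return time(minutes // 60, minutes % 60)


print(time_to_slot(time(10, 30)))                   # 15-minute slots
print(time_to_slot(time(10, 30), slot_minutes=30))  # coarser 30-minute slots
print(slot_to_time(6))
```

Keeping the slot length in one parameter means the rest of the code only ever deals with slot indices.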
You could also easily extend this idea to use a pandas data frame where each column is one queue, allowing you to use DataFrame.any(axis=1) on every row at once to quickly update the output column.
Would love to hear suggestions regarding this approach! Perhaps there's a complexity of the problem I've missed, but I think this is a nice solution.
You can use (datetime, increment) tuples to keep track of the changes in availability. A job-start event has increment = -1 (one lane becomes busy) and a job-end event has increment = +1 (the lane is freed again). Then itertools.accumulate allows for computing the cumulative availability as jobs start and end over time. Here's an example implementation:
from datetime import time
import itertools as it
def compute_availability(jobs, opening_hours, capacity):
    jobs = [((x, -1), (y, +1)) for x, y in jobs]
    opens, closes = opening_hours
    events = [[opens, capacity]] + sorted(t for job in jobs for t in job) + [(closes, 0)]
    availability = list(it.accumulate(events,
                                      lambda x, y: [y[0], x[1] + y[1]]))
    for x, y in zip(availability, availability[1:]):
        # If multiple events happen at the same time, only yield the last one.
        if y[0] > x[0]:
            yield x
This adds artificial (opens, capacity) and (closes, 0) events to initialize the computation. The above example considers a single day but it is easy to extend it to multiple days by creating opens and closes datetime objects that share the day of the first and last job respectively.
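A hedged sketch of that multi-day extension. The function body repeats the same logic as compute_availability so the example runs on its own; the two jobs and the dates are invented sample data. Note that this treats the whole span between opens and closes as open, so the overnight hours still show full capacity unless they are blocked separately:

```python
from datetime import datetime, time
import itertools as it


def compute_availability(jobs, opening_hours, capacity):
    # Same logic as the single-day version, repeated so this runs standalone.
    events = [(start, -1) for start, _ in jobs] + [(end, +1) for _, end in jobs]
    opens, closes = opening_hours
    events = [[opens, capacity]] + sorted(events) + [(closes, 0)]
    availability = list(it.accumulate(events, lambda x, y: [y[0], x[1] + y[1]]))
    for x, y in zip(availability, availability[1:]):
        if y[0] > x[0]:
            yield x


# Invented sample jobs spread over two days.
jobs = [
    (datetime(2020, 3, 14, 10), datetime(2020, 3, 14, 15)),
    (datetime(2020, 3, 15, 9), datetime(2020, 3, 15, 11)),
]

# opens shares the day of the first job, closes the day of the last job.
opens = datetime.combine(min(start for start, _ in jobs).date(), time(9))
closes = datetime.combine(max(end for _, end in jobs).date(), time(18))

multi_day = list(compute_availability(jobs, (opens, closes), capacity=3))
for moment, lanes in multi_day:
    print(moment, lanes)
```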
Here is the output for the OP's example schedule:
from pprint import pprint
jobs = [(time(10), time(15)),
(time(9), time(11)),
(time(12, 30), time(16)),
(time(10), time(18))]
availability = list(compute_availability(
jobs, opening_hours=(time(9), time(18)), capacity=3
))
pprint(availability)
which prints:
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(15, 0), 1],
[datetime.time(16, 0), 2]]
The first element indicates when the availability changes and the second element denotes the availability that results from that change. For example at 9am one job is submitted causing the availability to drop from 3 to 2 and then at 10am two more jobs are submitted while the first one is still running (hence availability drops to 0).
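Since the question asks for time ranges rather than change events, here is a small sketch of how the event list above could be turned into ranges with availability greater than zero. The helper name `free_ranges` and its `closes` parameter are assumptions for this example; the input is the list printed above, copied in by hand:

```python
from datetime import time


def free_ranges(availability, closes):
    """Collapse [time, availability] events into (start, end) ranges
    during which at least one lane is free."""
    ranges = []
    # Each event lasts until the next one; the last one lasts until closing.
    ends = [moment for moment, _ in availability[1:]] + [closes]
    for (start, lanes), end in zip(availability, ends):
        if lanes <= 0:
            continue
        if ranges and ranges[-1][1] == start:
            # Directly follows the previous free range, so extend it.
            ranges[-1] = (ranges[-1][0], end)
        else:
            ranges.append((start, end))
    return ranges


availability = [
    [time(9, 0), 2],
    [time(10, 0), 0],
    [time(11, 0), 1],
    [time(12, 30), 0],
    [time(15, 0), 1],
    [time(16, 0), 2],
]
ranges = free_ranges(availability, closes=time(18))
for start, end in ranges:
    print(start, "-", end)
```

Adjacent events with different but positive availability (here 15:00 and 16:00) are merged into one range, because only "free or not" matters for the result.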
Now that we have the initial availability computed, an important aspect is to update it as new jobs are added. Here it is desirable not to recompute the availability from the full job list, since that might be costly if many jobs are being tracked. Because the availability list is already sorted, we can use the bisect module to determine the relevant update range in O(log(N)). Then a number of steps need to be performed. Let's say the job is scheduled as [x, y] where x, y are two datetime objects.
1. Check that the availability in the [x, y] interval is greater than zero (including the event to the left of x (i.e. the previous event)).
2. Decrease the availability of all events inside [x, y] by 1.
3. If x is not in the list of events we need to add it, otherwise we need to check whether we can merge the x event with the one left to it.
4. If y is not in the list of events we need to add it.
Here is the relevant code:
import bisect
def add_job(availability, job, *, weight=1):
    """weight: how many lanes the job requires"""
    job = list(job)
    start = bisect.bisect(availability, job[:1])
    # Emulate a `bisect_right` which doesn't work directly since
    # we're comparing lists of different length.
    if start < len(availability):
        start += (job[0] == availability[start][0])
    stop = bisect.bisect(availability, job[1:])
    if any(slot[1] < weight for slot in availability[start-1:stop]):
        raise ValueError('The requested time slot is not available')
    for slot in availability[start:stop]:
        slot[1] -= weight
    if job[0] > availability[start-1][0]:
        previous_availability = availability[start-1][1]
        availability.insert(start, [job[0], previous_availability - weight])
        stop += 1
    else:
        availability[start-1][1] -= weight
        if start >= 2 and availability[start-1][1] == availability[start-2][1]:
            del availability[start-1]
            stop -= 1
    if stop == len(availability) or job[1] < availability[stop][0]:
        previous_availability = availability[stop-1][1]
        availability.insert(stop, [job[1], previous_availability + weight])
We can test it by adding some jobs to the OP's example schedule:
for job in [[time(15), time(17)],
            [time(11, 30), time(12)],
            [time(13), time(14)]]:  # this one should raise since availability is zero
    print(f'\nAdding {job = }')
    add_job(availability, job)
    pprint(availability)
which outputs:
Adding job = [datetime.time(15, 0), datetime.time(17, 0)]
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(16, 0), 1],
[datetime.time(17, 0), 2]]
Adding job = [datetime.time(11, 30), datetime.time(12, 0)]
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(11, 30), 0],
[datetime.time(12, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(16, 0), 1],
[datetime.time(17, 0), 2]]
Adding job = [datetime.time(13, 0), datetime.time(14, 0)]
Traceback (most recent call last):
[...]
ValueError: The requested time slot is not available
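We can also use this interface to block all lanes during hours when the service is unavailable (e.g. from 6pm to 9am on the next day). Just submit a job with weight=capacity for that time span (this assumes the schedule was built from datetime objects rather than time objects, since the two cannot be compared with each other):
from datetime import datetime
add_job(availability,
        [datetime(2020, 3, 14, 18), datetime(2020, 3, 15, 9)],
        weight=3)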
We can also use add_job to build the full schedule from scratch:
availability = list(compute_availability(
    [], opening_hours=(time(9), time(18)), capacity=3
))
print('Initial availability')
pprint(availability)
for job in jobs:
    print(f'\nAdding {job = }')
    add_job(availability, job)
    pprint(availability)
which outputs:
Initial availability
[[datetime.time(9, 0), 3]]
Adding job = (datetime.time(10, 0), datetime.time(15, 0))
[[datetime.time(9, 0), 3],
[datetime.time(10, 0), 2],
[datetime.time(15, 0), 3]]
Adding job = (datetime.time(9, 0), datetime.time(11, 0))
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 1],
[datetime.time(11, 0), 2],
[datetime.time(15, 0), 3]]
Adding job = (datetime.time(12, 30), datetime.time(16, 0))
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 1],
[datetime.time(11, 0), 2],
[datetime.time(12, 30), 1],
[datetime.time(15, 0), 2],
[datetime.time(16, 0), 3]]
Adding job = (datetime.time(10, 0), datetime.time(18, 0))
[[datetime.time(9, 0), 2],
[datetime.time(10, 0), 0],
[datetime.time(11, 0), 1],
[datetime.time(12, 30), 0],
[datetime.time(15, 0), 1],
[datetime.time(16, 0), 2],
[datetime.time(18, 0), 3]]