
How do you model something-over-time in Python?

I'm looking for a data type to help me model resource availability over fluid time.

  • We're open from 9 til 6 and can handle 5 parallel jobs. In my imaginary programming land, I've just initialised an object with that range with a value of 3 across the board.
  • We have appointments on the books, each with start and end times.
  • I need to punch each of those out of the day
  • That leaves me with a graph of sorts where the availability goes up and down, but ultimately allowing me to quickly find time ranges where there is remaining availability.

I've come at this problem from many directions but always come back to the fundamental problem of not knowing a data type to model something as simple as an integer over time.

I could convert my appointments into time series events (eg appointment arrives means -1 availability, appointment leaves means +1) but I still don't know how to manipulate that data so that I can distil out periods where the availability is greater than zero.


Somebody's left a close-vote citing lack of focus, but my goal here seems pretty singular so I'll try to explain the problem graphically. I'm trying to infer the periods of time where the number of active jobs falls below a given capacity.

enter image description here

Turning a range of known parallel capacity (eg 3 between 9-6) and a list of jobs with variable start/ends, into a list of time ranges of available time.

Oli asked Mar 05 '20 00:03


4 Answers

My approach would be to build the time series, but include the availability object with a value set to the availability in that period.

availability:
[
  {
    "start": "09:00",
    "end": "12:00",
    "value": 4
  },
  {
    "start": "12:00",
    "end": "13:00",
    "value": 3
  }
]
data:
[
  {
    "start": "10:00",
    "end": "10:30"
  }
]

Build the time series indexed on the start/end times, with the change in availability as the value. An availability period contributes +value at its start time and -value at its end time, while an appointment contributes -1 at its start and +1 at its end, as you said.

"09:00" 4
"10:00" -1
"10:30" 1
"12:00" -4
"12:00" 3
"13:00" -3

Then group by index, sum and cumulative sum.

getting:

"09:00" 4
"10:00" 3
"10:30" 4
"12:00" 3
"13:00" 0
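
For reference, the group/sum/cumulative-sum steps can also be sketched with only the standard library. This is a minimal illustration that uses the event list from the example above and assumes zero-padded "HH:MM" strings, which sort chronologically; the variable names are made up for the sketch.

```python
from collections import defaultdict
from itertools import accumulate

# (time, change) events from the example above.
events = [
    ("09:00", 4), ("10:00", -1), ("10:30", 1),
    ("12:00", -4), ("12:00", 3), ("13:00", -3),
]

# Group by time and sum the changes that share a timestamp.
grouped = defaultdict(int)
for when, change in events:
    grouped[when] += change

# A cumulative sum over the sorted timestamps gives the availability
# that holds from each timestamp until the next one.
times = sorted(grouped)
levels = list(accumulate(grouped[t] for t in times))
availability = list(zip(times, levels))

for when, level in availability:
    print(when, level)
```

Each pair is read as "from this time until the next entry, this many slots are free".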

Example code in pandas:

import numpy as np
import pandas as pd


data = [
  {
    "start": "10:00",
    "end": "10:30",
  }
]

breakpoints = [
  {
    "start": "00:00",
    "end": "09:00",
    "value": 0
  },
  {
    "start": "09:00",
    "end": "12:00",
    "value": 4
  },
  {
    "start": "12:00",
    "end": "12:30",
    "value": 4
  },
  {
    "start": "12:30",
    "end": "13:00",
    "value": 3
  },
  {
    "start": "13:00",
    "end": "00:00",
    "value": 0
  }
]

df = pd.DataFrame(data, columns=['start', 'end'])

print(df.head(5))

starts = pd.DataFrame(data, columns=['start'])
starts["value"] = -1
starts = starts.set_index("start")

ends = pd.DataFrame(data, columns=['end'])
ends["value"] = 1
ends = ends.set_index("end")

breakpointsStarts = pd.DataFrame(breakpoints, columns=['start', 'value']).set_index("start")

breakpointsEnds = pd.DataFrame(breakpoints, columns=['end', 'value'])
breakpointsEnds["value"] = breakpointsEnds["value"].transform(lambda x: -x)
breakpointsEnds = breakpointsEnds.set_index("end")

countsDf = pd.concat([starts, ends, breakpointsEnds, breakpointsStarts]).sort_index()
countsDf = countsDf.groupby(countsDf.index).sum().cumsum()

print(countsDf)

# Periods that are available

df = countsDf
df["available"] = df["value"] > 0

# Indexes where the value of available changes
# Alternatively swap out available for the value.
time_changes = df["available"].diff()[df["available"].diff() != 0].index.values
newDf = pd.DataFrame(time_changes, columns= ["start"])

# Setting the end column to the value of the next start
newDf['end'] = newDf.transform(np.roll, shift=-1)
print(newDf)

# Join this back in to get the actual value of available
mergedDf = newDf.merge(df, left_on="start", right_index=True)

print(mergedDf)

returning at the end:

   start    end  value  available
0  00:00  09:00      0      False
1  09:00  13:00      4       True
2  13:00  00:00      0      False
Ben Albin answered Nov 10 '22 00:11


I'd approach it the same way you did with the appointments. Model the free time as appointments of its own. For each ending appointment, check whether there's another one ongoing; if so, skip it. If not, find the next starting appointment (one with a start date greater than this one's end date).

After you've iterated over all of your appointments, you should have an inverted mask of them.
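
A minimal sketch of that inversion, assuming appointments are (start, end) pairs of datetime.time values and that the day counts as busy whenever at least one appointment is running (so it corresponds to a capacity of one). The function name free_gaps and its parameters are hypothetical.

```python
from datetime import time

def free_gaps(appointments, opens, closes):
    """Return (start, end) gaps in [opens, closes] not covered by any appointment."""
    gaps = []
    cursor = opens  # everything before the cursor is already accounted for
    for start, end in sorted(appointments):
        if start > cursor:
            # Nothing is ongoing between the cursor and this start: a free gap.
            gaps.append((cursor, min(start, closes)))
        # An overlapping appointment only pushes the cursor further out.
        cursor = max(cursor, end)
        if cursor >= closes:
            break
    if cursor < closes:
        gaps.append((cursor, closes))
    return gaps

appointments = [(time(9), time(10)), (time(9, 30), time(11)), (time(13), time(14))]
gaps = free_gaps(appointments, opens=time(9), closes=time(18))
print(gaps)
```

Sorting by start time means each appointment only has to be compared with the furthest end time seen so far, instead of with every other appointment.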

Aiyion.Prime answered Nov 09 '22 22:11


To me, this problem would be well-represented by a list of boolean values. For ease of explanation, let's assume the length of every potential job is a multiple of 15 minutes. So, from 9 to 6, we have 36 "time slots" (9 hours × 4 slots per hour) that we want to track availability for. We represent a queue's availability in a time slot with boolean variables: False if the queue is processing a job, True if the queue is available.

First, we create a list of time slots for every queue as well as the output. So, every queue and the output has time slots t_k, 1 <= k <= 36.

Then, given five job queues, q_j, 1 <= j <= 5, we say that t_k is "open" if there exists at least one q_j whose time slot list is True at index k.

We can implement this in standalone Python as follows:

NUM_SLOTS = 36   # 9 hours * 4 fifteen-minute slots
NUM_QUEUES = 5

# Each queue needs its own list; [slots] * 5 would alias one list five times.
queues = [[True] * NUM_SLOTS for _ in range(NUM_QUEUES)]
output = [False] * NUM_SLOTS

def available(k):
    # Slot k is open if at least one queue is free in that slot.
    return any(q[k] for q in queues)

We can then assume there exists some function dispatch(length) that assigns a job to an available queue, setting the appropriate slots in queues[q] to False.

Finally, to update the output, we simply call:

def update():
    for k in range(NUM_SLOTS):
        output[k] = available(k)

Or, for increased efficiency:

def update(i, j):
    # Refresh only slots i..j-1, the ones a new job touched.
    for k in range(i, j):
        output[k] = available(k)

Then, you could simply call update(i, j) whenever dispatch() updates time slots i through j for a new job. In this way, dispatching and updating are O(n) operations, where n is how many time slots are being changed, regardless of how many time slots there are in total.

This would allow you to make a simple function that maps human-readable time onto the range of time slot values, which would allow for making time slots larger or smaller as you wish.
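
Such a mapping might look like the sketch below. It assumes a 9:00 opening time, 15-minute slots and zero-based slot indices (as in Python lists); OPENS, SLOT_MINUTES, time_to_slot and slot_to_time are hypothetical names, not part of the code above.

```python
from datetime import time

OPENS = time(9)      # assumed opening time
SLOT_MINUTES = 15    # assumed slot length; change it to resize the slots

def time_to_slot(t, opens=OPENS, slot_minutes=SLOT_MINUTES):
    """Map a wall-clock time to the zero-based index of the slot containing it."""
    minutes = (t.hour - opens.hour) * 60 + (t.minute - opens.minute)
    if minutes < 0:
        raise ValueError("time is before opening")
    return minutes // slot_minutes

def slot_to_time(k, opens=OPENS, slot_minutes=SLOT_MINUTES):
    """Map a zero-based slot index back to the time at which that slot starts."""
    minutes = opens.hour * 60 + opens.minute + k * slot_minutes
    return time(minutes // 60, minutes % 60)

# A job from 10:30 to 11:15 covers the half-open slot range [first, last).
first, last = time_to_slot(time(10, 30)), time_to_slot(time(11, 15))
print(first, last, slot_to_time(first))
```

Treating the end index as exclusive lines up with range(i, j), so the pair can be passed straight to a range-based update.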

You could also easily extend this idea to use a pandas data frame where each column is one queue, allowing you to use Series.any() on every row at once to quickly update the output column.

Would love to hear suggestions regarding this approach! Perhaps there's a complexity of the problem I've missed, but I think this is a nice solution.

Nate Norris answered Nov 09 '22 23:11


You can use (datetime, increment) tuples to keep track of the changes in availability. A job-start event has increment = -1 (one lane becomes busy) and a job-end event has increment = +1 (the lane is freed again). Then itertools.accumulate allows for computing the cumulative availability as jobs start and end over time. Here's an example implementation:

from datetime import time
import itertools as it

def compute_availability(jobs, opening_hours, capacity):
    jobs = [((x, -1), (y, +1)) for x, y in jobs]
    opens, closes = opening_hours
    events = [[opens, capacity]] + sorted(t for job in jobs for t in job) + [(closes, 0)]
    availability = list(it.accumulate(events,
                                      lambda x, y: [y[0], x[1] + y[1]]))
    for x, y in zip(availability, availability[1:]):
        # If multiple events happen at the same time, only yield the last one.
        if y[0] > x[0]:
            yield x

This adds artificial (opens, capacity) and (closes, 0) events to initialize the computation. The above example considers a single day but it is easy to extend it to multiple days by creating opens and closes datetime objects that share the day of the first and last job respectively.

Example

Here is the output for the OP's example schedule:

from pprint import pprint

jobs = [(time(10), time(15)),
        (time(9), time(11)),
        (time(12, 30), time(16)),
        (time(10), time(18))]

availability = list(compute_availability(
    jobs, opening_hours=(time(9), time(18)), capacity=3
))
pprint(availability)

which prints:

[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(15, 0), 1],
 [datetime.time(16, 0), 2]]

The first element indicates when the availability changes and the second element denotes the availability that results from that change. For example at 9am one job is submitted causing the availability to drop from 3 to 2 and then at 10am two more jobs are submitted while the first one is still running (hence availability drops to 0).
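
To get from this change list to the time ranges the question asks for, one possible sketch is shown here. It assumes the [time, availability] format printed above and takes the closing time as a separate argument; free_ranges and minimum are hypothetical names, not part of the code above.

```python
from datetime import time

def free_ranges(availability, closes, minimum=1):
    """Return (start, end) ranges in which at least `minimum` lanes are free."""
    ranges = []
    # Each entry holds until the next entry's time; the last one holds until closing.
    boundaries = [t for t, _ in availability[1:]] + [closes]
    for (start, free), end in zip(availability, boundaries):
        if free < minimum or start >= end:
            continue
        if ranges and ranges[-1][1] == start:
            # Touching ranges are merged into one longer range.
            ranges[-1] = (ranges[-1][0], end)
        else:
            ranges.append((start, end))
    return ranges

# The availability list printed above, copied in so the sketch runs on its own.
availability = [[time(9), 2], [time(10), 0], [time(11), 1],
                [time(12, 30), 0], [time(15), 1], [time(16), 2]]
open_ranges = free_ranges(availability, closes=time(18))
print(open_ranges)
```

Raising minimum narrows the result to ranges where several lanes are free at once, which may help when placing a job that needs more than one lane.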

Adding new jobs

Now that we have the initial availability computed an important aspect is to update it as new jobs are added. Here it is desirable not to recompute the availability from the full job list since that might be costly if many jobs are being tracked. Because the availability is already sorted we can use the bisect module to determine the relevant update range in O(log(N)). Then a number of steps need to be performed. Let's say the job is scheduled as [x, y] where x, y are two datetime objects.

  1. Check if the availability in the [x, y] interval is greater than zero (including the event to the left of x (i.e. the previous event)).
  2. Decrease the availability of all events in [x, y] by 1.
  3. If x is not in the list of events we need to add it, otherwise we need to check whether we can merge the x event with the one to its left.
  4. If y is not in the list of events we need to add it.

Here is the relevant code:

import bisect

def add_job(availability, job, *, weight=1):
    """weight: how many lanes the job requires"""
    job = list(job)
    start = bisect.bisect(availability, job[:1])
    # Emulate a `bisect_right`, which doesn't work directly since
    # we're comparing lists of different length.
    if start < len(availability):
        start += (job[0] == availability[start][0])
    stop = bisect.bisect(availability, job[1:])

    if any(slot[1] < weight for slot in availability[start-1:stop]):
        raise ValueError('The requested time slot is not available')

    for slot in availability[start:stop]:
        slot[1] -= weight

    if job[0] > availability[start-1][0]:
        previous_availability = availability[start-1][1]
        availability.insert(start, [job[0], previous_availability - weight])
        stop += 1
    else:
        availability[start-1][1] -= weight
        if start >= 2 and availability[start-1][1] == availability[start-2][1]:
            del availability[start-1]
            stop -= 1

    if stop == len(availability) or job[1] < availability[stop][0]:
        previous_availability = availability[stop-1][1]
        availability.insert(stop, [job[1], previous_availability + weight])

Example schedule

We can test it by adding some jobs to the OP's example schedule:

for job in [[time(15), time(17)],
            [time(11, 30), time(12)],
            [time(13), time(14)]]:  # this one should raise since availability is zero
    print(f'\nAdding {job = }')
    add_job(availability, job)
    pprint(availability)

which outputs:

Adding job = [datetime.time(15, 0), datetime.time(17, 0)]
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(16, 0), 1],
 [datetime.time(17, 0), 2]]

Adding job = [datetime.time(11, 30), datetime.time(12, 0)]
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(11, 30), 0],
 [datetime.time(12, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(16, 0), 1],
 [datetime.time(17, 0), 2]]

Adding job = [datetime.time(13, 0), datetime.time(14, 0)]
Traceback (most recent call last):
  [...]
ValueError: The requested time slot is not available

Blocking night hours

We can also use this interface to block all lanes during hours when the service is unavailable (e.g. from 6pm to 9am on the next day). Just submit a job with weight=capacity for that time span:

from datetime import datetime

add_job(availability,
        [datetime(2020, 3, 14, 18), datetime(2020, 3, 15, 9)],
        weight=3)

Build full schedule from scratch

We can also use add_job to build the full schedule from scratch:

availability = list(compute_availability(
    [], opening_hours=(time(9), time(18)), capacity=3
))
print('Initial availability')
pprint(availability)
for job in jobs:
    print(f'\nAdding {job = }')
    add_job(availability, job)
    pprint(availability)

which outputs:

Initial availability
[[datetime.time(9, 0), 3]]

Adding job = (datetime.time(10, 0), datetime.time(15, 0))
[[datetime.time(9, 0), 3],
 [datetime.time(10, 0), 2],
 [datetime.time(15, 0), 3]]

Adding job = (datetime.time(9, 0), datetime.time(11, 0))
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 1],
 [datetime.time(11, 0), 2],
 [datetime.time(15, 0), 3]]

Adding job = (datetime.time(12, 30), datetime.time(16, 0))
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 1],
 [datetime.time(11, 0), 2],
 [datetime.time(12, 30), 1],
 [datetime.time(15, 0), 2],
 [datetime.time(16, 0), 3]]

Adding job = (datetime.time(10, 0), datetime.time(18, 0))
[[datetime.time(9, 0), 2],
 [datetime.time(10, 0), 0],
 [datetime.time(11, 0), 1],
 [datetime.time(12, 30), 0],
 [datetime.time(15, 0), 1],
 [datetime.time(16, 0), 2],
 [datetime.time(18, 0), 3]]
a_guest answered Nov 09 '22 22:11