I have event data in the following format:
event A A A A A C B C D A A A B
timestamp 0 3 4 4 5 5 6 7 7 8 8 9 10
Given a list of sequences S
and events E
, how can I efficiently find the non-overlapping occurrences of S
in E
that are within a time window W
, and each event in the occurrence is within an interval L
from the previous event?
Example results with S = {A, AA, AAA, AAB, BB, CA}, W=3, L=2
:
occurrences:
A: [0, 3, 4, 4, 5, 8, 8, 9]
AA: [(3,4), (4,5), (8,8)]
AAA: [(3,4,4), (8,8,9)]
AAB: [(4,5,6), (8,9,10)]
BB: []
CA: [(7,8)]
As you can see, an occurrence doesn't have to be contiguous (i.e. all elements of the sequence appearing in a series). The timestamp is only illustrated as an integer.
It can be done with one pass over the data if you keep track of valid so-far-incomplete subsequences and forget them as soon as they are complete or impossible to complete any more. To that end I've written a Sequence
class which keeps track of
Code
events = 'AAAAACBCDAAAB'
timestamps = [0, 3, 4, 4, 5, 5, 6, 7, 7, 8, 8, 9, 10]
SEQUENCES = {'A', 'AA', 'AAA', 'AAB', 'BB', 'CA'}
WINDOW = 3
LENGTH = 2
class Sequence:
def __init__(self, seq, starting_index, starting_time):
self.sequence = seq
self.pos = 0
self.indices = [starting_index]
self.times = [starting_time]
self.has_expired = False
def is_next_event_acceptable(self, event, time):
if self.sequence[self.pos+1] != event:
return False
else:
if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
self.has_expired = True
return False
return True
def add_event_if_acceptable(self, event, index, time):
if self.is_next_event_acceptable(event, time):
self.pos += 1
self.indices.append(index)
self.times.append(time)
def is_complete(self):
return len(self.sequence) == self.pos + 1
def __repr__(self):
seq = list(self.sequence)
seq.insert(self.pos, '[')
seq.insert(self.pos + 2, ']')
return ''.join(seq)
def find_non_overlapping_subsequences(events, timestamps):
working_sequences = []
results = {s: {'seq': [], 'last_index': -1} for s in SEQUENCES}
for index, (event, time) in enumerate(zip(events, timestamps)):
# First work with any present sequences in the queue
# and then introduce any new ones
for Seq in working_sequences:
Seq.add_event_if_acceptable(event, index, time)
for seq in SEQUENCES:
if seq.startswith(event):
working_sequences.append(Sequence(seq, index, time))
# Any successfully completed sequences, or sequences
# that can't be completed anymore are to be removed
seq_idx_to_remove = []
for i, Seq in enumerate(working_sequences):
if Seq.has_expired:
seq_idx_to_remove.append(i)
elif Seq.is_complete():
seq_idx_to_remove.append(i)
# Only add the sequence to the results if the indices
# aren't overlapping with the previous one
sequence, times, indices = Seq.sequence, Seq.times, Seq.indices
if results[sequence]['last_index'] < indices[0]:
results[sequence]['seq'].append(times)
results[sequence]['last_index'] = indices[-1]
# We must remove the items in reverse order so that
# we don't disturb the 'forward' ordering
for i in seq_idx_to_remove[::-1]:
del working_sequences[i]
return results
results = find_non_overlapping_subsequences(events, timestamps)
for key, value in sorted(results.items()):
print(key, value['seq'])
Output
A [[0], [3], [4], [4], [5], [8], [8], [9]]
AA [[3, 4], [4, 5], [8, 8]]
AAA [[3, 4, 4], [8, 8, 9]]
AAB [[4, 5, 6], [8, 8, 10]]
BB []
CA [[7, 8]]
This may take a long time for a long event series and this depends on how many sequences you have to consider at each step. This means the more long-lived your sequences are, the more you have to examine at every iteration.
SEQUENCES
, the more new sequences you'll introduce at each step.While the above factors ultimately define how long each iteration step may be, there are a couple of optimisations that can be made. At each step we go through all currently incomplete sequences in working_sequences
and check what the effect of the new event is on them. However, if we rework the Sequence
class, every time a sequence is updated we can compute what the next event is. Then, at each step we can bin these sequences based on that fact. This way, if the next event is 'A', we'll only check any sequences that accept that event. This can also conveniently split sequences that have been completed or expired.
The second, and less impactful optimisation, is computing in advance all the sequences that start with a specific event so that we don't have to iterate through SEQUENCES
every time.
This should avoid any unnecessary checks and improve overall performance. However, the worst case scenario is still the same as the simple version above. For example, if 90% of your events are 'A' and 90% of your starting events or next events for a sequence are 'A', this will still take 90% of the time compared to before.
The following changes in the code reflect these optimisations. I've also assumed that the timestamps are strictly increasing, so anything that relies on the indices
attribute can be simplified.
EXPIRED = '#'
COMPLETED = '='
class Sequence:
def __init__(self, seq, starting_time):
self.sequence = seq
self.pos = 0
self.times = [starting_time]
self.has_expired = False
self.next_event = self.next_event_query()
def is_next_event_acceptable(self, event, time):
if self.next_event != event:
return False
if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
self.has_expired = True
return False
return True
def update_sequence(self, event, time):
if self.is_next_event_acceptable(event, time):
self.pos += 1
self.times.append(time)
self.next_event = self.next_event_query()
def next_event_query(self):
if self.has_expired:
return EXPIRED
return COMPLETED if len(self.sequence) == self.pos + 1 else self.sequence[self.pos+1]
def __repr__(self):
seq = list(self.sequence)
seq.insert(self.pos, '[')
seq.insert(self.pos + 2, ']')
return ''.join(seq)
def find_non_overlapping_subsequences(events, timestamps):
unique_events = set(events)
starting_events = {}
for seq in SEQUENCES:
unique_events.update(seq)
first_event = seq[0]
if first_event not in starting_events:
starting_events[first_event] = []
starting_events[first_event].append(seq)
for e in unique_events:
if e not in starting_events:
starting_events[e] = []
all_symbols = ''.join(unique_events) + EXPIRED + COMPLETED
working_sequences = {event: [] for event in all_symbols}
next_event_lists = {event: [] for event in all_symbols}
results = {s: {'seq': [], 'last_time': timestamps[0]-1} for s in SEQUENCES}
for event, time in zip(events, timestamps):
next_event_lists[event] = []
for S in working_sequences[event]:
S.update_sequence(event, time)
next_event_lists[S.next_event].append(S)
for seq in starting_events[event]:
S = Sequence(seq, time)
next_event_lists[S.next_event].append(S)
for S in next_event_lists[COMPLETED]:
# Only add the sequence to the results if the timestamps
# don't overlap with the previous one
sequence, times = S.sequence, S.times
if results[sequence]['last_time'] < times[0]:
results[sequence]['seq'].append(times)
results[sequence]['last_time'] = times[-1]
next_event_lists[EXPIRED] = []
next_event_lists[COMPLETED] = []
working_sequences = next_event_lists.copy()
return results
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With