Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Find occurrences of subsequences in event data with time constraints

Tags:

python

I have event data in the following format:

event     A A A A A C B C D A A A B
timestamp 0 3 4 4 5 5 6 7 7 8 8 9 10

Given a list of sequences S and events E, how can I efficiently find the non-overlapping occurrences of S in E that are within a time window W, and each event in the occurrence is within an interval L from the previous event?

Example results with S = {A, AA, AAA, AAB, BB, CA}, W=3, L=2:

occurrences:
A: [0, 3, 4, 4, 5, 8, 8, 9]
AA: [(3,4), (4,5), (8,8)]
AAA: [(3,4,4), (8,8,9)]
AAB: [(4,5,6), (8,9,10)]
BB: []
CA: [(7,8)]

As you can see, an occurrence doesn't have to be contiguous (i.e. all elements of the sequence appearing in a series). The timestamp is only illustrated as an integer.

like image 293
Dedalus Avatar asked Jan 31 '19 15:01

Dedalus


1 Answers

It can be done with one pass over the data if you keep track of valid so-far-incomplete subsequences and forget them as soon as they are complete or impossible to complete any more. To that end I've written a Sequence class which keeps track of

  • the name of the sequence
  • the indices where its events occur, to figure out whether it's overlapping with the previous completed sequence
  • the times of the events, because that's our output and we need them to check for our constraints
  • the position in the sequence name we currently are, so that we know what the next event should be and when the sequence is complete, and
  • a flag to forget the sequence if it has exceeded our window/length constraints.

Code

events = 'AAAAACBCDAAAB'
timestamps = [0, 3, 4, 4, 5, 5, 6, 7, 7, 8, 8, 9, 10]

SEQUENCES = {'A', 'AA', 'AAA', 'AAB', 'BB', 'CA'}
WINDOW = 3
LENGTH = 2

class Sequence:
    def __init__(self, seq, starting_index, starting_time):
        self.sequence = seq
        self.pos = 0
        self.indices = [starting_index]
        self.times = [starting_time]
        self.has_expired = False

    def is_next_event_acceptable(self, event, time):
        if self.sequence[self.pos+1] != event:
            return False
        else:
            if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
                self.has_expired = True
                return False
            return True

    def add_event_if_acceptable(self, event, index, time):
        if self.is_next_event_acceptable(event, time):
            self.pos += 1
            self.indices.append(index)
            self.times.append(time)

    def is_complete(self):
        return len(self.sequence) == self.pos + 1

    def __repr__(self):
        seq = list(self.sequence)
        seq.insert(self.pos, '[')
        seq.insert(self.pos + 2, ']')
        return ''.join(seq)


def find_non_overlapping_subsequences(events, timestamps):
    working_sequences = []
    results = {s: {'seq': [], 'last_index': -1} for s in SEQUENCES}

    for index, (event, time) in enumerate(zip(events, timestamps)):
        # First work with any present sequences in the queue
        # and then introduce any new ones
        for Seq in working_sequences:
            Seq.add_event_if_acceptable(event, index, time)
        for seq in SEQUENCES:
            if seq.startswith(event):
                working_sequences.append(Sequence(seq, index, time))
        # Any successfully completed sequences, or sequences
        # that can't be completed anymore are to be removed
        seq_idx_to_remove = []
        for i, Seq in enumerate(working_sequences):
            if Seq.has_expired:
                seq_idx_to_remove.append(i)
            elif Seq.is_complete():
                seq_idx_to_remove.append(i)
                # Only add the sequence to the results if the indices
                # aren't overlapping with the previous one
                sequence, times, indices = Seq.sequence, Seq.times, Seq.indices
                if results[sequence]['last_index'] < indices[0]:
                    results[sequence]['seq'].append(times)
                    results[sequence]['last_index'] = indices[-1]
        # We must remove the items in reverse order so that
        # we don't disturb the 'forward' ordering
        for i in seq_idx_to_remove[::-1]:
            del working_sequences[i]

    return results

results = find_non_overlapping_subsequences(events, timestamps)
for key, value in sorted(results.items()):
    print(key, value['seq'])

Output

A [[0], [3], [4], [4], [5], [8], [8], [9]]
AA [[3, 4], [4, 5], [8, 8]]
AAA [[3, 4, 4], [8, 8, 9]]
AAB [[4, 5, 6], [8, 8, 10]]
BB []
CA [[7, 8]]

Update

This may take a long time for a long event series and this depends on how many sequences you have to consider at each step. This means the more long-lived your sequences are, the more you have to examine at every iteration.

  • If a sequence requires more events to be completed, it'll take more iterations to do so.
  • The longer the size of SEQUENCES, the more new sequences you'll introduce at each step.
  • If the window or length duration times are longer, a sequence will survive for longer before expired.
  • The more unique events you have (and if they're uniformly present in the series), the longer it'll take to complete a given sequence. For example, the sequence 'AB' will complete faster if at each iteration you only encounter As and Bs, than, say, any letter of the alphabet.

While the above factors ultimately define how long each iteration step may be, there are a couple of optimisations that can be made. At each step we go through all currently incomplete sequences in working_sequences and check what the effect of the new event is on them. However, if we rework the Sequence class, every time a sequence is updated we can compute what the next event is. Then, at each step we can bin these sequences based on that fact. This way, if the next event is 'A', we'll only check any sequences that accept that event. This can also conveniently split sequences that have been completed or expired.

The second, and less impactful optimisation, is computing in advance all the sequences that start with a specific event so that we don't have to iterate through SEQUENCES every time.

This should avoid any unnecessary checks and improve overall performance. However, the worst case scenario is still the same as the simple version above. For example, if 90% of your events are 'A' and 90% of your starting events or next events for a sequence are 'A', this will still take 90% of the time compared to before.

The following changes in the code reflect these optimisations. I've also assumed that the timestamps are strictly increasing, so anything that relies on the indices attribute can be simplified.

EXPIRED = '#'
COMPLETED = '='

class Sequence:
    def __init__(self, seq, starting_time):
        self.sequence = seq
        self.pos = 0
        self.times = [starting_time]
        self.has_expired = False
        self.next_event = self.next_event_query()

    def is_next_event_acceptable(self, event, time):
        if self.next_event != event:
            return False
        if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
            self.has_expired = True
            return False
        return True

    def update_sequence(self, event, time):
        if self.is_next_event_acceptable(event, time):
            self.pos += 1
            self.times.append(time)
        self.next_event = self.next_event_query()

    def next_event_query(self):
        if self.has_expired:
            return EXPIRED
        return COMPLETED if len(self.sequence) == self.pos + 1 else self.sequence[self.pos+1]

    def __repr__(self):
        seq = list(self.sequence)
        seq.insert(self.pos, '[')
        seq.insert(self.pos + 2, ']')
        return ''.join(seq)


def find_non_overlapping_subsequences(events, timestamps): 
    unique_events = set(events)
    starting_events = {}
    for seq in SEQUENCES:
        unique_events.update(seq)
        first_event = seq[0]
        if first_event not in starting_events:
            starting_events[first_event] = []
        starting_events[first_event].append(seq)
    for e in unique_events:
        if e not in starting_events:
            starting_events[e] = []

    all_symbols = ''.join(unique_events) + EXPIRED + COMPLETED
    working_sequences = {event: [] for event in all_symbols}
    next_event_lists = {event: [] for event in all_symbols}
    results = {s: {'seq': [], 'last_time': timestamps[0]-1} for s in SEQUENCES}

    for event, time in zip(events, timestamps):
        next_event_lists[event] = []
        for S in working_sequences[event]:
            S.update_sequence(event, time)
            next_event_lists[S.next_event].append(S)
        for seq in starting_events[event]:
            S = Sequence(seq, time)
            next_event_lists[S.next_event].append(S)
        for S in next_event_lists[COMPLETED]:
            # Only add the sequence to the results if the timestamps
            # don't overlap with the previous one
            sequence, times = S.sequence, S.times
            if results[sequence]['last_time'] < times[0]:
                results[sequence]['seq'].append(times)
                results[sequence]['last_time'] = times[-1]
        next_event_lists[EXPIRED] = []
        next_event_lists[COMPLETED] = []
        working_sequences = next_event_lists.copy()

    return results
like image 130
Reti43 Avatar answered Nov 14 '22 23:11

Reti43