Fast sorting/filtering based on alternating values

Consider the following sample data table:

library(data.table)

dt <- data.table(src = LETTERS[1:10], 
                 dst = LETTERS[10:1], 
                 src1 = letters[15:24], 
                 dst1 = letters[24:15])

#which looks like,
#    src dst src1 dst1
# 1:   A   J    o    x
# 2:   B   I    p    w
# 3:   C   H    q    v
# 4:   D   G    r    u
# 5:   E   F    s    t
# 6:   F   E    t    s
# 7:   G   D    u    r
# 8:   H   C    v    q
# 9:   I   B    w    p
#10:   J   A    x    o

The first goal is to order the table by its reversed row-wise paired elements (src with dst, and src1 with dst1), which can be achieved as follows, creating the 5 'pairs':

dt[, key := paste0(pmin(src, dst), pmax(src, dst), pmin(src1, dst1), pmax(src1, dst1))][order(key)]

#    src dst src1 dst1  key
# 1:   A   J    o    x AJox
# 2:   J   A    x    o AJox
# 3:   B   I    p    w BIpw
# 4:   I   B    w    p BIpw
# 5:   C   H    q    v CHqv
# 6:   H   C    v    q CHqv
# 7:   D   G    r    u DGru
# 8:   G   D    u    r DGru
# 9:   E   F    s    t EFst
#10:   F   E    t    s EFst
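As a quick sanity check (a minimal sketch, not part of the original post): pmin()/pmax() are vectorized and operate element-wise, so both orientations of a row pair collapse to the same canonical key.

```r
# both orientations of the pair (A, J, o, x) yield the same key,
# because pmin()/pmax() canonicalize each column pair element-wise
k1 <- paste0(pmin("A", "J"), pmax("A", "J"), pmin("o", "x"), pmax("o", "x"))
k2 <- paste0(pmin("J", "A"), pmax("J", "A"), pmin("x", "o"), pmax("x", "o"))
c(k1, k2)
# "AJox" "AJox"
```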

However, in real life there can be at least one row that does not have a pair (an "incomplete flow"). A real-life example of that would be,

#              cdatetime   srcaddr dstaddr  srcport   dstport     key    totals time_diff
# 1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
# 2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
# 3: 2017-05-17 08:37:16    IP_1    IP_2   54793    8080 182808054793      3    409124
# 4: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
# 5: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
# 6: 2017-05-17 08:37:16    IP_1    IP_2   54813    8080 182808054813      3    519888
# 7: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
# 8: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
# 9: 2017-05-08 06:57:08    IP_1    IP_2   50794    8080 182808050794      5    518752
#10: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
#11: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
#12: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
#13: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
#14: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
#15: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0
#16: 2017-05-08 07:22:47    IP_1    IP_2   51896    8080 182808051896      5    334821
#17: 2017-05-15 05:56:00    IP_1    IP_2   62744     162  17016262744      3        NA
#18: 2017-05-17 10:41:00    IP_1    IP_2   62744     162  17016262744      3    189900
#19: 2017-05-18 09:31:00    IP_1    IP_2   62744     162  17016262744      3     82200

The second goal is to remove those incomplete flows. To identify them, we calculate the time difference between rows and set 30 seconds as the threshold. The tricky part is the per-key casework:

  • If a key has only 2 rows and they are more than 30 seconds apart, filter both out.
  • For 3 rows, the one with time_diff > 30 has to go; if two of them are more than 30 seconds apart, remove all 3.
  • For 4 or more rows, drop the one(s) with time_diff > 30 that do not have another partner within 30 seconds.

From the above table, rows 3, 6, 9, 16, 17, 18 and 19 have to be removed because they are more than 30 seconds apart. Looking at cdatetime clarifies which flows are complete.

Expected output would be,

 #        cdatetime      srcaddr dstaddr srcport dstport          key totals time_diff
# 1: 2017-05-12 14:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
# 2: 2017-05-12 14:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
# 3: 2017-05-11 08:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
# 4: 2017-05-11 08:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
# 5: 2017-05-02 06:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
# 6: 2017-05-02 06:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
# 7: 2017-05-11 06:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
# 8: 2017-05-11 06:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
# 9: 2017-05-04 06:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
#10: 2017-05-04 06:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
#11: 2017-05-04 10:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
#12: 2017-05-04 10:22:26    IP_2    IP_1    8080   51896 182808051896      5         0

Data for the real-life example above (dput output):

structure(list(cdatetime = structure(c(1494590312, 1494590312, 
1494999436, 1494479548, 1494479548, 1494999436, 1493697076, 1493697076, 
1494215828, 1494473569, 1494473569, 1493869925, 1493869925, 1493882546, 
1493882546, 1494217367, 1494816960, 1495006860, 1495089060), class = c("POSIXct", 
"POSIXt"), tzone = ""), srcaddr = structure(c(1L, 2L, 1L, 1L, 
2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 2L, 1L, 2L, 1L, 1L, 1L, 1L), .Label = c("IP_1", 
"IP_2"), class = "factor"), dstaddr = structure(c(2L, 1L, 2L, 
2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 1L, 2L, 1L, 2L, 2L, 2L, 2L
), .Label = c("IP_1", "IP_2"), class = "factor"), srcport = c(54793L, 
8080L, 54793L, 54813L, 8080L, 54813L, 50794L, 8080L, 50794L, 
50794L, 8080L, 51896L, 8080L, 51896L, 8080L, 51896L, 62744L, 
62744L, 62744L), dstport = c(8080L, 54793L, 8080L, 8080L, 54813L, 
8080L, 8080L, 50794L, 8080L, 8080L, 50794L, 8080L, 51896L, 8080L, 
51896L, 8080L, 162L, 162L, 162L), key = c(182808054793, 182808054793, 
182808054793, 182808054813, 182808054813, 182808054813, 182808050794, 
182808050794, 182808050794, 182808050794, 182808050794, 182808051896, 
182808051896, 182808051896, 182808051896, 182808051896, 17016262744, 
17016262744, 17016262744), totals = c(3L, 3L, 3L, 3L, 3L, 3L, 
5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 3L, 3L, 3L), time_diff = c(NA, 
0, 409124, NA, 0, 519888, NA, 0, 518752, 257741, 0, NA, 0, 12621, 
0, 334821, NA, 189900, 82200)), .Names = c("cdatetime", "srcaddr", 
"dstaddr", "srcport", "dstport", "key", "totals", "time_diff"
), row.names = c(NA, -19L), class = c("data.table", "data.frame"
))

All the above will run on a data set with approx. 180M rows, so efficiency is the key word here.

Asked Oct 29 '22 by Sotos

2 Answers

This is my suggestion to identify pairs of complete flows. (However, note the caveats at the end)

library(data.table)   # CRAN version 1.10.4 used 

# set keys to avoid using order() repeatedly
setkey(DT, key, cdatetime)
# compute time diff again, grouped by key,
# using shift() with default "lag" type and a useful fill value 
DT[, time_diff := difftime(cdatetime, shift(cdatetime, fill = -Inf), units = "secs"), by = key]
# now find the begin of a potentially new pair where time_diff > 30 secs
# and count the jumps within each key group using cumsum()
DT[, pair.id := cumsum(time_diff > 30), by = key]
# count the number of partners within each supposedly pair
DT[, count.pairs := .N, .(key, pair.id)]

Note that in R, the logical values can be coerced to numeric:

as.integer(FALSE)
#[1] 0
as.integer(TRUE)
#[1] 1

So, every time cumsum() finds time_diff > 30 to be TRUE, the pair.id count is advanced by one. Otherwise, i.e., if the time difference is less than 30 secs, pair.id remains the same. By this, pairs or groups of events lying within the 30 secs time window are identified.
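A minimal illustration of this coercion on one key group (the values are taken from key 182808050794 in the sample data; the snippet is not part of the original answer):

```r
# time differences within one key after setkey(DT, key, cdatetime);
# the Inf comes from the fill = -Inf in shift()
time_diff <- c(Inf, 0, 518752, 257741, 0)

# each gap above 30 secs starts a new candidate pair
cumsum(time_diff > 30)
# 1 1 2 3 3  -> three candidate groups: a pair, a single, a pair
```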

Now, DT has gained two additional columns (note that setkey() has changed the order of rows):

              cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
 1: 2017-05-15 04:56:00    IP_1    IP_2   62744     162  17016262744      3    Inf secs       1           1
 2: 2017-05-17 09:41:00    IP_1    IP_2   62744     162  17016262744      3 189900 secs       2           1
 3: 2017-05-18 08:31:00    IP_1    IP_2   62744     162  17016262744      3  82200 secs       3           1
 4: 2017-05-02 05:51:16    IP_1    IP_2   50794    8080 182808050794      5    Inf secs       1           2
 5: 2017-05-02 05:51:16    IP_2    IP_1    8080   50794 182808050794      5      0 secs       1           2
 6: 2017-05-08 05:57:08    IP_1    IP_2   50794    8080 182808050794      5 518752 secs       2           1
 7: 2017-05-11 05:32:49    IP_1    IP_2   50794    8080 182808050794      5 257741 secs       3           2
 8: 2017-05-11 05:32:49    IP_2    IP_1    8080   50794 182808050794      5      0 secs       3           2
 9: 2017-05-04 05:52:05    IP_1    IP_2   51896    8080 182808051896      5    Inf secs       1           2
10: 2017-05-04 05:52:05    IP_2    IP_1    8080   51896 182808051896      5      0 secs       1           2
11: 2017-05-04 09:22:26    IP_1    IP_2   51896    8080 182808051896      5  12621 secs       2           2
12: 2017-05-04 09:22:26    IP_2    IP_1    8080   51896 182808051896      5      0 secs       2           2
13: 2017-05-08 06:22:47    IP_1    IP_2   51896    8080 182808051896      5 334821 secs       3           1
14: 2017-05-12 13:58:32    IP_1    IP_2   54793    8080 182808054793      3    Inf secs       1           2
15: 2017-05-12 13:58:32    IP_2    IP_1    8080   54793 182808054793      3      0 secs       1           2
16: 2017-05-17 07:37:16    IP_1    IP_2   54793    8080 182808054793      3 409124 secs       2           1
17: 2017-05-11 07:12:28    IP_1    IP_2   54813    8080 182808054813      3    Inf secs       1           2
18: 2017-05-11 07:12:28    IP_2    IP_1    8080   54813 182808054813      3      0 secs       1           2
19: 2017-05-17 07:37:16    IP_1    IP_2   54813    8080 182808054813      3 519888 secs       2           1

Now, it has to be decided which rows to keep. In this small sample there are only groups with two members (pairs) or singles.

DT[count.pairs > 1]

shows only pairs

              cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
 1: 2017-05-02 05:51:16    IP_1    IP_2   50794    8080 182808050794      5    Inf secs       1           2
 2: 2017-05-02 05:51:16    IP_2    IP_1    8080   50794 182808050794      5      0 secs       1           2
 3: 2017-05-11 05:32:49    IP_1    IP_2   50794    8080 182808050794      5 257741 secs       3           2
 4: 2017-05-11 05:32:49    IP_2    IP_1    8080   50794 182808050794      5      0 secs       3           2
 5: 2017-05-04 05:52:05    IP_1    IP_2   51896    8080 182808051896      5    Inf secs       1           2
 6: 2017-05-04 05:52:05    IP_2    IP_1    8080   51896 182808051896      5      0 secs       1           2
 7: 2017-05-04 09:22:26    IP_1    IP_2   51896    8080 182808051896      5  12621 secs       2           2
 8: 2017-05-04 09:22:26    IP_2    IP_1    8080   51896 182808051896      5      0 secs       2           2
 9: 2017-05-12 13:58:32    IP_1    IP_2   54793    8080 182808054793      3    Inf secs       1           2
10: 2017-05-12 13:58:32    IP_2    IP_1    8080   54793 182808054793      3      0 secs       1           2
11: 2017-05-11 07:12:28    IP_1    IP_2   54813    8080 182808054813      3    Inf secs       1           2
12: 2017-05-11 07:12:28    IP_2    IP_1    8080   54813 182808054813      3      0 secs       1           2

while

DT[count.pairs <= 1]

shows the singles to be removed:

             cdatetime srcaddr dstaddr srcport dstport          key totals   time_diff pair.id count.pairs
1: 2017-05-15 04:56:00    IP_1    IP_2   62744     162  17016262744      3    Inf secs       1           1
2: 2017-05-17 09:41:00    IP_1    IP_2   62744     162  17016262744      3 189900 secs       2           1
3: 2017-05-18 08:31:00    IP_1    IP_2   62744     162  17016262744      3  82200 secs       3           1
4: 2017-05-08 05:57:08    IP_1    IP_2   50794    8080 182808050794      5 518752 secs       2           1
5: 2017-05-08 06:22:47    IP_1    IP_2   51896    8080 182808051896      5 334821 secs       3           1
6: 2017-05-17 07:37:16    IP_1    IP_2   54793    8080 182808054793      3 409124 secs       2           1
7: 2017-05-17 07:37:16    IP_1    IP_2   54813    8080 182808054813      3 519888 secs       2           1

Caveat: It might happen in production data that more than two events with the same key fall within a 30 seconds time span. There are some options to deal with this:

  • Keep only exact pairs: DT[count.pairs == 2] ignoring all other cases.
  • Keep only "pairs" with an even number of partners: DT[count.pairs %% 2L == 0L] (although this may keep cases where the number of inbound connections does not equal the number of outbound connections)
  • and other options which would require additional work, e.g., separate counting of inbound and outbound connections.
  • Finally, the 30 secs time window could be decreased.
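A hedged sketch of the inbound/outbound counting option, on a self-contained toy group. The direction rule here, srcaddr == "IP_1" meaning outbound, is an assumption chosen purely for illustration; real data would need its own definition of direction.

```r
library(data.table)

# toy candidate group: 2 outbound and 1 inbound event in one 30 secs window;
# "outbound" is defined here, for illustration only, as srcaddr == "IP_1"
DT <- data.table(key     = rep(182808050794, 3),
                 pair.id = rep(1L, 3),
                 srcaddr = c("IP_1", "IP_1", "IP_2"))

# count in/out events per candidate group, then keep only balanced groups
DT[, `:=`(n.out = sum(srcaddr == "IP_1"),
          n.in  = sum(srcaddr == "IP_2")), by = .(key, pair.id)]
DT[n.out == n.in & n.out > 0]   # empty: the unbalanced group is dropped
```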

The statistics can be created using

DT[, .N, count.pairs]
#   count.pairs  N
#1:           1  7
#2:           2 12

It would be interesting to see this for the production data set.

Answered Nov 09 '22 by Uwe


Apparently this works:

DT[, keep := 
  (!is.na(time_diff) & time_diff < 30) | 
  shift(time_diff, type="lead", fill = 999) < 30
, by=key]

DT[(keep), !"keep"]

              cdatetime srcaddr dstaddr srcport dstport          key totals time_diff
 1: 2017-05-12 07:58:32    IP_1    IP_2   54793    8080 182808054793      3        NA
 2: 2017-05-12 07:58:32    IP_2    IP_1    8080   54793 182808054793      3         0
 3: 2017-05-11 01:12:28    IP_1    IP_2   54813    8080 182808054813      3        NA
 4: 2017-05-11 01:12:28    IP_2    IP_1    8080   54813 182808054813      3         0
 5: 2017-05-01 23:51:16    IP_1    IP_2   50794    8080 182808050794      5        NA
 6: 2017-05-01 23:51:16    IP_2    IP_1    8080   50794 182808050794      5         0
 7: 2017-05-10 23:32:49    IP_1    IP_2   50794    8080 182808050794      5    257741
 8: 2017-05-10 23:32:49    IP_2    IP_1    8080   50794 182808050794      5         0
 9: 2017-05-03 23:52:05    IP_1    IP_2   51896    8080 182808051896      5        NA
10: 2017-05-03 23:52:05    IP_2    IP_1    8080   51896 182808051896      5         0
11: 2017-05-04 03:22:26    IP_1    IP_2   51896    8080 182808051896      5     12621
12: 2017-05-04 03:22:26    IP_2    IP_1    8080   51896 182808051896      5         0

There are plenty of ways to make this faster, I guess, but it's probably more important to clarify whether it's doing what the OP wants (considering the code is much simpler than the OP's rules).

Answered Nov 09 '22 by Frank