Consider the following sample data table,
library(data.table)
dt <- data.table(src = LETTERS[1:10],
                 dst = LETTERS[10:1],
                 src1 = letters[15:24],
                 dst1 = letters[24:15])
#which looks like,
# src dst src1 dst1
# 1: A J o x
# 2: B I p w
# 3: C H q v
# 4: D G r u
# 5: E F s t
# 6: F E t s
# 7: G D u r
# 8: H C v q
# 9: I B w p
#10: J A x o
The first goal is to order the rows so that each row sits next to its reversed counterpart, i.e. the row with src/dst and src1/dst1 swapped. Building a direction-independent key achieves this and creates the 5 'pairs':
dt[, key := paste0(pmin(src, dst), pmax(src, dst), pmin(src1, dst1), pmax(src1, dst1))][order(key)]
# src dst src1 dst1 key
# 1: A J o x AJox
# 2: J A x o AJox
# 3: B I p w BIpw
# 4: I B w p BIpw
# 5: C H q v CHqv
# 6: H C v q CHqv
# 7: D G r u DGru
# 8: G D u r DGru
# 9: E F s t EFst
#10: F E t s EFst
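To see why this key places both directions of a flow next to each other, here is a minimal sketch in base R. The helper name make_flow_key is hypothetical and only wraps the expression used above. pmin() and pmax() pick the smaller and the larger element of each pair, so swapping source and destination should produce the same key.

```r
# Hypothetical helper wrapping the key expression from the question.
make_flow_key <- function(src, dst, src1, dst1) {
  paste0(pmin(src, dst), pmax(src, dst), pmin(src1, dst1), pmax(src1, dst1))
}

# Row 1 of the sample table and its reversed counterpart (row 10).
forward <- make_flow_key("A", "J", "o", "x")
reverse <- make_flow_key("J", "A", "x", "o")

forward                      # "AJox"
identical(forward, reverse)  # TRUE
```

Because both rows share one key, ordering by that key places them side by side.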
However, in real life some rows may have no matching counterpart (an "incomplete flow"). A real-life example looks like this:
# cdatetime srcaddr dstaddr srcport dstport key totals time_diff
# 1: 2017-05-12 14:58:32 IP_1 IP_2 54793 8080 182808054793 3 NA
# 2: 2017-05-12 14:58:32 IP_2 IP_1 8080 54793 182808054793 3 0
# 3: 2017-05-17 08:37:16 IP_1 IP_2 54793 8080 182808054793 3 409124
# 4: 2017-05-11 08:12:28 IP_1 IP_2 54813 8080 182808054813 3 NA
# 5: 2017-05-11 08:12:28 IP_2 IP_1 8080 54813 182808054813 3 0
# 6: 2017-05-17 08:37:16 IP_1 IP_2 54813 8080 182808054813 3 519888
# 7: 2017-05-02 06:51:16 IP_1 IP_2 50794 8080 182808050794 5 NA
# 8: 2017-05-02 06:51:16 IP_2 IP_1 8080 50794 182808050794 5 0
# 9: 2017-05-08 06:57:08 IP_1 IP_2 50794 8080 182808050794 5 518752
#10: 2017-05-11 06:32:49 IP_1 IP_2 50794 8080 182808050794 5 257741
#11: 2017-05-11 06:32:49 IP_2 IP_1 8080 50794 182808050794 5 0
#12: 2017-05-04 06:52:05 IP_1 IP_2 51896 8080 182808051896 5 NA
#13: 2017-05-04 06:52:05 IP_2 IP_1 8080 51896 182808051896 5 0
#14: 2017-05-04 10:22:26 IP_1 IP_2 51896 8080 182808051896 5 12621
#15: 2017-05-04 10:22:26 IP_2 IP_1 8080 51896 182808051896 5 0
#16: 2017-05-08 07:22:47 IP_1 IP_2 51896 8080 182808051896 5 334821
#17: 2017-05-15 05:56:00 IP_1 IP_2 62744 162 17016262744 3 NA
#18: 2017-05-17 10:41:00 IP_1 IP_2 62744 162 17016262744 3 189900
#19: 2017-05-18 09:31:00 IP_1 IP_2 62744 162 17016262744 3 82200
The second goal is to remove those incomplete flows. To identify them, we calculate the time difference between consecutive rows of the same key and use 30 seconds as the threshold. The tricky part is this: if a key has only 2 rows and they are more than 30 seconds apart, both are filtered out. If a key has 3 rows, the one with a time difference > 30 has to go, and if two of them are more than 30 seconds apart, all 3 are removed. With 4 or more rows, we need to drop the row(s) with a time difference > 30 that have no partner within 30 seconds. In the table above, rows 3, 6, 9, 16, 17, 18 and 19 have to be removed because they are more than 30 seconds apart from every other row with the same key. Looking at cdatetime will clarify which are the complete flows.
Expected output would be,
# cdatetime srcaddr dstaddr srcport dstport key totals time_diff
# 1: 2017-05-12 14:58:32 IP_1 IP_2 54793 8080 182808054793 3 NA
# 2: 2017-05-12 14:58:32 IP_2 IP_1 8080 54793 182808054793 3 0
# 3: 2017-05-11 08:12:28 IP_1 IP_2 54813 8080 182808054813 3 NA
# 4: 2017-05-11 08:12:28 IP_2 IP_1 8080 54813 182808054813 3 0
# 5: 2017-05-02 06:51:16 IP_1 IP_2 50794 8080 182808050794 5 NA
# 6: 2017-05-02 06:51:16 IP_2 IP_1 8080 50794 182808050794 5 0
# 7: 2017-05-11 06:32:49 IP_1 IP_2 50794 8080 182808050794 5 257741
# 8: 2017-05-11 06:32:49 IP_2 IP_1 8080 50794 182808050794 5 0
# 9: 2017-05-04 06:52:05 IP_1 IP_2 51896 8080 182808051896 5 NA
#10: 2017-05-04 06:52:05 IP_2 IP_1 8080 51896 182808051896 5 0
#11: 2017-05-04 10:22:26 IP_1 IP_2 51896 8080 182808051896 5 12621
#12: 2017-05-04 10:22:26 IP_2 IP_1 8080 51896 182808051896 5 0
DATA OF REAL LIFE EXAMPLES ABOVE
DT <- structure(list(cdatetime = structure(c(1494590312, 1494590312,
1494999436, 1494479548, 1494479548, 1494999436, 1493697076, 1493697076,
1494215828, 1494473569, 1494473569, 1493869925, 1493869925, 1493882546,
1493882546, 1494217367, 1494816960, 1495006860, 1495089060), class = c("POSIXct",
"POSIXt"), tzone = ""), srcaddr = structure(c(1L, 2L, 1L, 1L,
2L, 1L, 1L, 2L, 1L, 1L, 2L, 1L, 2L, 1L, 2L, 1L, 1L, 1L, 1L), .Label = c("IP_1",
"IP_2"), class = "factor"), dstaddr = structure(c(2L, 1L, 2L,
2L, 1L, 2L, 2L, 1L, 2L, 2L, 1L, 2L, 1L, 2L, 1L, 2L, 2L, 2L, 2L
), .Label = c("IP_1", "IP_2"), class = "factor"), srcport = c(54793L,
8080L, 54793L, 54813L, 8080L, 54813L, 50794L, 8080L, 50794L,
50794L, 8080L, 51896L, 8080L, 51896L, 8080L, 51896L, 62744L,
62744L, 62744L), dstport = c(8080L, 54793L, 8080L, 8080L, 54813L,
8080L, 8080L, 50794L, 8080L, 8080L, 50794L, 8080L, 51896L, 8080L,
51896L, 8080L, 162L, 162L, 162L), key = c(182808054793, 182808054793,
182808054793, 182808054813, 182808054813, 182808054813, 182808050794,
182808050794, 182808050794, 182808050794, 182808050794, 182808051896,
182808051896, 182808051896, 182808051896, 182808051896, 17016262744,
17016262744, 17016262744), totals = c(3L, 3L, 3L, 3L, 3L, 3L,
5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 5L, 3L, 3L, 3L), time_diff = c(NA,
0, 409124, NA, 0, 519888, NA, 0, 518752, 257741, 0, NA, 0, 12621,
0, 334821, NA, 189900, 82200)), .Names = c("cdatetime", "srcaddr",
"dstaddr", "srcport", "dstport", "key", "totals", "time_diff"
), row.names = c(NA, -19L), class = c("data.table", "data.frame"
))
All the above will run on a dataSet with approx. 180M rows, so efficiency is the key word here.
This is my suggestion to identify pairs of complete flows (but note the caveats at the end).
library(data.table) # CRAN version 1.10.4 used
# set keys to avoid using order() repeatedly
setkey(DT, key, cdatetime)
# compute time diff again, grouped by key,
# using shift() with default "lag" type and a useful fill value
DT[, time_diff := difftime(cdatetime, shift(cdatetime, fill = -Inf), units = "secs"), by = key]
# now find the begin of a potentially new pair where time_diff > 30 secs
# and count the jumps within each key group using cumsum()
DT[, pair.id := cumsum(time_diff > 30), by = key]
# count the number of partners within each supposed pair
DT[, count.pairs := .N, .(key, pair.id)]
Note that in R, the logical values can be coerced to numeric:
as.integer(FALSE)
#[1] 0
as.integer(TRUE)
#[1] 1
So, every time cumsum() finds time_diff > 30 to be TRUE, the pair.id count is advanced by one. Otherwise, i.e., if the time difference is at most 30 secs, pair.id remains the same. In this way, pairs or groups of events lying within the 30 secs time window are identified.
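As a small worked example of this counting step, the sketch below applies cumsum() to the five time differences of key 182808050794 from the table that follows. It uses plain vectors and base R's ave() in place of the data.table grouping, purely for illustration. The variable names are made up.

```r
# Time differences (in secs) of key 182808050794, first event filled with Inf.
time_diff <- c(Inf, 0, 518752, 257741, 0)

# Each TRUE (a jump of more than 30 secs) advances the running count by one.
time_diff > 30                     # TRUE FALSE TRUE TRUE FALSE
pair_id <- cumsum(time_diff > 30)
pair_id                            # 1 1 2 3 3

# Number of events sharing each pair_id (base R stand-in for .N by group).
count_pairs <- ave(pair_id, pair_id, FUN = length)
count_pairs                        # 2 2 1 2 2
```

The third event forms a group of its own, so it is the one flagged as a single.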
Now, DT has gained two additional columns (note that setkey() has changed the order of rows):
cdatetime srcaddr dstaddr srcport dstport key totals time_diff pair.id count.pairs
1: 2017-05-15 04:56:00 IP_1 IP_2 62744 162 17016262744 3 Inf secs 1 1
2: 2017-05-17 09:41:00 IP_1 IP_2 62744 162 17016262744 3 189900 secs 2 1
3: 2017-05-18 08:31:00 IP_1 IP_2 62744 162 17016262744 3 82200 secs 3 1
4: 2017-05-02 05:51:16 IP_1 IP_2 50794 8080 182808050794 5 Inf secs 1 2
5: 2017-05-02 05:51:16 IP_2 IP_1 8080 50794 182808050794 5 0 secs 1 2
6: 2017-05-08 05:57:08 IP_1 IP_2 50794 8080 182808050794 5 518752 secs 2 1
7: 2017-05-11 05:32:49 IP_1 IP_2 50794 8080 182808050794 5 257741 secs 3 2
8: 2017-05-11 05:32:49 IP_2 IP_1 8080 50794 182808050794 5 0 secs 3 2
9: 2017-05-04 05:52:05 IP_1 IP_2 51896 8080 182808051896 5 Inf secs 1 2
10: 2017-05-04 05:52:05 IP_2 IP_1 8080 51896 182808051896 5 0 secs 1 2
11: 2017-05-04 09:22:26 IP_1 IP_2 51896 8080 182808051896 5 12621 secs 2 2
12: 2017-05-04 09:22:26 IP_2 IP_1 8080 51896 182808051896 5 0 secs 2 2
13: 2017-05-08 06:22:47 IP_1 IP_2 51896 8080 182808051896 5 334821 secs 3 1
14: 2017-05-12 13:58:32 IP_1 IP_2 54793 8080 182808054793 3 Inf secs 1 2
15: 2017-05-12 13:58:32 IP_2 IP_1 8080 54793 182808054793 3 0 secs 1 2
16: 2017-05-17 07:37:16 IP_1 IP_2 54793 8080 182808054793 3 409124 secs 2 1
17: 2017-05-11 07:12:28 IP_1 IP_2 54813 8080 182808054813 3 Inf secs 1 2
18: 2017-05-11 07:12:28 IP_2 IP_1 8080 54813 182808054813 3 0 secs 1 2
19: 2017-05-17 07:37:16 IP_1 IP_2 54813 8080 182808054813 3 519888 secs 2 1
Now, it has to be decided which rows to keep. In this small sample there are only groups with two members (pairs) or singles.
DT[count.pairs > 1]
shows only the pairs:
cdatetime srcaddr dstaddr srcport dstport key totals time_diff pair.id count.pairs
1: 2017-05-02 05:51:16 IP_1 IP_2 50794 8080 182808050794 5 Inf secs 1 2
2: 2017-05-02 05:51:16 IP_2 IP_1 8080 50794 182808050794 5 0 secs 1 2
3: 2017-05-11 05:32:49 IP_1 IP_2 50794 8080 182808050794 5 257741 secs 3 2
4: 2017-05-11 05:32:49 IP_2 IP_1 8080 50794 182808050794 5 0 secs 3 2
5: 2017-05-04 05:52:05 IP_1 IP_2 51896 8080 182808051896 5 Inf secs 1 2
6: 2017-05-04 05:52:05 IP_2 IP_1 8080 51896 182808051896 5 0 secs 1 2
7: 2017-05-04 09:22:26 IP_1 IP_2 51896 8080 182808051896 5 12621 secs 2 2
8: 2017-05-04 09:22:26 IP_2 IP_1 8080 51896 182808051896 5 0 secs 2 2
9: 2017-05-12 13:58:32 IP_1 IP_2 54793 8080 182808054793 3 Inf secs 1 2
10: 2017-05-12 13:58:32 IP_2 IP_1 8080 54793 182808054793 3 0 secs 1 2
11: 2017-05-11 07:12:28 IP_1 IP_2 54813 8080 182808054813 3 Inf secs 1 2
12: 2017-05-11 07:12:28 IP_2 IP_1 8080 54813 182808054813 3 0 secs 1 2
while
DT[count.pairs <= 1]
shows the singles to be removed:
cdatetime srcaddr dstaddr srcport dstport key totals time_diff pair.id count.pairs
1: 2017-05-15 04:56:00 IP_1 IP_2 62744 162 17016262744 3 Inf secs 1 1
2: 2017-05-17 09:41:00 IP_1 IP_2 62744 162 17016262744 3 189900 secs 2 1
3: 2017-05-18 08:31:00 IP_1 IP_2 62744 162 17016262744 3 82200 secs 3 1
4: 2017-05-08 05:57:08 IP_1 IP_2 50794 8080 182808050794 5 518752 secs 2 1
5: 2017-05-08 06:22:47 IP_1 IP_2 51896 8080 182808051896 5 334821 secs 3 1
6: 2017-05-17 07:37:16 IP_1 IP_2 54793 8080 182808054793 3 409124 secs 2 1
7: 2017-05-17 07:37:16 IP_1 IP_2 54813 8080 182808054813 3 519888 secs 2 1
Caveat: It might happen in production data that more than two events with the same key fall within a 30-second timespan. There are some options to deal with this:
Keep only exact pairs with DT[count.pairs == 2], ignoring all other cases.
Keep groups with an even number of events with DT[count.pairs %% 2L == 0L] (although this may keep cases where the number of inbound connections does not equal the number of outbound connections).
Statistics on the group sizes can be created using
DT[, .N, count.pairs]
# count.pairs N
#1: 1 7
#2: 2 12
It would be interesting to see this for the production data set.
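To illustrate how the options in the caveat differ, here is a minimal sketch on made-up data. Several details are assumptions for the sake of a compact example: the grouping column is called flow rather than key (key is also an argument name of data.table()), event times are plain seconds in a column secs, and the time difference is computed with diff() instead of difftime() and shift().

```r
library(data.table)

# Hypothetical toy data: flow 1 is a clean pair, flow 2 has three events
# within 30 secs, flow 3 is a single event, flow 4 has four events within 30 secs.
toy <- data.table(
  flow = c(1, 1, 2, 2, 2, 3, 4, 4, 4, 4),
  secs = c(0, 0, 0, 5, 10, 0, 0, 1, 2, 3)
)
setkey(toy, flow, secs)

# Same three steps as above: time difference, jump counter, group size.
toy[, time_diff := c(Inf, diff(secs)), by = flow]
toy[, pair.id := cumsum(time_diff > 30), by = flow]
toy[, count.pairs := .N, by = .(flow, pair.id)]

group_sizes <- toy[, .N, keyby = count.pairs]  # how often each group size occurs
exact_pairs <- toy[count.pairs == 2]           # keeps flow 1 only
even_groups <- toy[count.pairs %% 2L == 0L]    # keeps flows 1 and 4
```

On this toy data the exact-pair filter returns 2 rows and the even-size filter returns 6, so the choice between them matters as soon as groups of four or more events occur.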
Apparently this works:
DT[, keep :=
(!is.na(time_diff) & time_diff < 30) |
shift(time_diff, type="lead", fill = 999) < 30
, by=key]
DT[(keep), !"keep"]
cdatetime srcaddr dstaddr srcport dstport key totals time_diff
1: 2017-05-12 07:58:32 IP_1 IP_2 54793 8080 182808054793 3 NA
2: 2017-05-12 07:58:32 IP_2 IP_1 8080 54793 182808054793 3 0
3: 2017-05-11 01:12:28 IP_1 IP_2 54813 8080 182808054813 3 NA
4: 2017-05-11 01:12:28 IP_2 IP_1 8080 54813 182808054813 3 0
5: 2017-05-01 23:51:16 IP_1 IP_2 50794 8080 182808050794 5 NA
6: 2017-05-01 23:51:16 IP_2 IP_1 8080 50794 182808050794 5 0
7: 2017-05-10 23:32:49 IP_1 IP_2 50794 8080 182808050794 5 257741
8: 2017-05-10 23:32:49 IP_2 IP_1 8080 50794 182808050794 5 0
9: 2017-05-03 23:52:05 IP_1 IP_2 51896 8080 182808051896 5 NA
10: 2017-05-03 23:52:05 IP_2 IP_1 8080 51896 182808051896 5 0
11: 2017-05-04 03:22:26 IP_1 IP_2 51896 8080 182808051896 5 12621
12: 2017-05-04 03:22:26 IP_2 IP_1 8080 51896 182808051896 5 0
There are probably plenty of ways to make this faster, but it's more important to clarify first whether it does what the OP wants (considering the code is much simpler than the OP's rules).
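To help check the rule against the OP's intent, the following sketch runs the same keep expression on a tiny made-up table. The column name flow and the values are assumptions for illustration. time_diff follows the question's convention of NA for the first event of each key.

```r
library(data.table)

# Hypothetical toy data: flow "a" has a pair plus one late event,
# flow "b" has two events that are far apart.
toy <- data.table(
  flow      = c("a", "a", "a", "b", "b"),
  time_diff = c(NA, 0, 409124, NA, 189900)
)

# A row is kept if it closely follows its predecessor (time_diff < 30)
# or if its successor closely follows it (lead of time_diff < 30).
toy[, keep :=
      (!is.na(time_diff) & time_diff < 30) |
      shift(time_diff, type = "lead", fill = 999) < 30,
    by = flow]

kept <- toy[(keep), !"keep"]
kept$flow       # "a" "a"
kept$time_diff  # NA 0
```

Only the two rows of the close pair in flow "a" survive. Note that under this rule a run of three or more events within 30 secs of each other would be kept in full, which may or may not match the OP's rules.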