TL;DR: I'm using InfluxDB v2.0 with the Flux query language (as in the GUI). I have multiple series (same _field, different tag) of a digital 0/1 state, and I want to sum them up. The problem is that the state is stored in the database at irregular time intervals, meaning that at any given time the real value for each tag should be the last point recorded at or before that time. I have tried aggregateWindow with last as the function, but last just drops the tables for windows with no stored point. Is there any way I could sum them up? I'll accept any method (including exporting the data and using a script in another language). Thank you in advance.
The Scenario
My team implemented a check-in/check-out system for a real-world event, with a phone number representing each person, and decided to use InfluxDB v2.0 as the database (we chose it so we could monitor through Grafana easily). I have a bucket storing check-in/check-out points, all with the same schema. The schema is as follows:
measurement: 'user'
tags: [phone, type] // type is either ['normal', 'staff']
value: 0 or 1 // 0 for a check-out event, 1 for a check-in event
Whenever someone checks in to the event, a point with value 1 is inserted; conversely, a point with value 0 is inserted whenever someone checks out. Keep in mind that points can be duplicated if a user triggers the API again, e.g. checking in a second time while already checked in (we treat this as the same state of 1). So the data is like a digital 0/1 state, but with points at irregular time intervals, one graph line per phone number. The same phone number with a different type counts as a different person for us.
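To make the data model concrete, here is a small Python sketch with made-up sample events (the names events, state_at, and population_at are illustrative, not part of our system): each person's state at time t is the value of their last event at or before t, and the in-event population is the sum of those states.

```python
from bisect import bisect_right

# Hypothetical sample data: one (phone, type) series of (timestamp, value)
# events each, mirroring the irregular check-in (1) / check-out (0) points.
events = {
    ("0812345678", "normal"): [(10, 1), (10, 1), (50, 0)],  # duplicate check-in
    ("0899999999", "staff"):  [(5, 1), (70, 0)],
}

def state_at(series, t):
    """State of one person at time t = value of the last event at or
    before t, or 0 if no event has happened yet."""
    times = [ts for ts, _ in series]
    i = bisect_right(times, t)
    return series[i - 1][1] if i > 0 else 0

def population_at(t):
    """In-event population at time t = sum of every person's current state."""
    return sum(state_at(s, t) for s in events.values())
```

For example, population_at(20) counts both people as inside, even though neither has a point anywhere near t = 20; that carry-forward is exactly what the Flux queries below are trying to reproduce.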
The project has already been deployed, and we are now tasked with post-processing the data. The problem is to visualize a graph of the in-event population over the entire time range. From a mathematical point of view, this should be easily solved by summing the state of each person (the 0/1 lines) over time. I first tried something like this in Flux:
from(bucket: "event_name")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "user")
|> group(columns: ["type"])
|> aggregateWindow(every: v.windowPeriod, fn: sum, createEmpty: true)
|> yield()
The result looks very promising: a population graph with two colours for the normal and staff types. But when I looked carefully, the sum function actually sums the _value of the points in each window. That means that for windows with no points, sum does not actually count everyone in the database. The goal is to sum the actual state for windows with no points too (the _value of those windows should equal the _value of the last point; e.g. if I checked in at 7:00 pm, my _value should be 1 for every window after 7:00 pm, even if some windows have no points). I then tried something like this:
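A tiny Python sketch of the undercounting, with assumed toy data (two hypothetical people A and B; naive_window_sum and true_population are illustrative names): windows where a series has no point contribute nothing to fn: sum, even though that person is still inside.

```python
# Two hypothetical people: A checks in during window 0 and stays; B checks
# in during window 0 and out during window 2. Points only exist where an
# event happened, so windows without points contribute nothing to fn: sum.
points = {
    "A": {0: 1},          # window index -> recorded _value
    "B": {0: 1, 2: 0},
}

def naive_window_sum(window):
    """What aggregateWindow(fn: sum) sees: only the points in this window."""
    return sum(series.get(window, 0) for series in points.values())

def true_population(window):
    """Actual population: carry each person's last known state forward."""
    total = 0
    for series in points.values():
        known = [w for w in series if w <= window]
        total += series[max(known)] if known else 0
    return total
```

Here naive_window_sum(1) is 0 while true_population(1) is 2: both people are still inside during window 1, but neither has a point there.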
from(bucket: "event_name")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "user")
|> aggregateWindow(every: 1m, fn: last, createEmpty: true)
|> fill(usePrevious: true)
|> group(columns: ["type"])
|> aggregateWindow(every: 1m, fn: sum)
|> yield()
I take the last point of each window, fill the empty windows with the previous available value, then sum up the _value per window again. But then I found out that the last function actually drops empty tables, meaning that windows with no points are dropped (so createEmpty is useless). The problem then narrows down to finding a function like last that doesn't drop empty tables. I tried reduce to build my own last-like logic, but sadly it didn't go the way I wanted (I might have coded it wrong).
If you have any idea, please help. Thank you very much.
Never mind, I have found a solution. Here it is for anyone in the same situation. It's not very elegant performance-wise, but it's the only query I've found that works.
from(bucket: "event_name")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "user")
|> aggregateWindow(every: 1m, fn: last, createEmpty: false)
|> aggregateWindow(every: 1m, fn: mean, createEmpty: true)
|> fill(usePrevious: true)
|> fill(value: 0.0)
|> group(columns: ["type"])
|> aggregateWindow(every: 1m, fn: sum, createEmpty: false)
|> yield(name: "population")
1. last first, to get the latest state for each window (though last drops empty tables, so createEmpty: true is useless there anyway).
2. mean with createEmpty: true, to create points with a null _value for the empty windows. For windows that do have real points, mean shouldn't change the value, since there is only one point per window after the earlier last. The whole point of mean here is just to create null points for empty windows; I needed a do-nothing function that doesn't drop the empty tables created by createEmpty. FYI, I tried many functions, including my own custom reduce and map logic, but they all drop empty tables (and assigning null isn't even allowed). I even created a pass-through function like fn: (tables=<-, x) => tables for aggregateWindow, but it drops empty tables anyway. So mean is my best bet here, though as a side effect my values change from int to float.
3. fill(usePrevious: true) replaces the null points with the value from the previous window; this is why I needed nulls in the empty windows from the previous step, and mean was the only way to get them. The second fill(value: 0.0) handles the early empty windows, which should represent the 0 state.
4. Grouping by type and summing gives the result I was looking for.
Hope this helps anyone who ends up in the same situation in the future.
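Since the question allowed exporting the data and scripting it elsewhere, here is a rough Python equivalent of the same logic on toy data (raw, n_windows, and per_window_state are hypothetical names): last value per window, None for empty windows, forward fill, zero-fill the leading gaps, then sum across series per window.

```python
# Hypothetical data: per series, a list of (window index, _value) points.
raw = {
    "A": [(0, 1), (1, 1)],          # duplicate check-in, then stays inside
    "B": [(0, 1), (3, 0)],          # checks in, later checks out
}
n_windows = 5

def per_window_state(series):
    # last value per window, None where the window is empty
    # (what last + mean with createEmpty: true achieves in the query)
    windows = [None] * n_windows
    for w, v in series:
        windows[w] = float(v)       # mean() also turns ints into floats
    # fill(usePrevious: true): carry the last known state forward
    prev = None
    for i, v in enumerate(windows):
        if v is None:
            windows[i] = prev
        else:
            prev = v
    # fill(value: 0.0): leading empty windows mean "not checked in yet"
    return [0.0 if v is None else v for v in windows]

# group + sum: total population per window across all series
population = [sum(col) for col in zip(*(per_window_state(s) for s in raw.values()))]
```

With this data, population comes out as one total per window, and the windows between B's check-in and check-out keep counting B even though no point exists there.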
I’m not sure why they made this so complicated, but I found an alternative approach that’s a bit more elegant:
|> window(every: 1s)
|> mean()
|> group(columns: ["_stop"])
|> sum()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
Most of this is adapted from this page.
Window the data:
We first split all series into uniform 1-second windows using window(every: 1s). This gives us fixed time intervals for aggregation.
Average within each series:
Before combining series, we compute the mean() of values in each 1-second window per series. This is important when sampling rates vary — e.g., one series may have multiple points in a 1-second window, another only one. We want each series to contribute a single representative value per window.
Group across all series:
By default, aggregation functions like sum() operate per series (i.e., per tag combination). To combine all series, we group by the _stop timestamp of the windows:
|> group(columns: ["_stop"])
This merges all series' data points that fall into the same 1-second window.
Aggregate:
Now when we sum(), we’re summing across all series, not just within one.
Restore the time column:
Flux removes _time when windowing; we bring it back by copying _stop to _time:
|> duplicate(column: "_stop", as: "_time")
Unwindow the data:
Finally, we collapse everything back into a single continuous series:
|> window(every: inf)
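As a sanity check of what this pipeline computes, here is a rough Python equivalent with toy data (points, window_means, and totals are illustrative names): average each series' values within 1-second windows, then sum those averages across series per window.

```python
from collections import defaultdict

# Hypothetical data: per series, (timestamp in seconds, _value) points.
points = {
    "A": [(0.2, 1), (0.7, 1), (2.1, 0)],
    "B": [(0.5, 1)],
}

# window(every: 1s) + mean(): one representative value per series per window
window_means = defaultdict(dict)            # series -> {window: mean}
for name, series in points.items():
    buckets = defaultdict(list)
    for t, v in series:
        buckets[int(t)].append(v)
    for w, vals in buckets.items():
        window_means[name][w] = sum(vals) / len(vals)

# group(columns: ["_stop"]) + sum(): combine all series in the same window
totals = defaultdict(float)
for per_window in window_means.values():
    for w, m in per_window.items():
        totals[w] += m
```

One caveat worth noting: windows where no series has any point simply don't appear in totals, so unlike the accepted answer this pipeline does not carry the last known state forward across empty windows.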