Analysis of a large JSON log file in Node.js

I have the following JSON file:

sensorlogs.json
{"arr":[{"UTCTime":10000001,"s1":22,"s2":32,"s3":42,"s4":12},
{"UTCTime":10000002,"s1":23,"s2":33,"s4":13},
{"UTCTime":10000003,"s1":24,"s2":34,"s3":43,"s4":14},
{"UTCTime":10000005,"s1":26,"s2":36,"s3":44,"s4":16},
{"UTCTime":10000006,"s1":27,"s2":37,"s4":17},
{"UTCTime":10000004,"s1":25,"s2":35,"s4":15},
...
{"UTCTime":12345678,"s1":57,"s2":35,"s3":77,"s4":99}
]}

Sensors s1, s2, s3, etc. all transmit at different frequencies (note that s3 transmits every 2 seconds, and timestamps can be out of order).

How can I achieve something like -

Analyzing s1:
s = [[10000001, 22], [10000002, 23],.. [12345678,57]]
s1 had 2 missing entries
Analyzing s2:
s = [[10000001, 32], [10000002, 33],.. [12345678,35]]
s2 had 0 missing entries
Analyzing s3:
s = [[10000001, 42], [10000003, 43],.. [12345678,77]]
s3 had 0 missing entries
Analyzing s4:
s = [[10000001, 12], [10000003, 13],.. [12345678,99]]
s4 had 1 missing entries

sensorlogs.json is 16 GB.

Missing entries can be detected from the difference between consecutive UTC timestamps, since each sensor transmits at a known frequency.
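
For reference, this is roughly the gap check I have in mind once a sensor's readings are in a single sorted array - only a sketch, where period is that sensor's known transmission interval (in the same units as UTCTime):

// Sketch only: count gaps in a sorted [timestamp, value] array for one sensor
function countMissing(readings, period) {
  var missing = 0;
  for (var i = 1; i < readings.length; i++) {
    var gap = readings[i][0] - readings[i - 1][0];
    // every whole period skipped beyond the expected one counts as a missing entry
    missing += Math.max(0, Math.round(gap / period) - 1);
  }
  return missing;
}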

I cannot use multiple large arrays for my analysis due to memory constraints, so I will have to make multiple passes over the same JSON log file, using only a single large array at a time.

What I have so far is the following -

var fs = require('fs');
var lr = require('readline');
var async = require('async');

var filePath = 'sensorlogs.json';
var tmpObj = {};   // set of sensor keys found in the log
var arrStrm = [];  // sensor keys to validate, built from tmpObj
var currSensor;
var result = [];

//1. Extract all the keys from the log file
console.log("Extracting keys... \n");
var stream = fs.createReadStream(filePath);
var lineReader = lr.createInterface(
{
  input: stream
});

lineReader.on('line', function (line) 
{
  getKeys(line);//extract all the keys from the JSON
});
stream.on('end', function()
{
  //obj -> arr
  for(var key in tmpObj)
    arrStrm.push(key);

  //2. Validate individual sensors
  console.log("Validating the sensor data ...\n");

  //Intended to process the sensors in the array one at a time
  //(note: async.each actually runs its iterator in parallel)
  async.each(arrStrm, function(key)
  {
    {
        currSensor = key;
        console.log("validating " + currSensor + "...\n");

        stream = fs.createReadStream(filePath);
        lineReader = lr.createInterface(
        {
          input: stream
        });

        lineReader.on('line', function (line) 
        {
          processLine(line);//Create the arrays for the sensors
        });
        stream.on('end', function()
        {
            processSensor(currSensor);//Process the data for the current sensor
        });
    }
  });
});

function getKeys(line) 
{
    if(((pos = line.indexOf('[')) >= 0)||((pos = line.indexOf(']')) >= 0))
        return;
    if (line[line.length-1] == '\r') line=line.substr(0,line.length-1); // discard CR (0x0D)
    if (line[line.length-1] == ',') line=line.substr(0,line.length-1); // discard ,
//  console.log(line);

    if (line.length > 1) 
    { // ignore empty lines
        var obj = JSON.parse(line); // parse the JSON
        for(var key in obj) 
        {
            if(key != "debug")
            {
                if(tmpObj[key] == undefined)
                    tmpObj[key]=[];
            }
        };
    }
}

Of course this doesn't work, and I have not been able to find anything online that explains how this can be implemented.

Note: I can use any language to develop this tool (C/C++, C#, Java, Python), but I am going with JavaScript because of its ability to parse JSON arrays easily (and my interest in getting better at JS as well). Would someone like to suggest an alternative language if JavaScript isn't the best choice for such a tool?

Edit: Some important info that either was not very clear or that I did not include earlier -

  1. The data in the JSON logs is not streaming live; it is a stored JSON file on a hard disk
  2. The stored data is not in chronological order, meaning the timestamps might be out of order. So each sensor's data needs to be sorted by timestamp after it has been stored in an array
  3. I cannot use a separate array for each sensor (that would be the same as storing the entire 16 GB JSON in RAM); to save memory, only one array should be used at a time. And yes, there are more than 4 sensors in my log, this is just a sample (roughly 20, to give an idea)

I have modified my JSON and expected output

One solution might be to make multiple passes over the JSON file, storing one sensor's data with timestamps in an array at a time, then sorting the array, and finally analyzing the data for corruption and gaps. That is what I am trying to do in the code above.
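
Roughly, the pass structure I am aiming for looks like this - only a sketch, where readLogLineByLine and reportGaps are placeholder helpers I still need to write, not code I already have:

var async = require('async');

// one full pass over the log per sensor, reusing a single array each time
async.eachSeries(sensorKeys, function (key, done) {
  var readings = [];                           // the single large array, reused per pass
  // placeholder: readLogLineByLine(path, onObject, onDone) would parse one JSON
  // object per line, call onObject(obj) for each, then onDone() at EOF
  readLogLineByLine(filePath, function (obj) {
    if (obj[key] !== undefined)
      readings.push([obj.UTCTime, obj[key]]);  // keep only this sensor's samples
  }, function () {
    readings.sort(function (a, b) { return a[0] - b[0]; }); // sort by timestamp
    reportGaps(key, readings);                 // placeholder: apply the gap check above
    done();                                    // move on to the next sensor
  });
});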

asked Sep 13 '16 by user3509549


2 Answers

So you have that big fat 16 GB sensor log wrapped in JSON.

To start, treating an entire 16 GB file as one JSON document isn't realistic, simply because the opening and closing brackets break the line-by-line regularity and are just annoying extra characters around the array. We know the file has a beginning and an end, and moreover, without them your program could work on chunks of the file, or even on a stream plugged directly into the device. So let's assume what we will be processing is this:

{"UTCTime":10000001,"s1":22,"s2":32,"s3":42,"s4":12},
{"UTCTime":10000002,"s1":23,"s2":33,"s4":13},
{"UTCTime":10000003,"s1":24,"s2":34,"s3":43,"s4":14},
...
{"UTCTime":12345678,"s1":57,"s2":35,"s3":77,"s4":99},

and even adding or detecting that missing comma at the end shouldn't be too difficult.

Now every line is formatted the same way and can be interpreted as JSON. The problem is: did the sensors output data when they were expected to? If we are sure they report at the right time and at the right frequency (case 1), but may occasionally miss a write, all is well. However, if they start to drift slightly over the time frame (case 2), then some kind of heuristic is needed to recover the proper line-to-line frequency, and the analysis will take longer.

If we are not processing this in real time, a first and easy validation check is to verify that every Nth line (according to each sensor's frequency) contains that sensor's data, right?

In any case, since it's a very big file, it has to be processed line by line whenever possible.

In the following program, I considered only case 1, and assumed we could be processing a continuous stream.

#!/usr/bin/python3
import json

sensors = {}
sensors['s1'] = [1]  # frequencies
sensors['s2'] = [1]
sensors['s3'] = [2]
sensors['s4'] = [1]

# append an error counter and a data array at sensors[k],
# so each entry holds [freq, err, data]
for k, v in sensors.items():
    sensors[k].extend([0, []])
FRQ = 0; ERR = 1; DAT = 2
print(sorted(sensors.items()))
S = sorted(sensors.keys())

with open('./sensors.json', "r") as stream:
    i = 0
    for line in stream:
        if not line.rstrip():
            continue  # skip blank lines
        j = json.loads(line.rstrip().rstrip(','))  # strip newline and trailing comma
        t = j["UTCTime"]
        for k in S:
            sensor = sensors[k]
            if i % sensor[FRQ] == 0:  # every Nth line for this sensor's frequency
                v = j.get(k)
                if v is None:
                    sensor[ERR] += 1
                    print(k, "has", sensor[ERR], "missing entries")
                sensor[DAT].append([t, v])  # append that sensor's data
                # filling up the memory...
        i += 1

for k, v in sorted(sensors.items()):
    print(k, sensors[k][DAT])
for k, v in sorted(sensors.items()):
    print(k, 'had', sensors[k][ERR], "missing entries")

To handle case 2, we would swap the None check and the modulus check, verify whether a sensor wrote something when it wasn't supposed to, and then try to detect shifts.

Last note: your program could run short on memory, so keeping the entire data set in memory probably isn't a good idea. If you intend to build a separate array for each sensor for further processing, it might be wiser to write them to files.

answered Oct 16 '22 by Flint


Edited again to consider your edits:

var fs = require('fs');
var stream = fs.createReadStream('sensorlogs.json', {flags: 'r', encoding: 'utf-8'});
var buffer = '';
var sensor = process.argv[2];
var readings = [];
var missingCount = 0;
console.log('Analyzing ' + sensor + ':');

stream.on('data', function(d) {
    buffer += d.toString();
    processBuffer();
});

stream.on('end', function() {
    console.log(readings);
    console.log(sensor + ' had ' + missingCount + ' missing entries');
});

function processBuffer() {
  // strip the leading {"arr":[ wrapper (only present in the first chunk)
  var start = buffer.indexOf('[{');
  if(start != -1) buffer = buffer.slice(start + 1);
  // only consume complete objects; a trailing partial object waits for the next chunk
  while(buffer.indexOf('{"') != -1 && buffer.indexOf('}') != -1) {
    buffer = buffer.slice(buffer.indexOf('{"'));
    processLine(buffer.slice(0, buffer.indexOf('}') + 1));
    buffer = buffer.slice(buffer.indexOf('}') + 1);
  }
};

function processLine(line) {
  if(line != ""){
    var obj = JSON.parse(line);
    if(obj[sensor] === undefined){ // this entry has no reading for the sensor
      missingCount++;
    }else{
      var pos;
      // insert the reading in timestamp order
      for(pos = 0; pos < readings.length; pos++){
        if(obj.UTCTime < readings[pos][0]){
          var reading = [obj.UTCTime, obj[sensor]];
          readings.splice(pos, 0, reading);
          break;
        }
      }
      if(pos == readings.length){
        readings.push([obj.UTCTime, obj[sensor]]);
      }
    }
  }
};

You have to call it with a parameter indicating which sensor you want to analyze:

node.exe scripts\processJson.js <param>

To test it out I took this sample:

{"arr":[{"UTCTime":10000001,"s1":22,"s2":32,"s3":42,"s4":12},
{"UTCTime":10000005,"s1":20,"s2":30,"s3":40,"s4":10},
{"UTCTime":10000002,"s1":23,"s2":33,"s4":13},
{"UTCTime":10000003,"s1":24,"s2":34,"s3":43,"s4":14},
{"UTCTime":12345678,"s1":57,"s2":35,"s3":77,"s4":99}
]}

And the output was:

> node.exe scripts\processJson.js s1
Analyzing s1:
[[10000001, 22], [10000002, 23], [10000003, 24], [10000005, 20], [12345678, 57]]
s1 had 0 missing entries

> node.exe scripts\processJson.js s2
Analyzing s2:
[[10000001, 32], [10000002, 33], [10000003, 34], [10000005, 30], [12345678, 35]]
s2 had 0 missing entries

> node.exe scripts\processJson.js s3
Analyzing s3:
[[10000001, 42], [10000003, 43], [10000005, 40], [12345678, 77]]
s3 had 1 missing entries

> node.exe scripts\processJson.js s4
Analyzing s4:
[[10000001, 12], [10000002, 13], [10000003, 14], [10000005, 10], [12345678, 99]]
s4 had 0 missing entries

answered Oct 16 '22 by Artemio Ramirez