
Apache Nifi/Cassandra - how to load CSV into Cassandra table

I have various CSV files arriving several times per day. They store time-series data from sensors, which belong to sensor stations. Each CSV is named after the sensor station and sensor id it comes from, for instance "station1_sensor2.csv". At the moment, the data is stored like this:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

I have created a Cassandra table to store them and to be able to query them for various identified tasks. The Cassandra table looks like this:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );

I would like to use Apache NiFi to automatically store the data from the CSV files in this Cassandra table, but I can't find an example or a flow design that does it right. I have tried the "PutCassandraQL" processor, but I am struggling without a clear example. Any help on how to execute a Cassandra insert query from Apache NiFi to load the data into the table would be appreciated!

Piar asked Aug 29 '16 08:08



1 Answer

TL;DR I have a NiFi 1.0 template to accomplish this on Gist and in the NiFi Wiki.

NiFi encourages very modular design, so let's break this down into smaller tasks. I'll describe a possible flow and explain what each processor is used for in terms of your use case:


  1. Read in the CSV file. This can be done with GetFile, or preferably ListFile -> FetchFile. In my example I am using a scripting processor to create a flow file in-line, containing your example data from above. This makes my template portable for others to use.

  2. Parse the filename to get the station and sensor fields. This uses NiFi Expression Language to get the parts of the filename before the underscore (for station) and after the underscore (minus the CSV extension) for sensor.

  3. Split the single CSV flow file into one flow file per line. This is done so we can create individual CQL INSERT statements later.

  4. Extract the column values from each line. I used ExtractText and a regular expression for this; if you have very complicated logic, you may want to check out a scripting processor such as ExecuteScript.

  5. Alter the timestamp. IIRC, CQL doesn't accept microseconds on timestamp literals. You can either try to parse out the microseconds (probably best done in an ExecuteScript processor) or just re-format the timestamp. Note that in my example the microseconds can't be parsed, so re-formatting truncates all fractional seconds.

  6. Build a CQL INSERT statement. At this point the data (in my template anyway) is all in flow file attributes, so the original content can be replaced with a CQL INSERT statement (which is what PutCassandraQL expects). You could instead keep the data in attributes (using UpdateAttribute to name them correctly; see the PutCassandraQL doc) and use a prepared statement, but IMHO it's simpler to write an explicit CQL statement. At the time of this writing, PutCassandraQL does not cache PreparedStatements, so the prepared-statement approach is actually less performant right now.

  7. Execute the CQL statements with PutCassandraQL.
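If it helps to see the logic of steps 2 through 5 outside of NiFi, here is a minimal Python sketch of the same transformations. It is not what the template does internally: the helper names (`parse_filename`, `to_millis`, `rows_from_csv`) and both regular expressions are made up for illustration, and unlike my template (which drops all fractional seconds) it keeps the first three fractional digits, assuming a millisecond-precision literal is acceptable to your Cassandra version.

```python
import re

# Hypothetical patterns for this sketch; they are not taken from the template.
LINE_RE = re.compile(r"^([^;]+);([^;]*);?$")
TS_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\.(\d{6})([+-]\d{4})$"
)


def parse_filename(filename):
    """Step 2: 'station1_sensor2.csv' -> ('station1', 'sensor2')."""
    stem = filename[:-len(".csv")] if filename.endswith(".csv") else filename
    station, _, sensor = stem.partition("_")
    return station, sensor


def to_millis(ts):
    """Step 5: cut six fractional digits down to three (milliseconds)."""
    match = TS_RE.match(ts)
    if match is None:
        raise ValueError(f"unexpected timestamp format: {ts!r}")
    date_time, micros, offset = match.groups()
    return f"{date_time}.{micros[:3]}{offset}"


def rows_from_csv(filename, content):
    """Steps 3 and 4: one dict per non-empty line, keyed by table column."""
    station, sensor = parse_filename(filename)
    rows = []
    for line in content.splitlines():
        line = line.strip()
        if not line:
            continue
        match = LINE_RE.match(line)
        if match is None:
            raise ValueError(f"unexpected line format: {line!r}")
        ts, value = match.groups()
        rows.append({
            "station_id": station,
            "sensor_id": sensor,
            "tps": to_millis(ts),
            "val": value,
        })
    return rows
```

Each returned dict corresponds to one flow file after the split, with the values that would end up in its attributes.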

I didn't go into detail as far as the names of my attributes and such, but by the time the flow gets to ReplaceText, I have the following attributes:

  • station.name: Contains the name of the station parsed from the filename
  • sensor.name: Contains the name of the sensor parsed from the filename
  • tps: Contains the updated timestamp value
  • column.2: Contains (presumably) the value of the sensor reading

The ReplaceText sets the content to the following (using Expression Language to fill in the values):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
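To make the substitution concrete, here is a rough Python equivalent of what that ReplaceText step produces for one flow file. It is only a sketch: `cql_quote` and `build_insert` are hypothetical helpers, the attribute names are the ones used in the statement above, and the quote-doubling and `float()` check are extra safeguards I added here, not something the template does.

```python
def cql_quote(text):
    """Wrap text as a CQL string literal, doubling embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def build_insert(attrs):
    """Fill the INSERT statement from a dict of flow file attributes."""
    # float() raises ValueError on non-numeric input, so arbitrary text
    # cannot be spliced into the unquoted val position.
    val = float(attrs["column.2"])
    return (
        "insert into sensor_data (station_id, sensor_id, tps, val) values "
        f"({cql_quote(attrs['station.name'])}, "
        f"{cql_quote(attrs['sensor.name'])}, "
        f"{cql_quote(attrs['tps'])}, {val!r})"
    )


# Hypothetical attribute values for one line of the example file.
example = {
    "station.name": "station1",
    "sensor.name": "sensor2",
    "tps": "2016-05-04 03:02:01+0000",
    "column.2": "0.1234",
}
statement = build_insert(example)
```

The resulting string is what would become the flow file content handed to PutCassandraQL.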

Hopefully that helps. Please let me know if you have any questions or issues. Cheers!

mattyb answered Sep 27 '22 22:09
