I store my sensor data in S3 (data is written every 5 minutes):
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
1541252701443 is a JSON file containing the measurements:
{ "temperature": 14.78, "pressure": 961.70, "humidity": 68.32}
I am definitely missing some Hive skills. Unfortunately I did not find an example extracting time-series JSON data that would get me started. I am also not sure whether Hive / Athena supports this kind of data wrangling at all.
I am struggling to create an Athena table for this data...
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  device string,
  sensor string,
  data_point string,
  value double
)
PARTITIONED BY (`timestamp` string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/farm0001/sensor01/'
TBLPROPERTIES ('has_encrypted_data'='false')
Another route I am considering is to store the data in a structure that is easier to process; maybe I have not partitioned the data enough?
So maybe I should add dt to the structure like this:
farm_iot/sensor_data/2018-11-03-02-45-02/farm/farm0001/sensor01/1541252701443
That still does not get me where I want to be:
+---------------+----------+----------+-------------+--------+
| timestamp     | device   | sensor   | data_point  | value  |
+---------------+----------+----------+-------------+--------+
| 1541252701443 | farm0001 | sensor01 | temperature |  14.78 |
| 1541252701443 | farm0001 | sensor01 | humidity    |  68.32 |
| 1541252701443 | farm0001 | sensor01 | pressure    | 961.70 |
+---------------+----------+----------+-------------+--------+
Any pointer towards this goal would be much appreciated. Thank you!
Please note: I do not want to use Glue; I would like to understand how to do it manually. Besides, Glue already created ~16,000 tables yesterday :)
First of all, many thanks to @hlagos for his help.
AWS Athena was not able to transform the JSON sensor data the way I needed it (we discussed this in the comments on @hlagos's answer). Consequently, the "simplest" way to deal with the situation was to change the data format from JSON to CSV, which is closer to the format I needed.
I now store the sensor data in S3 in CSV format (data is written every 5 minutes), and I added the day and device partitions we discussed.
Resulting folder structure:
farm_iot/sensor_data/farm/day=20181129/device=farm0001/1543535738493
The contents of the CSV file:
sensor01,temperature,2.82
sensor01,pressure,952.83
sensor01,humidity,83.64
sensor02,temperature,2.61
sensor02,pressure,952.74
sensor02,humidity,82.41
The AWS Athena table definition:
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
`sensor` string,
`data_point` string,
`value` double
)
PARTITIONED BY (day string, device string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\\'
LINES TERMINATED BY '\n'
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
I add the partitions like this (later I will have a script to create the partitions in advance):
msck repair table farm.sensor_data
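Such a script would essentially run ALTER TABLE statements; a minimal sketch for one partition, using the day and device values from the example folder above:
-- sketch: pre-create a single partition instead of relying on msck repair (values taken from the example folder above)
ALTER TABLE farm.sensor_data ADD IF NOT EXISTS
  PARTITION (day='20181129', device='farm0001')
  LOCATION 's3://farm-iot/sensor_data/farm/day=20181129/device=farm0001/';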
Now I can query the data:
select regexp_extract("$path", '[^/]+$') as timestamp, device, sensor,
data_point, value from farm.sensor_data where day='20181104'
Results
timestamp device sensor data_point value
1 1541310040278 farm0001 sensor01 temperature 21.61
2 1541310040278 farm0001 sensor01 pressure 643.65
3 1541310040278 farm0001 sensor01 humidity 74.84
4 1541310040278 farm0001 sensor02 temperature 9.14
5 1541310040278 farm0001 sensor02 pressure 956.04
6 1541310040278 farm0001 sensor02 humidity 88.01
7 1541311840309 farm0001 sensor01 temperature 21.61
8 ...
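As a usage note: since the filename is an epoch timestamp in milliseconds, it can also be converted to a readable timestamp directly in the query; a minimal sketch, assuming the filenames keep that format:
-- sketch: turn the epoch-millisecond filename into a readable timestamp (assumes filenames stay epoch milliseconds)
select from_unixtime(cast(regexp_extract("$path", '[^/]+$') as bigint) / 1000) as ts,
       device, sensor, data_point, value
from farm.sensor_data
where day = '20181104'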
Let me try to explain a few problems that I see up front.
Your partitions do not follow the Hive convention of partitionname=partitionvalue. This is not mandatory, but it is useful if you want to take advantage of commands that automatically add partitions based on your folder structure. This is how I would solve your problem if you will mainly query by sensor or device:
Your folder structure ideally should go from
farm_iot/sensor_data/farm/farm0001/sensor01/1541252701443
to
farm_iot/sensor_data/farm/device=farm0001/sensor=sensor01/1541252701443
Your table definition should contain your partition columns to be able to select them without regular expressions and take advantage of the performance improvement they bring (I am guessing a common query will filter by device or sensor). In addition, you need to add all the JSON columns that are part of your file:
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  temperature double,
  pressure double,
  humidity double
)
PARTITIONED BY (device string, sensor string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false')
We are missing the timestamp, which is essentially part of your filename with the JSON input. We can include the file name in the select statement using the virtual column INPUT__FILE__NAME, as follows:
select device, sensor, temperature, pressure, humidity, INPUT__FILE__NAME as mytimestamp from farm.sensor_data
If you want pressure, temperature and humidity as different rows, I would recommend creating an array (or map) with those three and exploding it; it should be much more efficient than running 3 queries and using UNION ALL to append the results. A sketch of that approach is shown below.
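A minimal sketch of that idea (Hive syntax), using a map rather than a plain array so the measurement name is carried along as the key; the table and column names are the ones defined above:
-- sketch: pivot the three measurement columns into (data_point, value) rows in a single pass
SELECT
  INPUT__FILE__NAME AS mytimestamp,
  device,
  sensor,
  data_point,
  value
FROM farm.sensor_data
LATERAL VIEW explode(
  map('temperature', temperature,
      'pressure',    pressure,
      'humidity',    humidity)
) m AS data_point, value;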
If you follow the Hive convention, you can take advantage of the command msck repair table to automatically add new partitions once new devices/sensors are included. In the worst case, if you want to keep your folder structure, you can add partitions as follows:
ALTER TABLE test ADD PARTITION (device='farm0001', sensor='sensor01') location 's3://farm_iot/sensor_data/farm/farm0001/sensor01'
NOTE: new partitions will not be added automatically; you always need to add them.
I tried to add as much detail as possible. If something is not clear let me know.
EDIT: If your queries will be mostly based on time series (a date range, for example), I would recommend adding a partition at the day level (not smaller than that) to improve the performance of your queries. Your table definition would then look like:
CREATE EXTERNAL TABLE IF NOT EXISTS farm.sensor_data (
  temperature double,
  pressure double,
  humidity double
)
PARTITIONED BY (dt string, device string, sensor string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false')
And your folder structure would look like:
farm_iot/sensor_data/farm/dt=20191204/device=farm0001/sensor=sensor01/1541252701443
As a clarification, you do not need to modify the table for each new partition; you only add the partitions to the table. This is essentially how Hive knows that a new partition was created. If you decide to use partitions, this is the only way; if you don't (which will impact performance), there are some other alternatives to make it work.
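A minimal sketch of adding one such partition for the dt/device/sensor layout (the dt, device and sensor values are taken from the example path above, and the bucket name is assumed to match the table LOCATION):
-- sketch: register one partition explicitly; values and bucket name are assumptions based on the examples above
ALTER TABLE farm.sensor_data ADD IF NOT EXISTS
  PARTITION (dt='20191204', device='farm0001', sensor='sensor01')
  LOCATION 's3://farm-iot/sensor_data/farm/dt=20191204/device=farm0001/sensor=sensor01/';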
EDIT2:
If you want to keep your data structure as is and not use partitions, it is possible to get the expected results as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS yourdb.sensordata (
temperature double,
pressure double,
humidity double
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://farm-iot/sensor_data/farm/'
TBLPROPERTIES ('has_encrypted_data'='false');
SET hive.mapred.supports.subdirectories=TRUE;
SET mapred.input.dir.recursive=TRUE;
select * from yourdb.sensordata;
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'temperature' as data_point,
temperature as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'pressure' as data_point,
pressure as value
from yourdb.sensordata
union all
select
split(input__file__name, "/")[size(split(input__file__name, "/")) - 1] as ts,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 3] as device,
split(input__file__name, "/")[size(split(input__file__name, "/")) - 2] as sensor,
'humidity' as data_point,
humidity as value
from yourdb.sensordata;
As you can see, I am getting most of the information from the file path; however, the flags shown above are required to tell Hive to read folders recursively.
ts,device,sensor,data_point,value
1541252701443,farm0001,sensor01,temperature,14.78
1541252701443,farm0001,sensor01,pressure,961.7
1541252701443,farm0001,sensor01,humidity,68.32