I am about doing some signal analysis with Hadoop/Spark and I need help on how to structure the whole process.
Signals are now stored in a database, that we will read with Sqoop and will be transformed in files on HDFS, with a schema similar to:
<Measure ID> <Source ID> <Measure timestamp> <Signal values>
where signal values are just string made of floating point comma separated numbers.
000123 S001 2015/04/22T10:00:00.000Z 0.0,1.0,200.0,30.0 ... 100.0
000124 S001 2015/04/22T10:05:23.245Z 0.0,4.0,250.0,35.0 ... 10.0
...
000126 S003 2015/04/22T16:00:00.034Z 0.0,0.0,200.0,00.0 ... 600.0
We would like to write interactive/batch queries to:
apply aggregation functions over signal values
SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0
To select signals that had a peak over 1000.0.
apply aggregation over aggregation
SELECT SOURCEID, MAX(VALUES)
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0
To select sources having at least a single signal that exceeded 1500.0.
apply user defined functions over samples
SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0)
to select signals that after being filtered at 5.0 KHz have at least a value over 100.0.
We need some help in order to:
Thank you very much!
1) Parquet as columnar format is good for OLAP. Spark support of Parquet is mature enough for production use. I suggest to parse string representing signal values into following data structure (simplified):
case class Data(id: Long, signals: Array[Double])
val df = sqlContext.createDataFrame(Seq(Data(1L, Array(1.0, 1.0, 2.0)), Data(2L, Array(3.0, 5.0)), Data(2L, Array(1.5, 7.0, 8.0))))
Keeping array of double allows to define and use UDFs like this:
def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")
sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id| signals|
+---+---------------+
| 2| [3.0, 5.0]|
| 2|[1.5, 7.0, 8.0]|
+---+---------------+
sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
| 1| 2.0|
| 2| 8.0|
+---+---------+
Or, if you want some type-safety, using Scala DSL:
import org.apache.spark.sql.functions._
val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
| 2| 8.0|
+---+--------------+
2) It depends: if size of your data allows to do all processing in query time with reasonable latency - go for it. You can start with this approach, and build optimized structures for slow/popular queries later
3) Hive is slow, it's outdated by Impala and Spark SQL. Choice is not easy sometimes, we use rule of thumb: Impala is good for queries without joins if all your data stored in HDFS/Hive, Spark has bigger latency but joins are reliable, it supports more data sources and has rich non-SQL processing capabilities (like MLlib and GraphX)
4) Keep it simple: store you raw data (master dataset) de-duplicated and partitioned (we use time-based partitions). If new data arrives into partition and your already have downstream datasets generated - restart your pipeline for this partition.
Hope this helps
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With