I am trying to use zeppelin to plot a realtime graph. I have doing a sentiment analysis on tweets on per minute . I am able to query statically and plot a graph. But i would like this to be done dynamically. I am new to zeppelin and do not have much knowledge about angularJS. What should be the correct approach to this problem?
val final_score=uni_join.map{case((year,month,day,hour,minutes),(tweet_count,sentiment))=>(year, month, day, hour, minutes(sentiment/tweet_count).ceil)}
final_score.saveToCassandra("twitter", "score",writeConf = WriteConf(ttl = TTLOption.constant(1000)))
final_score.foreachRDD(score => {
val rowRDD =score.map{case(year,month,day,hour,minutes,sentiment) =>(year,month,day,hour,minutes,sentiment) }
val tempDF = sqlContext.createDataFrame(rowRDD)
z.angularBindGlobal("stream", parsed) //to bind parsed to stream.
tempDF.registerTempTable("realTimeTable")
})
Doing a query on the above table , i am able to get the graph. But i would like to dynamically update the graph every minute in order to keep in sync with the sentiment score . Thanks prior. [update] the angular part for the zeppelin notebook are as follows:
%angular
<div id="graph" style="height: 100%; width: 100%">
<canvas id="myChart" width="400" height="400"></canvas>
<div id="legendDiv"></div>
</div>
<script>
function initMap() {
var colorList = ["#fde577", "#ff6c40", "#c72a40", "#520833", "#a88399"]
var el = angular.element($('#stream'));
console.log("El is "+el) //returns el as object
angular.element(el).ready(function() {
console.log('Hello')
window.locationWatcher =el.$scope.$watch('stream', function(new, old){
console.log('changed');}, true)})
</script>
But running this code keeps returning the following error.
vendor.js:29 jQuery.Deferred exception: Cannot read property '$watch' of undefined TypeError: Cannot read property '$watch' of undefined
The spark version that i am using is 1.6 And Zeppelin is 0.6
spark-highcharts since version 0.6.3 support Spark Structured Streaming.
For a structuredDataFrame
after aggregation, with the following code in one Zeppelin paragraph. The OutputMode can be either append
or complete
depends how the structureDataFrame is aggregated.
import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
val query = highcharts(
structuredDataFrame.seriesCol("country")
.series("x" -> "year", "y" -> "stockpile")
.orderBy(col("year")), z, "append")
And the following code in the next paragraph. The chart in this paragraph will be updated when there are new data coming to the structureDataFrame.
StreamingChart(z)
Run following code to stop update the chart.
query.stop()
Here is the example generate structureDataFrame.
spark.conf.set("spark.sql.streaming.checkpointLocation","/usr/zeppelin/checkpoint")
case class NuclearStockpile(country: String, stockpile: Int, year: Int)
val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))
val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
21000, 20000, 19000, 18000, 18000, 17000, 16000).
zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))
input.addData(USA.take(30) ++ USSR.take(30))
val structureDataFrame = input.toDF
And the following code can be simulate to update the chart. The chart will be updated when the following code run.
input.addData(USA.drop(30) ++ USSR.drop(30))
NOTE: The example using Zeppelin 0.6.2 and Spark 2.0
NOTE: Please check the highcharts license for commercial usage
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With