
How to plot a real-time graph in Zeppelin?

I am trying to use Zeppelin to plot a real-time graph. I am doing a per-minute sentiment analysis on tweets. I am able to query the results statically and plot a graph, but I would like this to be done dynamically. I am new to Zeppelin and do not know much about AngularJS. What is the correct approach to this problem?

val final_score = uni_join.map { case ((year, month, day, hour, minutes), (tweet_count, sentiment)) => (year, month, day, hour, minutes, (sentiment / tweet_count).ceil) }


final_score.saveToCassandra("twitter", "score",writeConf = WriteConf(ttl = TTLOption.constant(1000)))

final_score.foreachRDD(score => {
  val rowRDD = score.map { case (year, month, day, hour, minutes, sentiment) => (year, month, day, hour, minutes, sentiment) }
  val tempDF = sqlContext.createDataFrame(rowRDD)

  z.angularBindGlobal("stream", parsed) // to bind parsed to stream
  tempDF.registerTempTable("realTimeTable")
})
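
For reference, the per-minute score computed by the map over uni_join can be tried on a plain Scala collection, outside Spark. This is only a minimal sketch: the sample rows, the `ScoreSketch` object, and the assumption that `sentiment` is a `Double` sum and `tweet_count` an `Int` are illustrative and not taken from the actual stream.

```scala
object ScoreSketch {
  // Key: (year, month, day, hour, minute). Hypothetical alias for readability.
  type MinuteKey = (Int, Int, Int, Int, Int)

  // Rounded-up average sentiment per tweet for each minute.
  // Assumes sentiment is a Double sum and tweetCount is a non-zero Int.
  def finalScore(joined: Seq[(MinuteKey, (Int, Double))]): Seq[(Int, Int, Int, Int, Int, Double)] =
    joined.map { case ((year, month, day, hour, minutes), (tweetCount, sentiment)) =>
      (year, month, day, hour, minutes, (sentiment / tweetCount).ceil)
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows standing in for one micro-batch.
    val sample = Seq(
      ((2016, 7, 1, 12, 30), (4, 10.0)),
      ((2016, 7, 1, 12, 31), (3, 2.0))
    )
    finalScore(sample).foreach(println)
  }
}
```

Each output row has the six fields that the foreachRDD block expects: the five time components followed by the score.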

Querying the table above gives me the graph, but I would like the graph to update dynamically every minute so that it stays in sync with the sentiment score. Thanks in advance.

[Update] The Angular part of the Zeppelin notebook is as follows:

%angular
<div id="graph"  style="height: 100%; width: 100%">
<canvas id="myChart" width="400" height="400"></canvas>
<div id="legendDiv"></div>
</div>
<script>
function initMap() {

var colorList = ["#fde577", "#ff6c40", "#c72a40", "#520833", "#a88399"]


var el = angular.element($('#stream'));

console.log("El is "+el) //returns el as object

angular.element(el).ready(function() {
    console.log('Hello')
    window.locationWatcher =el.$scope.$watch('stream', function(new, old){
console.log('changed');}, true)})
</script>

But running this code keeps returning the following error.

vendor.js:29 jQuery.Deferred exception: Cannot read property '$watch' of  undefined TypeError: Cannot read property '$watch' of undefined

I am using Spark 1.6 and Zeppelin 0.6.

Agnirudra asked Mar 11 '23 17:03


1 Answer

spark-highcharts has supported Spark Structured Streaming since version 0.6.3.

For a structuredDataFrame obtained after aggregation, put the following code in one Zeppelin paragraph. The OutputMode can be either append or complete, depending on how structuredDataFrame is aggregated.

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._

val query = highcharts(
  structuredDataFrame.seriesCol("country")
    .series("x" -> "year", "y" -> "stockpile")
    .orderBy(col("year")), z, "append")

Put the following code in the next paragraph. The chart in this paragraph is updated whenever new data arrives in structuredDataFrame.

StreamingChart(z)

Run the following code to stop updating the chart.

query.stop()

Here is an example that generates structuredDataFrame.

spark.conf.set("spark.sql.streaming.checkpointLocation","/usr/zeppelin/checkpoint")

case class NuclearStockpile(country: String, stockpile: Int, year: Int)

val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
  1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
  27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
  26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
  24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
  22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
  10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
    zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))

val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
  4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
  15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
  33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
  35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
  21000, 20000, 19000, 18000, 18000, 17000, 16000).
    zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))

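// Assumption: `input` is not defined in the original snippet. Its addData and
// toDF calls match Spark's MemoryStream, so it is presumably created like this.
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val input = MemoryStream[NuclearStockpile]
input.addData(USA.take(30) ++ USSR.take(30))
// Named structuredDataFrame to match the highcharts(...) call above.
val structuredDataFrame = input.toDF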

The following code simulates an update. The chart is updated when it runs.

input.addData(USA.drop(30) ++ USSR.drop(30))
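
The two addData calls split each series with take(30) and drop(30). With 67 values starting at 1940, the first batch covers 1940 to 1969 and the second covers 1970 to 2006. The sketch below shows the same zip, take, and drop pattern on a small made-up series in plain Scala, without Spark. The `BatchSplitSketch` object, the `Reading` case class, and the `withYears` helper are hypothetical names used only for illustration.

```scala
object BatchSplitSketch {
  // Hypothetical stand-in for the NuclearStockpile case class.
  case class Reading(country: String, value: Int, year: Int)

  // Pair each value with its year, as the zip(1940 to 2006) calls do.
  def withYears(country: String, values: Seq[Int], firstYear: Int): Seq[Reading] =
    values.zip(firstYear until firstYear + values.length)
      .map { case (value, year) => Reading(country, value, year) }

  def main(args: Array[String]): Unit = {
    val series = withYears("A", Seq(1, 2, 3, 4, 5), 2000)

    val initialBatch = series.take(3) // what the first addData would send
    val laterBatch = series.drop(3)   // what the simulated update would send

    println(s"initial: $initialBatch")
    println(s"later:   $laterBatch")
  }
}
```

Because take(n) and drop(n) are complementary, the two batches together contain every row exactly once, so the chart ends up with the full series after the second addData.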

NOTE: The example uses Zeppelin 0.6.2 and Spark 2.0.

NOTE: Please check the Highcharts license for commercial usage.

Rockie Yang answered Mar 13 '23 07:03