I have tens of millions of rows of data. Is it possible to analyze all of them within a day or a week using Spark Streaming? What is the limit of Spark Streaming in terms of data volume? I am not sure where the upper limit is, or at what point I should write results out to my database, since the stream probably cannot hold everything. I also use window operations with different window sizes (1, 3, 6 hours, etc.) to separate the data.
Please find my code below:
# Spark 1.x-style imports; sc.cassandraTable comes from the pyspark-cassandra connector
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 300)  # 300-second (5-minute) batch interval
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc", "channels")
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}
category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
category_join_stream = (category_stream.map(lambda x: read_json(x[1])).filter(lambda x: x != 0)
                        .map(lambda x: categoryTransform(x)).filter(lambda x: x != 0)
                        .map(lambda x: (x['id'], x)))
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream = (article_stream.map(lambda x: read_json(x[1])).filter(lambda x: x != 0)
                       .map(lambda x: TransformInData(x)).filter(lambda x: x != 0)
                       .flatMap(lambda x: (a for a in x))
                       .map(lambda x: (x['id'].encode("utf-8"), x)))

# axes topic: integrate the articles and the axes
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = (axes_stream.filter(lambda x: 'delete' not in str(x))
                    .map(lambda x: read_json(x[1])).filter(lambda x: x != 0)
                    .map(lambda x: axesTransformData(x)).filter(lambda x: x != 0)
                    .map(lambda x: (str(x['id']), x))
                    .map(lambda x: (x[0], {'id': x[0], 'attitudes': x[1]['likes'], 'reposts': 0,
                                           'comments': x[1]['comments'], 'speed': x[1]['comments']})))
# axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint()

# join the three windowed streams
statistics = (article_join_stream.window(1 * 60 * 60, 5 * 60)
              .cogroup(category_join_stream.window(1 * 60 * 60, 60))
              .cogroup(axes_join_stream.window(24 * 60 * 60, 5 * 60)))
statistics.transform(joinstream).pprint()

ssc.start()             # start the computation
ssc.awaitTermination()  # wait for the computation to terminate
One at a time:
Generally, yes - Spark allows you to scale out across many machines, so in principle you should be able to start a large cluster and crunch a lot of data in a relatively short time (assuming we're talking hours or days, not seconds or less, which might be problematic due to overhead).
Specifically, performing the kind of processing illustrated in your question on tens of millions of records seems feasible to me in a reasonable amount of time (i.e. without using an extremely large cluster).
I don't know of a hard limit, but you will have a hard time reaching it. There are examples of extremely large deployments, e.g. at eBay ("hundreds of metrics over an average of 30TB daily"). Also see the Spark FAQ, which mentions a cluster of 8000 machines and processing petabytes of data.
According to the basic model of Spark Streaming, data is processed in micro-batches. If your data is indeed a stream (i.e. it has no definite end), then the simplest approach is to store the processing result of each micro-batch (RDD) as it is produced, e.g. with foreachRDD.
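For instance, here is a minimal sketch of persisting each micro-batch, based on the statistics stream from your code; save_partition and save_rdd are hypothetical helpers, and the actual write (Cassandra connector, JDBC insert, etc.) is up to you:

def save_partition(rows):
    # open one DB connection per partition, write all rows, then close it
    for row in rows:
        pass  # write `row` to your database here

def save_rdd(time, rdd):
    # skip empty micro-batches, write the rest partition by partition
    if not rdd.isEmpty():
        rdd.foreachPartition(save_partition)

statistics.transform(joinstream).foreachRDD(save_rdd)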
If your data is NOT a stream - e.g. you are processing a batch of static files from time to time - you should probably consider dropping the streaming part and using plain Spark as a batch processor.
Since your question mentions window sizes of a few hours, I suspect you may want to consider the batch option.
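A rough sketch of the batch route, assuming the crawled articles have been dumped to JSON files and carry a created_at unix timestamp (the file path, field names and aggregation are illustrative assumptions, not part of your code):

from pyspark import SparkConf, SparkContext
import json

sc = SparkContext(conf=SparkConf().setAppName("articles-batch"))

WINDOW = 1 * 60 * 60  # 1-hour buckets; rerun with 3 or 6 hours as needed

articles = (sc.textFile("/data/crawled_articles/*.json")  # placeholder path
            .map(json.loads)
            .map(lambda a: (int(a['created_at']) // WINDOW, 1))
            .reduceByKey(lambda x, y: x + y))  # article count per time bucket

for bucket, count in articles.collect():
    print(bucket * WINDOW, count)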
If you're using Spark Streaming, you could maintain multiple states (e.g. using mapWithState) - one for each time window.
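A rough sketch of that idea in PySpark, where updateStateByKey stands in for mapWithState (which is only exposed in the Scala/Java API); the per-key comment total, the eviction-by-timestamp rule and the window lengths are illustrative assumptions, applied to the axes_join_stream from your code:

import time

# stateful operations require checkpointing, e.g.:
# ssc.checkpoint("hdfs:///tmp/checkpoints")   # placeholder path

def make_updater(window_seconds):
    """Build an update function that keeps a running comment count per key
    and evicts keys that have not been seen within the window."""
    def update(new_values, state):
        now = time.time()
        total, last_seen = state if state is not None else (0, now)
        if new_values:
            total += sum(v.get('comments', 0) for v in new_values)
            last_seen = now
        if now - last_seen > window_seconds:
            return None  # returning None drops the key from the state
        return (total, last_seen)
    return update

# one stateful DStream per window size, all fed by the same input stream
stats_1h = axes_join_stream.updateStateByKey(make_updater(1 * 60 * 60))
stats_3h = axes_join_stream.updateStateByKey(make_updater(3 * 60 * 60))
stats_6h = axes_join_stream.updateStateByKey(make_updater(6 * 60 * 60))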
Another idea (simpler in code, more complicated in terms of ops) - you can start multiple clusters, each with its own window, reading from the same stream.
If you're batch-processing, you could run the same operation multiple times with different time windows, e.g. reduceByWindow with multiple window sizes.
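As a rough sketch of the batch variant, the same aggregation parameterized over several window sizes (again, the file path, field names and output locations are assumptions for illustration):

from pyspark import SparkConf, SparkContext
import json

sc = SparkContext(conf=SparkConf().setAppName("articles-multi-window"))

articles = sc.textFile("/data/crawled_articles/*.json").map(json.loads).cache()

# run the same aggregation once per window size (1, 3 and 6 hours)
for hours in (1, 3, 6):
    window = hours * 60 * 60
    counts = (articles
              .map(lambda a, w=window: (int(a['created_at']) // w, 1))
              .reduceByKey(lambda x, y: x + y))
    counts.saveAsTextFile("/data/article_counts_%dh" % hours)  # placeholder output path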