
Combining Two Spark Streams On Key

I have two Kafka streams that contain the results of two parallel operations. I need a way to combine both streams so that I can process the results in a single Spark transform. Is this possible? (Illustration below.)

    Stream 1: {id:1, result1:True}
    Stream 2: {id:1, result2:False}

    JOIN(Stream 1, Stream 2, on "id") -> Output Stream: {id:1, result1:True, result2:False}

Current code that isn't working:

    kvs1 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_1": 1})
    kvs2 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME+"_stream", {"test_join_2": 1})

    messages_RDDstream1 = kvs1.map(lambda x: x[1])
    messages_RDDstream2 = kvs2.map(lambda x: x[1])

    messages_RDDstream_Final = messages_RDDstream1.join(messages_RDDstream2)

When I pass two sample JSON messages with the same id field to each Kafka queue, nothing is returned in my final DStream. I imagine I am missing the step of converting my Kafka JSON string messages into tuples?

I have also tried the following:

    kvs1.map(lambda (key, value): json.loads(value))

and

    kvs1.map(lambda x: json.loads(x))

To no avail.

Cheers

Adam

asked Jul 12 '16 by Adam Bradbury


1 Answer

A quick look at Spark's documentation would have given you the answer.

You can use the join operation.

join(otherStream, [numTasks]):

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
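As a minimal pure-Python sketch of those semantics (no Spark required, and `inner_join` is just an illustrative helper, not a Spark API): an inner join on key keeps only the keys present on both sides and pairs up their values.

```python
def inner_join(left, right):
    """Inner-join two lists of (key, value) pairs, mimicking
    DStream.join: only keys present on both sides survive, and
    each match yields (key, (left_value, right_value))."""
    right_by_key = {}
    for k, w in right:
        right_by_key.setdefault(k, []).append(w)
    return [(k, (v, w)) for k, v in left for w in right_by_key.get(k, [])]

stream1 = [(1, {"result1": True})]
stream2 = [(1, {"result2": False})]
print(inner_join(stream1, stream2))
# -> [(1, ({'result1': True}, {'result2': False}))]
```

Note that a record whose key appears on only one side is silently dropped, which is exactly why an unkeyed (or wrongly keyed) DStream join produces an empty result.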

For example, in Scala:

    val streamJoined = stream1.join(stream2)
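Applied to the question's PySpark code, the likely fix is to parse each Kafka message and key it by its id before joining. A sketch, assuming each message value is a JSON string like those in the illustration (`key_by_id` is a hypothetical helper name):

```python
import json

def key_by_id(raw):
    """Parse one Kafka message (a JSON string) and key it by its 'id'
    field, so that DStream.join can match records across streams."""
    record = json.loads(raw)
    return (record["id"], record)

# Wiring it into the question's streams (not runnable without a
# Spark Streaming context and Kafka):
# keyed1 = kvs1.map(lambda x: x[1]).map(key_by_id)
# keyed2 = kvs2.map(lambda x: x[1]).map(key_by_id)
# joined = keyed1.join(keyed2)   # (id, (record1, record2)) pairs
```

Both DStreams then consist of (K, V) pairs keyed by id, which is what `join` requires; mapping the raw strings through `json.loads` alone leaves them unkeyed, so the join has nothing to match on.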

answered Nov 15 '22 by Jonathan Taws