I have two Kafka streams that contain results from two parallel operations. I need a way to combine both streams so I can process the results in a single Spark transform. Is this possible? (Illustration below.)
Stream 1 {id:1,result1:True}
Stream 2 {id:1,result2:False}
JOIN(Stream 1, Stream 2, On "id") -> Output Stream {id:1,result1:True,result2:False}
Current code that isn't working:
from pyspark.streaming.kafka import KafkaUtils

kvs1 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME + "_stream", {"test_join_1": 1})
kvs2 = KafkaUtils.createStream(sparkstreamingcontext, ZOOKEEPER, NAME + "_stream", {"test_join_2": 1})
messages_RDDstream1 = kvs1.map(lambda x: x[1])  # keep only the message value (a JSON string)
messages_RDDstream2 = kvs2.map(lambda x: x[1])
messages_RDDstream_Final = messages_RDDstream1.join(messages_RDDstream2)
When I pass two sample JSONs with the same id field, one to each Kafka queue, nothing is returned in my final DStream. I imagine I am missing the stage of converting my Kafka JSON string message into a tuple?
I have also tried the following:
kvs1.map(lambda (key, value): json.loads(value))
and
kvs1.map(lambda x: json.loads(x))
To no avail.
Cheers
Adam
A quick look at Spark's documentation would have given you the answer.
You can use the join operation:
join(otherStream, [numTasks]): When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
For example (in Scala): val streamJoined = stream1.join(stream2)
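Translated to PySpark for the streams in the question: the join returns nothing because messages_RDDstream1 and messages_RDDstream2 are DStreams of plain JSON strings, not (K, V) pairs, so there is no key to join on. Below is a minimal sketch of the missing keying step, reusing kvs1 and kvs2 from the question; key_by_id and merge are hypothetical helper names, and the code assumes each payload is valid JSON containing an "id" field.

import json

def key_by_id(msg):
    record = json.loads(msg)       # parse the JSON string payload
    return (record["id"], record)  # (K, V) pair keyed on the shared id

keyed1 = kvs1.map(lambda x: x[1]).map(key_by_id)
keyed2 = kvs2.map(lambda x: x[1]).map(key_by_id)

joined = keyed1.join(keyed2)  # DStream of (id, (record1, record2))

def merge(pair):
    # flatten (id, (dict1, dict2)) into a single combined record
    left, right = pair[1]
    out = dict(left)
    out.update(right)
    return out

merged = joined.map(merge)
merged.pprint()

One caveat: join only matches keys within the same batch interval, so records for the same id pair up only if they arrive in the same micro-batch. If the two operations can finish in different batches, you would need to window both streams (e.g. with window()) or use a stateful operation such as updateStateByKey before joining.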