Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to pass data from Kafka to Spark Streaming?

I am trying to pass data from kafka to spark streaming.

This is what I've done till now:

  1. Installed both kafka and spark
  2. Started zookeeper with default properties config
  3. Started kafka server with default properties config
  4. Started kafka producer
  5. Started kafka consumer
  6. Sent message from producer to consumer. Works fine.
  7. Wrote kafka-spark.py to receive messages from kafka to spark.
  8. I try running ./bin/spark-submit examples/src/main/python/kafka-spark.py
  9. I get an error.

kafka-spark.py -

from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream=StreamingContext(sc,1)
    map1={'spark-kafka':1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1) #tried with localhost:2181 too

    print("kafkastream=",kafkaStream)
    sc.stop()

Full Log including the Error on running spark-kafka.py:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/18 13:05:33 INFO SparkContext: Running Spark version 1.6.0
16/01/18 13:05:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/18 13:05:33 INFO SecurityManager: Changing view acls to: username
16/01/18 13:05:33 INFO SecurityManager: Changing modify acls to: username
16/01/18 13:05:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); users with modify permissions: Set(username)
16/01/18 13:05:33 INFO Utils: Successfully started service 'sparkDriver' on port 54446.
16/01/18 13:05:34 INFO Slf4jLogger: Slf4jLogger started
16/01/18 13:05:34 INFO Remoting: Starting remoting
16/01/18 13:05:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:50386]
16/01/18 13:05:34 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 50386.
16/01/18 13:05:34 INFO SparkEnv: Registering MapOutputTracker
16/01/18 13:05:34 INFO SparkEnv: Registering BlockManagerMaster
16/01/18 13:05:34 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-f5490271-cdb7-467d-a915-4f5ccab57f0e
16/01/18 13:05:34 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/01/18 13:05:34 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/18 13:05:34 INFO Server: jetty-8.y.z-SNAPSHOT
16/01/18 13:05:34 INFO AbstractConnector: Started [email protected]:4040
16/01/18 13:05:34 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/18 13:05:34 INFO SparkUI: Started SparkUI at http://127.0.0.1:4040
Java HotSpot(TM) Server VM warning: You have loaded library /tmp/libnetty-transport-native-epoll561240765619860252.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
16/01/18 13:05:34 INFO Utils: Copying ~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py to /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/userFiles-e93fc252-0ba1-42b7-b4fa-2e46f3a0601e/kafka-spark.py
16/01/18 13:05:34 INFO SparkContext: Added file file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py at file:~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py with timestamp 1453118734892
16/01/18 13:05:35 INFO Executor: Starting executor ID driver on host localhost
16/01/18 13:05:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58970.
16/01/18 13:05:35 INFO NettyBlockTransferService: Server created on 58970
16/01/18 13:05:35 INFO BlockManagerMaster: Trying to register BlockManager
16/01/18 13:05:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58970 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58970)
16/01/18 13:05:35 INFO BlockManagerMaster: Registered BlockManager

________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.6.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.6.0.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/kafka-spark.py", line 33, in <module>
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1)
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 80, in createStream
py4j.protocol.Py4JJavaError: An error occurred while calling o22.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

16/01/18 13:05:35 INFO SparkContext: Invoking stop() from shutdown hook
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/api,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/threadDump,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/job,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs/json,null}
16/01/18 13:05:35 INFO ContextHandler: stopped o.e.j.s.ServletContextHandler{/jobs,null}
16/01/18 13:05:35 INFO SparkUI: Stopped Spark web UI at http://127.0.0.1:4040
16/01/18 13:05:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/18 13:05:35 INFO MemoryStore: MemoryStore cleared
16/01/18 13:05:35 INFO BlockManager: BlockManager stopped
16/01/18 13:05:35 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/18 13:05:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/18 13:05:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/01/18 13:05:35 INFO SparkContext: Successfully stopped SparkContext
16/01/18 13:05:35 INFO ShutdownHookManager: Shutdown hook called
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f
16/01/18 13:05:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-18227081-a1c8-43f2-8ca7-cfc4751f023f/pyspark-fcd47a97-57ef-46c3-bb16-357632580334

EDIT

On running ./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar examples/src/main/python/kafka-spark.py I get the HEXADECIMAL location instead of the actual string:

kafkastream= <pyspark.streaming.dstream.TransformedDStream object at 0x7fd6c4dad150>

Any idea what am I doing wrong? I'm really new to kakfa and spark so I need some help here. Thanks!

like image 330
HackCode Avatar asked Jan 18 '16 12:01

HackCode


People also ask

How do I transfer data from Kafka to spark?

Submiting to SparkStart Kafka Producer CLI (explained in the previous chapter), create a new topic called my-first-topic and provide some sample messages as shown below. Run the following command to submit the application to spark console. The sample output of this application is shown below. spark console messages ..

How do I use Kafka to stream data?

This quick start follows these steps: Start a Kafka cluster on a single machine. Write example input data to a Kafka topic, using the so-called console producer included in Apache Kafka. Process the input data with a Java application that uses the Kafka Streams library.

How do I connect Kafka to spark streaming Python?

Setup SparkAdd the below entry in spark-env.sh to specify the path to each worker node. Installation of other python dependencies used in this spark app is required. For example, we use Kafka-python to write the processed event back to Kafka.


2 Answers

You need to submit spark-streaming-kafka-assembly_*.jar with your job:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar ./spark-kafka.py 
like image 56
facha Avatar answered Oct 28 '22 03:10

facha


Alternatively, if you want to also specify resources to be allocated at the same time:

spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar ./spark-kafka.py 

If you wanna run your code in a Jupyter-notebook, then this could be helpful:

from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-assembly_2.10-1.6.0.jar pyspark-shell' #note that the "pyspark-shell" part is very important!!.

    #conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    #sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream=StreamingContext(sc,1)
    map1={'spark-kafka':1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:9092', "name", map1) #tried with localhost:2181 too

    print("kafkastream=",kafkaStream)
    sc.stop()

Note the introduction of the following line in __main__:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-streaming-kafka-assembly_2.10-1.6.0.jar pyspark-shell'

Sources: https://github.com/jupyter/docker-stacks/issues/154

like image 26
Tshilidzi Mudau Avatar answered Oct 28 '22 03:10

Tshilidzi Mudau