I have integrated ELK with PySpark.
I saved the RDD as ELK data on the local file system:
rdd.saveAsTextFile("/tmp/ELKdata")
logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors.count()
The value I got is 35.
errors.first()
I got this output:
(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'ident': u'NetworkManager', u'pid': u'748', u'message': u" (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", u'@timestamp': u'2016-01-12T10:59:48+05:30'})
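The output above looks like an (id, document) tuple, but `sc.textFile` reads files line by line, so each element of `logData` (and therefore of `errors`) is a plain string. `saveAsTextFile` wrote the `str()` of each tuple. A minimal sketch, assuming every saved line is such a Python literal, shows how `ast.literal_eval` can turn a line back into a real pair. The helper name `parse_saved_line` is hypothetical.

```python
import ast


def parse_saved_line(line):
    """Rebuild the (doc_id, doc) tuple from one line written by saveAsTextFile.

    Assumes the line is the str() of a tuple, i.e. a valid Python literal;
    ast.literal_eval only evaluates literals, so no code is executed.
    """
    doc_id, doc = ast.literal_eval(line)
    return doc_id, doc


# The line printed by errors.first() in the question.
line = ("(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', "
        "u'ident': u'NetworkManager', u'pid': u'748', "
        "u'message': u\" (eth0): device state change: ip-config -> secondaries "
        "(reason 'none') [70 90 0]\", u'@timestamp': u'2016-01-12T10:59:48+05:30'})")

doc_id, doc = parse_saved_line(line)
# doc_id is 'AVI0UK0KZsowGuTwoQnN' and doc["host"] is 'raw1-VirtualBox'
```

In the shell this would be applied as `pairs = logData.map(parse_saved_line)`, which yields an RDD of (id, dict) pairs instead of strings.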
When I try to write the data to Elasticsearch from PySpark, I get errors:
errors.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf={"es.resource": "logstash-2016.01.12/errors"})
This produces a huge Java stack trace:
org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
Traceback (most recent call last):
File "<stdin>", line 6, in <module>
File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
keyConverter, valueConverter, jconf)
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError
16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
org.apache.spark.TaskKilledException
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException:
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
If I build the records manually, I am able to write the data:
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox",
"ident": "NetworkManager",
"pid": "69",
"message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
"@timestamp": "2016-01-12T10:59:48+05:30"
}))
But I want to write the filtered data and the managed data to Elasticsearch.
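The Java error says that each RDD element is a `java.lang.String`, while `saveAsNewAPIHadoopFile` needs (key, value) pairs, which is why the hand-built `(id, dict)` records work. A minimal sketch, assuming the saved lines are the `str()` of (id, dict) tuples, parses them back, filters on the parsed `host` field, and writes the pairs with the same `EsOutputFormat` settings as the question. The helper names are hypothetical, and the node address is an assumption about your setup (elasticsearch-hadoop's `es.nodes` defaults to `localhost`).

```python
import ast


def parse_saved_line(line):
    # Each line is the str() of an (id, document) tuple written by saveAsTextFile.
    return tuple(ast.literal_eval(line))


def from_host(pair, host="raw1-VirtualBox"):
    # Match the parsed "host" field instead of a substring of the whole line.
    return pair[1].get("host") == host


def es_conf(resource, nodes="localhost"):
    # Settings passed to elasticsearch-hadoop; set "es.nodes" for a remote cluster.
    return {"es.resource": resource, "es.nodes": nodes}


def save_pairs_to_es(pair_rdd, conf):
    # pair_rdd must contain (key, dict) tuples; the dict is sent as the document.
    # As far as I know EsOutputFormat does not use the key as the document id.
    pair_rdd.saveAsNewAPIHadoopFile(
        path="-",
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=conf,
    )
```

Usage in the shell would then be `pairs = logData.map(parse_saved_line).filter(from_host)` followed by `save_pairs_to_es(pairs, es_conf("logstash-2016.01.12/errors"))`.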
I had a similar issue, and here's how I solved it. First, I used a DataFrame instead of an RDD.
Once the data is in a DataFrame:
from pyspark.sql import SQLContext
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()
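The answer assumes `df` already exists. A minimal sketch of building it from an RDD of (id, dict) pairs (for example the question's saved lines after parsing them with `ast.literal_eval`) is below. The column list, the `doc_id` column name and the helper names are hypothetical, and the sketch assumes every record has all of these fields so that Spark can infer the column types.

```python
# Hypothetical column order; "doc_id" holds the original Elasticsearch _id.
COLUMNS = ["doc_id", "host", "ident", "pid", "message", "@timestamp"]


def pair_to_row(pair, columns=COLUMNS):
    """Flatten a (doc_id, doc) pair into a tuple ordered like `columns`."""
    doc_id, doc = pair
    return tuple([doc_id] + [doc.get(c) for c in columns[1:]])


def pairs_to_dataframe(sql_context, pair_rdd, columns=COLUMNS):
    # sql_context can be a SQLContext or a SparkSession; passing a list of
    # names as the schema lets Spark infer the types from the data.
    rows = pair_rdd.map(lambda p: pair_to_row(p, columns))
    return sql_context.createDataFrame(rows, columns)
```

When writing the result, adding `.option("es.mapping.id", "doc_id")` would ask elasticsearch-hadoop to reuse that column as the document id.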
Like the author of the currently accepted answer, I was in the same boat, trying to write the data out as an RDD. The answer above is very close, but there are many configuration options that also help. In particular, unless your Elasticsearch node is the default localhost, that answer will not work as written, because it does not set es.nodes.
A DataFrame is the way to go: it is much cleaner and simpler. If you are using the PySpark shell, add the path to the elasticsearch-hadoop jar when you start the shell.
From the CLI, start the shell with:
$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar
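If you run a standalone script instead of the shell, a similar effect can be sketched with the `spark.jars` setting. This assumes Spark 2.x (`SparkSession`) and that no session is running yet, since jar settings are generally only picked up when the JVM starts; `build_session` and the app name are hypothetical.

```python
def build_session(jar_path, app_name="es-writer"):
    """Create a SparkSession with the elasticsearch-hadoop jar on the classpath."""
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark.sql import SparkSession

    return (SparkSession.builder
            .appName(app_name)
            .config("spark.jars", jar_path)  # same jar as passed to --jars
            .getOrCreate())
```

Usage: `spark = build_session("<pathtojar>/elasticsearch-hadoop-5.X.X.jar")`.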
You do not necessarily need the following line:
from pyspark.sql import SQLContext
Once you have your DataFrame, you only need the following, plus any additional options you require:
(df.write.format("org.elasticsearch.spark.sql")
    .option("es.resource", "<index/type>")
    .option("es.nodes", "<enter node address or name>")
    .save())
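When several settings are needed, collecting them in a dict and passing it to `DataFrameWriter.options(**opts)` keeps the call readable. A minimal sketch, assuming your cluster needs a non-default port or WAN-only mode (both are real elasticsearch-hadoop settings, but whether you need them depends on your setup); the helper names are hypothetical. `mode("append")` is chosen here on the assumption that the target index may already contain data.

```python
def es_write_options(resource, nodes, port=None, wan_only=False):
    """Build elasticsearch-hadoop settings for DataFrameWriter.options()."""
    opts = {"es.resource": resource, "es.nodes": nodes}
    if port is not None:
        opts["es.port"] = str(port)  # elasticsearch-hadoop defaults to 9200
    if wan_only:
        # Connect only through the declared nodes (e.g. cloud or Docker setups).
        opts["es.nodes.wan.only"] = "true"
    return opts


def save_to_es(df, opts, mode="append"):
    # Write the DataFrame through the elasticsearch-spark data source.
    (df.write.format("org.elasticsearch.spark.sql")
        .options(**opts)
        .mode(mode)
        .save())
```

Usage: `save_to_es(df, es_write_options("logstash-2016.01.12/errors", "es-host", port=9200))`.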
If the index/type you specify doesn't already exist in Elasticsearch, it will be created.
You can add additional options, which can be found here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html