I have a spark dataframe that I am trying to push to AWS Elasticsearch, but before that I was testing this sample code snippet to push to ES,
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format(
'org.elasticsearch.spark.sql'
).option(
'es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com'
).option(
'es.port', 9200
).option(
'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
).save()
I get an error saying,
java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
Any suggestions would be greatly appreciated.
Error Trace:
Traceback (most recent call last):
File "es_3.py", line 12, in <module>
'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 732, in save
self._jwrite.save()
File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 12 more
tl;dr Use pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0
and use format("es")
to reference the connector.
Quoting Installation from the official documentation of the Elasticsearch for Apache Hadoop product:
Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath.
And later in Supported Spark SQL versions:
elasticsearch-hadoop supports both version Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars:
elasticsearch-spark-1.x-<version>.jar
andelasticsearch-hadoop-<version>.jar
elasticsearch-spark-2.0-<version>.jar
supports Spark SQL 2.0
That looks like an issue with the document (as they use two different versions of the jar file), but does mean that you have to use the proper jar file on the CLASSPATH of your Spark application.
And later in the same document:
Spark SQL support is available under org.elasticsearch.spark.sql package.
That simply says that the format (in df.write.format('org.elasticsearch.spark.sql')
) is correct.
Further down the document you can find that you could even use an alias df.write.format("es")
(!)
I found Apache Spark section in the project's repository on GitHub more readable and current.
Update: The current ES-hadoop package as of June 2020 is 7.7.1, so I used pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.7.1
instead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With