
How to save dataframe to Elasticsearch in PySpark?

I have a Spark DataFrame that I am trying to push to AWS Elasticsearch. Before that, I was testing this sample code snippet to push to ES:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ES_indexer').getOrCreate()
df = spark.createDataFrame([{'num': i} for i in xrange(10)])
df = df.drop('_id')
df.write.format(
    'org.elasticsearch.spark.sql'
).option(
    'es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com'
).option(
    'es.port', 9200
).option(
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
).save()

I get an error saying:

java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html

Any suggestions would be greatly appreciated.

Error Trace:

Traceback (most recent call last):
  File "es_3.py", line 12, in <module>
    'es.resource', '%s/%s' % ('index_name', 'doc_type_name'),
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 732, in save
    self._jwrite.save()
  File "/usr/local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.save.
: java.lang.ClassNotFoundException: Failed to find data source: org.elasticsearch.spark.sql. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
        ... 12 more
asked Jul 17 '19 by Cyber_Tron


2 Answers

tl;dr Use pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0 and use format("es") to reference the connector.


Quoting Installation from the official documentation of the Elasticsearch for Apache Hadoop product:

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath.

And later in Supported Spark SQL versions:

elasticsearch-hadoop supports both version Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars: elasticsearch-spark-1.x-<version>.jar and elasticsearch-hadoop-<version>.jar

elasticsearch-spark-2.0-<version>.jar supports Spark SQL 2.0

That looks like an inconsistency in the documentation (it names two different jar artifacts), but it does mean that you have to put the proper jar file on the CLASSPATH of your Spark application.

And later in the same document:

Spark SQL support is available under org.elasticsearch.spark.sql package.

That simply says that the format (in df.write.format('org.elasticsearch.spark.sql')) is correct.

Further down the document you will find that you can even use the alias df.write.format("es") (!)
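
For example, a minimal sketch of the same write using that alias (the endpoint, index name and doc type are just the placeholders from the question, not values to copy verbatim):

# Same write as in the question, but through the short "es" format alias
df.write.format('es') \
    .option('es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com') \
    .option('es.port', '9200') \
    .option('es.resource', 'index_name/doc_type_name') \
    .save()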

I found the Apache Spark section in the project's repository on GitHub more readable and up to date.
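
If you would rather configure the package from inside the script than on the pyspark command line, here is a minimal sketch using spark.jars.packages (an assumption: the setting is only picked up if no SparkContext has been created yet, since the dependency is resolved when the JVM starts):

from pyspark.sql import SparkSession

# Resolve the elasticsearch-hadoop connector when the session starts
# (equivalent to launching with: pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.2.0)
spark = (
    SparkSession.builder
    .appName('ES_indexer')
    .config('spark.jars.packages', 'org.elasticsearch:elasticsearch-hadoop:7.2.0')
    .getOrCreate()
)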

answered by Jacek Laskowski


Update: The current ES-hadoop package as of June 2020 is 7.7.1, so I used pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.7.1 instead.
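
For a hosted cluster such as AWS Elasticsearch (the target in the question), the connector usually also needs es.nodes.wan.only so that it talks only to the declared endpoint instead of trying to discover data nodes directly. A minimal sketch, reusing the placeholder endpoint from the question; the actual port and TLS settings depend on the cluster:

# For a hosted/proxied cluster (e.g. AWS Elasticsearch) talk only to the
# declared endpoint instead of discovering individual data nodes.
df.write.format('es') \
    .option('es.nodes', 'http://spark-data-push-adertadaltdpioy124.us-west-2.es.amazonaws.com') \
    .option('es.port', '9200') \
    .option('es.nodes.wan.only', 'true') \
    .option('es.resource', 'index_name/doc_type_name') \
    .save()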

answered by Ezra Justin Lee