Getting an error while reading from an S3 server using PySpark: [java.lang.IllegalArgumentException]

I am trying to read a file from S3 using PySpark and am getting the error below:

Traceback (most recent call last):
  File "C:/Users/Desktop/POC_Pyspark/create_csv_ecs.py", line 41, in <module>
    df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.IllegalArgumentException
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1307)
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1230)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:705)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)

My code is simple. I am able to connect using boto3, but I need to use PySpark because the files I am processing are huge and I also need to do some aggregation over the CSV:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Full PSGL Aggregation - PySpark")
sc = SQLContext(SparkContext(conf=conf))

# Point the s3a connector at the custom endpoint and pass credentials
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://end point:8020")

df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
print(df)
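
For reference, this is roughly what the working boto3 connection looks like; a minimal sketch where the credential and endpoint placeholders mirror the PySpark configuration above:

import boto3

# Sketch of the boto3 connection that does work against the same endpoint;
# the placeholder credentials/endpoint mirror the PySpark config above.
s3 = boto3.client(
    "s3",
    aws_access_key_id="access_key",
    aws_secret_access_key="secret key",
    endpoint_url="http://end point:8020",  # placeholder, as above
)
obj = s3.get_object(Bucket="my_bucket", Key="my_folder/test_data.csv")
print(obj["ContentLength"])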

Java version:

java version "1.8.0_66"

Python:

Python 3.6.1 | Anaconda 4.4.0 (64-bit)

PySpark/Spark:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 3.6.1 (default, May 11 2017 13:25:24)

Please do let me know if more information is required.

asked Nov 29 '19 by mradul

1 Answer

Looking into the source code of S3AFileSystem, it seems that either the fs.s3a.threads.max or the fs.s3a.threads.core parameter is missing or invalid. S3AFileSystem.initialize() uses these values to build its ThreadPoolExecutor, and that constructor throws IllegalArgumentException when the core pool size is negative or larger than the maximum pool size, which matches the top of the stack trace.
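
Before applying the fix, you can inspect what values the connector will actually see; a quick diagnostic sketch reusing the sc from the question (a key prints None if no value is resolved anywhere):

# Diagnostic: inspect the s3a thread-pool settings currently in effect
hconf = sc._jsc.hadoopConfiguration()
print(hconf.get("fs.s3a.threads.max"))
print(hconf.get("fs.s3a.threads.core"))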

In my case, adding

# Give the s3a connector explicit thread-pool sizes
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.max", "4")
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.core", "4")

solved the issue.
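
Equivalently, the same settings can be supplied at startup through SparkConf via Spark's spark.hadoop.* passthrough, so the file system is created with valid pool sizes from the start (a sketch; the value 4 is illustrative, not a tuning recommendation):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Sketch: spark.hadoop.* properties are copied into the Hadoop
# configuration, sizing the s3a thread pool before first use.
conf = (
    SparkConf()
    .setAppName("Full PSGL Aggregation - PySpark")
    .set("spark.hadoop.fs.s3a.threads.max", "4")
    .set("spark.hadoop.fs.s3a.threads.core", "4")
)
sc = SQLContext(SparkContext(conf=conf))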

answered Sep 21 '22 by Mikalai Lushchytski