i am trying to read a file using pyspark from S3 and getting below error --
Traceback (most recent call last):
File "C:/Users/Desktop/POC_Pyspark/create_csv_ecs.py", line 41, in <module>
df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 166, in load
return self._df(self._jreader.load(path))
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py", line 1160, in __call__
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\sandbox\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.lang.IllegalArgumentException
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1307)
at java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1230)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:705)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:389)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:388)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
my code is simple however i able to connect using BOTO3 but i need to use pyspark as the files i am processing are huge and also need to do some aggregation over the CSV --
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Window, SparkSession
import pyspark as spark
import pyspark.sql.functions as fn
conf = spark.SparkConf().setAppName("Full PSGL Aggregation - PySpark")
sc = spark.SQLContext(spark.SparkContext(conf=conf))
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "access_key")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "secret key")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://end point:8020")
df = sc.read.csv("s3a://my_bucket/my_folder/test_data.csv")
print(df)
java version -
java version "1.8.0_66"
python -
Python 3.6.1 |Anaconda 4.4.0 (64-bit)
pyspark/spark -
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0
/_/
Using Python version 3.6.1 (default, May 11 2017 13:25:24)
Please do let me know if more information is required.
Looking into the source code of S3AFileSystem, it seems that either fs.s3a.threads.max
or fs.s3a.threads.core
parameter is missing or irrelevant.
In my case, adding
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.max", "4")
sc._jsc.hadoopConfiguration().set("fs.s3a.threads.core", "4")
solved the issue.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With