I have some files stored in a Google Cloud Storage bucket. These are my settings, as suggested here.
import os
import pandas as pd
from google.cloud import storage
from pyspark.sql import SparkSession
from geospark.utils import GeoSparkKryoRegistrator, KryoSerializer

spark = SparkSession.builder.\
    master("local[*]").\
    appName("TestApp").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.jars", "/usr/local/.sdkman/candidates/spark/2.4.4/jars/gcs-connector-hadoop2-2.1.1.jar").\
    config("spark.kryo.registrator", GeoSparkKryoRegistrator.getName).\
    getOrCreate()
# Recommended settings for using GeoSpark
spark.conf.set("spark.driver.memory", 6)
spark.conf.set("spark.network.timeout", 1000)
#spark.conf.set("spark.driver.maxResultSize", 5)
spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
# This is required if you are using a service account and have set the above to true
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'false')
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "myJson.json")
path = 'mBucket-c892b51f8579.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path
client = storage.Client()
name = 'https://console.cloud.google.com/storage/browser/myBucket/'
bucket_id = 'myBucket'
bucket = client.get_bucket(bucket_id)
I can read them simply using the following:
df = pd.read_csv('gs://myBucket/myFile.csv.gz', compression='gzip')
df.head()
time_zone_name province_short
0 America/Chicago US.TX
1 America/Chicago US.TX
2 America/Los_Angeles US.CA
3 America/Chicago US.TX
4 America/Los_Angeles US.CA
I am trying to read the same file with pyspark:
myTable = spark.read.format("csv").schema(schema).load('gs://myBucket/myFile.csv.gz', compression='gzip')
but I get the following error:
Py4JJavaError: An error occurred while calling o257.load.
: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2134)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2099)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:45)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:332)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Welcome to Hadoop dependency hell!
1. Use packages rather than jars
Your configuration is basically correct, but when you add the gcs-connector as a local jar, you also need to manually ensure that all of its dependencies are available on the JVM classpath.
It's usually easier to add the connector as a package and let Spark deal with the dependencies, so instead of config("spark.jars", "/usr/local/.sdkman/candidates/spark/2.4.4/jars/gcs-connector-hadoop2-2.1.1.jar")
use config('spark.jars.packages', 'com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.1.1')
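For example, the builder from the question would change along these lines (a minimal sketch showing only the jars-to-packages swap; the other settings stay as they were):

from pyspark.sql import SparkSession

# Instead of pointing spark.jars at a local copy of the connector jar,
# declare it as a package so Spark resolves its transitive dependencies
# (they get downloaded into ~/.ivy2 on first use).
spark = SparkSession.builder.\
    master("local[*]").\
    appName("TestApp").\
    config('spark.jars.packages',
           'com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.1.1').\
    getOrCreate()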
2. Manage ivy2 dependency resolution issues
When you do this, Spark will likely complain that it can't download some dependencies, due to resolution differences between Maven (used for publication) and ivy2 (used by Spark for dependency resolution).
You can usually fix this by simply asking Spark to ignore the unresolved dependencies using spark.jars.excludes,
so add a new config line such as
config('spark.jars.excludes', 'androidx.annotation:annotation,org.slf4j:slf4j-api')
3. Manage classpath conflicts
When this is done, the SparkSession will start, but the filesystem will still fail, because the standard distribution of pyspark packages an old version of the guava library that doesn't implement the API the gcs-connector relies on.
You need to ensure that the gcs-connector finds its expected version first, by using the following configs: config('spark.driver.userClassPathFirst', 'true')
and config('spark.executor.userClassPathFirst', 'true')
4. Manage dependency conflicts
Now you may think everything is OK, but it isn't: the default pyspark distribution bundles version 2.7.3 of the Hadoop libraries, while gcs-connector 2.1.1 relies on APIs that only exist in Hadoop 2.8+.
Now your options are:
- switch to a Spark distribution built against Hadoop 2.8+ (which means giving up the stock PyPI/Anaconda pyspark), or
- downgrade the gcs-connector to a release that still supports Hadoop 2.7, such as hadoop2-1.9.17, which is what the config below does.
If you're not sure which Hadoop version your pyspark actually bundles, see the sketch below.
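You can ask the JVM of a running session directly (a quick sketch; _jvm is a private py4j handle, so treat this purely as a debugging aid):

# Prints the Hadoop version bundled with the running pyspark distribution;
# the stock PyPI/Anaconda pyspark 2.4.x typically reports 2.7.3.
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())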
5. A working config at last
Assuming you want to stick with the latest PyPI or Anaconda distribution of pyspark, the following config should work as expected.
I've included only the GCS-relevant configs, moved the Hadoop settings directly into the Spark config, and assumed you are correctly setting your GOOGLE_APPLICATION_CREDENTIALS:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
    master("local[*]").\
    appName("TestApp").\
    config('spark.jars.packages',
           'com.google.cloud.bigdataoss:gcs-connector:hadoop2-1.9.17').\
    config('spark.jars.excludes',
           'javax.jms:jms,com.sun.jdmk:jmxtools,com.sun.jmx:jmxri').\
    config('spark.driver.userClassPathFirst', 'true').\
    config('spark.executor.userClassPathFirst', 'true').\
    config('spark.hadoop.fs.gs.impl',
           'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem').\
    config('spark.hadoop.fs.gs.auth.service.account.enable', 'false').\
    getOrCreate()
Note that gcs-connector version 1.9.17 has a different set of excludes than 2.1.1 because why not...
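With that session in place, the read from the question should then work, for example (assuming the same bucket and file name, and a header row as the pandas output suggests):

# Spark picks the gzip codec from the .gz extension automatically.
myTable = spark.read.format("csv").\
    option("header", "true").\
    option("inferSchema", "true").\
    load('gs://myBucket/myFile.csv.gz')
myTable.show(5)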
PS: You also need to ensure you're using a Java 1.8 JVM because Spark 2.4 doesn't work on newer JVMs.