I have a large dataset stored in a BigQuery table and I would like to load it into a PySpark RDD for ETL processing.
I realized that BigQuery supports the Hadoop Input/Output format
https://cloud.google.com/hadoop/writing-with-bigquery-connector
and PySpark should be able to use that interface to create an RDD via the method newAPIHadoopRDD.
http://spark.apache.org/docs/latest/api/python/pyspark.html
Unfortunately, the documentation on both ends is scarce and goes beyond my knowledge of Hadoop/Spark/BigQuery. Has anybody figured out how to do this?
Install the spark-bigquery-connector into the Spark jars directory of every node by using the Dataproc connectors initialization action when you create your cluster, or provide the connector URI when you submit your job (in the Google Cloud console, use the "Jar files" field on the Dataproc "Submit a job" page).
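If you take the spark-bigquery-connector route, reading becomes a plain DataFrame load. A minimal sketch, assuming the connector jar is available on the cluster (initialization action) or passed via --jars at submit time; the public Shakespeare sample table is only used as an example:

from pyspark.sql import SparkSession

# Submit with the connector on the classpath, e.g. (jar URI is the commonly
# documented public one; adjust to your Scala/connector version):
#   gcloud dataproc jobs submit pyspark my_job.py \
#       --cluster=<cluster> --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
spark = SparkSession.builder.appName("bq-read-example").getOrCreate()

# Read the BigQuery table straight into a DataFrame.
words = spark.read.format("bigquery").load("bigquery-public-data.samples.shakespeare")
words.select("word", "word_count").show(10)

If you can use the DataFrame API this is usually much simpler than wiring up newAPIHadoopRDD, and you can still call .rdd on the result if you need an RDD for your ETL code.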
Alternatively, install the BigQuery connector on your Dataproc cluster for direct programmatic read/write access to BigQuery. Note that a Cloud Storage bucket is used as an intermediary between the two services, but you interact with BigQuery directly from Dataproc.
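The write direction works with the same spark-bigquery-connector. A hedged sketch; the dataset, table, and staging bucket names below are placeholders of my own choosing, and the connector stages the rows in that Cloud Storage bucket before loading them into BigQuery:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bq-write-example").getOrCreate()
df = spark.createDataFrame([("hamlet", 1), ("macbeth", 2)], ["word", "word_count"])

# Indirect write: rows are staged in the given bucket, then loaded into the table.
(df.write.format("bigquery")
 .option("table", "<project_id>.<dataset>.word_counts")
 .option("temporaryGcsBucket", "<staging-bucket>")
 .mode("overwrite")
 .save())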
Loading from Cloud Storage into BigQuery supports multiple file formats: CSV, JSON, Avro, Parquet, and ORC.
BigQuery supports UTF-8 encoding for both nested/repeated and flat data. For CSV files only, BigQuery also supports ISO-8859-1 encoding for flat data.
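If you stage files in Cloud Storage yourself, the load into BigQuery can be driven from Python with the google-cloud-bigquery client. A sketch under assumptions: the source URI, destination table, and ISO-8859-1 encoding below are illustrative placeholders, not values from the question:

from google.cloud import bigquery

client = bigquery.Client()

# Load a CSV file from Cloud Storage into a BigQuery table.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    encoding="ISO-8859-1",  # only valid for flat CSV data; UTF-8 is the default
)

load_job = client.load_table_from_uri(
    "gs://<bucket>/path/to/file.csv",
    "<project_id>.<dataset>.<table>",
    job_config=job_config,
)
load_job.result()  # wait for the load job to finish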
Google now has an example of how to use the BigQuery connector with Spark.
There does seem to be a problem using the GsonBigQueryInputFormat, but I got a simple Shakespeare word-counting example working:
import json
import pyspark

sc = pyspark.SparkContext()
# The connector stages the BigQuery export in the cluster's system bucket on Cloud Storage.
bucket = sc._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
conf = {"mapred.bq.project.id": "<project_id>",
        "mapred.bq.gcs.bucket": bucket,
        "mapred.bq.input.project.id": "publicdata",
        "mapred.bq.input.dataset.id": "samples",
        "mapred.bq.input.table.id": "shakespeare"}
# Each record is (row number, JSON string); parse it and count the words.
tableData = (sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
                                "org.apache.hadoop.io.LongWritable",
                                "com.google.gson.JsonObject", conf=conf)
             .map(lambda record: json.loads(record[1]))
             .map(lambda row: (row["word"], int(row["word_count"])))
             .reduceByKey(lambda x, y: x + y))
print(tableData.take(10))
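One caveat with the Hadoop input format: it exports the table to JSON files in Cloud Storage before Spark reads them, and those files are not removed for you. A hedged sketch of the cleanup, assuming you also set an explicit export path (e.g. "mapred.bq.temp.gcs.path") in the conf above; the directory name here is just an illustrative choice:

# Choose an export directory and add it to conf before building the RDD:
#   conf["mapred.bq.temp.gcs.path"] = input_directory
input_directory = "gs://{}/hadoop/tmp/bigquery/pyspark_input".format(bucket)

# After the job finishes, delete the exported JSON files via the Hadoop FileSystem API.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)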