
BigQuery connector for pyspark via Hadoop Input Format example

I have a large dataset stored in a BigQuery table and I would like to load it into a PySpark RDD for ETL processing.

I realized that BigQuery supports the Hadoop input/output formats:

https://cloud.google.com/hadoop/writing-with-bigquery-connector

and PySpark should be able to use this interface to create an RDD via the method "newAPIHadoopRDD":

http://spark.apache.org/docs/latest/api/python/pyspark.html

Unfortunately, the documentation on both ends seems scarce and goes beyond my knowledge of Hadoop/Spark/BigQuery. Has anybody figured out how to do this?

Luca Fiaschi asked Jul 14 '15 08:07

People also ask

How do you connect PySpark to BigQuery?

Install the spark-bigquery-connector in the Spark jars directory of every node by using the Dataproc connectors initialization action when you create your cluster, or provide the connector URI when you submit your job (in the Google Cloud console, use the Jars files field on the Dataproc "Submit a job" page).
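
For recent Spark versions, a minimal PySpark sketch of reading a BigQuery table through the spark-bigquery-connector DataFrame API might look like the following (this assumes the connector jar, e.g. gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar, is already available on the cluster or passed with --jars):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bigquery-read-example").getOrCreate()

# Read the public Shakespeare sample table directly into a DataFrame.
df = (spark.read.format("bigquery")
      .option("table", "bigquery-public-data.samples.shakespeare")
      .load())
df.show(10)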

How can you set up your Dataproc environment to use BigQuery as an input and output source?

You can install the BigQuery connector on your Dataproc cluster for direct programmatic read/write access to BigQuery. Note that a Cloud Storage bucket is used to stage data between the two services, but you interact directly with BigQuery from Dataproc.

What file format does BigQuery use?

Loading from Cloud Storage to BigQuery supports multiple file formats—CSV, JSON, Avro, Parquet, and ORC.
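
For example, a load job from Cloud Storage can be submitted with the google-cloud-bigquery Python client; the bucket, path, and table names below are placeholders for illustration:

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)

# Load Parquet files from Cloud Storage into a BigQuery table.
load_job = client.load_table_from_uri(
    "gs://my-bucket/exports/*.parquet",   # hypothetical source files
    "my_project.my_dataset.my_table",     # hypothetical destination table
    job_config=job_config,
)
load_job.result()  # wait for the load to finish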

Which of these source formats does BigQuery support?

BigQuery supports UTF-8 encoding for both nested or repeated data and flat data. It supports ISO-8859-1 encoding for flat data only, and only in CSV files.


1 Answer

Google now has an example of how to use the BigQuery connector with Spark.

There does seem to be a problem using the GsonBigQueryInputFormat, but I got a simple Shakespeare word-counting example working:

import json
import pyspark

sc = pyspark.SparkContext()

# The connector stages BigQuery data through GCS; the cluster's system bucket can be used for this.
bucket = sc._jsc.hadoopConfiguration().get("fs.gs.system.bucket")

conf = {"mapred.bq.project.id": "<project_id>",      # project billed for the export
        "mapred.bq.gcs.bucket": "<bucket>",           # GCS bucket for temporary export files
        "mapred.bq.input.project.id": "publicdata",   # input table: publicdata:samples.shakespeare
        "mapred.bq.input.dataset.id": "samples",
        "mapred.bq.input.table.id": "shakespeare"}

# Each record comes back as (row id, JSON string): parse the JSON, then sum word counts per word.
tableData = (sc.newAPIHadoopRDD("com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
                                "org.apache.hadoop.io.LongWritable",
                                "com.google.gson.JsonObject",
                                conf=conf)
             .map(lambda k: json.loads(k[1]))
             .map(lambda x: (x["word"], int(x["word_count"])))
             .reduceByKey(lambda x, y: x + y))
print(tableData.take(10))
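
To carry the ETL further from here, one possible follow-up (a sketch using the same Spark 1.x-style API as above) is to turn the (word, count) pairs into a DataFrame:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Convert the (word, count) pairs into a DataFrame for further processing.
wordCounts = sqlContext.createDataFrame(tableData, ["word", "word_count"])
wordCounts.orderBy(wordCounts.word_count.desc()).show(10)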
Matt J answered Oct 17 '22 21:10