
Write BigQuery results to GCS in CSV format using Apache Beam

I am fairly new to Apache Beam and am trying to write a pipeline in Python that extracts data from Google BigQuery and writes it to GCS in CSV format.

Using beam.io.Read(beam.io.BigQuerySource()) I am able to read the data from BigQuery, but I am not sure how to write it to GCS in CSV format.

Is there a built-in transform for this, or do I need a custom function? Could you please help me?

import logging

import apache_beam as beam
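# BigQueryDisposition is a class, not a module, so its constants cannot be
# imported with "from ... import"; bind them to local names instead.
CREATE_IF_NEEDED = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
WRITE_TRUNCATE = beam.io.BigQueryDisposition.WRITE_TRUNCATE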

PROJECT='project_id'
BUCKET='project_bucket'


def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=readwritebq',
        '--save_main_session',
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]

    with beam.Pipeline(argv=argv) as p:

        # Execute the SQL in BigQuery; the result set is later stored in the given destination BigQuery table.
        BQ_SQL_TO_TABLE = p | 'read_bq_view' >> beam.io.Read(
            beam.io.BigQuerySource(query='SELECT * FROM `dataset.table`', use_standard_sql=True))
        # Extract data from Bigquery to GCS in CSV format.
        # This is where I need your help

        BQ_SQL_TO_TABLE | 'Write_bq_table' >> beam.io.WriteToBigQuery(
                table='tablename',
                dataset='datasetname',
                project='project_id',
                schema='name:string,gender:string,count:integer',
                create_disposition=CREATE_IF_NEEDED,
                write_disposition=WRITE_TRUNCATE)

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Hari asked Oct 22 '18 12:10



2 Answers

You can do this with WriteToText, which lets you add a .csv suffix and a header line. Keep in mind that WriteToText writes each element as one line of text, so you need to format the query results as CSV yourself. As an example, I used the Shakespeare public dataset and the following query:

SELECT word, word_count, corpus FROM `bigquery-public-data.samples.shakespeare` WHERE CHAR_LENGTH(word) > 3 ORDER BY word_count DESC LIMIT 10

We now read the query results with:

BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

Each element of BQ_DATA is now a dictionary that maps column names to values:

{u'corpus': u'hamlet', u'word': u'HAMLET', u'word_count': 407}
{u'corpus': u'kingrichardiii', u'word': u'that', u'word_count': 319}
{u'corpus': u'othello', u'word': u'OTHELLO', u'word_count': 313}

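We can apply beam.Map to keep only the values of each row. Note that the values come out in the dictionary's own order, which is not necessarily the order of the SELECT list: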

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

Excerpt of BQ_VALUES:

[u'hamlet', u'HAMLET', 407]
[u'kingrichardiii', u'that', 319]
[u'othello', u'OTHELLO', 313]
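
Relying on x.values() ties the column order to however the dictionary happens to be ordered. One option is to pick the columns by name instead. This is only a minimal sketch in plain Python, assuming the column names from the example query; COLUMNS and ordered_values are hypothetical names, not part of Beam:

```python
# Hypothetical column order; it should match the header written to the file.
COLUMNS = ['word', 'word_count', 'corpus']


def ordered_values(row, columns=COLUMNS):
    """Return the values of one row dictionary in the order given by columns."""
    return [row[name] for name in columns]


# One row shaped like the BQ_DATA elements shown above.
row = {'corpus': 'hamlet', 'word': 'HAMLET', 'word_count': 407}
values = ordered_values(row)
```

In the pipeline this could probably be used as beam.Map(ordered_values) in place of the lambda, with ', '.join(COLUMNS) as the header, so that header and data cannot drift apart.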

Finally, map again to join the column values of each row into a single comma-separated string instead of a list (note that you would need to escape double quotes if they can appear within a field):

BQ_CSV = BQ_VALUES | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"'+ str(column) +'"' for column in row]))
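
If fields can contain double quotes or commas, the standard csv module can take care of the escaping. The following is a sketch, not the method used above: to_csv_line is a hypothetical helper, and it writes no space after the comma because a default CSV reader treats that space (and the quotes after it) as part of the field.

```python
import csv
import io


def to_csv_line(values):
    """Format one row as a CSV line, quoting every field and escaping quotes."""
    buffer = io.StringIO()
    csv.writer(buffer, quoting=csv.QUOTE_ALL).writerow(values)
    # The writer appends a line terminator; WriteToText adds its own newline.
    return buffer.getvalue().rstrip('\r\n')


line = to_csv_line(['hamlet', 'say "hi"', 407])

# Reading the line back recovers the original text of each field.
parsed = next(csv.reader([line]))

# A hand-joined line with ', ' keeps the space and the quotes in later fields.
hand_joined = next(csv.reader(['"hamlet", "HAMLET", "407"']))
```

The helper could be passed to beam.Map in place of the lambda. Files written with the ', ' separator can still be read by passing skipinitialspace=True to csv.reader.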

Now we write the results to GCS with the suffix and a header line:

BQ_CSV | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET), file_name_suffix='.csv', header='corpus, word, word_count')

Written results:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
corpus, word, word_count
"hamlet", "HAMLET", "407"
"kingrichardiii", "that", "319"
"othello", "OTHELLO", "313"
"merrywivesofwindsor", "MISTRESS", "310"
"othello", "IAGO", "299"
"antonyandcleopatra", "ANTONY", "284"
"asyoulikeit", "that", "281"
"antonyandcleopatra", "CLEOPATRA", "274"
"measureforemeasure", "your", "274"
"romeoandjuliet", "that", "270"
Guillem Xercavins answered Sep 19 '22 10:09



For anyone looking for an update using Python 3: there dict.values() returns a view object instead of a list, so replace the line

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: x.values())

with

BQ_VALUES = BQ_DATA | 'read values' >> beam.Map(lambda x: list(x.values()))
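
As far as I can tell, the reason this matters is that elements may be pickled when they move between pipeline steps, and a dict_values view cannot be pickled while a list can. A small sketch in plain Python 3 (no Beam needed) that shows the difference:

```python
import pickle

# One row shaped like the BQ_DATA elements.
row = {'corpus': 'hamlet', 'word': 'HAMLET', 'word_count': 407}

view = row.values()  # a dict_values view in Python 3, not a list
try:
    pickle.dumps(view)
    view_is_picklable = True
except TypeError:
    view_is_picklable = False

# Converting to a list gives a plain object that survives a pickle round trip.
as_list = list(view)
restored = pickle.loads(pickle.dumps(as_list))
```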
Kim Merino answered Sep 18 '22 10:09
