
How to write dictionaries to Bigquery in Dataflow using python

I am trying to read a CSV file from GCP Storage, convert its rows into dictionaries, and write them to a BigQuery table as follows:

p | ReadFromText("gs://bucket/file.csv") 
  | (beam.ParDo(BuildAdsRecordFn()))
  | WriteToBigQuery('ads_table',dataset='dds',project='doubleclick-2',schema=ads_schema)

where: 'doubleclick-2' and 'dds' are existing project and dataset, ads_schema is defined as follows:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

BuildAdsRecordFn() is defined as follows:

class AdsRecord:
  dict = {}

  def __init__(self, line):
    record = line.split(",")
    self.dict['Advertiser_ID'] = record[0]
    self.dict['Campaign_ID'] = record[1]
    self.dict['Ad_ID'] = record[2]
    self.dict['Ad_Name'] = record[3]
    self.dict['Click_through_URL'] = record[4]
    self.dict['Ad_Type'] = record[5]


class BuildAdsRecordFn(beam.DoFn):
  def __init__(self):
    super(BuildAdsRecordFn, self).__init__()

  def process(self, element):
    text_line = element.strip()
    ads_record = AdsRecord(text_line).dict
    return ads_record

However, when I run the pipeline, I got the following error:

"dataflow_job_18146703755411620105-B" failed., (6c011965a92e74fa): BigQuery job "dataflow_job_18146703755411620105-B" in project "doubleclick-2" finished with error(s): errorResult: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON table encountered too many errors, giving up. Rows: 1; errors: 1., error: JSON parsing error in row starting at position 0: Value encountered without start of object

Here is the sample testing data I used:

100001,1000011,10000111,ut,https://bloomberg.com/aliquam/lacus/morbi.xml,Brand-neutral
100001,1000011,10000112,eu,http://weebly.com/sed/vel/enim/sit.jsp,Dynamic Click

I'm new to both Dataflow and python so could not figure out what could be wrong in the above code. Greatly appreciate any help!

asked Dec 13 '25 by michael99

1 Answer

I just ran your code and it didn't work for me either, but I got a different error message (something like "you can't return a dict as the result of a ParDo").

This code worked normally for me; notice that I'm no longer using the class attribute dict, and that a list is now returned:

ads_schema='Advertiser_ID:INTEGER,Campaign_ID:INTEGER,Ad_ID:INTEGER,Ad_Name:STRING,Click_through_URL:STRING,Ad_Type:STRING'

class BuildAdsRecordFn(beam.DoFn):
    def __init__(self):
      super(BuildAdsRecordFn, self).__init__()

    def process(self, element):
      text_line = element.strip()
      ads_record = self.process_row(text_line)
      return ads_record

    def process_row(self, row):
        dict_ = {}

        record = row.split(",")
        dict_['Advertiser_ID'] = int(record[0]) if record[0] else None
        dict_['Campaign_ID'] = int(record[1]) if record[1] else None
        dict_['Ad_ID'] = int(record[2]) if record[2] else None
        dict_['Ad_Name'] = record[3]
        dict_['Click_through_URL'] = record[4]
        dict_['Ad_Type'] = record[5]
        return [dict_]

with beam.Pipeline() as p:

    (p | ReadFromText("gs://bucket/file.csv")
       | beam.Filter(lambda x: x[0] != 'A')
       | (beam.ParDo(BuildAdsRecordFn()))
       | WriteToBigQuery('ads_table', dataset='dds',
           project='doubleclick-2', schema=ads_schema))
      #| WriteToText('test.csv'))
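The reason the list matters: Beam treats the return value of `process()` as an iterable of output elements, and iterating a dict yields only its keys. So returning the dict itself makes Beam emit plain strings, which BigQuery rejects with "Value encountered without start of object". A plain Python illustration of the difference:

```python
# Iterating a dict yields its keys; iterating a one-element list
# yields the dict itself, which is what WriteToBigQuery expects.
record = {'Advertiser_ID': 100001, 'Campaign_ID': 1000011}

print(list(record))    # ['Advertiser_ID', 'Campaign_ID'] -- just the keys
print(list([record]))  # [{'Advertiser_ID': 100001, 'Campaign_ID': 1000011}]
```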

This is the data I simulated:

Advertiser_ID,Campaign_ID,Ad_ID,Ad_Name,Click_through_URL,Ad_Type
1,1,1,name of ad,www.url.com,sales
1,1,2,name of ad2,www.url2.com,sales with sales

I also filtered out the header line that I created in my file (in the Filter operation); if you don't have a header, this is not necessary.
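As a side note (this is a sketch of my own, not part of the fix above): since some of your fields are URLs, you may eventually hit quoted fields containing commas, which `str.split(",")` mishandles. Parsing each line with the standard `csv` module avoids that; the field names below just mirror the schema in the question:

```python
import csv
import io

FIELDS = ['Advertiser_ID', 'Campaign_ID', 'Ad_ID',
          'Ad_Name', 'Click_through_URL', 'Ad_Type']
INT_FIELDS = {'Advertiser_ID', 'Campaign_ID', 'Ad_ID'}

def parse_line(line):
    # csv.reader handles quoted fields like "name, with comma"
    values = next(csv.reader(io.StringIO(line)))
    row = dict(zip(FIELDS, values))
    for f in INT_FIELDS:
        row[f] = int(row[f]) if row[f] else None
    return [row]  # a one-element list, as DoFn.process expects
```

You could call `parse_line` from inside `process()` in place of `process_row`.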

answered Dec 16 '25 by Willian Fuks

