I have a Dataflow job that writes to BigQuery. It works well for a non-nested schema, but fails for a nested schema.
Here is my Dataflow pipeline:
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
schema = 'url: STRING,' \
         'ua: STRING,' \
         'method: STRING,' \
         'man: RECORD,' \
         'man.ip: RECORD,' \
         'man.ip.cc: STRING,' \
         'man.ip.city: STRING,' \
         'man.ip.as: INTEGER,' \
         'man.ip.country: STRING,' \
         'man.res: RECORD,' \
         'man.res.ip_dom: STRING'
first = p | 'read' >> ReadFromText(wordcount_options.input)
second = (first
          | 'process' >> (beam.ParDo(processFunction()))
          | 'write' >> beam.io.WriteToBigQuery(
              'myBucket:tableFolder.test_table',
              schema=schema)
          )
I created the BigQuery table using the following schema:
[
  {
    "mode": "NULLABLE",
    "name": "url",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "ua",
    "type": "STRING"
  },
  {
    "mode": "NULLABLE",
    "name": "method",
    "type": "STRING"
  },
  {
    "mode": "REPEATED",
    "name": "man",
    "type": "RECORD",
    "fields": [
      {
        "mode": "REPEATED",
        "name": "ip",
        "type": "RECORD",
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "cc",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "city",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "as",
            "type": "INTEGER"
          },
          {
            "mode": "NULLABLE",
            "name": "country",
            "type": "STRING"
          }
        ]
      },
      {
        "mode": "REPEATED",
        "name": "res",
        "type": "RECORD",
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "ip_dom",
            "type": "STRING"
          }
        ]
      }
    ]
  }
]
I am getting the following error:
BigQuery creation of import job for table "test_table" in dataset "tableFolder" in project "myBucket" failed., BigQuery execution failed., HTTP transport error:
Message: Invalid value for: url is not a valid value
HTTP Code: 400
Question: Can someone please guide me? What am I doing wrong? Also, if there is a better way to handle the nested schema and write to BigQuery, please suggest it.
Additional info: My data file:
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"IN","city":"delhi","as":274,"country":"States"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"PUT","man":{"ip":{"cc":"DK","city":"munlan","as":4865,"country":"United"},"res":{"ip_dom":"v1"}}}
{"url":"xyz.com","ua":"Mozilla/5.0 Chrome/63","method":"GET","man":{"ip":{"cc":"BS","city":"sind","as":7655,"country":"India"},"res":{"ip_dom":"v1"}}}
The problem with your code is that you try to use nested fields while specifying the BigQuery table schema as a string, which is not supported.
In order to push nested records into BigQuery from Apache Beam, you need to create a TableSchema object, e.g. using the built-in parser:
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
table_schema = parse_table_schema_from_json(your_bigquery_json_schema)
You need to pass the schema as a JSON string there; you can obtain it with the following command in your terminal (I assume you have the gcloud tools installed):
bq --project=your-gcp-project-name --format=json show your.table.name > schema.json
and in Python use it as follows:
import json

table_schema = parse_table_schema_from_json(json.dumps(json.load(open("schema.json"))["schema"]))
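Alternatively, if you prefer not to shell out to bq, you can write the JSON schema inline in your pipeline code. A sketch based on the schema you posted (field names and modes are copied from your question; adjust them if your table differs):

import json
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# Inline version of the posted schema; parse_table_schema_from_json expects a
# JSON string with a top-level "fields" list.
inline_schema = {
    "fields": [
        {"name": "url", "type": "STRING", "mode": "NULLABLE"},
        {"name": "ua", "type": "STRING", "mode": "NULLABLE"},
        {"name": "method", "type": "STRING", "mode": "NULLABLE"},
        {"name": "man", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "ip", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "cc", "type": "STRING", "mode": "NULLABLE"},
                {"name": "city", "type": "STRING", "mode": "NULLABLE"},
                {"name": "as", "type": "INTEGER", "mode": "NULLABLE"},
                {"name": "country", "type": "STRING", "mode": "NULLABLE"},
            ]},
            {"name": "res", "type": "RECORD", "mode": "REPEATED", "fields": [
                {"name": "ip_dom", "type": "STRING", "mode": "NULLABLE"},
            ]},
        ]},
    ]
}

table_schema = parse_table_schema_from_json(json.dumps(inline_schema))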
Then in your pipeline:
beam.io.WriteToBigQuery(
    'myBucket:tableFolder.test_table',
    schema=table_schema)
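Note that WriteToBigQuery expects each element of the input PCollection to be a Python dict whose keys match the schema, so your processFunction has to emit rows in that shape. A sketch of one such row for the schema you posted (values taken from your sample data; because man, ip and res are declared REPEATED in your posted schema they have to be lists of dicts, so if each record really holds a single object, as in your data file, NULLABLE RECORD may be a better fit):

row = {
    'url': 'xyz.com',
    'ua': 'Mozilla/5.0 Chrome/63',
    'method': 'PUT',
    # REPEATED RECORD fields are lists of dicts; each nested RECORD is a dict.
    'man': [{
        'ip': [{'cc': 'IN', 'city': 'delhi', 'as': 274, 'country': 'States'}],
        'res': [{'ip_dom': 'v1'}],
    }],
}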
You can also take a look at the example showing manual creation of a TableSchema object:
https://github.com/apache/beam/blob/474345f5987e47a22d063c7bfcb3638c85a57e64/sdks/python/apache_beam/examples/cookbook/bigquery_schema.py
which is (from the linked example):
from apache_beam.io.gcp.internal.clients import bigquery
table_schema = bigquery.TableSchema()
full_name_schema = bigquery.TableFieldSchema()
full_name_schema.name = 'fullName'
full_name_schema.type = 'string'
full_name_schema.mode = 'required'
table_schema.fields.append(full_name_schema)
# A nested field
phone_number_schema = bigquery.TableFieldSchema()
phone_number_schema.name = 'phoneNumber'
phone_number_schema.type = 'record'
phone_number_schema.mode = 'nullable'
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
phone_number_schema.fields.append(number)
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
phone_number_schema.fields.append(area_code)
table_schema.fields.append(phone_number_schema)
Then just use the table_schema variable in beam.io.WriteToBigQuery.
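For completeness, the manual approach applied to your table could look roughly like this (a sketch; names and modes are copied from the schema in your question, and the field helper is just a hypothetical shortcut to cut down on repetition):

from apache_beam.io.gcp.internal.clients import bigquery

def field(name, field_type, mode='NULLABLE'):
    # Hypothetical helper: build a single TableFieldSchema.
    f = bigquery.TableFieldSchema()
    f.name = name
    f.type = field_type
    f.mode = mode
    return f

table_schema = bigquery.TableSchema()
table_schema.fields.append(field('url', 'STRING'))
table_schema.fields.append(field('ua', 'STRING'))
table_schema.fields.append(field('method', 'STRING'))

# Nested records: man contains ip and res, matching the posted schema.
ip = field('ip', 'RECORD', 'REPEATED')
for name, field_type in [('cc', 'STRING'), ('city', 'STRING'),
                         ('as', 'INTEGER'), ('country', 'STRING')]:
    ip.fields.append(field(name, field_type))

res = field('res', 'RECORD', 'REPEATED')
res.fields.append(field('ip_dom', 'STRING'))

man = field('man', 'RECORD', 'REPEATED')
man.fields.append(ip)
man.fields.append(res)
table_schema.fields.append(man)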