I have a non-trivial table schema (involving nested and repeated fields) defined in JSON format (with the name, type, and mode attributes) and stored in a file. It has been used successfully to populate a BigQuery table with the bq load command.
But when I try to do the same thing with the Dataflow Python SDK and BigQuerySink, the schema argument needs to be either a comma-separated list of 'name':'type' elements, or a bigquery.TableSchema object.
Is there any convenient way of getting my JSON schema into a bigquery.TableSchema, or do I have to transform it into a name:type list?
Currently you cannot directly specify a JSON schema. You have to specify the schema either as a string that contains a comma-separated list of fields or as a bigquery.TableSchema object.
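For a simple flat schema, the string form looks like this (a minimal illustrative example; the field names are just placeholders):
schema = 'fullName:STRING,age:INTEGER'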
If the schema is complex and contains nested and/or repeated fields, we recommend building a bigquery.TableSchema object.
Here is an example bigquery.TableSchema object with nested and repeated fields.
# The TableSchema/TableFieldSchema classes ship with the Beam SDK's bundled BigQuery client.
from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()
# 'string' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)
# 'integer' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)
# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)
# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)
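For completeness, here is a minimal sketch of passing the resulting table_schema to the BigQuerySink mentioned in the question. The table reference and the input row are placeholders, and note that newer Beam releases deprecate BigQuerySink in favor of beam.io.WriteToBigQuery:
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     # Illustrative single-row input matching the schema above.
     | beam.Create([{'fullName': 'Jane Doe', 'age': 31}])
     | beam.io.Write(beam.io.BigQuerySink(
         'YOUR_PROJECT:YOUR_DATASET.YOUR_TABLE',  # placeholder table reference
         schema=table_schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))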
The above snippet posted by Andrea Pierleoni works with older versions of the google-cloud-bigquery Python client, for example with version 0.25.0 of google-cloud-bigquery, which happens to be installed via pip install apache-beam[gcp].
However, the BigQuery Python client API has changed drastically in more recent versions of google-cloud-bigquery. For example, in version 1.8.0, which I am currently on, bigquery.TableFieldSchema() and bigquery.TableSchema() don't work.
If you're on a more recent version of the google-cloud-bigquery package, here's how you can get the required list of SchemaField objects (required to create the table, for example) from a JSON file. This is an adaptation of the code posted above by Andrea Pierleoni (thanks for that!):
import json

from google.cloud import bigquery


def _get_field_schema(field):
    # Convert one JSON field definition into a SchemaField,
    # recursing into sub-fields for RECORD types.
    name = field['name']
    field_type = field.get('type', 'STRING')
    mode = field.get('mode', 'NULLABLE')
    fields = field.get('fields', [])

    if fields:
        subschema = []
        for f in fields:
            fields_res = _get_field_schema(f)
            subschema.append(fields_res)
    else:
        subschema = []

    field_schema = bigquery.SchemaField(name=name,
                                        field_type=field_type,
                                        mode=mode,
                                        fields=subschema)
    return field_schema


def parse_bq_json_schema(schema_filename):
    # Read a bq-style JSON schema file and return a list of SchemaField objects.
    schema = []
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    for field in jsonschema:
        schema.append(_get_field_schema(field))

    return schema
Now, suppose you had a table's schema already defined in JSON. Say you had this particular schema.json file; then, using the above helper methods, you can get the required SchemaField representation for the Python client like so:
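As a point of reference, here is an abbreviated sketch of what that schema.json might contain, reconstructed from the parsed output below (the remaining fields follow the same pattern):
[
  {"name": "event_id", "type": "INTEGER", "mode": "REQUIRED"},
  {"name": "event_name", "type": "STRING", "mode": "REQUIRED"},
  {"name": "event_types", "type": "STRING", "mode": "REPEATED"},
  {"name": "location", "type": "RECORD", "mode": "NULLABLE", "fields": [
      {"name": "latitude", "type": "FLOAT", "mode": "REQUIRED"},
      {"name": "longitude", "type": "FLOAT", "mode": "REQUIRED"},
      {"name": "geo_region_id", "type": "INTEGER", "mode": "NULLABLE"}
  ]}
]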
>>> res_schema = parse_bq_json_schema("schema.json")
>>> print(res_schema)
[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]
Now, to create a table having the above schema using the Python client, you would do:
# Instantiate a BigQuery client first (assumes default credentials and project).
bqclient = bigquery.Client()

dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)
You could optionally set time-based partitioning (if needed) like this:
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='timestamp'  # name of the column to use for partitioning
)
And this finally creates the table:
table = bqclient.create_table(table)
print('Created table {}, partitioned on column {}'.format(
    table.table_id, table.time_partitioning.field))