I have a non-trivial table schema (involving nested and repeated fields) defined in JSON format (with the name, type, mode attributes) and stored in a file. It has been successfully used to populate a bigquery table with bq load command.
But when I try to do the same thing with Dataflow Python SDK and BigQuerySink, the schema argument needs to be either a comma-separated list of 'name':'type' elements, or a bigquery.TableSchema object.
Is there any convenient way of getting my JSON schema to a bigquery.TableSchema, or do I have to transform it to a name:value list?
Currently you cannot directly specify a JSON schema. You have to specify the schema either as a string that contains a comma separated list of fields or a bigquery.TableSchema object.
If the schema is complex and contains nested and/or repeated fields, we recommend building a bigquery.TableSchema object.
Here is an example bigquery.TableSchema object with nested and repeated fields.
from apitools.clients import bigquery
table_schema = bigquery.TableSchema()
# ‘string’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)
# ‘integer’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)
# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'
area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)
number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)
# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)
The above snippet posted by Andrea Pierleoni works with older versions of the google-cloud-bigquery python client, for example for version 0.25.0 of google-cloud-bigquery that happens to install via pip install apache-beam[gcp].
However, the BigQuery Python client API has changed drastically in more recent versions of google-cloud-bigquery, for example in version 1.8.0 that I am currently on, bigquery.TableFieldSchema() and bigquery.TableSchema() don't work.
If you're on a more recent version of the google-cloud-bigquery package, here's how you can get the required SchemaField list (required to create the table, for example) from a JSON file. This is an adaptation of the code posted above by Andrea Pierleoni (thanks for that!)
def _get_field_schema(field):
name = field['name']
field_type = field.get('type', 'STRING')
mode = field.get('mode', 'NULLABLE')
fields = field.get('fields', [])
if fields:
subschema = []
for f in fields:
fields_res = _get_field_schema(f)
subschema.append(fields_res)
else:
subschema = []
field_schema = bigquery.SchemaField(name=name,
field_type=field_type,
mode=mode,
fields=subschema
)
return field_schema
def parse_bq_json_schema(schema_filename):
schema = []
with open(schema_filename, 'r') as infile:
jsonschema = json.load(infile)
for field in jsonschema:
schema.append(_get_field_schema(field))
return schema
Now, suppose you had a table's schema already defined in JSON. Say you had this particular "schema.json" file, then using the above helper methods, you could get the required SchemaField representation for the Python client like so:
>>> res_schema = parse_bq_json_schema("schema.json")
>>> print(res_schema)
[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]
Now to create a table having the above schema using the Python SDK, you would do:
dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)
You could optionally set time-based partitioning (if needed) like this:
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field='timestamp' # name of column to use for partitioning
)
And this finally creates the table:
table = bqclient.create_table(table)
print('Created table {}, partitioned on column {}'.format(
table.table_id, table.time_partitioning.field))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With