JSON table schema to bigquery.TableSchema for BigQuerySink

I have a non-trivial table schema (involving nested and repeated fields) defined in JSON format (with name, type, and mode attributes) and stored in a file. I have successfully used it to populate a BigQuery table with the bq load command.

But when I try to do the same thing with the Dataflow Python SDK and BigQuerySink, the schema argument needs to be either a comma-separated list of 'name':'type' elements or a bigquery.TableSchema object.

Is there any convenient way of converting my JSON schema to a bigquery.TableSchema, or do I have to transform it into a name:type list?

asked by ivarg

2 Answers

Currently you cannot directly specify a JSON schema. You have to specify the schema either as a string that contains a comma-separated list of fields or as a bigquery.TableSchema object.
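For a simple flat schema, the string form is enough. A minimal sketch (the table reference 'my-project:my_dataset.my_table' is a placeholder):

import apache_beam as beam

# Simple-schema form: a comma-separated 'name:TYPE' string passed
# directly as the schema argument of BigQuerySink.
sink = beam.io.BigQuerySink(
    'my-project:my_dataset.my_table',
    schema='fullName:STRING,age:INTEGER')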

If the schema is complex and contains nested and/or repeated fields, we recommend building a bigquery.TableSchema object.

Here is an example bigquery.TableSchema object with nested and repeated fields.

from apache_beam.io.gcp.internal.clients import bigquery

table_schema = bigquery.TableSchema()

# 'string' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)

# 'integer' field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)

# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)

# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)
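Since your schema already lives in a JSON file (the same format bq load accepts), you can also build the TableSchema from that file with a small recursive helper. A sketch, with made-up function names, assuming the file contains a list of objects with name, type, mode, and optional fields keys:

import json

from apache_beam.io.gcp.internal.clients import bigquery


def _to_field_schema(field):
    # Map one JSON field definition onto a TableFieldSchema,
    # recursing into 'fields' for nested RECORD columns.
    field_schema = bigquery.TableFieldSchema()
    field_schema.name = field['name']
    field_schema.type = field['type']
    field_schema.mode = field.get('mode', 'nullable')
    for subfield in field.get('fields', []):
        field_schema.fields.append(_to_field_schema(subfield))
    return field_schema


def table_schema_from_json(schema_filename):
    table_schema = bigquery.TableSchema()
    with open(schema_filename) as f:
        for field in json.load(f):
            table_schema.fields.append(_to_field_schema(field))
    return table_schema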
answered by chamikara

The snippet posted by Andrea Pierleoni works with older versions of the google-cloud-bigquery Python client, for example with version 0.25.0 of google-cloud-bigquery, which happens to be the version installed by pip install apache-beam[gcp].

However, the BigQuery Python client API has changed drastically in more recent versions of google-cloud-bigquery. For example, in version 1.8.0, which I am currently on, bigquery.TableFieldSchema() and bigquery.TableSchema() don't work.
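For illustration, in the newer API the 'fullName' and 'phoneNumber' columns from the first answer would instead be declared with bigquery.SchemaField. A sketch against the 1.x API:

from google.cloud import bigquery

# In google-cloud-bigquery >= 1.x, columns are described with
# SchemaField; nested RECORD columns take their children via
# the 'fields' argument.
full_name = bigquery.SchemaField('fullName', 'STRING', mode='REQUIRED')
phone_number = bigquery.SchemaField(
    'phoneNumber', 'RECORD', mode='NULLABLE',
    fields=[
        bigquery.SchemaField('areaCode', 'INTEGER', mode='NULLABLE'),
        bigquery.SchemaField('number', 'INTEGER', mode='NULLABLE'),
    ])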

If you're on a more recent version of the google-cloud-bigquery package, here's how you can get the required SchemaField list (needed, for example, to create a table) from a JSON file. This is an adaptation of the code posted by Andrea Pierleoni (thanks for that!):

import json

from google.cloud import bigquery


def _get_field_schema(field):
    """Convert one JSON field definition into a SchemaField,
    recursing into 'fields' for nested RECORD columns."""
    name = field['name']
    field_type = field.get('type', 'STRING')
    mode = field.get('mode', 'NULLABLE')
    fields = field.get('fields', [])

    subschema = [_get_field_schema(f) for f in fields]

    return bigquery.SchemaField(
        name=name,
        field_type=field_type,
        mode=mode,
        fields=subschema,
    )


def parse_bq_json_schema(schema_filename):
    """Read a bq-style JSON schema file and return a list of SchemaFields."""
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    return [_get_field_schema(field) for field in jsonschema]

Now, suppose you have a table's schema already defined in a JSON file, say "schema.json", in the same format that bq load accepts. Using the helper methods above, you can get the required SchemaField representation for the Python client like so:

>>> res_schema = parse_bq_json_schema("schema.json")

>>> print(res_schema)

[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()),
 SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()),
 SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()),
 SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()),
 SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()),
 SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))),
 SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()),
 SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))),
 SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()),
 SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()),
 SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()),
 SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()),
 SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))),
 SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()),
 SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]

Now, to create a table with the above schema using the Python client, you would do:

bqclient = bigquery.Client()  # assumes your project and credentials are configured

dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)

You can optionally set time-based partitioning like this:

table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='timestamp'  # name of column to use for partitioning
)

And this finally creates the table:

table = bqclient.create_table(table)

print('Created table {}, partitioned on column {}'.format(
    table.table_id, table.time_partitioning.field))
answered by nonbeing