First, I pull the data from a production MySQL database with a query and store it as newline-delimited JSON
in Google Cloud Storage. What I want to do is:
1. check if the table exists
2. if the table doesn't exist, create the table using autodetect schema
3. store data
All of this will be scheduled in Airflow. What really confuses me is step 2:
how can I do this in Python? Or can Airflow do it automatically?
Airflow can do this automatically. The create_disposition
parameter creates the table if needed, and the autodetect
parameter tells BigQuery to infer the schema from the data. This is for Airflow 1.10.2.
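# Import path for Airflow 1.10.x
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    # Expected form is <dataset>.<table>, optionally prefixed with the project;
    # 'dest_dataset' is a placeholder dataset name
    destination_project_dataset_table='dest_dataset.dest_table',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',  # create the table if it does not exist
    write_disposition='WRITE_TRUNCATE',     # overwrite the table if it already exists
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    autodetect=True,  # let BigQuery infer the schema
    dag=dag
)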
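For the newline-delimited JSON files from the question, the same operator call would need a different source_format. The sketch below is only an illustration under assumptions: the bucket, object path, table name, task_id, and the helper names ndjson_load_kwargs and make_load_task are hypothetical, and WRITE_APPEND is picked only as an example. The Airflow import is kept inside the function so the argument dictionary can be inspected without Airflow installed.

```python
def ndjson_load_kwargs(bucket, source_objects, destination_table):
    """Operator arguments for loading newline-delimited JSON with schema autodetect."""
    return {
        "bucket": bucket,
        "source_objects": source_objects,
        "destination_project_dataset_table": destination_table,
        "source_format": "NEWLINE_DELIMITED_JSON",
        "create_disposition": "CREATE_IF_NEEDED",  # create the table when missing
        "write_disposition": "WRITE_APPEND",       # assumption: append to existing rows
        "autodetect": True,                        # let BigQuery infer the schema
    }


def make_load_task(dag, **kwargs):
    """Build the load task; the import path below is the Airflow 1.10.x one."""
    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    return GoogleCloudStorageToBigQueryOperator(
        task_id="gcs_json_to_bq",  # hypothetical task id
        dag=dag,
        **kwargs
    )


# Placeholder names; replace with your own bucket, path, and table.
kwargs = ndjson_load_kwargs(
    "test_bucket", ["exports/*.json"], "dest_dataset.dest_table"
)
```

Inside a DAG file, make_load_task(dag, **kwargs) would then return the task. Connection IDs are left at their defaults here and can be passed as extra keyword arguments.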
From the BigQuery command line, if your JSON file is on GCS, loading JSON data with schema auto-detection does steps 2 and 3 for you in one command.
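As a rough sketch of that single command, the Python snippet below assembles a bq load invocation with the --autodetect and --source_format flags. The table name, the GCS path, and the helper name bq_load_command are placeholders, and the command is only printed here, not executed.

```python
import shlex


def bq_load_command(table, gcs_uri):
    """Return the argv for a bq load of newline-delimited JSON with autodetect."""
    return [
        "bq",
        "load",
        "--autodetect",                             # infer the schema from the data
        "--source_format=NEWLINE_DELIMITED_JSON",
        table,                                      # <dataset>.<table>
        gcs_uri,                                    # gs://... path to the JSON file(s)
    ]


# Placeholder table and path.
cmd = bq_load_command("mydataset.mytable", "gs://test_bucket/exports/data.json")
print(" ".join(shlex.quote(part) for part in cmd))

# To actually run it (needs an installed and authenticated Cloud SDK):
# import subprocess
# subprocess.run(cmd, check=True)
```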
Looking at the Airflow documentation, GoogleCloudStorageToBigQueryOperator seems to do the same thing. I checked its source: it simply calls the BigQuery load API, so I believe it will do what you want.
When it is unclear what an argument means, you can search the BigQuery Jobs API documentation for the argument name.
E.g., to achieve step 1 in your task list, you only need to specify:
write_disposition (string) – The write disposition if the table already exists.
But to know which string to pass as write_disposition, you have to look it up in the BigQuery documentation (the accepted values are WRITE_TRUNCATE, WRITE_APPEND, and WRITE_EMPTY).
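To make the lookup concrete, here is a minimal sketch of a load job request body as described in the BigQuery Jobs API, written as a plain Python dictionary. The field names (sourceUris, sourceFormat, autodetect, createDisposition, writeDisposition, destinationTable) come from the Jobs API load configuration; the project, dataset, table, and path are placeholders, load_job_body is a hypothetical helper, and the correspondence to the operator's snake_case arguments is my reading, not something taken from the operator source.

```python
import json


def load_job_body(project_id, dataset_id, table_id, source_uris):
    """Build a Jobs API request body for an autodetected newline-delimited JSON load."""
    return {
        "configuration": {
            "load": {
                "sourceUris": list(source_uris),
                "sourceFormat": "NEWLINE_DELIMITED_JSON",
                "autodetect": True,                       # operator: autodetect
                "createDisposition": "CREATE_IF_NEEDED",  # operator: create_disposition
                "writeDisposition": "WRITE_APPEND",       # operator: write_disposition
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": dataset_id,
                    "tableId": table_id,
                },
            }
        }
    }


# Placeholder identifiers.
body = load_job_body(
    "my-project", "dest_dataset", "dest_table", ["gs://test_bucket/exports/*.json"]
)
print(json.dumps(body, indent=2))
```

Searching the Jobs API reference for createDisposition or writeDisposition then shows the strings each field accepts.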