I read many documents about google bigquery-python, but I can't understand how to manage bigquery data by python code.
At first, I make a new table as below.
credentials = GoogleCredentials.get_application_default()
service = build('bigquery', 'v2', credentials = credentials)
project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'
project_ref = {'projectId': project_id}
dataset_ref = {'datasetId': dataset_id,
'projectId': project_id}
table_ref = {'tableId': table_id,
'datasetId': dataset_id,
'projectId': project_id}
dataset = {'datasetReference': dataset_ref}
table = {'tableReference': table_ref}
table['schema'] = {'fields': [
{'name': 'id', 'type': 'string'},
...
]}
table = service.tables().insert(body = table, **dataset_ref).execute()
And then I want to insert a data into this table, so I tried to do like below.
fetch_list = []
patch = {'key': 'value'}
fetch_list.append(patch)
table = service.tables().patch(body = fetch_list, **table_ref).execute()
But nothing happened.
How can I update new data into bigquery table?
Please show me some example codes.
To append to or overwrite a table using query results, specify a destination table and set the write disposition to either: Append to table — Appends the query results to an existing table. Overwrite table — Overwrites an existing table with the same name using the query results.
The BigQuery data manipulation language (DML) enables you to update, insert, and delete data from your BigQuery tables. You can execute DML statements just as you would a SELECT statement, with the following conditions: You must use standard SQL.
Mechanism of Google BigQuery Streaming Insert Instead of using a job to load data into BigQuery, you can choose to stream your data into Google BigQuery with one record at a time by using the tabledata(). insertAll() method. This approach enables querying data without any delay in running a load job.
EDIT Nov 2018:
The answer of this question is outdated already as the google cloud client has evolved considerably since this last post.
The official docs contains all information needed already; here you can find everything needed for streaming insert and this one has a complete overview of all methods available so far (you'll also find Python code examples on each page and each method).
Original Answer:
There are a few different ways that you can use to insert data to BQ.
For a deeper understanding of how the python-api works, here's everything you'll need: bq-python-api (at first the docs are somewhat scary but after you get a hang of it it's rather quite simple).
There are 2 main methods that I use to insert data to BQ. The first one is data streaming and it's supposed to be used when you can insert row by row in a real time fashion. Code example:
import uuid
def stream_data(self, table, data, schema):
# first checks if table already exists. If it doesn't, then create it
r = self.service.tables().list(projectId=your_project_id,
datasetId=your_dataset_id).execute()
table_exists = [row['tableReference']['tableId'] for row in
r['tables'] if
row['tableReference']['tableId'] == table]
if not table_exists:
body = {
'tableReference': {
'tableId': table,
'projectId': your_project_id,
'datasetId': your_dataset_id
},
'schema': schema
}
self.service.tables().insert(projectId=your_project_id,
datasetId=your_dataset_id,
body=body).execute()
# with table created, now we can stream the data
# to do so we'll use the tabledata().insertall() function.
body = {
'rows': [
{
'json': data,
'insertId': str(uuid.uuid4())
}
]
}
self.service.tabledata().insertAll(projectId=your_project_id),
datasetId=your_dataset_id,
tableId=table,
body=body).execute(num_retries=5)
Here my self.service
is correspondent to your service
object.
An example of input data
that we have in our project:
data = {u'days_validated': '20', u'days_trained': '80', u'navigated_score': '1', u'description': 'First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5', u'init_cv_date': '2016-03-06', u'metric': 'rank', u'unix_date': '1461610020241117', u'purchased_score': '10', u'result': '0.32677139316724546', u'date': '2016-04-25', u'carted_score': '3', u'end_cv_date': '2016-03-25'}
And its correspondent schema
:
schema = {u'fields': [{u'type': u'STRING', u'name': u'date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'unix_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'init_cv_date', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'end_cv_date', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_trained', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'days_validated', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'navigated_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'carted_score', u'mode': u'NULLABLE'}, {u'type': u'INTEGER', u'name': u'purchased_score', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'description', u'mode': u'NULLABLE'}, {u'type': u'STRING', u'name': u'metric', u'mode': u'NULLABLE'}, {u'type': u'FLOAT', u'name': u'result', u'mode': u'NULLABLE'}]}
The other way to insert data is to use the job insert function. As you can see in the documentation, it accepts several sources for your data. I have an example of how you can do so by loading the results of a query into another table:
def create_table_from_query(self,
query,
dest_table,
how):
body = {
'configuration': {
'query': {
'destinationTable': {
'projectId': your_project_id,
'tableId': dest_table,
'datasetId': your_dataset_id
},
'writeDisposition': how,
'query': query,
},
}
}
response = self.connector.jobs().insert(projectId=self._project_id,
body=body).execute()
self.wait_job_completion(response['jobReference']['jobId'])
def wait_job_completion(self, job_id):
while True:
response = self.connector.jobs().get(projectId=self._project_id,
jobId=job_id).execute()
if response['status']['state'] == 'DONE':
return
The how
input accepts the available options for this field in the documentation (such as "WRITE_TRUNCATE", or "WRITE_APPEND").
You can load the data from a csv file for instance, in this case, the configuration
variable would be defined something along the lines:
"configuration": {
"load": {
"fieldDelimiter": "\t"
"sourceFormat": "CSV"
"destinationTable": {
"projectId": your_project_id,
"tableId": table_id,
"datasetId": your_dataset_id
},
"writeDisposition": "WRITE_TRUNCATE"
"schema": schema,
"sourceUris": file_location_in_google_cloud_storage
},
}
(Using as example a csv file delimited by tabs. It could be a json file as well, the documentation will walk you through the available options).
Running jobs() require some time for it to complete (that's why we created the wait_job_completion
method). It should be cheaper though as compared to real time streaming.
Any questions let us know,
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With