My current workflow in BigQuery is as follows:
(1) query data in a public repository (stored in the US), (2) write it to a table in my repository, (3) export a csv to a cloud bucket and (4) download the csv on the server I work on and (5) work with that on the server.
The problem I have now, is that the server I work on is located in EU. Thus, I have to pay quite some fees for transfering data between my US bucket and my EU server. I could now go ahead and locate my bucket in EU, but then I still have the problem that I would transfer data from the US (BigQuery) to EU (bucket). So I could also set my dataset in bq to be located in the EU, but then I cant do any queries anylonger, because the data in the public repository is located in the US, and queries between different locations are not allowed.
Does anyone have an idea of how to approach this?
One way to copy a BigQuery dataset from one region to another is to take advantage of the Storage Data Transfer Service. It doesn't get around the fact that you still have to pay for bucket-to-bucket network traffic, but might save you some CPU time on copying data to a server in the EU.
The flow would be to:
Python example:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import sys
import time
import googleapiclient.discovery
from google.cloud import bigquery
import json
import pytz
PROJECT_ID = 'swast-scratch' # TODO: set this to your project name
FROM_LOCATION = 'US' # TODO: set this to the BigQuery location
FROM_DATASET = 'workflow_test_us' # TODO: set to BQ dataset name
FROM_BUCKET = 'swast-scratch-us' # TODO: set to bucket name in same location
TO_LOCATION = 'EU' # TODO: set this to the destination BigQuery location
TO_DATASET = 'workflow_test_eu' # TODO: set to destination dataset name
TO_BUCKET = 'swast-scratch-eu' # TODO: set to bucket name in destination loc
# Construct API clients.
bq_client = bigquery.Client(project=PROJECT_ID)
transfer_client = googleapiclient.discovery.build('storagetransfer', 'v1')
def extract_tables():
# Extract all tables in a dataset to a Cloud Storage bucket.
print('Extracting {}:{} to bucket {}'.format(
PROJECT_ID, FROM_DATASET, FROM_BUCKET))
tables = list(bq_client.list_tables(bq_client.dataset(FROM_DATASET)))
extract_jobs = []
for table in tables:
job_config = bigquery.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.AVRO
extract_job = bq_client.extract_table(
table.reference,
['gs://{}/{}.avro'.format(FROM_BUCKET, table.table_id)],
location=FROM_LOCATION, # Available in 0.32.0 library.
job_config=job_config) # Starts the extract job.
extract_jobs.append(extract_job)
for job in extract_jobs:
job.result()
return tables
def transfer_buckets():
# Transfer files from one region to another using storage transfer service.
print('Transferring bucket {} to {}'.format(FROM_BUCKET, TO_BUCKET))
now = datetime.datetime.now(pytz.utc)
transfer_job = {
'description': '{}-{}-{}_once'.format(
PROJECT_ID, FROM_BUCKET, TO_BUCKET),
'status': 'ENABLED',
'projectId': PROJECT_ID,
'transferSpec': {
'transferOptions': {
'overwriteObjectsAlreadyExistingInSink': True,
},
'gcsDataSource': {
'bucketName': FROM_BUCKET,
},
'gcsDataSink': {
'bucketName': TO_BUCKET,
},
},
# Set start and end date to today (UTC) without a time part to start
# the job immediately.
'schedule': {
'scheduleStartDate': {
'year': now.year,
'month': now.month,
'day': now.day,
},
'scheduleEndDate': {
'year': now.year,
'month': now.month,
'day': now.day,
},
},
}
transfer_job = transfer_client.transferJobs().create(
body=transfer_job).execute()
print('Returned transferJob: {}'.format(
json.dumps(transfer_job, indent=4)))
# Find the operation created for the job.
job_filter = {
'project_id': PROJECT_ID,
'job_names': [transfer_job['name']],
}
# Wait until the operation has started.
response = {}
while ('operations' not in response) or (not response['operations']):
time.sleep(1)
response = transfer_client.transferOperations().list(
name='transferOperations', filter=json.dumps(job_filter)).execute()
operation = response['operations'][0]
print('Returned transferOperation: {}'.format(
json.dumps(operation, indent=4)))
# Wait for the transfer to complete.
print('Waiting ', end='')
while operation['metadata']['status'] == 'IN_PROGRESS':
print('.', end='')
sys.stdout.flush()
time.sleep(5)
operation = transfer_client.transferOperations().get(
name=operation['name']).execute()
print()
print('Finished transferOperation: {}'.format(
json.dumps(operation, indent=4)))
def load_tables(tables):
# Load all tables into the new dataset.
print('Loading tables from bucket {} to {}:{}'.format(
TO_BUCKET, PROJECT_ID, TO_DATASET))
load_jobs = []
for table in tables:
dest_table = bq_client.dataset(TO_DATASET).table(table.table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.AVRO
load_job = bq_client.load_table_from_uri(
['gs://{}/{}.avro'.format(TO_BUCKET, table.table_id)],
dest_table,
location=TO_LOCATION, # Available in 0.32.0 library.
job_config=job_config) # Starts the load job.
load_jobs.append(load_job)
for job in load_jobs:
job.result()
# Actually run the script.
tables = extract_tables()
transfer_buckets()
load_tables(tables)
The preceding sample uses google-cloud-bigquery library for BigQuery API and google-api-python-client for Storage Data Transfer API.
Note that this sample does not account for partitioned tables.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With