
Initiating and reading from multiple streams with the BigQuery Storage API (Beta)

The BigQuery Storage API (https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html) is incredibly useful for reading data from a BigQuery table almost 10x faster than the standard BigQuery API. To make it even faster, it supports multiple read streams, each of which reads a dynamically allocated set of rows from the relevant table.

My problem is this: although you can request a certain number of streams, the number of streams actually allocated is not within your control. As a result, I have not been able to initiate more than one stream.

The data I'm reading consists of 3 columns and 6 million rows, as you can see below. I print the total number of streams created to the console.

from google.cloud import bigquery_storage_v1beta1

project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")

# I request 3 streams to be created!
requested_streams = 3  

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    table_ref, parent, read_options=read_options,
    requested_streams=requested_streams
)

response = client.batch_create_read_session_streams(session, requested_streams)

# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + str(session.streams))


reader = client.read_rows(
    bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)

rows = reader.rows(session)

names = set()

import time

start = time.time()

i = 0
for row in rows:
    i += 1
    names.add(row["name"])
    if i > 6000000:
        break

end = time.time()
print(end - start)
print("Got {} unique names and {} total rows.".format(len(names), i))

I have a few questions:

1) Am I only seeing one stream because the multi-stream implementation isn't complete yet (the API is still in beta)?

2) Am I only seeing one stream because the data is relatively "small" for the stream allocation algorithm? Six million rows already seems sizeable to me.

3) If multiple streams do start getting created, the API documentation doesn't describe how to read from them in parallel. Any thoughts on how to do this?
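For reference, here is a rough sketch of how I imagine reading the streams in parallel with a simple thread pool, assuming the session actually comes back with more than one stream (untested, just the shape of what I would try):

# Rough sketch only: read every allocated stream concurrently and count rows.
# Assumes `client` and `session` are the objects created above and that
# `session.streams` actually contains more than one stream.
from concurrent.futures import ThreadPoolExecutor

from google.cloud import bigquery_storage_v1beta1


def read_stream(stream):
    """Read all rows from one stream and return how many were read."""
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    reader = client.read_rows(position)
    return sum(1 for _ in reader.rows(session))


with ThreadPoolExecutor(max_workers=len(session.streams)) as executor:
    counts = list(executor.map(read_stream, session.streams))

print("Rows per stream:", counts)
print("Total rows read:", sum(counts))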

Asked May 18 '19 by Eben du Toit



1 Answer

The issue is that the table you're reading from has only a single input file available. While it has 6 million rows, the data is highly compressible, so there is only a single backing columnar file for the data. Currently, the Storage API will not split data more granularly than that.

You would see the same thing (only a single input) if you examined the query plan of a query that SELECTs from this table.
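For example, one quick way to check this is to run a plain SELECT against the table with the standard google-cloud-bigquery client and look at the first stage of the plan. This is only a sketch; the exact attributes exposed on the query plan entries can vary with the client library version.

# Sketch: inspect the query plan of a plain SELECT over the same table.
# Assumes the google-cloud-bigquery client library is installed and that
# credentials and a default project are configured.
from google.cloud import bigquery

bq_client = bigquery.Client()
job = bq_client.query(
    "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`"
)
job.result()  # wait for completion so the query plan is populated

first_stage = job.query_plan[0]
print("Records read in first stage:", first_stage.records_read)
print("Parallel inputs in first stage:", first_stage.parallel_inputs)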

Answered Oct 15 '22 by shollyman