 

Stream BigQuery table into Google Pub/Sub

I have a Google bigQuery Table and I want to stream the entire table into pub-sub Topic

what should be the easy/fast way to do it?

Thank you in advance,

Asked Dec 25 '16 by MoShe

People also ask

Can Pub/Sub write to BigQuery?

The Pub/Sub Subscription to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub subscription and writes them to a BigQuery table. You can use the template as a quick solution to move Pub/Sub data to BigQuery.
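
That template (the opposite direction of this question) can also be launched from the command line. A sketch, with placeholder project, subscription, and table names:

gcloud dataflow jobs run pubsub-to-bq \
  --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
  --parameters inputSubscription=projects/myproject/subscriptions/mysub,outputTableSpec=myproject:mydataset.mytable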

Does BigQuery support streaming inserts?

Yes, although streaming is not available through the free tier. If you attempt to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier.
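
As an aside, a streaming insert itself is a one-liner with the Python client library. A hedged sketch, with a made-up table and row (billing must be enabled on the project):

from google.cloud import bigquery

client = bigquery.Client()
# insert_rows_json issues a streaming insert (tabledata.insertAll under the hood)
errors = client.insert_rows_json(
    'myproject.mydataset.mytable',               # hypothetical table
    [{'station_number': 1, 'mean_temp': 58.1}])  # rows as JSON-serializable dicts
if errors:
    print('Streaming insert failed:', errors)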


1 Answer

2019 update:

Now it's really easy, with a click-to-BigQuery export option in Pub/Sub:

[screenshot: the export option on the Pub/Sub topic page]

Find it on: https://console.cloud.google.com/cloudpubsub/topicList


The easiest way I know of is going through Google Cloud Dataflow, which natively knows how to access BigQuery and Pub/Sub.

In theory it should be as easy as the following Python lines:

import apache_beam as beam

# pipeline_options assumed to be defined earlier (standard PipelineOptions)
p = beam.Pipeline(options=pipeline_options)
tablerows = p | 'read' >> beam.io.Read(
  beam.io.BigQuerySource('clouddataflow-readonly:samples.weather_stations'))
tablerows | 'write' >> beam.io.Write(
  beam.io.PubSubSink('projects/fh-dataflow/topics/bq2pubsub-topic'))
p.run()

This combination of Python/Dataflow/BigQuery/PubSub doesn't work today (the Python Dataflow SDK is still in beta), but keep an eye on the changelog.
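
For reference, with a current Apache Beam Python SDK the same idea looks roughly like this. A minimal, untested sketch (same table and topic as above; ReadFromBigQuery needs a --temp_location pipeline option for its BigQuery export, and on the Dataflow runner WriteToPubSub requires a streaming pipeline):

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()  # pass --temp_location etc. on the command line
with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> beam.io.ReadFromBigQuery(
           table='clouddataflow-readonly:samples.weather_stations')
     # rows arrive as Python dicts; Pub/Sub messages must be bytes
     | 'to_bytes' >> beam.Map(lambda row: json.dumps(row).encode('utf-8'))
     | 'write' >> beam.io.WriteToPubSub(
           topic='projects/fh-dataflow/topics/bq2pubsub-topic'))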

We can do the same with Java, and it works well; I just tested it. It runs both locally and on the hosted Dataflow runner:

// Dataflow 1.x SDK (com.google.cloud.dataflow)
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read every row of the BigQuery table
PCollection<TableRow> weatherData = p.apply(
        BigQueryIO.Read.named("ReadWeatherStations").from("clouddataflow-readonly:samples.weather_stations"));

// Serialize each TableRow to a string and publish it to the Pub/Sub topic
weatherData.apply(ParDo.named("tableRow2string").of(new DoFn<TableRow, String>() {
    @Override
    public void processElement(DoFn<TableRow, String>.ProcessContext c) throws Exception {
        c.output(c.element().toString());
    }
})).apply(PubsubIO.Write.named("WriteToPubsub").topic("projects/myproject/topics/bq2pubsub-topic"));

p.run();

Test if the messages are there with:

gcloud --project myproject beta pubsub subscriptions pull --auto-ack sub1
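
Messages published to a topic with no subscription attached are dropped, so the subscription has to exist before the pipeline runs; it can be created with, for example:

gcloud --project myproject beta pubsub subscriptions create sub1 --topic bq2pubsub-topic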

Hosted Dataflow screenshot:

[screenshot: hosted Dataflow at work]

Answered Nov 09 '22 by Felipe Hoffa