Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Go + Apache Beam GCP Dataflow: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1

I am using the Go SDK with Apache Beam to build a simple Dataflow pipeline that will get data from a query and publish the data to pub/sub with the following code:

package main

import (
    "context"
    "flag"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "gitlab.com/bq-to-pubsub/infra/env"
    "gitlab.com/bq-to-pubsub/sources"
    "gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
    flag.Parse()
    ctx := context.Background()
    beam.Init()
    log.Info(ctx, "Creating new pipeline")
    pipeline, scope := beam.NewPipelineWithRoot()
    project := gcpopts.GetProject(ctx)

    ppData := pp.Query(scope, project)
    ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
    pubsubio.Write(scope, "project", "topic", ppMessages)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

While my pipeline is running on Google Cloud Dataflow, I get the following error:

Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1.

I have read this thread but I am not sure how it was resolved.

Any idea?

like image 572
PavlMits Avatar asked Oct 20 '21 19:10

PavlMits


1 Answers

Is the job running in Streaming mode or Batch mode? I'd guess Batch mode. It might be the Dataflow internal runner used for batch mode doesn't link in the pub sub sink.

Unfortunately at this time, the Go SDK doesn't provide a local "fallback" for writing to pubsub that the batch runner can use instead.

That said, you should be unblocked pretty easily if you write your own DoFn to write to PubSub using the standard Go package. https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing

Roughly what you should write would look like the following.

var (
  // Assuming everything is one project
  clientOnce sync.Once
  pubSubClient pubsub.Client
)

type PubSubSinkFn struct{
  Project, Topic string // Whatever configuration you need

  client pubsub.Client  // Client is safe to use on multiple goroutines
  batch []*myMessages   // per bundle batches.
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) {
   clientOnce.Do (... ) // create the client with the sync.Once so it can be shared by all bundles
   fn.client = pubSubClient
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v *myMessage) {
  fn.batch = append(fn.batch, v)
  if len(fn.batch) > batchSize { // or whatever criteria you want
     fn.publishBatch()
  }
}

func (fn *PubSubSinkFn) FinishBundle() {
  fn.publishBatch()
}

func (fn *PubSubSinkFn) publishBatch() {
  // use fn.client to publish the batch
  fn.batch = nil
}

// When constructing your pipeline
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)
like image 52
Robert Burke Avatar answered Oct 18 '22 05:10

Robert Burke