I am using the Go SDK with Apache Beam to build a simple Dataflow pipeline that gets data from a BigQuery query and publishes it to Pub/Sub with the following code:
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
	"gitlab.com/bq-to-pubsub/infra/env"
	"gitlab.com/bq-to-pubsub/sources"
	"gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
	flag.Parse()
	ctx := context.Background()

	beam.Init()

	log.Info(ctx, "Creating new pipeline")
	pipeline, scope := beam.NewPipelineWithRoot()

	project := gcpopts.GetProject(ctx)

	ppData := pp.Query(scope, project)
	ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
	pubsubio.Write(scope, "project", "topic", ppMessages)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
While my pipeline is running on Google Cloud Dataflow, I get the following error:
Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1.
I have read this thread but I am not sure how it was resolved.
Any idea?
Is the job running in Streaming mode or Batch mode? I'd guess Batch mode. It might be that the internal Dataflow runner used for batch mode doesn't link in the Pub/Sub sink.
Unfortunately, at this time the Go SDK doesn't provide a local "fallback" for writing to Pub/Sub that the batch runner could use instead.
That said, you should be unblocked fairly easily by writing your own DoFn that publishes to Pub/Sub with the standard Go client package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing
Roughly, what you'd write would look like the following:
var (
	// Assuming everything is in one project.
	clientOnce   sync.Once
	pubSubClient *pubsub.Client
)

type PubSubSinkFn struct {
	Project, Topic string         // Whatever configuration you need.
	client         *pubsub.Client // The client is safe to use from multiple goroutines.
	batch          []*myMessage   // Per-bundle batch.
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) {
	clientOnce.Do(func() {
		// Create the client inside the sync.Once so it can be shared by all bundles.
	})
	fn.client = pubSubClient
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v *myMessage) {
	fn.batch = append(fn.batch, v)
	if len(fn.batch) > batchSize { // Or whatever criteria you want.
		fn.publishBatch(ctx)
	}
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) {
	fn.publishBatch(ctx)
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
	// Use fn.client to publish the batch.
	fn.batch = nil
}

// When constructing your pipeline:
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)
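To show what the elided parts could look like, here is a minimal sketch of Setup and publishBatch using the standard cloud.google.com/go/pubsub client. The myMessage type, its Data field, and the batchSize constant are placeholders carried over from the sketch above rather than real APIs, and error handling is kept to a minimum:

import (
	"context"

	"cloud.google.com/go/pubsub"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
)

func (fn *PubSubSinkFn) Setup(ctx context.Context) {
	clientOnce.Do(func() {
		// Create one client per worker process and share it across bundles.
		c, err := pubsub.NewClient(ctx, fn.Project)
		if err != nil {
			log.Fatalf(ctx, "Failed to create Pub/Sub client: %v", err)
		}
		pubSubClient = c
	})
	fn.client = pubSubClient
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
	// In a real sink you might cache the topic handle and flush/Stop it in Teardown.
	topic := fn.client.Topic(fn.Topic)
	var results []*pubsub.PublishResult
	for _, m := range fn.batch {
		// Publish is asynchronous; collect the results so we can wait on them.
		results = append(results, topic.Publish(ctx, &pubsub.Message{Data: m.Data}))
	}
	for _, r := range results {
		// Block until the server acknowledges (or rejects) each message.
		if _, err := r.Get(ctx); err != nil {
			log.Errorf(ctx, "Failed to publish message: %v", err)
		}
	}
	fn.batch = nil
}

Depending on your SDK version, you may also need to register the DoFn type (for example with beam.RegisterType) so the Dataflow workers can construct it.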