 

Read Avro File and Write it into BigQuery table

My objective is to read Avro file data from Cloud Storage and write it to a BigQuery table using Java. It would be great if someone could provide a code snippet or ideas for reading Avro-format data and writing it to a BigQuery table using Cloud Dataflow.

asked Jan 27 '23 by lourdu rajan

1 Answer

I see two possible approaches:

  1. Using Dataflow:

    // Imports used below (Beam 2.x, Avro, and the BigQuery model classes).
    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.util.Utf8;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Read an AVRO file.
    // Alternatively, read the schema from a file.
    // https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/AvroIO.html
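    // e.g. (assuming a local schema file named quote.avsc):
    //   Schema avroSchema = new Schema.Parser().parse(new File("quote.avsc"));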
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", "
            + "\"name\": \"quote\", "
            + "\"fields\": ["
            + "{\"name\": \"source\", \"type\": \"string\"},"
            + "{\"name\": \"quote\", \"type\": \"string\"}"
            + "]}");
    PCollection<GenericRecord> avroRecords = p.apply(
        AvroIO.readGenericRecords(avroSchema).from("gs://bucket/quotes.avro"));

    // Convert Avro GenericRecords to BigQuery TableRows.
    // It's probably better to use Avro-generated classes instead of manually casting types.
    // https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
    PCollection<TableRow> bigQueryRows = avroRecords.apply(
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(
                (GenericRecord elem) ->
                    new TableRow()
                        .set("source", ((Utf8) elem.get("source")).toString())
                        .set("quote", ((Utf8) elem.get("quote")).toString())));

    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema bigQuerySchema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema()
                        .setName("source")
                        .setType("STRING"),
                    new TableFieldSchema()
                        .setName("quote")
                        .setType("STRING")));

    bigQueryRows.apply(BigQueryIO.writeTableRows()
        .to(new TableReference()
            .setProjectId("project_id")
            .setDatasetId("dataset_id")
            .setTableId("avro_source"))
        .withSchema(bigQuerySchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
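
    // To run this on the Dataflow service (rather than locally), pass the
    // standard Beam/GCP flags, e.g. --runner=DataflowRunner
    // --project=<your-project> --tempLocation=gs://<bucket>/tmp
    // (BigQueryIO stages its load files under the temp location).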
  2. Import the data into BigQuery directly, without Dataflow, via a load job (a client-library sketch follows below). See this documentation: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
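
For the second approach, here is a minimal sketch using the google-cloud-bigquery Java client library. The dataset/table names and the GCS URI are placeholders carried over from the Dataflow example above:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.LoadJobConfiguration;
    import com.google.cloud.bigquery.TableId;

    public class LoadAvroFromGcs {
      public static void main(String[] args) throws InterruptedException {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

        // Placeholder identifiers: replace with your own dataset/table and GCS path.
        TableId tableId = TableId.of("dataset_id", "avro_source");
        String sourceUri = "gs://bucket/quotes.avro";

        // Avro files are self-describing, so BigQuery can infer the table
        // schema from the file itself.
        LoadJobConfiguration loadConfig =
            LoadJobConfiguration.newBuilder(tableId, sourceUri)
                .setFormatOptions(FormatOptions.avro())
                .setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE)
                .build();

        // Run the load job and block until it completes.
        Job job = bigquery.create(JobInfo.of(loadConfig));
        job = job.waitFor();
        if (job.getStatus().getError() != null) {
          throw new RuntimeException(job.getStatus().getError().toString());
        }
      }
    }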
answered Feb 28 '23 by Udi Meiri