
How to consume continuous streaming data with the Snowflake connector for Kafka [closed]

Can anyone help me with consuming data that is being streamed continuously? What should be specified in the Snowflake connector for the topics?

I am able to populate an individual table with data for a given topic name, but my requirement is to capture a continuous data stream into the table.

Asked by Austin Jackson on Oct 07 '21


People also ask

Can Kafka connector directly load data from Kafka topics to staging table in Snowflake?

The Snowflake Connector for Kafka (“Kafka connector”) reads data from one or more Apache Kafka topics and loads the data into a Snowflake table.

Can Snowflake handle streaming data?

Snowflake can ingest streaming data through the Snowflake Connector for Kafka. In addition, Snowflake Snowpipe can help organizations seamlessly load continuously generated data into Snowflake.
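For reference, here is a minimal sketch of how such a connector might be registered through the Kafka Connect REST API (default port 8083). It is written in Python purely for illustration; every name, credential, and threshold below is a placeholder, and the full list of supported properties is in the Snowflake documentation linked in the second answer.

    # Sketch: register the Snowflake sink connector with a Kafka Connect
    # cluster via its REST API. All credentials and object names are
    # placeholders.
    import requests

    connector = {
        "name": "snowflake_sink",  # hypothetical connector name
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "tasks.max": "4",
            # Topics carrying the continuous stream.
            "topics": "orders,clickstream",
            # Optional: map each topic to a target table; otherwise the
            # connector derives a table name from the topic name.
            "snowflake.topic2table.map": "orders:ORDERS_RAW,clickstream:CLICKS_RAW",
            # Flush thresholds: a file is written to the internal stage
            # when any one of these is reached.
            "buffer.count.records": "10000",
            "buffer.flush.time": "60",
            "buffer.size.bytes": "5000000",
            "snowflake.url.name": "myaccount.snowflakecomputing.com:443",
            "snowflake.user.name": "KAFKA_CONNECTOR_USER",
            "snowflake.private.key": "<private-key-contents>",
            "snowflake.database.name": "RAW",
            "snowflake.schema.name": "KAFKA",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
        },
    }

    resp = requests.post("http://localhost:8083/connectors", json=connector, timeout=30)
    resp.raise_for_status()
    print(resp.json())

Once registered, the connector keeps consuming the subscribed topics indefinitely and flushes to Snowflake whenever a buffer threshold is reached, so nothing extra has to be configured to capture a continuous stream.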


2 Answers

To stream data to Snowflake, we use Apache NiFi to pull from Kafka, modify/transform the data, save it to an S3 bucket, and then kick off a Snowpipe load (using a NiFi processor) to ingest it into Snowflake tables. Once the data is in Snowflake, we use dbt for further SQL-based transformations and enrichments.
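A rough Python equivalent of that "kick off a snowpipe" step, using the Snowflake Ingest SDK (pip install snowflake-ingest), might look like the sketch below. It assumes a pipe named RAW.KAFKA.EVENTS_PIPE already exists and copies from the stage the files land in; the account, user, key path, and file path are all placeholders.

    # Rough equivalent of the "kick off a snowpipe" step, using the
    # Snowflake Ingest SDK. Assumes the pipe RAW.KAFKA.EVENTS_PIPE
    # already COPYs from the stage the files land in; account, user,
    # key, and file path are placeholders.
    from snowflake.ingest import SimpleIngestManager, StagedFile

    with open("rsa_key.pem") as f:
        private_key = f.read()

    ingest_manager = SimpleIngestManager(
        account="myaccount",
        host="myaccount.snowflakecomputing.com",
        user="INGEST_USER",
        pipe="RAW.KAFKA.EVENTS_PIPE",
        private_key=private_key,
    )

    # Point Snowpipe at the newly landed file; the actual COPY runs
    # asynchronously on Snowflake-managed compute.
    resp = ingest_manager.ingest_files([StagedFile("2021/10/07/events-0001.json.gz", None)])
    print(resp)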

Answered by Jason Zondor on Jan 04 '23


From the documentation:

The Kafka connector completes the following process to subscribe to Kafka topics and create Snowflake objects:

The Kafka connector subscribes to one or more Kafka topics based on the configuration information provided via the Kafka configuration file or command line (or the Confluent Control Center; Confluent only).

The connector creates the following objects for each topic:

One internal stage to temporarily store data files for each topic.

One pipe to ingest the data files for each topic partition.

One table for each topic. If the table specified for each topic does not exist, the connector creates it; otherwise, the connector creates the RECORD_CONTENT and RECORD_METADATA columns in the existing table and verifies that the other columns are nullable (and produces an error if they are not).
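As an illustration of that last point, the sketch below (not from the linked documentation) queries a connector-created landing table through the Snowflake Python connector. RECORD_CONTENT and RECORD_METADATA are the columns the connector manages; the table name, connection details, and the event_type field are assumed placeholders.

    # Sketch: querying a table created by the Kafka connector with the
    # Snowflake Python connector. RECORD_CONTENT and RECORD_METADATA are
    # the connector-managed columns; the table name, credentials, and
    # the event_type field are placeholders.
    import snowflake.connector

    conn = snowflake.connector.connect(
        account="myaccount",
        user="ANALYST",
        password="...",          # or key-pair / SSO authentication
        warehouse="QUERY_WH",
        database="RAW",
        schema="KAFKA",
    )

    cur = conn.cursor()
    cur.execute("""
        SELECT
            record_metadata:topic::string     AS topic,
            record_metadata:partition::int    AS kafka_partition,
            record_content:event_type::string AS event_type  -- placeholder field
        FROM ORDERS_RAW
        LIMIT 10
    """)
    for row in cur.fetchall():
        print(row)
    conn.close()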

Ingestion then proceeds as follows:

  1. One or more applications publish JSON or Avro records to a Kafka cluster. The records are split into one or more topic partitions.

  2. The Kafka connector buffers messages from the Kafka topics. When a threshold (time or memory or number of messages) is reached, the connector writes the messages to a temporary file in the internal stage. The connector triggers Snowpipe to ingest the temporary file. Snowpipe copies a pointer to the data file into a queue.

  3. A Snowflake-provided virtual warehouse loads data from the staged file into the target table (i.e. the table specified in the configuration file for the topic) via the pipe created for the Kafka topic partition.

  4. The connector monitors Snowpipe and deletes each file in the internal stage after confirming that the file data was loaded into the table. If a failure prevented the data from loading, the connector moves the file into the table stage and produces an error message.

  5. The connector repeats steps 2-4.
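For step 1, a minimal producer might look like the sketch below. It uses kafka-python, which is only one of many possible clients, and the broker address and topic name are placeholders; once records arrive on a subscribed topic, the connector handles steps 2-5 automatically.

    # Minimal producer for step 1, using kafka-python (one of several
    # possible clients). Broker address and topic are placeholders; the
    # connector subscribed to this topic handles steps 2-5 on its own.
    import json
    import time
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    for i in range(100):
        # Each JSON record lands in the topic's table as RECORD_CONTENT,
        # with the Kafka coordinates in RECORD_METADATA.
        producer.send("orders", {"order_id": i, "status": "created", "ts": time.time()})
    producer.flush()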

https://docs.snowflake.com/en/user-guide/kafka-connector-overview.html

Answered by Robert Long on Jan 04 '23