 

Properly Configuring Kafka Connect S3 Sink TimeBasedPartitioner

I am trying to use the TimeBasedPartitioner of the Confluent S3 sink. Here is my config:

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "file": "test.sink.txt",
    "topics": "xxxxx",
    "s3.region": "yyyyyy",
    "s3.bucket.name": "zzzzzzz",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "timestamp.extractor": "Record",
    "timestamp.field": "local_timestamp",
    "path.format": "YYYY-MM-dd-HH",
    "partition.duration.ms": "3600000",
    "schema.compatibility": "NONE"
  }
}

The data is binary and I use an Avro schema for it. I want to use the record field "local_timestamp", which is a UNIX timestamp, to partition the data, say into hourly files.

I start the connector with the usual REST API call

curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors

Unfortunately the data is not partitioned as I wish. I also tried removing flush.size in case it was interfering, but then I got this error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%

Any idea how to properly set up the TimeBasedPartitioner? I could not find a working example.

Also, how can one debug such a problem or gain further insight into what the connector is actually doing?

Greatly appreciate any help or further suggestions.

Asked Jan 06 '18 by Daniel




2 Answers

After studying the code at TimeBasedPartitioner.java and the logs with

confluent log connect tail -f

I realized that both timezone and locale are mandatory, although the Confluent S3 connector documentation does not say so. The following config fields solved the problem and let the connector upload the records to S3, properly partitioned:

"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",

Note two more things: first, a value for flush.size is still required; the files are eventually split into chunks no larger than flush.size records. Second, path.format is better chosen as shown above, so that a proper directory tree structure is generated in S3.
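Putting it all together, a complete connector config along these lines might look like this (only a sketch; the topic, region, and bucket values are placeholders carried over from the question):

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "xxxxx",
    "s3.region": "yyyyyy",
    "s3.bucket.name": "zzzzzzz",
    "s3.part.size": "5242880",
    "flush.size": "10000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "US",
    "timezone": "UTC",
    "partition.duration.ms": "3600000",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "local_timestamp",
    "schema.compatibility": "NONE"
  }
}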

I am still not 100% sure whether the record field local_timestamp is really what is used to partition the records.
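As for gaining further insight into what the connector is actually doing: besides tailing the Connect worker log, you can query the Connect REST API directly. A minimal sketch, assuming the worker listens on localhost:8083 and the connector is named s3-sink as in the question:

# show connector and task state (RUNNING/FAILED) plus any stack trace
curl -s http://localhost:8083/connectors/s3-sink/status

# show the configuration the connector is actually running with
curl -s http://localhost:8083/connectors/s3-sink/config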

Any comments or improvements are greatly welcome.

Answered Oct 24 '22 by Daniel


Indeed your amended configuration seems correct.

Specifically, setting timestamp.extractor to RecordField allows you to partition your files based on the timestamp field that your records have and which you identify by setting the property timestamp.field.

If instead you set timestamp.extractor=Record, the time-based partitioner will use the Kafka timestamp of each record.
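As a small illustration (a sketch only, reusing the field name from the question), the partitioning-related settings for each case would be:

To partition on a field inside the record:

"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp"

To partition on the Kafka record timestamp:

"timestamp.extractor": "Record"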

Regarding flush.size, setting this property to a very high value (e.g. Integer.MAX_VALUE) is practically equivalent to ignoring it.
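For example (just for illustration), Integer.MAX_VALUE written out:

"flush.size": "2147483647"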

Finally, schema.generator.class is no longer required in the most recent versions of the connector.

Answered Oct 24 '22 by Konstantine Karantasis