I am trying to use the TimeBasedPartitioner of the Confluent S3 sink. Here is my config:
{
"name":"s3-sink",
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"file":"test.sink.txt",
"topics":"xxxxx",
"s3.region":"yyyyyy",
"s3.bucket.name":"zzzzzzz",
"s3.part.size":"5242880",
"flush.size":"1000",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"timestamp.extractor":"Record",
"timestamp.field":"local_timestamp",
"path.format":"YYYY-MM-dd-HH",
"partition.duration.ms":"3600000",
"schema.compatibility":"NONE"
}
}
The data is binary and I use an avro scheme for it. I would want to use the actual record field "local_timestamp" which is a UNIX timestamp to partition the data, say into hourly files.
I start the connector with the usual REST API call
curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors
Unfortunately the data is not partitioned as I wish. I also tried to remove the flush size because this might interfere. But then I got the error
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%
Any idea how to properly set the TimeBasedPartioner? I could not find a working example.
Also how can one debug such a problem or gain further insight what the connector is actually doing?
Greatly appreciate any help or further suggestions.
The Kafka Connect JDBC Sink connector allows you to export data from Apache Kafka® topics to any relational database with a JDBC driver. This connector can support a wide variety of databases. The connector polls data from Kafka to write to the database based on the topics subscription.
The Amazon S3 Sink connector exports data from Apache Kafka® topics to S3 objects in either Avro, JSON, or Bytes formats. Depending on your environment, the S3 connector can export data by guaranteeing exactly-once delivery semantics to consumers of the S3 objects it produces.
Kafka Connect is a mandatory piece to build a complete and flexible data streaming platform.
Products from several companies use 1.4 US gallons (5.3 l) to 1.0 US gallon (3.8 l) per flush.
After studying the code at TimeBasedPartitioner.java and the logs with
confluent log connect tail -f
I realized that both timezone and locale are mandatory, although this is not specified as such in the Confluent S3 Connector documentation. The following config fields solve the problem and let me upload the records properly partitioned to S3 buckets:
"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",
Note two more things: First a value for flush.size is also necessary, files are partitioned eventually into smaller chunks, not larger than specified by flush.size. Second, the path.format is better selected as displayed above so a proper tree structure is generated.
I am still not 100% sure if really the record field local_timestamp is used to partition the records.
Any comments or improvements are greatly welcome.
Indeed your amended configuration seems correct.
Specifically, setting timestamp.extractor
to RecordField
allows you to partition your files based on the timestamp field that your records have and which you identify by setting the property timestamp.field
.
When instead one sets timestamp.extractor=Record
, then a time-based partitioner will use the Kafka timestamp for each record.
Regarding flush.size
, setting this property to a high value (e.g. Integer.MAX_VALUE
) will be practically synonymous to ignore it.
Finally, schema.generator.class
is no longer required in the most recent versions of the connector.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With