I'm using Kafka 0.8.2.1 SimpleConsumer . Could somebody clarify the meaning of a few config parameters for the SimpleConsumer and FetchRequestBuilder? Short of reading KAfka's source code, I could not find any docs on then. (I tried posting this question to the kafka user group - but no luck):
-- Q1: in the signature of the SimpleConsumer constructor I see the Int 'soTimeout' parameter - what is the meaning of this timeout? Is this a timeout to connect to the Kafka broker? Timeout on getting a response from any [or specific??] request to Kafka (like FetchRequest)? Something else?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
-- Q2: also, SimpleConsumer constructor takes Int 'bufferSize' parameter. What is the meaning of it? Is this how many bytes SimpleConsumer will read when a fetchRequest is issued? Or is it the max number of bytes read per one fetch from Kafka - and multiple fetches will happen if more data is available?
-- When building FetchRequest via FetchRequestBuilder (see below), I also need to specify 'fetchSize':
FetchRequest req= newFetchRequestBuilder ()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
Looking at the source code of the FetchRequestBuilder , I think (I'm not a Scala pro) those calls translate into the below method calls - and there the final parameter passed into the FetchRequest is called 'minBytes', hinting that this is not the exact fetch size, possibly? . Does it mean it won't even fetch anything unless the is at least 'minBytes' of data available?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
**minBytes: Int = FetchRequest.DefaultMinBytes**,
...)
So, my final question is:
-- Q3: how do 'bufferSize' and 'fetchSize/minBytes' relate? What exactly do they define? Do I have to make sure one is smaller or grater than the other?
Thanks,
Marina
soTimeout is the time in milliseconds to wait for a connection to the given broker. I don't know that anything special happens with the connection other than you get validation that there's a broker over there that's ready to perform some subsequent actions.
I believe that the bufferSize used in the constructor is the size of the buffer used by the client-side socket to receive data sent by the broker.
For your last question, if the total number of bytes returned for whatever reason by a fetch request is larger than the requested socket buffer size then there needs to be more than one lower level call to retrieve all of the data, even though there is a single higher-level fetch call.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With