
kafka: what do 'soTimeout', 'bufferSize' and 'minBytes' mean for SimpleConsumer?

I'm using the Kafka 0.8.2.1 SimpleConsumer. Could somebody clarify the meaning of a few config parameters for SimpleConsumer and FetchRequestBuilder? Short of reading Kafka's source code, I could not find any docs on them. (I tried posting this question to the Kafka user group, but no luck.)

-- Q1: in the signature of the SimpleConsumer constructor I see the Int 'soTimeout' parameter - what is the meaning of this timeout? Is this a timeout to connect to the Kafka broker? Timeout on getting a response from any [or specific??] request to Kafka (like FetchRequest)? Something else?

kafka.javaapi.consumer.SimpleConsumer
    (val host: String,
     val port: Int,
     val soTimeout: Int,
     val bufferSize: Int,
     val clientId: String)

-- Q2: also, SimpleConsumer constructor takes Int 'bufferSize' parameter. What is the meaning of it? Is this how many bytes SimpleConsumer will read when a fetchRequest is issued? Or is it the max number of bytes read per one fetch from Kafka - and multiple fetches will happen if more data is available?

-- When building FetchRequest via FetchRequestBuilder (see below), I also need to specify 'fetchSize':

FetchRequest req = new FetchRequestBuilder()
  .clientId(kafkaGroupId)
  .addFetch(topic, partition, offset, fetchSizeInBytes)
  .build();

Looking at the source code of FetchRequestBuilder, I think (I'm not a Scala pro) those calls translate into the method calls below, and there the final parameter passed into FetchRequest is called 'minBytes', possibly hinting that this is not the exact fetch size. Does it mean it won't fetch anything at all unless there is at least 'minBytes' of data available?

class FetchRequestBuilder() {
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
    ...
    def build() = {
      val fetchRequest = FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
      ...
    }
}

FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
    correlationId: Int = FetchRequest.DefaultCorrelationId,
    clientId: String = ConsumerConfig.DefaultClientId,
    replicaId: Int = Request.OrdinaryConsumerId,
    maxWait: Int = FetchRequest.DefaultMaxWait,
    minBytes: Int = FetchRequest.DefaultMinBytes, // <- this one
...)
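To illustrate the distinction being asked about: as far as I can tell from the 0.8.x FetchRequestBuilder, the value passed to addFetch is stored per partition as a maximum number of bytes to return for that partition, while minBytes and maxWait are separate, request-wide builder settings that default to 0. So fetchSize would not become minBytes. The Java sketch below is a toy model under those assumptions, not Kafka code; the class, method, and partition names are hypothetical, and it ignores message boundaries (in practice fetchSize should be at least as large as the largest message, or the consumer may get no complete message for that partition).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only (hypothetical names, not Kafka's code) of how one fetch request is answered.
public class FetchSemanticsModel {

    /** fetchSize (from addFetch) caps the bytes returned for ONE partition. */
    public static int bytesForPartition(int availableBytes, int fetchSize) {
        return Math.min(availableBytes, fetchSize);
    }

    /** Total response size: the per-partition caps summed over all partitions in the request. */
    public static int responseBytes(Map<String, Integer> available, Map<String, Integer> fetchSizes) {
        int total = 0;
        for (Map.Entry<String, Integer> e : fetchSizes.entrySet()) {
            total += bytesForPartition(available.getOrDefault(e.getKey(), 0), e.getValue());
        }
        return total;
    }

    /**
     * minBytes and maxWait apply to the whole request: answer as soon as at least minBytes
     * are available, or once maxWait has elapsed; maxWait <= 0 means answer immediately.
     */
    public static boolean respondNow(int totalBytes, int minBytes, int elapsedMs, int maxWaitMs) {
        return maxWaitMs <= 0 || totalBytes >= minBytes || elapsedMs >= maxWaitMs;
    }

    public static void main(String[] args) {
        Map<String, Integer> available = new LinkedHashMap<>();
        available.put("topicA-0", 50_000);
        available.put("topicA-1", 300);

        Map<String, Integer> fetchSizes = new LinkedHashMap<>();
        fetchSizes.put("topicA-0", 10_000); // as if addFetch("topicA", 0, offset, 10000)
        fetchSizes.put("topicA-1", 10_000); // as if addFetch("topicA", 1, offset, 10000)

        int total = responseBytes(available, fetchSizes);
        System.out.println("bytes in response: " + total);          // bytes in response: 10300

        System.out.println(respondNow(0, 0, 0, 0));                  // true  (minBytes = maxWait = 0)
        System.out.println(respondNow(total, 20_000, 0, 1_000));     // false (not enough data yet)
        System.out.println(respondNow(total, 20_000, 1_000, 1_000)); // true  (maxWait expired)
    }
}
```

In this model, fetchSize only limits how much comes back per partition, while minBytes/maxWait only decide when the broker answers; neither is related to the client's socket buffer.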

So, my final question is:

-- Q3: how do 'bufferSize' and 'fetchSize/minBytes' relate? What exactly do they define? Do I have to make sure one is smaller or greater than the other?

Thanks,

Marina

Marina, asked Oct 31 '22


1 Answer

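soTimeout is a socket timeout in milliseconds. It bounds how long the consumer waits to connect to the given broker, and (judging from the 0.8.x source) it is also set as the socket's read timeout, so a blocking read, for example while waiting for a fetch response, fails if no data arrives within that time. Establishing the connection itself doesn't do anything special: it just confirms that a broker is there, ready to handle subsequent requests.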
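As far as I can tell from the 0.8.x source, SimpleConsumer passes soTimeout to kafka.network.BlockingChannel, which uses the same value as the connect timeout and as the socket read timeout (SO_TIMEOUT). The plain java.net sketch below involves no Kafka at all (the class and method names are made up); it shows what a read timeout means in practice: a local server accepts the connection but never answers, so the blocking read fails after soTimeoutMs. If that reading of the source is right, one consequence is that soTimeout should be larger than any maxWait you set on a FetchRequest, otherwise the read can time out while the broker is still legitimately holding the request.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Plain java.net sketch (no Kafka): one timeout value used both for connect() and for reads.
public class SoTimeoutDemo {

    /** Returns true if reading from a silent peer fails with a timeout after soTimeoutMs. */
    public static boolean readTimesOut(int soTimeoutMs) {
        // Port 0 = any free port. The OS completes the TCP handshake even though we never
        // call accept(), and this "broker" never writes anything.
        try (ServerSocket silentBroker = new ServerSocket(0);
             Socket socket = new Socket()) {
            socket.setSoTimeout(soTimeoutMs); // read timeout (SO_TIMEOUT)
            socket.connect(new InetSocketAddress("127.0.0.1", silentBroker.getLocalPort()),
                           soTimeoutMs);      // connect timeout, same value (assumed, as in BlockingChannel)
            try {
                socket.getInputStream().read(); // blocks waiting for a "response"
                return false;                   // data or EOF arrived before the timeout
            } catch (SocketTimeoutException e) {
                return true;                    // nothing arrived within soTimeoutMs
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean timedOut = readTimesOut(500);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("timed out: " + timedOut + " after about " + elapsedMs + " ms");
    }
}
```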

I believe the bufferSize passed to the constructor is the size of the client-side socket's receive buffer (SO_RCVBUF), i.e. the buffer the socket uses to receive data sent by the broker.

For your last question: if the total number of bytes a fetch request returns, for whatever reason, is larger than the socket buffer size, then more than one lower-level read is needed to retrieve all of the data, even though there is a single higher-level fetch call.
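As far as I can tell from the 0.8.x source (kafka.network.BoundedByteBufferReceive), the client reads the response's 4-byte size prefix and then keeps calling read() on the socket until that many bytes have arrived. The Java sketch below imitates that loop without Kafka: a fake channel returns at most 4096 bytes per read() call, a simplification standing in for a small socket receive buffer (a real socket returns whatever has arrived so far). Class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Sketch (hypothetical names): one size-prefixed response assembled from several read() calls.
public class ChunkedReceiveDemo {

    /** Fake socket: hands back at most maxPerRead bytes per read() call. */
    static final class ChunkedChannel implements ReadableByteChannel {
        private final ByteBuffer data;
        private final int maxPerRead;
        int readCalls = 0;

        ChunkedChannel(byte[] bytes, int maxPerRead) {
            this.data = ByteBuffer.wrap(bytes);
            this.maxPerRead = maxPerRead;
        }

        @Override
        public int read(ByteBuffer dst) {
            readCalls++;
            if (!data.hasRemaining()) return -1;              // peer closed
            int n = Math.min(maxPerRead, Math.min(dst.remaining(), data.remaining()));
            ByteBuffer chunk = data.duplicate();
            chunk.limit(chunk.position() + n);
            dst.put(chunk);                                   // copy n bytes
            data.position(data.position() + n);
            return n;
        }

        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    /** Keep reading until buf is full, however many read() calls that takes. */
    static void readFully(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (ch.read(buf) < 0) throw new IOException("connection closed mid-response");
        }
    }

    /** Read a 4-byte size prefix, then exactly that many payload bytes. */
    static byte[] receive(ReadableByteChannel ch) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        readFully(ch, sizeBuf);
        sizeBuf.flip();
        ByteBuffer payload = ByteBuffer.allocate(sizeBuf.getInt());
        readFully(ch, payload);
        return payload.array();
    }

    /** Frames a payloadSize-byte message, receives it, and returns how many read() calls it took. */
    public static int readCallsNeeded(int payloadSize, int maxPerRead) {
        ByteBuffer wire = ByteBuffer.allocate(4 + payloadSize);
        wire.putInt(payloadSize);
        for (int i = 0; i < payloadSize; i++) wire.put((byte) i);

        ChunkedChannel ch = new ChunkedChannel(wire.array(), maxPerRead);
        try {
            byte[] response = receive(ch);
            for (int i = 0; i < payloadSize; i++) {
                if (response[i] != (byte) i) throw new IllegalStateException("corrupted at " + i);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ch.readCalls;
    }

    public static void main(String[] args) {
        int calls = readCallsNeeded(10_000, 4096);                       // 4096-byte "socket buffer"
        System.out.println("10000 bytes in " + calls + " read() calls"); // 10000 bytes in 4 read() calls
    }
}
```

One read() returns the size prefix and three more return the 10,000-byte body. Raising the cap changes only the number of read() calls, not the bytes delivered, which suggests bufferSize and fetchSize don't need to be ordered relative to each other.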

Chris Gerken, answered Nov 17 '22