Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka: Cant Create Multiple Stream Consumers

I just got up and running with Kafka 0.8 beta 1. I have a really simple example up and running, the problem is, I can only get one message consumer to work, not several. That is, the runSingleWorker() method WORKS. The run() method DOES NOT WORK:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import com.truecar.inventory.worker.core.application.config.AppConfig;

public class ConsumerThreadPool {

    private final ConsumerConnector consumer;
    private final String topic;

    private ExecutorService executor;
    private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

    public ConsumerThreadPool(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig"));
        this.topic = topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(Integer numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(numThreads);

        for(Integer i = 0; i < numThreads; i++ ){
            KafkaStream<byte[], byte[]> stream =  topicListeners.get(i);
            executor.submit(new Consumer(stream, i));
        }
    }


    public void runSingleWorker(Integer numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, new Integer(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while(it.hasNext()){
                System.out.println(new String(it.next().message()));

            }
        }
    }
}

And inside my toy consumer:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream kafkaStream;
    private Integer threadNumber;

    public Consumer(KafkaStream kafkaStream, Integer threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
        while(true) {

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }

            while(it.hasNext()) {
                System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
            }
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}

The problem is, the pool of workers does not pick up messages:

Created iterator empty iterator thread number 3
Created iterator empty iterator thread number 6
Created iterator empty iterator thread number 9
Created iterator empty iterator thread number 7
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 0
Created iterator empty iterator thread number 8
Created iterator empty iterator thread number 3
etc...

When I add messages via the produce command line, the messages are printed under the single threaded worker version, but messages are not printed under the multi-stream situation. Whats going on here? How can I fix this?

Btw, the pom.xml for kafka 0.8 is not a valid pom and will not acquire dependencies, so here is a pom with complete dependencies.

<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
    http://maven.apache.org/POM/4.0.0
    http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>group1</groupId>
<artifactId>artifact1</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <org.springframework.version>3.2.4.RELEASE</org.springframework.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>3.2.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>3.2.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.0-beta1</version>
    </dependency>
    <dependency>
        <groupId>javax.inject</groupId>
        <artifactId>javax.inject</artifactId>
        <version>1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.2</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>
    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
<build>
    <finalName>inventory-core</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.dstovall</groupId>
            <artifactId>onejar-maven-plugin</artifactId>
            <version>1.4.4</version>
            <executions>
                <execution>
                    <configuration>
                        <onejarVersion>0.97</onejarVersion>
                        <classifier>onejar</classifier>
                    </configuration>
                    <goals>
                        <goal>one-jar</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
<pluginRepositories>
    <pluginRepository>
        <id>onejar-maven-plugin.googlecode.com</id>
        <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url>
    </pluginRepository>
</pluginRepositories>
</project>
like image 271
David Williams Avatar asked Aug 27 '13 23:08

David Williams


1 Answers

Maybe too late for questioner but could be useful to other developers. Seems you have used only one Partition for couple of Consumers – that's wrong. Quote from Documentation:

Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.

So when you think about Consumers you should think how to divide messages by partitions. In most cases you should use some high-level grouping to it or even let it be random by default.

like image 184
asktomsk Avatar answered Oct 07 '22 14:10

asktomsk