How to Effectively Use ExecutorService in Kafka Consumers

Apache Kafka is one of today’s most commonly used event streaming platforms. When working with Kafka, we often have to process a large number of events/messages placed on a broker. In the traditional approach, a consumer listens to a topic and processes each message itself, on the same thread that polls the broker. This can become a performance bottleneck when messages arrive on the topic faster than the consumer can process them, because the consumer falls further and further behind. One potential solution in such a scenario is to offload message processing to worker threads in a thread pool.

In this section, we will look at how a Kafka consumer can offload its work to a thread pool. We will use Java’s ExecutorService framework to create the thread pool.

This approach involves two main steps. The first step is to create a KafkaConsumer that reads messages from a topic and hands each message to a thread pool. The second step is to create the worker task that the pool's threads run to process each message.

Step 1, Kafka Consumer Implementation: Here, we read the messages from a topic and dispatch them to a thread pool created using ThreadPoolExecutor.

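import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaProcessor {
    private final KafkaConsumer<String, String> myConsumer;
    private ExecutorService executor;
    private static final Properties KAFKA_PROPERTIES = new Properties();
    static {
        KAFKA_PROPERTIES.put("bootstrap.servers", "localhost:9092");
        KAFKA_PROPERTIES.put("group.id", "test-consumer-group");
        // With auto-commit, offsets can be committed before a worker has finished a record
        KAFKA_PROPERTIES.put("enable.auto.commit", "true");
        KAFKA_PROPERTIES.put("auto.commit.interval.ms", "1000");
        KAFKA_PROPERTIES.put("session.timeout.ms", "30000");
        KAFKA_PROPERTIES.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KAFKA_PROPERTIES.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public KafkaProcessor() {
        this.myConsumer = new KafkaConsumer<>(KAFKA_PROPERTIES);
        this.myConsumer.subscribe(Arrays.asList("testTopic"));
    }

    public void init(int numberOfThreads) {
        // Create a thread pool with a bounded queue. When the queue is full,
        // CallerRunsPolicy makes the polling thread run the task itself.
        executor = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        // Poll in an endless loop and hand each record to a worker thread.
        // poll(Duration) requires kafka-clients 2.0 or later.
        while (true) {
            ConsumerRecords<String, String> records = myConsumer.poll(Duration.ofMillis(100));
            for (final ConsumerRecord<String, String> record : records) {
                executor.submit(new KafkaRecordHandler(record));
            }
        }
    }

    public void shutdown() {
        if (myConsumer != null) {
            myConsumer.close();
        }
        if (executor != null) {
            // Stop accepting new tasks, then give queued records time to finish
            executor.shutdown();
            try {
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }
    }
}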
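The pool above uses a bounded queue together with CallerRunsPolicy, which is worth seeing in isolation. The following is a minimal, Kafka-free sketch of that behavior: when both the workers and the queue are full, the submitting thread runs the task itself, which naturally slows down submission. The class name CallerRunsDemo, the method runTasks, and the pool sizes are illustrative assumptions, not part of the consumer code.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    /** Submits taskCount slow tasks and returns the names of the threads that ran them. */
    static Set<String> runTasks(int workers, int queueCapacity, int taskCount)
            throws InterruptedException {
        // Same construction as in KafkaProcessor, with a much smaller queue
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(50); // simulate slow record processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
        }
        done.await(); // wait until every task has run, on a worker or on the caller
        executor.shutdown();
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        // 2 workers + 2 queue slots cannot absorb 20 slow tasks,
        // so some tasks end up running on the submitting thread
        Set<String> names = runTasks(2, 2, 20);
        System.out.println("Threads that ran tasks: " + names);
    }
}
```

In the consumer, the submitting thread is the one calling poll(), so a full queue pauses polling until the backlog drains. That acts as simple backpressure, but a long pause between polls can also cause the broker to consider the consumer dead and trigger a rebalance.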

Step 2, Worker Thread (Message/Record Handler) Implementation: Here, we perform further processing of each message.

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaRecordHandler implements Runnable {
    private final ConsumerRecord<String, String> record;

    public KafkaRecordHandler(ConsumerRecord<String, String> record) {
        this.record = record;
    }

    @Override
    public void run() { // this is where further processing happens
        System.out.println("value = " + record.value());
        System.out.println("Thread id = " + Thread.currentThread().getId());
    }
}
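One detail to keep in mind when the handler does real work: a task handed over with submit() does not report its exceptions on its own. The exception is stored in the returned Future and only surfaces when get() is called. The sketch below illustrates this with plain java.util.concurrent classes; SubmitFailureDemo, failureMessage, and the "bad record" message are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitFailureDemo {

    /** Runs a failing task via submit() and returns the message of the captured exception. */
    static String failureMessage() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The cast selects submit(Runnable), the overload the consumer uses
            Future<?> result = executor.submit((Runnable) () -> {
                throw new IllegalStateException("bad record");
            });
            try {
                result.get(); // the failure only becomes visible here
                return null;
            } catch (ExecutionException e) {
                return e.getCause().getMessage();
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Captured failure: " + failureMessage());
    }
}
```

Since the consumer loop discards the Future, one option is to catch and log exceptions inside run() itself so that failed records do not disappear silently.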

Finally, create a KafkaProcessor and pass the number of worker threads to its init() method.

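public class ConsumerTest {
    public static void main(String[] args) {
        KafkaProcessor processor = new KafkaProcessor();
        try {
            // init() polls in an endless loop, so it only returns by throwing
            processor.init(5);
        } catch (Exception exp) {
            exp.printStackTrace();
        } finally {
            processor.shutdown();
        }
    }
}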
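The shutdown() method in KafkaProcessor follows the usual two-phase pattern for stopping an ExecutorService: stop accepting new work, wait for queued tasks, and only then cancel forcefully. Here is that pattern as a small standalone sketch, assuming a helper named shutdownAndAwait (a hypothetical name, as is GracefulShutdownDemo) and a 5-second timeout chosen for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownDemo {

    /** Returns true if all submitted tasks finished within the timeout. */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // timed out: interrupt running tasks
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < 10; i++) {
            executor.execute(processed::incrementAndGet);
        }
        boolean clean = shutdownAndAwait(executor, 5, TimeUnit.SECONDS);
        System.out.println("clean=" + clean + ", processed=" + processed.get());
    }
}
```

The timeout is a trade-off: too short and in-flight records are interrupted, too long and the application hangs on exit when a handler is stuck.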

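This approach is not needed or suitable for every scenario. Records handed to a thread pool can finish in a different order than they were read, and with enable.auto.commit set to true, offsets may be committed before the corresponding records have actually been processed, so a crash can lose messages. Evaluate carefully whether these trade-offs are acceptable for your Kafka consumer implementation.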
