
Friday, May 19, 2017

Writing Kafka Java Producers and Kafka Java Consumers

Kafka Tutorial: Writing a Kafka Producer in Java

In this tutorial, we are going to create a simple Java example that creates a Kafka producer. You create a new replicated Kafka topic called my-example-topic, then you create a Kafka producer that uses this topic to send records. First you will send records with the Kafka producer synchronously; later, you will send records asynchronously.

Before you start

Prerequisites to this tutorial are Kafka from the command line and Kafka clustering and failover basics.
This tutorial is part of a series. If you are not sure what Kafka is, you should start with What is Kafka?. If you are unfamiliar with the architecture of Kafka, then we suggest reading Kafka Architecture, Kafka Topics Architecture, Kafka Producer Architecture, and Kafka Consumer Architecture.


Create Replicated Kafka Topic

Next, you need to create a replicated topic.

~/kafka-training/lab3/create-topic.sh

#!/usr/bin/env bash
cd ~/kafka-training

## Create topics
kafka/bin/kafka-topics.sh --create \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-example-topic \
    --zookeeper  localhost:2181


## List created topics
kafka/bin/kafka-topics.sh --list \
    --zookeeper localhost:2181


Above, we create a topic named my-example-topic with 13 partitions and a replication factor of 3, then we list the Kafka topics.
Run create-topic.sh as follows.

Output from running create-topic.sh

~/kafka-training/lab3
$ ./create-topic.sh
Created topic "my-example-topic".
__consumer_offsets
my-example-topic
my-failsafe-topic


Gradle Build Script

For this example, we use Gradle to build the project.

~/kafka-training/lab3/solution/build.gradle

group 'cloudurable-kafka'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.kafka:kafka-clients:0.10.2.0'
    compile 'ch.qos.logback:logback-classic:1.2.2'
}

Notice that we import the jar file kafka-clients:0.10.2.0. Apache Kafka uses slf4j, so to set up logging we use logback (ch.qos.logback:logback-classic:1.2.2).

Construct a Kafka Producer

To create a Kafka producer, you will need to pass it a list of bootstrap servers (a list of Kafka brokers). You will also specify a client.id that uniquely identifies this producer client. In this example, we are going to send messages with ids. The message body is a string, so we need a record value serializer, as we will send the message body in the Kafka record’s value field. The message id (a long) will be sent as the Kafka record’s key. You will need to specify a key serializer and a value serializer, which Kafka will use to encode the message id as the Kafka record key and the message body as the Kafka record value.


Common Kafka imports and constants

Next, we will import the Kafka packages and define a constant for the topic and a constant for the list of bootstrap servers that the producer will connect to.

KafkaProducerExample.java - imports and constants

~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java


package com.cloudurable.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";


Notice that KafkaProducerExample imports LongSerializer, which gets configured as the Kafka record key serializer, and imports StringSerializer, which gets configured as the record value serializer. The constant BOOTSTRAP_SERVERS is set to localhost:9092,localhost:9093,localhost:9094, which lists the three Kafka servers that we started up in the last lesson. Go ahead and make sure all three Kafka servers are running. The constant TOPIC is set to the replicated Kafka topic that we just created.

Create Kafka Producer to send records

Now, that we imported the Kafka classes and defined some constants, let’s create a Kafka producer.

KafkaProducerExample.java - Create Producer to send Records

~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java

public class KafkaProducerExample {
    ...
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                            BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                        LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                    StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

To create a Kafka producer, you use java.util.Properties and define certain properties that we pass to the constructor of a KafkaProducer.
Above, KafkaProducerExample.createProducer sets the BOOTSTRAP_SERVERS_CONFIG (“bootstrap.servers”) property to the list of broker addresses we defined earlier. The BOOTSTRAP_SERVERS_CONFIG value is a comma-separated list of host/port pairs that the producer uses to establish an initial connection to the Kafka cluster. The producer uses all servers in the cluster, no matter which ones we list here. This list only specifies the initial Kafka brokers used to discover the full set of servers of the Kafka cluster. If a server in this list is down, the producer will just go to the next broker in the list to discover the full topology of the Kafka cluster.
The CLIENT_ID_CONFIG (“client.id”) is an id passed to the server when making requests so the server can track the source of requests beyond just IP/port, for example for server-side request logging.
The KEY_SERIALIZER_CLASS_CONFIG (“key.serializer”) names a Kafka Serializer class for Kafka record keys that implements the Kafka Serializer interface. Notice that we set this to LongSerializer, as the message ids in our example are longs.
The VALUE_SERIALIZER_CLASS_CONFIG (“value.serializer”) names a Kafka Serializer class for Kafka record values that implements the Kafka Serializer interface. Notice that we set this to StringSerializer, as the message bodies in our example are strings.
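To make the Serializer interface concrete, here is a minimal sketch of a custom value serializer. The class is hypothetical (this lab only uses the built-in LongSerializer and StringSerializer), but it shows the three methods the Kafka Serializer interface requires.

// Hypothetical custom serializer, shown only to illustrate the Serializer
// interface; the lab's code uses the built-in serializers instead.
package com.cloudurable.kafka;

import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class UpperCaseStringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for this sketch.
    }

    @Override
    public byte[] serialize(String topic, String data) {
        // Encode the record value as UTF-8 bytes, upper-cased for illustration.
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}

You would configure it just like the built-in ones, e.g., props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UpperCaseStringSerializer.class.getName());.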

Send records synchronously with Kafka Producer

Kafka provides a synchronous send method to send a record to a topic. Let’s use this method to send some message ids and messages to the Kafka topic we created earlier.

KafkaProducerExample.java - Send Records Synchronously

~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java

public class KafkaProducerExample {
  ...

  static void runProducer(final int sendMessageCount) throws Exception {
      final Producer<Long, String> producer = createProducer();
      long time = System.currentTimeMillis();

      try {
          for (long index = time; index < time + sendMessageCount; index++) {
              final ProducerRecord<Long, String> record =
                      new ProducerRecord<>(TOPIC, index,
                                  "Hello Mom " + index);

              RecordMetadata metadata = producer.send(record).get();

              long elapsedTime = System.currentTimeMillis() - time;
              System.out.printf("sent record(key=%s value=%s) " +
                              "meta(partition=%d, offset=%d) time=%d\n",
                      record.key(), record.value(), metadata.partition(),
                      metadata.offset(), elapsedTime);

          }
      } finally {
          producer.flush();
          producer.close();
      }
  }
  ...


The above just iterates through a for loop, creating a ProducerRecord with an example message ("Hello Mom " + index) as the record value and the loop index as the record key. For each iteration, runProducer calls the send method of the producer (RecordMetadata metadata = producer.send(record).get()). The send method returns a Java Future, and calling get() on that Future blocks until the send completes, which is what makes this version synchronous.
The response RecordMetadata has the ‘partition’ where the record was written and the ‘offset’ of the record in that partition.
Notice the call to flush and close. Kafka will auto-flush on its own, but you can also call flush explicitly, which sends the accumulated records now. It is polite to close the connection when we are done.
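Since KafkaProducer implements Closeable, you could also let try-with-resources handle the close. A minimal sketch, assuming the createProducer and TOPIC defined above (it would need to live in a method that declares throws Exception, since get() throws checked exceptions):

// Sketch: try-with-resources closes the producer automatically,
// flushing any buffered records on the way out.
try (Producer<Long, String> producer = createProducer()) {
    producer.send(new ProducerRecord<>(TOPIC, 1L, "Hello Mom 1")).get();
}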

Running the Kafka Producer

Next you define the main method.

KafkaProducerExample.java - Running the Producer

~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java

public static void main(String... args) throws Exception {
    if (args.length == 0) {
        runProducer(5);
    } else {
        runProducer(Integer.parseInt(args[0]));
    }
}
The main method just calls runProducer.

Send records asynchronously with Kafka Producer

Kafka provides an asynchronous send method to send a record to a topic. Let’s use this method to send some message ids and messages to the Kafka topic we created earlier. The big difference here will be that we use a lambda expression to define a callback.

KafkaProducerExample.java - Send Records Asynchronously with Kafka Producer

~/kafka-training/lab3/src/main/java/com/cloudurable/kafka/KafkaProducerExample.java

static void runProducer(final int sendMessageCount) throws InterruptedException {
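    // Note: this version also needs java.util.concurrent.CountDownLatch and
    // java.util.concurrent.TimeUnit imported (not shown in this excerpt).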
    final Producer<Long, String> producer = createProducer();
    long time = System.currentTimeMillis();
    final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);

    try {
        for (long index = time; index < time + sendMessageCount; index++) {
            final ProducerRecord<Long, String> record =
                    new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
            producer.send(record, (metadata, exception) -> {
                long elapsedTime = System.currentTimeMillis() - time;
                if (metadata != null) {
                    System.out.printf("sent record(key=%s value=%s) " +
                                    "meta(partition=%d, offset=%d) time=%d\n",
                            record.key(), record.value(), metadata.partition(),
                            metadata.offset(), elapsedTime);
                } else {
                    exception.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await(25, TimeUnit.SECONDS);
    } finally {
        producer.flush();
        producer.close();
    }
}

Notice the use of a CountDownLatch so we can send all N messages and then wait for them all to send.

Async Interface Callback and Async Send Method

Kafka defines a Callback interface that you use for asynchronous operations. The callback interface allows code to execute when the request is complete. The callback executes in a background I/O thread so it should be fast (don’t block it). The onCompletion(RecordMetadata metadata, Exception exception) gets called when the asynchronous operation completes. The metadata gets set (not null) if the operation was a success, and the exception gets set (not null) if the operation had an error.
The async send method is used to send a record to a topic, and the provided callback gets called when the send is acknowledged. The send method is asynchronous, and when called returns immediately once the record gets stored in the buffer of records waiting to post to the Kafka broker. The send method allows sending many records in parallel without blocking to wait for the response after each one.
Since the send call is asynchronous, it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this Future will block until the associated request completes and then return the metadata for the record, or throw any exception that occurred while sending the record (see the KafkaProducer Javadoc for details).
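For example, if you want synchronous behavior but with a bound on how long you wait, you can call get() with a timeout. A minimal sketch (the 10-second timeout is an arbitrary choice, not part of the lab code):

// Sketch: Future.get(timeout, unit) blocks at most that long and throws
// java.util.concurrent.TimeoutException if the send has not completed.
// Requires java.util.concurrent.Future and java.util.concurrent.TimeUnit.
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(10, TimeUnit.SECONDS);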

Conclusion Kafka Producer example

We created a simple example that creates a Kafka Producer. First, we created a new replicated Kafka topic; then we created a Kafka Producer in Java that uses the replicated Kafka topic to send records. We sent records with the Kafka Producer using both the sync and async send methods.
We hope you enjoyed this article. Please provide feedback. See our Kafka training, Kafka consulting, and Kafka support, and get help setting up Kafka clusters in AWS.

Review Kafka Producer

What does the Callback lambda do?

The callback gets notified when the request is complete.

What will happen if the first server is down in the bootstrap list? Can the producer still connect to the other Kafka brokers in the cluster?

The producer will try to contact the next broker in the list. Any broker, once contacted, will let the producer know about the entire Kafka cluster. The producer will connect as long as at least one of the brokers in the list is running. If you have 100 brokers and two of the three servers in the bootstrap list are down, the producer can still use the 98 remaining brokers.

When would you use Kafka async send vs. sync send?

If you were already using an async codebase (Akka, QBit, Reakt, Vert.x) and you wanted to send records quickly.

Why do you need two serializers for a Kafka record?

One of the serializers is for the Kafka record key, and the other is for the Kafka record value.

Kafka Tutorial: Writing a Kafka Consumer in Java

In this tutorial, you are going to create a simple Kafka Consumer. This consumer consumes messages from the Kafka Producer you wrote in the last tutorial. This tutorial demonstrates how to process records from a Kafka topic with a Kafka Consumer.
This tutorial also describes how Kafka Consumers in the same group divide up and share partitions, while each consumer group appears to get its own copy of the same data.

Construct a Kafka Consumer

Just like we did with the producer, you need to specify bootstrap servers. You also need to define a group.id that identifies which consumer group this consumer belongs to. Then you need to designate a Kafka record key deserializer and a record value deserializer. Finally, you need to subscribe the consumer to the topic you created in the producer tutorial.

Kafka Consumer imports and constants

Next, you import the Kafka packages and define a constant for the topic and a constant to set the list of bootstrap servers that the consumer will connect to.

KafkaConsumerExample.java - imports and constants

~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java


package com.cloudurable.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;


import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
    ...
}


Notice that KafkaConsumerExample imports LongDeserializer, which gets configured as the Kafka record key deserializer, and imports StringDeserializer, which gets set up as the record value deserializer. The constant BOOTSTRAP_SERVERS gets set to localhost:9092,localhost:9093,localhost:9094, which lists the three Kafka servers that we started up in the last lesson. Go ahead and make sure all three Kafka servers are running. The constant TOPIC gets set to the replicated Kafka topic that you created in the last tutorial.

Create Kafka Consumer using Topic to Receive Records

Now, that you imported the Kafka classes and defined some constants, let’s create the Kafka consumer.

KafkaConsumerExample.java - Create Consumer to process Records

~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java


public class KafkaConsumerExample {
  ...

  private static Consumer<Long, String> createConsumer() {
      final Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                  BOOTSTRAP_SERVERS);
      props.put(ConsumerConfig.GROUP_ID_CONFIG,
                                  "KafkaExampleConsumer");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              LongDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              StringDeserializer.class.getName());

      // Create the consumer using props.
      final Consumer<Long, String> consumer =
                                  new KafkaConsumer<>(props);

      // Subscribe to the topic.
      consumer.subscribe(Collections.singletonList(TOPIC));
      return consumer;
  }
  ...
}

To create a Kafka consumer, you use java.util.Properties and define certain properties that we pass to the constructor of a KafkaConsumer.
Above, KafkaConsumerExample.createConsumer sets the BOOTSTRAP_SERVERS_CONFIG (“bootstrap.servers”) property to the list of broker addresses we defined earlier. The BOOTSTRAP_SERVERS_CONFIG value is a comma-separated list of host/port pairs that the consumer uses to establish an initial connection to the Kafka cluster. Just like the producer, the consumer uses all servers in the cluster, no matter which ones we list here.
The GROUP_ID_CONFIG identifies the consumer group of this consumer.
The KEY_DESERIALIZER_CLASS_CONFIG (“key.deserializer”) is a Kafka Deserializer class for Kafka record keys that implements the Kafka Deserializer interface. Notice that we set this to LongDeserializer as the message ids in our example are longs.
The VALUE_DESERIALIZER_CLASS_CONFIG (“value.deserializer”) is a Kafka Deserializer class for Kafka record values that implements the Kafka Deserializer interface. Notice that we set this to StringDeserializer, as the message bodies in our example are strings.
Importantly, notice that you need to subscribe the consumer to the topic: consumer.subscribe(Collections.singletonList(TOPIC));. The subscribe method takes a list of topics to subscribe to, and this list will replace the current subscriptions, if any.
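Because subscribe replaces the current subscription, consuming from more than one topic means passing all of the topics in a single list. A minimal sketch (my-other-topic is a hypothetical second topic, and java.util.Arrays would need to be imported):

// Sketch: a second subscribe() call would replace the first,
// so pass every topic of interest in one list instead.
consumer.subscribe(Arrays.asList(TOPIC, "my-other-topic"));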

Process messages from Kafka with Consumer

Now, let’s process some records with our Kafka Consumer.

KafkaConsumerExample.java - Process records from Consumer

~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java

public class KafkaConsumerExample {
  ...


    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();

        final int giveUp = 100;
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<Long, String> consumerRecords =
                    consumer.poll(1000);

            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                        record.key(), record.value(),
                        record.partition(), record.offset());
            });

            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}

Notice you use ConsumerRecords, which is a group of records from a Kafka topic partition. The ConsumerRecords class is a container that holds a list of ConsumerRecord(s) per partition for a particular topic. There is one ConsumerRecord list for every topic partition returned by a call to consumer.poll().
Notice that if you receive records (consumerRecords.count() != 0), then the runConsumer method calls consumer.commitAsync(), which commits the offsets returned by the last call to consumer.poll(…) for all the subscribed topic partitions.
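commitAsync also has an overload that takes an OffsetCommitCallback, which lets you at least log a failed commit instead of silently dropping it. A minimal sketch:

// Sketch: commitAsync with a callback so a failed offset commit is logged.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed for offsets " + offsets);
        exception.printStackTrace();
    }
});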

Kafka Consumer Poll method

The poll method returns fetched records based on the current partition offset. The poll method is a blocking method that waits up to the specified time in milliseconds; consumer.poll(1000) above waits up to one second. If no records are available after the time period specified, the poll method returns an empty ConsumerRecords.
When new records become available, the poll method returns straight away.
You can control the maximum number of records returned by poll() with props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);. The poll method is not thread safe and is not meant to be called from multiple threads.
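For context, that property goes with the other consumer properties in createConsumer, before the KafkaConsumer is constructed. A sketch:

// Sketch: cap the number of records a single poll() can return at 100.
// Added alongside the other properties in createConsumer.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);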

Running the Kafka Consumer

Next you define the main method.

KafkaConsumerExample.java - Running the Consumer

~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java


public class KafkaConsumerExample {

  public static void main(String... args) throws Exception {
      runConsumer();
  }
}

The main method just calls runConsumer.

Try running the consumer and producer

Run the consumer from your IDE. Then run the producer from the last tutorial from your IDE. You should see the consumer get the records that the producer sent.

Logging set up for Kafka

If you don’t set up logging well, it might be hard to see the consumer get the messages.
Kafka, like most Java libs these days, uses slf4j. You can use Kafka with Log4j, Logback, or JDK logging. We used Logback in our Gradle build (compile 'ch.qos.logback:logback-classic:1.2.2').

~/kafka-training/lab4/solution/src/main/resources/logback.xml

<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>


    <logger name="org.apache.kafka" level="INFO"/>
    <logger name="org.apache.kafka.common.metrics" level="INFO"/>

    <root level="debug">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Notice that we set org.apache.kafka to INFO; otherwise we would get a lot of log messages. You should run it set to debug once and read through the log messages. It gives you a flavor of what Kafka is doing under the covers. Leave org.apache.kafka.common.metrics set to INFO, or what Kafka is doing under the covers gets drowned out by metrics logging.

Try this: Three Consumers in same group and one Producer sending 25 messages

Run the consumer example three times from your IDE. Then change the producer to send 25 records instead of 5. Then run the producer once from your IDE. What happens? The consumers should share the messages.

Producer Output

sent record(key=1495048417121 value=..) meta(partition=6, offset=16) time=118
sent record(key=1495048417131 value=..) meta(partition=6, offset=17) time=120
sent record(key=1495048417133 value=..) meta(partition=12, offset=17) time=120
sent record(key=1495048417140 value=..) meta(partition=12, offset=18) time=121
sent record(key=1495048417143 value=..) meta(partition=12, offset=19) time=121
sent record(key=1495048417123 value=..) meta(partition=0, offset=19) time=121
sent record(key=1495048417126 value=..) meta(partition=0, offset=20) time=121
sent record(key=1495048417134 value=..) meta(partition=0, offset=21) time=122
sent record(key=1495048417122 value=..) meta(partition=3, offset=19) time=122
sent record(key=1495048417127 value=..) meta(partition=3, offset=20) time=122
sent record(key=1495048417139 value=..) meta(partition=3, offset=21) time=123
sent record(key=1495048417142 value=..) meta(partition=3, offset=22) time=123
sent record(key=1495048417136 value=..) meta(partition=10, offset=19) time=127
sent record(key=1495048417144 value=..) meta(partition=1, offset=26) time=128
sent record(key=1495048417125 value=..) meta(partition=5, offset=22) time=128
sent record(key=1495048417138 value=..) meta(partition=5, offset=23) time=128
sent record(key=1495048417128 value=..) meta(partition=8, offset=21) time=129
sent record(key=1495048417124 value=..) meta(partition=11, offset=18) time=129
sent record(key=1495048417130 value=..) meta(partition=11, offset=19) time=129
sent record(key=1495048417132 value=..) meta(partition=11, offset=20) time=130
sent record(key=1495048417141 value=..) meta(partition=11, offset=21) time=130
sent record(key=1495048417145 value=..) meta(partition=11, offset=22) time=131
sent record(key=1495048417129 value=..) meta(partition=2, offset=24) time=132
sent record(key=1495048417135 value=..) meta(partition=2, offset=25) time=132
sent record(key=1495048417137 value=..) meta(partition=2, offset=26) time=132
Notice the producer sends 25 messages.

Consumer 0 in same group

Consumer Record:(1495048417121, Hello Mom 1495048417121, 6, 16)
Consumer Record:(1495048417131, Hello Mom 1495048417131, 6, 17)
Consumer Record:(1495048417125, Hello Mom 1495048417125, 5, 22)
Consumer Record:(1495048417138, Hello Mom 1495048417138, 5, 23)
Consumer Record:(1495048417128, Hello Mom 1495048417128, 8, 21)

Consumer 1 in same group

Consumer Record:(1495048417123, Hello Mom 1495048417123, 0, 19)
Consumer Record:(1495048417126, Hello Mom 1495048417126, 0, 20)
Consumer Record:(1495048417134, Hello Mom 1495048417134, 0, 21)
Consumer Record:(1495048417144, Hello Mom 1495048417144, 1, 26)
Consumer Record:(1495048417122, Hello Mom 1495048417122, 3, 19)
Consumer Record:(1495048417127, Hello Mom 1495048417127, 3, 20)
Consumer Record:(1495048417139, Hello Mom 1495048417139, 3, 21)
Consumer Record:(1495048417142, Hello Mom 1495048417142, 3, 22)
Consumer Record:(1495048417129, Hello Mom 1495048417129, 2, 24)
Consumer Record:(1495048417135, Hello Mom 1495048417135, 2, 25)
Consumer Record:(1495048417137, Hello Mom 1495048417137, 2, 26)

Consumer 2 in same group

Consumer Record:(1495048417136, Hello Mom 1495048417136, 10, 19)
Consumer Record:(1495048417133, Hello Mom 1495048417133, 12, 17)
Consumer Record:(1495048417140, Hello Mom 1495048417140, 12, 18)
Consumer Record:(1495048417143, Hello Mom 1495048417143, 12, 19)
Consumer Record:(1495048417124, Hello Mom 1495048417124, 11, 18)
Consumer Record:(1495048417130, Hello Mom 1495048417130, 11, 19)
Consumer Record:(1495048417132, Hello Mom 1495048417132, 11, 20)
Consumer Record:(1495048417141, Hello Mom 1495048417141, 11, 21)
Consumer Record:(1495048417145, Hello Mom 1495048417145, 11, 22)
Can you answer these questions?

Which consumer owns partition 10?

How many ConsumerRecords objects did Consumer 0 get?

What is the next offset from Partition 11 that Consumer 2 should get?

Why does each consumer get unique messages?

Which consumer owns partition 10?

Consumer 2 owns partition 10.

How many ConsumerRecords objects did Consumer 0 get?

3

What is the next offset from Partition 11 that Consumer 2 should get?

22

Why does each consumer get unique messages?

Each gets its share of partitions for the topic.

Try this: Three Consumers in different Consumer group and one Producer sending 5 messages

Modify the consumer so that each consumer process has a unique group id.
Stop all consumer and producer processes from the last run.
Then execute the consumer example three times from your IDE. Then change the producer to send five records instead of 25. Then run the producer once from your IDE. What happens? The consumers should each get a copy of the messages.
First, let’s modify the consumer to make its group id unique, as follows:

KafkaConsumerExample - Make the Consumer group id unique

~/kafka-training/lab4/src/main/java/com/cloudurable/kafka/KafkaConsumerExample.java

public class KafkaConsumerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";


    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                    BOOTSTRAP_SERVERS);

        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                                    "KafkaExampleConsumer" +
                                            System.currentTimeMillis());

                                            ...
  }
...
}
Notice that to make the group id unique, you just add System.currentTimeMillis() to it.

Producer Output

sent record(key=1495049585396 value=..) meta(partition=7, offset=30) time=134
sent record(key=1495049585392 value=..) meta(partition=4, offset=24) time=138
sent record(key=1495049585393 value=..) meta(partition=4, offset=25) time=139
sent record(key=1495049585395 value=..) meta(partition=4, offset=26) time=139
sent record(key=1495049585394 value=..) meta(partition=11, offset=25) time=140
Notice the producer sends five messages.

Consumer 0 in own group

Consumer Record:(1495049585396, Hello Mom 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello Mom 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello Mom 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello Mom 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello Mom 1495049585395, 4, 26)

Consumer 1 in unique consumer group

Consumer Record:(1495049585396, Hello Mom 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello Mom 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello Mom 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello Mom 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello Mom 1495049585395, 4, 26)

Consumer 2 in its own consumer group

Consumer Record:(1495049585396, Hello Mom 1495049585396, 7, 30)
Consumer Record:(1495049585394, Hello Mom 1495049585394, 11, 25)
Consumer Record:(1495049585392, Hello Mom 1495049585392, 4, 24)
Consumer Record:(1495049585393, Hello Mom 1495049585393, 4, 25)
Consumer Record:(1495049585395, Hello Mom 1495049585395, 4, 26)
Can you answer these questions?

Which consumer owns partition 10?

How many ConsumerRecords objects did Consumer 0 get?

What is the next offset from Partition 11 that Consumer 2 should get?

Why does each consumer get the same messages?

Which consumer owns partition 10?

They all do! Since each consumer is in its own unique consumer group, and there is only one consumer per group, each consumer we ran owns all of the partitions.

How many ConsumerRecords objects did Consumer 0 get?

3

What is the next offset from Partition 11 that Consumer 2 should get?

26

Why does each consumer get the same messages?

They do because they are each in their own consumer group, and each consumer group is a subscription to the topic.

Conclusion Kafka Consumer example

You created a simple example that creates a Kafka consumer to consume messages from the Kafka Producer you created in the last tutorial. We used the replicated Kafka topic from the producer lab. You created a Kafka Consumer that uses the topic to receive messages. The Kafka consumer uses the poll method to get up to N records at a time.
Consumers in the same group divide up and share partitions, as we demonstrated by running three consumers in the same group and one producer. Each consumer group gets a copy of the same data. More precisely, each consumer group really maintains its own offset per topic partition.

Review Kafka Consumer

How did we demonstrate Consumers in a Consumer Group dividing up topic partitions and sharing them?

We ran three consumers in the same consumer group, and then sent 25 messages from the producer. We saw that each consumer owned a set of partitions.

How did we demonstrate Consumers in different Consumer Groups each getting their own offsets?

We ran three consumers each in its own unique consumer group, and then sent 5 messages from the producer. We saw that each consumer owned every partition.

How many records does poll get?

Up to however many you set with props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); in the properties that you pass to the KafkaConsumer constructor.

Does a call to poll ever get records from two different partitions?

Yes. A single call to poll can return records from several partitions; ConsumerRecords holds a list of records for each topic partition that returned data.

We hope you enjoyed this article. Please provide feedback. See our Kafka training, Kafka consulting, and Kafka support, and get help setting up Kafka clusters in AWS.