Wednesday, May 8, 2024

Spring Boot + Kafka Example (Single and Multiple Consumer) in Java

Hello guys, if you want to use Apache Kafka with Spring Framework or Spring Boot and are looking for an example, then you have come to the right place. In the past, I have shared the best Spring Boot courses as well as the best Apache Kafka courses for Java developers, and in this article, I am going to share how to use Apache Kafka with Spring Boot. You will learn both how to publish messages to and how to consume messages from Apache Kafka topics in your Spring Boot application. By the way, if you don't know, Apache Kafka is an open-source stream-processing platform originally developed at LinkedIn and later donated to the Apache Software Foundation, a major home of open-source development in the Java world.

Apache Kafka is designed for performance and is written in Java and Scala. It is commonly used for real-time data processing.

Earlier, I talked about Apache Kafka architecture and how Apache Kafka works, and in this tutorial, we will discuss a Spring Boot and Kafka example with single and multiple consumers.

So let's have a look into this. 


Apache Kafka + Spring Boot Example

So let's walk through an example of configuring Kafka topics, consumers, and producers in a Spring Boot application.

Before that, you need to have the following dependencies.

spring-boot-starter-web - for the REST APIs.

spring-kafka - Contains Spring classes, interfaces, and annotations for interacting with the Kafka broker and other messaging functionality.

The pom.xml file looks like below.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.3</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- version is managed by spring-boot-starter-parent -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>




1. Kafka Topic Configurations

We are creating two topics, test-log and user-log.

 test-log - Used for publishing simple string messages

 user-log - Used for serialized user objects.

The TopicConfiguration.java file looks like below.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class TopicConfiguration {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${general.topic.name}")
    private String topicName;

    @Value(value = "${user.topic.name}")
    private String userTopicName;

    @Bean
    public NewTopic generalTopic() {
        return TopicBuilder.name(topicName)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic userTopic() {
        return TopicBuilder.name(userTopicName)
                .partitions(1)
                .replicas(1)
                .build();
    }
}

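Here, Spring Kafka's KafkaAdmin is responsible for creating the topics for all beans of type NewTopic, and TopicBuilder lets us set the number of partitions and replicas for each topic. Spring Boot can auto-configure a KafkaAdmin bean for us; in this example we declare kafkaAdmin() explicitly with the @Bean annotation so that it picks up the broker address from our own kafka.bootstrapAddress property.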
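The @Value placeholders in TopicConfiguration need matching entries in application.properties. A minimal sketch is shown below. The broker address is an assumption (a local single-node broker on the default port), and the mapping of general.topic.name to test-log is inferred from the topic descriptions above, so adjust both to your setup.

```properties
# Assumed: a Kafka broker running locally on the default port
kafka.bootstrapAddress=localhost:9092

# Topic names described in this section
general.topic.name=test-log
user.topic.name=user-log
```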






2. Message Consumers Configuration

Here, we configure the consumers that listen to the two topics (test-log and user-log) we created earlier. The consumer configuration involves the following classes.

ConsumerConfig - Holds the consumer configuration keys.

ConcurrentKafkaListenerContainerFactory - Used to build ConcurrentMessageListenerContainer instances.

DefaultKafkaConsumerFactory - Used to create new consumer instances, where all consumers share the common configuration properties set in this bean.

Once these beans are available in the bean factory, POJO-based consumers can be configured using @KafkaListener annotation.


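import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    // Assumed property name (not defined elsewhere in this article); defaults to "abc"
    @Value(value = "${kafka.groupId:abc}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}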
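The configuration above only handles String payloads, while the user-log topic is meant for serialized user objects. One possible way to add a second consumer for that topic is sketched below, using Spring Kafka's JsonDeserializer. Treat it as an illustration under assumptions: the User class, the user-group group id, and the bean names are hypothetical and do not come from the original example.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class UserConsumerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, User> userConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // Hypothetical group id for the user-log consumer
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-group");
        // Deserializer instances are passed directly, so no *_DESERIALIZER_CLASS_CONFIG
        // entries are needed. The "false" flag tells JsonDeserializer to ignore type
        // headers and always map the JSON payload to User.
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<User>(User.class, false));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        return factory;
    }

    // Hypothetical payload type; Jackson needs a no-arg constructor and accessors
    public static class User {
        private String name;

        public User() { }

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }
}
```

A listener would then select this factory by bean name, for example with @KafkaListener(topics = "${user.topic.name}", containerFactory = "userKafkaListenerContainerFactory"), and accept a User parameter instead of a String.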



3. Producing Messages

In order to produce messages, we need to configure a ProducerFactory. Producer instances are thread-safe, so sharing a single instance throughout the application context gives better performance.


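import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}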
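For the user-log topic, a second producer that serializes objects to JSON could look roughly like the sketch below. It uses Spring Kafka's JsonSerializer; the User class and the bean names are hypothetical and only serve the illustration.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class UserProducerConfig {

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, User> userProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values are written as JSON instead of plain strings
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Skip the Java type headers so consumers are not tied to this class name
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, User> userKafkaTemplate() {
        return new KafkaTemplate<>(userProducerFactory());
    }

    // Hypothetical payload type; Jackson serializes it through its getters
    public static class User {
        private String name;

        public User() { }

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }
}
```

Because the two templates differ in their generic types, Spring can tell KafkaTemplate<String, String> and KafkaTemplate<String, User> apart at injection time.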



4. Publishing Messages

You can send messages using the KafkaTemplate class. It plays a role similar to JdbcTemplate and RestTemplate in the Spring Framework, which simplify database access and REST API access respectively.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}
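Since the project includes spring-boot-starter-web, one way to trigger the publishing code is through a small REST endpoint. The following is only a sketch: the MessageController class and the /messages path are hypothetical names, while the general.topic.name property is the one used in the topic configuration.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topicName;

    // Constructor injection of the template and the topic name
    public MessageController(KafkaTemplate<String, String> kafkaTemplate,
                             @Value("${general.topic.name}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    // Hypothetical endpoint: the raw request body becomes the message value
    @PostMapping("/messages")
    public String publish(@RequestBody String msg) {
        // send() is asynchronous; it returns before the broker acknowledges the record
        kafkaTemplate.send(topicName, msg);
        return "queued";
    }
}
```

With the application running, posting a plain-text body to /messages should publish that text to the test-log topic.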

We can implement multiple listeners for a topic, each with a different group id. A single consumer can also listen to messages from several topics by listing them in the annotation:

@KafkaListener(topics = {"test-log", "user-log"}, groupId = "abc")
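To make the effect of different group ids concrete, here is a small plain-Java model of the delivery rule: every consumer group subscribed to a topic receives its own copy of each message. This is not the Kafka client API, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of one topic and its consumer groups (not real Kafka)
public class ConsumerGroupModel {

    // group id -> messages delivered to that group
    private final Map<String, List<String>> deliveries = new LinkedHashMap<>();

    // Registers a consumer group; subscribing the same group twice has no effect
    public void subscribe(String groupId) {
        deliveries.putIfAbsent(groupId, new ArrayList<>());
    }

    // Delivers one copy of the message to every subscribed group
    public void publish(String message) {
        for (List<String> inbox : deliveries.values()) {
            inbox.add(message);
        }
    }

    // Messages seen so far by the given group (empty for unknown groups)
    public List<String> received(String groupId) {
        return deliveries.getOrDefault(groupId, Collections.emptyList());
    }

    public static void main(String[] args) {
        ConsumerGroupModel topic = new ConsumerGroupModel();
        topic.subscribe("abc");
        topic.subscribe("xyz");
        topic.publish("hello");
        System.out.println(topic.received("abc")); // prints [hello]
        System.out.println(topic.received("xyz")); // prints [hello]
    }
}
```

In real Kafka, listeners that share a group id split the topic's partitions among themselves, so giving each listener its own group id is what makes every listener see every message.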

The @Header annotation can also be used in a listener method to retrieve one or more message headers:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println(
            "Received Message: " + message
            + " from partition: " + partition);
}



Where to use Apache Kafka?

One of the most popular tools for working with streaming data is Apache Kafka, which is an open-source streaming platform. Kafka is distributed, which means that it can be scaled out when needed. All you need to do is add new nodes to the Kafka cluster. Some of the best use cases for Apache Kafka are given below.

1. Real-time data processing

At present, many systems need to process data as soon as it becomes available. For example, in predictive maintenance, the models should constantly analyze streams of metrics from the working equipment and trigger alarms immediately when deviations are detected.

Kafka is useful here because it can carry data from producers to data handlers and then on to data storage.


2. Logging and monitoring systems

Apache Kafka can be used in logging and monitoring systems. The logs can be stored in an Apache Kafka cluster for some time, and it is possible to build pipelines of several producers and consumers in which the logs are transformed along the way. Real-time monitoring systems can then read the processed logs from the Kafka topics.


3. Application Activity Tracking

This is the purpose for which Apache Kafka was originally developed. LinkedIn built Kafka to track user activity such as clicks, likes, and time spent on pages, with these events sent to Kafka topics. Other applications can subscribe to those topics and process the received data for their own purposes.


That's all about this Spring Boot and Apache Kafka example. This is a great set of technologies, and every Java programmer should spend some time learning them. Apache Kafka is a strategic messaging technology, and more and more companies are using it for messaging. In this tutorial, we learned to create multiple topics using the TopicBuilder API, and then configured a consumer and a producer to consume and publish messages.




    Thanks for reading this article so far. If you find this Spring Framework and Apache Kafka tutorial useful, please share it with your friends and colleagues. If you have any questions or feedback, then please drop a note.

    1 comment:

    1. Does Kafka support transactions? How long does a message stay in the broker? What will happen if the producer or consumer process dies? Will it result in message loss?
