Apache Kafka
https://kafka.apache.org/intro
Apache Kafka® is a distributed streaming platform.
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records (topics), similar to a message queue or enterprise messaging system.
 - Store streams of records in a fault-tolerant durable way.
 - Process streams of records as they occur.
 
Publisher/Subscriber, Observer pattern, Message queues.
First a few concepts:
- Kafka is run as a cluster on one or more servers that can span multiple datacenters.
 - The Kafka cluster stores streams of records in categories called topics.
 - Each record consists of a key, a value, and a timestamp (see the sketch below).
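 
A minimal sketch of these concepts with the kafka-python client used later on this page (a local broker on port 9092 and the 'test' topic are assumptions; keys and values are raw bytes unless serializers are configured):

    from kafka import KafkaProducer, KafkaConsumer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    # each record carries a key and a value; the timestamp defaults to send time
    producer.send('test', key=b'user-42', value=b'clicked')
    producer.flush()

    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest')
    for msg in consumer:
        print(msg.key, msg.value, msg.timestamp)  # the record's key, value, timestamp
        break
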
Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
 - The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
 - The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
 - The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
 
Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
How does Kafka's notion of streams compare to a traditional enterprise messaging system? Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
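A hedged sketch of how this looks with kafka-python (the group ids 'workers' and 'audit' are made-up names): consumers sharing a group_id split the topic's partitions, so each record is delivered to only one of them (queuing), while consumers in a different group receive their own copy of every record (publish-subscribe).

    from kafka import KafkaConsumer

    # queuing: two consumers in the same group split the partitions,
    # so each record goes to only one of them
    worker_a = KafkaConsumer('test', bootstrap_servers='localhost:9092', group_id='workers')
    worker_b = KafkaConsumer('test', bootstrap_servers='localhost:9092', group_id='workers')

    # publish-subscribe: a different group gets its own copy of every record
    auditor = KafkaConsumer('test', bootstrap_servers='localhost:9092', group_id='audit')
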
Kafka works well as a replacement for a more traditional message broker. Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.
https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case
Kafka uses a pull model: the broker waits for the consumer to ask for data.
Kafka provides message ordering (stream/streaming).
Kafka is a log, which means that it retains messages by default (for a configurable retention period), whether or not they have been consumed.
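A small sketch of what the retained log allows, again with kafka-python (topic 'test' assumed): a consumer can pull the stream from the beginning at any time; the broker never pushes.

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('test',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',  # replay from the oldest retained record
                             consumer_timeout_ms=5000)      # stop iterating after 5s without data
    for msg in consumer:
        print(msg.offset, msg.value)
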
JMS client
https://docs.confluent.io/current/clients/kafka-jms-client/index.html
JMS is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. Confluent JMS Client (kafka-jms-client) is an implementation of the JMS 1.1 provider interface that allows Apache Kafka® or Confluent Platform to be used as a JMS message broker.
Kafka topics can mimic the behavior of either topics or queues in the traditional messaging sense. Both JMS messaging models are supported: Publish/Subscribe (Topics) and Point-to-Point (Queues).
Example
    wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
    tar xvzf kafka_2.11-2.3.0.tgz
    cd kafka_2.11-2.3.0/
    # single-node ZooKeeper instance (port 2181)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # in a new tab ...
    cd kafka_2.11-2.3.0/
    bin/kafka-server-start.sh config/server.properties # listens on port 9092
    # create topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    # check topics
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    # send messages to topic
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >hello
    >test
    # consume messages
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    # install the Python client: https://pypi.org/project/kafka/
    apt install python-pip # as root
    pip install kafka

    #producer.py
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
    for i in range(5):
        producer.send('test', b'some_message_bytes [%d]' % (i))
    producer.flush()

    #consumer.py
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('test', bootstrap_servers="localhost:9092")
    for msg in consumer:
        print("%s %d %s" % (msg.topic, msg.timestamp, msg.value))
Create queue adder for 2 consumers
- bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder
 
The number of partitions equals the number of consumers (two of each here); an alternative, partition-based sketch follows the producer script below.
    #consumer_adder.py
    from kafka import KafkaConsumer
    import json
    import sys

    topic = 'adder'
    # each consumer instance reads its own topic, e.g. adder-0 or adder-1
    consumer = KafkaConsumer('%s-%s' % (topic, sys.argv[1]), bootstrap_servers="localhost:9092")
    print(consumer.partitions_for_topic(topic))

    for msg in consumer:
        vals = json.loads(msg.value)
        print("%s %d %s sum: %d" % (msg.topic, msg.timestamp, msg.value, vals['op1'] + vals['op2']))
    #producer_adder.py
    from kafka import KafkaProducer
    import json

    producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
    topic = 'adder'
    parts = producer.partitions_for(topic)
    amount_partitions = len(parts)

    for i in range(10000):
        vals = {'op1': i, 'op2': i}
        # round-robin over the per-consumer topics adder-0, adder-1;
        # the JSON string must be encoded to bytes before sending
        producer.send('%s-%d' % (topic, i % amount_partitions), value=json.dumps(vals).encode())
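The scripts above emulate load balancing with one topic per consumer (adder-0, adder-1). A hedged alternative sketch, closer to the partition mechanism described earlier: the producer writes to the two-partition 'adder' topic itself, and both consumers join one consumer group (the group name 'adder-group' is an assumption), letting the broker assign one partition to each consumer.

    #consumer_adder_group.py (sketch)
    from kafka import KafkaConsumer
    import json

    consumer = KafkaConsumer('adder',
                             bootstrap_servers='localhost:9092',
                             group_id='adder-group',
                             value_deserializer=lambda v: json.loads(v))
    for msg in consumer:
        print('partition %d sum: %d' % (msg.partition, msg.value['op1'] + msg.value['op2']))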
List topics using ZooKeeper
ZooKeeper is a high-performance coordination service for distributed applications. The name space provided by ZooKeeper is much like that of a standard file system.
- https://zookeeper.apache.org/doc/r3.3.3/index.html
 
    bin/zookeeper-shell.sh localhost:2181
    ls /config/topics
    [adder-0, adder, adder-1, test, __consumer_offsets]
    quit
- pip install kazoo
 
    from kazoo.client import KazooClient
    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()
    zk.ensure_path('/config/topics') # True
    zk.get_children("/config/topics")
    # [u'adder-0', u'adder', u'adder-1', u'test', u'__consumer_offsets']
    zk.stop()
    quit()
Spring Kafka
Dockerfile
    FROM eclipse-temurin:17-jdk-alpine
    ENV PATH=/opt/java/openjdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/root/kafka_2.12-3.4.0/bin/
    RUN apk add --update --no-cache curl wget nano vim bash gcompat
    RUN cd ~ && wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz && tar xvzf kafka_2.12-3.4.0.tgz
    CMD ["/bin/bash","/mnt/start-servers.sh"]
start-servers.sh
    #!/bin/bash
    nohup /root/kafka_2.12-3.4.0/bin/zookeeper-server-start.sh /root/kafka_2.12-3.4.0/config/zookeeper.properties &
    nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh /root/kafka_2.12-3.4.0/config/server.properties &
    cat
Build kafka container
    docker build -t kafka-test-image .
    docker run --rm -dit --name kafka-test -p 2181:2181 -p 9092:9092 -v $PWD:/mnt/ kafka-test-image
    mvn clean install
    docker cp target/demo-0.0.1-SNAPSHOT.jar kafka-test:/tmp/
    docker exec -it kafka-test bash
pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.0</version>
        <relativePath/> <!-- lookup parent from repository -->
      </parent>
      <groupId>com.example</groupId>
      <artifactId>demo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>demo</name>
      <description>Demo project for Spring Boot</description>
      <properties>
        <java.version>17</java.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
        </plugins>
      </build>
    </project>
src/main/resources/application.properties
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    spring.kafka.consumer.group-id=group-id
src/main/java/com/example/demo/DemoApplication.java
    package com.example.demo;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }
src/main/java/com/example/demo/DemoController.java
    package com.example.demo;

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.ResponseBody;

    @Controller
    public class DemoController {
        private final KafkaTemplate<String, String> kafkaTemplate;

        public DemoController(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        @GetMapping("/uppercase/{text}")
        @ResponseBody
        public String uppercase(@PathVariable String text) {
            String message = String.format("text to be sent in uppercase %s", text);
            kafkaTemplate.send(KafkaTopicConfig.TOPIC_TASK, message);
            return message;
        }
    }
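A hypothetical smoke test for this endpoint (it assumes the Spring Boot default port 8080): the HTTP call publishes the message to the topicTask topic, and the listener below prints it.

    import urllib.request

    resp = urllib.request.urlopen('http://localhost:8080/uppercase/hello')
    print(resp.read().decode())  # text to be sent in uppercase hello
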
src/main/java/com/example/demo/KafkaConsumerConfig.java
    package com.example.demo;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;
        @Value(value = "${spring.kafka.consumer.group-id}")
        private String groupId;

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
src/main/java/com/example/demo/KafkaMessageListener.java
    package com.example.demo;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class KafkaMessageListener {
        private final Logger logger;

        public KafkaMessageListener() {
            this.logger = LoggerFactory.getLogger(KafkaMessageListener.class);
            this.logger.info("Created KafkaMessageListener");
        }

        @KafkaListener(topics = KafkaTopicConfig.TOPIC_TASK)
        public void listen(String message) {
            System.out.println("Received Message in topicTask: " + message + " in uppercase " + message.toUpperCase());
        }
    }
src/main/java/com/example/demo/KafkaProducerConfig.java
    package com.example.demo;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaProducerConfig {
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
src/main/java/com/example/demo/KafkaTopicConfig.java
    package com.example.demo;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;

    @Configuration
    public class KafkaTopicConfig {
        public static final String TOPIC_TASK = "topicTask";
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            return new KafkaAdmin(configs);
        }

        @Bean
        public NewTopic topicTask() {
            return new NewTopic(TOPIC_TASK, 1, (short) 1);
        }
    }
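Once the application is running, a quick way to check from outside the JVM that KafkaAdmin created the topic is a sketch like this with kafka-python (a broker on localhost:9092 is assumed):

    from kafka import KafkaConsumer

    topics = KafkaConsumer(bootstrap_servers='localhost:9092').topics()
    print('topicTask' in topics)  # True once the Spring app has started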
