Revision 21 as of 2023-05-26 11:18:12

Apache Kafka

https://kafka.apache.org/intro

Apache Kafka® is a distributed streaming platform.

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records (topics), similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Related concepts: publisher/subscriber, the observer pattern, and message queues.

First, a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
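
The concepts above can be sketched in a few lines of plain Python. This is only a toy model with no Kafka involved: the names Record and InMemoryCluster are made up for illustration and are not part of any Kafka client.

```python
import time
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Record:
    """One record: a key, a value, and a timestamp (milliseconds)."""
    key: Optional[bytes]
    value: bytes
    timestamp: int


class InMemoryCluster:
    """Hypothetical stand-in for a cluster: maps topic names to record lists."""

    def __init__(self) -> None:
        self.topics: Dict[str, List[Record]] = {}

    def publish(self, topic: str, key: Optional[bytes], value: bytes) -> Record:
        record = Record(key, value, int(time.time() * 1000))
        # Records are appended to the topic, never modified in place.
        self.topics.setdefault(topic, []).append(record)
        return record


cluster = InMemoryCluster()
cluster.publish("test", b"k1", b"hello")
cluster.publish("test", None, b"test")
print([r.value for r in cluster.topics["test"]])
```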

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
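
To make the stream processor idea concrete, here is a minimal sketch in plain Python. It is not the Streams API (which is a Java library); the function stream_processor and the two lists standing in for topics are assumptions for illustration only.

```python
from typing import Callable, Iterable, Iterator


def stream_processor(records: Iterable[str],
                     transform: Callable[[str], str]) -> Iterator[str]:
    """Consume an input stream lazily and yield the transformed output stream."""
    for record in records:
        yield transform(record)


input_topic = ["hello", "test"]  # stands in for an input topic
output_topic = list(stream_processor(input_topic, str.upper))
print(output_topic)
```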

Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

How does Kafka's notion of streams compare to a traditional enterprise messaging system? Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe, the record is broadcast to all consumers. Kafka's consumer group generalizes both models: each record published to a topic is delivered to one consumer instance within each subscribing consumer group. By having a notion of parallelism within the topics (the partition), Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
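
The difference between the two models can be simulated without a broker. The sketch below is a simplified model, not Kafka's actual assignment logic: the modulo partitioner, the round-robin assignment, and the group names "workers" and "audit" are all assumptions. Inside a group each record reaches one consumer (queue behaviour), while every group sees all records (publish-subscribe behaviour).

```python
from collections import defaultdict
from typing import Dict, List, Tuple

NUM_PARTITIONS = 2  # assumed partition count for the sketch


def partition_for(key: int) -> int:
    """Toy partitioner: the same key always maps to the same partition."""
    return key % NUM_PARTITIONS


def assign(consumers: List[str]) -> Dict[int, str]:
    """Give each partition to exactly one consumer of a group (round-robin)."""
    return {p: consumers[p % len(consumers)] for p in range(NUM_PARTITIONS)}


def deliver(records: List[Tuple[int, str]],
            groups: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Return what each consumer receives, keyed by 'group/consumer'."""
    received: Dict[str, List[str]] = defaultdict(list)
    for group, consumers in groups.items():
        owner = assign(consumers)
        for key, value in records:
            received[f"{group}/{owner[partition_for(key)]}"].append(value)
    return dict(received)


records = [(0, "a"), (1, "b"), (2, "c"), (3, "d")]
groups = {"workers": ["w1", "w2"], "audit": ["a1"]}
result = deliver(records, groups)
print(result)
```

Records with the same key stay on one partition and therefore on one consumer, which is where the per-partition ordering comes from.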

https://kafka.apache.org/uses

Kafka works well as a replacement for a more traditional message broker. Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.

https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case

Kafka uses a pull model: the Kafka broker waits for the consumer to ask for data.

Kafka provides message ordering within each partition (stream/streaming).

Kafka is a log, which means that it retains messages by default.
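
A rough sketch of what the pull model and log retention mean in practice, again in plain Python with no broker. The classes Log and PullConsumer are hypothetical; the point is only that reading does not delete anything and that each consumer keeps its own offset.

```python
from typing import List


class Log:
    """Append-only list of messages; reading does not remove anything."""

    def __init__(self) -> None:
        self._messages: List[bytes] = []

    def append(self, message: bytes) -> int:
        self._messages.append(message)
        return len(self._messages) - 1  # offset of the new message

    def fetch(self, offset: int, max_records: int = 10) -> List[bytes]:
        # The broker side only answers requests; it never pushes.
        return self._messages[offset:offset + max_records]


class PullConsumer:
    def __init__(self, log: Log, offset: int = 0) -> None:
        self.log = log
        self.offset = offset  # the position is tracked by the consumer

    def poll(self) -> List[bytes]:
        batch = self.log.fetch(self.offset)
        self.offset += len(batch)
        return batch


log = Log()
for text in (b"hello", b"test"):
    log.append(text)

early = PullConsumer(log)
first = early.poll()
second = early.poll()      # nothing new since the last poll
late = PullConsumer(log)   # a consumer that starts later, from offset 0
replayed = late.poll()
print(first, second, replayed)
```

Because the messages are retained, the late consumer can still read everything from the beginning, similar in spirit to the --from-beginning option of the console consumer.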

JMS client

https://docs.confluent.io/current/clients/kafka-jms-client/index.html

Java Message Service (JMS) is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. Confluent JMS Client (kafka-jms-client) is an implementation of the JMS 1.1 provider interface that allows Apache Kafka® or Confluent Platform to be used as a JMS message broker.

Kafka topics can mimic the behavior of either topics or queues in the traditional messaging system sense. Both JMS messaging models are supported: publish/subscribe (topics) and point-to-point (queues).

Example

    wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
    tar xvzf kafka_2.11-2.3.0.tgz
    cd kafka_2.11-2.3.0/
    # single-node ZooKeeper instance (port 2181)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # in a new terminal tab
    cd kafka_2.11-2.3.0/
    bin/kafka-server-start.sh config/server.properties # listens on port 9092
    # create topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    # check topics
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    # send messages to topic (type each message at the > prompt)
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >hello
    >test
    # consume messages
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    # Python client: https://pypi.org/project/kafka/
    apt install python-pip # as root
    pip install kafka

    #producer.py
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
    for i in range(5):
        producer.send('test', b'some_message_bytes [%d]' % (i))
    # send() is asynchronous; flush() blocks until the messages are delivered
    producer.flush()

    #consumer.py
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('test', bootstrap_servers="localhost:9092")
    # iterating over the consumer blocks and yields messages as they arrive
    for msg in consumer:
        print("%s %d %s" % (msg.topic, msg.timestamp, msg.value))

Create queue adder for 2 consumers

  • bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder

The number of partitions equals the number of consumers.

    #consumer_adder.py
    # usage: python consumer_adder.py 0   (or 1)
    from kafka import KafkaConsumer
    import json
    import sys

    topic = 'adder'
    # each consumer reads its own topic, adder-0 or adder-1
    consumer = KafkaConsumer('%s-%s' % (topic, sys.argv[1]), bootstrap_servers="localhost:9092")
    print(consumer.partitions_for_topic(topic))

    for msg in consumer:
        vals = json.loads(msg.value)
        print("%s %d %s sum: %d" % (msg.topic, msg.timestamp, msg.value, vals['op1'] + vals['op2']))

    #producer_adder.py
    from kafka import KafkaProducer
    import json

    producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
    topic = 'adder'
    parts = producer.partitions_for(topic)
    amount_partitions = len(parts)

    for i in range(10000):
        vals = {'op1': i, 'op2': i}
        # route to topic adder-0 or adder-1; the JSON text must be encoded to bytes
        producer.send('%s-%d' % (topic, i % amount_partitions), value=json.dumps(vals).encode('utf-8'))
    # send() is asynchronous; flush before exiting so no messages are lost
    producer.flush()
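
The scripts here route work by writing to separate topics named adder-0 and adder-1. A possible alternative, sketched below, is to keep a single adder topic and pick the partition explicitly with the partition argument that kafka-python's KafkaProducer.send accepts. The helper send_additions and the RecordingProducer test double are hypothetical names, used so the sketch runs without a broker.

```python
import json
from typing import Any, List, Tuple


def send_additions(producer: Any, topic: str, count: int, num_partitions: int) -> None:
    """Spread `count` addition tasks over the partitions of a single topic."""
    for i in range(count):
        payload = json.dumps({"op1": i, "op2": i}).encode("utf-8")
        # Explicit partition choice instead of one topic per consumer.
        producer.send(topic, value=payload, partition=i % num_partitions)
    producer.flush()


class RecordingProducer:
    """Hypothetical test double with the same send/flush shape as KafkaProducer."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, bytes, int]] = []
        self.flushed = False

    def send(self, topic: str, value: bytes, partition: int) -> None:
        self.sent.append((topic, value, partition))

    def flush(self) -> None:
        self.flushed = True


fake = RecordingProducer()
send_additions(fake, "adder", 4, 2)
print([partition for _, _, partition in fake.sent])
```

With a running broker, a KafkaProducer(bootstrap_servers='localhost:9092') could be passed in place of the test double. On the consuming side, consumers that share the same group_id would then split the partitions of the adder topic between them.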

List topics using ZooKeeper

ZooKeeper is a high-performance coordination service for distributed applications. The name space provided by ZooKeeper is much like that of a standard file system.

  • https://zookeeper.apache.org/doc/r3.3.3/index.html
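
The file-system-like name space can be pictured with a small in-memory tree. This is a toy model only and does not talk to ZooKeeper. The class ZNodeTree is made up for illustration, and its method names merely echo the ones used with kazoo further down.

```python
from typing import Dict, List


class ZNodeTree:
    """Toy hierarchical namespace; paths look like file system paths."""

    def __init__(self) -> None:
        self._children: Dict[str, List[str]] = {"/": []}

    def ensure_path(self, path: str) -> None:
        """Create the node and any missing parents."""
        current = "/"
        for part in path.strip("/").split("/"):
            child = current.rstrip("/") + "/" + part
            if child not in self._children:
                self._children[child] = []
                self._children[current].append(part)
            current = child

    def get_children(self, path: str) -> List[str]:
        """List the names directly below `path`, sorted for stable output."""
        return sorted(self._children[path])


tree = ZNodeTree()
for topic in ("adder", "adder-0", "adder-1", "test"):
    tree.ensure_path("/config/topics/" + topic)
print(tree.get_children("/config/topics"))
```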

    bin/zookeeper-shell.sh localhost:2181
    ls /config/topics
    [adder-0, adder, adder-1, test, __consumer_offsets]
    quit

  • pip install kazoo

    from kazoo.client import KazooClient
    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()
    zk.ensure_path('/config/topics')
    # True
    zk.get_children("/config/topics")
    # [u'adder-0', u'adder', u'adder-1', u'test', u'__consumer_offsets']
    zk.stop()
    quit()

Spring Kafka

Dockerfile

    FROM eclipse-temurin:17-jdk-alpine
    ENV PATH=/opt/java/openjdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/root/kafka_2.12-3.4.0/bin/
    RUN apk add --update --no-cache curl wget nano vim bash gcompat
    RUN cd ~ && wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz && tar xvzf kafka_2.12-3.4.0.tgz
    CMD ["/bin/bash","/mnt/start-servers.sh"]

start-servers.sh

    #!/bin/bash
    # start ZooKeeper and the Kafka broker in the background
    nohup /root/kafka_2.12-3.4.0/bin/zookeeper-server-start.sh /root/kafka_2.12-3.4.0/config/zookeeper.properties &
    nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh /root/kafka_2.12-3.4.0/config/server.properties &
    # cat blocks on stdin and keeps the container in the foreground
    cat

Build kafka container

    docker build -t kafka-test-image .
    # the current directory (with start-servers.sh) is mounted on /mnt/
    docker run --rm -dit --name kafka-test -p 2181:2181 -p 9092:9092 -v $PWD:/mnt/ kafka-test-image
    # build the Spring Boot jar and copy it into the container
    mvn clean install
    docker cp target/demo-0.0.1-SNAPSHOT.jar kafka-test:/tmp/
    docker exec -it kafka-test bash

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.1.0</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>demo</name>
        <description>Demo project for Spring Boot</description>
        <properties>
            <java.version>17</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

src/main/resources/application.properties

    spring.kafka.bootstrap-servers=127.0.0.1:9092
    spring.kafka.consumer.group-id=group-id

src/main/java/com/example/demo/DemoApplication.java

    package com.example.demo;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoApplication {

        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    }

src/main/java/com/example/demo/DemoController.java

    package com.example.demo;

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.ResponseBody;

    @Controller
    public class DemoController {
        private KafkaTemplate<String, String> kafkaTemplate;

        public DemoController(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        // GET /uppercase/{text} publishes the text to the topic; the listener does the conversion
        @GetMapping("/uppercase/{text}")
        @ResponseBody
        public String uppercase(@PathVariable String text) {
            String message = String.format("text to be sent in uppercase %s", text);
            kafkaTemplate.send(KafkaTopicConfig.TOPIC_TASK, message);
            return message;
        }

    }

src/main/java/com/example/demo/KafkaConsumerConfig.java

    package com.example.demo;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;
        @Value(value = "${spring.kafka.consumer.group-id}")
        private String groupId;

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }

src/main/java/com/example/demo/KafkaMessageListener.java

    package com.example.demo;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class KafkaMessageListener {
        private Logger logger;

        public KafkaMessageListener() {
            // log under this class's own name rather than Spring's MessageListener interface
            this.logger = LoggerFactory.getLogger(KafkaMessageListener.class);
            this.logger.info("Created KafkaMessageListener");
        }

        @KafkaListener(topics = KafkaTopicConfig.TOPIC_TASK)
        public void listen(String message) {
            System.out.println("Received Message in topicTask: " + message + " in uppercase " + message.toUpperCase());
        }
    }

src/main/java/com/example/demo/KafkaProducerConfig.java

    package com.example.demo;

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaProducerConfig {
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    bootstrapAddress);
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }

src/main/java/com/example/demo/KafkaTopicConfig.java

    package com.example.demo;

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;

    @Configuration
    public class KafkaTopicConfig {
        public static final String TOPIC_TASK = "topicTask";
        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            return new KafkaAdmin(configs);
        }

        // topic with 1 partition and replication factor 1
        @Bean
        public NewTopic topicTask() {
            return new NewTopic(TOPIC_TASK, 1, (short) 1);
        }
    }