= Apache Kafka =

https://kafka.apache.org/intro

Apache Kafka® is a distributed streaming platform. 

A streaming platform has three key capabilities:
 * Publish and subscribe to streams of records (topics), similar to a message queue or enterprise messaging system.
 * Store streams of records in a fault-tolerant durable way.
 * Process streams of records as they occur. 

'''Publisher/Subscriber, Observer pattern, Message queues.'''

First, a few concepts:
 * Kafka is run as a cluster on one or more servers that can span multiple datacenters.
 * The Kafka cluster stores streams of records in categories called topics.
 * Each record consists of a '''key, a value, and a timestamp'''. 
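
As a rough mental model (not Kafka's actual storage code), a record and a topic can be pictured with plain Python types. The `Record` class, the `topics` dictionary and the `publish` function are illustrative names and not part of any Kafka client.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Record:
    """Illustrative stand-in for a Kafka record: key, value and timestamp."""
    key: Optional[bytes]
    value: bytes
    # Timestamp in milliseconds since the Unix epoch.
    timestamp: int = field(default_factory=lambda: int(time.time() * 1000))


# A "cluster" reduced to its essence: topic name -> ordered list of records.
topics: Dict[str, List[Record]] = {}


def publish(topic: str, record: Record) -> int:
    """Append a record to a topic and return its position in that topic."""
    log = topics.setdefault(topic, [])
    log.append(record)
    return len(log) - 1


first = publish("test", Record(key=b"user-1", value=b"hello"))
second = publish("test", Record(key=None, value=b"test"))
print(first, second, len(topics["test"]))
```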

Kafka has four core APIs:
 * The Producer API allows an application to publish a stream of records to one or more Kafka topics.
 * The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
 * The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
 * The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table. 
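
The Streams API itself is a Java library. The Python sketch below only illustrates the idea of a stream processor (read from an input stream, transform each record, write to an output stream) using a generator. The name `uppercase_processor` and the sample data are made up for illustration.

```python
from typing import Iterable, Iterator, Tuple

KeyValue = Tuple[str, str]


def uppercase_processor(input_stream: Iterable[KeyValue]) -> Iterator[KeyValue]:
    """Consume records one at a time and emit a transformed record for each."""
    for key, value in input_stream:
        yield key, value.upper()


input_topic = [("k1", "hello"), ("k2", "kafka")]
output_topic = list(uppercase_processor(input_topic))
print(output_topic)
```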

Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
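
A minimal sketch of what multi-subscriber means, assuming a toy in-memory `Topic` class (hypothetical, not a Kafka API): the topic keeps every record, and each subscriber only advances its own read position. In this sketch a new subscriber starts from the beginning, similar to the `--from-beginning` option of the console consumer.

```python
from typing import Dict, List


class Topic:
    """Toy topic: an append-only list plus one read position per subscriber."""

    def __init__(self) -> None:
        self.log: List[str] = []
        self.offsets: Dict[str, int] = {}

    def publish(self, value: str) -> None:
        self.log.append(value)

    def read(self, subscriber: str) -> List[str]:
        """Return everything this subscriber has not seen yet."""
        start = self.offsets.get(subscriber, 0)
        self.offsets[subscriber] = len(self.log)
        return self.log[start:]


topic = Topic()
topic.publish("a")
topic.publish("b")                 # written with zero subscribers, still stored
seen_by_x = topic.read("x")        # first subscriber sees both records
topic.publish("c")
seen_by_y = topic.read("y")        # second subscriber independently sees all three
seen_by_x_again = topic.read("x")  # x only gets what is new for x
print(seen_by_x, seen_by_y, seen_by_x_again)
```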

How does Kafka's notion of streams compare to a traditional enterprise messaging system?
Messaging traditionally has two models: queuing and publish-subscribe.  In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
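
Kafka combines both models through consumer groups: the partitions of a topic are divided among the consumers of one group (queue-like), while every group receives all partitions (publish-subscribe-like). The sketch below assumes a simple round-robin assignment; the function `assign_partitions` and the consumer names are hypothetical, and the assignment strategies of real clients are more elaborate.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Round-robin assignment of partitions to the consumers of ONE group."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = [0, 1, 2, 3]

# Queue-like: two consumers in the same group split the partitions.
workers = assign_partitions(partitions, ["worker-a", "worker-b"])

# Publish-subscribe-like: a second group with one consumer gets every partition.
audit = assign_partitions(partitions, ["audit-1"])
print(workers, audit)
```

Because each partition is read by only one consumer of a group, records within that partition are processed in order while the load is still spread over the pool.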

https://kafka.apache.org/uses

Kafka works well as a replacement for a more traditional message broker.  '''Kafka is comparable to traditional messaging systems such as [[ActiveMQ]] or [[RabbitMQ]]. '''

https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case

Kafka uses a '''pull model''': the Kafka broker waits for the consumer to ask for data.

Kafka provides '''message ordering''' within each partition of a topic (stream/streaming).
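
Ordering is commonly preserved per key by sending all records with the same key to the same partition. The sketch below illustrates that idea only: `partition_for` is a hypothetical helper and CRC32 is a stand-in for the hash function a real Kafka client uses.

```python
import zlib
from collections import defaultdict
from typing import DefaultDict, List, Tuple

NUM_PARTITIONS = 2


def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a key to a partition. CRC32 is a stand-in for Kafka's real hash."""
    return zlib.crc32(key) % num_partitions


partitions: DefaultDict[int, List[Tuple[bytes, str]]] = defaultdict(list)
events = [(b"alice", "login"), (b"bob", "login"), (b"alice", "buy"), (b"alice", "logout")]
for key, value in events:
    partitions[partition_for(key)].append((key, value))

# All of alice's events sit in one partition, in the order they were sent.
alice_partition = partitions[partition_for(b"alice")]
alice_events = [v for k, v in alice_partition if k == b"alice"]
print(alice_events)
```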

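Kafka is a '''log''', which means that it '''retains messages by default''': records stay available after they have been consumed, until the configured retention period expires.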
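
The pull model and log retention can be pictured together with a toy partition log. This is a sketch under simplifying assumptions, not broker code: the `Log` class and its methods are hypothetical, retention is purely time-based, and a fetch below the oldest kept offset is silently clamped.

```python
from typing import List, Tuple


class Log:
    """Toy partition log: records are removed by age, never by being read."""

    def __init__(self, retention_ms: int) -> None:
        self.retention_ms = retention_ms
        self.base_offset = 0                      # offset of the oldest kept record
        self.records: List[Tuple[int, str]] = []  # (timestamp_ms, value)

    def append(self, timestamp_ms: int, value: str) -> None:
        self.records.append((timestamp_ms, value))

    def expire(self, now_ms: int) -> None:
        """Drop records older than the retention period."""
        while self.records and now_ms - self.records[0][0] > self.retention_ms:
            self.records.pop(0)
            self.base_offset += 1

    def fetch(self, offset: int, max_records: int) -> List[str]:
        """Pull model: the consumer asks for records starting at an offset."""
        start = max(offset, self.base_offset) - self.base_offset
        return [value for _, value in self.records[start:start + max_records]]


log = Log(retention_ms=1000)
log.append(0, "a")
log.append(500, "b")
log.append(900, "c")

first_read = log.fetch(offset=0, max_records=2)   # consumer pulls a batch
second_read = log.fetch(offset=0, max_records=2)  # reading did not delete anything
log.expire(now_ms=1200)                           # "a" is now older than 1000 ms
after_expiry = log.fetch(offset=0, max_records=10)
print(first_read, second_read, after_expiry)
```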

== JMS client ==
https://docs.confluent.io/current/clients/kafka-jms-client/index.html

Java Message Service (JMS) is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. Confluent JMS Client (kafka-jms-client) is an implementation of the JMS 1.1 provider interface that allows Apache Kafka® or Confluent Platform to be used as a JMS message broker.

Kafka topics can mimic the behavior of either topics or queues in the traditional messaging system sense. Both JMS messaging models are supported: Publish/Subscribe (Topics) and Point-to-Point (Queues).


== Example ==
{{{#!highlight bash
wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
tar xvzf kafka_2.11-2.3.0.tgz 
cd kafka_2.11-2.3.0/
# single-node ZooKeeper instance (port 2181)
bin/zookeeper-server-start.sh config/zookeeper.properties
# in a new terminal tab
cd kafka_2.11-2.3.0/
bin/kafka-server-start.sh config/server.properties # listens on port 9092
# create topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# check topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# send messages to topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>test
# consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# https://pypi.org/project/kafka/
apt install python-pip # as root
pip install kafka
}}}

{{{#!highlight python
#producer.py
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092',compression_type='gzip' )
for i in range(5):
    producer.send('test', b'some_message_bytes [%d]'%(i))
producer.flush()
}}}

{{{#!highlight python
#consumer.py
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',bootstrap_servers="localhost:9092")
for msg in consumer:
    print("%s %d %s"%(msg.topic, msg.timestamp, msg.value))
}}}

== Create queue adder for 2 consumers ==
 * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder
The number of partitions equals the number of consumers.
{{{#!highlight python
#consumer_adder.py
from kafka import KafkaConsumer
import json
import sys

topic='adder'
consumer = KafkaConsumer('%s-%s'%(topic,sys.argv[1]),bootstrap_servers="localhost:9092")
print(consumer.partitions_for_topic(topic))

for msg in consumer:
    vals = json.loads(msg.value)
    print("%s %d %s sum: %d"%(msg.topic, msg.timestamp, msg.value, vals['op1']+vals['op2']  ))
}}}

{{{#!highlight python
#producer_adder.py
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092',compression_type='gzip' )
topic='adder'
parts = producer.partitions_for(topic)
amount_partitions = len(parts)

for i in range(10000):
    vals = {'op1':i,'op2':i}
    #print('adder-%d'%(i%2))
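    # json.dumps returns str; the message value must be bytes
    producer.send('%s-%d'%(topic,i%amount_partitions), value=json.dumps(vals).encode('utf-8'))
# make sure buffered messages are sent before the script exits
producer.flush()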
}}}
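
The JSON handling and the topic naming used by the adder scripts can be isolated into small helpers that run without a broker. The function names `serialize`, `deserialize` and `target_topic` are illustrative and do not appear in the scripts above.

```python
import json
from typing import Any, Dict


def serialize(vals: Dict[str, Any]) -> bytes:
    """Dict -> UTF-8 encoded JSON bytes, suitable as a message value."""
    return json.dumps(vals).encode("utf-8")


def deserialize(raw: bytes) -> Dict[str, Any]:
    """UTF-8 encoded JSON bytes -> dict."""
    return json.loads(raw.decode("utf-8"))


def target_topic(base: str, i: int, amount: int) -> str:
    """Same naming scheme as producer_adder.py: adder-0, adder-1, ..."""
    return "%s-%d" % (base, i % amount)


payload = serialize({"op1": 3, "op2": 3})
vals = deserialize(payload)
print(target_topic("adder", 3, 2), payload, vals["op1"] + vals["op2"])
```

With kafka-python, helpers like these could probably be passed as `value_serializer=serialize` to `KafkaProducer` and `value_deserializer=deserialize` to `KafkaConsumer`, so the scripts work with dictionaries directly. That wiring is not exercised here.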

== List topics using zookeeper ==
ZooKeeper is a high-performance coordination service for distributed applications. The name space provided by ZooKeeper is much like that of a standard file system. 
 * https://zookeeper.apache.org/doc/r3.3.3/index.html
{{{
bin/zookeeper-shell.sh localhost:2181
ls /config/topics
[adder-0, adder, adder-1, test, __consumer_offsets]
quit
}}}
 * pip install kazoo
{{{#!highlight python
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
zk.ensure_path('/config/topics')
# True
zk.get_children("/config/topics")
#[u'adder-0', u'adder', u'adder-1', u'test', u'__consumer_offsets']
zk.stop()
quit()
}}}
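
To illustrate the file-system-like name space without a running ZooKeeper, here is a toy in-memory tree. The `ZNodeTree` class is hypothetical; only its method names are borrowed from the kazoo calls above, and it stores no data, watches or ACLs.

```python
from typing import Dict, List


class ZNodeTree:
    """Toy hierarchical namespace; method names mirror the kazoo calls."""

    def __init__(self) -> None:
        self.root: Dict[str, dict] = {}

    def ensure_path(self, path: str) -> bool:
        """Create every missing node along a slash-separated path."""
        node = self.root
        for part in path.strip("/").split("/"):
            node = node.setdefault(part, {})
        return True

    def get_children(self, path: str) -> List[str]:
        """List the names directly below a path, like `ls` in zookeeper-shell."""
        node = self.root
        for part in path.strip("/").split("/"):
            node = node[part]
        return sorted(node)


tree = ZNodeTree()
for topic in ("adder", "adder-0", "adder-1", "test"):
    tree.ensure_path("/config/topics/" + topic)
children = tree.get_children("/config/topics")
print(children)
```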

== Spring kafka ==
=== Dockerfile ===
{{{#!highlight sh
FROM eclipse-temurin:17-jdk-alpine
ENV PATH=/opt/java/openjdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/root/kafka_2.12-3.4.0/bin/
RUN apk add --update --no-cache curl wget nano vim bash gcompat
RUN cd ~ && wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz && tar xvzf kafka_2.12-3.4.0.tgz
CMD ["/bin/bash","/mnt/start-servers.sh"]
}}}

=== start-servers.sh ===
{{{#!highlight sh
#!/bin/bash
nohup /root/kafka_2.12-3.4.0/bin/zookeeper-server-start.sh /root/kafka_2.12-3.4.0/config/zookeeper.properties &
nohup /root/kafka_2.12-3.4.0/bin/kafka-server-start.sh /root/kafka_2.12-3.4.0/config/server.properties &
cat # blocks on stdin so the container keeps running
}}}

=== Build kafka container ===
{{{#!highlight sh
docker build -t kafka-test-image .
docker run --rm -dit --name kafka-test -p 2181:2181 -p 9092:9092 -v $PWD:/mnt/ kafka-test-image
mvn clean install
docker cp target/demo-0.0.1-SNAPSHOT.jar kafka-test:/tmp/
docker exec -it kafka-test bash
}}}
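
After `docker run`, ZooKeeper and Kafka need a few seconds to start. A small check like the following, run on the host, can tell whether the published ports accept TCP connections yet. It is a sketch: `port_open` is a hypothetical helper, and an open port does not prove that the broker is fully ready.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports published by the docker run command above.
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        print(name, port, "open" if port_open("127.0.0.1", port) else "closed")
```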

=== pom.xml ===
{{{#!highlight xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.0</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
}}}

=== src/main/resources/application.properties ===
{{{#!highlight sh
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=group-id
}}}

=== src/main/java/com/example/demo/DemoApplication.java ===
{{{#!highlight java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}
}
}}}

=== src/main/java/com/example/demo/DemoController.java ===
{{{#!highlight java
package com.example.demo;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class DemoController {
    private KafkaTemplate<String, String> kafkaTemplate;

    public DemoController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/uppercase/{text}")
    @ResponseBody
    public String uppercase(@PathVariable String text) {
        String message = String.format("text to be sent in uppercase %s", text);
        kafkaTemplate.send(KafkaTopicConfig.TOPIC_TASK, message);
        return message;
    }

}
}}}

=== src/main/java/com/example/demo/KafkaConsumerConfig.java ===
{{{#!highlight java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }    
}
}}}

=== src/main/java/com/example/demo/KafkaMessageListener.java ===
{{{#!highlight java
package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {
    private Logger logger;

    public KafkaMessageListener() {
        // name the logger after this class, not after Spring's MessageListener interface
        this.logger = LoggerFactory.getLogger(KafkaMessageListener.class);
        this.logger.info("Created KafkaMessageListener");
    }

    @KafkaListener(topics = KafkaTopicConfig.TOPIC_TASK)
    public void listen(String message) {
        System.out.println("Received Message in topicTask: " + message + " in uppercase " + message.toUpperCase());
    }
}
}}}

=== src/main/java/com/example/demo/KafkaProducerConfig.java ===
{{{#!highlight java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
}}}

=== src/main/java/com/example/demo/KafkaTopicConfig.java ===
{{{#!highlight java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {
    public static final String TOPIC_TASK = "topicTask";
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topicTask() {
        return new NewTopic(TOPIC_TASK, 1, (short) 1);
    }
}
}}}