
Apache Kafka

https://kafka.apache.org/intro

Apache Kafka® is a distributed streaming platform.

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Related concepts: the publisher/subscriber (observer) pattern and message queues.

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.
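
As a minimal sketch of these concepts (assuming a broker on localhost:9092 and the kafka-python client installed in the example below), a record with an explicit key can be published like this; the timestamp is assigned automatically:

   from kafka import KafkaProducer

   producer = KafkaProducer(bootstrap_servers='localhost:9092')
   # key and value must be bytes unless serializers are configured;
   # records with the same key always land in the same partition
   producer.send('test', key=b'sensor-1', value=b'42')
   producer.flush()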

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
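
Of the four, only the Producer and Consumer APIs have a native Python client; the Streams API is a Java library. The consume-transform-produce pattern behind it can still be sketched with the plain kafka-python clients (topic names here are illustrative):

   from kafka import KafkaConsumer, KafkaProducer

   # sketch of a stream processor: read, transform, write
   consumer = KafkaConsumer('input-topic', bootstrap_servers='localhost:9092')
   producer = KafkaProducer(bootstrap_servers='localhost:9092')
   for msg in consumer:
       transformed = msg.value.upper()  # the "processing" step
       producer.send('output-topic', transformed)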

Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

How does Kafka's notion of streams compare to a traditional enterprise messaging system? Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
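
With the kafka-python client, the choice between the two models is made with the group_id parameter: consumers that share a group_id divide a topic's partitions among themselves (queue semantics), while consumers in different groups each receive every record (publish-subscribe). A sketch, assuming the 'test' topic created below:

   from kafka import KafkaConsumer

   # start several copies with the SAME group_id: the partitions are divided
   # among them and each record is processed by exactly one group member
   consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
                            group_id='worker-group')
   for msg in consumer:
       print(msg.partition, msg.value)

   # a consumer started with a DIFFERENT group_id would additionally
   # receive its own copy of every record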

https://kafka.apache.org/uses

Kafka works well as a replacement for a more traditional message broker and is comparable to messaging systems such as ActiveMQ or RabbitMQ.

Example

   wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
   tar xvzf kafka_2.11-2.3.0.tgz
   cd kafka_2.11-2.3.0/
   # start a single-node ZooKeeper instance (port 2181)
   bin/zookeeper-server-start.sh config/zookeeper.properties
   # in a new tab ...
   cd kafka_2.11-2.3.0/
   bin/kafka-server-start.sh config/server.properties  # listens on port 9092
   # create a topic
   bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
   # list topics
   bin/kafka-topics.sh --list --bootstrap-server localhost:9092
   # send messages to the topic
   bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
   >hello
   >test
   # consume messages
   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
   # install the Python client: https://pypi.org/project/kafka/
   apt install python-pip  # as root
   pip install kafka

   # producer.py
   from kafka import KafkaProducer

   producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
   for i in range(5):
       producer.send('test', b'some_message_bytes [%d]' % (i,))
   producer.flush()  # block until all buffered records are sent

   # consumer.py
   from kafka import KafkaConsumer

   consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
   for msg in consumer:  # blocks, waiting for new records
       print("%s %d %s" % (msg.topic, msg.timestamp, msg.value))
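
To try it, start consumer.py in one terminal and then run producer.py in another: five lines appear, each showing the topic, the record's timestamp (milliseconds since the epoch), and the raw message bytes.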

Create an 'adder' queue for two consumers

  • bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder

The number of partitions matches the number of consumers. Note that in this example each consumer reads its own topic ('adder-0' or 'adder-1'), so the partitioning is emulated with one topic per consumer; an alternative using Kafka's own consumer groups is sketched after the producer below.

   # consumer_adder.py
   from kafka import KafkaConsumer
   import json
   import sys

   topic = 'adder'
   # each instance reads its own topic, e.g. 'adder-0' or 'adder-1'
   consumer = KafkaConsumer('%s-%s' % (topic, sys.argv[1]), bootstrap_servers='localhost:9092')
   print(consumer.partitions_for_topic(topic))

   for msg in consumer:
       vals = json.loads(msg.value)
       print("%s %d %s sum: %d" % (msg.topic, msg.timestamp, msg.value, vals['op1'] + vals['op2']))

   # producer_adder.py
   from kafka import KafkaProducer
   import json

   producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
   topic = 'adder'
   parts = producer.partitions_for(topic)
   amount_partitions = len(parts)

   for i in range(10000):
       vals = {'op1': i, 'op2': i}
       # alternate between 'adder-0' and 'adder-1'
       producer.send('%s-%d' % (topic, i % amount_partitions),
                     value=json.dumps(vals).encode('utf-8'))
   producer.flush()  # make sure everything is sent before exiting
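
For comparison, the same load balancing is available from the two partitions of the 'adder' topic itself, without the per-consumer 'adder-0'/'adder-1' topics: a sketch in which every consumer joins the same group and Kafka assigns one partition to each. The producer then simply sends to 'adder'; keyless records are spread across the partitions automatically.

   # sketch: partition-based alternative to the per-consumer topics above;
   # run two copies of this script and each gets one of the two partitions
   from kafka import KafkaConsumer
   import json

   consumer = KafkaConsumer('adder', bootstrap_servers='localhost:9092',
                            group_id='adder-group')
   for msg in consumer:
       vals = json.loads(msg.value)
       print("%s[%d] sum: %d" % (msg.topic, msg.partition, vals['op1'] + vals['op2']))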

List topics using ZooKeeper

ZooKeeper is a high-performance coordination service for distributed applications. The namespace it provides is much like that of a standard file system.

  • https://zookeeper.apache.org/doc/r3.3.3/index.html

bin/zookeeper-shell.sh localhost:2181
ls /config/topics
[adder-0, adder, adder-1, test, __consumer_offsets]
quit
  • pip install kazoo

   from kazoo.client import KazooClient

   zk = KazooClient(hosts='127.0.0.1:2181')
   zk.start()
   zk.ensure_path('/config/topics')
   # True
   zk.get_children("/config/topics")
   # [u'adder-0', u'adder', u'adder-1', u'test', u'__consumer_offsets']
   zk.stop()
   quit()
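
The same client can read the other znodes Kafka maintains; for example, zk.get_children('/brokers/ids') lists the ids of the currently registered brokers.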