Revision 13 as of 2019-10-12 00:54:44

Apache Kafka

https://kafka.apache.org/intro

Apache Kafka® is a distributed streaming platform.

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Related concepts: the publisher/subscriber model, the Observer pattern, and message queues.

First a few concepts:

  • Kafka is run as a cluster on one or more servers that can span multiple datacenters.
  • The Kafka cluster stores streams of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

Kafka has four core APIs:

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics.
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.

Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
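
The multi-subscriber property can be pictured with a toy in-memory model. This is only a sketch, not Kafka code: the names TopicLog, append, and read_from are made up for illustration, and it assumes a simplified view in which a topic is an append-only list and each subscriber remembers its own read position.

```python
class TopicLog:
    """Toy append-only log standing in for one topic (hypothetical, not a Kafka API)."""

    def __init__(self):
        self.records = []

    def append(self, value):
        """Store a record and return its position in the log."""
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, position):
        """Return all records at or after the given position."""
        return self.records[position:]


log = TopicLog()
for value in ("hello", "test"):
    log.append(value)

# Two independent subscribers, each with its own read position.
positions = {"subscriber_a": 0, "subscriber_b": 0}

seen_a = log.read_from(positions["subscriber_a"])
positions["subscriber_a"] += len(seen_a)

# Reading by subscriber_a removed nothing: subscriber_b still sees every record.
seen_b = log.read_from(positions["subscriber_b"])
positions["subscriber_b"] += len(seen_b)

print(seen_a)
print(seen_b)
```

Because reading does not consume records, adding a third subscriber (or having none at all) changes nothing for the others.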

How does Kafka's notion of streams compare to a traditional enterprise messaging system? Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
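
How partitions give both ordering and load balancing can be sketched in plain Python. The code below is a simplified model under stated assumptions, not Kafka's implementation: choose_partition and assign_partitions are hypothetical helpers, CRC32 stands in for the real partitioner, and the round-robin assignment is just one possible strategy.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 2


def choose_partition(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition (CRC32 is an illustrative stand-in only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(num_partitions, consumers):
    """Spread the partitions over the consumers of one group, round-robin."""
    assignment = defaultdict(list)
    for partition in range(num_partitions):
        assignment[consumers[partition % len(consumers)]].append(partition)
    return dict(assignment)


# Records as (key, value) pairs; records sharing a key land in the same partition,
# so their relative order is preserved inside that partition.
records = [("alice", 1), ("bob", 1), ("alice", 2), ("bob", 2), ("alice", 3)]
partitions = defaultdict(list)
for key, value in records:
    partitions[choose_partition(key)].append((key, value))

# Queue-like behaviour: inside one group each partition has exactly one reader.
group = assign_partitions(NUM_PARTITIONS, ["consumer-0", "consumer-1"])

# Publish-subscribe behaviour: a second group gets its own complete assignment.
other_group = assign_partitions(NUM_PARTITIONS, ["audit-0"])

print(group)
print(other_group)
```

Ordering is only guaranteed per partition in this model, which is why records that must stay in order share a key.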

https://kafka.apache.org/uses

Kafka works well as a replacement for a more traditional message broker. Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.

Example

    wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
    tar xvzf kafka_2.11-2.3.0.tgz
    cd kafka_2.11-2.3.0/
    # start a single-node ZooKeeper instance (port 2181)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # in a new terminal tab
    cd kafka_2.11-2.3.0/
    bin/kafka-server-start.sh config/server.properties # listens on port 9092
    # create a topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    # list topics
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    # send messages to the topic (type one message per line at the > prompt)
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >hello
    >test
    # consume messages
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    # install the Python client, see https://pypi.org/project/kafka/
    apt install python-pip # as root
    pip install kafka

    # producer.py
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
    for i in range(5):
        producer.send('test', b'some_message_bytes [%d]' % (i))
    producer.flush()

    # consumer.py
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('test', bootstrap_servers="localhost:9092")
    for msg in consumer:
        print("%s %d %s" % (msg.topic, msg.timestamp, msg.value))
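
The consumer above prints the raw timestamp and the raw bytes. A small formatting helper can make the output easier to read. This is a sketch that assumes the timestamp is in milliseconds since the Unix epoch and that the value is UTF-8 text; format_record is a hypothetical helper name, not part of the kafka package.

```python
from datetime import datetime, timezone


def format_record(topic, timestamp_ms, value):
    """Render topic, timestamp (assumed epoch milliseconds), and value as one line."""
    when = datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc)
    return "%s %s %s" % (topic, when.strftime("%Y-%m-%d %H:%M:%S"), value.decode("utf-8"))


# Stand-in values; in consumer.py these would be msg.topic, msg.timestamp, msg.value.
print(format_record("test", 1000, b"hello"))
```

Inside the consumer loop the call would be format_record(msg.topic, msg.timestamp, msg.value).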

Create the topic adder with 2 partitions, for 2 consumers

  • bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder

The number of partitions equals the number of consumers.

    # consumer_adder.py
    # usage: python consumer_adder.py 0    (or 1)
    from kafka import KafkaConsumer
    import json
    import sys

    topic = 'adder'
    # Subscribes to the topic named adder-<argument>, i.e. adder-0 or adder-1.
    # Note: these are separate topics, not partitions of the topic adder.
    consumer = KafkaConsumer('%s-%s' % (topic, sys.argv[1]), bootstrap_servers="localhost:9092")
    print(consumer.partitions_for_topic(topic))

    for msg in consumer:
        vals = json.loads(msg.value)
        print("%s %d %s sum: %d" % (msg.topic, msg.timestamp, msg.value, vals['op1'] + vals['op2']))

    # producer_adder.py
    from kafka import KafkaProducer
    import json

    producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
    topic = 'adder'
    parts = producer.partitions_for(topic)
    amount_partitions = len(parts)

    for i in range(10000):
        vals = {'op1': i, 'op2': i}
        # Sends to the topic named adder-0 or adder-1, alternating.
        # Encode explicitly: b'%s' % str raises TypeError on Python 3.
        producer.send('%s-%d' % (topic, i % amount_partitions), value=json.dumps(vals).encode('utf-8'))
    # Block until all buffered messages have been sent.
    producer.flush()
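
The adder scripts address topics named adder-0 and adder-1 rather than the two partitions of the topic adder. A variant that targets the partitions themselves might look like the sketch below. It assumes the kafka package used above, a broker on localhost:9092, and partition ids numbered from 0; encode_record, partition_for, produce, and consume are hypothetical helper names, and the kafka imports are done lazily so the pure helpers can be tried without a broker.

```python
import json


def encode_record(i):
    """Serialize one addition task as UTF-8 encoded JSON bytes."""
    return json.dumps({'op1': i, 'op2': i}).encode('utf-8')


def partition_for(i, amount_partitions):
    """Alternate over the partitions, as the modulo in producer_adder.py does."""
    return i % amount_partitions


def produce(n_messages=10, bootstrap='localhost:9092', topic='adder'):
    """Send tasks to explicit partitions of a single topic."""
    from kafka import KafkaProducer  # imported lazily; needs a running broker
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    amount_partitions = len(producer.partitions_for(topic))
    for i in range(n_messages):
        producer.send(topic, value=encode_record(i),
                      partition=partition_for(i, amount_partitions))
    producer.flush()


def consume(partition, bootstrap='localhost:9092', topic='adder'):
    """Read only the given partition of the topic and print the sums."""
    from kafka import KafkaConsumer, TopicPartition  # imported lazily
    consumer = KafkaConsumer(bootstrap_servers=bootstrap)
    consumer.assign([TopicPartition(topic, partition)])
    for msg in consumer:
        vals = json.loads(msg.value)
        print("%s[%d] sum: %d" % (msg.topic, msg.partition, vals['op1'] + vals['op2']))
```

With the broker running, consume(0) and consume(1) would be started in two terminals and produce() in a third. Manual assignment is used here to mirror the one-consumer-per-partition idea in the notes; letting consumers share a group id and subscribe to the topic is the other common option.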

List topics using ZooKeeper

ZooKeeper is a high-performance coordination service for distributed applications. The name space provided by ZooKeeper is much like that of a standard file system.

  • https://zookeeper.apache.org/doc/r3.3.3/index.html

bin/zookeeper-shell.sh localhost:2181
ls /config/topics
[adder-0, adder, adder-1, test, __consumer_offsets]
quit
  • pip install kazoo

    from kazoo.client import KazooClient
    zk = KazooClient(hosts='127.0.0.1:2181')
    zk.start()
    zk.ensure_path('/config/topics')
    # True
    zk.get_children("/config/topics")
    # [u'adder-0', u'adder', u'adder-1', u'test', u'__consumer_offsets']
    zk.stop()
    quit()
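
The interactive session can be wrapped in a small script. This is a sketch under assumptions: list_topics and user_topics are hypothetical helper names, topics whose name starts with two underscores are treated as internal, and exists is used instead of ensure_path so that listing does not create the path as a side effect.

```python
def user_topics(names):
    """Drop internal topics, assumed here to be those whose name starts with '__'."""
    return sorted(name for name in names if not name.startswith("__"))


def list_topics(hosts="127.0.0.1:2181", path="/config/topics"):
    """Return the children of the topics path, or an empty list if it is missing."""
    from kazoo.client import KazooClient  # imported lazily; needs a running ZooKeeper
    zk = KazooClient(hosts=hosts)
    zk.start()
    try:
        if zk.exists(path) is None:
            return []
        return zk.get_children(path)
    finally:
        zk.stop()


# Stand-in for list_topics(), using the names from the listing in this section.
print(user_topics(["adder-0", "adder", "adder-1", "test", "__consumer_offsets"]))
```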