Apache Kafka
https://kafka.apache.org/intro
Apache Kafka® is a distributed streaming platform.
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Related concepts: publish/subscribe messaging, the observer pattern, and message queues.
First a few concepts:
- Kafka is run as a cluster on one or more servers that can span multiple datacenters.
- The Kafka cluster stores streams of records in categories called topics.
- Each record consists of a key, a value, and a timestamp.
Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
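As a concrete illustration of the Connector API, the Kafka tarball used below ships with a file source connector that publishes every new line of a file to a topic. A minimal run in standalone mode might look like this (the stock config files named here ship with the distribution; exact contents can vary between versions):

{{{#!highlight bash
# Illustration: run the bundled file source connector. With the stock
# config/connect-file-source.properties it reads test.txt line by line
# and publishes each line to the connect-test topic.
cd kafka_2.11-2.3.0/
echo "hello connect" >> test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
}}}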
Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
How does Kafka's notion of streams compare to a traditional enterprise messaging system? Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
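A minimal sketch of that partition-based load balancing with the Python client used in the examples below (the topic name 'clicks' and its two-partition setup are illustrative, not part of this page): records carrying the same key always hash to the same partition, so each key's records stay ordered while distinct keys spread the load over the consumer pool.

{{{#!highlight python
# Sketch: per-key ordering via partitions, assuming a topic 'clicks'
# created with 2 partitions (analogous to the 'adder' topic further down).
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(10):
    key = b'user-%d' % (i % 2)        # same key -> same partition
    producer.send('clicks', key=key, value=b'click %d' % i)
producer.flush()
}}}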
https://kafka.apache.org/uses
Kafka works well as a replacement for a more traditional message broker and is comparable to messaging systems such as ActiveMQ or RabbitMQ.
== Example ==
{{{#!highlight bash
wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
tar xvzf kafka_2.11-2.3.0.tgz
cd kafka_2.11-2.3.0/

# start a single-node ZooKeeper instance (listens on port 2181)
bin/zookeeper-server-start.sh config/zookeeper.properties

# in a new tab: start the Kafka broker
cd kafka_2.11-2.3.0/
bin/kafka-server-start.sh config/server.properties  # listens on port 9092

# create a topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

# check topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# send messages to the topic (one message per line)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>test

# consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# install the Python client -- https://pypi.org/project/kafka/
apt install python-pip  # as root
pip install kafka       # the actively maintained package name is kafka-python
}}}
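The same flow can be driven from Python with the client installed above:

{{{#!highlight python
# producer.py
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='gzip')
for i in range(5):
    producer.send('test', b'some_message_bytes [%d]' % i)
producer.flush()  # block until the buffered messages are actually sent
}}}

{{{#!highlight python
# consumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
for msg in consumer:
    print('%s %d %s' % (msg.topic, msg.timestamp, msg.value))
}}}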
== Create queue "adder" for 2 consumers ==

 * bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder

The number of partitions matches the number of consumers, so the work can be split evenly between them. Note that the scripts below actually fan messages out over two separate auto-created topics, adder-0 and adder-1 (one per consumer), and only use the partition count of adder to decide how many such topics to address.
{{{#!highlight python
# consumer_adder.py -- usage: python consumer_adder.py <n>  (n = 0 or 1)
from kafka import KafkaConsumer
import json
import sys

topic = 'adder'
# each consumer subscribes to its own topic, adder-0 or adder-1
consumer = KafkaConsumer('%s-%s' % (topic, sys.argv[1]),
                         bootstrap_servers='localhost:9092')
print(consumer.partitions_for_topic(topic))

for msg in consumer:
    vals = json.loads(msg.value)
    print('%s %d %s sum: %d' % (msg.topic, msg.timestamp, msg.value,
                                vals['op1'] + vals['op2']))
}}}
{{{#!highlight python
# producer_adder.py
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='gzip')
topic = 'adder'
parts = producer.partitions_for(topic)   # {0, 1} for the 2-partition topic
amount_partitions = len(parts)

for i in range(10000):
    vals = {'op1': i, 'op2': i}
    # alternate between the per-consumer topics adder-0 and adder-1
    producer.send('%s-%d' % (topic, i % amount_partitions),
                  value=json.dumps(vals).encode())
producer.flush()  # deliver anything still buffered before exiting
}}}
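As noted above, these scripts spread the work over two separate topics rather than over the two partitions of adder itself. A minimal sketch of the more idiomatic alternative, a consumer group (the group_id value is illustrative, and it assumes the producer sends to adder directly): start two copies of this script and Kafka assigns each instance one of adder's two partitions, rebalancing automatically if one of them exits.

{{{#!highlight python
# group_consumer_adder.py -- consumer-group variant (not one of the
# original scripts); run two instances with the same group_id.
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('adder',
                         bootstrap_servers='localhost:9092',
                         group_id='adder-workers')  # shared group id
for msg in consumer:
    vals = json.loads(msg.value)
    print('partition %d sum: %d' % (msg.partition,
                                    vals['op1'] + vals['op2']))
}}}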
== List topics using ZooKeeper ==

ZooKeeper is a high-performance coordination service for distributed applications.

 * https://zookeeper.apache.org/doc/r3.3.3/index.html

{{{
bin/zookeeper-shell.sh localhost:2181
ls /config/topics
[adder-0, adder, adder-1, test, __consumer_offsets]
quit
}}}