MoinMoin Logo
  • Comments
  • Immutable Page
  • Menu
    • Navigation
    • RecentChanges
    • FindPage
    • Local Site Map
    • Help
    • HelpContents
    • HelpOnMoinWikiSyntax
    • Display
    • Attachments
    • Info
    • Raw Text
    • Print View
    • Edit
    • Load
    • Save
  • Login

Navigation

  • Start
  • Sitemap

Upload page content

You can upload content for the page named below. If you change the page name, you can also upload content for another page. If the page name is empty, we derive the page name from the file name.

File to load page content from
Page name
Comment

  • RabbitMQ

RabbitMQ

https://www.rabbitmq.com/

The most widely deployed open source message broker.

https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case

RabbitMQ uses a push model where messages are immediately pushed to any subscribed consumer.

RabbitMQ is a queue, so messages are removed once consumed and acknowledged.

Erlang installation on Slackware

Requires Erlang:

   1 su
   2 cd /tmp
   3 wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz
   4 tar xvzf erlang-otp.tar.gz
   5 cd erlang-otp
   6 wget http://www.erlang.org/download/otp_src_R13B03.tar.gz
   7 wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz
   8 ./erlang-otp.SlackBuild
   9 installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz

Slackbuild

   1 wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz
   2 tar xvzf erlang-otp.tar.gz
   3 cd erlang-otp
   4 wget http://www.erlang.org/download/otp_src_R16B02.tar.gz
   5 wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz
   6 ./erlang-otp.SlackBuild
   7 installpkg  /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz

UNIX server installation

   1 cd /opt
   2 wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz
   3 tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz 
   4 ln -s rabbitmq_server-3.3.1 rabbitmq
   5 /opt/rabbitmq/sbin/rabbitmq-server 

Python client

   1 pip install pika # python client
   2 

Examples

http://www.rabbitmq.com/tutorials/tutorial-one-python.html

Producer

   1 #!/usr/bin/env python
   2 import pika
   3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
   4 channel = connection.channel()
   5 channel.queue_declare(queue='hello')
   6 channel.basic_publish(exchange='',  routing_key='hello', body='Hello World!')
   7 print " [x] Sent 'Hello World!'"
   8 connection.close()

Consumer

   1 #!/usr/bin/env python
   2 import pika
   3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
   4 channel = connection.channel()
   5 channel.queue_declare(queue='hello')
   6 print ' [*] Waiting for messages. To exit press CTRL+C'
   7 
   8 def callback(ch, method, properties, body):
   9     print " [x] Received %r" % (body,)
  10 
  11 channel.basic_consume(callback, queue='hello', no_ack=True)
  12 channel.start_consuming()

Topic

Publish/Subscribe, sent message goes to all topic subscribers.

produceTopic.py

   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 if __name__=='__main__':
   6     exchangeName='broadcast'
   7     message = sys.argv[1]
   8     repetition= int( sys.argv[2] )
   9 
  10     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  11     channel = connection.channel()
  12     channel.exchange_declare(exchange=exchangeName, type='fanout')
  13     for count in range(1,repetition+1):
  14         msg='%s %d'%(message,count)
  15         channel.basic_publish(exchange=exchangeName, routing_key='', body=msg)
  16         print(" [x] Producer sent message %s to topic %s" % (msg , exchangeName) )
  17 
  18     connection.close()

consumeTopic.py

   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 def consumeTopic(ch, method, properties, body):
   6     print(" [x] %s received message %s" % (sys.argv[1] ,  body  ) )
   7 
   8 if __name__=='__main__':
   9     topicName='broadcast'
  10     connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
  11     channel = connection.channel()
  12     channel.exchange_declare(exchange=topicName, type='fanout')
  13     result = channel.queue_declare(exclusive=True)
  14     privateQueue = result.method.queue
  15     channel.queue_bind(exchange=topicName, queue=privateQueue)
  16 
  17     print(' [*] %s waiting for broadcasts. To exit press CTRL+C'%( sys.argv[1] ) )
  18     channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True)
  19     channel.start_consuming()

Queue

Several subscribers are bound to a queue. Each message is only consumed/processed by one of the subscribers.

produceQueue.py

   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 if __name__=='__main__':
   6     queueName='hello'
   7     connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
   8     channel = connection.channel()
   9     channel.queue_declare(queue=queueName)
  10     payload = sys.argv[1]
  11     maxCount = int(sys.argv[2]) + 1
  12 
  13     for count in range(1,maxCount):
  14         msg = '%s %d!'%(payload , count)
  15         channel.basic_publish(exchange='',  routing_key=queueName, body=msg  )
  16         print(" [x] Sent message '%s' to queue %s"%(msg , queueName) )
  17     connection.close()

consumeQueue.py

   1 #!/usr/bin/env python
   2 import pika
   3 import sys
   4 
   5 def messageHandler(ch, method, properties, body):
   6     print(" [x] %s received message %s" % (sys.argv[1], body) )
   7 
   8 
   9 if __name__=='__main__':
  10     queueName='hello'
  11     consumerName =  sys.argv[1]
  12     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  13     channel = connection.channel()
  14     channel.queue_declare(queue=queueName)
  15     print(' [*] %s  Waiting for messages. To exit press CTRL+C'%( consumerName ) )
  16     channel.basic_consume(messageHandler, queue=queueName, no_ack=True)
  17     try:
  18         channel.start_consuming()
  19     except KeyboardInterrupt as ex:
  20         pass

Enable STOMP in RabbitMQ

See STOMP and https://stomp.github.io/ .

   1 rabbitmq-plugins enable rabbitmq_stomp
   2 pip install stomp.py #pip2 pip3
   3 

stompProduce.py

   1 import time
   2 import sys
   3 import stomp
   4 
   5 if __name__=='__main__':
   6     if len(sys.argv)==3:
   7         conn = stomp.Connection([('127.0.0.1',61613)])
   8         conn.start()
   9         conn.connect('rmq', 'rmq', wait=True)
  10         print('Sending %s to %s'%( sys.argv[2] , sys.argv[1] ))
  11         conn.send(body=sys.argv[2], destination=sys.argv[1])
  12         conn.disconnect()
  13     else:
  14         print('Usage: stompProduce.py <destination /topic/topicx /queue/queuex> <message>')

stompConsume.py

   1 import time
   2 import sys
   3 import stomp
   4 
   5 class MessageListener(stomp.ConnectionListener):
   6     def __init__(self,consumerName):
   7         self.consumerName = consumerName
   8     def on_error(self, headers, message):
   9         print('received an error "%s" %s ' %( message , headers ))
  10     def on_message(self, headers, message):
  11         print('%s received a message "%s"' % (self.consumerName, message ))
  12 
  13 if __name__=='__main__':
  14     if len(sys.argv)==3:
  15         conn = stomp.Connection([('127.0.0.1',61613)])
  16         conn.set_listener('', MessageListener(sys.argv[1]))
  17         conn.start()
  18         conn.connect('rmq', 'rmq', wait=True)
  19         conn.subscribe(destination=sys.argv[2], id=1, ack='auto')
  20         print('Waiting for messages for %s'%(sys.argv[2] ) )
  21         try:
  22             while(True):
  23                 time.sleep(1)
  24         except KeyboardInterrupt as ex:
  25             pass
  26         conn.disconnect()
  27         print('Disconnected')
  28     else:
  29         print('Usage: stompComsume.pt <Consumer name> </topic/topicx or /queue/queuex to consume messages from>')
  • MoinMoin Powered
  • Python Powered
  • GPL licensed
  • Valid HTML 4.01