= RabbitMQ =

== Erlang installation on Slackware ==
Requires Erlang:
{{{#!highlight sh
su
cd /tmp
wget http://slackbuilds.org/slackbuilds/13.0/development/erlang-otp.tar.gz
tar xvzf erlang-otp.tar.gz
cd erlang-otp
wget http://www.erlang.org/download/otp_src_R13B03.tar.gz
wget http://www.erlang.org/download/otp_doc_man_R13B03.tar.gz
./erlang-otp.SlackBuild
installpkg /tmp/erlang-otp-13B03-i486-1_SBo.tgz
}}}

== Slackbuild ==
{{{#!highlight sh
wget http://slackbuilds.org/slackbuilds/14.1/development/erlang-otp.tar.gz
tar xvzf erlang-otp.tar.gz
cd erlang-otp
wget http://www.erlang.org/download/otp_src_R16B02.tar.gz
wget http://www.erlang.org/download/otp_doc_man_R16B02.tar.gz
./erlang-otp.SlackBuild
installpkg  /tmp/erlang-otp-16B02-x86_64-1_SBo.tgz
}}}

== UNIX server installation ==
{{{#!highlight sh
cd /opt
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.1/rabbitmq-server-generic-unix-3.3.1.tar.gz
tar xvzf rabbitmq-server-generic-unix-3.3.1.tar.gz 
ln -s rabbitmq_server-3.3.1 rabbitmq
/opt/rabbitmq/sbin/rabbitmq-server 
}}}

== Python client ==
{{{#!highlight sh
pip install pika # python client
}}}

== Examples ==
http://www.rabbitmq.com/tutorials/tutorial-one-python.html

=== Producer ===
{{{#!highlight python
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',  routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
}}}

=== Consumer ===
{{{#!highlight python
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
}}}

== Topic ==
Publish/Subscribe, sent message goes to all topic subscribers.

=== produceTopic.py ===
{{{#!highlight python
#!/usr/bin/env python
import pika
import sys

if __name__=='__main__':
    exchangeName='broadcast'
    message = sys.argv[1]
    repetition= int( sys.argv[2] )

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchangeName, type='fanout')
    for count in range(1,repetition+1):
        msg='%s %d'%(message,count)
        channel.basic_publish(exchange=exchangeName, routing_key='', body=msg)
        print(" [x] Producer sent message %s to topic %s" % (msg , exchangeName) )

    connection.close()
}}}

=== consumeTopic.py ===
{{{#!highlight python
#!/usr/bin/env python
import pika
import sys

def consumeTopic(ch, method, properties, body):
    print(" [x] %s received message %s" % (sys.argv[1] ,  body  ) )

if __name__=='__main__':
    topicName='broadcast'
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=topicName, type='fanout')
    result = channel.queue_declare(exclusive=True)
    privateQueue = result.method.queue
    channel.queue_bind(exchange=topicName, queue=privateQueue)

    print(' [*] %s waiting for broadcasts. To exit press CTRL+C'%( sys.argv[1] ) )
    channel.basic_consume(consumeTopic, queue=privateQueue, no_ack=True)
    channel.start_consuming()
}}}

== Queue ==
Several subscribers are bound to a queue. 
Each message is only consumed/processed by one of the subscribers.

=== produceQueue.py ===
{{{#!highlight python
#!/usr/bin/env python
import pika
import sys

if __name__=='__main__':
    queueName='hello'
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queueName)
    payload = sys.argv[1]
    maxCount = int(sys.argv[2]) + 1

    for count in range(1,maxCount):
        msg = '%s %d!'%(payload , count)
        channel.basic_publish(exchange='',  routing_key=queueName, body=msg  )
        print(" [x] Sent message '%s' to queue %s"%(msg , queueName) )
    connection.close()
}}}

=== consumeQueue.py ===
{{{#!highlight python
#!/usr/bin/env python
import pika
import sys

def messageHandler(ch, method, properties, body):
    print(" [x] %s received message %s" % (sys.argv[1], body) )


if __name__=='__main__':
    queueName='hello'
    consumerName =  sys.argv[1]
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queueName)
    print(' [*] %s  Waiting for messages. To exit press CTRL+C'%( consumerName ) )
    channel.basic_consume(messageHandler, queue=queueName, no_ack=True)
    try:
        channel.start_consuming()
    except KeyboardInterrupt as ex:
        pass
}}}

== Enable STOMP in RabbitMQ ==
See [[STOMP]] and https://stomp.github.io/ .

{{{#!highlight sh
 * rabbitmq-plugins enable rabbitmq_stomp
 * pip install stomp.py #pip2 pip3
}}}

=== stompProduce.py ===
{{{#!highlight python
import time
import sys
import stomp

if __name__=='__main__':
    if len(sys.argv)==3:
        conn = stomp.Connection([('127.0.0.1',61613)])
        conn.start()
        conn.connect('rmq', 'rmq', wait=True)
        print('Sending %s to %s'%( sys.argv[2] , sys.argv[1] ))
        conn.send(body=sys.argv[2], destination=sys.argv[1])
        conn.disconnect()
    else:
        print('Usage: stompProduce.py <destination /topic/topicx /queue/queuex> <message>')
}}}

=== stompConsume.py ===
{{{#!highlight python
import time
import sys
import stomp

class MessageListener(stomp.ConnectionListener):
    def __init__(self,consumerName):
        self.consumerName = consumerName
    def on_error(self, headers, message):
        print('received an error "%s" %s ' %( message , headers ))
    def on_message(self, headers, message):
        print('%s received a message "%s"' % (self.consumerName, message ))

if __name__=='__main__':
    if len(sys.argv)==3:
        conn = stomp.Connection([('127.0.0.1',61613)])
        conn.set_listener('', MessageListener(sys.argv[1]))
        conn.start()
        conn.connect('rmq', 'rmq', wait=True)
        conn.subscribe(destination=sys.argv[2], id=1, ack='auto')
        print('Waiting for messages for %s'%(sys.argv[2] ) )
        try:
            while(True):
                time.sleep(1)
        except KeyboardInterrupt as ex:
            pass
        conn.disconnect()
        print('Disconnected')
    else:
        print('Usage: stompComsume.pt <Consumer name> </topic/topicx or /queue/queuex to consume messages from>')
}}}