415 lines
16 KiB
Python
Executable file
415 lines
16 KiB
Python
Executable file
#!/bin/bash /usr/scripts/python.sh
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# This code is a Crans production, based on pika documentation examples
|
|
# and on the RabbitMQ tutorials.
|
|
#
|
|
# This code is placed under the terms of GNU General Public License v3.
|
|
#
|
|
# Author : Pierre-Elliott Bécue <becue@crans.org>
|
|
# Date : 27/04/2014
|
|
# License : GPLv3
|
|
|
|
import sys
|
|
import pika
|
|
import cranslib.clogger as clogger
|
|
|
|
logger = clogger.CLogger(sys.argv[0].split("/")[-1].replace(".py", ""), "info")
|
|
|
|
class AsynchronousConsumer(object):
|
|
"""AsynchronousConsumer class
|
|
|
|
"""
|
|
|
|
def __init__(self, url, exchange_name, exchange_type):
|
|
"""Create a new instance of the asynchronous consumer.
|
|
|
|
"""
|
|
self._connection = None
|
|
self._channel = None
|
|
self._closing = False
|
|
self._consumer_tag = {}
|
|
self._queues = {}
|
|
self._exchange_name = exchange_name
|
|
self._exchange_type = exchange_type
|
|
self._url = url
|
|
|
|
#+--------------------------------+
|
|
#| Connection workers |
|
|
#+--------------------------------+
|
|
|
|
def connect(self):
|
|
"""Opens connection to the RabbitMQ server, and stores the connection
|
|
handle. Invokes the callback self.on_connection_open when things are
|
|
okay.
|
|
|
|
This callback will execute further operations.
|
|
|
|
"""
|
|
logger.info("Opening connection to RabbitMQ AMQP host %s…", self._url)
|
|
return pika.SelectConnection(pika.ConnectionParameters(self._url),
|
|
self.on_connection_open,
|
|
stop_ioloop_on_close=False
|
|
)
|
|
|
|
|
|
def on_connection_open(self, connection):
|
|
"""This method is called by pika once the connection to RabbitMQ has
|
|
been established. It passes the handle to the connection object in
|
|
case we need it.
|
|
|
|
Opens a channel when everything is ok.
|
|
|
|
:type connection: pika.SelectConnection
|
|
|
|
"""
|
|
logger.info('Connection to %s opened', self._url)
|
|
|
|
self.add_on_connection_close_callback()
|
|
self.open_channel()
|
|
|
|
def add_on_connection_close_callback(self):
|
|
"""This method adds an on close callback that will be invoked by pika
|
|
when RabbitMQ closes the connection to the publisher unexpectedly.
|
|
|
|
"""
|
|
logger.info('Adding connection close callback')
|
|
self._connection.add_on_close_callback(self.on_connection_closed)
|
|
|
|
def on_connection_closed(self, connection, reply_code, reply_text):
|
|
"""This method is invoked by pika when the connection to RabbitMQ is
|
|
closed unexpectedly. Since it is unexpected, we will reconnect to
|
|
RabbitMQ if it disconnects.
|
|
|
|
:param pika.connection.Connection connection: The closed connection obj
|
|
:param int reply_code: The server provided reply_code if given
|
|
:param str reply_text: The server provided reply_text if given
|
|
|
|
"""
|
|
self._channel = None
|
|
if self._closing:
|
|
self._connection.ioloop.stop()
|
|
else:
|
|
logger.warning('Connection closed, reopening in 5 seconds: (%s) %s',
|
|
reply_code, reply_text)
|
|
|
|
def reconnect(self):
|
|
"""Will be invoked by the IOLoop timer if the connection is
|
|
closed. See the on_connection_closed method.
|
|
|
|
"""
|
|
# This is the old connection IOLoop instance, stop its ioloop
|
|
self._connection.ioloop.stop()
|
|
|
|
if not self._closing:
|
|
# Create a new connection
|
|
self._connection = self.connect()
|
|
|
|
# There is now a new connection, needs a new ioloop to run
|
|
self._connection.ioloop.start()
|
|
|
|
def close_connection(self):
|
|
"""This method closes the connection to RabbitMQ."""
|
|
logger.info('Closing connection')
|
|
self._connection.close()
|
|
|
|
#+-------------------------------------+
|
|
#| Channel workspace |
|
|
#+-------------------------------------+
|
|
|
|
def open_channel(self):
|
|
"""Open a new channel with RabbitMQ by issuing the Channel.Open RPC
|
|
command. When RabbitMQ responds that the channel is open, the
|
|
on_channel_open callback will be invoked by pika.
|
|
|
|
"""
|
|
logger.info('Creating a new channel…')
|
|
self._connection.channel(on_open_callback=self.on_channel_open)
|
|
|
|
def on_channel_open(self, channel):
|
|
"""This method is invoked by pika when the channel has been opened.
|
|
The channel object is passed in so we can make use of it.
|
|
|
|
Since the channel is now open, we'll declare the exchange to use.
|
|
|
|
:param pika.channel.Channel channel: The channel object
|
|
|
|
"""
|
|
logger.info('Channel created.')
|
|
self._channel = channel
|
|
self.add_on_channel_close_callback()
|
|
self.setup_exchange(self._exchange_name, self._exchange_type)
|
|
|
|
def add_on_channel_close_callback(self):
|
|
"""This method tells pika to call the on_channel_closed method if
|
|
RabbitMQ unexpectedly closes the channel.
|
|
|
|
"""
|
|
logger.info('Adding channel close callback')
|
|
self._channel.add_on_close_callback(self.on_channel_closed)
|
|
|
|
def on_channel_closed(self, channel, reply_code, reply_text):
|
|
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
|
|
Channels are usually closed if you attempt to do something that
|
|
violates the protocol, such as re-declare an exchange or queue with
|
|
different parameters. In this case, we'll close the connection
|
|
to shutdown the object.
|
|
|
|
:param pika.channel.Channel: The closed channel
|
|
:param int reply_code: The numeric reason the channel was closed
|
|
:param str reply_text: The text reason the channel was closed
|
|
|
|
"""
|
|
logger.info("Channel %i was closed : (%s) %s",
|
|
channel, reply_code, reply_text)
|
|
logger.info('Channel was closed : (%s) %s',
|
|
reply_code, reply_text)
|
|
if not self._closing:
|
|
self.close_connection()
|
|
|
|
def close_channel(self):
|
|
"""Call to close the channel with RabbitMQ cleanly by issuing the
|
|
Channel.Close RPC command.
|
|
|
|
"""
|
|
logger.info('Closing the channel')
|
|
self._channel.close()
|
|
|
|
#+-------------------------------------+
|
|
#| Exchange methods |
|
|
#+-------------------------------------+
|
|
|
|
def setup_exchange(self, exchange_name, exchange_type):
|
|
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
|
|
command. When it is complete, the on_exchange_declareok method will
|
|
be invoked by pika.
|
|
|
|
:param str|unicode exchange_name: The name of the exchange to declare
|
|
|
|
"""
|
|
logger.info('Declaring exchange %s', exchange_name)
|
|
self._channel.exchange_declare(callback=self.on_exchange_declareok,
|
|
exchange=exchange_name,
|
|
exchange_type=exchange_type,
|
|
durable=True
|
|
)
|
|
|
|
def on_exchange_declareok(self, frame):
|
|
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
|
|
command.
|
|
|
|
:param pika.Frame.Method frame: Exchange.DeclareOk response frame
|
|
|
|
"""
|
|
logger.info('Exchange declared')
|
|
self.setup_queues()
|
|
|
|
#+-------------------------------------+
|
|
#| Queue methods |
|
|
#+-------------------------------------+
|
|
|
|
def add_queue(self, queue_name, routing_key=''):
|
|
"""Add to self._queues the queue and the routing key
|
|
|
|
:param str|unicode queue_name: The name of the queue to add
|
|
:param str|unicode routing_key: The routing key associated
|
|
|
|
"""
|
|
self._queues[queue_name] = routing_key
|
|
|
|
def setup_queues(self):
|
|
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
|
|
command. When it is complete, the on_queue_declareok method will
|
|
be invoked by pika.
|
|
|
|
"""
|
|
for (queue_name, routing_key) in self._queues.items():
|
|
logger.info('Setting queue %s with routing key %s' % (queue_name, routing_key))
|
|
self._channel.queue_declare(
|
|
callback=self.on_queue_declareok,
|
|
queue=queue_name,
|
|
durable=True
|
|
)
|
|
|
|
def on_queue_declareok(self, method_frame):
|
|
"""Method invoked by pika when the Queue.Declare RPC call made in
|
|
setup_queues has completed. In this method we will bind the queue
|
|
and exchange together with the routing key by issuing the Queue.Bind
|
|
RPC command. When this command is complete, the on_bindok method will
|
|
be invoked by pika.
|
|
|
|
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
|
|
|
|
"""
|
|
queue_name = method_frame.method.queue
|
|
routing_key = self._queues[queue_name]
|
|
logger.info('Binding %s to %s with %s',
|
|
self._exchange_name, queue_name, routing_key)
|
|
def on_bindok(frame):
|
|
"""Invoked by pika when the Queue.Bind method has completed. At this
|
|
point we will start consuming messages by calling start_consuming
|
|
which will invoke the needed RPC commands to start the process.
|
|
|
|
:param pika.frame.Method frame: The Queue.BindOk response frame
|
|
|
|
"""
|
|
logger.info('Queue %s bound.', queue_name)
|
|
self.start_consuming(queue_name)
|
|
self._channel.queue_bind(
|
|
callback=on_bindok,
|
|
queue=queue_name,
|
|
exchange=self._exchange_name,
|
|
routing_key=routing_key
|
|
)
|
|
|
|
def on_bindok(self, frame):
|
|
"""Invoked by pika when the Queue.Bind method has completed. At this
|
|
point we will start consuming messages by calling start_consuming
|
|
which will invoke the needed RPC commands to start the process.
|
|
|
|
:param pika.frame.Method frame: The Queue.BindOk response frame
|
|
|
|
"""
|
|
logger.info('Queue bound')
|
|
self.start_consuming(frame.method.queue)
|
|
|
|
#+-------------------------------------+
|
|
#| Cancel handle |
|
|
#+-------------------------------------+
|
|
|
|
def add_on_cancel_callback(self):
|
|
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
|
|
for some reason. If RabbitMQ does cancel the consumer,
|
|
on_consumer_cancelled will be invoked by pika.
|
|
|
|
"""
|
|
logger.info('Adding consumer cancellation callback')
|
|
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
|
|
|
|
def on_consumer_cancelled(self, method_frame):
|
|
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
|
|
receiving messages.
|
|
|
|
:param pika.frame.Method method_frame: The Basic.Cancel frame
|
|
|
|
"""
|
|
logger.info('Consumer was cancelled remotely, shutting down: %r',
|
|
method_frame)
|
|
if self._channel:
|
|
self._channel.close()
|
|
|
|
def on_cancelok(self, frame):
|
|
"""This method is invoked by pika when RabbitMQ acknowledges the
|
|
cancellation of a consumer. At this point we will close the channel.
|
|
This will invoke the on_channel_closed method once the channel has been
|
|
closed, which will in-turn close the connection.
|
|
|
|
:param pika.frame.Method frame: The Basic.CancelOk frame
|
|
|
|
"""
|
|
logger.info('RabbitMQ acknowledged the cancellation of the consumer %s' %(frame.method.consumer_tag,))
|
|
self.close_channel()
|
|
|
|
#+-------------------------------------+
|
|
#| Message handling |
|
|
#+-------------------------------------+
|
|
|
|
def acknowledge_message(self, delivery_tag):
|
|
"""Acknowledge the message delivery from RabbitMQ by sending a
|
|
Basic.Ack RPC method for the delivery tag.
|
|
|
|
:param int delivery_tag: The delivery tag from the Basic.Deliver frame
|
|
|
|
"""
|
|
logger.info('Acknowledging message %s', delivery_tag)
|
|
self._channel.basic_ack(delivery_tag)
|
|
|
|
def on_message(self, channel, basic_deliver, properties, body):
|
|
"""Invoked by pika when a message is delivered from RabbitMQ. The
|
|
channel is passed for your convenience. The basic_deliver object that
|
|
is passed in carries the exchange, routing key, delivery tag and
|
|
a redelivered flag for the message. The properties passed in is an
|
|
instance of BasicProperties with the message properties and the body
|
|
is the message that was sent.
|
|
|
|
:param pika.channel.Channel channel: The channel object
|
|
:param pika.Spec.Basic.Deliver: basic_deliver method
|
|
:param pika.Spec.BasicProperties: properties
|
|
:param str|unicode body: The message body
|
|
|
|
"""
|
|
logger.info('Received message # %s from %s: %s',
|
|
basic_deliver.delivery_tag, properties.app_id, body)
|
|
self.acknowledge_message(basic_deliver.delivery_tag)
|
|
|
|
#+-------------------------------------+
|
|
#| Consumer handling |
|
|
#+-------------------------------------+
|
|
|
|
def stop_consuming(self, queue, tag):
|
|
"""Tell RabbitMQ that you would like to stop consuming by sending the
|
|
Basic.Cancel RPC command.
|
|
|
|
"""
|
|
if self._channel:
|
|
logger.info('Sending a Basic.Cancel RPC command to RabbitMQ for queue %s (tag : %s)' % (queue, tag,))
|
|
self._channel.basic_cancel(self.on_cancelok, tag)
|
|
|
|
def start_consuming(self, queue):
|
|
"""This method sets up the consumer by first calling
|
|
add_on_cancel_callback so that the object is notified if RabbitMQ
|
|
cancels the consumer. It then issues the Basic.Consume RPC command
|
|
which returns the consumer tag that is used to uniquely identify the
|
|
consumer with RabbitMQ. We keep the value to use it when we want to
|
|
cancel consuming. The on_message method is passed in as a callback pika
|
|
will invoke when a message is fully received.
|
|
|
|
"""
|
|
logger.info('Issuing consumer related RPC commands')
|
|
self.add_on_cancel_callback()
|
|
self._consumer_tag[queue] = self._channel.basic_consume(self.on_message,
|
|
queue)
|
|
|
|
def run(self):
|
|
"""Run the example consumer by connecting to RabbitMQ and then
|
|
starting the IOLoop to block and allow the SelectConnection to operate.
|
|
|
|
"""
|
|
logger.info("""Crans Message Broker
|
|
+--------------------------------------------+
|
|
| Welcome on CMB |
|
|
+--------------------------------------------+""")
|
|
self._connection = self.connect()
|
|
self.add_queue("test", "*.*.dhcp")
|
|
self._connection.ioloop.start()
|
|
|
|
def stop(self):
|
|
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
|
|
with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
|
|
will be invoked by pika, which will then closing the channel and
|
|
connection. The IOLoop is started again because this method is invoked
|
|
when CTRL-C is pressed raising a KeyboardInterrupt exception. This
|
|
exception stops the IOLoop which needs to be running for pika to
|
|
communicate with RabbitMQ. All of the commands issued prior to starting
|
|
the IOLoop will be buffered but not processed.
|
|
|
|
"""
|
|
logger.info('Stopping')
|
|
self._closing = True
|
|
for (queue, tag) in self._consumer_tag.items():
|
|
self.stop_consuming(queue, tag)
|
|
self._consumer_tag.pop(queue)
|
|
self.close_channel()
|
|
self.close_connection()
|
|
logger.info('Stopped')
|
|
|
|
def main():
|
|
example = AsynchronousConsumer("civet.adm.crans.org", "exchange", "topic")
|
|
try:
|
|
example.run()
|
|
except KeyboardInterrupt:
|
|
logger.warning("Caught SIGINT, will now go for shutdown.")
|
|
example.stop()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|