From cc3bdf8b7cb23ae59d070781fd9422ccac5e268b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pierre-Elliott=20B=C3=A9cue?= Date: Wed, 30 Apr 2014 21:30:17 +0200 Subject: [PATCH] [cmb] Un message broker asynchrone pour RabbitMQ * Il n'y a que consumer pour l'instant, le producer viendra quand on en aura besoin --- cmb/__init__.py | 1 + cmb/consumer.py | 415 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 416 insertions(+) create mode 100644 cmb/__init__.py create mode 100755 cmb/consumer.py diff --git a/cmb/__init__.py b/cmb/__init__.py new file mode 100644 index 00000000..119d52ee --- /dev/null +++ b/cmb/__init__.py @@ -0,0 +1 @@ +from consumer import AsynchronousConsumer diff --git a/cmb/consumer.py b/cmb/consumer.py new file mode 100755 index 00000000..5471c563 --- /dev/null +++ b/cmb/consumer.py @@ -0,0 +1,415 @@ +#!/bin/bash /usr/scripts/python.sh +# -*- coding: utf-8 -*- +# +# This code is a Crans production, based on pika documentation examples +# and on the RabbitMQ tutorials. +# +# This code is placed under the terms of GNU General Public License v3. +# +# Author : Pierre-Elliott Bécue +# Date : 27/04/2014 +# License : GPLv3 + +import sys +import pika +import cranslib.clogger as clogger + +logger = clogger.CLogger(sys.argv[0].split("/")[-1].replace(".py", ""), "info") + +class AsynchronousConsumer(object): + """AsynchronousConsumer class + + """ + + def __init__(self, url, exchange_name, exchange_type): + """Create a new instance of the asynchronous consumer. + + """ + self._connection = None + self._channel = None + self._closing = False + self._consumer_tag = {} + self._queues = {} + self._exchange_name = exchange_name + self._exchange_type = exchange_type + self._url = url + + #+--------------------------------+ + #| Connection workers | + #+--------------------------------+ + + def connect(self): + """Opens connection to the RabbitMQ server, and stores the connection + handle. Invokes the callback self.on_connection_open when things are + okay. + + This callback will execute further operations. + + """ + logger.info("Opening connection to RabbitMQ AMQP host %s…", self._url) + return pika.SelectConnection(pika.ConnectionParameters(self._url), + self.on_connection_open, + stop_ioloop_on_close=False + ) + + + def on_connection_open(self, connection): + """This method is called by pika once the connection to RabbitMQ has + been established. It passes the handle to the connection object in + case we need it. + + Opens a channel when everything is ok. + + :type connection: pika.SelectConnection + + """ + logger.info('Connection to %s opened', self._url) + + self.add_on_connection_close_callback() + self.open_channel() + + def add_on_connection_close_callback(self): + """This method adds an on close callback that will be invoked by pika + when RabbitMQ closes the connection to the publisher unexpectedly. + + """ + logger.info('Adding connection close callback') + self._connection.add_on_close_callback(self.on_connection_closed) + + def on_connection_closed(self, connection, reply_code, reply_text): + """This method is invoked by pika when the connection to RabbitMQ is + closed unexpectedly. Since it is unexpected, we will reconnect to + RabbitMQ if it disconnects. + + :param pika.connection.Connection connection: The closed connection obj + :param int reply_code: The server provided reply_code if given + :param str reply_text: The server provided reply_text if given + + """ + self._channel = None + if self._closing: + self._connection.ioloop.stop() + else: + logger.warning('Connection closed, reopening in 5 seconds: (%s) %s', + reply_code, reply_text) + + def reconnect(self): + """Will be invoked by the IOLoop timer if the connection is + closed. See the on_connection_closed method. + + """ + # This is the old connection IOLoop instance, stop its ioloop + self._connection.ioloop.stop() + + if not self._closing: + # Create a new connection + self._connection = self.connect() + + # There is now a new connection, needs a new ioloop to run + self._connection.ioloop.start() + + def close_connection(self): + """This method closes the connection to RabbitMQ.""" + logger.info('Closing connection') + self._connection.close() + + #+-------------------------------------+ + #| Channel workspace | + #+-------------------------------------+ + + def open_channel(self): + """Open a new channel with RabbitMQ by issuing the Channel.Open RPC + command. When RabbitMQ responds that the channel is open, the + on_channel_open callback will be invoked by pika. + + """ + logger.info('Creating a new channel…') + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_channel_open(self, channel): + """This method is invoked by pika when the channel has been opened. + The channel object is passed in so we can make use of it. + + Since the channel is now open, we'll declare the exchange to use. + + :param pika.channel.Channel channel: The channel object + + """ + logger.info('Channel created.') + self._channel = channel + self.add_on_channel_close_callback() + self.setup_exchange(self._exchange_name, self._exchange_type) + + def add_on_channel_close_callback(self): + """This method tells pika to call the on_channel_closed method if + RabbitMQ unexpectedly closes the channel. + + """ + logger.info('Adding channel close callback') + self._channel.add_on_close_callback(self.on_channel_closed) + + def on_channel_closed(self, channel, reply_code, reply_text): + """Invoked by pika when RabbitMQ unexpectedly closes the channel. + Channels are usually closed if you attempt to do something that + violates the protocol, such as re-declare an exchange or queue with + different parameters. In this case, we'll close the connection + to shutdown the object. + + :param pika.channel.Channel: The closed channel + :param int reply_code: The numeric reason the channel was closed + :param str reply_text: The text reason the channel was closed + + """ + logger.info("Channel %i was closed : (%s) %s", + channel, reply_code, reply_text) + logger.info('Channel was closed : (%s) %s', + reply_code, reply_text) + if not self._closing: + self.close_connection() + + def close_channel(self): + """Call to close the channel with RabbitMQ cleanly by issuing the + Channel.Close RPC command. + + """ + logger.info('Closing the channel') + self._channel.close() + + #+-------------------------------------+ + #| Exchange methods | + #+-------------------------------------+ + + def setup_exchange(self, exchange_name, exchange_type): + """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC + command. When it is complete, the on_exchange_declareok method will + be invoked by pika. + + :param str|unicode exchange_name: The name of the exchange to declare + + """ + logger.info('Declaring exchange %s', exchange_name) + self._channel.exchange_declare(callback=self.on_exchange_declareok, + exchange=exchange_name, + exchange_type=exchange_type, + durable=True + ) + + def on_exchange_declareok(self, frame): + """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC + command. + + :param pika.Frame.Method frame: Exchange.DeclareOk response frame + + """ + logger.info('Exchange declared') + self.setup_queues() + + #+-------------------------------------+ + #| Queue methods | + #+-------------------------------------+ + + def add_queue(self, queue_name, routing_key=''): + """Add to self._queues the queue and the routing key + + :param str|unicode queue_name: The name of the queue to add + :param str|unicode routing_key: The routing key associated + + """ + self._queues[queue_name] = routing_key + + def setup_queues(self): + """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC + command. When it is complete, the on_queue_declareok method will + be invoked by pika. + + """ + for (queue_name, routing_key) in self._queues.items(): + logger.info('Setting queue %s with routing key %s' % (queue_name, routing_key)) + self._channel.queue_declare( + callback=self.on_queue_declareok, + queue=queue_name, + durable=True + ) + + def on_queue_declareok(self, method_frame): + """Method invoked by pika when the Queue.Declare RPC call made in + setup_queues has completed. In this method we will bind the queue + and exchange together with the routing key by issuing the Queue.Bind + RPC command. When this command is complete, the on_bindok method will + be invoked by pika. + + :param pika.frame.Method method_frame: The Queue.DeclareOk frame + + """ + queue_name = method_frame.method.queue + routing_key = self._queues[queue_name] + logger.info('Binding %s to %s with %s', + self._exchange_name, queue_name, routing_key) + def on_bindok(frame): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will start consuming messages by calling start_consuming + which will invoke the needed RPC commands to start the process. + + :param pika.frame.Method frame: The Queue.BindOk response frame + + """ + logger.info('Queue %s bound.', queue_name) + self.start_consuming(queue_name) + self._channel.queue_bind( + callback=on_bindok, + queue=queue_name, + exchange=self._exchange_name, + routing_key=routing_key + ) + + def on_bindok(self, frame): + """Invoked by pika when the Queue.Bind method has completed. At this + point we will start consuming messages by calling start_consuming + which will invoke the needed RPC commands to start the process. + + :param pika.frame.Method frame: The Queue.BindOk response frame + + """ + logger.info('Queue bound') + self.start_consuming(frame.method.queue) + + #+-------------------------------------+ + #| Cancel handle | + #+-------------------------------------+ + + def add_on_cancel_callback(self): + """Add a callback that will be invoked if RabbitMQ cancels the consumer + for some reason. If RabbitMQ does cancel the consumer, + on_consumer_cancelled will be invoked by pika. + + """ + logger.info('Adding consumer cancellation callback') + self._channel.add_on_cancel_callback(self.on_consumer_cancelled) + + def on_consumer_cancelled(self, method_frame): + """Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer + receiving messages. + + :param pika.frame.Method method_frame: The Basic.Cancel frame + + """ + logger.info('Consumer was cancelled remotely, shutting down: %r', + method_frame) + if self._channel: + self._channel.close() + + def on_cancelok(self, frame): + """This method is invoked by pika when RabbitMQ acknowledges the + cancellation of a consumer. At this point we will close the channel. + This will invoke the on_channel_closed method once the channel has been + closed, which will in-turn close the connection. + + :param pika.frame.Method frame: The Basic.CancelOk frame + + """ + logger.info('RabbitMQ acknowledged the cancellation of the consumer %s' %(frame.method.consumer_tag,)) + self.close_channel() + + #+-------------------------------------+ + #| Message handling | + #+-------------------------------------+ + + def acknowledge_message(self, delivery_tag): + """Acknowledge the message delivery from RabbitMQ by sending a + Basic.Ack RPC method for the delivery tag. + + :param int delivery_tag: The delivery tag from the Basic.Deliver frame + + """ + logger.info('Acknowledging message %s', delivery_tag) + self._channel.basic_ack(delivery_tag) + + def on_message(self, channel, basic_deliver, properties, body): + """Invoked by pika when a message is delivered from RabbitMQ. The + channel is passed for your convenience. The basic_deliver object that + is passed in carries the exchange, routing key, delivery tag and + a redelivered flag for the message. The properties passed in is an + instance of BasicProperties with the message properties and the body + is the message that was sent. + + :param pika.channel.Channel channel: The channel object + :param pika.Spec.Basic.Deliver: basic_deliver method + :param pika.Spec.BasicProperties: properties + :param str|unicode body: The message body + + """ + logger.info('Received message # %s from %s: %s', + basic_deliver.delivery_tag, properties.app_id, body) + self.acknowledge_message(basic_deliver.delivery_tag) + + #+-------------------------------------+ + #| Consumer handling | + #+-------------------------------------+ + + def stop_consuming(self, queue, tag): + """Tell RabbitMQ that you would like to stop consuming by sending the + Basic.Cancel RPC command. + + """ + if self._channel: + logger.info('Sending a Basic.Cancel RPC command to RabbitMQ for queue %s (tag : %s)' % (queue, tag,)) + self._channel.basic_cancel(self.on_cancelok, tag) + + def start_consuming(self, queue): + """This method sets up the consumer by first calling + add_on_cancel_callback so that the object is notified if RabbitMQ + cancels the consumer. It then issues the Basic.Consume RPC command + which returns the consumer tag that is used to uniquely identify the + consumer with RabbitMQ. We keep the value to use it when we want to + cancel consuming. The on_message method is passed in as a callback pika + will invoke when a message is fully received. + + """ + logger.info('Issuing consumer related RPC commands') + self.add_on_cancel_callback() + self._consumer_tag[queue] = self._channel.basic_consume(self.on_message, + queue) + + def run(self): + """Run the example consumer by connecting to RabbitMQ and then + starting the IOLoop to block and allow the SelectConnection to operate. + + """ + logger.info("""Crans Message Broker ++--------------------------------------------+ +| Welcome on CMB | ++--------------------------------------------+""") + self._connection = self.connect() + self.add_queue("test", "*.*.dhcp") + self._connection.ioloop.start() + + def stop(self): + """Cleanly shutdown the connection to RabbitMQ by stopping the consumer + with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok + will be invoked by pika, which will then closing the channel and + connection. The IOLoop is started again because this method is invoked + when CTRL-C is pressed raising a KeyboardInterrupt exception. This + exception stops the IOLoop which needs to be running for pika to + communicate with RabbitMQ. All of the commands issued prior to starting + the IOLoop will be buffered but not processed. + + """ + logger.info('Stopping') + self._closing = True + for (queue, tag) in self._consumer_tag.items(): + self.stop_consuming(queue, tag) + self._consumer_tag.pop(queue) + self.close_channel() + self.close_connection() + logger.info('Stopped') + +def main(): + example = AsynchronousConsumer("civet.adm.crans.org", "exchange", "topic") + try: + example.run() + except KeyboardInterrupt: + logger.warning("Caught SIGINT, will now go for shutdown.") + example.stop() + +if __name__ == '__main__': + main()