#!/bin/bash /usr/scripts/python.sh # -*- coding: utf-8 -*- # # This code is a Crans production, based on pika documentation examples # and on the RabbitMQ tutorials. # # This code is placed under the terms of GNU General Public License v3. # # Author : Pierre-Elliott Bécue # Date : 18/05/2014 # License : GPLv3 import sys import pika import cranslib.clogger as clogger DEBUG = False logger = clogger.CLogger("cmbprod", sys.argv[0].split("/")[-1].replace(".py", ""), "info", DEBUG) class BasicProducer(object): """ This is CMB basic producer, it doesn't have to be asynchronous or anything, it's just a basic object that sends messages. """ def __init__(self, url, exchange_name, app_id, port=5672, credentials=None, ssl=False): """Init """ self._connection = None self._channel = None self._exchange_name = exchange_name self._app_id = app_id self._url = url self._port = port self._credentials = credentials self._ssl = ssl logger.info("Initializing with app_id %s" % (self._app_id,)) def connect(self): """Opens a basic connection which handles almost anything we need. """ logger.info("Connecting to %s…" % (self._url)) return pika.BlockingConnection(pika.ConnectionParameters(host=self._url, port=self._port, credentials=self._credentials, ssl=self._ssl)) def get_chan(self): """Creates a channel and reopens connection if needed.""" try: logger.info("Opening channel…") self._channel = self._connection.channel() logger.info("Channel active.") except AttributeError: logger.warning("Connection no longer working, reconnecting…") self._connection = self.connect() self.get_chan() def send_message(self, routing_key, body): """Sends basic message with app_id and body """ try: logger.info("Sending message %s with routing_key %s." % (body, routing_key)) self._channel.basic_publish(exchange=self._exchange_name, routing_key=routing_key, body=body, properties=pika.BasicProperties( delivery_mode=2, app_id=self._app_id, )) except: print "Failure on cmb.producer" raise def close(self): """Closes the connection """ self._channel.close() self._connection.close()