scripts/cmb/producer.py
2014-10-23 23:55:49 +02:00

84 lines
2.7 KiB
Python

#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# This code is a Crans production, based on pika documentation examples
# and on the RabbitMQ tutorials.
#
# This code is placed under the terms of GNU General Public License v3.
#
# Author : Pierre-Elliott Bécue <becue@crans.org>
# Date : 18/05/2014
# License : GPLv3
import sys
import pika
import cranslib.clogger as clogger
DEBUG = False
logger = clogger.CLogger("cmbprod", sys.argv[0].split("/")[-1].replace(".py", ""), "info", DEBUG)
class BasicProducer(object):
"""
This is CMB basic producer, it doesn't have to be asynchronous nor anything,
it's just a basic object that sends messages.
"""
def __init__(self, url, exchange_name, app_id, port=5672, credentials=None, ssl=False):
"""Init
"""
self._connection = None
self._channel = None
self._exchange_name = exchange_name
self._app_id = app_id
self._url = url
self._port = port
self._credentials = credentials
self._ssl = ssl
logger.info("Initializing with app_id %s" % (self._app_id,))
def connect(self):
"""Opens a basic connection which handles almost anything we need.
"""
logger.info("Connecting to %s" % (self._url))
return pika.BlockingConnection(pika.ConnectionParameters(host=self._url, port=self._port, credentials=self._credentials, ssl=self._ssl))
def get_chan(self):
"""Creates a channel and reopens connection if needed."""
try:
logger.info("Opening channel…")
self._channel = self._connection.channel()
logger.info("Channel active.")
except AttributeError:
logger.warning("Connection no longer working, reconnecting…")
self._connection = self.connect()
self.get_chan()
def send_message(self, routing_key, body):
"""Sends basic message with app_id and body
"""
try:
logger.info("Sending message %s with routing_key %s." % (body, routing_key))
self._channel.basic_publish(exchange=self._exchange_name,
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
app_id=self._app_id,
))
except:
print "Failure on cmb.producer"
raise
def close(self):
"""Closes the connection
"""
self._channel.close()
self._connection.close()