#!/bin/bash /usr/scripts/python.sh
# -*- coding: utf-8 -*-
#
# This code is a Crans production, based on pika documentation examples
# and on the RabbitMQ tutorials.
#
# This code is placed under the terms of GNU General Public License v3.
#
# Author : Pierre-Elliott Bécue <becue@crans.org>
# Date : 18/05/2014
# License : GPLv3

import sys
|
|
import pika
|
|
import cranslib.clogger as clogger
|
|
|
|
# Debug switch; forwarded to the logger below as its last argument.
DEBUG = False

# Module-wide logger. Its second argument is derived from the running
# script: the last "/"-separated component of sys.argv[0], with every
# ".py" occurrence removed.
logger = clogger.CLogger(
    "cmbprod",
    sys.argv[0].split("/")[-1].replace(".py", ""),
    "info",
    DEBUG,
)


class BasicProducer(object):
    """Basic, synchronous CMB message producer.

    It does not have to be asynchronous or anything: it is a plain object
    that opens a blocking pika connection and publishes messages on one
    exchange.

    Typical use: build the producer, call ``get_chan()`` (``send_message()``
    also opens the channel on demand), send messages, then ``close()``.
    """

    def __init__(self, url, exchange_name, app_id):
        """Store the settings; nothing is opened here.

        :param url: broker location, handed as-is to
            ``pika.ConnectionParameters`` by ``connect()``.
        :param exchange_name: exchange every message is published to.
        :param app_id: value set as ``app_id`` in each message's properties.
        """
        self._connection = None
        self._channel = None
        self._exchange_name = exchange_name
        self._app_id = app_id
        self._url = url
        logger.info("Initializing with app_id %s" % (self._app_id,))

    def connect(self):
        """Open and return a new blocking connection to ``self._url``.

        The connection is returned, not stored: ``get_chan()`` is the method
        that assigns it to the producer.

        NOTE(review): ``self._url`` is the first positional argument of
        ``pika.ConnectionParameters``, which presumably makes it a host name
        rather than a full AMQP URL -- confirm against callers.
        """
        # One-element tuple, so the value is always treated as a single
        # format argument whatever its type.
        logger.info("Connecting to %s…" % (self._url,))
        return pika.BlockingConnection(pika.ConnectionParameters(self._url))

    def get_chan(self):
        """Open a channel, (re)connecting first when no connection is usable.

        The channel is stored on the producer; nothing is returned.
        Exceptions raised by pika while connecting or opening the channel
        propagate to the caller.
        """
        connection = self._connection
        if connection is None:
            # First use: nothing was opened yet, so this is not a failure
            # and does not deserve a warning.
            self._connection = self.connect()
        elif not connection.is_open:
            # A closed connection does not raise AttributeError, so it has
            # to be detected explicitly before asking it for a channel.
            logger.warning("Connection no longer working, reconnecting…")
            self._connection = self.connect()

        logger.info("Opening channel…")
        self._channel = self._connection.channel()
        logger.info("Channel active.")

    def send_message(self, routing_key, body):
        """Publish ``body`` on the producer's exchange.

        The message carries ``delivery_mode=2`` and the producer's
        ``app_id``. If no channel was opened yet, one is opened first.

        :param routing_key: routing key used for the publication.
        :param body: message payload.
        :raises Exception: whatever the publication raised, re-raised
            unchanged after a failure notice was printed.
        """
        if self._channel is None:
            # Sending before get_chan() used to fail with an AttributeError
            # on None; open the channel on demand instead.
            self.get_chan()

        logger.info("Sending message %s with routing_key %s." % (body, routing_key))
        properties = pika.BasicProperties(
            delivery_mode=2,
            app_id=self._app_id,
        )
        try:
            self._channel.basic_publish(
                exchange=self._exchange_name,
                routing_key=routing_key,
                body=body,
                properties=properties,
            )
        except Exception:
            # Parenthesised single argument: prints the same text under the
            # Python 2 print statement and the Python 3 print function.
            print("Failure on cmb.producer")
            raise

    def close(self):
        """Close the channel, then the connection, and forget both.

        Calling it when nothing was opened, or calling it twice, is a no-op.
        The connection is closed even when closing the channel raises; that
        exception then propagates to the caller.
        """
        channel, connection = self._channel, self._connection
        # Drop the references first so the producer never keeps handles to
        # closed objects, and so a later get_chan() reconnects from scratch.
        self._channel = None
        self._connection = None
        try:
            if channel is not None:
                channel.close()
        finally:
            if connection is not None:
                connection.close()