From 3e37644f258d8726ec9d2486442a9d2ae71b31b6 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: Dec 04 2018 04:49:37 +0000 Subject: remove old messagebus plugin There is no more known user. If you need messaging functionality, you could migrate to protonmsg plugin. Fixes: https://pagure.io/koji/issue/878 --- diff --git a/plugins/hub/messagebus.conf b/plugins/hub/messagebus.conf deleted file mode 100644 index fe18a1c..0000000 --- a/plugins/hub/messagebus.conf +++ /dev/null @@ -1,24 +0,0 @@ -# config file for the Koji messagebus plugin - -[broker] -host = amqp.example.com -port = 5671 -ssl = true -timeout = 10 -heartbeat = 60 -# PLAIN options -auth = PLAIN -username = guest -password = guest -# GSSAPI options -# auth = GSSAPI -# keytab = /etc/koji-hub/plugins/koji-messagebus.keytab -# principal = messagebus/koji.example.com@EXAMPLE.COM - -[exchange] -name = koji.events -type = topic -durable = true - -[topic] -prefix = koji.event diff --git a/plugins/hub/messagebus.py b/plugins/hub/messagebus.py deleted file mode 100644 index 615f77a..0000000 --- a/plugins/hub/messagebus.py +++ /dev/null @@ -1,278 +0,0 @@ -# Koji callback for sending notifications about events to a messagebus (amqp broker) -# Copyright (c) 2009-2014 Red Hat, Inc. -# -# Authors: -# Mike Bonnet - -from __future__ import absolute_import -from koji import PluginError -from koji.context import context -from koji.plugin import callbacks, callback, ignore_error, convert_datetime -import six.moves.configparser -import logging -import qpid.messaging -import qpid.messaging.transports -from ssl import wrap_socket -import socket -import os -try: - import krbV -except ImportError: # pragma: no cover - krbV = None - -MAX_KEY_LENGTH = 255 -CONFIG_FILE = '/etc/koji-hub/plugins/messagebus.conf' - -config = None -session = None -target = None - -def connect_timeout(host, port, timeout): - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - sock = socket.socket(af, socktype, proto) - sock.settimeout(timeout) - try: - sock.connect(sa) - break - except socket.error: - sock.close() - else: - # If we got here then we couldn't connect (yet) - raise - return sock - -class tlstimeout(qpid.messaging.transports.tls): - def __init__(self, conn, host, port): - self.socket = connect_timeout(host, port, getattr(conn, '_timeout')) - if conn.tcp_nodelay: - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.tls = wrap_socket(self.socket, keyfile=conn.ssl_keyfile, certfile=conn.ssl_certfile, ca_certs=conn.ssl_trustfile) - self.socket.setblocking(0) - self.state = None - self.write_retry = None - -qpid.messaging.transports.TRANSPORTS['tls+timeout'] = tlstimeout - -class Connection(qpid.messaging.Connection): - """ - A connection class which supports a timeout option - to the establish() method. Only necessary until - upstream Apache Qpid commit 1487578 is available in - a supported release. - """ - @staticmethod - def establish(url=None, timeout=None, **options): - conn = Connection(url, **options) - conn._timeout = timeout - conn.open() - return conn - - def _wait(self, predicate, timeout=None): - if timeout is None and hasattr(self, '_timeout'): - timeout = self._timeout - return qpid.messaging.Connection._wait(self, predicate, timeout) - - -def get_config(): - global config - if config: - return config - - config = six.moves.configparser.SafeConfigParser() - config.read(CONFIG_FILE) - if not config.has_option('broker', 'timeout'): - config.set('broker', 'timeout', '60') - if not config.has_option('broker', 'heartbeat'): - config.set('broker', 'heartbeat', '60') - return config - - -def get_sender(): - global session, target - if session and target: - try: - return session.sender(target) - except: - logging.getLogger('koji.plugin.messagebus').warning('Error getting session, will retry', exc_info=True) - session = None - target = None - - config = get_config() - - if config.getboolean('broker', 'ssl'): - url = 'amqps://' - else: - url = 'amqp://' - auth = config.get('broker', 'auth') - if auth == 'PLAIN': - url += config.get('broker', 'username') + '/' - url += config.get('broker', 'password') + '@' - elif auth == 'GSSAPI': - if krbV is None: - # TODO: port this to python-gssapi - raise PluginError('krbV module not installed') - ccname = 'MEMORY:messagebus' - os.environ['KRB5CCNAME'] = ccname - ctx = krbV.default_context() - ccache = krbV.CCache(name=ccname, context=ctx) - cprinc = krbV.Principal(name=config.get('broker', 'principal'), context=ctx) - ccache.init(principal=cprinc) - keytab = krbV.Keytab(name='FILE:' + config.get('broker', 'keytab'), context=ctx) - ccache.init_creds_keytab(principal=cprinc, keytab=keytab) - else: - raise PluginError('unsupported auth type: %s' % auth) - - url += config.get('broker', 'host') + ':' - url += config.get('broker', 'port') - - conn = Connection.establish(url, - sasl_mechanisms=config.get('broker', 'auth'), - transport='tls+timeout', - timeout=config.getfloat('broker', 'timeout'), - heartbeat=config.getint('broker', 'heartbeat')) - sess = conn.session() - tgt = """%s; - { create: sender, - assert: always, - node: { type: topic, - durable: %s, - x-declare: { exchange: "%s", - type: %s } } }""" % \ - (config.get('exchange', 'name'), config.getboolean('exchange', 'durable'), - config.get('exchange', 'name'), config.get('exchange', 'type')) - sender = sess.sender(tgt) - session = sess - target = tgt - - return sender - -def _token_append(tokenlist, val): - # Replace any periods with underscores so we have a deterministic number of tokens - val = val.replace('.', '_') - tokenlist.append(val) - -def get_message_subject(msgtype, *args, **kws): - key = [config.get('topic', 'prefix'), msgtype] - - if msgtype == 'PackageListChange': - _token_append(key, kws['tag']['name']) - _token_append(key, kws['package']['name']) - elif msgtype == 'TaskStateChange': - _token_append(key, kws['info']['method']) - _token_append(key, kws['attribute']) - elif msgtype == 'BuildStateChange': - info = kws['info'] - _token_append(key, kws['attribute']) - _token_append(key, info['name']) - elif msgtype == 'Import': - _token_append(key, kws['type']) - elif msgtype in ('Tag', 'Untag'): - _token_append(key, kws['tag']['name']) - build = kws['build'] - _token_append(key, build['name']) - _token_append(key, kws['user']['name']) - elif msgtype == 'RepoInit': - _token_append(key, kws['tag']['name']) - elif msgtype == 'RepoDone': - _token_append(key, kws['repo']['tag_name']) - - key = '.'.join(key) - key = key[:MAX_KEY_LENGTH] - return key - -def get_message_headers(msgtype, *args, **kws): - headers = {'type': msgtype} - - if msgtype == 'PackageListChange': - headers['tag'] = kws['tag']['name'] - headers['package'] = kws['package']['name'] - elif msgtype == 'TaskStateChange': - headers['id'] = kws['info']['id'] - headers['parent'] = kws['info']['parent'] - headers['method'] = kws['info']['method'] - headers['attribute'] = kws['attribute'] - headers['old'] = kws['old'] - headers['new'] = kws['new'] - elif msgtype == 'BuildStateChange': - info = kws['info'] - headers['name'] = info['name'] - headers['version'] = info['version'] - headers['release'] = info['release'] - headers['attribute'] = kws['attribute'] - headers['old'] = kws['old'] - headers['new'] = kws['new'] - elif msgtype == 'Import': - headers['importType'] = kws['type'] - elif msgtype in ('Tag', 'Untag'): - headers['tag'] = kws['tag']['name'] - build = kws['build'] - headers['name'] = build['name'] - headers['version'] = build['version'] - headers['release'] = build['release'] - headers['user'] = kws['user']['name'] - elif msgtype == 'RepoInit': - headers['tag'] = kws['tag']['name'] - elif msgtype == 'RepoDone': - headers['tag'] = kws['repo']['tag_name'] - - return headers - -@callback(*[c for c in callbacks.keys() if c.startswith('post') - and c != 'postCommit']) -@ignore_error -@convert_datetime -def prep_message(cbtype, *args, **kws): - if cbtype.startswith('post'): - msgtype = cbtype[4:] - else: - msgtype = cbtype[3:] - - data = kws.copy() - if args: - data['args'] = list(args) - - config = get_config() - exchange_type = config.get('exchange', 'type') - if exchange_type == 'topic': - subject = get_message_subject(msgtype, *args, **kws) - message = qpid.messaging.Message(subject=subject, content=data) - elif exchange_type == 'headers': - headers = get_message_headers(msgtype, *args, **kws) - message = qpid.messaging.Message(properties=headers, content=data) - else: - raise PluginError('unsupported exchange type: %s' % exchange_type) - - messages = getattr(context, 'messagebus_plugin_messages', None) - if messages is None: - messages = [] - context.messagebus_plugin_messages = messages - messages.append(message) - - -@callback('postCommit') -@ignore_error -def send_messages(cbtype, *args, **kws): - '''Send the messages cached by prep_message''' - - logger = logging.getLogger('koji.plugin.messagebus') - config = get_config() - messages = getattr(context, 'messagebus_plugin_messages', []) - if not messages: - return - test_mode = False - if config.has_option('broker', 'test_mode'): - test_mode = config.getboolean('broker', 'test_mode') - if test_mode: - logger.debug('test mode: skipping broker connection') - for message in messages: - logger.debug('test mode: skipping message: %r', message) - else: - sender = get_sender() - for message in messages: - sender.send(message, sync=False, - timeout=config.getfloat('broker', 'timeout')) - sender.close(timeout=config.getfloat('broker', 'timeout')) - - # koji should do this for us, but just in case... - del context.messagebus_plugin_messages