From 8b07fcf5bb9f5934d1152b09b25c982f05ee9cf6 Mon Sep 17 00:00:00 2001 From: Tomas Kopecek Date: May 10 2021 07:51:09 +0000 Subject: PR#2844: protonmsg: use consistent data format for messages Merges #2844 https://pagure.io/koji/pull-request/2844 Fixes: #2846 https://pagure.io/koji/issue/2846 protonmsg: db queue is never cleared Fixes: #2841 https://pagure.io/koji/issue/2841 protonmsg db queue: incorrect data format for _send_msg() --- diff --git a/plugins/hub/protonmsg.py b/plugins/hub/protonmsg.py index 83b3b7c..3fc62b0 100644 --- a/plugins/hub/protonmsg.py +++ b/plugins/hub/protonmsg.py @@ -77,7 +77,7 @@ class TimeoutHandler(MessagingHandler): def send_msgs(self, event): prefix = self.conf.get('broker', 'topic_prefix') for msg in self.msgs: - address = 'topic://' + prefix + '.' + msg[0] + address = 'topic://' + prefix + '.' + msg['address'] if address in self.senders: sender = self.senders[address] self.log.debug('retrieved cached sender for %s', address) @@ -85,15 +85,15 @@ class TimeoutHandler(MessagingHandler): sender = event.container.create_sender(event.connection, target=address) self.log.debug('created new sender for %s', address) self.senders[address] = sender - pmsg = Message(properties=msg[1], body=msg[2]) + pmsg = Message(properties=msg['props'], body=msg['body']) delivery = sender.send(pmsg) - self.log.debug('sent message: %s', msg[1]) + self.log.debug('sent message: %s', msg['props']) self.pending[delivery] = msg def update_pending(self, event): msg = self.pending[event.delivery] del self.pending[event.delivery] - self.log.debug('removed message from self.pending: %s', msg[1]) + self.log.debug('removed message from self.pending: %s', msg['props']) if not self.pending: if self.msgs: self.log.error('%s messages unsent (rejected or released)', len(self.msgs)) @@ -112,17 +112,17 @@ class TimeoutHandler(MessagingHandler): def on_settled(self, event): msg = self.pending[event.delivery] self.msgs.remove(msg) - self.log.debug('removed message from self.msgs: %s', msg[1]) + self.log.debug('removed message from self.msgs: %s', msg['props']) self.update_pending(event) def on_rejected(self, event): msg = self.pending[event.delivery] - self.log.error('message was rejected: %s', msg[1]) + self.log.error('message was rejected: %s', msg['props']) self.update_pending(event) def on_released(self, event): msg = self.pending[event.delivery] - self.log.error('message was released: %s', msg[1]) + self.log.error('message was released: %s', msg['props']) self.update_pending(event) def on_transport_tail_closed(self, event): @@ -169,7 +169,7 @@ def queue_msg(address, props, data): msgs = [] context.protonmsg_msgs = msgs body = json.dumps(data, default=json_serialize) - msgs.append((address, props, body)) + msgs.append({'address': address, 'props': props, 'body': body}) @convert_datetime @@ -322,19 +322,11 @@ def store_to_db(msgs): # we're running in postCommit, so we need to handle new transaction c.execute('BEGIN') for msg in msgs: - if isinstance(msg, tuple): - address = msg[0] - props = json.dumps(msg[1]) - body = msg[2] - else: - address = msg['address'] - body = msg['body'] # already serialized - props = json.dumps(msg['props']) + address = msg['address'] + body = msg['body'] + props = json.dumps(msg['props']) insert = InsertProcessor(table='proton_queue') insert.set(address=address, props=props, body=body) - if 'id' in msg: - # if we've something from db, we should store it in correct order - insert.set(id=msg['db_id']) insert.execute() c.execute('COMMIT') @@ -356,14 +348,16 @@ def handle_db_msgs(urls, CONFIG): columns=('id', 'address', 'props', 'body'), opts={'order': 'id', 'limit': limit}) msgs = list(query.execute()) + if not msgs: + return if CONFIG.getboolean('broker', 'test_mode', fallback=False): - if msgs: - LOG.debug('test mode: skipping send for %i messages from db', len(msgs)) + LOG.debug('test mode: skipping send for %i messages from db', len(msgs)) unsent = [] else: - unsent = {m['id'] for m in _send_msgs(urls, msgs, CONFIG)} + # we pass a copy of msgs because _send_msgs modifies it + unsent = {m['id'] for m in _send_msgs(urls, list(msgs), CONFIG)} sent = [m for m in msgs if m['id'] not in unsent] - if msgs: + if sent: c.execute('DELETE FROM proton_queue WHERE id IN %(ids)s', {'ids': [msg['id'] for msg in sent]}) finally: diff --git a/tests/test_plugins/test_protonmsg.py b/tests/test_plugins/test_protonmsg.py index 25d59fe..7bbd6eb 100644 --- a/tests/test_plugins/test_protonmsg.py +++ b/tests/test_plugins/test_protonmsg.py @@ -36,13 +36,13 @@ extra_limit = 2048 self.assertTrue(hasattr(context, 'protonmsg_msgs')) self.assertEqual(len(context.protonmsg_msgs), 1) msg = context.protonmsg_msgs[0] - self.assertEqual(msg[0], topic) + self.assertEqual(msg['address'], topic) for kw in kws: - self.assertTrue(kw in msg[1]) - self.assertEqual(msg[1][kw], kws[kw]) - self.assertEqual(len(msg[1]), len(kws)) + self.assertTrue(kw in msg['props']) + self.assertEqual(msg['props'][kw], kws[kw]) + self.assertEqual(len(msg['props']), len(kws)) if body: - self.assertEqual(msg[2], body) + self.assertEqual(msg['body'], body) def test_queue_msg(self): protonmsg.queue_msg('test.msg', {'testheader': 1}, 'test body') @@ -207,7 +207,8 @@ extra_limit = 2048 @patch('protonmsg.Container') def test_send_queued_msgs_fail(self, Container): - context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')] + context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1}, + 'body': 'test body'}] protonmsg.send_queued_msgs('postCommit') log = protonmsg.LOG @@ -219,7 +220,8 @@ extra_limit = 2048 @patch('protonmsg.Container') def test_send_queued_msgs_success(self, Container): - context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')] + context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1}, + 'body': 'test body'}] def clear_msgs(): del context.protonmsg_msgs[:] Container.return_value.run.side_effect = clear_msgs @@ -231,7 +233,8 @@ extra_limit = 2048 @patch('protonmsg.Container') def test_send_queued_msgs_test_mode(self, Container): - context.protonmsg_msgs = [('test.topic', {'testheader': 1}, 'test body')] + context.protonmsg_msgs = [{'address': 'test.topic', 'props': {'testheader': 1}, + 'body': 'test body'}] conf = tempfile.NamedTemporaryFile() conf.write(six.b("""[broker] urls = amqps://broker1.example.com:5671 amqps://broker2.example.com:5671 @@ -336,7 +339,8 @@ send_timeout = 60 def test_send_msgs(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) event.container.create_sender.assert_called_once_with(event.connection, target='topic://koji.testtopic') @@ -349,8 +353,10 @@ send_timeout = 60 def test_update_pending(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"'), - ('testtopic', {'testheader': 2}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}, + {'address': 'testtopic', 'props': {'testheader': 2}, + 'body': '"test body"'}] delivery0 = MagicMock() delivery1 = MagicMock() sender = event.container.create_sender.return_value @@ -378,7 +384,8 @@ send_timeout = 60 def test_on_settled(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) delivery = event.container.create_sender.return_value.send.return_value self.assertTrue(delivery in self.handler.pending) @@ -392,7 +399,8 @@ send_timeout = 60 def test_on_rejected(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) delivery = event.container.create_sender.return_value.send.return_value self.assertTrue(delivery in self.handler.pending) @@ -406,7 +414,8 @@ send_timeout = 60 def test_on_released(self, SSLDomain, Message): event = MagicMock() self.handler.on_start(event) - self.handler.msgs = [('testtopic', {'testheader': 1}, '"test body"')] + self.handler.msgs = [{'address': 'testtopic', 'props': {'testheader': 1}, + 'body': '"test body"'}] self.handler.on_connection_opened(event) delivery = event.container.create_sender.return_value.send.return_value self.assertTrue(delivery in self.handler.pending)