#4277 kojira: split currency and regen
Merged a month ago by tkopecek. Opened 3 months ago by mikem.
mikem/koji repo-updates  into  master

file modified
+3
@@ -295,6 +295,9 @@ 

                  raise koji.GenericError("Unsuccessfully waited %s for a new %s repo" %

                                          (koji.util.duration(self.start), self.taginfo['name']))

              if not check['request']['active']:

+                 if check['request']['task_id']:

+                     tstate = koji.TASK_STATES[check['request']['task_state']]

+                     self.logger.error('Task %s state is %s', check['request']['task_id'], tstate)

                  raise koji.GenericError("Repo request no longer active")

              self.pause()

  

file modified
+1 -1
@@ -390,7 +390,7 @@ 

      n_cached = 0

      tag_last = {}

      updates = []

-     for repo in query.execute():

+     for repo in repos:

          tag_id = repo['tag_id']

          # use cache to avoid redundant calls

          if tag_id in tag_last and tag_last[tag_id] <= repo['create_event']:

@@ -0,0 +1,84 @@ 

+ from __future__ import absolute_import

+ 

+ import os.path

+ import shutil

+ import signal

+ import tempfile

+ import time

+ import unittest

+ 

+ import mock

+ import pytest

+ 

+ import koji

+ 

+ from . import loadkojira

+ 

+ kojira = loadkojira.kojira

+ 

+ 

+ class MyError(Exception):

+     """sentinel exception"""

+     pass

+ 

+ 

+ class MainTest(unittest.TestCase):

+ 

+     def setUp(self):

+         self.start_thread = mock.patch.object(kojira, 'start_thread').start()

+         self.repomgr = mock.MagicMock()

+         self.RepoManager = mock.patch.object(kojira, 'RepoManager', return_value=self.repomgr).start()

+         self.session = mock.MagicMock()

+         self.options = mock.MagicMock()

+         self.options.sleeptime = 1

+         self.workdir = tempfile.mkdtemp()

+         self.topdir = self.workdir + '/koji'

+         self.pathinfo = koji.PathInfo(self.topdir)

+         mock.patch.object(kojira, 'pathinfo', create=True, new=self.pathinfo).start()

+         self.logger = mock.patch.object(kojira, 'logger', create=True).start()

+         self.sleep = mock.patch('time.sleep').start()

+         self.signal = mock.patch('signal.signal').start()

+ 

+     def tearDown(self):

+         mock.patch.stopall()

+         shutil.rmtree(self.workdir)

+ 

+     def test_userkill1(self):

+         self.sleep.side_effect = [None] * 10 + [KeyboardInterrupt()]

+         kojira.main(self.options, self.session)

+ 

+     def test_terminal_errors(self):

+         for cls in KeyboardInterrupt, koji.AuthExpired, koji.AuthError, SystemExit:

+             err = cls()

+             self.sleep.side_effect = [None] * 10 + [Exception()]

+             self.repomgr.updateRepos.side_effect = [None] * 5 + [err]

+             kojira.main(self.options, self.session)

+             self.assertEqual(len(self.repomgr.pruneLocalRepos.mock_calls), 5)

+             self.repomgr.reset_mock()

+ 

+     def test_nonterminal_error(self):

+         err = MyError()

+         self.sleep.side_effect = [None] * 10 + [KeyboardInterrupt()]

+         self.repomgr.updateRepos.side_effect = [None] * 5 + [err] * 6

+         kojira.main(self.options, self.session)

+         self.assertEqual(len(self.repomgr.updateRepos.mock_calls), 11)

+         self.assertEqual(len(self.repomgr.pruneLocalRepos.mock_calls), 5)

+         self.repomgr.reset_mock()

+ 

+     def test_shutdown_handler(self):

+         self.signal.side_effect = [Exception('stop here')]

+         with self.assertRaises(Exception):

+             kojira.main(self.options, self.session)

+ 

+         # grab the handler

+         self.assertEqual(self.signal.mock_calls[0][1][0], signal.SIGTERM)

+         handler = self.signal.mock_calls[0][1][1]

+ 

+         self.signal.side_effect = None

+ 

+         # make sure the handler does what it should

+         with self.assertRaises(SystemExit):

+             handler()

+ 

+ 

+ # the end

@@ -42,54 +42,16 @@ 

          return pid, 0

  

      @mock.patch('time.sleep')

-     def test_regen_loop(self, sleep):

+     def test_thread_loop(self, sleep):

          subsession = mock.MagicMock()

-         self.mgr.regenRepos = mock.MagicMock()

-         self.mgr.regenRepos.side_effect = [None] * 10 + [OurException()]

+         self.mgr.do_regen = mock.MagicMock()

+         self.mgr.do_regen.side_effect = [None] * 10 + [OurException()]

          # we need the exception to terminate the infinite loop

  

          with self.assertRaises(OurException):

-             self.mgr.regenLoop(subsession)

+             self.mgr.threadLoop(subsession, 'regen')

  

-         self.assertEqual(self.mgr.regenRepos.call_count, 11)

-         subsession.logout.assert_called_once()

- 

-     @mock.patch('time.sleep')

-     def test_rmtree_loop(self, sleep):

-         subsession = mock.MagicMock()

-         self.mgr.checkQueue = mock.MagicMock()

-         self.mgr.checkQueue.side_effect = [None] * 10 + [OurException()]

-         # we need the exception to terminate the infinite loop

- 

-         with self.assertRaises(OurException):

-             self.mgr.rmtreeLoop(subsession)

- 

-         self.assertEqual(self.mgr.checkQueue.call_count, 11)

-         subsession.logout.assert_called_once()

- 

-     @mock.patch('time.sleep')

-     def test_currency_loop(self, sleep):

-         subsession = mock.MagicMock()

-         subsession.repo.updateEndEvents.side_effect = [None] * 10 + [OurException()]

-         # we need the exception to terminate the infinite loop

- 

-         with self.assertRaises(OurException):

-             self.mgr.currencyChecker(subsession)

- 

-         self.assertEqual(subsession.repo.updateEndEvents.call_count, 11)

-         subsession.logout.assert_called_once()

- 

-     @mock.patch('time.sleep')

-     def test_external_loop(self, sleep):

-         subsession = mock.MagicMock()

-         self.mgr.checkExternalRepos = mock.MagicMock()

-         self.mgr.checkExternalRepos.side_effect = [None] * 10 + [OurException()]

-         # we need the exception to terminate the infinite loop

- 

-         with self.assertRaises(OurException):

-             self.mgr.currencyExternalChecker(subsession)

- 

-         self.assertEqual(self.mgr.checkExternalRepos.call_count, 11)

+         self.assertEqual(self.mgr.do_regen.call_count, 11)

          subsession.logout.assert_called_once()

  

      def test_rmtree(self):

file modified
+36 -99
@@ -442,65 +442,43 @@ 

              data['id'] = erepo_id

              self.checkExternalRepo(data, arches, ts_cache)

  

-     def currencyChecker(self, session):

-         """Continually checks repos for currency. Runs as a separate thread"""

+     def threadLoop(self, session, name):

+         """Wrapper for running thread handlers in a loop"""

+         # we should be passed a subsession of main

          self.session = session

-         self.logger = logging.getLogger("koji.repo.currency")

-         self.logger.info('currencyChecker starting')

+         handler = getattr(self, f'do_{name}')

+         self.logger = logging.getLogger(f'koji.repo.{name}')

+         self.logger.info(f'{name} thread starting')

          try:

              while True:

-                 self.session.repo.updateEndEvents()

-                 # TODO does this still need to be its own thread?

+                 handler()

                  time.sleep(self.options.sleeptime)

          except Exception:

-             self.logger.exception('Error in currency checker thread')

+             self.logger.exception(f'Error in {name} thread')

              raise

          finally:

              session.logout()

  

-     def currencyExternalChecker(self, session):

-         """Continually checks repos for external repo currency. Runs as a separate thread"""

-         self.session = session

-         self.logger = logging.getLogger("koji.repo.currency_external")

-         self.logger.info('currencyExternalChecker starting')

-         try:

-             while True:

-                 self.checkExternalRepos()

-                 time.sleep(self.options.sleeptime)

-         except Exception:

-             self.logger.exception('Error in external currency checker thread')

-             raise

-         finally:

-             session.logout()

+     def do_currency(self):

+         """Checks repos for currency"""

+         # this call can take a while

+         self.session.repo.updateEndEvents()

  

-     def regenLoop(self, session):

-         """Triggers regens as needed/possible. Runs in a separate thread"""

-         self.session = session

-         self.logger = logging.getLogger("koji.repo.regen")

-         self.logger.info('regenLoop starting')

-         try:

-             while True:

-                 self.regenRepos()

-                 time.sleep(self.options.sleeptime)

-         except Exception:

-             self.logger.exception('Error in regen thread')

-             raise

-         finally:

-             session.logout()

+     def do_check_external(self):

+         """Check external repos"""

+         self.checkExternalRepos()

  

-     def rmtreeLoop(self, session):

-         self.session = session

-         logger = logging.getLogger("koji.repo.rmtree")

-         try:

-             while True:

-                 logger.debug('queue length: %d', len(self.delete_queue))

-                 self.checkQueue()

-                 time.sleep(self.options.sleeptime)

-         except Exception:

-             logger.exception('Error in rmtree thread')

-             raise

-         finally:

-             session.logout()

+     def do_regen(self):

+         """Triggers regens as needed/possible"""

+         self.session.repo.checkQueue()

+ 

+     def do_autoregen(self):

+         """Triggers automatic regens as needed/possible"""

+         self.session.repo.autoRequests()

+ 

+     def do_rmtree(self):

+         logger.debug('queue length: %d', len(self.delete_queue))

+         self.checkQueue()

  

      def pruneLocalRepos(self):

          # non-dist repos are always on the default volume
@@ -648,43 +626,11 @@ 

              elif repo.state == koji.REPO_PROBLEM:

                  repo.handle_problem()

  

-     def regenRepos(self):

-         """Trigger repo requests as needed"""

-         self.session.repo.autoRequests()

-         self.session.repo.checkQueue()

- 

- 

- def start_currency_checker(session, repomgr):

-     subsession = session.subsession()

-     thread = threading.Thread(name='currencyChecker',

-                               target=repomgr.currencyChecker, args=(subsession,))

-     thread.daemon = True

-     thread.start()

-     return thread

- 

- 

- def start_external_currency_checker(session, repomgr):

-     subsession = session.subsession()

-     thread = threading.Thread(name='currencyExternalChecker',

-                               target=repomgr.currencyExternalChecker, args=(subsession,))

-     thread.daemon = True

-     thread.start()

-     return thread

- 

- 

- def start_regen_loop(session, repomgr):

-     subsession = session.subsession()

-     thread = threading.Thread(name='regenLoop',

-                               target=repomgr.regenLoop, args=(subsession,))

-     thread.daemon = True

-     thread.start()

-     return thread

- 

  

- def start_rmtree_loop(session, repomgr):

+ def start_thread(session, repomgr, name):

+     handler = getattr(repomgr, 'threadLoop')

      subsession = session.subsession()

-     thread = threading.Thread(name='rmtreeLoop',

-                               target=repomgr.rmtreeLoop, args=(subsession,))

+     thread = threading.Thread(name=name, target=handler, args=(subsession, name))

      thread.daemon = True

      thread.start()

      return thread
@@ -697,29 +643,20 @@ 

      def shutdown(*args):

          raise SystemExit

      signal.signal(signal.SIGTERM, shutdown)

-     curr_chk_thread = start_currency_checker(session, repomgr)

+     tnames = ['currency', 'regen', 'autoregen', 'rmtree']

      if options.check_external_repos:

-         curr_ext_chk_thread = start_external_currency_checker(session, repomgr)

-     regen_thread = start_regen_loop(session, repomgr)

-     rmtree_thread = start_rmtree_loop(session, repomgr)

+         tnames.append('check_external')

+     threads = {name: start_thread(session, repomgr, name) for name in tnames}

      logger.info("Entering main loop")

      while True:

          try:

              repomgr.updateRepos()

              repomgr.printState()

              repomgr.pruneLocalRepos()

-             if not curr_chk_thread.is_alive():

-                 logger.error("Currency checker thread died. Restarting it.")

-                 curr_chk_thread = start_currency_checker(session, repomgr)

-             if options.check_external_repos and not curr_ext_chk_thread.is_alive():

-                 logger.error("External currency checker thread died. Restarting it.")

-                 curr_ext_chk_thread = start_external_currency_checker(session, repomgr)

-             if not regen_thread.is_alive():

-                 logger.error("Regeneration thread died. Restarting it.")

-                 regen_thread = start_regen_loop(session, repomgr)

-             if not rmtree_thread.is_alive():

-                 logger.error("rmtree thread died. Restarting it.")

-                 rmtree_thread = start_rmtree_loop(session, repomgr)

+             for name in tnames:

+                 if not threads[name].is_alive():

+                     logger.error(f'{name} thread died. Restarting it.')

+                     threads[name] = start_thread(session, repomgr, name)

          except KeyboardInterrupt:

              logger.warning("User exit")

              break

A single kojira thread was running both repo.autoRequests and repo.checkQueue. On some systems, the autoRequests call can take a while, resulting is slower response times for repo requests.

Rather than duplicate the thread boilerplate yet another time, I unified it, reducing quite a bit of repetitive code.

$ git diff origin...  --stat=80 util
 util/kojira | 135 ++++++++++++++++--------------------------------------------
 1 file changed, 36 insertions(+), 99 deletions(-)

Also added a unit test and a couple minor fixes

Fixes https://pagure.io/koji/issue/4278

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

3 months ago

Metadata Update from @tkopecek:
- Pull-request untagged with: testing-ready
- Pull-request tagged with: no_qe

a month ago

Commit 4270cdd fixes this pull-request

Pull-Request has been merged by tkopecek

a month ago