#2356 Scheduler for distgit
Merged 2 years ago by praiskup. Opened 2 years ago by frostyx.
copr/frostyx/copr distgit-scheduler into main

file modified
+1 -1
@@ -6,7 +6,7 @@ 

  %global tests_version 2

  %global tests_tar test-data-copr-backend

  

- %global copr_common_version 0.16.1.dev

+ %global copr_common_version 0.16.2.dev

  

  Name:       copr-backend

  Version:    1.161

@@ -17,8 +17,9 @@ 

  

  from copr_common.rpm import splitFilename

  from copr_common.enums import ActionResult

+ from copr_common.worker_manager import WorkerManager

  

- from copr_backend.worker_manager import WorkerManager, QueueTask

+ from copr_backend.worker_manager import BackendQueueTask

  

  from .sign import create_user_keys, CoprKeygenRequestError

  from .exceptions import CreateRepoError, CoprSignError, FrontendClientException
@@ -552,7 +553,7 @@ 

      REMOVE_DIRS = 11

  

  

- class ActionQueueTask(QueueTask):

+ class ActionQueueTask(BackendQueueTask):

      def __init__(self, task):

          self.task = task

  

@@ -4,141 +4,30 @@ 

  

  import os

  import sys

- import argparse

- import contextlib

  import logging

- import daemon

- 

- import setproctitle

  

+ from copr_common.background_worker import BackgroundWorker

  from copr_backend.frontend import FrontendClient

- from copr_backend.helpers import (BackendConfigReader, get_redis_logger,

-                                   get_redis_connection)

+ from copr_backend.helpers import (BackendConfigReader, get_redis_logger)

  

  

- class BackgroundWorker:

+ class BackendBackgroundWorker(BackgroundWorker):

      """

      copr-backend-process-* abstraction

      """

  

-     redis_logger_id = 'unknown'

-     frontend_client = None

-     _redis_conn = None

+     # This is still a base class, abstract methods will be overridden

+     # in its descendants

+     # pylint: disable=abstract-method

  

      def __init__(self):

-         # just setup temporary stderr logger

-         self.log = logging.getLogger()

-         self.log.setLevel(logging.DEBUG)

-         self.log.addHandler(logging.StreamHandler())

- 

-         if os.getuid() == 0:

-             self.log.error("this needs to be run as 'copr' user")

-             sys.exit(1)

+         super().__init__()

  

-         self.args = self._get_argparser().parse_args(sys.argv[1:])

          be_cfg = self.args.backend_config or '/etc/copr/copr-be.conf'

          self.opts = BackendConfigReader(be_cfg).read()

  

-     @staticmethod

-     def setproctitle(text):

-         """ set the process title, beginning with the script name """

-         command = " ".join(sys.argv)

-         setproctitle.setproctitle("{} (command: {})".format(text, command))

- 

-     @property

-     def _redis(self):

-         if not self._redis_conn:

-             self._redis_conn = get_redis_connection(self.opts)

-         return self._redis_conn

- 

-     def redis_set_worker_flag(self, flag, value=1):

-         """

-         Set flag in Reids DB for corresponding worker.  NO-OP if there's no

-         redis connection (when run manually).

-         """

-         if not self.has_wm:

-             return

-         self._redis.hset(self.args.worker_id, flag, value)

- 

-     def redis_get_worker_flag(self, flag):

-         """

-         Get flag from Redis DB entry of corresponding worker.  If there's no

-         Redis connection (manual run) return None

-         """

-         if not self.has_wm:

-             return None

-         return self._redis.hget(self.args.worker_id, flag)

- 

-     @classmethod

-     def _get_argparser(cls):

-         parser = argparse.ArgumentParser()

-         parser.add_argument(

-             "--worker-id",

-             help=("worker ID which already exists in redis DB (used by "

-                   "WorkerManager only, copr-internal option)"),

-         )

-         parser.add_argument(

-             "--daemon",

-             action='store_true',

-             help="execute the task on background, as daemon process"

-         )

-         parser.add_argument(

-             "--silent",

-             action='store_true',

-             help="don't print logs, even when run without --daemon",

-         )

-         parser.add_argument(

-             "--backend-config",

-             help="alternative path to /etc/copr/copr-be.conf",

-         )

-         cls.adjust_arg_parser(parser)

-         return parser

- 

-     @classmethod

-     def adjust_arg_parser(cls, parser):

-         """

-         The inherited worker class will need more commandline options than those

-         we provide by default.  Override is required.

-         """

-         raise NotImplementedError()

- 

-     def handle_task(self):

-         """

-         Abstract method for handling the task.  This should never throw any

-         exception, and we don't expect it to return any value.

-         """

-         raise NotImplementedError()

- 

-     @property

-     def has_wm(self):

-         """

-         Returns True if worker manager started this process, and False

-         if it is manual run.

-         """

-         return bool(self.worker_id)

- 

-     @property

-     def worker_id(self):

-         """ Return worker ID if set by worker manager, or None.  """

-         return self.args.worker_id

- 

-     def _wm_started(self):

-         if not self.has_wm:

-             return True

- 

-         self.redis_set_worker_flag('started', 1)

-         self.redis_set_worker_flag('PID', os.getpid())

- 

-         data = self._redis.hgetall(self.args.worker_id)

-         if 'allocated' not in data:

-             self.log.error("too slow box, manager thinks we are dead")

-             self._redis.delete(self.args.worker_id)

-             return False

- 

-         # There's still small race on a very slow box (TOCTOU in manager, the

-         # db entry can be deleted after our check above ^^).  But we don't risk

-         # anything else than concurrent run of multiple workers in such case.

-         return True

+         self.frontend_client = FrontendClient(self.opts, self.log,

+                                               try_indefinitely=True)

  

      def _switch_logger_to_redis(self):

          if not self.has_wm:
@@ -164,20 +53,4 @@ 

          # setup logging early, to have as complete logs as possible

          self._switch_logger_to_redis()

  

-         self.frontend_client = FrontendClient(self.opts, self.log,

-                                               try_indefinitely=True)

- 

-         try:

-             self.handle_task()

-         except Exception as exc:  # pylint: disable=W0703

-             self.log.exception("unexpected failure %s", str(exc))

-             sys.exit(1)

- 

-     def process(self):

-         """ process the task """

-         context = contextlib.nullcontext()

-         if self.args.daemon:

-             context = daemon.DaemonContext(umask=0o022)

- 

-         with context:

-             self._daemonized_part()

+         super()._daemonized_part()

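For orientation, a minimal sketch of what a concrete worker looks like after this split: the generic argument parsing, daemonization and Redis flag handling now live in copr_common.background_worker.BackgroundWorker, while BackendBackgroundWorker only adds the backend config and the FrontendClient. The ExampleBackgroundWorker name and the --task-id option below are invented for illustration and are not part of this PR:

# Hypothetical sketch, not part of this PR.
from copr_backend.background_worker import BackendBackgroundWorker

class ExampleBackgroundWorker(BackendBackgroundWorker):
    redis_logger_id = 'example'

    @classmethod
    def adjust_arg_parser(cls, parser):
        # worker-specific options required by the shared argparser
        parser.add_argument("--task-id", required=True)

    def handle_task(self):
        # self.opts, self.log and self.frontend_client are prepared by
        # the parent classes before this method is called
        self.log.info("processing task %s", self.args.task_id)

if __name__ == "__main__":
    ExampleBackgroundWorker().process()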
@@ -15,7 +15,7 @@ 

  

  from copr_common.enums import StatusEnum

  

- from copr_backend.background_worker import BackgroundWorker

+ from copr_backend.background_worker import BackendBackgroundWorker

  from copr_backend.cancellable_thread import CancellableThreadTask

  from copr_backend.constants import build_log_format

  from copr_backend.exceptions import CoprSignError, CoprBackendError
@@ -107,7 +107,7 @@ 

  

          return 1

  

- class BuildBackgroundWorker(BackgroundWorker):

+ class BuildBackgroundWorker(BackendBackgroundWorker):

      """

      The (S)RPM build logic.

      """

@@ -1,7 +1,7 @@ 

  import json

  import os

  

- from copr_backend.helpers import get_redis_connection

+ from copr_common.redis_helpers import get_redis_connection

  

  # todo: add logging here

  # from copr_backend.helpers import BackendConfigReader, get_redis_logger

@@ -3,11 +3,11 @@ 

  """

  

  from copr_backend.exceptions import FrontendClientException

- from copr_backend.dispatcher import Dispatcher

+ from copr_backend.dispatcher import BackendDispatcher

  

  from ..actions import ActionWorkerManager, ActionQueueTask, Action

  

- class ActionDispatcher(Dispatcher):

+ class ActionDispatcher(BackendDispatcher):

      """

      Kick-off action dispatcher daemon.

      """

@@ -2,14 +2,14 @@ 

  BuildDispatcher related classes.

  """

  

- from copr_backend.dispatcher import Dispatcher

+ from copr_common.worker_manager import GroupWorkerLimit

+ from copr_backend.dispatcher import BackendDispatcher

  from copr_backend.rpm_builds import (

      ArchitectureWorkerLimit,

      BuildTagLimit,

      RPMBuildWorkerManager,

      BuildQueueTask,

  )

- from copr_backend.worker_manager import GroupWorkerLimit

  from ..exceptions import FrontendClientException

  

  
@@ -72,7 +72,7 @@ 

          return arch[task.sandbox]

  

  

- class BuildDispatcher(Dispatcher):

+ class BuildDispatcher(BackendDispatcher):

      """

      Kick-off build dispatcher daemon.

      """

@@ -7,6 +7,7 @@ 

  import os

  from setproctitle import setproctitle

  

+ from copr_common.redis_helpers import get_redis_connection

  

  # TODO: remove when RedisLogHandler works fine

  from .. import constants
@@ -67,7 +68,7 @@ 

          self.setup_logging()

          setproctitle("RedisLogHandler")

  

-         rc = helpers.get_redis_connection(self.opts)

+         rc = get_redis_connection(self.opts)

          while True:

              # indefinitely wait for the next entry, note that blpop returns

              # tuple (FIFO_NAME, ELEMENT)

@@ -2,129 +2,26 @@ 

  Abstract class Dispatcher for Build/Action dispatchers.

  """

  

- import time

- import multiprocessing

- from setproctitle import setproctitle

- 

+ from copr_common.dispatcher import Dispatcher

  from copr_backend.frontend import FrontendClient

- from copr_backend.worker_manager import WorkerManager

- from copr_backend.helpers import get_redis_logger, get_redis_connection

+ from copr_backend.helpers import get_redis_logger

  

  

- class Dispatcher(multiprocessing.Process):

+ class BackendDispatcher(Dispatcher):

      """

-     1) Fetch tasks from frontend.

-     2) Fill the WorkerManager queue.

-     3) Run 'WorkerManager.run()'.

-     4) Go to 1)

- 

-     See also:

-     https://docs.pagure.org/copr.copr/developer_documentation/dispatchers.html

-     https://docs.pagure.org/copr.copr/developer_documentation/worker_manager.html

+     Base class for backend dispatchers

      """

  

-     # set to either 'action' or 'build' in sub-class

-     task_type = 'task_type'

- 

-     # either ActionWorkerManager or BuildWorkerManager

-     worker_manager_class = WorkerManager

- 

-     # how many background workers we let the WorkerManager start, by default

-     # there's no limit

-     max_workers = float("inf")

- 

-     # we keep track what build's newly appeared in the task list after fetching

-     # the new set from frontend after get_frontend_tasks() call

-     _previous_task_fetch_ids = set()

+     # This is still a base class, abstract methods will be overridden

+     # in its descendants

+     # pylint: disable=abstract-method

  

-     def __init__(self, backend_opts):

-         super().__init__(name=self.task_type + '-dispatcher')

+     def __init__(self, opts):

+         super().__init__(opts)

  

-         self.sleeptime = backend_opts.sleeptime

-         self.opts = backend_opts

          logger_name = 'backend.{}_dispatcher'.format(self.task_type)

          logger_redis_who = '{}_dispatcher'.format(self.task_type)

          self.log = get_redis_logger(self.opts, logger_name, logger_redis_who)

-         self.frontend_client = FrontendClient(self.opts, self.log)

-         # list of applied WorkerLimit instances

-         self.limits = []

- 

-     @classmethod

-     def _update_process_title(cls, msg=None):

-         proc_title = "{} dispatcher".format(cls.task_type.capitalize())

-         if msg:

-             proc_title += " - " + msg

-         setproctitle(proc_title)

- 

-     def get_frontend_tasks(self):

-         """

-         Get _unfiltered_ list of tasks (QueueTask objects) from frontend (the

-         set needs to contain both running and pending jobs).

-         """

-         raise NotImplementedError

- 

-     def get_cancel_requests_ids(self):

-         """

-         Return list of QueueTask IDS that should be canceled.

-         """

-         _subclass_can_use = (self)

-         return []

- 

-     def report_canceled_task_id(self, task_id, was_running):

-         """

-         Report back to Frontend that this task was canceled.  By default this

-         isn't called, so it is NO-OP by default.

-         """

  

-     def _print_added_jobs(self, tasks):

-         job_ids = {task.id for task in tasks}

-         new_job_ids = job_ids - self._previous_task_fetch_ids

-         if new_job_ids:

-             self.log.info("Got new '%s' tasks: %s", self.task_type, new_job_ids)

-         self._previous_task_fetch_ids = job_ids

- 

-     def run(self):

-         """

-         Starts the infinite task dispatching process.

-         """

-         self.log.info("%s dispatching started", self.task_type.capitalize())

-         self._update_process_title()

- 

-         redis = get_redis_connection(self.opts)

-         worker_manager = self.worker_manager_class(

-             redis_connection=redis,

-             log=self.log,

-             max_workers=self.max_workers,

-             frontend_client=self.frontend_client,

-             limits=self.limits,

-         )

- 

-         timeout = self.sleeptime

-         while True:

-             self._update_process_title("getting tasks from frontend")

-             self.log.info("getting %ss from frontend", self.task_type)

-             start = time.time()

- 

-             tasks = self.get_frontend_tasks()

-             if tasks:

-                 worker_manager.clean_tasks()

- 

-             self._print_added_jobs(tasks)

-             for task in tasks:

-                 worker_manager.add_task(task)

- 

-             self._update_process_title("getting cancel requests")

-             for task_id in self.get_cancel_requests_ids():

-                 was_running = worker_manager.cancel_task_id(task_id)

-                 self.report_canceled_task_id(task_id, was_running)

- 

-             # process the tasks

-             self._update_process_title("processing tasks")

-             worker_manager.run(timeout=timeout)

- 

-             sleep_more = timeout - (time.time() - start)

-             if sleep_more > 0:

-                 time.sleep(sleep_more)

- 

-         # reset the title

-         self._update_process_title()

+         self.frontend_client = FrontendClient(self.opts, self.log)

+         self.sleeptime = opts.sleeptime

@@ -28,8 +28,7 @@ 

  import munch

  from munch import Munch

  

- from redis import StrictRedis

- 

+ from copr_common.redis_helpers import get_redis_connection

  from copr.v3 import Client

  from copr_backend.constants import DEF_BUILD_USER, DEF_BUILD_TIMEOUT, DEF_CONSECUTIVE_FAILURE_THRESHOLD, \

      CONSECUTIVE_FAILURE_REDIS_KEY, default_log_format
@@ -466,23 +465,6 @@ 

          conn.incr(key)

  

  

- def get_redis_connection(opts):

-     """

-     Creates redis client object using backend config

- 

-     :rtype: StrictRedis

-     """

-     kwargs = {}

-     if hasattr(opts, "redis_db"):

-         kwargs["db"] = opts.redis_db

-     if hasattr(opts, "redis_host"):

-         kwargs["host"] = opts.redis_host

-     if hasattr(opts, "redis_port"):

-         kwargs["port"] = opts.redis_port

- 

-     return StrictRedis(encoding="utf-8", decode_responses=True, **kwargs)

- 

- 

  def format_tb(ex, ex_traceback):

      tb_lines = traceback.format_exception(ex.__class__, ex, ex_traceback)

      return ''.join(tb_lines)

@@ -2,16 +2,14 @@ 

  Abstraction for RPM and SRPM builds on backend.

  """

  

+ from copr_common.worker_manager import WorkerManager, PredicateWorkerLimit

+ from copr_backend.worker_manager import BackendQueueTask

  from copr_backend.helpers import get_chroot_arch

- from copr_backend.worker_manager import (

-     PredicateWorkerLimit,

-     QueueTask,

-     WorkerManager,

- )

+ 

  

  PRIORITY_SECTION_SIZE = 1000000

  

- class BuildQueueTask(QueueTask):

+ class BuildQueueTask(BackendQueueTask):

      """

      Build-task abstraction.  Needed by our build scheduler (the

      WorkerManager class).

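For context, a sketch of how the relocated limit classes are typically instantiated; the predicate, hasher and numbers below are made up for illustration. A dispatcher fills its self.limits list and Dispatcher.run() hands it to the WorkerManager (limits=self.limits):

# Hypothetical sketch; the predicate, hasher and numbers are made up.
from copr_common.worker_manager import PredicateWorkerLimit, GroupWorkerLimit

limits = [
    # cap the total number of concurrently processed tasks
    PredicateWorkerLimit(lambda _task: True, 10, name="total"),
    # allow at most 2 concurrently running tasks per sandbox
    GroupWorkerLimit(lambda task: task.sandbox, 2, name="sandbox"),
]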
@@ -1,213 +1,11 @@ 

- import os

- import time

- from heapq import heappop, heappush

- import itertools

- import logging

- import subprocess

+ from copr_common.worker_manager import QueueTask

  

  

- class WorkerLimit:

+ class BackendQueueTask(QueueTask):

      """

-     Limit for the number of tasks being processed concurrently

- 

-     WorkerManager expects that it's caller fills the task queue only with tasks

-     that should be processed.  Then WorkerManager is completely responsible for

-     sorting out the queue, and behave -> respect the given limits.

- 

-     WorkerManager implements really stupid algorithm to respect the limits;  we

-     simply drop the task from queue (and continue to the next task) if it was

-     about to exceed any given limit.  This is the only option at this moment

-     because JobQueue doesn't allow us to skip some task, and return to it later.

-     It is not a problem for Copr dispatchers though because we re-add the

-     dropped tasks to JobQueue anyways -- after the next call to the

-     Dispatcher.get_frontend_tasks() method (see "sleeptime" configuration

-     option).

- 

-     Each Limit object works as a statistic counter for the list of _currently

-     processed_ tasks (i.e. not queued tasks!).  And we may want to query the

-     statistics anytime we want to.  Both calculating and querying the statistics

-     must be as fast as possible, therefore the interface only re-calculates the

-     stats when WorkerManager starts/stops working on some task.

- 

-     One may wonder why to bother with limits, and why not to delegate this

-     responsibility on WorkerManager caller (IOW don't put the task to queue if

-     it is not yet the right time process it..).  That would be ideal case, but

-     at the time of filling the queue caller has no idea about the currently

-     running BackgroundWorker instances (those need to be calculated to

-     statistics, too).

-     """

- 

-     def __init__(self, name=None):

-         self._name = name

- 

-     def worker_added(self, worker_id, task):

-         """ Add worker and it's task to statistics.  """

-         raise NotImplementedError

- 

-     def check(self, task):

-         """ Check if the task can be added without crossing the limit. """

-         raise NotImplementedError

- 

-     def clear(self):

-         """ Clear the statistics. """

-         raise NotImplementedError

- 

-     def info(self):

-         """ Get the user-readable info about the limit object """

-         if self._name:

-             return "'{}'".format(self._name)

-         return "Unnamed '{}' limit".format(type(self).__name__)

- 

- 

- class PredicateWorkerLimit(WorkerLimit):

-     """

-     Calculate how many tasks being processed by currently running workers match

-     the given predicate.

-     """

-     def __init__(self, predicate, limit, name=None):

-         """

-         :param predicate: function object taking one QueueTask argument, and

-             returning True or False

-         :param limit: how many tasks matching the ``predicate`` are allowed to

-             be processed concurrently.

-         """

-         super().__init__(name)

-         self._limit = limit

-         self._predicate = predicate

-         self.clear()

- 

-     def clear(self):

-         self._refs = {}

- 

-     def worker_added(self, worker_id, task):

-         if not self._predicate(task):

-             return

-         self._refs[worker_id] = True

- 

-     def check(self, task):

-         if not self._predicate(task):

-             return True

-         return len(self._refs) < self._limit

- 

-     def info(self):

-         text = super().info()

-         matching = ', '.join(self._refs.keys())

-         if not matching:

-             return text

-         return "{}, matching: {}".format(text, matching)

- 

- 

- class StringCounter:

-     """

-     Counter for string occurrences.  When string is None, we don't count it

-     """

-     def __init__(self):

-         self._counter = {}

- 

-     def add(self, string):

-         """ Add string to counter """

-         if string is None:

-             return

-         if string in self._counter:

-             self._counter[string] += 1

-         else:

-             self._counter[string] = 1

- 

-     def count(self, string):

-         """ Return number ``string`` occurrences """

-         return self._counter.get(string, 0)

- 

-     def __str__(self):

-         items = ["{}={}".format(key, value)

-                  for key, value in self._counter.items()]

-         return ", ".join(items)

- 

- 

- class GroupWorkerLimit(WorkerLimit):

-     """

-     Assign task to groups, and set maximum number of workers per each group.

-     """

-     def __init__(self, hasher, limit, name=None):

-         """

-         :param hasher: function object taking one QueueTask argument, and

-             returning string key (name of the ``group``).

-         :param limit: how many tasks in the ``group`` are allowed to be

-             processed at the same time.

-         """

-         super().__init__(name)

-         self._limit = limit

-         self._hasher = hasher

-         self.clear()

- 

-     def clear(self):

-         self._groups = StringCounter()

-         self._refs = {}

- 

-     def worker_added(self, worker_id, task):

-         # remember it

-         group_name = self._refs[worker_id] = self._hasher(task)

-         # count it

-         self._groups.add(group_name)

- 

-     def check(self, task):

-         group_name = self._hasher(task)

-         return self._groups.count(group_name) < self._limit

- 

-     def info(self):

-         text = super().info()

-         return "{}, counter: {}".format(text, str(self._groups))

- 

- 

- class JobQueue():

+     A base class for tasks processed by `BackendDispatcher` implementations

      """

-     Priority "task" queue for WorkerManager.  Taken from:

-     https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes

-     The higher the 'priority' is, the later the task is taken.

-     """

- 

-     def __init__(self, removed='<removed-task>'):

-         self.prio_queue = []             # list of entries arranged in a heap

-         self.entry_finder = {}           # mapping of tasks to entries

-         self.removed = removed           # placeholder for a removed task

-         self.counter = itertools.count() # unique sequence count

- 

-     def add_task(self, task, priority=0):

-         'Add a new task or update the priority of an existing task'

-         if repr(task) in self.entry_finder:

-             self.remove_task(task)

-         count = next(self.counter)

-         entry = [priority, count, task]

-         self.entry_finder[repr(task)] = entry

-         heappush(self.prio_queue, entry)

- 

-     def remove_task(self, task):

-         'Mark an existing task as removed.  Raise KeyError if not found.'

-         self.remove_task_by_id(repr(task))

- 

-     def remove_task_by_id(self, task_id):

-         """

-         Using task id, drop the task from queue.  Raise KeyError if not found.

-         """

-         entry = self.entry_finder.pop(task_id)

-         entry[-1] = self.removed

- 

-     def pop_task(self):

-         'Remove and return the lowest priority task. Raise KeyError if empty.'

-         while self.prio_queue:

-             priority, count, task = heappop(self.prio_queue)

-             if task is not self.removed:

-                 del self.entry_finder[repr(task)]

-                 return task

-         raise KeyError('pop from an empty priority queue')

- 

- 

- class QueueTask:

-     def __repr__(self):

-         return str(self.id)

- 

-     @property

-     def id(self):

-         raise NotImplementedError

+     # pylint: disable=abstract-method

  

      @property

      def priority(self):
@@ -215,371 +13,20 @@ 

  

      @property

      def frontend_priority(self):

-         return 0

- 

-     @property

-     def backend_priority(self):

-         return 0

- 

- 

- class WorkerManager():

-     """

-     Automatically process 'self.tasks' priority queue, and start background jobs

-     to handle them.

- 

-     :cvar worker_prefix: Unique string across all the WorkerManager child

-             classes, this is used as prefix for the workers in redis database

-             and to easily determine to which WorkerManager the particlar worker

-             belongs to.  So it can be anything reasonable, just make sure it is

-             unique.

-     :cvar worker_timeout_start: The time period we give the background process

-             successfully start and identify itself (see has_worker_started()

-             method).  If the background process isn't indentified after this

-             timeout, we drop it from database and consider it failed.  And the

-             task is re-scheduled.  Float value in seconds.

-     :cvar worker_timeout_deadcheck: If the worker successfully identified itself

-             after start (the has_worker_started() returns True) we know that the

-             worker process at least started.  But after worker_timeout_deadcheck

-             timeout we'll also keep an eye on the process by asking the

-             is_worker_alive() method - whether the task is really still doing

-             something on background or not (== unexpected failure cleanup).

-             Fill float value in seconds.

-     :cvar worker_cleanup_period: How often should WorkerManager try to cleanup

-             workers? (value is a period in seconds)

-     """

- 

-     # pylint: disable=too-many-instance-attributes

- 

-     worker_prefix = 'worker' # make sure this is unique in each class

-     worker_timeout_start = 30

-     worker_timeout_deadcheck = 3*60

-     worker_cleanup_period = 3.0

- 

- 

-     def __init__(self, redis_connection=None, max_workers=8, log=None,

-                  frontend_client=None, limits=None):

-         self.tasks = JobQueue()

-         self.log = log if log else logging.getLogger()

-         self.redis = redis_connection

-         self.max_workers = max_workers

-         self.frontend_client = frontend_client

-         # We have to frequently ask for the actually tracked list of workers —

-         # therefore we keep it here to not re-query the list from Redis all the

-         # time.  We have to load the list from Redis initially, when the process

-         # starts (Manager/Dispatcher class is loaded) because we want the logic

-         # to survive server restarts (we adopt the old background workers).

-         self._tracked_workers = set(self.worker_ids())

-         self._limits = limits or []

-         self._last_worker_cleanup = None

- 

-     def start_task(self, worker_id, task):

-         """

-         Start background job using the 'task' object taken from the 'tasks'

-         queue.  The background task should _on its own_ and ASAP let the manager

-         know that it successfully started (e.g. mark the job 'started' in redis

-         DB), so the has_worker_started() method later gives us valid info.

-         That's btw. the reason why we have the mandatory `worker_id` argument

-         here, the background worker needs to know what key to update in redis.

-         """

-         raise NotImplementedError

- 

-     def finish_task(self, worker_id, task_info):

-         """

-         This is called once the worker manager consider the task to be done,

-         because the `has_worker_ended()` method already returns True.  Override

-         this function and use it to let Frontend know that the task finished,

-         and how (whether it succeeded, etc.).

-         """

-         raise NotImplementedError

- 

-     def has_worker_started(self, worker_id, task_info):

-         """

-         The background task process should somehow notify manager that it

-         already started (so we can have has_worker_started() implemented).

-         By default we expect it sets 'started' attribute in redis DB, but feel

-         free to override this method and invent different notification

-         mechanism.

-         """

-         return 'started' in task_info

- 

-     def has_worker_ended(self, worker_id, task_info):

-         """

-         Check 'task_info' (dictionary output from redis) whether the task is

-         already finished by worker.  If yes, do whatever is needed with the

-         result (contact frontend) and return True.  If the task is still

-         processed, return False.  By default we just check for the ``status``

-         presence in ``task_info``, but this method is to be overridden.

-         """

-         _subclass_can_use = (self, worker_id)

-         return 'status' in task_info

- 

-     def is_worker_alive(self, worker_id, task_info):

-         """

-         Check staled jobs on background, whether they haven't died before they

-         notified us about the status.  We'll keep asking after

-         worker_timeout_deadcheck seconds left since we tried to spawn the

-         worker.

- 

-         By default we check for 'PID' presence in task_info, but users are

-         advised to override this method when necessary.  We keep the 'self'

-         and 'worker_id' arguments as they might be useful.

-         """

-         # pylint: disable=unused-argument,no-self-use

-         if not 'PID' in task_info:

-             return False

-         pid = int(task_info['PID'])

-         try:

-             # Send signal=0 to the process to check whether it still exists.

-             # This is just no-op if the signal was successfully delivered to

-             # existing process, otherwise exception is raised.

-             os.kill(pid, 0)

-         except OSError:

-             return False

-         return True

- 

-     def get_worker_id(self, task_id):

-         """

-         Given the unique task representation form (usually ID), generate worker

-         identificator (redis key).

-         """

-         return '{}:{}'.format(self.worker_prefix, task_id)

- 

-     def get_task_id_from_worker_id(self, worker_id):

-         """

-         Given the unique task representation form (usually ID), generate worker

-         identificator (redis key).

-         """

-         prefix, task_id = worker_id.rsplit(':', 1)

-         assert prefix == self.worker_prefix

-         return task_id

- 

-     def _calculate_limits_for_task(self, worker_id, task):

-         for limit in self._limits:

-             limit.worker_added(worker_id, task)

- 

-     def cancel_request_done(self, task):

-         """ Report back to frontend that the cancel request was finished. """

- 

-     def add_task(self, task):

-         """

-         Add task to queue.

-         """

-         task_id = repr(task)

-         worker_id = self.get_worker_id(task_id)

- 

-         if worker_id in self._tracked_workers:

-             # No need to re-add this to queue, but we need to calculate

-             # it into the limits.

-             self._calculate_limits_for_task(worker_id, task)

-             self.log.debug("Task %s already has a worker process", task_id)

-             return

- 

-         self.log.debug("Adding task %s to queue, priority %s", task_id,

-                        task.priority)

-         self.tasks.add_task(task, task.priority)

- 

-     def _drop_task_id_safe(self, task_id):

-         try:

-             self.tasks.remove_task_by_id(task_id)

-         except KeyError:

-             pass

- 

-     def cancel_task_id(self, task_id):

-         """

-         Using task_id, cancel corresponding task, and request worker

-         shut-down (when already started)

- 

-         :return: True if worker is running on background, False otherwise

-         """

-         self._drop_task_id_safe(task_id)

-         worker_id = self.get_worker_id(task_id)

-         if worker_id not in self.worker_ids():

-             self.log.info("Cancel request, worker %s is not running", worker_id)

-             return False

-         self.log.info("Cancel request, worker %s requested to cancel",

-                       worker_id)

-         self.redis.hset(worker_id, 'cancel_request', 1)

-         return True

- 

-     def worker_ids(self):

-         """

-         Return the redis keys representing workers running on background.

-         """

-         return self.redis.keys(self.worker_prefix + ':*')

- 

-     def run(self, timeout=float('inf')):

-         """

-         Process the task (priority) queue.

          """

-         now = None

-         start_time = time.time()

-         self.log.debug("Worker.run() start at time %s", start_time)

- 

-         # Make sure _cleanup_workers() has some effect during the run() call.

-         # This is here mostly for the test-suite, because in the real use-cases

-         # the worker_cleanup_period is much shorter period than the timeout and

-         # the cleanup is done _several_ times during the run() call.

-         self._last_worker_cleanup = 0.0

- 

-         while True:

-             now = start_time if now is None else time.time()

- 

-             if not now - start_time < timeout:

-                 break

- 

-             self._cleanup_workers(now)

- 

-             worker_count = len(self._tracked_workers)

-             if worker_count >= self.max_workers:

-                 self.log.debug("Worker count on a limit %s", worker_count)

-                 time.sleep(1)

-                 continue

- 

-             # We can allocate some workers, if there's something to do.

-             try:

-                 task = self.tasks.pop_task()

-             except KeyError:

-                 # Empty queue!

-                 if worker_count:

-                     # It still makes sense to cycle to finish the workers.

-                     self.log.debug("No more tasks, waiting for workers")

-                     time.sleep(1)

-                     continue

-                 # Optimization part, nobody is working now, and there's nothing

-                 # to do.  Just simply wait till the end of the cycle.

-                 break

- 

-             break_on_limit = False

-             for limit in self._limits:

-                 # just skip this task, it will be processed in the next

-                 # run because we keep re-filling the queue

-                 if not limit.check(task):

-                     self.log.debug("Task '%s' skipped, limit info: %s",

-                                    task.id, limit.info())

-                     break_on_limit = True

-                     break

-             if break_on_limit:

-                 continue

+         A task priority is calculated based on a frontend priority preference

+         and backend priority preference.

  

-             self._start_worker(task, now)

- 

-         self.log.debug("Reaped %s processes", self._clean_daemon_processes())

-         self.log.debug("Worker.run() stop at time %s", time.time())

- 

-     def _start_worker(self, task, time_now):

-         worker_id = self.get_worker_id(repr(task))

-         self.redis.hset(worker_id, 'allocated', time_now)

-         self._tracked_workers.add(worker_id)

-         self.log.info("Starting worker %s, task.priority=%s", worker_id,

-                       task.priority)

-         self._calculate_limits_for_task(worker_id, task)

-         self.start_task(worker_id, task)

- 

-     def clean_tasks(self):

-         """

-         Remove all tasks from queue.

-         """

-         self.tasks = JobQueue()

-         for limit in self._limits:

-             limit.clear()

- 

-     def _delete_worker(self, worker_id):

-         self.redis.delete(worker_id)

-         self._tracked_workers.discard(worker_id)

- 

-     def _cleanup_workers(self, now):

-         """

-         Go through all the tracked workers and check if they already finished,

-         failed to start or died in the background.

+         This is the frontend priority number

          """

+         return 0

  

-         # This method is called very frequently (several hundreds per second,

-         # for each of the attempts to start a worker in the self.run() method).

-         # Because the likelihood that some of the background workers changed

-         # state is pretty low, we control the frequency of the cleanup here.

-         now = time.time()

-         if now - self._last_worker_cleanup < self.worker_cleanup_period:

-             return

- 

-         self.log.debug("Trying to clean old workers")

-         self._last_worker_cleanup = time.time()

- 

-         for worker_id in self.worker_ids():

-             info = self.redis.hgetall(worker_id)

- 

-             allocated = info.get('allocated', None)

-             if not allocated:

-                 # In worker manager, we _always_ add 'allocated' tag when we

-                 # start worker.  So this may only happen when worker is

-                 # orphaned for some reason (we gave up with him), and it still

-                 # touches the database on background.

-                 self.log.info("Missing 'allocated' flag for worker %s", worker_id)

-                 self._delete_worker(worker_id)

-                 continue

- 

-             allocated = float(allocated)

- 

-             if self.has_worker_ended(worker_id, info):

-                 # finished worker

-                 self.log.info("Finished worker %s", worker_id)

-                 self.finish_task(worker_id, info)

-                 self._delete_worker(worker_id)

-                 continue

- 

-             if info.get('delete'):

-                 self.log.warning("worker %s deleted", worker_id)

-                 self._delete_worker(worker_id)

-                 continue

- 

-             if not self.has_worker_started(worker_id, info):

-                 if now - allocated > self.worker_timeout_start:

-                     # This worker failed to start?

-                     self.log.error("worker %s failed to start", worker_id)

-                     self._delete_worker(worker_id)

-                 continue

- 

-             checked = info.get('checked', allocated)

- 

-             if now - float(checked) > self.worker_timeout_deadcheck:

-                 self.log.info("checking worker %s", worker_id)

-                 self.redis.hset(worker_id, 'checked', now)

-                 if self.is_worker_alive(worker_id, info):

-                     continue

-                 self.log.error("dead worker %s", worker_id)

- 

-                 # The worker could finish in the meantime, make sure we

-                 # hgetall() once more.

-                 self.redis.hset(worker_id, 'delete', 1)

- 

-     def start_daemon_on_background(self, command, env=None):

-         """

-         The daemon.DaemonContext() is pretty slow thing, it may take up to 1s

-         to finish and return the exit status to the parent process.  But if the

-         calling logic is properly prepared for potential startup failures, we

-         don't have to wait at all and we can start the background process on

-         background, too.  Typical work-around for starting the

-         'copr-backend-process-*' scripts that are based on the

-         BackgroundWorker.process() logic.

+     @property

+     def backend_priority(self):

          """

-         # pylint: disable=consider-using-with

-         process = subprocess.Popen(command, env=env)

-         self.log.debug("background pid=%s started (%s)", process.pid, command)

+         A task priority is calculated based on a frontend priority preference

+         and backend priority preference.

  

-     def _clean_daemon_processes(self):

+         This is the backend priority number

          """

-         Wait for all the finished subprocesses to avoid the leftover zombies.

-         Return the number of successfully waited processes.  Complements the

-         start_daemon_on_background() above, but called automatically.

-         """

-         counter = 0

-         try:

-             # Wait for any background process (pid=-1), and no additional

-             # options are needed (options=0).  All the background processes

-             # should quit relatively fast (within one second).

-             while True:

-                 (pid, _) = os.waitpid(-1, 0)

-                 self.log.debug("Worker Manager waited for pid=%s", pid)

-                 counter += 1

-         except ChildProcessError:

-             pass

-         return counter

+         return 0

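A hedged sketch of a task that overrides both priority preferences; the class and field names are made up, and the combined "priority" property itself is provided by the base class (its body falls outside the hunk shown here):

# Hypothetical sketch; class and field names are invented.
from copr_backend.worker_manager import BackendQueueTask

class ExampleQueueTask(BackendQueueTask):
    def __init__(self, task):
        self._task = task

    @property
    def id(self):
        return self._task["id"]

    @property
    def frontend_priority(self):
        # the preference delivered together with the task from frontend
        return self._task.get("priority", 0)

    @property
    def backend_priority(self):
        # backend-side adjustment, e.g. de-prioritize background tasks
        return 100 if self._task.get("background") else 0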
@@ -4,11 +4,11 @@ 

  Process one Action task provided by frontend (on backend).

  """

  

- from copr_backend.background_worker import BackgroundWorker

+ from copr_backend.background_worker import BackendBackgroundWorker

  from copr_backend.actions import Action, ActionResult

  

  

- class ActionBackgroundWorker(BackgroundWorker):

+ class ActionBackgroundWorker(BackendBackgroundWorker):

      redis_logger_id = 'actions'

  

      @classmethod

@@ -15,7 +15,8 @@ 

  

  sys.path.append(os.path.join(WORKDIR, '..'))

  

- from copr_backend.helpers import get_redis_connection

+ # pylint: disable=wrong-import-position

+ from copr_common.redis_helpers import get_redis_connection

  

  REDIS_OPTS = Munch(

      redis_db=9,

@@ -7,12 +7,13 @@ 

  import testlib

  from testlib import assert_logs_exist, AsyncCreaterepoRequestFactory

  

+ from copr_common.redis_helpers import get_redis_connection

  from copr_backend.createrepo import (

      BatchedCreaterepo,

      MAX_IN_BATCH,

  )

  

- from copr_backend.helpers import BackendConfigReader, get_redis_connection

+ from copr_backend.helpers import BackendConfigReader

  

  # pylint: disable=attribute-defined-outside-init

  

@@ -7,8 +7,9 @@ 

  from munch import Munch

  

  from copr_common.tree import walk_limited

+ from copr_common.redis_helpers import get_redis_connection

  from copr_backend.background_worker_build import BackendError

- from copr_backend.helpers import get_redis_logger, get_chroot_arch, format_filename, get_redis_connection

+ from copr_backend.helpers import get_redis_logger, get_chroot_arch, format_filename

  from copr_backend.constants import LOG_REDIS_FIFO

  

  """

@@ -27,10 +27,10 @@ 

  

  from copr_prune_results import run_prunerepo

  

+ from copr_common.redis_helpers import get_redis_connection

  from copr_backend.helpers import (

      BackendConfigReader,

      call_copr_repo,

-     get_redis_connection,

  )

  

  

@@ -4,12 +4,12 @@ 

  

  # pylint: disable=protected-access

  

- from copr_backend.worker_manager import (

+ from copr_common.worker_manager import (

      GroupWorkerLimit,

      PredicateWorkerLimit,

-     QueueTask,

      StringCounter,

  )

+ from copr_backend.worker_manager import BackendQueueTask

  from copr_backend.rpm_builds import (

      ArchitectureWorkerLimit,

      BuildTagLimit,
@@ -38,7 +38,7 @@ 

      "tags": ["special_requirement"],

  }]

  

- class _QT(QueueTask):

+ class _QT(BackendQueueTask):

      def __init__(self, _id):

          self._id = _id

  

@@ -11,11 +11,14 @@ 

  from munch import Munch

  from copr_common.enums import DefaultActionPriorityEnum

  

- from copr_backend.helpers import get_redis_connection

- from copr_backend.actions import ActionWorkerManager, ActionQueueTask, Action

- from copr_backend.worker_manager import (

-     JobQueue, PredicateWorkerLimit, QueueTask, WorkerManager

+ from copr_common.redis_helpers import get_redis_connection

+ from copr_common.worker_manager import (

+     JobQueue,

+     WorkerManager,

+     PredicateWorkerLimit,

  )

+ from copr_backend.actions import ActionWorkerManager, ActionQueueTask, Action

+ from copr_backend.worker_manager import BackendQueueTask

  

  WORKDIR = os.path.dirname(__file__)

  
@@ -87,7 +90,7 @@ 

      pass

  

  

- class ToyQueueTask(QueueTask):

+ class ToyQueueTask(BackendQueueTask):

      def __init__(self, _id):

          self._id = _id

  
@@ -198,8 +201,8 @@ 

              log=log,

              limits=self.limits)

  

-     @patch('copr_backend.worker_manager.time.sleep')

-     @patch('copr_backend.worker_manager.time.time')

+     @patch('copr_common.worker_manager.time.sleep')

+     @patch('copr_common.worker_manager.time.time')

      def test_that_limits_are_respected(self, mc_time, _mc_sleep, caplog):

          # each time.time() call incremented by 1

          self.worker_manager.task_sleep = 5
@@ -268,7 +271,7 @@ 

          wid = "{}:123".format(self.worker_manager.worker_prefix)

          assert self.worker_manager.get_task_id_from_worker_id(wid) == "123"

  

-     @patch('copr_backend.worker_manager.time.sleep')

+     @patch('copr_common.worker_manager.time.sleep')

      def test_preexisting_broken_worker(self, _mc_sleep, caplog):

          """ from previous systemctl restart """

          fake_worker_name = self.worker_manager.worker_prefix + ":fake"
@@ -398,7 +401,7 @@ 

          raise Exception("Unsuccessful wait for {} in {}".format(worker, field))

  

      @pytest.mark.parametrize('fail', ['FAIL_STARTED_PID', 'FAIL_STARTED'])

-     @patch('copr_backend.worker_manager.time.time')

+     @patch('copr_common.worker_manager.time.time')

      def test_delete_not_finished_workers(self, mc_time, fail):

          self.worker_manager.environ = {fail: '1'}

          self.worker_manager.worker_timeout_deadcheck = 0.4
@@ -407,7 +410,7 @@ 

          mc_time.side_effect = range(1000)

  

          # first loop just starts the toy:0 worker

-         with patch('copr_backend.worker_manager.time.sleep'):

+         with patch('copr_common.worker_manager.time.sleep'):

              self.worker_manager.run(timeout=1)

  

          params = self.wait_field(self.w0, 'started')
@@ -420,12 +423,12 @@ 

              wait_pid_exit(params['PID'])

  

          # toy 0 is marked for deleting

-         with patch('copr_backend.worker_manager.time.sleep'):

+         with patch('copr_common.worker_manager.time.sleep'):

              self.worker_manager.run(timeout=1)

          assert 'delete' in self.redis.hgetall(self.w0)

  

          # toy 0 should be deleted

-         with patch('copr_backend.worker_manager.time.sleep'):

+         with patch('copr_common.worker_manager.time.sleep'):

              self.worker_manager.run(timeout=1)

          keys = self.workers()

          assert self.w1 in keys
@@ -467,7 +470,7 @@ 

          # start the worker

          self.worker_manager.run(timeout=0.0001) # start them task

  

-         with patch('copr_backend.worker_manager.time.sleep') as sleep:

+         with patch('copr_common.worker_manager.time.sleep') as sleep:

              # we can spawn more workers, but queue is empty

              self.worker_manager.run(timeout=0.0001)

              assert sleep.called
@@ -477,7 +480,7 @@ 

          self.wait_field(self.w0, 'status')

  

          # check that we don't sleep here (no worker, no task)

-         with patch('copr_backend.worker_manager.time.sleep') as sleep:

+         with patch('copr_common.worker_manager.time.sleep') as sleep:

              self.worker_manager.run(timeout=0.0001)

              assert not sleep.called

  
@@ -598,7 +601,7 @@ 

  

          # QueueTask.backend_priority is a property which should be

          # overridden in the class descendants.

-         delattr(QueueTask, "backend_priority")

+         delattr(BackendQueueTask, "backend_priority")

          actions[0].backend_priority = 0

          actions[1].backend_priority = -999

          actions[2].backend_priority = 999

@@ -0,0 +1,154 @@ 

+ """

+ Helper methods/classes for 'copr-backend-*' scripts.

+ """

+ 

+ import os

+ import sys

+ import argparse

+ import contextlib

+ import logging

+ import daemon

+ 

+ import setproctitle

+ 

+ from copr_common.redis_helpers import get_redis_connection

+ 

+ 

+ class BackgroundWorker:

+     """

+     copr-backend-process-* abstraction

+     """

+ 

+     redis_logger_id = 'unknown'

+     frontend_client = None

+     _redis_conn = None

+ 

+     def __init__(self):

+         # just setup temporary stderr logger

+         self.log = logging.getLogger()

+         self.log.setLevel(logging.DEBUG)

+         self.log.addHandler(logging.StreamHandler())

+ 

+         if os.getuid() == 0:

+             self.log.error("this needs to be run as 'copr' user")

+             sys.exit(1)

+ 

+         self.opts = None

+         self.args = self._get_argparser().parse_args(sys.argv[1:])

+ 

+     @staticmethod

+     def setproctitle(text):

+         """ set the process title, beginning with the script name """

+         command = " ".join(sys.argv)

+         setproctitle.setproctitle("{} (command: {})".format(text, command))

+ 

+     @property

+     def _redis(self):

+         if not self._redis_conn:

+             self._redis_conn = get_redis_connection(self.opts)

+         return self._redis_conn

+ 

+     def redis_set_worker_flag(self, flag, value=1):

+         """

+         Set flag in Redis DB for corresponding worker.  NO-OP if there's no

+         redis connection (when run manually).

+         """

+         if not self.has_wm:

+             return

+         self._redis.hset(self.args.worker_id, flag, value)

+ 

+     def redis_get_worker_flag(self, flag):

+         """

+         Get flag from Redis DB entry of corresponding worker.  If there's no

+         Redis connection (manual run) return None

+         """

+         if not self.has_wm:

+             return None

+         return self._redis.hget(self.args.worker_id, flag)

+ 

+     @classmethod

+     def _get_argparser(cls):

+         parser = argparse.ArgumentParser()

+         parser.add_argument(

+             "--worker-id",

+             help=("worker ID which already exists in redis DB (used by "

+                   "WorkerManager only, copr-internal option)"),

+         )

+         parser.add_argument(

+             "--daemon",

+             action='store_true',

+             help="execute the task on background, as daemon process"

+         )

+         parser.add_argument(

+             "--silent",

+             action='store_true',

+             help="don't print logs, even when run without --daemon",

+         )

+         parser.add_argument(

+             "--backend-config",

+             help="alternative path to /etc/copr/copr-be.conf",

+         )

+         cls.adjust_arg_parser(parser)

+         return parser

+ 

+     @classmethod

+     def adjust_arg_parser(cls, parser):

+         """

+         The inherited worker class will need more commandline options than those

+         we provide by default.  Override is required.

+         """

+         raise NotImplementedError()

+ 

+     def handle_task(self):

+         """

+         Abstract method for handling the task.  This should never throw any

+         exception, and we don't expect it to return any value.

+         """

+         raise NotImplementedError()

+ 

+     @property

+     def has_wm(self):

+         """

+         Returns True if worker manager started this process, and False

+         if it is a manual run.

+         """

+         return bool(self.worker_id)

+ 

+     @property

+     def worker_id(self):

+         """ Return worker ID if set by worker manager, or None.  """

+         return self.args.worker_id

+ 

+     def _wm_started(self):

+         if not self.has_wm:

+             return True

+ 

+         self.redis_set_worker_flag('started', 1)

+         self.redis_set_worker_flag('PID', os.getpid())

+ 

+         data = self._redis.hgetall(self.args.worker_id)

+         if 'allocated' not in data:

+             self.log.error("too slow box, manager thinks we are dead")

+             self._redis.delete(self.args.worker_id)

+             return False

+ 

+         # There's still a small race on a very slow box (TOCTOU in manager, the

+         # db entry can be deleted after our check above ^^).  But we don't risk

+         # anything else than concurrent run of multiple workers in such case.

+         return True

+ 

+     def _daemonized_part(self):

+         try:

+             self.handle_task()

+         except Exception as exc:  # pylint: disable=W0703

+             self.log.exception("unexpected failure %s", str(exc))

+             sys.exit(1)

+ 

+     def process(self):

+         """ process the task """

+         context = contextlib.nullcontext()

+         if self.args.daemon:

+             context = daemon.DaemonContext(umask=0o022)

+ 

+         with context:

+             self._daemonized_part()

@@ -0,0 +1,128 @@ 

+ """

+ Abstract class Dispatcher for Build/Action dispatchers.

+ """

+ 

+ import time

+ import logging

+ import multiprocessing

+ from setproctitle import setproctitle

+ 

+ from copr_common.redis_helpers import get_redis_connection

+ from copr_common.worker_manager import WorkerManager

+ 

+ 

+ class Dispatcher(multiprocessing.Process):

+     """

+     1) Fetch tasks from frontend.

+     2) Fill the WorkerManager queue.

+     3) Run 'WorkerManager.run()'.

+     4) Go to 1)

+ 

+     See also:

+     https://docs.pagure.org/copr.copr/developer_documentation/dispatchers.html

+     https://docs.pagure.org/copr.copr/developer_documentation/worker_manager.html

+     """

+ 

+     # set to either 'action' or 'build' in sub-class

+     task_type = 'task_type'

+ 

+     # either ActionWorkerManager or BuildWorkerManager

+     worker_manager_class = WorkerManager

+ 

+     # how many background workers we let the WorkerManager start, by default

+     # there's no limit

+     max_workers = float("inf")

+ 

+     # we keep track of which builds newly appeared in the task list after fetching

+     # the new set from frontend after get_frontend_tasks() call

+     _previous_task_fetch_ids = set()

+ 

+     def __init__(self, opts):

+         super().__init__(name=self.task_type + '-dispatcher')

+ 

+         self.sleeptime = 0

+         self.opts = opts

+         self.log = logging.getLogger()

+         self.frontend_client = None

+         # list of applied WorkerLimit instances

+         self.limits = []

+ 

+     @classmethod

+     def _update_process_title(cls, msg=None):

+         proc_title = "{} dispatcher".format(cls.task_type.capitalize())

+         if msg:

+             proc_title += " - " + msg

+         setproctitle(proc_title)

+ 

+     def get_frontend_tasks(self):

+         """

+         Get _unfiltered_ list of tasks (QueueTask objects) from frontend (the

+         set needs to contain both running and pending jobs).

+         """

+         raise NotImplementedError

+ 

+     def get_cancel_requests_ids(self):

+         """

+         Return list of QueueTask IDS that should be canceled.

+         """

+         _subclass_can_use = (self)

+         return []

+ 

+     def report_canceled_task_id(self, task_id, was_running):

+         """

+         Report back to Frontend that this task was canceled.  By default this

+         isn't called, so it is NO-OP by default.

+         """

+ 

+     def _print_added_jobs(self, tasks):

+         job_ids = {task.id for task in tasks}

+         new_job_ids = job_ids - self._previous_task_fetch_ids

+         if new_job_ids:

+             self.log.info("Got new '%s' tasks: %s", self.task_type, new_job_ids)

+         self._previous_task_fetch_ids = job_ids

+ 

+     def run(self):

+         """

+         Starts the infinite task dispatching process.

+         """

+         self.log.info("%s dispatching started", self.task_type.capitalize())

+         self._update_process_title()

+ 

+         redis = get_redis_connection(self.opts)

+         worker_manager = self.worker_manager_class(

+             redis_connection=redis,

+             log=self.log,

+             max_workers=self.max_workers,

+             frontend_client=self.frontend_client,

+             limits=self.limits,

+         )

+ 

+         timeout = self.sleeptime

+         while True:

+             self._update_process_title("getting tasks from frontend")

+             self.log.info("getting %ss from frontend", self.task_type)

+             start = time.time()

+ 

+             tasks = self.get_frontend_tasks()

+             if tasks:

+                 worker_manager.clean_tasks()

+ 

+             self._print_added_jobs(tasks)

+             for task in tasks:

+                 worker_manager.add_task(task)

+ 

+             self._update_process_title("getting cancel requests")

+             for task_id in self.get_cancel_requests_ids():

+                 was_running = worker_manager.cancel_task_id(task_id)

+                 self.report_canceled_task_id(task_id, was_running)

+ 

+             # process the tasks

+             self._update_process_title("processing tasks")

+             worker_manager.run(timeout=timeout)

+ 

+             sleep_more = timeout - (time.time() - start)

+             if sleep_more > 0:

+                 time.sleep(sleep_more)

+ 

+         # reset the title

+         self._update_process_title()
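
For orientation (a sketch, not part of the diff): a concrete dispatcher only has to pick a task_type, point worker_manager_class at a WorkerManager subclass, and implement get_frontend_tasks(); the ImportDispatcher added later in this PR is the real example. A hypothetical minimal skeleton could look like:

    from copr_common.dispatcher import Dispatcher
    from copr_common.worker_manager import QueueTask, WorkerManager

    class ExampleTask(QueueTask):
        """Hypothetical task; only the 'id' property is mandatory."""
        def __init__(self, task_id):
            self._task_id = task_id

        @property
        def id(self):
            return self._task_id

    class ExampleDispatcher(Dispatcher):
        task_type = "example"
        worker_manager_class = WorkerManager  # normally a project-specific subclass

        def get_frontend_tasks(self):
            # normally fetched from frontend; hard-coded here for illustration
            return [ExampleTask(1), ExampleTask(2)]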

@@ -0,0 +1,22 @@ 

+ """

+ Copr common code related to redis

+ """

+ 

+ from redis import StrictRedis

+ 

+ 

+ def get_redis_connection(opts):

+     """

+     Creates redis client object using backend config

+ 

+     :rtype: StrictRedis

+     """

+     kwargs = {}

+     if hasattr(opts, "redis_db"):

+         kwargs["db"] = opts.redis_db

+     if hasattr(opts, "redis_host"):

+         kwargs["host"] = opts.redis_host

+     if hasattr(opts, "redis_port"):

+         kwargs["port"] = opts.redis_port

+ 

+     return StrictRedis(encoding="utf-8", decode_responses=True, **kwargs)
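
A small usage sketch (not part of the diff): any config object exposing the optional redis_db / redis_host / redis_port attributes works, for example the Munch produced by the backend and dist-git config readers.

    from munch import Munch
    from copr_common.redis_helpers import get_redis_connection

    opts = Munch(redis_host="localhost", redis_port=6379)
    redis = get_redis_connection(opts)
    redis.hset("import_worker:123", "started", 1)  # the kind of flag WorkerManager reads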

@@ -0,0 +1,593 @@ 

+ """

+ WorkerManager for processing a priority queue and starting background jobs

+ """

+ 

+ import os

+ import time

+ from heapq import heappop, heappush

+ import itertools

+ import logging

+ import subprocess

+ 

+ 

+ class WorkerLimit:

+     """

+     Limit for the number of tasks being processed concurrently

+ 

+     WorkerManager expects that its caller fills the task queue only with tasks

+     that should be processed.  Then WorkerManager is completely responsible for

+     sorting out the queue while respecting the given limits.

+ 

+     WorkerManager implements a really simple algorithm to respect the limits;  we

+     simply drop the task from the queue (and continue to the next task) if it was

+     about to exceed any given limit.  This is the only option at this moment

+     because JobQueue doesn't allow us to skip a task and return to it later.

+     It is not a problem for Copr dispatchers though, because we re-add the

+     dropped tasks to JobQueue anyway -- after the next call to the

+     Dispatcher.get_frontend_tasks() method (see the "sleeptime" configuration

+     option).

+ 

+     Each Limit object works as a statistics counter for the list of _currently

+     processed_ tasks (i.e. not queued tasks!), and the statistics can be

+     queried at any time.  Both calculating and querying the statistics

+     must be as fast as possible, therefore the interface only re-calculates the

+     stats when WorkerManager starts/stops working on some task.

+ 

+     One may wonder why we bother with limits at all, and why we don't delegate

+     this responsibility to the WorkerManager caller (IOW, don't put the task

+     into the queue if it is not yet the right time to process it).  That would

+     be the ideal case, but at the time of filling the queue the caller has no

+     idea about the currently running BackgroundWorker instances (those need to

+     be counted into the statistics, too).

+     """

+ 

+     def __init__(self, name=None):

+         self._name = name

+ 

+     def worker_added(self, worker_id, task):

+         """ Add worker and it's task to statistics.  """

+         raise NotImplementedError

+ 

+     def check(self, task):

+         """ Check if the task can be added without crossing the limit. """

+         raise NotImplementedError

+ 

+     def clear(self):

+         """ Clear the statistics. """

+         raise NotImplementedError

+ 

+     def info(self):

+         """ Get the user-readable info about the limit object """

+         if self._name:

+             return "'{}'".format(self._name)

+         return "Unnamed '{}' limit".format(type(self).__name__)

+ 

+ 

+ class PredicateWorkerLimit(WorkerLimit):

+     """

+     Calculate how many tasks being processed by currently running workers match

+     the given predicate.

+     """

+     def __init__(self, predicate, limit, name=None):

+         """

+         :param predicate: function object taking one QueueTask argument, and

+             returning True or False

+         :param limit: how many tasks matching the ``predicate`` are allowed to

+             be processed concurrently.

+         """

+         super().__init__(name)

+         self._limit = limit

+         self._predicate = predicate

+         self.clear()

+ 

+     def clear(self):

+         self._refs = {}

+ 

+     def worker_added(self, worker_id, task):

+         if not self._predicate(task):

+             return

+         self._refs[worker_id] = True

+ 

+     def check(self, task):

+         if not self._predicate(task):

+             return True

+         return len(self._refs) < self._limit

+ 

+     def info(self):

+         text = super().info()

+         matching = ', '.join(self._refs.keys())

+         if not matching:

+             return text

+         return "{}, matching: {}".format(text, matching)

+ 

+ 

+ class StringCounter:

+     """

+     Counter for string occurrences.  When the string is None, we don't count it.

+     """

+     def __init__(self):

+         self._counter = {}

+ 

+     def add(self, string):

+         """ Add string to counter """

+         if string is None:

+             return

+         if string in self._counter:

+             self._counter[string] += 1

+         else:

+             self._counter[string] = 1

+ 

+     def count(self, string):

+         """ Return number ``string`` occurrences """

+         return self._counter.get(string, 0)

+ 

+     def __str__(self):

+         items = ["{}={}".format(key, value)

+                  for key, value in self._counter.items()]

+         return ", ".join(items)

+ 

+ 

+ class GroupWorkerLimit(WorkerLimit):

+     """

+     Assign tasks to groups, and set a maximum number of workers for each group.

+     """

+     def __init__(self, hasher, limit, name=None):

+         """

+         :param hasher: function object taking one QueueTask argument, and

+             returning string key (name of the ``group``).

+         :param limit: how many tasks in the ``group`` are allowed to be

+             processed at the same time.

+         """

+         super().__init__(name)

+         self._limit = limit

+         self._hasher = hasher

+         self.clear()

+ 

+     def clear(self):

+         self._groups = StringCounter()

+         self._refs = {}

+ 

+     def worker_added(self, worker_id, task):

+         # remember it

+         group_name = self._refs[worker_id] = self._hasher(task)

+         # count it

+         self._groups.add(group_name)

+ 

+     def check(self, task):

+         group_name = self._hasher(task)

+         return self._groups.count(group_name) < self._limit

+ 

+     def info(self):

+         text = super().info()

+         return "{}, counter: {}".format(text, str(self._groups))

+ 

+ 

+ class JobQueue():

+     """

+     Priority "task" queue for WorkerManager.  Taken from:

+     https://docs.python.org/3/library/heapq.html#priority-queue-implementation-notes

+     The higher the 'priority' is, the later the task is taken.

+     """

+ 

+     def __init__(self, removed='<removed-task>'):

+         self.prio_queue = []             # list of entries arranged in a heap

+         self.entry_finder = {}           # mapping of tasks to entries

+         self.removed = removed           # placeholder for a removed task

+         self.counter = itertools.count() # unique sequence count

+ 

+     def add_task(self, task, priority=0):

+         'Add a new task or update the priority of an existing task'

+         if repr(task) in self.entry_finder:

+             self.remove_task(task)

+         count = next(self.counter)

+         entry = [priority, count, task]

+         self.entry_finder[repr(task)] = entry

+         heappush(self.prio_queue, entry)

+ 

+     def remove_task(self, task):

+         'Mark an existing task as removed.  Raise KeyError if not found.'

+         self.remove_task_by_id(repr(task))

+ 

+     def remove_task_by_id(self, task_id):

+         """

+         Using task id, drop the task from queue.  Raise KeyError if not found.

+         """

+         entry = self.entry_finder.pop(task_id)

+         entry[-1] = self.removed

+ 

+     def pop_task(self):

+         'Remove and return the lowest priority task. Raise KeyError if empty.'

+         while self.prio_queue:

+             _priority, _count, task = heappop(self.prio_queue)

+             if task is not self.removed:

+                 del self.entry_finder[repr(task)]

+                 return task

+         raise KeyError('pop from an empty priority queue')
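
A quick sketch of the priority semantics (the string "tasks" are purely illustrative; real tasks are QueueTask instances): the lower the priority value, the earlier the task is popped.

    from copr_common.worker_manager import JobQueue

    queue = JobQueue()
    queue.add_task("background-build", priority=100)
    queue.add_task("normal-build", priority=0)
    print(queue.pop_task())  # normal-build
    print(queue.pop_task())  # background-build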

+ 

+ 

+ class QueueTask:

+     """

+     A base class for tasks processed by `Dispatcher` implementations

+     """

+ 

+     def __repr__(self):

+         return str(self.id)

+ 

+     @property

+     def id(self):

+         """

+         Unique ID for distinguishing tasks

+         """

+         raise NotImplementedError

+ 

+     @property

+     def priority(self):

+         """

+         The higher the 'priority' is, the later the task is taken.

+         """

+         return 0

+ 

+ 

+ class WorkerManager():

+     """

+     Automatically process 'self.tasks' priority queue, and start background jobs

+     to handle them.

+ 

+     :cvar worker_prefix: Unique string across all the WorkerManager child

+             classes; it is used as a prefix for the workers in the redis

+             database and to easily determine which WorkerManager the particular

+             worker belongs to.  So it can be anything reasonable, just make

+             sure it is unique.

+     :cvar worker_timeout_start: The time period we give the background process

+             to successfully start and identify itself (see the

+             has_worker_started() method).  If the background process isn't

+             identified after this timeout, we drop it from the database and

+             consider it failed, and the task is re-scheduled.  Float value

+             in seconds.

+     :cvar worker_timeout_deadcheck: If the worker successfully identified itself

+             after start (has_worker_started() returns True) we know that the

+             worker process at least started.  But after the worker_timeout_deadcheck

+             timeout we also keep an eye on the process by asking the

+             is_worker_alive() method whether the task is really still doing

+             something in the background or not (== unexpected failure cleanup).

+             Float value in seconds.

+     :cvar worker_cleanup_period: How often should WorkerManager try to clean up

+             workers? (value is a period in seconds)

+     """

+ 

+     # pylint: disable=too-many-instance-attributes

+ 

+     worker_prefix = 'worker' # make sure this is unique in each class

+     worker_timeout_start = 30

+     worker_timeout_deadcheck = 3*60

+     worker_cleanup_period = 3.0

+ 

+ 

+     def __init__(self, redis_connection=None, max_workers=8, log=None,

+                  frontend_client=None, limits=None):

+         self.tasks = JobQueue()

+         self.log = log if log else logging.getLogger()

+         self.redis = redis_connection

+         self.max_workers = max_workers

+         self.frontend_client = frontend_client

+         # We frequently need the list of currently tracked workers, therefore

+         # we keep it here to avoid re-querying the list from Redis all the

+         # time.  We have to load the list from Redis initially, when the

+         # process starts (Manager/Dispatcher class is loaded), because we want

+         # the logic to survive server restarts (we adopt the old background

+         # workers).

+         self._tracked_workers = set(self.worker_ids())

+         self._limits = limits or []

+         self._last_worker_cleanup = None

+ 

+     def start_task(self, worker_id, task):

+         """

+         Start background job using the 'task' object taken from the 'tasks'

+         queue.  The background task should _on its own_ and ASAP let the manager

+         know that it successfully started (e.g. mark the job 'started' in redis

+         DB), so the has_worker_started() method later gives us valid info.

+         That's btw. the reason why we have the mandatory `worker_id` argument

+         here, the background worker needs to know what key to update in redis.

+         """

+         raise NotImplementedError

+ 

+     def finish_task(self, worker_id, task_info):

+         """

+         This is called once the worker manager considers the task to be done,

+         because the `has_worker_ended()` method already returns True.  Override

+         this function and use it to let Frontend know that the task finished,

+         and how (whether it succeeded, etc.).

+         """

+         raise NotImplementedError

+ 

+     def has_worker_started(self, worker_id, task_info):

+         """

+         The background task process should somehow notify the manager that it

+         already started (so we can have has_worker_started() implemented).

+         By default we expect it sets 'started' attribute in redis DB, but feel

+         free to override this method and invent different notification

+         mechanism.

+         """

+         # pylint: disable=no-self-use

+         # pylint: disable=unused-argument

+         return 'started' in task_info

+ 

+     def has_worker_ended(self, worker_id, task_info):

+         """

+         Check 'task_info' (dictionary output from redis) whether the task is

+         already finished by worker.  If yes, do whatever is needed with the

+         result (contact frontend) and return True.  If the task is still being

+         processed, return False.  By default we just check for the ``status``

+         presence in ``task_info``, but this method is to be overridden.

+         """

+         _subclass_can_use = (self, worker_id)

+         return 'status' in task_info

+ 

+     def is_worker_alive(self, worker_id, task_info):

+         """

+         Check stale jobs running in the background, i.e. whether they died

+         before they notified us about their status.  We start asking once

+         worker_timeout_deadcheck seconds have passed since we tried to spawn

+         the worker.

+ 

+         By default we check for 'PID' presence in task_info, but users are

+         advised to override this method when necessary.  We keep the 'self'

+         and 'worker_id' arguments as they might be useful.

+         """

+         # pylint: disable=unused-argument,no-self-use

+         if 'PID' not in task_info:

+             return False

+         pid = int(task_info['PID'])

+         try:

+             # Send signal=0 to the process to check whether it still exists.

+             # This is just no-op if the signal was successfully delivered to

+             # existing process, otherwise exception is raised.

+             os.kill(pid, 0)

+         except OSError:

+             return False

+         return True

+ 

+     def get_worker_id(self, task_id):

+         """

+         Given the unique task representation (usually its ID), generate the

+         worker identifier (redis key).

+         """

+         return '{}:{}'.format(self.worker_prefix, task_id)

+ 

+     def get_task_id_from_worker_id(self, worker_id):

+         """

+         Given the worker identifier (redis key), return the task ID part

+         of it.

+         """

+         prefix, task_id = worker_id.rsplit(':', 1)

+         assert prefix == self.worker_prefix

+         return task_id

+ 

+     def _calculate_limits_for_task(self, worker_id, task):

+         for limit in self._limits:

+             limit.worker_added(worker_id, task)

+ 

+     def cancel_request_done(self, task):

+         """ Report back to frontend that the cancel request was finished. """

+ 

+     def add_task(self, task):

+         """

+         Add task to queue.

+         """

+         task_id = repr(task)

+         worker_id = self.get_worker_id(task_id)

+ 

+         if worker_id in self._tracked_workers:

+             # No need to re-add this to queue, but we need to calculate

+             # it into the limits.

+             self._calculate_limits_for_task(worker_id, task)

+             self.log.debug("Task %s already has a worker process", task_id)

+             return

+ 

+         self.log.debug("Adding task %s to queue, priority %s", task_id,

+                        task.priority)

+         self.tasks.add_task(task, task.priority)

+ 

+     def _drop_task_id_safe(self, task_id):

+         try:

+             self.tasks.remove_task_by_id(task_id)

+         except KeyError:

+             pass

+ 

+     def cancel_task_id(self, task_id):

+         """

+         Using task_id, cancel the corresponding task and request a worker

+         shut-down (when it is already started).

+ 

+         :return: True if worker is running on background, False otherwise

+         """

+         self._drop_task_id_safe(task_id)

+         worker_id = self.get_worker_id(task_id)

+         if worker_id not in self.worker_ids():

+             self.log.info("Cancel request, worker %s is not running", worker_id)

+             return False

+         self.log.info("Cancel request, worker %s requested to cancel",

+                       worker_id)

+         self.redis.hset(worker_id, 'cancel_request', 1)

+         return True

+ 

+     def worker_ids(self):

+         """

+         Return the redis keys representing workers running in the background.

+         """

+         return self.redis.keys(self.worker_prefix + ':*')

+ 

+     def run(self, timeout=float('inf')):

+         """

+         Process the task (priority) queue.

+         """

+         now = None

+         start_time = time.time()

+         self.log.debug("Worker.run() start at time %s", start_time)

+ 

+         # Make sure _cleanup_workers() has some effect during the run() call.

+         # This is here mostly for the test-suite, because in the real use-cases

+         # the worker_cleanup_period is a much shorter period than the timeout and

+         # the cleanup is done _several_ times during the run() call.

+         self._last_worker_cleanup = 0.0

+ 

+         while True:

+             now = start_time if now is None else time.time()

+ 

+             if not now - start_time < timeout:

+                 break

+ 

+             self._cleanup_workers(now)

+ 

+             worker_count = len(self._tracked_workers)

+             if worker_count >= self.max_workers:

+                 self.log.debug("Worker count on a limit %s", worker_count)

+                 time.sleep(1)

+                 continue

+ 

+             # We can allocate some workers, if there's something to do.

+             try:

+                 task = self.tasks.pop_task()

+             except KeyError:

+                 # Empty queue!

+                 if worker_count:

+                     # It still makes sense to cycle to finish the workers.

+                     self.log.debug("No more tasks, waiting for workers")

+                     time.sleep(1)

+                     continue

+                 # Optimization part, nobody is working now, and there's nothing

+                 # to do.  Just simply wait till the end of the cycle.

+                 break

+ 

+             break_on_limit = False

+             for limit in self._limits:

+                 # just skip this task, it will be processed in the next

+                 # run because we keep re-filling the queue

+                 if not limit.check(task):

+                     self.log.debug("Task '%s' skipped, limit info: %s",

+                                    task.id, limit.info())

+                     break_on_limit = True

+                     break

+             if break_on_limit:

+                 continue

+ 

+             self._start_worker(task, now)

+ 

+         self.log.debug("Reaped %s processes", self._clean_daemon_processes())

+         self.log.debug("Worker.run() stop at time %s", time.time())

+ 

+     def _start_worker(self, task, time_now):

+         worker_id = self.get_worker_id(repr(task))

+         self.redis.hset(worker_id, 'allocated', time_now)

+         self._tracked_workers.add(worker_id)

+         self.log.info("Starting worker %s, task.priority=%s", worker_id,

+                       task.priority)

+         self._calculate_limits_for_task(worker_id, task)

+         self.start_task(worker_id, task)

+ 

+     def clean_tasks(self):

+         """

+         Remove all tasks from queue.

+         """

+         self.tasks = JobQueue()

+         for limit in self._limits:

+             limit.clear()

+ 

+     def _delete_worker(self, worker_id):

+         self.redis.delete(worker_id)

+         self._tracked_workers.discard(worker_id)

+ 

+     def _cleanup_workers(self, now):

+         """

+         Go through all the tracked workers and check if they already finished,

+         failed to start or died in the background.

+         """

+ 

+         # This method is called very frequently (several hundred times per second,

+         # for each of the attempts to start a worker in the self.run() method).

+         # Because the likelihood that some of the background workers changed

+         # state is pretty low, we control the frequency of the cleanup here.

+         now = time.time()

+         if now - self._last_worker_cleanup < self.worker_cleanup_period:

+             return

+ 

+         self.log.debug("Trying to clean old workers")

+         self._last_worker_cleanup = time.time()

+ 

+         for worker_id in self.worker_ids():

+             info = self.redis.hgetall(worker_id)

+ 

+             allocated = info.get('allocated', None)

+             if not allocated:

+                 # In worker manager, we _always_ add the 'allocated' tag when we

+                 # start a worker.  So this may only happen when the worker is

+                 # orphaned for some reason (we gave up on it), and it still

+                 # touches the database in the background.

+                 self.log.info("Missing 'allocated' flag for worker %s", worker_id)

+                 self._delete_worker(worker_id)

+                 continue

+ 

+             allocated = float(allocated)

+ 

+             if self.has_worker_ended(worker_id, info):

+                 # finished worker

+                 self.log.info("Finished worker %s", worker_id)

+                 self.finish_task(worker_id, info)

+                 self._delete_worker(worker_id)

+                 continue

+ 

+             if info.get('delete'):

+                 self.log.warning("worker %s deleted", worker_id)

+                 self._delete_worker(worker_id)

+                 continue

+ 

+             if not self.has_worker_started(worker_id, info):

+                 if now - allocated > self.worker_timeout_start:

+                     # This worker failed to start?

+                     self.log.error("worker %s failed to start", worker_id)

+                     self._delete_worker(worker_id)

+                 continue

+ 

+             checked = info.get('checked', allocated)

+ 

+             if now - float(checked) > self.worker_timeout_deadcheck:

+                 self.log.info("checking worker %s", worker_id)

+                 self.redis.hset(worker_id, 'checked', now)

+                 if self.is_worker_alive(worker_id, info):

+                     continue

+                 self.log.error("dead worker %s", worker_id)

+ 

+                 # The worker could have finished in the meantime; setting the

+                 # 'delete' flag makes sure we hgetall() it once more before

+                 # dropping it.

+                 self.redis.hset(worker_id, 'delete', 1)

+ 

+     def start_daemon_on_background(self, command, env=None):

+         """

+         The daemon.DaemonContext() is a pretty slow thing; it may take up to 1s

+         to finish and return the exit status to the parent process.  But if the

+         calling logic is properly prepared for potential startup failures, we

+         don't have to wait at all and we can start the background process in

+         the background, too.  This is the typical work-around for starting the

+         'copr-backend-process-*' scripts that are based on the

+         BackgroundWorker.process() logic.

+         """

+         # pylint: disable=consider-using-with

+         process = subprocess.Popen(command, env=env)

+         self.log.debug("background pid=%s started (%s)", process.pid, command)

+ 

+     def _clean_daemon_processes(self):

+         """

+         Wait for all the finished subprocesses to avoid leftover zombies.

+         Return the number of successfully waited-for processes.  This complements

+         start_daemon_on_background() above, but is called automatically.

+         """

+         counter = 0

+         try:

+             # Wait for any background process (pid=-1), and no additional

+             # options are needed (options=0).  All the background processes

+             # should quit relatively fast (within one second).

+             while True:

+                 (pid, _) = os.waitpid(-1, 0)

+                 self.log.debug("Worker Manager waited for pid=%s", pid)

+                 counter += 1

+         except ChildProcessError:

+             pass

+         return counter
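
To summarize how the manager and the background workers talk to each other over Redis (a rough sketch of the per-worker hash as used by the default implementations above; concrete workers may override the worker-side flags):

    # key format: "<worker_prefix>:<task_id>", e.g. "import_worker:3522414"
    redis.hset(worker_id, "allocated", now)        # manager, just before spawning the process
    redis.hset(worker_id, "started", 1)            # worker, ASAP -> has_worker_started()
    redis.hset(worker_id, "PID", pid)              # worker -> is_worker_alive()
    redis.hset(worker_id, "status", result)        # worker, when done -> has_worker_ended()
    redis.hset(worker_id, "checked", now)          # manager, periodic liveness poll
    redis.hset(worker_id, "cancel_request", 1)     # manager, cancel_task_id()
    redis.hset(worker_id, "delete", 1)             # manager, dead-worker cleanup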

@@ -16,7 +16,7 @@ 

  %endif

  

  Name:       python-copr-common

- Version:    0.16.1.dev

+ Version:    0.16.2.dev

  Release:    1%{?dist}

  Summary:    Python code used by Copr

  
@@ -115,6 +115,12 @@ 

  

  

  %changelog

+ * Thu Oct 27 2022 Jakub Kadlcik <frostyx@email.cz> - 0.16.2.dev-1

+ - Add background_worker.py from backend

+ - Add get_redis_connection function

+ - Add Dispatcher, WorkerManager, and QueueTask classes

+ - Add WorkerLimit, PredicateWorkerLimit, and GroupWorkerLimit classes

+ 

  * Sun Oct 02 2022 Jakub Kadlcik <frostyx@email.cz> - 0.16-1

  - Add setup_script_logger function

  

@@ -21,3 +21,7 @@ 

  

  # Lock file used to atomically work with cgit_cache_file

  #cgit_cache_lock_file=/var/cache/cgit/copr-repo.lock

+ 

+ # Redis connection for tracking background workers

+ #redis_host = "localhost"

+ #redis_port = 6379

@@ -8,7 +8,7 @@ 

  User=copr-dist-git

  Group=packager

  AmbientCapabilities=CAP_SETGID

- ExecStart=/usr/bin/python3 /usr/bin/importer_runner.py

+ ExecStart=/usr/bin/copr-run-dispatcher imports

  

  [Install]

  WantedBy=multi-user.target

file modified
+4 -1
@@ -1,4 +1,4 @@ 

- %global copr_common_version 0.14.1.dev

+ %global copr_common_version 0.16.2.dev

  

  Name:       copr-dist-git

  Version:    0.57
@@ -37,9 +37,12 @@ 

  Requires: python3-munch

  Requires: python3-oslo-concurrency

  Requires: python3-setproctitle

+ Requires: python3-daemon

+ Requires: python3-redis

  Requires: findutils

  Requires: (copr-selinux if selinux-policy-targeted)

  Requires: crontabs

+ Requires: redis

  

  Recommends: python3-copr

  

@@ -1,4 +1,5 @@ 

  import os

+ import sys

  import logging

  import subprocess

  import munch
@@ -145,6 +146,14 @@ 

          opts.git_user_email = _get_conf(

              cp, "dist-git", "git_user_email", "copr-devel@lists.fedorahosted.org"

          )

+ 

+         opts.max_workers = _get_conf(

+             cp, "dist-git", "max_workers", default=10, mode="int"

+         )

+ 

+         opts.redis_host = _get_conf(cp, "dist-git", "redis_host", "localhost")

+         opts.redis_port = _get_conf(cp, "dist-git", "redis_port", "6379")

These new options deserve documentation in copr-dist-git.conf.example.

+ 

          return opts

  

  
@@ -208,3 +217,16 @@ 

          raise RunCommandException(result.stderr)

  

      return result

+ 

+ 

+ def get_distgit_opts(path):

+     """

+     Return a parsed config file as a `dict`

+     """

+     config_reader = ConfigReader(path)

+ 

+     if not os.path.exists(path):

+         sys.stderr.write("No config file found at: {0}\n".format(path))

+         sys.exit(1)

+ 

+     return config_reader.read()

@@ -0,0 +1,71 @@ 

+ """

+ ImportDispatcher related classes.

+ """

+ 

+ import os

+ import sys

+ import logging

+ from copr_common.dispatcher import Dispatcher

+ from copr_common.worker_manager import GroupWorkerLimit

+ from copr_dist_git.importer import Importer, ImportWorkerManager

+ 

+ 

+ # TODO Move this to the config file

+ LIMITS = {

+     "sandbox": 3,

+     "owner": 5,

+ }

+ 

+ 

+ class ImportDispatcher(Dispatcher):

+     """

+     Kick off a dispatcher daemon for importing tasks into DistGit.

+     """

+     task_type = 'import'

+     worker_manager_class = ImportWorkerManager

+ 

+     def __init__(self, opts):

+         super().__init__(opts)

+ 

+         self.log = self._get_logger()

+         self.sleeptime = opts.sleep_time

+         self.max_workers = self.opts.max_workers

+ 

+         for limit_type in ['sandbox', 'owner']:

+             limit = LIMITS[limit_type]

+             self.log.info("setting %s limit to %s", limit_type, limit)

+             self.limits.append(GroupWorkerLimit(

+                 lambda x, limit=limit_type: getattr(x, limit),

+                 limit,

+                 name=limit_type,

+             ))

+ 

+         self._create_per_task_logs_directory(self.opts.per_task_log_dir)

+ 

+     def get_frontend_tasks(self):

+         importer = Importer(self.opts)

+         tasks = importer.try_to_obtain_new_tasks(limit=999999)

+         return tasks

+ 

+     def _create_per_task_logs_directory(self, path):

+         self.log.info("Make sure per-task-logs dir exists at: %s", path)

+         try:

+             os.makedirs(path)

+         except OSError:

+             if not os.path.isdir(path):

+                 self.log.error(

+                     "Could not create per-task-logs directory at path %s", path)

+                 sys.exit(1)

+ 

+     def _get_logger(self):

+         formatstr = ("[%(asctime)s][%(levelname)s][%(name)s]"

+                      "[%(module)s:%(lineno)d][pid:%(process)d] %(message)s")

+         logging.basicConfig(

+             filename=os.path.join(self.opts.log_dir, "main.log"),

+             level=logging.DEBUG,

+             format=formatstr,

+             datefmt='%H:%M:%S'

+         )

+         logging.getLogger('requests.packages.urllib3').setLevel(logging.WARN)

+         logging.getLogger('urllib3').setLevel(logging.WARN)

+         return logging.getLogger(__name__)

@@ -1,14 +1,19 @@ 

  # coding: utf-8

  

+ from copr_common.worker_manager import QueueTask

  from .exceptions import PackageImportException

  

- class ImportTask(object):

+ class ImportTask(QueueTask):

+     # pylint: disable=too-many-instance-attributes

+ 

      def __init__(self):

          self.build_id = None

          self.owner = None

          self.project = None

          self.branches = []

          self.srpm_url = None

+         self.sandbox = None

+         self.background = None

  

      @staticmethod

      def from_dict(task_dict):
@@ -21,12 +26,22 @@ 

              task.branches = task_dict["branches"]

              task.srpm_url = task_dict["srpm_url"]

              task.pkg_name = task_dict["pkg_name"]

+             task.sandbox = task_dict["sandbox"]

+             task.background = task_dict["background"]

          except (KeyError, ValueError) as e:

              raise PackageImportException(str(e))

  

          return task

  

      @property

+     def id(self):

+         return self.build_id

+ 

+     @property

+     def priority(self):

+         return 100 if self.background else 0

+ 

+     @property

      def repo_namespace(self):

          return "{}/{}".format(self.owner, self.project)

  

@@ -7,6 +7,8 @@ 

  

  from requests import get, post

  

+ from copr_common.worker_manager import WorkerManager

+ 

  from .package_import import import_package

  from .process_pool import Worker, Pool, SingleThreadWorker

  from .exceptions import PackageImportException
@@ -138,3 +140,25 @@ 

                  log.info("Starting worker '{}' with task '{}' (timeout={})"

                           .format(p.name, mb_task.build_id, p.timeout))

                  p.start()

+ 

+ 

+ class ImportWorkerManager(WorkerManager):

+     """

+     Manager taking care of background import workers.

+     """

+ 

+     worker_prefix = 'import_worker'

+ 

+     def start_task(self, worker_id, task):

+         command = [

+             "copr-distgit-process-import",

+             "--daemon",

+             "--build-id", str(task.build_id),

+             "--worker-id", worker_id,

+         ]

+         self.log.info("running worker: %s", " ".join(command))

+         self.start_daemon_on_background(command)

+ 

+     def finish_task(self, worker_id, task_info):

+         self.get_task_id_from_worker_id(worker_id)

+         return True

@@ -0,0 +1,63 @@ 

+ #! /usr/bin/python3

+ 

+ """

+ Process one import task provided by frontend (on distgit).

+ """

+ 

+ import os

+ import sys

+ import requests

+ from copr_common.background_worker import BackgroundWorker

+ from copr_dist_git.helpers import get_distgit_opts

+ from copr_dist_git.importer import Importer

+ from copr_dist_git.import_task import ImportTask

+ 

+ 

+ class ImportBackgroundWorker(BackgroundWorker):

+     """

+     copr-distgit-process-import abstraction

+     """

+ 

+     redis_logger_id = "import"

+ 

+     def __init__(self):

+         super().__init__()

+ 

+         config = self.args.backend_config or "/etc/copr/copr-dist-git.conf"

+         if not os.path.exists(config):

+             sys.stderr.write("No config file found at: {0}\n".format(config))

+         self.opts = get_distgit_opts(config)

+ 

+     @classmethod

+     def adjust_arg_parser(cls, parser):

+         parser.add_argument(

+             "--build-id",

+             type=int,

+             required=True,

+             help="build ID to process",

+         )

+ 

+     def handle_import(self, build_id):

+         """

+         Import a single task

+         """

+         importer = Importer(self.opts)

+         url = ("{0}/backend/get-import-task/{1}"

+                .format(self.opts.frontend_base_url, build_id))

+         response = requests.get(url)

+         task_dict = response.json()

+ 

+         if not task_dict:

+             self.log.error("No such build: %s", build_id)

+             return

+ 

+         task = ImportTask.from_dict(task_dict)

+         importer.do_import(task)

+ 

+     def handle_task(self):

+         self.handle_import(self.args.build_id)

+ 

+ 

+ if __name__ == "__main__":

+     worker = ImportBackgroundWorker()

+     worker.process()

@@ -0,0 +1,28 @@ 

+ #! /usr/bin/python3

+ 

+ """

+ Simple wrapper around our Dispatcher classes that is used to start the

+ dispatchers from our systemd unit files.

+ """

+ 

+ import sys

+ from copr_dist_git.helpers import get_distgit_opts

+ from copr_dist_git.import_dispatcher import ImportDispatcher

+ 

+ 

+ def _main():

+     dispatcher = None

+     request = sys.argv[1]

+     if request == "imports":

+         dispatcher = ImportDispatcher

+     else:

+         raise NotImplementedError(

+             "Not implemented '{}' dispatcher".format(request))

+ 

+     config = "/etc/copr/copr-dist-git.conf"

+     opts = get_distgit_opts(config)

+     dispatcher(opts).run()

+ 

+ 

+ if __name__ == "__main__":

+     _main()

@@ -1,79 +0,0 @@ 

- #!/usr/bin/python3

- # coding: utf-8

- 

- import os

- import sys

- import argparse

- import logging

- 

- from copr_dist_git.helpers import ConfigReader

- from copr_dist_git.importer import Importer

- 

- log = logging.getLogger(__name__)

- 

- 

- def get_arg_parser():

-     """

-     Parser for commandline options

-     """

-     description = "copr-dist-git process for importing packages"

-     parser = argparse.ArgumentParser("importer_runner", description=description)

-     parser.add_argument(

-         "--foreground",

-         action="store_true",

-         help="Run this process on foreground, using just a single thread",

-     )

-     parser.add_argument(

-         "config",

-         help="Path to a config file",

-         nargs="?",

-     )

-     return parser

- 

- 

- def main():

-     parser = get_arg_parser()

-     args = parser.parse_args()

- 

-     config_reader = ConfigReader(args.config)

-     try:

-         opts = config_reader.read()

-     except Exception:

-         print("Failed to read config file, used file location: `{}`"

-               .format(config_file))

-         sys.exit(1)

- 

-     if args.foreground:

-         opts.multiple_threads = False

- 

-     logging.basicConfig(

-         filename=os.path.join(opts.log_dir, "main.log"),

-         level=logging.DEBUG,

-         format='[%(asctime)s][%(levelname)s][%(name)s][%(module)s:%(lineno)d][pid:%(process)d] %(message)s',

-         datefmt='%H:%M:%S'

-     )

- 

-     logging.getLogger('requests.packages.urllib3').setLevel(logging.WARN)

-     logging.getLogger('urllib3').setLevel(logging.WARN)

- 

-     log.info("Make sure per-task-logs dir exists at: {}".format(opts.per_task_log_dir))

-     try:

-         os.makedirs(opts.per_task_log_dir)

-     except OSError:

-         if not os.path.isdir(opts.per_task_log_dir):

-             log.error("Could not create per-task-logs directory at path {}"

-                       .format(opts.per_task_log_dir))

-             sys.exit(1)

- 

-     log.info("Logging configuration done")

-     log.info("Using configuration: \n"

-              "{}".format(opts))

-     importer = Importer(opts)

-     try:

-         importer.run()

-     except:

-         log.exception("Unexpected exception raised")

- 

- 

- if __name__ == "__main__":

-     main()

file modified
+4
@@ -50,6 +50,8 @@ 

              "branches": [ self.BRANCH ],

              "srpm_url": "http://example.com/pkg.src.rpm",

              "pkg_name": "pkg",

+             "sandbox": "{0}/{1}--{0}".format(self.USER_NAME, self.PROJECT_NAME),

+             "background": False,

          }

          self.upload_task_data = {

              "build_id": 124,
@@ -59,6 +61,8 @@ 

              "branches": [ self.BRANCH ],

              "srpm_url": "http://front/tmp/tmp_2/pkg_2.src.rpm",

              "pkg_name": "pkg_2",

+             "sandbox": "{0}/{1}--{0}".format(self.USER_NAME, self.PROJECT_NAME),

+             "background": False,

          }

  

          self.url_task = import_task.ImportTask.from_dict(self.url_task_data)

file modified
+4 -5
@@ -29,10 +29,9 @@ 

  RUN mkdir /tmp/copr-dist-git

  RUN chown copr-dist-git:packager /tmp/copr-dist-git

  

- RUN echo "[dist-git]" > /etc/copr/copr-dist-git.conf && \

-     echo "frontend_base_url=http://frontend:5000" >> /etc/copr/copr-dist-git.conf && \

-     echo "frontend_auth=1234"  >> /etc/copr/copr-dist-git.conf && \

-     chmod 644 /etc/copr/copr-dist-git.conf

+ # copy filesystem setup and setup ownership and permissions

+ COPY files/ /

+ RUN chmod 644 /etc/copr/copr-dist-git.conf

  

  RUN echo " [user]" >> /home/copr-dist-git/.gitconfig && \

      echo " email = copr-devel@lists.fedorahosted.org" >> /home/copr-dist-git/.gitconfig && \
@@ -49,4 +48,4 @@ 

  USER copr-dist-git

  

  ENTRYPOINT ["/usr/bin/tini", "--"]

- CMD ["bash", "-c", "mkdir -p /var/lib/dist-git/cache /var/lib/dist-git/git && exec /usr/bin/importer_runner.py"]

+ CMD ["bash", "-c", "mkdir -p /var/lib/dist-git/cache /var/lib/dist-git/git && exec /usr/bin/copr-run-dispatcher imports"]

@@ -0,0 +1,6 @@ 

+ [dist-git]

+ frontend_base_url=http://frontend:5000

+ frontend_auth=1234

+ 

+ redis_host=redis

+ redis_port=6379

@@ -14,7 +14,6 @@ 

  

  from coprs.views import misc

  from coprs.views.backend_ns import backend_ns

- from sqlalchemy.sql import false, true

  

  

  @backend_ns.after_request
@@ -33,27 +32,48 @@ 

      """

      Return list of builds that are waiting for dist-git to import the sources.

      """

-     tasks = []

- 

-     builds_for_import = BuildsLogic.get_build_importing_queue().filter(models.Build.is_background == false()).limit(100).all()

-     builds_for_import += BuildsLogic.get_build_importing_queue().filter(models.Build.is_background == true()).limit(30).all()

- 

-     for build in builds_for_import:

-         branches = set()

-         for b_ch in build.build_chroots:

-             branches.add(b_ch.mock_chroot.distgit_branch_name)

- 

-         tasks.append({

-             "build_id": build.id,

-             "owner": build.copr.owner_name,

-             "project": build.copr_dirname,

-             # TODO: we mix PR with normal builds here :-(

-             "branches": list(branches),

-             "pkg_name": build.package.name,

-             "srpm_url": build.srpm_url,

-         })

+     def _stream():

+         builds_for_import = BuildsLogic.get_build_importing_queue()

+         for build in builds_for_import:

+             task = get_import_record(build)

+             yield task

+     return streamed_json(_stream())

+ 

+ 

+ @backend_ns.route("/get-import-task/<build_id>")

+ def get_import_task(build_id):

+     """

+     Return a single task that DistGit should import

+     """

+     build = BuildsLogic.get(build_id).one_or_none()

+     task = get_import_record(build)

+     return flask.jsonify(task)

+ 

+ 

+ def get_import_record(build):

+     """

+     Transform an ORM Build instance into a Python dictionary that is later

+     converted to a JSON string and sent (as task build instructions) to Copr

+     DistGit machine.

+     """

+     if not build:

+         return None

  

-     return flask.jsonify(tasks)

+     branches = set()

+     for b_ch in build.build_chroots:

+         branches.add(b_ch.mock_chroot.distgit_branch_name)

+ 

+     return {

+         "build_id": build.id,

+         "owner": build.copr.owner_name,

+         "project": build.copr_dirname,

+         # TODO: we mix PR with normal builds here :-(

+         "branches": list(branches),

+         "pkg_name": build.package.name,

+         "srpm_url": build.srpm_url,

+         "sandbox": build.sandbox,

+         "background": build.is_background,

+     }

  

  

  @backend_ns.route("/import-completed/", methods=["POST", "PUT"])

@@ -557,7 +557,13 @@ 

  

          r = self.tc.get("/backend/importing/")

          data = json.loads(r.data.decode("utf-8"))

-         assert data[0]["srpm_url"] == "http://bar"

+ 

+         # Make sure we set the `background` key, but ignore the task order.

+         # Tasks will be prioritized appropriately on DistGit

+         assert data[0]["srpm_url"] == "http://foo"

+         assert data[0]["background"] is True

+         assert data[1]["srpm_url"] == "http://bar"

+         assert data[1]["background"] is False

  

      def test_importing_queue_multiple_bg(self, f_users, f_coprs, f_mock_chroots, f_db):

          BuildsLogic.create_new_from_url(self.u1, self.c1, "foo", background=True)

Moving backend scheduler to copr-common, so it can be used for distgit

Metadata Update from @frostyx:
- Pull-request tagged with: wip

2 years ago

I marked the PR as WIP. So far I only moved the scheduler to copr-common. I still need to re-work distgit to use it.

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

1 new commit added

  • distgit, frontend: add BackgroundWorker script for importing
2 years ago

Still WIP, but at this moment I can do:

copr-distgit-process-import --build-id 3522414

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

2 new commits added

  • distgit: use WorkerManager and Dispatcher
  • backend, common: move Dispatcher and WorkerManager to copr-common
2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

4 new commits added

  • distgit: use WorkerManager and Dispatcher
  • backend, common: move Dispatcher and WorkerManager to copr-common
  • distgit, frontend: add BackgroundWorker script for importing
  • backend, common: move background_worker.py to copr-common
2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

4 new commits added

  • distgit: use WorkerManager and Dispatcher
  • backend, common: move Dispatcher and WorkerManager to copr-common
  • distgit, frontend: add BackgroundWorker script for importing
  • backend, common: move background_worker.py to copr-common
2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

rebased onto 22578b7

2 years ago

Build succeeded.

4 new commits added

  • distgit: use WorkerManager and Dispatcher
  • backend, common: move Dispatcher and WorkerManager to copr-common
  • distgit, frontend: add BackgroundWorker script for importing
  • backend, common: move background_worker.py to copr-common
2 years ago

rebased onto 298d401

2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

My apologies to anyone reviewing the PR.
It is way messier and more complicated than I originally expected.

I tested this on devel instance and it seems to work, PTAL.

Metadata Update from @frostyx:
- Pull-request untagged with: wip

2 years ago

4 new commits added

  • distgit: use WorkerManager and Dispatcher
  • backend, common: move Dispatcher and WorkerManager to copr-common
  • distgit, frontend: add BackgroundWorker script for importing
  • backend, common: move background_worker.py to copr-common
2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

4 new commits added

  • distgit: use WorkerManager and Dispatcher
  • backend, common: move Dispatcher and WorkerManager to copr-common
  • distgit, frontend: add BackgroundWorker script for importing
  • backend, common: move background_worker.py to copr-common
2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

I'd probably prefer doing the movement stuff in one commit, but not sure how complicated the rebase would be? Just a preference.

What is missing here though is the movement of the WorkerLimit concept. That should be moved as well, I think, ideally together with the other movements.

While we are at it, I think we need to have at least some limit set from the beginning, let's say some limit per owner?

BTW I don't think this is a messy or complicated PR at all

These new options deserve documentation in copr-dist-git.conf.example.

Shouldn't we move QueueTask, too?

7 new commits added

  • add worker limits on distgit
  • frontend: send sandbox value
  • inherit QueueTask on distgit
  • move limits to copr-common
  • move QueueTask to copr-common
  • document redis options
  • s/action/import task/?
2 years ago

I added the requested changes, PTAL.

I'd probably prefer doing the movement stuff in one commit, but not sure how complicated the rebase would be? Just a preference.

Don't merge yet, I will try to rebase and squash as many things as possible

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

I would use much smaller defaults for these IO-intensive things, e.g. 3-per-sandbox, 5-per-owner?

Prior to this pull-request we "prioritized" the non-background jobs. Can you please implement some dummy priority? Deprioritize (meaning e.g. +100) the background jobs?

We discussed that this will break podman compose, and fixing it would collide with #2193. Instead of postponing podman-compose fixes, I'd prefer merging #2193 first so you can fix podman-compose right away. WDYT?

rebased onto 39db6a2

2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

8 new commits added

  • add docstrings
  • add docstring for QueueTask
  • streamed response from frontend
  • deprioritize background tasks on distgit
  • reduce limits to 3-per-sandbox, 5-per-owner
  • make sure per task logs directory is created
  • configure redis
  • install python3-redis
2 years ago

Build succeeded.

2 new commits added

  • fix distgit tests
  • fix frontend tests
2 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

1 new commit added

  • fix pylint warnings
2 years ago

Build succeeded.

I added all the requested fixes. PTAL.
A rebase after the review is needed.

Veeery nice, thank you! +1

We should probably tweak the job priority a bit more, though that can be done in a separate PR. Awesome.

rebased onto c83b3b5

2 years ago

Build succeeded.

This is soooo complete. Good job! +1

Commit 8c7665c fixes this pull-request

Pull-Request has been merged by praiskup

2 years ago

Commit 3612309 fixes this pull-request

Pull-Request has been merged by praiskup

2 years ago
Metadata
Changes Summary 39
+1 -1
file changed
backend/copr-backend.spec
+3 -2
file changed
backend/copr_backend/actions.py
+10 -137
file changed
backend/copr_backend/background_worker.py
+2 -2
file changed
backend/copr_backend/background_worker_build.py
+1 -1
file changed
backend/copr_backend/createrepo.py
+2 -2
file changed
backend/copr_backend/daemons/action_dispatcher.py
+3 -3
file changed
backend/copr_backend/daemons/build_dispatcher.py
+2 -1
file changed
backend/copr_backend/daemons/log.py
+11 -114
file changed
backend/copr_backend/dispatcher.py
+1 -19
file changed
backend/copr_backend/helpers.py
+4 -6
file changed
backend/copr_backend/rpm_builds.py
+14 -567
file changed
backend/copr_backend/worker_manager.py
+2 -2
file changed
backend/run/copr-backend-process-action
+2 -1
file changed
backend/tests/action-processor.py
+2 -1
file changed
backend/tests/test_createrepo.py
+2 -1
file changed
backend/tests/test_helpers.py
+1 -1
file changed
backend/tests/test_modifyrepo.py
+3 -3
file changed
backend/tests/test_worker_limits.py
+18 -15
file changed
backend/tests/test_worker_manager.py
+154
file added
common/copr_common/background_worker.py
+128
file added
common/copr_common/dispatcher.py
+22
file added
common/copr_common/redis_helpers.py
+593
file added
common/copr_common/worker_manager.py
+7 -1
file changed
common/python-copr-common.spec
+4 -0
file changed
dist-git/conf/copr-dist-git.conf.example
+1 -1
file changed
dist-git/copr-dist-git.service
+4 -1
file changed
dist-git/copr-dist-git.spec
+22 -0
file changed
dist-git/copr_dist_git/helpers.py
+71
file added
dist-git/copr_dist_git/import_dispatcher.py
+16 -1
file changed
dist-git/copr_dist_git/import_task.py
+24 -0
file changed
dist-git/copr_dist_git/importer.py
+63
file added
dist-git/run/copr-distgit-process-import
+28
file added
dist-git/run/copr-run-dispatcher
-79
file removed
dist-git/run/importer_runner.py
+4 -0
file changed
dist-git/tests/base.py
+4 -5
file changed
docker/distgit/Dockerfile
+6
file added
docker/distgit/files/etc/copr/copr-dist-git.conf
+41 -21
file changed
frontend/coprs_frontend/coprs/views/backend_ns/backend_general.py
+7 -1
file changed
frontend/coprs_frontend/tests/test_views/test_backend_ns/test_backend_general.py