#4034 DNM: hub workflows
Opened a year ago by mikem. Modified a year ago
mikem/koji workflow  into  master

...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
...
Mike McLean • a year ago  
typo
Mike McLean • a year ago  
...
Mike McLean • a year ago  
typos
Mike McLean • a year ago  
...
Mike McLean • a year ago  
file modified
+95 -8
@@ -79,7 +79,8 @@ 

      BaseTaskHandler,

      MultiPlatformTask,

      ServerExit,

-     ServerRestart

+     ServerRestart,

+     RefuseTask,

  )

  from koji.util import (

      dslice,
@@ -181,8 +182,6 @@ 

          try:

              if not taken:

                  # Only sleep if we didn't take a task, otherwise retry immediately.

-                 # The load-balancing code in getNextTask() will prevent a single builder

-                 # from getting overloaded.

                  time.sleep(options.sleeptime)

          except (SystemExit, KeyboardInterrupt):

              logger.warning("Exiting")
@@ -5725,6 +5724,15 @@ 

  

      def handler(self, tag, event=None, src=False, debuginfo=False, separate_src=False):

          tinfo = self.session.getTag(tag, strict=True, event=event)

+ 

+         # check for fs access before we try calling repoInit

+         top_repos_dir = joinpath(self.options.topdir, "repos")

+         if not os.path.isdir(top_repos_dir):

+             # missing or incorrect mount?

+             # refuse and let another host try

+             raise RefuseTask("No access to repos dir %s" % top_repos_dir)

+ 

+         # call repoInit

          kwargs = {}

          if event is not None:

              kwargs['event'] = event
@@ -5735,7 +5743,6 @@ 

          # generate debuginfo repo if requested or if specified in sidetag's extra

          if debuginfo or tinfo['extra'].get('with_debuginfo'):

              kwargs['with_debuginfo'] = True

- 

          repo_id, event_id = self.session.host.repoInit(tinfo['id'], task_id=self.id, **kwargs)

  

          path = koji.pathinfo.repo(repo_id, tinfo['name'])
@@ -5771,7 +5778,6 @@ 

                      oldrepo_path = koji.pathinfo.repo(oldrepo['id'], parenttag['name'])

                      oldrepo['tag_id'] = parenttag['id']

                      break

- 

          newrepo_path = koji.pathinfo.repo(repo_id, tinfo['name'])

          newrepo = {'tag_id': tinfo['id'], 'create_event': event_id}

          if self.options.copy_old_repodata:
@@ -5779,7 +5785,6 @@ 

                                                  oldrepo, newrepo, kwargs)

          else:

              possibly_clonable = False

- 

          subtasks = {}

          data = {}

          cloned_archs = []
@@ -5816,6 +5821,81 @@ 

          return repo_id, event_id

  

  

+ class PrepRepoTask(BaseTaskHandler):

+     Methods = ['prepRepo']

+     _taskWeight = 0.5

+ 

+     def handler(self, tag, repo, opts):

+         # check for fs access before we go any further

+         top_repos_dir = joinpath(self.options.topdir, "repos")

+         if not os.path.isdir(top_repos_dir):

+             # missing or incorrect mount?

+             # refuse and let another host try

+             raise RefuseTask("No access to repos dir %s" % top_repos_dir)

+ 

+         # workflow has already called repo_init

+         tinfo = tag

+         repo_id = repo['id']

+         event_id = repo['create_event']

+ 

+         path = koji.pathinfo.repo(repo_id, tinfo['name'])

+         if not os.path.isdir(path):

+             raise koji.GenericError("Repo directory missing: %s" % path)

+         arches = []

+         for fn in os.listdir(path):

+             if fn != 'groups' and os.path.isfile("%s/%s/pkglist" % (path, fn)):

+                 arches.append(fn)

+         # see if we can find a previous repo to update from

+         # only shadowbuild tags should start with SHADOWBUILD, their repos are auto

+         # expired, so let's get the most recent expired tag for newRepo shadowbuild tasks.

+         if tinfo['name'].startswith('SHADOWBUILD'):

+             oldrepo_state = koji.REPO_EXPIRED

+         else:

+             oldrepo_state = koji.REPO_READY

+         oldrepo = self.session.getRepo(tinfo['id'], state=oldrepo_state)

+         oldrepo_path = None

+         if oldrepo:

+             oldrepo_path = koji.pathinfo.repo(oldrepo['id'], tinfo['name'])

+             oldrepo['tag_id'] = tinfo['id']

+         # If there is no old repo, try to find first usable repo in

+         # inheritance chain and use it as a source. oldrepo is not used if

+         # createrepo_update is not set, so don't waste call in such case.

+         if not oldrepo and self.options.createrepo_update:

+             tags = self.session.getFullInheritance(tinfo['id'])

+             # we care about best candidate which should be (not necessarily)

+             # something on higher levels. Sort tags according to depth.

+             for tag in sorted(tags, key=lambda x: x['currdepth']):

+                 oldrepo = self.session.getRepo(tag['parent_id'], state=oldrepo_state)

+                 if oldrepo:

+                     parenttag = self.session.getTag(tag['parent_id'])

+                     oldrepo_path = koji.pathinfo.repo(oldrepo['id'], parenttag['name'])

+                     oldrepo['tag_id'] = parenttag['id']

+                     break

+ 

+         newrepo_path = koji.pathinfo.repo(repo_id, tinfo['name'])

+         newrepo = {'tag_id': tinfo['id'], 'create_event': event_id}

+         if self.options.copy_old_repodata:

+             possibly_clonable = self.check_repo(oldrepo_path, newrepo_path,

+                                                 oldrepo, newrepo, opts)

+         else:

+             possibly_clonable = False

+ 

+         data = {}

+         cloned_archs = []

+         needed_archs = []

+         for arch in arches:

+             if possibly_clonable and self.check_arch_repo(oldrepo_path, newrepo_path, arch):

+                 result = self.copy_arch_repo(oldrepo['id'], oldrepo_path, repo_id, arch)

+                 if result:

+                     data[arch] = result

+                     cloned_archs.append(arch)

+                     continue

+             # otherwise we need a createrepo task for this arch

+             needed_archs.append(arch)

+ 

+         return {'cloned': data, 'needed': needed_archs, 'oldrepo': oldrepo}

+ 

+ 

  class CreaterepoTask(BaseTaskHandler):

  

      Methods = ['createrepo']
@@ -5831,7 +5911,14 @@ 

          toprepodir = self.pathinfo.repo(repo_id, rinfo['tag_name'])

          self.repodir = '%s/%s' % (toprepodir, arch)

          if not os.path.isdir(self.repodir):

-             raise koji.GenericError("Repo directory missing: %s" % self.repodir)

+             top_repos_dir = joinpath(self.options.topdir, "repos")

+             if not os.path.isdir(top_repos_dir):

+                 # missing or incorrect mount?

+                 # refuse and let another host try

+                 raise RefuseTask("No access to repos dir %s" % top_repos_dir)

+             else:

+                 # we seem to have fs access, but dir is missing, perhaps a repo_init bug?

+                 raise koji.GenericError("Repo directory missing: %s" % self.repodir)

          groupdata = os.path.join(toprepodir, 'groups', 'comps.xml')

          # set up our output dir

          self.outdir = '%s/repo' % self.workdir
@@ -6700,7 +6787,7 @@ 

                  'allow_noverifyssl': False,

                  'allow_password_in_scm_url': False,

                  'methods': None,

-                }

+                 }

      if config.has_section('kojid'):

          for name, value in config.items('kojid'):

              if name in ['sleeptime', 'maxjobs', 'minspace', 'retry_interval',

file modified
+35 -14
@@ -324,9 +324,8 @@ 

          if 'channel %s already exists' % channel_name in msg:

              error("channel %s already exists" % channel_name)

          elif 'Invalid method:' in msg:

-             version = session.getKojiVersion()

              error("addChannel is available on hub from Koji 1.26 version, your version is %s" %

-                   version)

+                   session.hub_version_str)

          else:

              error(msg)

      print("%s added: id %d" % (args[0], channel_id))
@@ -356,9 +355,8 @@ 

      except koji.GenericError as ex:

          msg = str(ex)

          if 'Invalid method:' in msg:

-             version = session.getKojiVersion()

              error("editChannel is available on hub from Koji 1.26 version, your version is %s" %

-                   version)

+                   session.hub_version_str)

          else:

              warn(msg)

      if not result:
@@ -6295,14 +6293,6 @@ 

      if len(args) == 0:

          parser.error("You must specify at least one task id or build")

      activate_session(session, goptions)

-     older_hub = False

-     try:

-         hub_version = session.getKojiVersion()

-         v = tuple([int(x) for x in hub_version.split('.')])

-         if v < (1, 33, 0):

-             older_hub = True

-     except koji.GenericError:

-         older_hub = True

      tlist = []

      blist = []

      for arg in args:
@@ -6329,7 +6319,7 @@ 

              for task_id in tlist:

                  results.append(remote_fn(task_id, **opts))

          for build in blist:

-             if not older_hub:

+             if session.hub_version >= (1, 33, 0):

                  results.append(m.cancelBuild(build, strict=True))

              else:

                  results.append(m.cancelBuild(build))
@@ -7575,7 +7565,7 @@ 

          u = {'name': 'anonymous user'}

      print("%s, %s!" % (_printable_unicode(random.choice(greetings)), u["name"]))

      print("")

-     print("You are using the hub at %s" % session.baseurl)

+     print("You are using the hub at %s (Koji %s)" % (session.baseurl, session.hub_version_str))

      authtype = u.get('authtype', getattr(session, 'authtype', None))

      if authtype == koji.AUTHTYPES['NORMAL']:

          print("Authenticated via password")
@@ -8130,3 +8120,34 @@ 

          error("Not a draft build: %s" % draft_build)

      rinfo = session.promoteBuild(binfo['id'], force=options.force)

      print("%s has been promoted to %s" % (binfo['nvr'], rinfo['nvr']))

+ 

+ 

+ def anon_handle_list_users(goptions, session, args):

+     """[admin] List of users"""

+     usage = "usage: %prog list-users [options]"

+     parser = OptionParser(usage=get_usage_str(usage))

+     parser.add_option("--usertype", help="List users that have a given usertype "

+                                          "(e.g. NORMAL, HOST, GROUP)")

+     parser.add_option("--prefix", help="List users that have a given prefix")

+     (options, args) = parser.parse_args(args)

+ 

+     if len(args) > 0:

+         parser.error("This command takes no arguments")

+     activate_session(session, goptions)

+ 

+     if options.usertype:

+         if options.usertype.upper() in koji.USERTYPES.keys():

+             usertype = koji.USERTYPES[options.usertype.upper()]

+         else:

+             error("Usertype %s doesn't exist" % options.usertype)

+     else:

+         usertype = koji.USERTYPES['NORMAL']

+ 

+     if options.prefix:

+         prefix = options.prefix

+     else:

+         prefix = None

+ 

+     users_list = session.listUsers(userType=usertype, prefix=prefix)

+     for user in users_list:

+         print(user['name'])

file modified
+28 -1
@@ -2625,6 +2625,30 @@ 

          self.opts.setdefault('timeout', DEFAULT_REQUEST_TIMEOUT)

          self.exclusive = False

          self.auth_method = auth_method

+         self.__hub_version = None

+ 

+     @property

+     def hub_version(self):

+         """Return the hub version as a tuple of ints"""

+         return tuple([int(x) for x in self.hub_version_str.split('.')])

+ 

+     @property

+     def hub_version_str(self):

+         """Return the hub version as string"""

+         # If any call was made before, it should be populated by Koji-Version header

+         # for hub >= 1.35

+         if self.__hub_version is None:

+             # no call was made yet OR hub_version < 1.35

+             try:

+                 self.__hub_version = self.getKojiVersion()

+             except GenericError as e:

+                 if 'Invalid method' in str(e):

+                     # use latest version without the getKojiVersion handler

+                     self.logger.debug("hub is older than 1.23, assuming 1.22.0")

+                     self.__hub_version = '1.22.0'

+                 else:

+                     raise

+         return self.__hub_version

  

      @property

      def multicall(self):
@@ -3056,6 +3080,9 @@ 

              warnings.simplefilter("ignore")

              r = self.rsession.post(handler, **callopts)

              r.raise_for_status()

+             hub_version = r.headers.get('Koji-Version')

+             if hub_version:

+                 self.__hub_version = hub_version

              try:

                  ret = self._read_xmlrpc_response(r)

              finally:
@@ -3753,7 +3780,7 @@ 

          # at this place (e.g. client without knowledge of such signatures)

          # it should still display at least "method (arch)"

          return '%s (%s)' % (method, arch)

-     except koji.ParameterError:

+     except ParameterError:

          return '%s (invalid parameters)' % method

  

  

file modified
+31 -109
@@ -1030,129 +1030,46 @@ 

                      self.logger.info("Lingering task %r (pid %r)" % (id, pid))

  

      def getNextTask(self):

+         """Take the next task

+ 

+         :returns: True if a task was taken, False otherwise

+         """

          self.ready = self.readyForTask()

          self.session.host.updateHost(self.task_load, self.ready)

          if not self.ready:

              self.logger.info("Not ready for task")

              return False

-         hosts, tasks = self.session.host.getLoadData()

-         self.logger.debug("Load Data:")

-         self.logger.debug("  hosts: %r" % hosts)

-         self.logger.debug("  tasks: %r" % tasks)

-         # now we organize this data into channel-arch bins

-         bin_hosts = {}  # hosts indexed by bin

-         bins = {}  # bins for this host

-         our_avail = None

-         for host in hosts:

-             host['bins'] = []

-             if host['id'] == self.host_id:

-                 # note: task_load reported by server might differ from what we

-                 # sent due to precision variation

-                 our_avail = host['capacity'] - host['task_load']

-             for chan in host['channels']:

-                 for arch in host['arches'].split() + ['noarch']:

-                     bin = "%s:%s" % (chan, arch)

-                     bin_hosts.setdefault(bin, []).append(host)

-                     if host['id'] == self.host_id:

-                         bins[bin] = 1

-         self.logger.debug("bins: %r" % bins)

-         if our_avail is None:

-             self.logger.info("Server did not report this host. Are we disabled?")

-             return False

-         elif not bins:

-             self.logger.info("No bins for this host. Missing channel/arch config?")

-             # Note: we may still take an assigned task below

-         # sort available capacities for each of our bins

-         avail = {}

-         for bin in bins:

-             avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]

-             avail[bin].sort()

-             avail[bin].reverse()

-         self.cleanDelayTimes()

+ 

+         # get our assigned tasks

+         tasks = self.session.host.getTasks()

+         self.logger.debug("Got tasks: %r", tasks)

          for task in tasks:

-             # note: tasks are in priority order

              self.logger.debug("task: %r" % task)

-             if task['method'] not in self.handlers:

-                 self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)

-                 self.session.host.refuseTask(task['id'], soft=False, msg="no handler for method")

-                 continue

              if task['id'] in self.tasks:

                  # we were running this task, but it apparently has been

-                 # freed or reassigned. We can't do anything with it until

+                 # reassigned. We can't do anything with it until

                  # updateTasks notices this and cleans up.

-                 self.logger.debug("Task %(id)s freed or reassigned", task)

+                 self.logger.info("Task %(id)s freed or reassigned", task)

+                 continue

+             if task['state'] != koji.TASK_STATES['ASSIGNED']:

+                 # shouldn't happen

+                 self.logger.error("Received task %(id)s is not assigned, state=%(state)s", task)

+                 continue

+             if task['host_id'] != self.host_id:

+                 # shouldn't happen

+                 self.logger.error("Received task %(id)s is not ours, host=%(host_id)s", task)

                  continue

-             if task['state'] == koji.TASK_STATES['ASSIGNED']:

-                 self.logger.debug("task is assigned")

-                 if self.host_id == task['host_id']:

-                     # assigned to us, we can take it regardless

-                     if self.takeTask(task):

-                         return True

-             elif task['state'] == koji.TASK_STATES['FREE']:

-                 bin = "%(channel_id)s:%(arch)s" % task

-                 self.logger.debug("task is free, bin=%r" % bin)

-                 if bin not in bins:

-                     continue

-                 # see where our available capacity is compared to other hosts for this bin

-                 # (note: the hosts in this bin are exactly those that could

-                 # accept this task)

-                 bin_avail = avail.get(bin, [0])

-                 if self.checkAvailDelay(task, bin_avail, our_avail):

-                     # decline for now and give the upper half a chance

-                     continue

-                 # otherwise, we attempt to open the task

-                 if self.takeTask(task):

-                     return True

-             else:

-                 # should not happen

-                 raise Exception("Invalid task state reported by server")

-         return False

- 

-     def checkAvailDelay(self, task, bin_avail, our_avail):

-         """Check to see if we should still delay taking a task

- 

-         Returns True if we are still in the delay period and should skip the

-         task. Otherwise False (delay has expired).

-         """

- 

-         now = time.time()

-         ts = self.skipped_tasks.get(task['id'])

-         if not ts:

-             ts = self.skipped_tasks[task['id']] = now

- 

-         # determine our normalized bin rank

-         for pos, cap in enumerate(bin_avail):

-             if our_avail >= cap:

-                 break

-         if len(bin_avail) > 1:

-             rank = float(pos) / (len(bin_avail) - 1)

-         else:

-             rank = 0.0

-         # so, 0.0 for highest available capacity, 1.0 for lowest

  

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         delay *= rank

+             # otherwise attempt to take it

+             if self.takeTask(task):

+                 return True

  

-         # return True if we should delay

-         if now - ts < delay:

-             self.logger.debug("skipping task %i, age=%s rank=%s"

-                               % (task['id'], int(now - ts), rank))

+         # if we get no tasks, nudge the workflows

+         if self.session.host.nudgeWork():

+             # if a workflow ran, tell main not to sleep

              return True

-         # otherwise

-         del self.skipped_tasks[task['id']]

-         return False

  

-     def cleanDelayTimes(self):

-         """Remove old entries from skipped_tasks"""

-         now = time.time()

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         cutoff = now - delay * 10

-         # After 10x the delay, we've had plenty of opportunity to take the

-         # task, so either it has already been taken or we can't take it.

-         for task_id in list(self.skipped_tasks):

-             ts = self.skipped_tasks[task_id]

-             if ts < cutoff:

-                 del self.skipped_tasks[task_id]

+         return False

  

      def _waitTask(self, task_id, pid=None):

          """Wait (nohang) on the task, return true if finished"""
@@ -1415,7 +1332,9 @@ 

          if method in self.handlers:

              handlerClass = self.handlers[method]

          else:

-             raise koji.GenericError("No handler found for method '%s'" % method)

+             self.logger.warning("Refusing task %(id)s, no handler for %(method)s", task)

+             self.session.host.refuseTask(task['id'], soft=False, msg="no handler for method")

+             return False

          task_info = self.session.getTaskInfo(task['id'], request=True)

          if task_info.get('request') is None:

              self.logger.warning("Task '%s' has no request" % task['id'])
@@ -1509,6 +1428,9 @@ 

          except (SystemExit, koji.tasks.ServerExit, KeyboardInterrupt):

              # we do not trap these

              raise

+         except koji.tasks.RefuseTask as refuse:

+             self.session.host.refuseTask(handler.id, msg=str(refuse))

+             return

          except koji.tasks.ServerRestart:

              # freeing this task will allow the pending restart to take effect

              self.session.host.freeTasks([handler.id])

file modified
+21
@@ -110,6 +110,11 @@ 

      pass

  

  

+ class RefuseTask(Exception):

+     """Raise to task handler to refuse a task"""

+     pass

+ 

+ 

  def parse_task_params(method, params):

      """Parse task params into a dictionary

  
@@ -286,6 +291,9 @@ 

          # these are stub tasks created by workflows

          [['method', 'params', 'workflow_id'], None, None, (None,)],

      ],

+     'workflowStep': [

+         [['workflow_id', 'step'], None, None, (None,)],

+     ],

  }

  

  
@@ -931,3 +939,16 @@ 

  

              time.sleep(self.PAUSE)

              last_repo = repo

+ 

+ 

+ class WorkflowStepTask(BaseTaskHandler):

+ 

+     Methods = ['workflowStep']

+     # all we do is make a hub call

+     _taskWeight = 0.1

+ 

+     def handler(self, workflow_id, step):

+         self.session.host.workflowStep(workflow_id, step)

+ 

+ 

+ # the end

file modified
+194 -100
@@ -30,6 +30,7 @@ 

  import copy

  import datetime

  import fcntl

+ import filecmp

  import fnmatch

  import functools

  import hashlib
@@ -226,26 +227,42 @@ 

          if not self.verifyOwner(user_id):

              raise koji.ActionNotAllowed("user %d does not own task %d" % (user_id, self.id))

  

-     def lock(self, host_id, newstate='OPEN', force=False):

+     def lock(self, host_id, newstate='OPEN', force=False, workflow=False):

          """Attempt to associate the task for host, either to assign or open

  

+         :param int host_id: id of host, can also be None

+         :param str newstate: task state to set, as a string, default: "OPEN"

+         :param bool force: force operation, default False

+         :param bool workflow: task is a workflow stub

+ 

+         If host_id is specified as None, workflow must be True

+         If workflow is True, the task must be a workflow stub

+ 

          returns True if successful, False otherwise"""

          info = self.getInfo(request=True)

          self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES[newstate])

-         self.runCallbacks('preTaskStateChange', info, 'host_id', host_id)

+         if workflow:

+             host_id = None

+         if host_id is not None:

+             self.runCallbacks('preTaskStateChange', info, 'host_id', host_id)

          # we use row-level locks to keep things sane

          # note the QueryProcessor...opts={'rowlock': True}

          task_id = self.id

          if not force:

-             query = QueryProcessor(columns=['state', 'host_id'], tables=['task'],

+             query = QueryProcessor(columns=['state', 'host_id', 'is_workflow'], tables=['task'],

                                     clauses=['id=%(task_id)s'], values={'task_id': task_id},

-                                    opts={'rowlock': True})

+                                    lock=True)

              r = query.executeOne()

              if not r:

                  raise koji.GenericError("No such task: %i" % task_id)

              state = r['state']

              otherhost = r['host_id']

-             if state == koji.TASK_STATES['FREE']:

+             if workflow:

+                 # workflows can manage their stubs

+                 if not r['is_workflow']:

+                     # should not happen

+                     raise ValueError(f'task {self.id} is not a workflow')

+             elif state == koji.TASK_STATES['FREE']:

                  if otherhost is not None:

                      log_error(f"Error: task {task_id} is both free "

                                f"and handled by host {otherhost}")
@@ -282,6 +299,7 @@ 

          # if we reach here, task is either

          #  - free and unlocked

          #  - assigned to host_id

+         #  - a workflow stub

          #  - force option is enabled

          state = koji.TASK_STATES[newstate]

          update = UpdateProcessor('task', clauses=['id=%(task_id)i'], values=locals())
@@ -290,7 +308,8 @@ 

              update.rawset(start_time='NOW()')

          update.execute()

          self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES[newstate])

-         self.runCallbacks('postTaskStateChange', info, 'host_id', host_id)

+         if host_id is not None:

+             self.runCallbacks('postTaskStateChange', info, 'host_id', host_id)

          return True

  

      def assign(self, host_id, force=False):
@@ -299,11 +318,11 @@ 

          returns True if successful, False otherwise"""

          return self.lock(host_id, 'ASSIGNED', force)

  

-     def open(self, host_id):

-         """Attempt to open the task for host.

+     def open(self, host_id=None, workflow=False):

+         """Attempt to open the task for host or workflow

  

          returns task data if successful, None otherwise"""

-         if self.lock(host_id, 'OPEN'):

+         if self.lock(host_id, 'OPEN', workflow=workflow):

              # get more complete data to return

              fields = self.fields + (('task.request', 'request'),)

              query = QueryProcessor(tables=['task'], clauses=['id=%(id)i'], values=vars(self),
@@ -372,15 +391,26 @@ 

              for (child_id,) in query.execute():

                  Task(child_id).setPriority(priority, recurse=True)

  

-     def _close(self, result, state):

+     def _close(self, result, state, encode=False):

          """Mark task closed and set response

  

+         :param result: the task result to set

+         :type result: str or object

+         :param state: the state to set

+         :type state: int

+         :param encode: whether to encode the result, default: False

+         :type encode: bool

+ 

+         If encode is False (the default), the result should already be encoded

+ 

          Returns True if successful, False if not"""

          # access checks should be performed by calling function

          # this is an approximation, and will be different than what is in the database

          # the actual value should be retrieved from the 'new' value of the post callback

          now = time.time()

          info = self.getInfo(request=True)

+         if encode:

+             result = koji.xmlrpcplus.dumps((result,), methodresponse=1, allow_none=1)

          info['result'] = result

          self.runCallbacks('preTaskStateChange', info, 'state', state)

          self.runCallbacks('preTaskStateChange', info, 'completion_ts', now)
@@ -664,6 +694,7 @@ 

              opts.setdefault(f, pdata[f])

          opts.setdefault('label', None)

      else:

+         pdata = None

          opts.setdefault('priority', koji.PRIO_DEFAULT)

          # calling function should enforce priority limitations, if applicable

          opts.setdefault('arch', 'noarch')
@@ -2972,6 +3003,78 @@ 

      return repo_id, event

  

  

+ def repo_done(repo_id, data, expire=False, repo_json_updates=None):

+     """Finalize a repo

+ 

+     repo_id: the id of the repo

+     data: a dictionary of repo files in the form:

+           { arch: [uploadpath, [file1, file2, ...]], ...}

+     expire: if set to true, mark the repo expired immediately [*]

+     repo_json_updates: dict - if provided it will be shallow copied

+                               into repo.json file

+ 

+     Actions:

+     * Move uploaded repo files into place

+     * Mark repo ready

+     * Expire earlier repos

+     * Move/create 'latest' symlink

+ 

+     For dist repos, the move step is skipped (that is handled in

+     distRepoMove).

+ 

+     * This is used when a repo from an older event is generated

+     """

+     rinfo = repo_info(repo_id, strict=True)

+     convert_value(data, cast=dict, check_only=True)

+     koji.plugin.run_callbacks('preRepoDone', repo=rinfo, data=data, expire=expire)

+     if rinfo['state'] != koji.REPO_INIT:

+         raise koji.GenericError("Repo %(id)s not in INIT state (got %(state)s)" % rinfo)

+     repodir = koji.pathinfo.repo(repo_id, rinfo['tag_name'])

+     workdir = koji.pathinfo.work()

+     if repo_json_updates:

+         repo_json = koji.load_json(f'{repodir}/repo.json')

+         repo_json.update(repo_json_updates)

+         koji.dump_json(f'{repodir}/repo.json', repo_json, indent=2)

+     if not rinfo['dist']:

+         for arch, (uploadpath, files) in data.items():

+             archdir = "%s/%s" % (repodir, koji.canonArch(arch))

+             if not os.path.isdir(archdir):

+                 raise koji.GenericError("Repo arch directory missing: %s" % archdir)

+             datadir = "%s/repodata" % archdir

+             koji.ensuredir(datadir)

+             for fn in files:

+                 src = "%s/%s/%s" % (workdir, uploadpath, fn)

+                 if fn.endswith('pkglist'):

+                     dst = '%s/%s' % (archdir, fn)

+                 else:

+                     dst = "%s/%s" % (datadir, fn)

+                 if not os.path.exists(src):

+                     raise koji.GenericError("uploaded file missing: %s" % src)

+                 safer_move(src, dst)

+     if expire:

+         repo_expire(repo_id)

+         koji.plugin.run_callbacks('postRepoDone', repo=rinfo, data=data, expire=expire)

+         return

+     # else:

+     repo_ready(repo_id)

+     repo_expire_older(rinfo['tag_id'], rinfo['create_event'], rinfo['dist'])

+ 

+     # make a latest link

+     if rinfo['dist']:

+         latestrepolink = koji.pathinfo.distrepo('latest', rinfo['tag_name'])

+     else:

+         latestrepolink = koji.pathinfo.repo('latest', rinfo['tag_name'])

+         # XXX - this is a slight abuse of pathinfo

+     try:

+         if os.path.lexists(latestrepolink):

+             os.unlink(latestrepolink)

+         os.symlink(str(repo_id), latestrepolink)

+     except OSError:

+         # making this link is nonessential

+         log_error("Unable to create latest link for repo: %s" % repodir)

+     koji.plugin.run_callbacks('postRepoDone', repo=rinfo, data=data, expire=expire)

+ 

+ 

  def repo_set_state(repo_id, state, check=True):

      """Set repo state"""

      repo_id = convert_value(repo_id, cast=int)
@@ -7987,63 +8090,92 @@ 

      elif not sigkey:

          raise koji.GenericError("No signature specified")

      rinfo = get_rpm(rpminfo, strict=True)

+     nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo

      if rinfo['external_repo_id']:

          raise koji.GenericError("Not an internal rpm: %s (from %s)"

                                  % (rpminfo, rinfo['external_repo_name']))

  

+     # Determine what signature we have

      rpm_query_result = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=sigkey)

      if not rpm_query_result:

-         nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo

          raise koji.GenericError("%s has no matching signatures to delete" % nvra)

+     found_keys = [r['sigkey'] for r in rpm_query_result]

  

-     clauses = ["rpm_id=%(rpm_id)i"]

-     if sigkey is not None:

-         clauses.append("sigkey=%(sigkey)s")

+     # Delete signature entries from db

+     clauses = ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"]

      rpm_id = rinfo['id']

      delete = DeleteProcessor(table='rpmsigs', clauses=clauses, values=locals())

      delete.execute()

      delete = DeleteProcessor(table='rpm_checksum', clauses=clauses, values=locals())

      delete.execute()

-     binfo = get_build(rinfo['build_id'])

+ 

+     # Get the base build dir for our paths

+     binfo = get_build(rinfo['build_id'], strict=True)

      builddir = koji.pathinfo.build(binfo)

-     list_sigcaches = []

-     list_sighdrs = []

+ 

+     # Check header files

+     hdr_renames = []

+     hdr_deletes = []

      for rpmsig in rpm_query_result:

-         list_sigcaches.append(joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey'])))

-         list_sighdrs.append(joinpath(builddir, koji.pathinfo.signed(rinfo, rpmsig['sigkey'])))

-     list_paths = list_sighdrs + list_sigcaches

-     count = 0

-     logged_user = get_user(context.session.user_id)

-     for file_path in list_paths:

+         hdr_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, rpmsig['sigkey']))

+         backup_path = hdr_path + f".{rpmsig['sighash']}.save"

+         if not os.path.exists(hdr_path):

+             logger.error(f'Missing signature header file: {hdr_path}')

+             # this doesn't prevent us from deleting the signature

+             # it just means we have nothing to back up

+             continue

+         if not os.path.isfile(hdr_path):

+             # this should not happen and requires human intervention

+             raise koji.GenericError(f"Not a regular file: {hdr_path}")

+         if os.path.exists(backup_path):

+             # Likely residue of previous failed deletion

+             if filecmp.cmp(hdr_path, backup_path, shallow=False):

+                 # same file contents, so we're already backed up

+                 logger.warning(f"Signature header already backed up: {backup_path}")

+                 hdr_deletes.append([rpmsig, hdr_path])

+             else:

+                 # this shouldn't happen

+                 raise koji.GenericError(f"Stray header backup file: {backup_path}")

+         else:

+             hdr_renames.append([rpmsig, hdr_path, backup_path])

+ 

+     # Delete signed copies

+     # We do these first since they are the lowest risk

+     for rpmsig in rpm_query_result:

+         signed_path = joinpath(builddir, koji.pathinfo.signed(rinfo, rpmsig['sigkey']))

+         if not os.path.exists(signed_path):

+             # signed copies might not exist

+             continue

          try:

-             os.remove(file_path)

-             count += 1

-         except FileNotFoundError:

-             logger.warning("User %s has deleted file %s", logged_user['name'], file_path)

+             os.remove(signed_path)

+             logger.warning(f"Deleted signed copy {signed_path}")

          except Exception:

-             logger.error("An error happens when deleting %s, %s deleting are deleted, "

-                          "%s deleting are skipped, the original request is %s rpm "

-                          "and %s sigkey", file_path,

-                          list_paths[:count], list_paths[count:], rpminfo, sigkey, exc_info=True)

-             raise koji.GenericError("File %s cannot be deleted." % file_path)

- 

-     for path in list_paths:

-         basedir = os.path.dirname(path)

-         if os.path.isdir(basedir) and not os.listdir(basedir):

-             try:

-                 os.rmdir(basedir)

-             except OSError:

-                 logger.warning("An error happens when deleting %s directory",

-                                basedir, exc_info=True)

-         sigdir = os.path.dirname(basedir)

-         if os.path.isdir(sigdir) and not os.listdir(sigdir):

-             try:

-                 os.rmdir(sigdir)

-             except OSError:

-                 logger.warning("An error happens when deleting %s directory",

-                                sigdir, exc_info=True)

-     logger.warning("Signed RPM %s with sigkey %s is deleted by %s", rinfo['id'], sigkey,

-                    logged_user['name'])

+             logger.error(f"Failed to delete {signed_path}", exc_info=True)

+             raise koji.GenericError(f"Failed to delete {signed_path}")

+ 

+     # Backup header files

+     for rpmsig, hdr_path, backup_path in hdr_renames:

+         # sanity checked above

+         try:

+             os.rename(hdr_path, backup_path)

+             logger.warning(f"Signature header saved to {backup_path}")

+         except Exception:

+             logger.error(f"Failed to rename {hdr_path} to {backup_path}", exc_info=True)

+ 

+     # Delete already backed-up headers

+     for rpmsig, hdr_path in hdr_deletes:

+         # verified backup above

+         try:

+             os.remove(hdr_path)

+             logger.warning(f"Deleted signature header {hdr_path}")

+         except Exception:

+             logger.error(f"Failed to delete {hdr_path}", exc_info=True)

+             raise koji.GenericError(f"Failed to delete {hdr_path}")

+ 

+     # Note: we do not delete any empty parent dirs as the primary use case for deleting these

+     # signatures is to allow the import of new, overlapping ones

+ 

+     logger.warning("Deleted signatures %s for rpm %s", found_keys, rinfo['id'])

  

  

  def _scan_sighdr(sighdr, fn):
@@ -8218,7 +8350,7 @@ 

      # make sure we have it in the db

      rpm_id = rinfo['id']

      query = QueryProcessor(tables=['rpmsigs'], columns=['sighash'],

-                            clauses=['rpm_id=%(rpm_id)i', 'sigkey=%(sigkey)s'],

+                            clauses=['rpm_id=%(rpm_id)s', 'sigkey=%(sigkey)s'],

                             values={'rpm_id': rpm_id, 'sigkey': sigkey})

      sighash = query.singleValue(strict=False)

      if not sighash:
@@ -14789,6 +14921,16 @@ 

          host.verify()

          return scheduler.get_tasks_for_host(hostID=host.id, retry=True)

  

+     def nudgeWork(self):

+         host = Host()

+         host.verify()

+         return workflow.nudge_queue()

+ 

+     def workflowStep(self, workflow_id, step):

+         host = Host()

+         host.verify()

+         return workflow.run_subtask_step(workflow_id, step)

+ 

      def refuseTask(self, task_id, soft=True, msg=''):

          soft = convert_value(soft, cast=bool)

          msg = convert_value(msg, cast=str)
@@ -15691,55 +15833,7 @@ 

          """

          host = Host()

          host.verify()

-         rinfo = repo_info(repo_id, strict=True)

-         convert_value(data, cast=dict, check_only=True)

-         koji.plugin.run_callbacks('preRepoDone', repo=rinfo, data=data, expire=expire)

-         if rinfo['state'] != koji.REPO_INIT:

-             raise koji.GenericError("Repo %(id)s not in INIT state (got %(state)s)" % rinfo)

-         repodir = koji.pathinfo.repo(repo_id, rinfo['tag_name'])

-         workdir = koji.pathinfo.work()

-         if repo_json_updates:

-             repo_json = koji.load_json(f'{repodir}/repo.json')

-             repo_json.update(repo_json_updates)

-             koji.dump_json(f'{repodir}/repo.json', repo_json, indent=2)

-         if not rinfo['dist']:

-             for arch, (uploadpath, files) in data.items():

-                 archdir = "%s/%s" % (repodir, koji.canonArch(arch))

-                 if not os.path.isdir(archdir):

-                     raise koji.GenericError("Repo arch directory missing: %s" % archdir)

-                 datadir = "%s/repodata" % archdir

-                 koji.ensuredir(datadir)

-                 for fn in files:

-                     src = "%s/%s/%s" % (workdir, uploadpath, fn)

-                     if fn.endswith('pkglist'):

-                         dst = '%s/%s' % (archdir, fn)

-                     else:

-                         dst = "%s/%s" % (datadir, fn)

-                     if not os.path.exists(src):

-                         raise koji.GenericError("uploaded file missing: %s" % src)

-                     safer_move(src, dst)

-         if expire:

-             repo_expire(repo_id)

-             koji.plugin.run_callbacks('postRepoDone', repo=rinfo, data=data, expire=expire)

-             return

-         # else:

-         repo_ready(repo_id)

-         repo_expire_older(rinfo['tag_id'], rinfo['create_event'], rinfo['dist'])

- 

-         # make a latest link

-         if rinfo['dist']:

-             latestrepolink = koji.pathinfo.distrepo('latest', rinfo['tag_name'])

-         else:

-             latestrepolink = koji.pathinfo.repo('latest', rinfo['tag_name'])

-             # XXX - this is a slight abuse of pathinfo

-         try:

-             if os.path.lexists(latestrepolink):

-                 os.unlink(latestrepolink)

-             os.symlink(str(repo_id), latestrepolink)

-         except OSError:

-             # making this link is nonessential

-             log_error("Unable to create latest link for repo: %s" % repodir)

-         koji.plugin.run_callbacks('postRepoDone', repo=rinfo, data=data, expire=expire)

+         return repo_done(repo_id, data, expire=expire, repo_json_updates=repo_json_updates)

  

      def distRepoMove(self, repo_id, uploadpath, arch):

          """

file modified
+9 -3
@@ -45,6 +45,12 @@ 

  from . import workflow

  

  

+ # HTTP headers included in every request

+ GLOBAL_HEADERS = [

+     ('Koji-Version', koji.__version__),

+ ]

+ 

+ 

  class Marshaller(ExtendedMarshaller):

  

      dispatch = ExtendedMarshaller.dispatch.copy()
@@ -390,7 +396,7 @@ 

      else:

          faultString = msg

      response = dumps(Fault(faultCode, faultString)).encode()

-     headers = [

+     headers = GLOBAL_HEADERS + [

          ('Content-Length', str(len(response))),

          ('Content-Type', "text/xml"),

      ]
@@ -400,7 +406,7 @@ 

  

  def error_reply(start_response, status, response, extra_headers=None):

      response = response.encode()

-     headers = [

+     headers = GLOBAL_HEADERS + [

          ('Content-Length', str(len(response))),

          ('Content-Type', "text/plain"),

      ]
@@ -817,7 +823,7 @@ 

              except RequestTimeout as e:

                  return error_reply(start_response, '408 Request Timeout', str(e) + '\n')

              response = response.encode()

-             headers = [

+             headers = GLOBAL_HEADERS + [

                  ('Content-Length', str(len(response))),

                  ('Content-Type', "text/xml"),

              ]

file modified
+1 -1
@@ -99,7 +99,7 @@ 

      }

      upsert = UpsertProcessor('scheduler_task_refusals', data=data, keys=('task_id', 'host_id'))

      upsert.execute()

-     log_both('Host refused task', task_id=taskID, host_id=hostID)

+     log_both(f'Host refused task: {msg}', task_id=taskID, host_id=hostID)

  

  

  class TaskRefusalsQuery(QueryView):

file modified
+483 -176
@@ -1,10 +1,13 @@ 

+ import inspect

  import json

  import logging

  import time

  

  import koji

  from koji.context import context

+ from koji.util import dslice

  from . import kojihub

+ from .scheduler import log_both

  from .db import QueryProcessor, InsertProcessor, UpsertProcessor, UpdateProcessor, \

      DeleteProcessor, QueryView, db_lock, nextval, Savepoint

  
@@ -27,19 +30,19 @@ 

  

  class WorkQueueQuery(QueryView):

  

-     tables = ['work_queue']

+     tables = ['workflow_queue']

      #joinmap = {

-     #    'workflow': 'workflow ON work_queue.workflow_id = workflow.id',

+     #    'workflow': 'workflow ON workflow_queue.workflow_id = workflow.id',

      #}

      fieldmap = {

-         'id': ['work_queue.id', None],

-         'workflow_id': ['work_queue.workflow_id', None],

-         'create_time': ['work_queue.create_time', None],

-         'create_ts': ["date_part('epoch', work_queue.create_time)", None],

-         'completion_time': ['work_queue.completion_time', None],

-         'completion_ts': ["date_part('epoch', work_queue.completion_time)", None],

-         'completed': ['work_queue.completed', None],

-         'error': ['work_queue.error', None],

+         'id': ['workflow_queue.id', None],

+         'workflow_id': ['workflow_queue.workflow_id', None],

+         'create_time': ['workflow_queue.create_time', None],

+         'create_ts': ["date_part('epoch', workflow_queue.create_time)", None],

+         'completion_time': ['workflow_queue.completion_time', None],

+         'completion_ts': ["date_part('epoch', workflow_queue.completion_time)", None],

+         'completed': ['workflow_queue.completed', None],

+         'error': ['workflow_queue.error', None],

      }

      default_fields = ('id', 'workflow_id', 'create_ts', 'completion_ts', 'completed', 'error')

  
@@ -54,9 +57,12 @@ 

      """Run next queue entry, or attempt to refill queue"""

      if queue_next():

          # if we handled a queue item, we're done

-         return

-     update_queue()

-     # TODO figure out what we should return

+         return True

+     if db_lock('workflow_maint', wait=False):

+         update_queue()

+         handle_slots()

+     return False

+     # TODO should we return something more informative?

  

  

  def queue_next():
@@ -64,33 +70,35 @@ 

  

      :returns: True if an entry ran, False otherwise

      """

-     # TODO maybe use scheduler logging mechanism?

-     query = QueryProcessor(tables=['work_queue'],

+     query = QueryProcessor(tables=['workflow_queue'],

                             columns=['id', 'workflow_id'],

                             clauses=['completed IS FALSE'],

                             opts={'order': 'id', 'limit': 1},

                             lock='skip')

      # note the lock=skip with limit 1. This will give us a row lock on the first unlocked row

-     job = query.executeOne()

-     if not job:

+     row = query.executeOne()

+     if not row:

          # either empty queue or all locked

          return False

  

-     logger.debug('Handling work queue id %(id)s', job)

-     handle_job(job)

+     logger.debug('Handling work queue id %(id)s, workflow %(workflow_id)s', row)

  

-     # mark it done

-     update = UpdateProcessor('work_queue', clauses=['id=%(id)s'], values=job)

-     update.set(completed=True)

-     update.rawset(completion_time='NOW()')

-     update.execute()

+     try:

+         run_workflow(row['workflow_id'])

+ 

+     finally:

+         # mark it done, even if we errored

+         update = UpdateProcessor('workflow_queue', clauses=['id=%(id)s'], values=row)

+         update.set(completed=True)

+         update.rawset(completion_time='NOW()')

+         update.execute()

  

-     logger.debug('Finished handling work queue id %(id)s', job)

+     logger.debug('Finished handling work queue id %(id)s', row)

      return True

  

  

  def update_queue():

-     handle_waits()

+     check_waits()

      clean_queue()

  

  
@@ -98,7 +106,7 @@ 

      logger.debug('Cleaning old queue entries')

      lifetime = 3600  # XXX config

      delete = DeleteProcessor(

-         table='work_queue',

+         table='workflow_queue',

          values={'age': f'{lifetime} seconds'},

          clauses=['completed IS TRUE', "completion_time < NOW() - %(age)s::interval"],

      )
@@ -107,7 +115,7 @@ 

          logger.info('Deleted %i old queue entries', count)

  

  

- def handle_waits():

+ def check_waits():

      """Check our wait data and see if we need to update the queue

  

      Things we're checking for:
@@ -118,8 +126,9 @@ 

      logger.debug('Checking waits')

      query = WaitsQuery(

          clauses=[

-             ['handled', 'IS', False],

-         ]

+             ['seen', 'IS', False],

+         ],

+         opts={'order': 'id'}

      )

  

      # index by workflow
@@ -128,52 +137,61 @@ 

          wf_waits.setdefault(info['workflow_id'], []).append(info)

  

      fulfilled = []

-     handled = []

+     seen = []

      requeue = []

      for workflow_id in wf_waits:

-         waiting = []

+         # first pass: check fulfillment

          for info in wf_waits[workflow_id]:

              if info['fulfilled']:

-                 handled.append(info)

+                 # fulfilled but not seen means fulfillment was noted elsewhere

+                 # mark it seen so we don't keep checking it

+                 seen.append(info)

              else:

                  # TODO we should avoid calling wait.check quite so often

                  cls = waits.get(info['wait_type'])

                  wait = cls(info)

                  if wait.check():

+                     info['fulfilled'] = True

                      fulfilled.append(info)

-                 else:

-                     waiting.append(info)

-         if not waiting:

+         waiting = []

+         nonbatch = []

+         # second pass: decide whether to requeue

+         for info in wf_waits[workflow_id]:

+             if info['fulfilled']:

+                 # batch waits won't trigger a requeue unless all other waits are fulfilled

+                 if not info.get('batch'):

+                     nonbatch.append(info)

+             else:

+                 waiting.append(info)

+         if not waiting or nonbatch:

              requeue.append(workflow_id)

  

-     for info in fulfilled:

+     for info in fulfilled + seen:

          logger.info('Fulfilled %(wait_type)s wait %(id)s for workflow %(workflow_id)s', info)

+ 

      if fulfilled:

-         # we can do these in single update

          update = UpdateProcessor(

              table='workflow_wait',

              clauses=['id IN %(ids)s'],

              values={'ids': [w['id'] for w in fulfilled]},

          )

          update.set(fulfilled=True)

-         update.set(handled=True)

+         update.rawset(fulfill_time='NOW()')

+         update.set(seen=True)

          update.execute()

  

-     for info in handled:

-         logger.info('Handled %(wait_type)s wait %(id)s for workflow %(workflow_id)s', info)

-     if handled:

-         # we can do these in single update

+     if seen:

          update = UpdateProcessor(

              table='workflow_wait',

              clauses=['id IN %(ids)s'],

-             values={'ids': [w['id'] for w in handled]},

+             values={'ids': [w['id'] for w in seen]},

          )

-         update.set(handled=True)

+         update.set(seen=True)

          update.execute()

  

      for workflow_id in requeue:

          logger.info('Re-queueing workflow %s', workflow_id)

-         insert = InsertProcessor('work_queue', data={'workflow_id': workflow_id})

+         insert = InsertProcessor('workflow_queue', data={'workflow_id': workflow_id})

          insert.execute()

  

  
@@ -188,6 +206,7 @@ 

          'stub_id': ['stub_id', None],

          'started': ['workflow.started', None],

          'completed': ['workflow.completed', None],

+         'frozen': ['workflow.frozen', None],

          'create_time': ['workflow.create_time', None],

          'start_time': ['workflow.start_time', None],

          'update_time': ['workflow.update_time', None],
@@ -203,32 +222,80 @@ 

      }

  

  

- def handle_job(job):

-     wf = WorkflowQuery(clauses=[['id', '=', job['workflow_id']]]).executeOne(strict=True)

class WorkflowFailure(Exception):
    """Raised to explicitly fail a workflow"""

+ 

+ 

def run_workflow(workflow_id, opts=None, strict=False):
    """Run one pass of the given workflow.

    :param workflow_id: id of the workflow to run
    :param opts: optional dict of run options passed to the handler
    :param strict: if True, raise GenericError when the run errored
    """
    query = WorkflowQuery(clauses=[['id', '=', workflow_id]]).query
    query.lock = True  # we must have a lock on the workflow before attempting to run it
    wf = query.executeOne(strict=True)

    if wf['completed']:
        # shouldn't happen, closing the workflow should delete its queue entries
        logger.error('Ignoring completed %(method)s workflow: %(id)i', wf)
        logger.debug('Data: %r, Opts: %r', wf, opts)
        return
    if wf['frozen']:
        logger.warning('Skipping frozen %(method)s workflow: %(id)i', wf)
        return

    # look up the handler class for this workflow method
    handler = workflows.get(wf['method'])(wf)

    error = None
    savepoint = Savepoint('pre_workflow')
    try:
        handler.run(opts)
    except WorkflowFailure as err:
        # this is deliberate failure, so handle it that way
        error = str(err)
        handler.fail(msg=error)
    except Exception as err:
        # for unplanned exceptions, we assume the worst:
        # roll back to the savepoint and freeze the workflow
        savepoint.rollback()
        error = str(err)
        handle_error(wf, error)
        logger.exception('Error handling workflow')

    if strict and error is not None:
        raise koji.GenericError(f'Error handling workflow: {error}')

+ 

+ 

def run_subtask_step(workflow_id, step):
    """Run a workflow step on behalf of a workflowStep subtask.

    :param workflow_id: id of the workflow to advance
    :param step: the step name the subtask expects to run
    """
    run_workflow(workflow_id, {'from_subtask': True, 'step': step}, strict=True)

+ 

+ 

def handle_error(info, error):
    """Freeze an errored workflow and record the error.

    :param info: workflow info dict (must include id and data)
    :param error: error string to record
    """
    # freeze the workflow
    update = UpdateProcessor('workflow', clauses=['id=%(id)s'], values=info)
    update.set(frozen=True)
    update.rawset(update_time='NOW()')
    update.execute()

    # record the error
    error_data = {
        'error': error,  # TODO traceback?
        'workflow_data': info['data'],
    }
    row = {
        'workflow_id': info['id'],
        'data': json.dumps(error_data),
    }
    InsertProcessor('workflow_error', data=row).execute()

    # delist the workflow from all scheduling tables
    for table in ('workflow_wait', 'workflow_slots', 'workflow_queue'):
        DeleteProcessor(table, clauses=['workflow_id = %(id)s'], values=info).execute()

+ 

  

  class SimpleRegistry:

  
@@ -261,16 +328,69 @@ 

          self.data = info['data']

          self.waiting = False

  

-     def run(self):

+     def run(self, opts=None):

          if self.data is None:

              self.setup()

+         if opts is None:

+             opts = {}

+ 

+         if self.handle_waits():

+             # we are still waiting, so we can't go to next step

+             self.update()

+             return

  

          # TODO error handling

          step = self.data['steps'].pop(0)

+         handler = self.get_handler(step)

+         if 'step' in opts and opts['step'] != step:

+             raise koji.GenericError(f'Step mismatch {opts["step"]} != {step}')

+ 

+         is_subtask = getattr(handler, 'subtask', False)

+         if opts.get('from_subtask'):

+             # we've been called via a workflowStep task

+             if not is_subtask:

+                 raise koji.GenericError(f'Not a subtask step: {step}')

+             # otherwise we're good

+         elif is_subtask:

+             # this step needs to run via a subtask

+             self.task('workflowStep', {'workflow_id': self.info['id'], 'step': step}, wait=False)

+             # we don't need to wait for this one, because it calls us

+             # TODO handle task failure without stalling

+             return

  

-         logger.debug('Running %s step for workflow %s', step, self.info['id'])

-         func = getattr(self, step)

-         func()

+         # TODO slots are a better idea for tasks than for workflows

+         slot = getattr(handler, 'slot', None)

+         if slot:

+             # a note about timing. We don't request a slot until we're otherwise ready to run

+             # We don't want to hold a slot if we're waiting on something else.

+             if not get_slot(slot, self.info['id']):

+                 self.wait_slot(slot, request=False)  # get_slot made the request for us

+                 return

+             logger.debug('We have slot %s. Proceeding.', slot)

+ 

+         # auto-fill handler params

+         kwargs = {}

+         params = inspect.signature(handler).parameters

+         for key in params:

+             param = params[key]

+             if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):

+                 # step handlers shouldn't use these, but we'll be nice

+                 logger.warning('Ignoring variable args for %s', handler)

+                 continue

+             if key == 'workflow':

+                 kwargs[key] = self

+             elif key in self.params:

+                 kwargs[key] = self.params[key]

+             elif key in self.data:

+                 kwargs[key] = self.data[key]

+ 

+         self.log(f'Running workflow step {step}')

+         logger.debug('Step args: %r', kwargs)

+         handler(**kwargs)

+ 

+         if slot:

+             # we only hold the slot during the execution of the step

+             free_slot(slot, self.info['id'])

  

          # are we done?

          if not self.data['steps']:
@@ -285,15 +405,62 @@ 

          # update the db

          self.update()

  

+     def handle_waits(self):

+         query = WaitsQuery(

+             clauses=[['workflow_id', '=', self.info['id']], ['handled', 'IS', False]],

+             opts={'order': 'id'})

+         mywaits = query.execute()

+         waiting = []

+         for info in mywaits:

+             if not info['fulfilled']:

+                 # TODO should we call check here as well?

+                 waiting.append(info)

+             else:

+                 cls = waits.get(info['wait_type'])

+                 wait = cls(info)

+                 wait.handle(workflow=self)

+                 self.log('Handled %(wait_type)s wait %(id)s' % info)

+         return bool(waiting)

+ 

+     def log(self, msg, level=logging.INFO):

+         log_both(msg, task_id=self.info['stub_id'], level=level)

+ 

      def setup(self):

          """Called to set up the workflow run"""

          logger.debug('Setting up workflow: %r', self.info)

          self.data = {'steps': self.get_steps()}

          # also open our stub task

-         # we don't worry about checks here because the entry is just a stub

-         update = UpdateProcessor('task', clauses=['id = %(stub_id)s'], values=self.info)

-         update.set(state=koji.TASK_STATES['OPEN'])

-         update.execute()

+         stub = kojihub.Task(self.info['stub_id'])

+         stub.open(workflow=True)

+ 

+     @classmethod

+     def step(cls, name=None):

+         """Decorator to add steps outside of class"""

+         # note this can't be used IN the class definition

+         steps = getattr(cls, 'STEPS', None)

+         if steps is None:

+             steps = cls.STEPS = []

+         handlers = getattr(cls, '_step_handlers', None)

+         if handlers is None:

+             handlers = cls._step_handlers = {}

+ 

+         def decorator(func):

+             nonlocal name

+             # also updates nonlocal steps

+             if name is None:

+                 name = func.__name__

+             steps.append(name)

+             handlers[name] = func

+             return func

+ 

+         return decorator

+ 

+     def get_handler(self, step):

+         handlers = getattr(self, '_step_handlers', {})

+         if handlers and step in handlers:

+             return handlers[step]

+         else:

+             return getattr(self, step)

  

      def get_steps(self):

          """Get the initial list of steps
@@ -349,6 +516,7 @@ 

          self.wait('task', {'task_id': task_id})

  

      def wait(self, wait_type, params):  # TODO maybe **params?

+         self.log(f'Waiting for {wait_type}: {params}')

          data = {

              'workflow_id': self.info['id'],

              'wait_type': wait_type,
@@ -369,19 +537,24 @@ 

          task_id = kojihub.make_task(method, args, **opts)

          if wait:

              self.wait_task(task_id)

+         return task_id

+ 

+     def wait_slot(self, name, request=True):

+         self.wait('slot', {'name': name})

+         if request:

+             request_slot(name, self.info['id'])

  

      def start(self):

          raise NotImplementedError('start method not defined')

  

      def close(self, result='complete', stub_state='CLOSED'):

          # TODO - the result field needs to be handled better

-         logger.info('Closing %(method)s workflow %(id)i', self.info)

-         # we shouldn't have any waits but...

-         delete = DeleteProcessor('workflow_wait', clauses=['workflow_id = %(id)s'],

-                                  values=self.info)

-         n = delete.execute()

-         if n:

-             logger.error('Dangling waits for %(method)s workflow %(id)i', self.info)

+         self.log('Closing %(method)s workflow' % self.info)

+ 

+         for table in ('workflow_wait', 'workflow_slots', 'workflow_queue'):

+             delete = DeleteProcessor(table, clauses=['workflow_id = %(id)s'],

+                                      values=self.info)

+             delete.execute()

  

          update = UpdateProcessor('workflow', clauses=['id=%(id)s'], values=self.info)

          update.set(data=json.dumps(self.data))
@@ -393,18 +566,25 @@ 

          # also close our stub task

          # we don't worry about checks here because the entry is just a stub

          logger.info('Closing workflow task %(stub_id)i', self.info)

-         # we shouldn't have any waits but...

-         update = UpdateProcessor('task', clauses=['id = %(stub_id)s'], values=self.info)

-         update.set(state=koji.TASK_STATES[stub_state])

+         stub = kojihub.Task(self.info['stub_id'])

+         stub._close(result, koji.TASK_STATES[stub_state], encode=True)

          # TODO handle failure

-         update.execute()

  

      def cancel(self):

          # TODO we need to do more here, but for now

          self.close(result='canceled', stub_state='CANCELED')

  

+     def fail(self, msg=None):

+         # TODO we need to do more here, but for now

+         if msg is not None:

+             msg = f'Workflow failed - {msg}'

+         else:

+             msg = 'Workflow failed'

+         self.close(result=msg, stub_state='FAILED')

+ 

      def requeue(self):

-         insert = InsertProcessor('work_queue', data={'workflow_id': self.info['id']})

+         self.log('Queuing %(method)s workflow' % self.info)

+         insert = InsertProcessor('workflow_queue', data={'workflow_id': self.info['id']})

          insert.execute()

  

      def update(self):
@@ -414,6 +594,16 @@ 

          update.execute()

  

  

+ def subtask():

+     # TODO args?

+     """Decorator to indicate that a step handler should run via a subtask"""

+     def decorator(handler):

+         handler.subtask = True

+         return handler

+ 

+     return decorator

+ 

+ 

  class ParamSpec:

  

      def __init__(self, rule, required=False):
@@ -469,10 +659,11 @@ 

      }

      insert = InsertProcessor('workflow', data=data)

      insert.execute()

+     log_both(f'Adding {method} workflow', task_id=stub_id)

  

      if queue:

          # also add it to the work queue so it will start

-         insert = InsertProcessor('work_queue', data={'workflow_id': data['id']})

+         insert = InsertProcessor('workflow_queue', data={'workflow_id': data['id']})

          insert.execute()

  

      # TODO return full info?
@@ -507,6 +698,7 @@ 

          'create_time': ['workflow_wait.create_time', None],

          'create_ts': ["date_part('epoch', workflow_wait.create_time)", None],

          'fulfilled': ['workflow_wait.fulfilled', None],

+         'seen': ['workflow_wait.seen', None],

          'handled': ['workflow_wait.handled', None],

      }

  
@@ -520,19 +712,15 @@ 

      def check(self):

          raise NotImplementedError('wait check not defined')

  

-     # XXX does it make sense to update state here?

-     def set_fulfilled(self):

-         update = UpdateProcessor('workflow_wait', clauses=['id = %(id)s'], values=self.info)

-         update.set(fulfilled=True)

-         update.execute()

- 

-     # XXX does it make sense to update state here?

      def set_handled(self):

          # TODO what should we do if not fulfilled yet?

          update = UpdateProcessor('workflow_wait', clauses=['id = %(id)s'], values=self.info)

          update.set(handled=True)

          update.execute()

  

+     def handle(self):

+         self.set_handled()

+ 

  

  @waits.add('task')

  class TaskWait(BaseWait):
@@ -547,6 +735,28 @@ 

          state = query.singleValue()

          return (state in self.END_STATES)

  

+     def handle(self, workflow):

+         self.set_handled()

+         task = kojihub.Task(self.info['params']['task_id'])

+         tinfo = task.getInfo()

+         ret = {'task': tinfo}

+         if tinfo['state'] == koji.TASK_STATES['FAILED']:

+             if not self.info['params'].get('canfail', False):

+                 raise koji.GenericError(f'Workflow task {tinfo["id"]} failed')

+                 # TODO workflow failure

+             # otherwise we keep going

+         elif tinfo['state'] == koji.TASK_STATES['CANCELED']:

+             # TODO unclear if canfail applies here

+             raise koji.GenericError(f'Workflow task {tinfo["id"]} canceled')

+         elif tinfo['state'] == koji.TASK_STATES['CLOSED']:

+             # shouldn't be a fault

+             ret['result'] = task.getResult()

+         else:

+             # should not happen

+             raise koji.GenericError(f'Task not completed: {tinfo}')

+         # TODO: update workflow data?

+         return ret

+ 

      @staticmethod

      def task_done(task_id):

          # TODO catch errors?
@@ -567,72 +777,131 @@ 

  class SlotWait(BaseWait):

  

      def check(self):

-         # we also have triggers to update these, but this is a fallback

-         params = self.info['params']

-         query = QueryProcessor(

-             tables=['workflow_slots'],

-             columns=['id'],

+         # handle_slots will mark us fulfilled, so no point in further checking here

+         return False

+ 

+ 

+ def slot(name):

+     """Decorator to indicate that a step handler requires a slot"""

+     def decorator(handler):

+         handler.slot = name

+         return handler

+ 

+     return decorator

+ 

+ 

+ def request_slot(name, workflow_id):

+     logger.info('Requesting %s slot for workflow %i', name, workflow_id)

+     data = {

+         'name': name,

+         'workflow_id': workflow_id,

+     }

+     upsert = UpsertProcessor(table='workflow_slots', data=data, skip_dup=True)

+     upsert.execute()

+     # table has: UNIQUE (name, workflow_id)

+     # so this is a no-op if we already have a request, or are already holding the slot

+ 

+ 

+ def free_slot(name, workflow_id):

+     logger.info('Freeing %s slot for workflow %i', name, workflow_id)

+     values = {

+         'name': name,

+         'workflow_id': workflow_id,

+     }

+     delete = DeleteProcessor(

+             table='workflow_slots',

              clauses=['name = %(name)s', 'workflow_id = %(workflow_id)s'],

-             values={'name': params['name'], 'workflow_id': self.info['workflow_id']},

-         )

-         slot_id = query.singleValue()

-         return (slot_id is not None)

+             values=values)

+     delete.execute()

+ 

+ 

+ def get_slot(name, workflow_id):

+     """Check for and/or attempt to acquire slot

+ 

+     :returns: True if slot is held, False otherwise

+ 

+     If False, then the slot is *requested*

+     """

+     values = {

+         'name': name,

+         'workflow_id': workflow_id,

+     }

+     query = QueryProcessor(

+         tables=['workflow_slots'],

+         columns=['id', 'held'],

+         clauses=['name = %(name)s', 'workflow_id = %(workflow_id)s'],

+         values=values,

+     )

+     slot = query.executeOne()

+     if not slot:

+         request_slot(name, workflow_id)

+     elif slot['held']:

+         return True

+ 

+     handle_slots()  # XXX?

+ 

+     # check again

+     slot = query.executeOne()

+     return slot and slot['held']

  

  

  def handle_slots():

-     """Check slot waits and see if we can fulfill them"""

+     """Check slot requests and see if we can grant them"""

  

-     query = WaitsQuery(

-         clauses=[

-             ['fulfilled', 'IS', False],

-             ['wait_type', '=', 'slot'],

-         ],

-         opts={'order': 'id'},  # oldest first

+     if not db_lock('workflow_slots', wait=False):

+         return

+ 

+     logger.debug('Checking slots')

+ 

+     query = QueryProcessor(

+         tables=['workflow_slots'],

+         columns=['id', 'name', 'workflow_id', 'held'],

+         opts={'order': 'id'},

      )

  

-     by_name = {}

-     for wait in query.execute():

-         name = wait['params']['name']

-         by_name.setdefault(name, []).append(wait)

- 

-     for name in sorted(by_name):

-         query = QueryProcessor(

-                     tables=['workflow_slots'],

-                     columns=['id', 'num'],

-                     clauses=['name = %(name)s'],

-                     values={'name': name},

-         )

-         limit = 10  # XXX CONFIG

-         held = query.execute()

-         if len(held) >= limit:

-             # all in use

-             continue

-         waits = by_name[name]

-         held = set(held)

-         for num in range(limit):

-             if num in held:

-                 continue

-             if not waits:

-                 break

-             # try to take it

-             wait = waits[0]

-             data = {

-                 'name': name,

-                 'workflow_id': wait['workflow_id'],

-                 'num': num,

-             }

-             insert = InsertProcessor(table='workflow_slots', data=data)

-             savepoint = Savepoint('pre_slot_insert')

-             try:

-                 insert.execute()

-             except Exception:

-                 # there must be a parallel call

-                 savepoint.rollback()

-                 logger.debug('Failed to acquire workflow slot')

-                 # XXX how do we avoid duplicate fulfillments by parallel instances?

-                 continue

-             # success! pop this wait so next pass can handle the next

-             waits.pop(0)

+     # index needed and held by name

+     need_idx = {}

+     held_idx = {}

+     slots = query.execute()

+     for slot in slots:

+         if slot['held']:

+             held_idx.setdefault(slot['name'], []).append(slot)

+         else:

+             need_idx.setdefault(slot['name'], []).append(slot)

+ 

+     grants = []

+     for name in need_idx:

+         need = need_idx[name]

+         held = held_idx.get(name, [])

+         limit = 3  # XXX CONFIG

+         logger.debug('Slot %s: need %i, held %i', name, len(need), len(held))

+         while need and len(held) < limit:

+             slot = need.pop(0)  # first come, first served

+             held.append(slot)

+             grants.append(slot)

+ 

+     # update the slots

+     if grants:

+         update = UpdateProcessor(table='workflow_slots',

+                                  clauses=['id IN %(ids)s'],

+                                  values={'ids': [s['id'] for s in grants]})

+         update.set(held=True)

+         update.rawset(grant_time='NOW()')

+         update.execute()

+ 

+     # also mark any waits fulfilled

+     for slot in grants:

+         update = UpdateProcessor(

+             'workflow_wait',

+             clauses=[

+                 "wait_type = 'slot'",

+                 'fulfilled IS FALSE',

+                 'workflow_id = %(workflow_id)s',

+                 "(params->>'name') = %(name)s",  # note the ->>

+             ],

+             values=slot)

+         update.set(fulfilled=True)

+         update.execute()

  

  

  @workflows.add('test')
@@ -640,46 +909,84 @@ 

  

      # XXX remove this test code

  

-     STEPS = ['start', 'finish']

+     # STEPS = ['start', 'finish']

      PARAMS = {'a': int, 'b': (int, type(None)), 'c': str}

  

-     def start(self):

-         # fire off a do-nothing task

-         logger.info('TEST WORKFLOW START')

-         task_id = self.task('sleep', {'n': 1})

  

-     def finish(self):

-         # XXX how do we propagate task_id?

-         logger.info('TEST WORKFLOW FINISH')

+ @TestWorkflow.step()

+ def start(workflow, a, b):

+     # fire off a do-nothing task

+     logger.info('TEST WORKFLOW START')

+     workflow.data['task_id'] = workflow.task('sleep', {'n': 1})

+ 

+ 

+ @subtask()

+ @TestWorkflow.step()

+ def finish():

+     time.sleep(10)

+     logger.info('TEST WORKFLOW FINISH')

  

  

  @workflows.add('new-repo')

  class NewRepoWorkflow(BaseWorkflow):

  

-     STEPS = ['start', 'repos', 'finalize']

+     STEPS = ['repo_init', 'repos', 'repo_done']

+     PARAMS = {

+         'tag': (int, str, dict),

+         'event': (int, type(None)),

+         'opts': (dict,),

+     }

  

-     def start(self):

-         # TODO validate params

-         kw = self.params

-         # ??? should we call repo_init ourselves?

-         task_id = self.task('initRepo', kw)

-         # TODO mechanism for task_id value to persist to next step

- 

-     def repos(self):

-         # TODO fetch archlist from task

-         repo_tasks = []

-         for arch in self.needed_arches:

-             params = {'repo_id': repo_id, 'arch': arch, 'oldrepo': oldrepo}

+     #@subtask()

+     def repo_init(self, tag, event=None, opts=None):

+         tinfo = kojihub.get_tag(tag, strict=True, event=event)

+         event = kojihub.convert_value(event, cast=int, none_allowed=True)

+         if opts is None:

+             opts = {}

+         opts = dslice(opts, ('with_src', 'with_debuginfo', 'with_separate_src'), strict=False)

+         # TODO further opts validation?

+         repo_id, event_id = kojihub.repo_init(tinfo['id'], event=event,

+                                               task_id=self.info['stub_id'], **opts)

+         repo_info = kojihub.repo_info(repo_id)

+         del repo_info['creation_time']  # json unfriendly

+         kw = {'tag': tinfo, 'repo': repo_info, 'opts': opts}

+         self.data['prep_id'] = self.task('prepRepo', kw)

+         self.data['repo'] = repo_info

+ 

+     def repos(self, prep_id, repo):

+         # TODO better mechanism for fetching task result

+         prepdata = kojihub.Task(prep_id).getResult()

+         repo_tasks = {}

+         for arch in prepdata['needed']:

+             params = {'repo_id': repo['id'], 'arch': arch, 'oldrepo': prepdata['oldrepo']}

              repo_tasks[arch] = self.task('createrepo', params)

+             # TODO fail workflow on any failed subtask

+         self.data['cloned'] = prepdata['cloned']

+         self.data['repo_tasks'] = repo_tasks

+ 

+     #@subtask()

+     def repo_done(self, repo, cloned, repo_tasks, event=None):

+         data = cloned.copy()

+         for arch in repo_tasks:

+             data[arch] = kojihub.Task(repo_tasks[arch]).getResult()

+ 

+         kwargs = {}

+         if event is not None:

+             kwargs['expire'] = True

+         if cloned:

+             kwargs['repo_json_updates'] = {

+                 'cloned_from_repo_id': 0,   # XXX

+                 'cloned_archs': list(sorted(cloned)),

+             }

+         kojihub.repo_done(repo['id'], data, **kwargs)

  

-     def finalize(self):

-         # TODO fetch params from self/tasks

-         repo_done(...)

+         # do we need a return?

+         return repo['id'], repo['create_event']

  

  

  class WorkflowExports:

      # TODO: would be nice to mimic our registry approach in kojixmlrpc

-     #handleWorkQueue = staticmethod(handle_work_queue)

+     # XXX most of these need access controls

      getQueue = staticmethod(get_queue)

      nudge = staticmethod(nudge_queue)

      updateQueue = staticmethod(update_queue)

file modified
+22 -7
@@ -1054,6 +1054,7 @@ 

          stub_id INTEGER UNIQUE NOT NULL REFERENCES task (id),

          started BOOLEAN NOT NULL DEFAULT FALSE,

          completed BOOLEAN NOT NULL DEFAULT FALSE,

+         frozen BOOLEAN NOT NULL DEFAULT FALSE,

          create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

          start_time TIMESTAMPTZ,

          update_time TIMESTAMPTZ,
@@ -1072,23 +1073,35 @@ 

          wait_type TEXT,

          params JSONB,

          create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

+         fulfill_time TIMESTAMPTZ,

          fulfilled BOOLEAN NOT NULL DEFAULT FALSE,  -- wait condition fulfilled?

-         handled BOOLEAN NOT NULL DEFAULT FALSE     -- workflow informed?

-         -- more ???

+         seen BOOLEAN NOT NULL DEFAULT FALSE,  -- noted by scheduler?

+         handled BOOLEAN NOT NULL DEFAULT FALSE  -- received by handler?

+ ) WITHOUT OIDS;

+ 

+ 

+ CREATE TABLE workflow_error (

+         id SERIAL NOT NULL PRIMARY KEY,

+         workflow_id INTEGER REFERENCES workflow(id),

+         data JSONB,

+         create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

+         handled BOOLEAN NOT NULL DEFAULT FALSE

  ) WITHOUT OIDS;

  

  

  CREATE TABLE workflow_slots (

          id SERIAL NOT NULL PRIMARY KEY,

          name TEXT,

-         num INTEGER NOT NULL,

-         UNIQUE (name, num),

          workflow_id INTEGER REFERENCES workflow(id),

-         create_time TIMESTAMPTZ NOT NULL DEFAULT NOW()

+         UNIQUE (name, workflow_id),

+         held BOOLEAN NOT NULL DEFAULT FALSE,

+         -- if held is False, that means the slot is requested

+         create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

+         grant_time TIMESTAMPTZ

  ) WITHOUT OIDS;

  

  

- CREATE TABLE work_queue (

+ CREATE TABLE workflow_queue (

          id SERIAL NOT NULL PRIMARY KEY,

          workflow_id INTEGER REFERENCES workflow(id),

          create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
@@ -1104,6 +1117,8 @@ 

  ) WITHOUT OIDS;

  INSERT INTO locks(name) VALUES('protonmsg-plugin');

  INSERT INTO locks(name) VALUES('scheduler');

- INSERT INTO locks(name) VALUES('work_queue');

+ INSERT INTO locks(name) VALUES('workflow_queue');

+ INSERT INTO locks(name) VALUES('workflow_slots');

+ INSERT INTO locks(name) VALUES('workflow_maint');

  

  COMMIT WORK;

@@ -42,6 +42,7 @@ 

          import-cg                 Import external builds with rich metadata

          import-sig                Import signatures into the database and write signed RPMs

          list-signed               List signed copies of rpms

+         list-users                List users

          lock-tag                  Lock a tag

          make-task                 Create an arbitrary task

          prune-signed-copies       Prune signed copies

@@ -42,6 +42,7 @@ 

          import-cg                 Import external builds with rich metadata

          import-sig                Import signatures into the database and write signed RPMs

          list-signed               List signed copies of rpms

+         list-users                List users

          lock-tag                  Lock a tag

          make-task                 Create an arbitrary task

          prune-signed-copies       Prune signed copies

@@ -60,7 +60,7 @@ 

          expected_api = 'Invalid method: addChannel'

          expected = 'addChannel is available on hub from Koji 1.26 version, your version ' \

                     'is 1.25.1\n'

-         self.session.getKojiVersion.return_value = '1.25.1'

+         self.session.hub_version_str = '1.25.1'

  

          self.session.addChannel.side_effect = koji.GenericError(expected_api)

          arguments = ['--description', self.description, self.channel_name]

@@ -32,7 +32,8 @@ 

  

  %s: error: {message}

  """ % (self.progname, self.progname)

-         self.session.getKojiVersion.return_value = '1.33.0'

+         self.session.hub_version = (1, 33, 0)

+         self.session.hub_version_str = '1.33.0'

  

      def test_anon_cancel(self):

          args = ['123']
@@ -154,7 +155,7 @@ 

          self.session.cancelBuild.assert_called_once_with(args[1], strict=True)

  

      def test_non_exist_build_and_task_older_hub(self):

-         self.session.getKojiVersion.return_value = '1.32.0'

+         self.session.hub_version = (1, 32, 0)

          args = ['11111', 'nvr-1-30.1']

          expected_warn = """No such task: %s

  """ % (args[0])

@@ -78,7 +78,7 @@ 

          expected_api = 'Invalid method: editChannel'

          expected = 'editChannel is available on hub from Koji 1.26 version, your version ' \

                     'is 1.25.1\n'

-         self.session.getKojiVersion.return_value = '1.25.1'

+         self.session.hub_version_str = '1.25.1'

  

          self.session.editChannel.side_effect = koji.GenericError(expected_api)

          self.assert_system_exit(
@@ -94,7 +94,6 @@ 

          self.session.editChannel.assert_called_once_with(self.channel_old, name=self.channel_new,

                                                           description=self.description)

          self.session.getChannel.assert_called_once_with(self.channel_old)

-         self.session.getKojiVersion.assert_called_once_with()

  

      def test_handle_edit_channel_non_exist_channel(self):

          expected = 'No such channel: %s\n' % self.channel_old

file modified
+4 -2
@@ -50,6 +50,8 @@ 

          # Mock out the xmlrpc server

          session.getLoggedInUser.return_value = None

          session.krb_principal = user['krb_principal']

+         mock_hub_version = '1.35.0'

+         session.hub_version_str = mock_hub_version

          print_unicode_mock.return_value = "Hello"

  

          self.assert_system_exit(
@@ -63,7 +65,7 @@ 

  

          # anonymous user

          message = "Not authenticated\n" + "Hello, anonymous user!"

-         hubinfo = "You are using the hub at %s" % self.huburl

+         hubinfo = "You are using the hub at %s (Koji %s)" % (self.huburl, mock_hub_version)

          handle_moshimoshi(self.options, session, [])

          self.assert_console_message(stdout, "{0}\n\n{1}\n".format(message, hubinfo))

          self.activate_session_mock.assert_called_once_with(session, self.options)
@@ -79,7 +81,7 @@ 

                                          user['krb_principal'],

              koji.AUTHTYPES['SSL']: 'Authenticated via client certificate %s' % cert

          }

-         hubinfo = "You are using the hub at %s" % self.huburl

+         # same hubinfo

          session.getLoggedInUser.return_value = user

          message = "Hello, %s!" % self.progname

          self.options.cert = cert

@@ -0,0 +1,133 @@ 

+ from __future__ import absolute_import

+ 

+ import mock

+ from six.moves import StringIO

+ 

+ import koji

+ from koji_cli.commands import anon_handle_list_users

+ from . import utils

+ 

+ 

+ class TestListUsers(utils.CliTestCase):

+     def setUp(self):

+         self.maxDiff = None

+         self.options = mock.MagicMock()

+         self.options.debug = False

+         self.session = mock.MagicMock()

+         self.session.getAPIVersion.return_value = koji.API_VERSION

+         self.activate_session = mock.patch('koji_cli.commands.activate_session').start()

+         self.error_format = """Usage: %s list-users [options]

+ (Specify the --help global option for a list of other help options)

+ 

+ %s: error: {message}

+ """ % (self.progname, self.progname)

+ 

+     def tearDown(self):

+         mock.patch.stopall()

+ 

+     @mock.patch('sys.stdout', new_callable=StringIO)

+     def test_list_users_default_valid(self, stdout):

+         arguments = []

+         self.session.listUsers.return_value = [{

+             'id': 1, 'krb_principals': [],

+             'name': 'kojiadmin',

+             'status': 0,

+             'usertype': 0},

+             {'id': 2,

+              'krb_principals': [],

+              'name': 'testuser',

+              'status': 0,

+              'usertype': 0},

+         ]

+         rv = anon_handle_list_users(self.options, self.session, arguments)

+         actual = stdout.getvalue()

+         expected = """kojiadmin

+ testuser

+ """

+         self.assertMultiLineEqual(actual, expected)

+         self.assertEqual(rv, None)

+         self.session.listUsers.assert_called_once_with(

+             userType=koji.USERTYPES['NORMAL'], prefix=None)

+ 

+     @mock.patch('sys.stdout', new_callable=StringIO)

+     def test_list_users_with_prefix(self, stdout):

+         arguments = ['--prefix', 'koji']

+         self.session.listUsers.return_value = [{

+             'id': 1, 'krb_principals': [],

+             'name': 'kojiadmin',

+             'status': 0,

+             'usertype': 0},

+         ]

+         rv = anon_handle_list_users(self.options, self.session, arguments)

+         actual = stdout.getvalue()

+         expected = """kojiadmin

+ """

+         self.assertMultiLineEqual(actual, expected)

+         self.assertEqual(rv, None)

+         self.session.listUsers.assert_called_once_with(

+             userType=koji.USERTYPES['NORMAL'], prefix='koji')

+ 

+     @mock.patch('sys.stdout', new_callable=StringIO)

+     def test_list_users_with_usertype(self, stdout):

+         arguments = ['--usertype', 'host']

+         self.session.listUsers.return_value = [{

+             'id': 3, 'krb_principals': [],

+             'name': 'kojihost',

+             'status': 0,

+             'usertype': 1},

+             {'id': 5, 'krb_principals': [],

+              'name': 'testhost',

+              'status': 0,

+              'usertype': 1},

+         ]

+         rv = anon_handle_list_users(self.options, self.session, arguments)

+         actual = stdout.getvalue()

+         expected = """kojihost

+ testhost

+ """

+         self.assertMultiLineEqual(actual, expected)

+         self.assertEqual(rv, None)

+         self.session.listUsers.assert_called_once_with(

+             userType=koji.USERTYPES['HOST'], prefix=None)

+ 

+     def test_list_users_with_usertype_non_existing(self):

+         arguments = ['--usertype', 'test']

+         self.assert_system_exit(

+             anon_handle_list_users,

+             self.options, self.session, arguments,

+             stdout='',

+             stderr="Usertype test doesn't exist\n",

+             activate_session=None,

+             exit_code=1)

+         self.session.listUsers.assert_not_called()

+ 

+     @mock.patch('sys.stdout', new_callable=StringIO)

+     def test_list_users_with_usertype_and_prefix(self, stdout):

+         arguments = ['--usertype', 'host', '--prefix', 'test']

+         self.session.listUsers.return_value = [{

+             'id': 5, 'krb_principals': [],

+             'name': 'testhost',

+             'status': 0,

+             'usertype': 1},

+         ]

+         rv = anon_handle_list_users(self.options, self.session, arguments)

+         actual = stdout.getvalue()

+         expected = """testhost

+ """

+         self.assertMultiLineEqual(actual, expected)

+         self.assertEqual(rv, None)

+         self.session.listUsers.assert_called_once_with(

+             userType=koji.USERTYPES['HOST'], prefix='test')

+ 

+     def test_anon_handle_list_users_help(self):

+         self.assert_help(

+             anon_handle_list_users,

+             """Usage: %s list-users [options]

+ (Specify the --help global option for a list of other help options)

+ 

+ Options:

+   -h, --help           show this help message and exit

+   --usertype=USERTYPE  List users that have a given usertype (e.g. NORMAL,

+                        HOST, GROUP)

+   --prefix=PREFIX      List users that have a given prefix

+ """ % self.progname)

@@ -1,9 +1,13 @@ 

+ import os

+ import tempfile

+ import shutil

  import unittest

  

  import mock

  

  import koji

  import kojihub

+ from koji.util import joinpath

  

  DP = kojihub.DeleteProcessor

  
@@ -17,6 +21,9 @@ 

          return delete

  

      def setUp(self):

+         self.tempdir = tempfile.mkdtemp()

+         self.pathinfo = koji.PathInfo(self.tempdir)

+         mock.patch('koji.pathinfo', new=self.pathinfo).start()

          self.DeleteProcessor = mock.patch('kojihub.kojihub.DeleteProcessor',

                                            side_effect=self.getDelete).start()

          self.deletes = []
@@ -58,9 +65,31 @@ 

                                'sigkey': '2f86d6a1'}]

          self.userinfo = {'authtype': 2, 'id': 1, 'krb_principal': None, 'krb_principals': [],

                           'name': 'testuser', 'status': 0, 'usertype': 0}

+         self.set_up_files()

+ 

+     def set_up_files(self):

+         builddir = self.pathinfo.build(self.buildinfo)

+         os.makedirs(builddir)

+         self.builddir = builddir

+         self.signed = {}

+         self.sighdr = {}

+         for sig in self.queryrpmsigs:

+             key = sig['sigkey']

+             signed = joinpath(builddir, self.pathinfo.signed(self.rinfo, key))

+             self.signed[key] = signed

+             koji.ensuredir(os.path.dirname(signed))

+             with open(signed, 'wt') as fo:

+                 fo.write('SIGNED COPY\n')

+ 

+             sighdr = joinpath(builddir, self.pathinfo.sighdr(self.rinfo, key))

+             self.sighdr[key] = sighdr

+             koji.ensuredir(os.path.dirname(sighdr))

+             with open(sighdr, 'wt') as fo:

+                 fo.write('DETACHED SIGHDR\n')

  

      def tearDown(self):

          mock.patch.stopall()

+         shutil.rmtree(self.tempdir)

  

      def test_rpm_not_existing(self):

          rpm_id = 1234
@@ -84,7 +113,8 @@ 

  

      def test_external_repo(self):

          rpminfo = 1234

-         rinfo = {'external_repo_id': 1, 'external_repo_name': 'INTERNAL'}

+         rinfo = self.rinfo.copy()

+         rinfo.update({'external_repo_id': 1, 'external_repo_name': 'INTERNAL'})

          self.get_rpm.return_value = rinfo

          with self.assertRaises(koji.GenericError) as ex:

              kojihub.delete_rpm_sig(rpminfo, all_sigs=True)
@@ -108,39 +138,134 @@ 

          self.get_rpm.assert_called_once_with(rpminfo, strict=True)

          self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)

  

-     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')

-     @mock.patch('os.remove')

-     def test_file_not_found_error(self, os_remove, pb):

-         rpminfo = 2

-         os_remove.side_effect = FileNotFoundError()

+     def test_file_not_found_error(self):

+         rpminfo = self.rinfo['id']

          self.get_rpm.return_value = self.rinfo

          self.get_build.return_value = self.buildinfo

          self.get_user.return_value = self.userinfo

          self.query_rpm_sigs.return_value = self.queryrpmsigs

+ 

+         # a missing signed copy or header should not error

+         builddir = self.pathinfo.build(self.buildinfo)

+         sigkey = '2f86d6a1'

+         os.remove(self.signed[sigkey])

+         os.remove(self.sighdr[sigkey])

          r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')

          self.assertEqual(r, None)

  

+         # the files should still be gone

+         for sigkey in self.signed:

+             if os.path.exists(self.signed[sigkey]):

+                 raise Exception('signed copy not deleted')

+         for sigkey in self.sighdr:

+             if os.path.exists(self.sighdr[sigkey]):

+                 raise Exception('header still in place')

+ 

          self.assertEqual(len(self.deletes), 2)

          delete = self.deletes[0]

          self.assertEqual(delete.table, 'rpmsigs')

-         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])

+         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])

  

          delete = self.deletes[1]

          self.assertEqual(delete.table, 'rpm_checksum')

-         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i", "sigkey=%(sigkey)s"])

+         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])

          self.get_rpm.assert_called_once_with(rpminfo, strict=True)

          self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='testkey')

-         self.get_build.assert_called_once_with(self.rinfo['build_id'])

+         self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)

+ 

+     def test_header_not_a_file(self):

+         rpminfo = self.rinfo['id']

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.return_value = self.queryrpmsigs

+ 

+         # we should error, without making any changes, if a header is not a regular file

+         builddir = self.pathinfo.build(self.buildinfo)

+         bad_sigkey = '2f86d6a1'

+         bad_hdr= self.sighdr[bad_sigkey]

+         os.remove(bad_hdr)

+         os.mkdir(bad_hdr)

+         with self.assertRaises(koji.GenericError) as ex:

+             r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')

+         expected_msg = "Not a regular file: %s" % bad_hdr

+         self.assertEqual(ex.exception.args[0], expected_msg)

+ 

+         # the files should still be there

+         for sigkey in self.signed:

+             if not os.path.exists(self.signed[sigkey]):

+                 raise Exception('signed copy was deleted')

+         for sigkey in self.sighdr:

+             if not os.path.exists(self.sighdr[sigkey]):

+                 raise Exception('header was deleted')

+         if not os.path.isdir(bad_hdr):

+             # the function should not have touched the invalid path

+             raise Exception('bad header file was removed')

+ 

+     def test_stray_backup(self):

+         rpminfo = self.rinfo['id']

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.return_value = self.queryrpmsigs

+ 

+         siginfo = self.queryrpmsigs[0]

+         sigkey = siginfo['sigkey']

+         backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])

+         with open(backup, 'wt') as fo:

+             fo.write('STRAY FILE\n')

+             # different contents

+         with self.assertRaises(koji.GenericError) as ex:

+             r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')

+         expected_msg = "Stray header backup file: %s" % backup

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         # files should not have been removed

+         for sigkey in self.signed:

+             if not os.path.exists(self.signed[sigkey]):

+                 raise Exception('signed copy was deleted incorrectly')

+         for sigkey in self.sighdr:

+             if not os.path.exists(self.sighdr[sigkey]):

+                 raise Exception('header was deleted incorrectly')

+ 

+     def test_dup_backup(self):

+         rpminfo = self.rinfo['id']

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.return_value = self.queryrpmsigs

+ 

+         siginfo = self.queryrpmsigs[0]

+         sigkey = siginfo['sigkey']

+         backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])

+         with open(backup, 'wt') as fo:

+             fo.write('DETACHED SIGHDR\n')

+             # SAME contents

+ 

+         r = kojihub.delete_rpm_sig(rpminfo, sigkey='testkey')

+ 

+         # the files should be gone

+         for sigkey in self.signed:

+             if os.path.exists(self.signed[sigkey]):

+                 raise Exception('signed copy not deleted')

+         for sigkey in self.sighdr:

+             if os.path.exists(self.sighdr[sigkey]):

+                 raise Exception('header still in place')

+ 

+         # the sighdrs should be saved

+         for siginfo in self.queryrpmsigs:

+             sigkey = siginfo['sigkey']

+             backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])

+             with open(backup, 'rt') as fo:

+                 self.assertEqual(fo.read(), 'DETACHED SIGHDR\n')

  

-     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')

      @mock.patch('os.remove', side_effect=OSError)

-     def test_not_valid(self, os_remove, pb):

+     def test_not_valid(self, os_remove):

          rpminfo = 2

-         filepath = 'fakebuildpath/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm'

+         filepath = '%s/packages/fs_mark/3.3/20.el8/data/signed/x86_64/fs_mark-3.3-20.el8.x86_64.rpm' % self.tempdir

          self.get_rpm.return_value = self.rinfo

          self.get_build.return_value = self.buildinfo

          self.query_rpm_sigs.return_value = self.queryrpmsigs

-         expected_msg = "File %s cannot be deleted." % filepath

+         expected_msg = "Failed to delete %s" % filepath

          with self.assertRaises(koji.GenericError) as ex:

              kojihub.delete_rpm_sig(rpminfo, all_sigs=True)

          self.assertEqual(ex.exception.args[0], expected_msg)
@@ -148,18 +273,16 @@ 

          self.assertEqual(len(self.deletes), 2)

          delete = self.deletes[0]

          self.assertEqual(delete.table, 'rpmsigs')

-         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])

+         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])

  

          delete = self.deletes[1]

          self.assertEqual(delete.table, 'rpm_checksum')

-         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])

+         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])

          self.get_rpm.assert_called_once_with(rpminfo, strict=True)

          self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)

-         self.get_build.assert_called_once_with(self.rinfo['build_id'])

+         self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)

  

-     @mock.patch('koji.pathinfo.build', return_value='fakebuildpath')

-     @mock.patch('os.remove')

-     def test_valid(self, os_remove, pb):

+     def test_valid(self):

          rpminfo = 2

          self.get_rpm.return_value = self.rinfo

          self.get_build.return_value = self.buildinfo
@@ -167,14 +290,29 @@ 

          self.query_rpm_sigs.return_value = self.queryrpmsigs

          kojihub.delete_rpm_sig(rpminfo, all_sigs=True)

  

+         # the files should be gone

+         for sigkey in self.signed:

+             if os.path.exists(self.signed[sigkey]):

+                 raise Exception('signed copy not deleted')

+         for sigkey in self.sighdr:

+             if os.path.exists(self.sighdr[sigkey]):

+                 raise Exception('header still in place')

+ 

+         # the sighdrs should be saved

+         for siginfo in self.queryrpmsigs:

+             sigkey = siginfo['sigkey']

+             backup = "%s.%s.save" % (self.sighdr[sigkey], siginfo['sighash'])

+             with open(backup, 'rt') as fo:

+                 self.assertEqual(fo.read(), 'DETACHED SIGHDR\n')

+ 

          self.assertEqual(len(self.deletes), 2)

          delete = self.deletes[0]

          self.assertEqual(delete.table, 'rpmsigs')

-         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])

+         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])

  

          delete = self.deletes[1]

          self.assertEqual(delete.table, 'rpm_checksum')

-         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)i"])

+         self.assertEqual(delete.clauses, ["rpm_id=%(rpm_id)s", "sigkey IN %(found_keys)s"])

          self.get_rpm.assert_called_once_with(rpminfo, strict=True)

          self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey=None)

-         self.get_build.assert_called_once_with(self.rinfo['build_id'])

+         self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)

@@ -30,6 +30,36 @@ 

          my_rsession.close.assert_called()

          self.assertNotEqual(ksession.rsession, my_rsession)

  

+     @mock.patch('requests.Session')

+     def test_hub_version_old(self, rsession):

+         ksession = koji.ClientSession('http://koji.example.com/kojihub')

+         ksession.getKojiVersion = mock.MagicMock()

+         ksession.getKojiVersion.side_effect = koji.GenericError('Invalid method: getKojiVersion')

+         self.assertEqual(ksession.hub_version, (1, 22, 0))

+         ksession.getKojiVersion.assert_called_once()

+ 

+     @mock.patch('requests.Session')

+     def test_hub_version_interim(self, rsession):

+         ksession = koji.ClientSession('http://koji.example.com/kojihub')

+         ksession.getKojiVersion = mock.MagicMock()

+         ksession.getKojiVersion.return_value = '1.23.1'

+         self.assertEqual(ksession.hub_version, (1, 23, 1))

+         ksession.getKojiVersion.assert_called_once()

+ 

+     def test_hub_version_str_interim(self):

+         ksession = koji.ClientSession('http://koji.example.com/kojihub')

+         ksession.getKojiVersion = mock.MagicMock()

+         ksession.getKojiVersion.return_value = '1.23.1'

+         self.assertEqual(ksession.hub_version_str, '1.23.1')

+ 

+     def test_hub_version_new(self):

+         ksession = koji.ClientSession('http://koji.example.com/kojihub')

+         ksession.getKojiVersion = mock.MagicMock()

+         # would be filled by random call

+         ksession._ClientSession__hub_version = '1.35.0'

+         self.assertEqual(ksession.hub_version, (1, 35, 0))

+         ksession.getKojiVersion.assert_not_called()

+ 

  

  class TestFastUpload(unittest.TestCase):

  

@@ -57,12 +57,10 @@ 

        <th>Builder readiness</th>

      </tr>

      #for $channel in $channels

+     #if $channel['enabled_channel']

      <tr>

        <th>

            <a href="channelinfo?channelID=$channel['id']">$util.escapeHTML($channel['name'])</a>

-           #if not $channel['enabled_channel']

-           [disabled]

-           #end if

        </th>

        <td width="$graphWidth" class="graph">

          #if $channel['capacityPerc']
@@ -79,6 +77,7 @@ 

          </span>

        </td>

      </tr>

+     #end if

      #end for

    </table>