#3772 Scheduler part 1
Merged 2 years ago by tkopecek. Opened 2 years ago by mikem.
mikem/koji scheduler-work3  into  master

...
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
flake8
Mike McLean • 2 years ago  
fixes
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
typo
Mike McLean • 2 years ago  
TEST2
Mike McLean • 2 years ago  
fragment
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
XXX TEST
Mike McLean • 2 years ago  
cleanup
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
typo
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
typo
Mike McLean • 2 years ago  
...
Mike McLean • 2 years ago  
wip
Tomas Kopecek • 2 years ago  
file modified
+122
@@ -7899,3 +7899,125 @@ 

              # repoID option added in 1.33

              if options.buildroots:

                  warn("--buildroots option is available with hub 1.33 or newer")

+ 

+ 

def _format_ts(ts):
    """Render an epoch timestamp as a local-time ``YY-MM-DD HH:MM:SS`` string.

    :param ts: seconds since the epoch; any falsy value (``None``, ``0``) means "no time"
    :returns: the formatted string, or an empty string when ``ts`` is falsy
    """
    if not ts:
        return ''
    return time.strftime("%y-%m-%d %H:%M:%S", time.localtime(ts))

+ 

+ 

def anon_handle_scheduler_info(goptions, session, args):
    """[monitor] Show information about scheduling"""
    # Prints one line per scheduler task run (optionally filtered by task id, host and
    # task state). When --host is given, the host's scheduler data is printed afterwards.
    usage = "usage: %prog schedulerinfo [options]"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("-t", "--task", action="store", type=int, default=None,
                      help="Limit data to given task id")
    parser.add_option("--host", action="store", default=None,
                      help="Limit data to given builder id")
    parser.add_option("--state", action="store", type='choice', default=None,
                      choices=list(koji.TASK_STATES.keys()),
                      help="Limit data to task state")
    (options, args) = parser.parse_args(args)
    if args:
        parser.error("This command takes no arguments")

    ensure_connection(session, goptions)

    # --host may be a numeric id or a host name; a name is resolved through the hub
    host_id = None
    if options.host:
        try:
            host_id = int(options.host)
        except ValueError:
            host_id = session.getHost(options.host, strict=True)['id']

    # get the data
    clauses = []
    if options.task is not None:
        clauses.append(('task_id', options.task))
    if host_id is not None:
        # filter on the resolved numeric id, never on the raw option string
        clauses.append(('host_id', host_id))
    if options.state:
        clauses.append(('state', koji.TASK_STATES[options.state]))

    runs = session.scheduler.getTaskRuns(
        clauses=clauses,
        fields=('task_id', 'host_name', 'state', 'create_ts', 'start_ts', 'completion_ts')
    )
    mask = '%(task_id)-9s %(host_name)-20s %(state)-7s ' \
           '%(create_ts)-17s %(start_ts)-17s %(completion_ts)-17s'
    if not goptions.quiet:
        header = mask % {
            'task_id': 'Task',
            'host_name': 'Host',
            'state': 'State',
            'create_ts': 'Created',
            'start_ts': 'Started',
            'completion_ts': 'Ended',
        }
        print(header)
        print('-' * len(header))
    for run in runs:
        # translate the numeric state to its name and the epoch values to readable times
        run['state'] = koji.TASK_STATES[run['state']]
        for ts in ('create_ts', 'start_ts', 'completion_ts'):
            run[ts] = _format_ts(run[ts])
        print(mask % run)

    if host_id is not None:
        print('Host data for %s:' % options.host)
        host_data = session.scheduler.getHostData(hostID=host_id)
        if host_data:
            print(host_data[0]['data'])
        else:
            print('-')

+ 

+ 

def handle_scheduler_logs(goptions, session, args):
    "[monitor] Query scheduler logs"
    # Prints scheduler log messages as tab-separated task / host / time / message rows,
    # optionally filtered by task id, host and a timestamp window.
    usage = "usage: %prog scheduler-logs <options>"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("--task", type="int", action="store",
                      help="Filter by task ID")
    parser.add_option("--host", type="str", action="store",
                      help="Filter by host (name/ID)")
    parser.add_option("--from", type="float", action="store", dest="from_ts",
                      help="Logs from given timestamp")
    parser.add_option("--to", type="float", action="store", dest="to_ts",
                      help="Logs until given timestamp (included)")
    (options, args) = parser.parse_args(args)
    if args:
        parser.error("There are no arguments for this command")

    clauses = []
    if options.task is not None:
        clauses.append(['task_id', options.task])
    if options.host:
        try:
            host_id = int(options.host)
        except ValueError:
            # strict lookup: an unknown host name raises a hub error instead of
            # failing later with a TypeError on a None result
            host_id = session.getHost(options.host, strict=True)['id']
        clauses.append(['host_id', host_id])
    # compare against None so that an explicit timestamp of 0 is still honoured
    if options.from_ts is not None:
        clauses.append(['msg_ts', '>=', options.from_ts])
    if options.to_ts is not None:
        # inclusive upper bound, as promised by the --to help text
        clauses.append(['msg_ts', '<=', options.to_ts])

    logs = session.scheduler.getLogMessages(clauses, fields=('task_id', 'host_id', 'host_name',
                                                             'msg_ts', 'msg'))
    for log in logs:
        log['time'] = time.asctime(time.localtime(log['msg_ts']))

    mask = ("%(task_id)s\t%(host_name)s\t%(time)s\t%(msg)s")
    if not goptions.quiet:
        h = mask % {
            'task_id': 'Task',
            'host_name': 'Host',
            'time': 'Time',
            'msg': 'Message',
        }
        print(h)
        print('-' * len(h))

    for log in logs:
        print(mask % log)

file modified
+4 -3
@@ -261,12 +261,13 @@ 

          # not finished either.  info would be none.

          if not info:

              return 'unknown'

-         if info['state'] == koji.TASK_STATES['OPEN']:

+         if koji.TASK_STATES[info['state']] in ['OPEN', 'ASSIGNED']:

+             state = koji.TASK_STATES[info['state']].lower()

              if info['host_id']:

                  host = self.session.getHost(info['host_id'])

-                 return 'open (%s)' % host['name']

+                 return '%s (%s)' % (state, host['name'])

              else:

-                 return 'open'

+                 return state

          elif info['state'] == koji.TASK_STATES['FAILED']:

              s = 'FAILED: %s' % self.get_failure()

  

@@ -0,0 +1,51 @@ 

+ -- upgrade script to migrate the Koji database schema

+ -- from version 1.33 to 1.34

+ 

+ BEGIN;

+     -- scheduler tables (created together with the lock row inside this one transaction)

+     CREATE TABLE scheduler_task_runs (  -- task/host run records; queried by the hub's TaskRunsQuery

+             id SERIAL NOT NULL PRIMARY KEY,

+             task_id INTEGER REFERENCES task (id) NOT NULL,

+             host_id INTEGER REFERENCES host (id) NOT NULL,

+             active BOOLEAN NOT NULL DEFAULT TRUE,

+             create_time TIMESTAMPTZ NOT NULL DEFAULT NOW()

+     ) WITHOUT OIDS;

+     CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);

+     CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);

+     CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);

+ 

+ 

+     CREATE TABLE scheduler_host_data (  -- at most one data row per host; the hub upserts it keyed on host_id

+             host_id INTEGER REFERENCES host (id) PRIMARY KEY,

+             data JSONB

+     ) WITHOUT OIDS;

+ 

+ 

+     CREATE TABLE scheduler_sys_data (  -- named scheduler values; the hub stores 'last_run_ts' here

+             name TEXT NOT NULL PRIMARY KEY,

+             data JSONB

+     ) WITHOUT OIDS;

+ 

+ 

+     CREATE TABLE scheduler_task_refusals (  -- refusals of a task for a given host

+             id SERIAL NOT NULL PRIMARY KEY,

+             task_id INTEGER REFERENCES task (id) NOT NULL,

+             host_id INTEGER REFERENCES host (id) NOT NULL,

+             by_host BOOLEAN NOT NULL,

+             soft BOOLEAN NOT NULL DEFAULT FALSE,

+             msg TEXT,

+             time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

+             UNIQUE (task_id, host_id)  -- one row per task/host pair; conflict target of the hub's upsert

+     ) WITHOUT OIDS;

+ 

+ 

+     CREATE TABLE scheduler_log_messages (  -- scheduler log lines written by the hub's log_db()

+             id SERIAL NOT NULL PRIMARY KEY,

+             task_id INTEGER REFERENCES task (id),  -- nullable: a message need not concern a task

+             host_id INTEGER REFERENCES host (id),  -- nullable: a message need not concern a host

+             msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

+             msg TEXT NOT NULL

+     ) WITHOUT OIDS;

+ 

+     INSERT INTO locks(name) VALUES('scheduler');  -- presumably the row taken by the hub's db_lock('scheduler'); confirm

+ COMMIT;

file modified
+2
@@ -1429,6 +1429,8 @@ 

              if not valid_host:

                  self.logger.info(

                      'Skipping task %s (%s) due to host check', task['id'], task['method'])

+                 if task['state'] == koji.TASK_STATES['ASSIGNED']:

+                     self.session.host.refuseTask(task['id'], soft=False, msg='failed host check')

                  return False

          data = self.session.host.openTask(task['id'])

          if data is None:

file modified
+129
@@ -444,6 +444,42 @@ 

          return _dml(str(self), self.data)

  

  

class UpsertProcessor(InsertProcessor):
    """Build a basic upsert statement

    table - the table to insert into
    data - a dictionary of data to insert (keys = column names)
    rawdata - data to insert specified as sql expressions rather than python values
    keys - the columns that form the unique (conflict) key
    skip_dup - if set to true, do nothing on conflict

    Either keys or skip_dup must be given; otherwise ValueError is raised.
    When both are given, skip_dup wins and the statement does nothing on conflict.
    """

    def __init__(self, table, data=None, rawdata=None, keys=None, skip_dup=False):
        super(UpsertProcessor, self).__init__(table, data=data, rawdata=rawdata)
        self.keys = keys
        self.skip_dup = skip_dup
        if not keys and not skip_dup:
            raise ValueError('either keys or skip_dup must be set')

    def __repr__(self):
        return "<UpsertProcessor: %r>" % vars(self)

    def __str__(self):
        insert = super(UpsertProcessor, self).__str__()
        if self.skip_dup:
            return insert + ' ON CONFLICT DO NOTHING'

        # the conflict keys identify the row, so they are never part of the SET list
        data = {k: v for k, v in self.data.items() if k not in self.keys}
        rawdata = {k: v for k, v in self.rawdata.items() if k not in self.keys}
        assigns = [f"{key} = %({key})s" for key in data]
        # iterate the *filtered* rawdata: looping over self.rawdata would look up
        # a filtered-out conflict key and raise KeyError
        assigns.extend(f"{key} = ({expr})" for key, expr in rawdata.items())
        conflict = f' ON CONFLICT ({",".join(self.keys)}) DO UPDATE SET '
        return insert + conflict + ', '.join(sorted(assigns))

+ 

+ 

  class UpdateProcessor(object):

      """Build an update statement

  
@@ -834,6 +870,99 @@ 

          return results

  

  

class QueryView:
    """Abstract base for queries described by field and join maps.

    A subclass declares its tables, the joins it always needs, optional joins by key
    (``joinmap``) and a ``fieldmap`` of ``name -> [sql expression or None, joinkey or None]``.
    Requested fields and clauses are translated into a QueryProcessor, pulling in
    only the optional joins that the referenced fields require.
    """

    # subclasses should provide...
    tables = []
    joins = []
    joinmap = {}
    fieldmap = {}
    default_fields = ()

    def __init__(self, clauses=None, fields=None, opts=None):
        self.extra_joins = []
        self.values = {}
        column_map = self.get_fields(fields)
        columns, aliases = zip(*column_map.items())
        where = self.get_clauses(clauses)
        # joins are computed last: fields and clauses both register extra joins
        self.query = QueryProcessor(
            columns=columns, aliases=aliases,
            tables=list(self.tables), joins=self.get_joins(),
            clauses=where, values=self.values,
            opts=opts)

    def get_fields(self, fields):
        """Return a dict of sql expression -> alias for the requested fields.

        Falls back to default_fields, and to every known field (sorted) when that
        is empty or when '*' is requested.
        """
        requested = fields or self.default_fields
        if not requested or requested == '*':
            requested = sorted(self.fieldmap.keys())

        mapping = {}
        for name in requested:
            mapping[self.map_field(name)] = name
        return mapping

    def map_field(self, field):
        """Translate a field name to its sql expression, noting any join it needs.

        Raises koji.ParameterError for a field missing from fieldmap.
        """
        entry = self.fieldmap.get(field)
        if entry is None:
            raise koji.ParameterError(f'Invalid field for query {field}')
        expression, joinkey = entry
        if joinkey:
            # duplicates removed later
            self.extra_joins.append(joinkey)
        return expression or field

    def get_clauses(self, clauses):
        """Translate (field, value) / (field, op, value) clauses to sql strings.

        Values are stored in self.values under generated placeholder names.
        Raises koji.ParameterError for a bad operator or a clause of another length.
        """
        # for now, just a very simple implementation
        operators = ('IN', '=', '!=', '>', '<', '>=', '<=')
        sql = []
        for idx, clause in enumerate(clauses or []):
            # TODO checks check checks
            size = len(clause)
            if size == 2:
                # implicit operator
                field, value = clause
                op = 'IN' if isinstance(value, (list, tuple)) else '='
            elif size == 3:
                field, op, value = clause
                op = op.upper()
                if op not in operators:
                    raise koji.ParameterError(f'Invalid operator: {op}')
            else:
                raise koji.ParameterError(f'Invalid clause: {clause}')
            column = self.map_field(field)
            placeholder = f'v_{field}_{idx}'
            self.values[placeholder] = value
            sql.append(f'{column} {op} %({placeholder})s')

        return sql

    def get_joins(self):
        """Return the fixed joins followed by each needed extra join, once each."""
        # dict.fromkeys de-duplicates while preserving the order the joins were added
        needed = dict.fromkeys(self.extra_joins)
        return list(self.joins) + [self.joinmap[joinkey] for joinkey in needed]

    def execute(self):
        return self.query.execute()

    def executeOne(self, strict=False):
        return self.query.executeOne(strict=strict)

    def iterate(self):
        return self.query.iterate()

    def singleValue(self, strict=True):
        return self.query.singleValue(strict=strict)

+ 

+ 

  class BulkInsertProcessor(object):

      def __init__(self, table, data=None, columns=None, strict=True, batch=1000):

          """Do bulk inserts - it has some limitations compared to

file modified
+51 -71
@@ -74,6 +74,7 @@ 

      multi_fnmatch,

      safer_move,

  )

+ from . import scheduler

  from .auth import get_user_perms, get_user_groups

  from .db import (  # noqa: F401

      BulkInsertProcessor,
@@ -82,6 +83,7 @@ 

      QueryProcessor,

      Savepoint,

      UpdateProcessor,

+     UpsertProcessor,

      _applyQueryOpts,

      _dml,

      _fetchSingle,
@@ -387,6 +389,7 @@ 

                                   data={'result': info['result'], 'state': state},

                                   rawdata={'completion_time': 'NOW()'})

          update.execute()

+ 

          self.runCallbacks('postTaskStateChange', info, 'state', state)

          self.runCallbacks('postTaskStateChange', info, 'completion_ts', now)

  
@@ -2534,44 +2537,6 @@ 

      update.execute()

  

  

- def get_ready_hosts():

-     """Return information about hosts that are ready to build.

- 

-     Hosts set the ready flag themselves

-     Note: We ignore hosts that are late checking in (even if a host

-         is busy with tasks, it should be checking in quite often).

-     """

-     query = QueryProcessor(

-         tables=['host'],

-         columns=['host.id', 'name', 'arches', 'task_load', 'capacity'],

-         aliases=['id', 'name', 'arches', 'task_load', 'capacity'],

-         clauses=[

-             'enabled IS TRUE',

-             'ready IS TRUE',

-             'expired IS FALSE',

-             'master IS NULL',

-             'active IS TRUE',

-             "sessions.update_time > NOW() - '5 minutes'::interval"

-         ],

-         joins=[

-             'sessions USING (user_id)',

-             'host_config ON host.id = host_config.host_id'

-         ]

-     )

-     hosts = query.execute()

-     for host in hosts:

-         query = QueryProcessor(

-             tables=['host_channels'],

-             columns=['channel_id'],

-             clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'],

-             joins=['channels ON host_channels.channel_id = channels.id'],

-             values=host

-         )

-         rows = query.execute()

-         host['channels'] = [row['channel_id'] for row in rows]

-     return hosts

- 

- 

  def get_all_arches():

      """Return a list of all (canonical) arches available from hosts"""

      ret = {}
@@ -2587,27 +2552,6 @@ 

      return list(ret.keys())

  

  

- def get_active_tasks(host=None):

-     """Return data on tasks that are yet to be run"""

-     fields = ['id', 'state', 'channel_id', 'host_id', 'arch', 'method', 'priority', 'create_time']

-     values = dslice(koji.TASK_STATES, ('FREE', 'ASSIGNED'))

-     if host:

-         values['arches'] = host['arches'].split() + ['noarch']

-         values['channels'] = host['channels']

-         values['host_id'] = host['id']

-         clause = '(state = %(ASSIGNED)i AND host_id = %(host_id)i)'

-         if values['channels']:

-             clause += ''' OR (state = %(FREE)i AND arch IN %(arches)s \

- AND channel_id IN %(channels)s)'''

-         clauses = [clause]

-     else:

-         clauses = ['state IN (%(FREE)i,%(ASSIGNED)i)']

-     queryOpts = {'limit': 100, 'order': 'priority,create_time'}

-     query = QueryProcessor(columns=fields, tables=['task'], clauses=clauses,

-                            values=values, opts=queryOpts)

-     return query.execute()

- 

- 

  def get_task_descendents(task, childMap=None, request=False):

      if childMap is None:

          childMap = {}
@@ -14324,18 +14268,17 @@ 

      def getLoadData(self):

          """Get load balancing data

  

-         This data is relatively small and the necessary load analysis is

-         relatively complex, so we let the host machines crunch it."""

-         hosts = get_ready_hosts()

-         for host in hosts:

-             if host['id'] == self.id:

-                 break

-         else:

-             # this host not in ready list

-             return [[], []]

-         # host is the host making the call

-         tasks = get_active_tasks(host)

-         return [hosts, tasks]

+         This call is here for backwards compatibility.

+         Originally, it returned broad information about all hosts and tasks so that individual

+         hosts could make informed decisions about which task to take.

+ 

+         Now it presents only data for the calling host and the tasks that have been assigned to

+         it"""

+ 

+         host = get_host(self.id)

+         host['channels'] = [c['id'] for c in list_channels(hostID=self.id)]

+         tasks = scheduler.get_tasks_for_host(hostID=self.id, retry=True)

+         return [[host], tasks]

  

      def isEnabled(self):

          """Return whether this host is enabled or not."""
@@ -14409,6 +14352,43 @@ 

          task.assertHost(host.id)

          return task.setWeight(weight)

  

+     def setHostData(self, hostdata):

+         """Builder will update all its resources

+ 

+         Initial implementation contains:

+           - available task methods

+           - maxjobs

+           - host readiness

+         """

+         host = Host()

+         host.verify()

+         upsert = UpsertProcessor(

+             table='scheduler_host_data',

+             keys=['host_id'],

+             data={'host_id': host.id, 'data': hostdata},

+         )

+         upsert.execute()

+ 

+     def getTasks(self):

+         host = Host()

+         host.verify()

+         return scheduler.get_tasks_for_host(hostID=host.id, retry=True)

+ 

+     def refuseTask(self, task_id, soft=True, msg=''):

+         soft = convert_value(soft, cast=bool)

+         msg = convert_value(msg, cast=str)

+         host = Host()

+         host.verify()

+ 

+         task = Task(task_id)

+         tinfo = task.getInfo(strict=True)

+         if tinfo['host_id'] != host.id:

+             logger.warning('Host %s refused unrelated task: %s', host.id, tinfo['id'])

+             return

+         scheduler.set_refusal(host.id, tinfo['id'], soft=soft, msg=msg, by_host=True)

+         # also free the task

+         task.free()

+ 

      def getHostTasks(self):

          host = Host()

          host.verify()

file modified
+12
@@ -41,6 +41,7 @@ 

  from koji.xmlrpcplus import ExtendedMarshaller, Fault, dumps, getparser

  from . import auth

  from . import db

+ from . import scheduler

  

  

  class Marshaller(ExtendedMarshaller):
@@ -502,6 +503,15 @@ 

          ['RPMDefaultChecksums', 'string', 'md5 sha256'],

  

          ['SessionRenewalTimeout', 'integer', 1440],

+ 

+         # scheduler options

+         ['MaxJobs', 'integer', 15],

+         ['CapacityOvercommit', 'integer', 5],

+         ['ReadyTimeout', 'integer', 180],

+         ['AssignTimeout', 'integer', 300],

+         ['SoftRefusalTimeout', 'integer', 900],

+         ['HostTimeout', 'integer', 900],

+         ['RunInterval', 'integer', 60],

      ]

      opts = {}

      for name, dtype, default in cfgmap:
@@ -844,8 +854,10 @@ 

      registry = HandlerRegistry()

      functions = kojihub.RootExports()

      hostFunctions = kojihub.HostExports()

+     schedulerFunctions = scheduler.SchedulerExports()

      registry.register_instance(functions)

      registry.register_module(hostFunctions, "host")

+     registry.register_module(schedulerFunctions, "scheduler")

      registry.register_function(auth.login)

      registry.register_function(auth.sslLogin)

      registry.register_function(auth.logout)

file added
+605
@@ -0,0 +1,605 @@ 

+ import json

+ import logging

+ import time

+ 

+ import koji

+ from koji.context import context

+ from . import kojihub

+ from .db import QueryProcessor, InsertProcessor, UpsertProcessor, UpdateProcessor, \

+     DeleteProcessor, QueryView, db_lock

+ 

+ 

+ logger = logging.getLogger('koji.scheduler')

+ 

+ 

def log_db(msg, task_id=None, host_id=None):
    """Insert a scheduler log message into the scheduler_log_messages table.

    :param msg: message text
    :param task_id: related task id, if any
    :param host_id: related host id, if any
    """
    row = {'msg': msg, 'task_id': task_id, 'host_id': host_id}
    InsertProcessor('scheduler_log_messages', data=row).execute()

+ 

+ 

def log_both(msg, task_id=None, host_id=None, level=logging.INFO):
    """Send a message to the module logger and to the scheduler log table.

    The logger line is prefixed with ``[task_id=N] `` and/or ``[host_id=N] `` when
    those ids are truthy; the database row gets the bare message plus the ids.
    """
    task_tag = "" if not task_id else f"[task_id={task_id}] "
    host_tag = "" if not host_id else f"[host_id={host_id}] "
    logger.log(level, '%s%s%s', task_tag, host_tag, msg)
    log_db(msg, task_id, host_id)

+ 

+ 

class LogMessagesQuery(QueryView):
    """Query view over scheduler_log_messages with optional task and host details."""

    tables = ['scheduler_log_messages']
    default_fields = ('id', 'task_id', 'host_id', 'msg', 'msg_ts')
    joinmap = {
        # outer joins because these fields can be null
        'host': 'LEFT JOIN host ON scheduler_log_messages.host_id = host.id',
        'task': 'LEFT JOIN task ON scheduler_log_messages.task_id = task.id',
    }
    fieldmap = {
        # columns of the log table itself (no join required)
        'id': ['scheduler_log_messages.id', None],
        'task_id': ['scheduler_log_messages.task_id', None],
        'host_id': ['scheduler_log_messages.host_id', None],
        'msg': ['scheduler_log_messages.msg', None],
        'msg_ts': ["date_part('epoch', scheduler_log_messages.msg_time)", None],
        # columns that pull in the task join
        'arch': ['task.arch', 'task'],
        'channel_id': ['task.channel_id', 'task'],
        'method': ['task.method', 'task'],
        'owner': ['task.owner', 'task'],
        'state': ['task.state', 'task'],
        # columns that pull in the host join
        'host_name': ['host.name', 'host'],
        'host_ready': ['host.ready', 'host'],
    }

+ 

+ 

def get_log_messages(clauses=None, fields=None):
    """Run a LogMessagesQuery with the given clauses/fields and return its rows."""
    view = LogMessagesQuery(clauses, fields)
    return view.execute()

+ 

+ 

def get_tasks_for_host(hostID, retry=True):
    """Get the tasks assigned to a given host

    :param hostID: host id (cast to int; None is allowed)
    :param retry: when no tasks are found, run the scheduler once and query again
    :returns: rows for tasks in the ASSIGNED state whose host_id matches
    """
    hostID = kojihub.convert_value(hostID, cast=int, none_allowed=True)

    # sql expression -> alias in the result rows
    column_aliases = {
        'task.id': 'id',
        'task.state': 'state',
        'task.channel_id': 'channel_id',
        'task.host_id': 'host_id',
        'task.arch': 'arch',
        'task.method': 'method',
        'task.priority': 'priority',
        "date_part('epoch', create_time)": 'create_ts',
    }

    query = QueryProcessor(
        columns=tuple(column_aliases.keys()),
        aliases=tuple(column_aliases.values()),
        tables=['task'],
        clauses=['host_id = %(hostID)s', 'state=%(assigned)s'],
        values={'hostID': hostID, 'assigned': koji.TASK_STATES['ASSIGNED']},
    )

    tasks = query.execute()
    if tasks or not retry:
        return tasks

    # run scheduler and try again
    TaskScheduler().run()
    return query.execute()

+ 

+ 

def set_refusal(hostID, taskID, soft=True, by_host=False, msg=''):
    """Record a refusal of a task for a host.

    Upserts a row in scheduler_task_refusals keyed on (task_id, host_id), then logs
    the refusal to both the logger and the scheduler log table.

    :param hostID: id of the refusing host (cast to int)
    :param taskID: id of the refused task (cast to int)
    :param soft: refusal softness flag, stored as given (cast to bool)
    :param by_host: stored as given (cast to bool); the hub's refuseTask passes True
    :param msg: free-form reason (cast to str)
    """
    data = {
        # each id goes into its own column (they were previously swapped)
        'task_id': kojihub.convert_value(taskID, cast=int),
        'host_id': kojihub.convert_value(hostID, cast=int),
        'soft': kojihub.convert_value(soft, cast=bool),
        'by_host': kojihub.convert_value(by_host, cast=bool),
        'msg': kojihub.convert_value(msg, cast=str),
    }
    upsert = UpsertProcessor('scheduler_task_refusals', data=data, keys=('task_id', 'host_id'))
    upsert.execute()
    log_both('Host refused task', task_id=taskID, host_id=hostID)

+ 

+ 

class TaskRefusalsQuery(QueryView):
    """Query view over scheduler_task_refusals with optional task and host details."""

    tables = ['scheduler_task_refusals']
    default_fields = ('id', 'task_id', 'host_id', 'by_host', 'soft', 'msg', 'ts')
    joinmap = {
        'host': 'host ON scheduler_task_refusals.host_id = host.id',
        'task': 'task ON scheduler_task_refusals.task_id = task.id',
    }
    fieldmap = {
        # columns of the refusals table itself (no join required)
        'id': ['scheduler_task_refusals.id', None],
        'task_id': ['scheduler_task_refusals.task_id', None],
        'host_id': ['scheduler_task_refusals.host_id', None],
        'by_host': ['scheduler_task_refusals.by_host', None],
        'soft': ['scheduler_task_refusals.soft', None],
        'msg': ['scheduler_task_refusals.msg', None],
        'ts': ["date_part('epoch', scheduler_task_refusals.time)", None],
        # columns that pull in the task join
        'arch': ['task.arch', 'task'],
        'channel_id': ['task.channel_id', 'task'],
        'method': ['task.method', 'task'],
        'owner': ['task.owner', 'task'],
        'state': ['task.state', 'task'],
        # columns that pull in the host join
        'host_name': ['host.name', 'host'],
        'host_ready': ['host.ready', 'host'],
    }

+ 

+ 

def get_task_refusals(clauses=None, fields=None):
    """Run a TaskRefusalsQuery with the given clauses/fields and return its rows."""
    view = TaskRefusalsQuery(clauses, fields)
    return view.execute()

+ 

+ 

def get_host_data(hostID=None):
    """Return actual builder data

    :param int hostID: Return data for given host (otherwise for all)
    :returns list[dict]: list of host_id/data dicts
    """
    # validate up front, as get_tasks_for_host does, so a bad value is rejected
    # before it reaches the %i placeholder
    hostID = kojihub.convert_value(hostID, cast=int, none_allowed=True)

    clauses = []
    if hostID is not None:
        clauses.append('host_id = %(hostID)i')
    query = QueryProcessor(
        tables=['scheduler_host_data'],
        clauses=clauses,
        columns=['host_id', 'data'],
        # pass only the value the clause needs instead of every local name
        values={'hostID': hostID},
        opts={'order': 'host_id'}
    )

    return query.execute()

+ 

+ 

class TaskRunsQuery(QueryView):
    """Query view over scheduler_task_runs with optional task and host details."""

    tables = ['scheduler_task_runs']
    default_fields = ('id', 'task_id', 'host_id', 'active', 'create_ts')
    joinmap = {
        'host': 'host ON scheduler_task_runs.host_id = host.id',
        'task': 'task ON scheduler_task_runs.task_id = task.id',
    }
    fieldmap = {
        # columns of the runs table itself (no join required)
        'id': ['scheduler_task_runs.id', None],
        'task_id': ['scheduler_task_runs.task_id', None],
        'host_id': ['scheduler_task_runs.host_id', None],
        'active': ['scheduler_task_runs.active', None],
        'create_ts': ["date_part('epoch', scheduler_task_runs.create_time)", None],
        # columns that pull in the task join
        'arch': ['task.arch', 'task'],
        'channel_id': ['task.channel_id', 'task'],
        'method': ['task.method', 'task'],
        'owner': ['task.owner', 'task'],
        'state': ['task.state', 'task'],
        'start_ts': ["date_part('epoch', task.start_time)", 'task'],
        'completion_ts': ["date_part('epoch', task.completion_time)", 'task'],
        # columns that pull in the host join
        'host_name': ['host.name', 'host'],
        'host_ready': ['host.ready', 'host'],
    }

+ 

+ 

def get_task_runs(clauses=None, fields=None):
    """Run a TaskRunsQuery with the given clauses/fields and return its rows."""
    view = TaskRunsQuery(clauses, fields)
    return view.execute()

+ 

+ 

+ class TaskScheduler(object):

+ 

+     def __init__(self):

+         self.hosts_by_bin = {}

+         self.hosts = {}

+         self.active_tasks = []

+         self.free_tasks = []

+ 

+         # TODO these things need proper config

+         self.maxjobs = context.opts['MaxJobs']

+         self.capacity_overcommit = context.opts['CapacityOvercommit']

+         self.ready_timeout = context.opts['ReadyTimeout']

+         self.assign_timeout = context.opts['AssignTimeout']

+         self.soft_refusal_timeout = context.opts['SoftRefusalTimeout']

+         self.host_timeout = context.opts['HostTimeout']

+         self.run_interval = context.opts['RunInterval']

+ 

+     def run(self, force=False):

+         if not db_lock('scheduler', wait=force):

+             # already running elsewhere

+             return False

+ 

+         if not force and not self.check_ts():

+             # already ran too recently

+             return False

+ 

+         logger.info('Running task scheduler')

+         self.get_tasks()

+         self.get_hosts()

+         self.check_hosts()

+         self.do_schedule()

+         self.check_active_tasks()

+ 

+         return True

+ 

+     def check_ts(self):

+         """Check the last run timestamp

+ 

+         Returns True if the scheduler should run, False otherwise

+         """

+ 

+         # get last ts

+         query = QueryProcessor(

+             tables=['scheduler_sys_data'],

+             columns=['data'],

+             clauses=['name = %(name)s'],

+             values={'name': 'last_run_ts'},

+         )

+         last = query.singleValue(strict=False) or 0

+ 

+         now = time.time()

+         delta = now - last

+ 

+         if delta < 0:

+             logger.error('Last run in the future by %i seconds', -delta)

+             ret = False

+             # update the ts so that a system time rollback doesn't keep us from running

+         elif delta < self.run_interval:

+             logger.debug('Skipping run due to run_interval setting')

+             # return now without updating ts

+             return False

+         else:

+             ret = True

+ 

+         # save current ts

+         upsert = UpsertProcessor(

+             'scheduler_sys_data',

+             data={'name': 'last_run_ts',

+                   'data': json.dumps(now)},

+             keys=['name'],

+         )

+         upsert.execute()

+ 

+         return ret

+ 

+     def do_schedule(self):
+         """Assign free tasks to hosts
+ 
+         Works from self.hosts, self.hosts_by_bin, self.free_tasks and
+         self.active_tasks. Each host dict is annotated with _load, _ntasks,
+         _demand and _rank, and each free task dict with _hosts (its candidate
+         hosts). Free tasks are then handled in list order: the first candidate,
+         by ascending _rank, that still has capacity and is below self.maxjobs
+         receives the task through self.add_run(). Tasks with no such host are
+         left alone and reported at debug level.
+         """
+         # debug
+         logger.info(f'Hosts: {len(self.hosts)}')
+         logger.info(f'Free tasks: {len(self.free_tasks)}')
+         logger.info(f'Active tasks: {len(self.active_tasks)}')
+ 
+         # calculate host load and task count
+         for task in self.active_tasks:
+             # for now, we mirror what kojid updateTasks has been doing
+             host = self.hosts.get(task['host_id'])
+             if not host:
+                 # not showing as ready
+                 # TODO log and deal with this condition
+                 continue
+             host.setdefault('_load', 0.0)
+             # waiting tasks count toward the task total but add no load
+             if not task['waiting']:
+                 host['_load'] += task['weight']
+             host.setdefault('_ntasks', 0)
+             host['_ntasks'] += 1
+ 
+         for host in self.hosts.values():
+             # hosts without active tasks were not touched above, so fill in defaults
+             host.setdefault('_load', 0.0)
+             host.setdefault('_ntasks', 0)
+             host.setdefault('_demand', 0.0)
+             # temporary test code
+             logger.info(f'Host: {host}')
+             # compare the load stored on the host row with the load computed here
+             ldiff = host['task_load'] - host['_load']
+             if abs(ldiff) > 0.01:
+                 # this is expected in a number of cases, just observing
+                 logger.info(f'Host load differs by {ldiff:.2f}: {host}')
+ 
+         # figure out which hosts *can* take each task
+         # at the moment this is mostly just bin, but in the future it will be more complex
+         refusals = self.get_refusals()
+         for task in self.free_tasks:
+             task['_hosts'] = []
+             # min() keeps this threshold at or below zero
+             # NOTE(review): the assignment loop further down uses the unclamped
+             # value (weight - capacity_overcommit), so this pre-filter is the looser
+             # of the two checks -- confirm that the difference is intentional
+             min_avail = min(0, task['weight'] - self.capacity_overcommit)
+             h_refused = refusals.get(task['task_id'], {})
+             for host in self.hosts_by_bin.get(task['_bin'], []):
+                 if (host['ready'] and host['_ntasks'] < self.maxjobs and
+                         host['capacity'] - host['_load'] > min_avail and
+                         host['id'] not in h_refused):
+                     task['_hosts'].append(host)
+             logger.info(f'Task {task["task_id"]}: {len(task["_hosts"])} options')
+             for host in task['_hosts']:
+                 # demand gives us a rough measure of how much overall load is pending for the host
+                 # the task weight is split evenly across its candidate hosts
+                 host.setdefault('_demand', 0.0)
+                 host['_demand'] += task['weight'] / len(task['_hosts'])
+ 
+         # normalize demand to 1
+         # despite the name, max_demand is the total over all hosts, so the
+         # normalized values add up to 1
+         max_demand = sum([h['_demand'] for h in self.hosts.values()])
+         if max_demand > 0.0:
+             for h in self.hosts.values():
+                 h['_demand'] = (h['_demand'] / max_demand)
+ 
+         for h in self.hosts.values():
+             self._rank_host(h)
+ 
+         # tasks are already in priority order
+         for task in self.free_tasks:
+             min_avail = task['weight'] - self.capacity_overcommit
+             # lowest rank first
+             task['_hosts'].sort(key=lambda h: h['_rank'])
+             logger.debug('Task %i choices: %s', task['task_id'],
+                          [(h['name'], "%(_rank).2f" % h) for h in task['_hosts']])
+             for host in task['_hosts']:
+                 # re-check capacity and job count: earlier assignments in this
+                 # loop have changed _load and _ntasks
+                 if (host['capacity'] - host['_load'] > min_avail and
+                         host['_ntasks'] < self.maxjobs):
+                     # add run entry
+                     self.add_run(task, host)
+                     # update our totals and rank
+                     host['_load'] += task['weight']
+                     host['_ntasks'] += 1
+                     self._rank_host(host)
+                     break
+             else:
+                 # for/else: only reached when no candidate host was selected
+                 logger.debug('Could not assign task %s', task['task_id'])

+ 

+     def _rank_host(self, host):
+         """Store the host's scheduling rank in host['_rank']
+ 
+         The rank is the plain sum of the host's _load, _ntasks and _demand
+         values, all of which must already be set on the host dict.
+         """
+         host['_rank'] = host['_load'] + host['_ntasks'] + host['_demand']

+ 

+     def check_active_tasks(self):
+         """Check on active tasks
+ 
+         Goes through self.active_tasks and frees a task when:
+           - it has no host
+           - it has no active run entry
+           - it is still ASSIGNED more than self.assign_timeout seconds after its
+             earliest active run was created
+           - it is OPEN and its host's last update is more than
+             self.host_timeout seconds old
+         Tasks whose host is not in self.hosts are skipped. Afterwards, run
+         entries whose task is not OPEN or ASSIGNED are marked inactive.
+         """
+         # active runs indexed by task id
+         runs = self.get_active_runs()
+         logger.info('Found %i active runs', len(runs))
+         logger.info('Checking on %i active tasks', len(self.active_tasks))
+         for task in self.active_tasks:
+ 
+             if not task['host_id']:
+                 log_both('Active task with no host', task_id=task['task_id'], level=logging.ERROR)
+                 kojihub.Task(task['task_id']).free()
+                 continue
+ 
+             host = self.hosts.get(task['host_id'])
+             if not host:
+                 # host disabled?
+                 # TODO
+                 continue
+ 
+             taskruns = runs.get(task['task_id'], [])
+             if not taskruns:
+                 log_both('Assigned task with no active run entry', task_id=task['task_id'],
+                          host_id=host['id'], level=logging.ERROR)
+                 kojihub.Task(task['task_id']).free()
+                 continue
+ 
+             if len(taskruns) > 1:
+                 # only reported for now, the checks below still run
+                 logger.error('Multiple active run entries for assigned task %(task_id)s',
+                              task)
+                 # TODO fix
+ 
+             if task['state'] == koji.TASK_STATES['ASSIGNED']:
+                 # TODO check time since assigned
+                 # if not taken within a timeout
+                 #  - if host not checking in, then make sure host marked unavail and free
+                 #  - if host *is* checking in, then treat as refusal and free
+                 # age is measured from the oldest active run entry for the task
+                 age = time.time() - min([r['create_ts'] for r in taskruns])
+                 if age > self.assign_timeout:
+                     log_both('Task assignment timeout', task_id=task['task_id'],
+                              host_id=host['id'])
+                     kojihub.Task(task['task_id']).free()
+ 
+             elif task['state'] == koji.TASK_STATES['OPEN']:
+                 if host['update_ts'] is None:
+                     # shouldn't happen?
+                     # fall back to task_run time
+                     age = time.time() - min([r['create_ts'] for r in taskruns])
+                 else:
+                     # time since the host row was last updated
+                     age = time.time() - host['update_ts']
+                 if age > self.host_timeout:
+                     log_both('Freeing task from unresponsive host', task_id=task['task_id'],
+                              host_id=host['id'])
+                     kojihub.Task(task['task_id']).free()
+ 
+         # end stale runs
+         # the subquery is NULL when the run's task is not OPEN or ASSIGNED,
+         # which includes the tasks freed above
+         update = UpdateProcessor(
+             'scheduler_task_runs',
+             data={'active': False},
+             clauses=['active = TRUE',
+                      '(SELECT id FROM task WHERE task.id=task_id AND '
+                      'state IN %(states)s) IS NULL'],
+             values={'states': [koji.TASK_STATES[s] for s in ('OPEN', 'ASSIGNED')]},
+         )
+         update.execute()

+ 

+     def check_hosts(self):

+         # sanity check ready status

+         hosts_to_mark = []

+         for host in self.hosts.values():

+             if not host['ready']:

+                 continue

+             if (host['update_ts'] is None or time.time() - host['update_ts'] > self.ready_timeout):

+                 hosts_to_mark.append(host)

+                 log_both('Marking host not ready', host_id=host['id'])

+ 

+         if hosts_to_mark:

+             update = UpdateProcessor(

+                 'host',

+                 data={'ready': False},

+                 clauses=['id IN %(host_ids)s'],

+                 values={'host_ids': [h['id'] for h in hosts_to_mark]},

+             )

+             update.execute()

+ 

+     def get_active_runs(self):

+         runs = get_task_runs([["active", True]])

+         runs_by_task = {}

+         for run in runs:

+             runs_by_task.setdefault(run['task_id'], [])

+             runs_by_task[run['task_id']].append(run)

+ 

+         return runs_by_task

+ 

+     def get_tasks(self):

+         """Get the task data that we need for scheduling"""

+ 

+         fields = (

+             ('task.id', 'task_id'),

+             ('task.state', 'state'),

+             ('task.waiting', 'waiting'),

+             ('task.weight', 'weight'),

+             ('channel_id', 'channel_id'),

+             ('task.host_id', 'host_id'),

+             ('arch', 'arch'),

+             ('method', 'method'),

+             ('priority', 'priority'),

+             ("date_part('epoch', task.create_time)", 'create_ts'),

+             # ('scheduler_task_runs.id', 'run_id'),

+         )

+         fields, aliases = zip(*fields)

+ 

+         values = {'states': [koji.TASK_STATES[n] for n in ('ASSIGNED', 'OPEN')]}

+ 

+         query = QueryProcessor(

+             columns=fields, aliases=aliases, tables=['task'],

+             clauses=('task.state IN %(states)s',

+                      'task.host_id IS NOT NULL',  # should always be set, but...

+                      ),

+             values=values,

+         )

+         active_tasks = query.execute()

+ 

+         values = {'state': koji.TASK_STATES['FREE']}

+         query = QueryProcessor(

+             columns=fields, aliases=aliases, tables=['task'],

+             clauses=('task.state = %(state)s',),

+             values=values,

+             opts={'order': 'priority,create_ts', 'limit': 1000},  # TODO config

+             # scheduler order

+             # lower priority numbers take precedence, like posix process priority

+             # at a given priority, earlier creation times take precedence

+         )

+         free_tasks = query.execute()

+ 

+         for task in free_tasks:

+             tbin = '%(channel_id)s:%(arch)s' % task

+             task['_bin'] = tbin

+ 

+         for task in active_tasks:

+             tbin = '%(channel_id)s:%(arch)s' % task

+             task['_bin'] = tbin

+ 

+         self.free_tasks = free_tasks

+         self.active_tasks = active_tasks

+ 

+     def get_refusals(self):

+         """Get task refusals and clean stale entries"""

+         refusals = {}

+         cutoff_ts = time.time() - self.soft_refusal_timeout

+         to_drop = []

+         for row in get_task_refusals(fields=('id', 'task_id', 'host_id', 'soft', 'ts', 'state')):

+             if ((row['soft'] and row['ts'] < cutoff_ts) or

+                     koji.TASK_STATES[row['state']] not in ('FREE', 'OPEN', 'ASSIGNED')):

+                 to_drop.append(row['id'])

+             else:

+                 # index by task and host

+                 refusals.setdefault(row['task_id'], {})[row['host_id']] = row

+ 

+         if to_drop:

+             # drop stale entries

+             delete = DeleteProcessor(

+                 'scheduler_task_refusals',

+                 clauses=['id IN %(to_drop)s'],

+                 values=locals(),

+             )

+             delete.execute()

+ 

+         return refusals

+ 

+     def get_hosts(self):

+         # get hosts and bin them

+         hosts_by_bin = {}

+         hosts_by_id = {}

+         for host in self._get_hosts():

+             host['_bins'] = []

+             hosts_by_id[host['id']] = host

+             for chan in host['channels']:

+                 for arch in host['arches'].split() + ['noarch']:

+                     host_bin = "%s:%s" % (chan, arch)

+                     hosts_by_bin.setdefault(host_bin, []).append(host)

+                     host['_bins'].append(host_bin)

+ 

+         self.hosts_by_bin = hosts_by_bin

+         self.hosts = hosts_by_id

+ 

+     def _get_hosts(self):

+         """Query enabled hosts"""

+ 

+         fields = (

+             ('host.id', 'id'),

+             ('host.name', 'name'),

+             ("date_part('epoch', host.update_time)", 'update_ts'),

+             ('host.task_load', 'task_load'),

+             ('host.ready', 'ready'),

+             ('host_config.arches', 'arches'),

+             ('host_config.capacity', 'capacity'),

+         )

+         fields, aliases = zip(*fields)

+ 

+         query = QueryProcessor(

+             tables=['host'],

+             columns=fields,

+             aliases=aliases,

+             clauses=[

+                 'host_config.enabled IS TRUE',

+                 'host_config.active IS TRUE',

+             ],

+             joins=[

+                 'host_config ON host.id = host_config.host_id'

+             ]

+         )

+ 

+         hosts = query.execute()

+ 

+         # also get channel info

+         query = QueryProcessor(

+             tables=['host_channels'],

+             columns=['host_id', 'channel_id'],

+             clauses=['active IS TRUE', 'channels.enabled IS TRUE'],

+             joins=['channels ON host_channels.channel_id = channels.id'],

+         )

+         chan_idx = {}

+         for row in query.execute():

+             chan_idx.setdefault(row['host_id'], []).append(row['channel_id'])

+         for host in hosts:

+             host['channels'] = chan_idx.get(host['id'], [])

+ 

+         return hosts

+ 

+     def add_run(self, task, host):

+         log_both('Assigning task', task_id=task['task_id'], host_id=host['id'])

+ 

+         # mark any older runs inactive

+         update = UpdateProcessor(

+             'scheduler_task_runs',

+             data={'active': False},

+             clauses=['task_id=%(task_id)s', 'active = TRUE'],

+             values={'task_id': task['task_id']},

+         )

+         update.execute()

+ 

+         # add the new run

+         insert = InsertProcessor('scheduler_task_runs')

+         insert.set(task_id=task['task_id'], host_id=host['id'])

+         insert.execute()

+ 

+         # mark the task assigned

+         task = kojihub.Task(task['task_id'])

+         task.assign(host['id'])

+ 

+ 

+ class SchedulerExports:

+     getTaskRuns = staticmethod(get_task_runs)

+     getTaskRefusals = staticmethod(get_task_refusals)

+     getHostData = staticmethod(get_host_data)

+     getLogMessages = staticmethod(get_log_messages)

+ 

+     def doRun(self, force=False):

+         """Run the scheduler

+ 

+         This is a debug tool and should not normally be needed.

+         Scheduler runs are regularly triggered by builder checkins

+         """

+ 

+         force = kojihub.convert_value(force, cast=bool)

+         context.session.assertPerm('admin')

+         return TaskScheduler().run(force=force)

file modified
+48
@@ -983,10 +983,58 @@ 

  ) WITHOUT OIDS;

  CREATE INDEX rpm_checksum_rpm_id ON rpm_checksum(rpm_id);

  

+ 

+ -- scheduler tables
+ 
+ -- one row per assignment of a task to a host
+ -- active is cleared when a newer run is added for the task or when the task
+ -- is no longer open or assigned
+ CREATE TABLE scheduler_task_runs (
+         id SERIAL NOT NULL PRIMARY KEY,
+         task_id INTEGER REFERENCES task (id) NOT NULL,
+         host_id INTEGER REFERENCES host (id) NOT NULL,
+         active BOOLEAN NOT NULL DEFAULT TRUE,
+         create_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
+ ) WITHOUT OIDS;
+ CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);
+ CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);
+ CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);
+ 
+ 
+ -- free-form JSON data per host, at most one row per host
+ CREATE TABLE scheduler_host_data (
+         host_id INTEGER REFERENCES host (id) PRIMARY KEY,
+         data JSONB
+ ) WITHOUT OIDS;
+ 
+ 
+ -- named JSON values for the scheduler itself
+ -- the scheduler keeps the time of its last run under the name 'last_run_ts'
+ CREATE TABLE scheduler_sys_data (
+         name TEXT NOT NULL PRIMARY KEY,
+         data JSONB
+ ) WITHOUT OIDS;
+ 
+ 
+ -- hosts that must not be given a particular task, at most one row per
+ -- (task, host) pair
+ -- soft refusals are dropped by the scheduler once they are older than its
+ -- soft refusal timeout
+ -- NOTE(review): by_host presumably records whether the host itself issued the
+ -- refusal -- confirm against the code that inserts these rows
+ CREATE TABLE scheduler_task_refusals (
+         id SERIAL NOT NULL PRIMARY KEY,
+         task_id INTEGER REFERENCES task (id) NOT NULL,
+         host_id INTEGER REFERENCES host (id) NOT NULL,
+         by_host BOOLEAN NOT NULL,
+         soft BOOLEAN NOT NULL DEFAULT FALSE,
+         msg TEXT,
+         time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+         UNIQUE (task_id, host_id)
+ ) WITHOUT OIDS;
+ 
+ 
+ -- scheduler log entries; task_id and host_id are optional
+ -- old rows are removed by clean_scheduler_logs in the db cleanup script
+ CREATE TABLE scheduler_log_messages (
+         id SERIAL NOT NULL PRIMARY KEY,
+         task_id INTEGER REFERENCES task (id),
+         host_id INTEGER REFERENCES host (id),
+         msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+         msg TEXT NOT NULL
+ ) WITHOUT OIDS;

+ 

+ 

  -- this table is used for locking, see db_lock()

  CREATE TABLE locks (

          name TEXT NOT NULL PRIMARY KEY

  ) WITHOUT OIDS;

  INSERT INTO locks(name) VALUES('protonmsg-plugin');

+ INSERT INTO locks(name) VALUES('scheduler');

  

  COMMIT WORK;

@@ -136,6 +136,8 @@ 

          edit-notification         Edit user's notification

          list-notifications        List user's notifications and blocks

          remove-notification       Remove user's notifications

+         scheduler-info            Show information about scheduling

+         scheduler-logs            Query scheduler logs

          unblock-notification      Unblock user's notification

          wait-repo                 Wait for a repo to be regenerated

          watch-logs                Watch logs in realtime

@@ -1,64 +0,0 @@ 

- import kojihub

- import mock

- import unittest

- 

- QP = kojihub.QueryProcessor

- 

- 

- class TestGetReadyHosts(unittest.TestCase):

- 

-     def setUp(self):

-         self.maxDiff = None

-         self.QueryProcessor = mock.patch('kojihub.kojihub.QueryProcessor',

-                                          side_effect=self.getQuery).start()

-         self.queries = []

-         self.query_execute = mock.MagicMock()

- 

-     def tearDown(self):

-         mock.patch.stopall()

- 

-     def getQuery(self, *args, **kwargs):

-         query = QP(*args, **kwargs)

-         query.execute = self.query_execute

-         self.queries.append(query)

-         return query

- 

-     def test_valid(self):

-         hosts = [{'host.id': 1, 'name': 'hostname', 'arches': 'arch123', 'task_load': 0,

-                   'capacity': 3},

-                  {'host.id': 2, 'name': 'hostname-2', 'arches': 'arch123', 'task_load': 0,

-                   'capacity': 3}]

-         expected_res = [{'host.id': 1, 'name': 'hostname', 'arches': 'arch123', 'task_load': 0,

-                          'capacity': 3, 'channels': [1]},

-                         {'host.id': 2, 'name': 'hostname-2', 'arches': 'arch123', 'task_load': 0,

-                          'capacity': 3, 'channels': [2, 3]}

-                         ]

-         self.query_execute.side_effect = [hosts, [{'channel_id': 1}],

-                                           [{'channel_id': 2}, {'channel_id': 3}]]

-         result = kojihub.get_ready_hosts()

-         self.assertEqual(result, expected_res)

-         self.assertEqual(len(self.queries), 3)

- 

-         query = self.queries[0]

-         self.assertEqual(query.tables, ['host'])

-         self.assertEqual(query.joins, ['sessions USING (user_id)',

-                                        'host_config ON host.id = host_config.host_id'])

-         self.assertEqual(query.clauses, ['active IS TRUE', 'enabled IS TRUE', 'expired IS FALSE',

-                                          'master IS NULL', 'ready IS TRUE',

-                                          "sessions.update_time > NOW() - '5 minutes'::interval"])

-         self.assertEqual(query.values, {})

-         self.assertEqual(query.columns, ['arches', 'capacity', 'host.id', 'name', 'task_load'])

- 

-         query = self.queries[1]

-         self.assertEqual(query.tables, ['host_channels'])

-         self.assertEqual(query.joins, ['channels ON host_channels.channel_id = channels.id'])

-         self.assertEqual(query.clauses, ['active IS TRUE', 'enabled IS TRUE', 'host_id=%(id)s'])

-         self.assertEqual(query.values, hosts[0])

-         self.assertEqual(query.columns, ['channel_id'])

- 

-         query = self.queries[2]

-         self.assertEqual(query.tables, ['host_channels'])

-         self.assertEqual(query.joins, ['channels ON host_channels.channel_id = channels.id'])

-         self.assertEqual(query.clauses, ['active IS TRUE', 'enabled IS TRUE', 'host_id=%(id)s'])

-         self.assertEqual(query.values, hosts[1])

-         self.assertEqual(query.columns, ['channel_id'])

file modified
+23 -4
@@ -11,17 +11,17 @@ 

  

  

  def clean_sessions(cursor, vacuum, test, age, absolute):

-     clauses = f"(update_time < NOW() - '{age:d} days'::interval)"

+     clause = f"(update_time < NOW() - '{age:d} days'::interval)"

      if absolute is not None:

-         clauses += f"OR (start_time < NOW() - '{absolute:d} days'::interval)"

+         clause += f"OR (start_time < NOW() - '{absolute:d} days'::interval)"

  

      if options.verbose:

-         query = QueryProcessor(tables=['sessions'], clauses=[clauses], opts={'countOnly': True})

+         query = QueryProcessor(tables=['sessions'], clauses=[clause], opts={'countOnly': True})

          rows = query.execute()

          print(f"Deleting {rows} sessions")

  

      if not test:

-         delete = DeleteProcessor(table='sessions', clauses=[clauses])

+         delete = DeleteProcessor(table='sessions', clauses=[clause])

          delete.execute()

          if vacuum:

              cursor.execute("VACUUM ANALYZE sessions")
@@ -147,6 +147,21 @@ 

              cursor.execute("VACUUM ANALYZE buildroot")

  

  

+ def clean_scheduler_logs(cursor, vacuum, test, age):

+     clauses = [f"(msg_time < NOW() - '{age:d} days'::interval)"]

+     if options.verbose:

+         query = QueryProcessor(tables=["scheduler_log_messages"],

+                                clauses=clauses,

+                                opts={'countOnly': True})

+         rows = query.execute()

+         print(f"Deleting {rows} scheduler log messages")

+     if not test:

+         delete = DeleteProcessor(table="scheduler_log_messages", clauses=clauses)

+         delete.execute()

+         if vacuum:

+             cursor.execute("VACUUM ANALYZE scheduler_log_messages")

+ 

+ 

  if __name__ == "__main__":

      global options

      parser = OptionParser("%prog cleans koji database")
@@ -180,6 +195,9 @@ 

      parser.add_option('--scratch-builds-age', type=int, dest="scratch_age",

                        action="store", default=730, metavar="DAYS",

                        help="Delete scratch builds' tasks older than this (default: 2 years")

+     parser.add_option('--logs-age', type=int,

+                       action="store", default=7, metavar="DAYS",

+                       help="Delete scheduler log messages older than this (default: 7 days)")

      parser.add_option('--buildroots', action="store_true",

                        help="Delete unreferenced buildroots")

      parser.add_option('-f', '--force', action="store_true",
@@ -240,6 +258,7 @@ 

      clean_sessions(cursor, options.vacuum, options.test, options.sessions_age,

                     options.sessions_absolute_age)

      clean_reservations(cursor, options.vacuum, options.test, options.reservations_age)

+     clean_scheduler_logs(cursor, options.vacuum, options.test, options.logs_age)

      if options.tag_notifications:

          clean_notification_tasks(cursor, options.vacuum, options.test,

                                   age=options.tag_notifications_age)

file modified
+6 -1
@@ -1780,7 +1780,12 @@ 

      values['host'] = host

      values['channels'] = channels

      values['buildroots'] = buildroots

-     values['lastUpdate'] = server.getLastHostUpdate(host['id'], ts=True)

+     if 'update_ts' not in host:

+         # be nice with older hub

+         # TODO remove this compat workaround after a release

+         values['lastUpdate'] = server.getLastHostUpdate(host['id'], ts=True)

+     else:

+         values['lastUpdate'] = koji.formatTimeLong(host['update_ts'])

      if environ['koji.currentUser']:

          values['perms'] = server.getUserPerms(environ['koji.currentUser']['id'])

      else:

This branch represents current work on centralized scheduling logic in koji. Posted here for discussion

Metadata Update from @mikem:
- Pull-request tagged with: discussion

2 years ago

The code currently works for me, but there is more to be done

rebased onto e172763e1cdd1b52c8bab80873d7307fb28f97a0

2 years ago

28 new commits added

  • outer joins for log query
  • basic scheduler log cli
  • use host.update_ts in kojiweb/hostinfo
  • fix ambiguous column ref
  • fakehub --exclusive option
  • fakehub --user option
  • query for scheduler logs
  • call host.refuseTask when host check fails
  • ...
  • rework host.refuseTask
  • honor refusals
  • UpsertProcessor
  • use QueryView for get_task_runs
  • more QueryView
  • ...
  • basic QueryView working
  • QueryView fragment
  • initial task refusal functions
  • ...
  • handle first time case for last_run_ts
  • scheduler check_ts fragment
  • drop unused var
  • fix convert_value refs
  • avoid upsert in db_lock
  • pull getHostData() from pr3631
  • flake8
  • delete old scheduler log messages
  • Revert "move convert_value to util lib"
2 years ago

1 new commit added

  • drop unused task states
2 years ago

Maybe worth adding constraint to db?

I played with the idea that QueryView could construct a PG VIEW and use it later. But there is probably zero gain in that and it would just obscure things.

Should this also support LIKE/ILIKE, IS, and IS NOT (e.g. IS NOT NULL)? What about OR - it is not possible to pass such clauses here. Maybe allow passing a clause as a callable (e.g. eventCondition with params), or just its output?
The rest are probably corner cases not worth implementing here (e.g. the ~* operator, nvr concatenation...).

I played with the idea that QueryView could construct a PG VIEW

The naming of the class is something I've debated. It's kind of like a view, but I definitely don't want to actually construct a pg view.

It's really more of a query constructor. Granted I guess that term applies to QueryProcessor too.

There is certainly room to extend the types of conditions that the class can handle. I think a lot of that can be future work as long as the basic structure here looks ok. The most important thing is whether the overall model and api for it looks reasonable.

1 new commit added

  • fix import
2 years ago

rebased onto 2badf88883e677c8dcd666a54d3180e6d8ead3c1

2 years ago

rebased. no new commits, but skipped a few that were included in #3820 and #3819. conflicts were mainly with overlapping schema additions, the removal of get_ready_hosts, and minor variations in the commits from #3820 and #3819.

s/scheduler_task_run/scheduler_task_runs/

4 new commits added

  • use Task interface for correct state hooks
  • use UpsertProcessor
  • fix scheduler-info
  • fix schema
2 years ago

5 new commits added

  • fix unit tests
  • drop unused code
  • unify getTasks and getLoadData code
  • drop some test code
  • allow admins to trigger scheduler run with an api call
2 years ago

1 new commit added

  • unit test
2 years ago

rebased onto 761711e1607b5a10133a07eee97d8c26c2bd0d91

2 years ago

(trivial rebase onto current master branch)

I would try some testing now. There are two commits:
https://pagure.io/fork/tkopecek/koji/c/a18b69904e2af45680e271b6df4cb6c773bac443 - schema migration
https://pagure.io/fork/tkopecek/koji/c/ea0ba521aa4b66ad3b37da3d1b43f928fdd0eeb7 - default values instead of None - not really needed, but my linter throws fewer errors in do_schedule about a potential len() on None. Other options are to add asserts, e.g. assert isinstance(self.hosts, dict), or just ignore it.

I would try some testing now. There are two commits:
https://pagure.io/fork/tkopecek/koji/c/a18b69904e2af45680e271b6df4cb6c773bac443 - schema migration
https://pagure.io/fork/tkopecek/koji/c/ea0ba521aa4b66ad3b37da3d1b43f928fdd0eeb7 - default values instead of None - not really needed, but my linter throws fewer errors in do_schedule about a potential len() on None. Other options are to add asserts, e.g. assert isinstance(self.hosts, dict), or just ignore it.

Metadata Update from @tkopecek:
- Pull-request untagged with: discussion
- Pull-request tagged with: testing-ready

2 years ago

2 new commits added

  • default values
  • schema upgrade
2 years ago

Missing lock creation in schema migration.

Metadata Update from @tkopecek:
- Pull-request tagged with: scheduler

2 years ago

Metadata Update from @relias-redhat:
- Pull-request untagged with: testing-ready

2 years ago

6 new commits added

  • configurable scheduler parameters
  • flake8 fixes
  • wrong variable
  • missing lock in db
  • unify clauses handling
  • remove typo
2 years ago

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-ready

2 years ago

Metadata Update from @relias-redhat:
- Pull-request tagged with: testing-done

2 years ago

rebased onto 5bca490

2 years ago

Commit b872c0d fixes this pull-request

Pull-Request has been merged by tkopecek

2 years ago