#3616 Scheduler API
Closed 2 years ago by tkopecek. Opened 2 years ago by tkopecek.
tkopecek/koji scheduler-api  into  master

file modified
+56
@@ -7742,3 +7742,59 @@ 

          print("Number of tasks: %d" % tasks.result)

          print("Number of builds: %d" % builds.result)

          print('')

+ 

+ 

+ def anon_handle_scheduler_info(goptions, session, args):

+     """[monitor] Show information about scheduling"""

+     usage = "usage: %prog schedulerinfo [options]"

+     parser = OptionParser(usage=get_usage_str(usage))

+     parser.add_option("-t", "--task", action="store", type=int, default=None,

+                       help="Limit data to given task id")

+     parser.add_option("--host", action="store", default=None,

+                       help="Limit data to given builder (name/id)")

+     parser.add_option("--state", action="store", type='str', default=None,

+                       choices=[x for x in koji.TASK_STATES.keys()],

+                       help="Limit data to task state")

+     (options, args) = parser.parse_args(args)

+     if len(args) > 0:

+         parser.error("This command takes no arguments")

+ 

+     ensure_connection(session, goptions)

+ 

+     host_id = None

+     if options.host:

+         try:

+             host_id = int(options.host)

+         except ValueError:

+             host_id = session.getHost(options.host, strict=True)['id']

+ 

+     if options.state:

+         state = koji.TASK_STATES[options.state]

+     else:

+         state = None

+ 

+     # get the data

+     runs = session.scheduler.getTaskRuns(taskID=options.task, hostID=host_id, state=state)

+     mask = '%(task_id)s\t%(host_id)s\t%(state)s\t%(create_time)s\t%(start_time)s\t%(end_time)s'

+     if not goptions.quiet:

+         header = mask % {

+             'task_id': 'Task',

+             'host_name': 'Host',

+             'state': 'State',

+             'create_time': 'Created',

+             'start_time': 'Started',

+             'end_time': 'Ended'

+         }

+         print(header)

+         print('-' * len(header))

+     for run in runs:

+         run['state'] = koji.TASK_STATES[runs['state']]

+         print(mask % run)

+ 

+     if host_id:

+         print('Host data for %s:' % options.host)

+         host_data = session.scheduler.getHostData(hostID=host_id)

+         if len(host_data) > 0:

+             print(host_data[0]['data'])

+         else:

+             print('-')

file modified
+20
@@ -955,4 +955,24 @@ 

  ) WITHOUT OIDS;

  

  

+ -- scheduler tables

+ -- one row per run of a task on a host; state holds a numeric task state

+ CREATE TABLE scheduler_task_runs (

+         id SERIAL NOT NULL PRIMARY KEY,

+         task_id INTEGER REFERENCES task (id) NOT NULL,

+         host_id INTEGER REFERENCES host (id) NOT NULL,

+         state INTEGER NOT NULL,

+         create_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

+         start_time TIMESTAMPTZ,

+         end_time TIMESTAMPTZ

+ ) WITHOUT OIDS;

+ -- indexes for the columns the scheduler filters on

+ CREATE INDEX scheduler_task_runs_task ON scheduler_task_runs(task_id);

+ CREATE INDEX scheduler_task_runs_host ON scheduler_task_runs(host_id);

+ CREATE INDEX scheduler_task_runs_state ON scheduler_task_runs(state);

+ CREATE INDEX scheduler_task_runs_create_time ON scheduler_task_runs(create_time);

+ 

+ -- at most one row of scheduler data per host (host_id is the primary key)

+ CREATE TABLE scheduler_host_data (

+         host_id INTEGER REFERENCES host (id) PRIMARY KEY,

+         data JSONB

+ ) WITHOUT OIDS;

+ 

  COMMIT WORK;

file modified
+2
@@ -193,6 +193,8 @@ 

      'CANCELED',

      'ASSIGNED',

      'FAILED',

+     'SCHEDULED',

+     'REFUSED',

  ))

  

  BUILD_STATES = Enum((

file modified
+70 -74
@@ -95,6 +95,7 @@ 

  

  

  logger = logging.getLogger('koji.hub')

+ sched_logger = scheduler.DBLogger()

  

  

  NUMERIC_TYPES = (int, float)
@@ -315,10 +316,12 @@ 

          else:

              return None

  

-     def free(self):

+     def free(self, newstate=koji.TASK_STATES['FREE']):

          """Free a task"""

+         if newstate not in [koji.TASK_STATES['FREE'], koji.TASK_STATES['REFUSED']]:

+             raise koji.GenericError("Can't be called with other than FREE/REFUSED states")

          info = self.getInfo(request=True)

-         self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])

+         self.runCallbacks('preTaskStateChange', info, 'state', newstate)

          self.runCallbacks('preTaskStateChange', info, 'host_id', None)

          # access checks should be performed by calling function

          query = QueryProcessor(tables=['task'], columns=['state'], clauses=['id = %(id)i'],
@@ -327,14 +330,13 @@ 

          if not oldstate:

              raise koji.GenericError("No such task: %i" % self.id)

          if koji.TASK_STATES[oldstate] in ['CLOSED', 'CANCELED', 'FAILED']:

-             raise koji.GenericError("Cannot free task %i, state is %s" %

+             raise koji.GenericError("Cannot free/refuse task %i, state is %s" %

                                      (self.id, koji.TASK_STATES[oldstate]))

-         newstate = koji.TASK_STATES['FREE']

          newhost = None

          update = UpdateProcessor('task', clauses=['id=%(task_id)s'], values={'task_id': self.id},

                                   data={'state': newstate, 'host_id': newhost})

          update.execute()

-         self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])

+         self.runCallbacks('postTaskStateChange', info, 'state', newstate)

          self.runCallbacks('postTaskStateChange', info, 'host_id', None)

          return True

  
@@ -2537,44 +2539,6 @@ 

      update.execute()

  

  

- def get_ready_hosts():

-     """Return information about hosts that are ready to build.

- 

-     Hosts set the ready flag themselves

-     Note: We ignore hosts that are late checking in (even if a host

-         is busy with tasks, it should be checking in quite often).

-     """

-     query = QueryProcessor(

-         tables=['host'],

-         columns=['host.id', 'name', 'arches', 'task_load', 'capacity'],

-         aliases=['id', 'name', 'arches', 'task_load', 'capacity'],

-         clauses=[

-             'enabled IS TRUE',

-             'ready IS TRUE',

-             'expired IS FALSE',

-             'master IS NULL',

-             'active IS TRUE',

-             "update_time > NOW() - '5 minutes'::interval"

-         ],

-         joins=[

-             'sessions USING (user_id)',

-             'host_config ON host.id = host_config.host_id'

-         ]

-     )

-     hosts = query.execute()

-     for host in hosts:

-         query = QueryProcessor(

-             tables=['host_channels'],

-             columns=['channel_id'],

-             clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'],

-             joins=['channels ON host_channels.channel_id = channels.id'],

-             values=host

-         )

-         rows = query.execute()

-         host['channels'] = [row['channel_id'] for row in rows]

-     return hosts

- 

- 

  def get_all_arches():

      """Return a list of all (canonical) arches available from hosts"""

      ret = {}
@@ -2590,27 +2554,6 @@ 

      return list(ret.keys())

  

  

- def get_active_tasks(host=None):

-     """Return data on tasks that are yet to be run"""

-     fields = ['id', 'state', 'channel_id', 'host_id', 'arch', 'method', 'priority', 'create_time']

-     values = dslice(koji.TASK_STATES, ('FREE', 'ASSIGNED'))

-     if host:

-         values['arches'] = host['arches'].split() + ['noarch']

-         values['channels'] = host['channels']

-         values['host_id'] = host['id']

-         clause = '(state = %(ASSIGNED)i AND host_id = %(host_id)i)'

-         if values['channels']:

-             clause += ''' OR (state = %(FREE)i AND arch IN %(arches)s \

- AND channel_id IN %(channels)s)'''

-         clauses = [clause]

-     else:

-         clauses = ['state IN (%(FREE)i,%(ASSIGNED)i)']

-     queryOpts = {'limit': 100, 'order': 'priority,create_time'}

-     query = QueryProcessor(columns=fields, tables=['task'], clauses=clauses,

-                            values=values, opts=queryOpts)

-     return query.execute()

- 

- 

  def get_task_descendents(task, childMap=None, request=False):

      if childMap is None:

          childMap = {}
@@ -14095,16 +14038,9 @@ 

  

          This data is relatively small and the necessary load analysis is

          relatively complex, so we let the host machines crunch it."""

-         hosts = get_ready_hosts()

-         for host in hosts:

-             if host['id'] == self.id:

-                 break

-         else:

-             # this host not in ready list

-             return [[], []]

-         # host is the host making the call

-         tasks = get_active_tasks(host)

-         return [hosts, tasks]

+         host = get_host(self.id)

+         tasks = scheduler.getTaskRuns(hostID=self.id)

+         return [[host], tasks]

  

      def getTask(self):

          """Open next available task and return it"""
@@ -14222,6 +14158,66 @@ 

          task.assertHost(host.id)

          return task.setWeight(weight)

  

+     def setHostData(self, hostdata):

+         """Builder will update all its resources

+ 

+         Initial implementation contains:

+           - available task methods

+           - maxjobs

+           - host readiness

+         """

+         host = Host()

+         host.verify()

+         clauses = ['host_id = %(host_id)i']

+         values = {'host_id': host.id}

+         table = 'scheduler_host_data'

+         query = QueryProcessor(tables=[table], clauses=clauses, values=values,

+                                opts={'countOnly': True})

+         if query.singleValue() > 0:

+             update = UpdateProcessor(table=table, data={'data': hostdata},

+                                      clauses=clauses, values=values)

+             update.execute()

+         else:

+             insert = InsertProcessor(table=table, data={'data': hostdata},

+                                      clauses=clauses, values=values)

+             insert.execute()

+         sched_logger.debug(f"Updating host data with: {hostdata}",

+                            host_id=host.id, location='setHostData')

+ 

+     def getTasks(self):

+         host = Host()

+         host.verify()

+ 

+         query = QueryProcessor(

+             tables=['scheduler_task_runs'],

+             clauses=[

+                 'host_id = %(host_id)s',

+                 'state in %(states)s'

+             ],

+             values={

+                 'host_id': host.id,

+                 'states': [

+                     koji.TASK_STATES['SCHEDULED'],

+                     koji.TASK_STATES['ASSIGNED'],

+                 ],

+             }

+         )

+         tasks = query.execute()

+         for task in tasks:

+             sched_logger.debug("Sending task", host_id=host.id, task_id=task['id'],

+                                location="getTasks")

+         return tasks

+ 

+     def refuseTask(self, task_id):

+         host = Host()

+         host.verify()

+ 

+         task = Task(task_id)

+         task.free(newstate=koji.TASK_STATES['REFUSED'])

+         sched_logger.warning("Refusing task", host_id=host.id, task_id=task_id,

+                              location="refuseTask")

+         return True

+ 

      def getHostTasks(self):

          host = Host()

          host.verify()

file modified
+3
@@ -708,6 +708,7 @@ 

      log_handler.setFormatter(HubFormatter(opts['LogFormat']))

  

  

+     import scheduler

  def get_memory_usage():

      pagesize = resource.getpagesize()

      statm = [pagesize * int(y) // 1024
@@ -845,8 +846,10 @@ 

      registry = HandlerRegistry()

      functions = kojihub.RootExports()

      hostFunctions = kojihub.HostExports()

+     schedulerFunctions = scheduler.SchedulerExports()

      registry.register_instance(functions)

      registry.register_module(hostFunctions, "host")

+     registry.register_module(schedulerFunctions, "scheduler")

      registry.register_function(koji.auth.login)

      registry.register_function(koji.auth.sslLogin)

      registry.register_function(koji.auth.logout)

Basic scheduler API stub (non-working). Related to #3603 #3588

Metadata Update from @tkopecek:
- Pull-request tagged with: scheduler

2 years ago

1 new commit added

  • replace old calls
2 years ago

rebased onto 05d6ced

2 years ago

Pull-Request has been closed by tkopecek

2 years ago