#2516 Moving scheduler logic to hub
Closed 2 years ago by tkopecek. Opened 4 years ago by tkopecek.
tkopecek/koji scheduler  into  master

file modified
+74 -13
@@ -14031,21 +14031,82 @@ 

              update.execute()

              context.commit_pending = True

  

-     def getLoadData(self):

-         """Get load balancing data

+     def getAvailableTask(self):

+         """Get one available (free/assigned) task for given builder"""

+         logger = logging.getLogger('koji.scheduler')

  

-         This data is relatively small and the necessary load analysis is

-         relatively complex, so we let the host machines crunch it."""

+         bin_hosts = {}  # hosts indexed by bin

+         bins = {}  # bins for this host

+         our_avail = None

          hosts = get_ready_hosts()

          for host in hosts:

+             host['bins'] = []

              if host['id'] == self.id:

-                 break

-         else:

-             # this host not in ready list

-             return [[], []]

-         # host is the host making the call

-         tasks = get_active_tasks(host)

-         return [hosts, tasks]

+                 our_avail = host['capacity'] - host['task_load']

+             for chan in host['channels']:

+                 for arch in host['arches'].split() + ['noarch']:

+                     bin = "%s:%s" % (chan, arch)

+                     bin_hosts.setdefault(bin, []).append(host)

+                     if host['id'] == self.id:

+                         bins[bin] = 1

+         if our_avail is None:

+             logger.info(f"Server did not report this host. Are we disabled? {self.id}")

+             return None

+         elif not bins:

+             logger.info(f"No bins for this host. Missing channel/arch config? {self.id}")

+             # Note: we may still take an assigned task below

+ 

+         # sort available capacities for each of our bins

+         avail = {}

+         for bin in bins:

+             avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]

+             avail[bin].sort(reverse=True)

+ 

+         for task in get_active_tasks():

+             bin = "%(channel_id)s:%(arch)s" % task

+             # note: tasks are already in priority order

+             logger.debug(f"task: {task}, host: {self.id}")

+             if task['state'] == koji.TASK_STATES['ASSIGNED']:

+                 logger.debug(f"task {task} is assigned")

+                 if self.id == task['host_id']:

+                     logger.debug(f"task {task} is assigned to us: {self.id}")

+                     return task

+             elif task['state'] == koji.TASK_STATES['FREE']:

+                 logger.debug(f"task {task} is free, bin={bin}, host={self.id}")

+                 if bin not in bins:

+                     continue

+                 # see where our available capacity is compared to other hosts for this bin

+                 # (note: the hosts in this bin are exactly those that could

+                 # accept this task)

+                 bin_avail = avail.get(bin, [0])

+ 

+                 # Check to see if we should still delay taking a task

+                 logger.debug(f"avail delay check: host: {self.id}, task: {task['id']}, "

+                              f"bin_avail: {bin_avail}, our_avail: {our_avail}")

+ 

+                 # determine our normalized bin rank

+                 for pos, cap in enumerate(bin_avail):

+                     if our_avail >= cap:

+                         break

+                 if len(bin_avail) > 1:

+                     rank = float(pos) / (len(bin_avail) - 1)

+                 else:

+                     rank = 0.0

+                 # so, 0.0 for highest available capacity, 1.0 for lowest

+ 

+                 delay = rank * context.opts['TaskAvailDelay']

+                 now = time.time()

+                 if task['create_time'].timestamp() > (now - delay):

+                     logger.debug(

+                         f"skipping task {task['id']}, "

+                         f"age={now - task['create_time'].timestamp()}, "

+                         f"rank={rank}")

+                     continue

+                 return task

+             else:

+                 # should not happen

+                 logger.error("Invalid task state (task: %d, state: %d)", task['id'], task['state'])

+         return None

  

      def getTask(self):

          """Open next available task and return it"""
@@ -14112,10 +14173,10 @@ 

          host.verify()

          host.updateHost(task_load, ready)

  

-     def getLoadData(self):

+     def getAvailableTask(self):

          host = Host()

          host.verify()

-         return host.getLoadData()

+         return host.getAvailableTask()

  

      def getHost(self):

          """Return information about this host"""

file modified
+2
@@ -475,6 +475,8 @@ 

           '%(asctime)s [%(levelname)s] m=%(method)s u=%(user_name)s p=%(process)s r=%(remoteaddr)s '

           '%(name)s: %(message)s'],

  

+         ['TaskAvailDelay', 'int', 180],

+ 

          ['MissingPolicyOk', 'boolean', True],

          ['EnableMaven', 'boolean', False],

          ['EnableWin', 'boolean', False],

file modified
+13 -117
@@ -716,7 +716,6 @@ 

          self.options = options

          self.session = session

          self.tasks = {}

-         self.skipped_tasks = {}

          self.pids = {}

          self.subsessions = {}

          self.handlers = {}
@@ -1024,123 +1023,20 @@ 

          if not self.ready:

              self.logger.info("Not ready for task")

              return False

-         hosts, tasks = self.session.host.getLoadData()

-         self.logger.debug("Load Data:")

-         self.logger.debug("  hosts: %r" % hosts)

-         self.logger.debug("  tasks: %r" % tasks)

-         # now we organize this data into channel-arch bins

-         bin_hosts = {}  # hosts indexed by bin

-         bins = {}  # bins for this host

-         our_avail = None

-         for host in hosts:

-             host['bins'] = []

-             if host['id'] == self.host_id:

-                 # note: task_load reported by server might differ from what we

-                 # sent due to precision variation

-                 our_avail = host['capacity'] - host['task_load']

-             for chan in host['channels']:

-                 for arch in host['arches'].split() + ['noarch']:

-                     bin = "%s:%s" % (chan, arch)

-                     bin_hosts.setdefault(bin, []).append(host)

-                     if host['id'] == self.host_id:

-                         bins[bin] = 1

-         self.logger.debug("bins: %r" % bins)

-         if our_avail is None:

-             self.logger.info("Server did not report this host. Are we disabled?")

+         task = self.session.host.getAvailableTask()

+         if not task:

              return False

-         elif not bins:

-             self.logger.info("No bins for this host. Missing channel/arch config?")

-             # Note: we may still take an assigned task below

-         # sort available capacities for each of our bins

-         avail = {}

-         for bin in bins:

-             avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]

-             avail[bin].sort()

-             avail[bin].reverse()

-         self.cleanDelayTimes()

-         for task in tasks:

-             # note: tasks are in priority order

-             self.logger.debug("task: %r" % task)

-             if task['method'] not in self.handlers:

-                 self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)

-                 continue

-             if task['id'] in self.tasks:

-                 # we were running this task, but it apparently has been

-                 # freed or reassigned. We can't do anything with it until

-                 # updateTasks notices this and cleans up.

-                 self.logger.debug("Task %(id)s freed or reassigned", task)

-                 continue

-             if task['state'] == koji.TASK_STATES['ASSIGNED']:

-                 self.logger.debug("task is assigned")

-                 if self.host_id == task['host_id']:

-                     # assigned to us, we can take it regardless

-                     if self.takeTask(task):

-                         return True

-             elif task['state'] == koji.TASK_STATES['FREE']:

-                 bin = "%(channel_id)s:%(arch)s" % task

-                 self.logger.debug("task is free, bin=%r" % bin)

-                 if bin not in bins:

-                     continue

-                 # see where our available capacity is compared to other hosts for this bin

-                 # (note: the hosts in this bin are exactly those that could

-                 # accept this task)

-                 bin_avail = avail.get(bin, [0])

-                 if self.checkAvailDelay(task, bin_avail, our_avail):

-                     # decline for now and give the upper half a chance

-                     continue

-                 # otherwise, we attempt to open the task

-                 if self.takeTask(task):

-                     return True

-             else:

-                 # should not happen

-                 raise Exception("Invalid task state reported by server")

-         return False

- 

-     def checkAvailDelay(self, task, bin_avail, our_avail):

-         """Check to see if we should still delay taking a task

- 

-         Returns True if we are still in the delay period and should skip the

-         task. Otherwise False (delay has expired).

-         """

- 

-         now = time.time()

-         ts = self.skipped_tasks.get(task['id'])

-         if not ts:

-             ts = self.skipped_tasks[task['id']] = now

- 

-         # determine our normalized bin rank

-         for pos, cap in enumerate(bin_avail):

-             if our_avail >= cap:

-                 break

-         if len(bin_avail) > 1:

-             rank = float(pos) / (len(bin_avail) - 1)

-         else:

-             rank = 0.0

-         # so, 0.0 for highest available capacity, 1.0 for lowest

- 

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         delay *= rank

- 

-         # return True if we should delay

-         if now - ts < delay:

-             self.logger.debug("skipping task %i, age=%s rank=%s"

-                               % (task['id'], int(now - ts), rank))

-             return True

-         # otherwise

-         del self.skipped_tasks[task['id']]

-         return False

- 

-     def cleanDelayTimes(self):

-         """Remove old entries from skipped_tasks"""

-         now = time.time()

-         delay = getattr(self.options, 'task_avail_delay', 180)

-         cutoff = now - delay * 10

-         # After 10x the delay, we've had plenty of opportunity to take the

-         # task, so either it has already been taken or we can't take it.

-         for task_id in list(self.skipped_tasks):

-             ts = self.skipped_tasks[task_id]

-             if ts < cutoff:

-                 del self.skipped_tasks[task_id]

+         if task['method'] not in self.handlers:

+             self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)

+             return False

+         if task['id'] in self.tasks:

+             # we were running this task, but it apparently has been

+             # freed or reassigned. We can't do anything with it until

+             # updateTasks notices this and cleans up.

+             self.logger.debug("Task %(id)s freed or reassigned", task)

+             return False

+         self.takeTask(task)

+         return True

  

      def _waitTask(self, task_id, pid=None):

          """Wait (nohang) on the task, return true if finished"""

This is a WIP:
- We want to move the scheduling logic to the hub.
- It potentially lowers the number of calls to the hub.
- It allows us to design task-based policies.
- The builder tracks usage of its own resources (memory/disk for now), so it can refuse a task that needs more resources than it has free.

What is missing is storing request_resources in db/task. I started by moving the logic, but it now looks like this can be solved without that (even without uploading resource info to the hub). The question is whether we want such policies at all, and what the best way to continue is. (Discussion starts now; a rough sketch of the builder-side check is below.)
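A minimal sketch of what such a builder-side check could look like, assuming a hypothetical request_resources field on the task (memory/disk in MB) and a hypothetical helper that reads free memory and disk locally; none of this is part of the current diff:

```python
import shutil


def free_resources(workdir="/"):
    """Hypothetical helper: report free memory and disk space in MB (Linux only).

    workdir is a placeholder; a real builder would check its own work directory.
    """
    mem_kb = 0
    with open("/proc/meminfo") as f:
        for line in f:
            if line.startswith("MemAvailable:"):
                mem_kb = int(line.split()[1])
                break
    disk_mb = shutil.disk_usage(workdir).free // (1024 * 1024)
    return {"mem": mem_kb // 1024, "disk": disk_mb}


def can_take(task, workdir="/"):
    """Refuse a task whose (hypothetical) request_resources exceed what is free."""
    wanted = task.get("request_resources") or {}
    have = free_resources(workdir)
    return all(wanted.get(key, 0) <= have.get(key, 0) for key in ("mem", "disk"))
```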

rebased onto 9ed4368269c6343752b3d7b0f2c39e75fae2b834

2 years ago

@mikem I cleaned this up:
- dropped the resource management
- dropped the skipped_tasks table
- task delaying is no longer 1:1 with the old code; it is now based on task['create_time'], so the behaviour differs a bit, but it seems sufficient to me for now (worked example below).
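To illustrate, here is a standalone sketch of the rank/delay computation that getAvailableTask now does on the hub. The capacities and ages below are made up, and TASK_AVAIL_DELAY stands in for the new TaskAvailDelay hub option (default 180 seconds):

```python
import time

TASK_AVAIL_DELAY = 180  # seconds; mirrors the TaskAvailDelay hub option


def should_skip(create_ts, our_avail, bin_avail, now=None):
    """Skip a FREE task while better-placed hosts still get a chance to take it.

    bin_avail: available capacities of all hosts in the task's channel:arch bin,
    sorted in descending order (as built in getAvailableTask).
    """
    now = now or time.time()
    # normalized rank: 0.0 for the highest available capacity, 1.0 for the lowest
    for pos, cap in enumerate(bin_avail):
        if our_avail >= cap:
            break
    rank = float(pos) / (len(bin_avail) - 1) if len(bin_avail) > 1 else 0.0
    delay = rank * TASK_AVAIL_DELAY
    # skip while the task is younger than our personal delay
    return create_ts > now - delay


# example: we are 3rd of 5 hosts by free capacity -> rank 0.5 -> wait 90 seconds
bin_avail = [10.0, 8.0, 6.0, 4.0, 2.0]
print(should_skip(time.time() - 30, our_avail=6.0, bin_avail=bin_avail))   # True: only 30s old
print(should_skip(time.time() - 120, our_avail=6.0, bin_avail=bin_avail))  # False: old enough
```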

5 new commits added

  • remove skipped_tasks table and resources management
  • subtract used/reserved resources from free_resources
  • reservations
  • basic task policy
  • move task selection logic to hub
2 years ago

rebased onto 68495711685c9ab22a7aa0514abedcd444fbc39e

2 years ago

rebased onto dde7f05

2 years ago

Pull-Request has been closed by tkopecek

2 years ago