@@ -716,7 +716,6 @@
         self.options = options
         self.session = session
         self.tasks = {}
-        self.skipped_tasks = {}
         self.pids = {}
         self.subsessions = {}
         self.handlers = {}
@@ -1024,123 +1023,20 @@
         if not self.ready:
             self.logger.info("Not ready for task")
             return False
-        hosts, tasks = self.session.host.getLoadData()
-        self.logger.debug("Load Data:")
-        self.logger.debug("  hosts: %r" % hosts)
-        self.logger.debug("  tasks: %r" % tasks)
-        # now we organize this data into channel-arch bins
-        bin_hosts = {}  # hosts indexed by bin
-        bins = {}  # bins for this host
-        our_avail = None
-        for host in hosts:
-            host['bins'] = []
-            if host['id'] == self.host_id:
-                # note: task_load reported by server might differ from what we
-                # sent due to precision variation
-                our_avail = host['capacity'] - host['task_load']
-            for chan in host['channels']:
-                for arch in host['arches'].split() + ['noarch']:
-                    bin = "%s:%s" % (chan, arch)
-                    bin_hosts.setdefault(bin, []).append(host)
-                    if host['id'] == self.host_id:
-                        bins[bin] = 1
-        self.logger.debug("bins: %r" % bins)
-        if our_avail is None:
-            self.logger.info("Server did not report this host. Are we disabled?")
+        task = self.session.host.getAvailableTask()
+        if not task:
             return False
-        elif not bins:
-            self.logger.info("No bins for this host. Missing channel/arch config?")
-            # Note: we may still take an assigned task below
-        # sort available capacities for each of our bins
-        avail = {}
-        for bin in bins:
-            avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]]
-            avail[bin].sort()
-            avail[bin].reverse()
-        self.cleanDelayTimes()
-        for task in tasks:
-            # note: tasks are in priority order
-            self.logger.debug("task: %r" % task)
-            if task['method'] not in self.handlers:
-                self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)
-                continue
-            if task['id'] in self.tasks:
-                # we were running this task, but it apparently has been
-                # freed or reassigned. We can't do anything with it until
-                # updateTasks notices this and cleans up.
-                self.logger.debug("Task %(id)s freed or reassigned", task)
-                continue
-            if task['state'] == koji.TASK_STATES['ASSIGNED']:
-                self.logger.debug("task is assigned")
-                if self.host_id == task['host_id']:
-                    # assigned to us, we can take it regardless
-                    if self.takeTask(task):
-                        return True
-            elif task['state'] == koji.TASK_STATES['FREE']:
-                bin = "%(channel_id)s:%(arch)s" % task
-                self.logger.debug("task is free, bin=%r" % bin)
-                if bin not in bins:
-                    continue
-                # see where our available capacity is compared to other hosts for this bin
-                # (note: the hosts in this bin are exactly those that could
-                # accept this task)
-                bin_avail = avail.get(bin, [0])
-                if self.checkAvailDelay(task, bin_avail, our_avail):
-                    # decline for now and give the upper half a chance
-                    continue
-                # otherwise, we attempt to open the task
-                if self.takeTask(task):
-                    return True
-            else:
-                # should not happen
-                raise Exception("Invalid task state reported by server")
-        return False
-
-    def checkAvailDelay(self, task, bin_avail, our_avail):
-        """Check to see if we should still delay taking a task
-
-        Returns True if we are still in the delay period and should skip the
-        task. Otherwise False (delay has expired).
-        """
-
-        now = time.time()
-        ts = self.skipped_tasks.get(task['id'])
-        if not ts:
-            ts = self.skipped_tasks[task['id']] = now
-
-        # determine our normalized bin rank
-        for pos, cap in enumerate(bin_avail):
-            if our_avail >= cap:
-                break
-        if len(bin_avail) > 1:
-            rank = float(pos) / (len(bin_avail) - 1)
-        else:
-            rank = 0.0
-        # so, 0.0 for highest available capacity, 1.0 for lowest
-
-        delay = getattr(self.options, 'task_avail_delay', 180)
-        delay *= rank
-
-        # return True if we should delay
-        if now - ts < delay:
-            self.logger.debug("skipping task %i, age=%s rank=%s"
-                              % (task['id'], int(now - ts), rank))
-            return True
-        # otherwise
-        del self.skipped_tasks[task['id']]
-        return False
-
-    def cleanDelayTimes(self):
-        """Remove old entries from skipped_tasks"""
-        now = time.time()
-        delay = getattr(self.options, 'task_avail_delay', 180)
-        cutoff = now - delay * 10
-        # After 10x the delay, we've had plenty of opportunity to take the
-        # task, so either it has already been taken or we can't take it.
-        for task_id in list(self.skipped_tasks):
-            ts = self.skipped_tasks[task_id]
-            if ts < cutoff:
-                del self.skipped_tasks[task_id]
+        if task['method'] not in self.handlers:
+            self.logger.warning("Skipping task %(id)i, no handler for method %(method)s", task)
+            return False
+        if task['id'] in self.tasks:
+            # we were running this task, but it apparently has been
+            # freed or reassigned. We can't do anything with it until
+            # updateTasks notices this and cleans up.
+            self.logger.debug("Task %(id)s freed or reassigned", task)
+            return False
+        self.takeTask(task)
+        return True

     def _waitTask(self, task_id, pid=None):
         """Wait (nohang) on the task, return true if finished"""
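
For illustration, here is a minimal sketch of the kind of selection the hub-side getAvailableTask call is meant to take over from the builder. The function name, the shape of the host/task dicts, and the policy are assumptions for discussion only; the actual hub implementation is not part of this diff. The field names mirror the dicts the old builder code used (channels, arches, channel_id, arch, state, host_id, capacity, task_load).

```python
# Hypothetical hub-side selection, for discussion only.
# The state constants are stand-in labels, not koji.TASK_STATES values.
FREE = 'FREE'
ASSIGNED = 'ASSIGNED'


def get_available_task(host, tasks):
    """Return the first task this host should work on, or None."""
    # channel:arch bins this host can serve
    bins = {
        "%s:%s" % (chan, arch)
        for chan in host['channels']
        for arch in host['arches'].split() + ['noarch']
    }
    # lower priority value means more urgent, so sort ascending
    for task in sorted(tasks, key=lambda t: t['priority']):
        if task['state'] == ASSIGNED and task['host_id'] == host['id']:
            # assigned directly to us: always offer it
            return task
        if task['state'] == FREE:
            if "%(channel_id)s:%(arch)s" % task not in bins:
                continue
            if host['capacity'] - host['task_load'] <= 0:
                # no spare capacity on this host
                continue
            return task
    return None
```

The old builder did the equivalent filtering client-side against getLoadData(); putting it behind one hub call both cuts traffic and gives the hub a single place to hang scheduling policy.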
This is a work in progress. The goals are:
- move the task-selection logic to the hub
- potentially reduce the number of calls the builder makes to the hub
- allow us to design some task-based scheduling policies on the hub
- let the builder track usage of its own resources (memory and disk for now) so it can refuse a task that needs more resources than it has free (see the sketch below)

What is still missing is storing request_resources in the db/task. I started by moving the logic, but it now looks like this can be solved without that (even without uploading resource info to the hub). The question is whether we want such policies at all, and what the best way to continue is. Discussion starts now.
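
As a starting point for that discussion, here is a rough builder-side sketch of the mem/disk check. The request_resources field does not exist yet (that is the missing piece noted above), so its shape here, as well as the workdir default, is purely hypothetical.

```python
import os


def can_take(task, workdir='/var/lib/mock'):
    """Hypothetical check: refuse a task whose (not yet stored)
    request_resources exceed what this builder currently has free."""
    req = task.get('request_resources') or {}

    # free disk in bytes on the filesystem the builder works in
    st = os.statvfs(workdir)
    free_disk = st.f_bavail * st.f_frsize

    # available memory in bytes, from /proc/meminfo (Linux only)
    free_mem = 0
    with open('/proc/meminfo') as f:
        for line in f:
            if line.startswith('MemAvailable:'):
                free_mem = int(line.split()[1]) * 1024  # value is in kB
                break

    if req.get('disk', 0) > free_disk:
        return False
    if req.get('mem', 0) > free_mem:
        return False
    return True
```

In getNextTask this would sit right before takeTask(); whether the refusal happens on the builder or on the hub (via uploaded resource info) is exactly the open question above.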