| |
@@ -95,6 +95,7 @@
|
| |
|
| |
|
| |
logger = logging.getLogger('koji.hub')
|
| |
+ sched_logger = scheduler.DBLogger()
|
| |
|
| |
|
| |
NUMERIC_TYPES = (int, float)
|
| |
@@ -315,10 +316,12 @@
|
| |
else:
|
| |
return None
|
| |
|
| |
- def free(self):
|
| |
+ def free(self, newstate=koji.TASK_STATES['FREE']):
|
| |
"""Free a task"""
|
| |
+ if newstate not in [koji.TASK_STATES['FREE'], koji.TASK_STATES['REFUSED']]:
|
| |
+ raise koji.GenericError("Can't be called with other than FREE/REFUSED states")
|
| |
info = self.getInfo(request=True)
|
| |
- self.runCallbacks('preTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])
|
| |
+ self.runCallbacks('preTaskStateChange', info, 'state', newstate)
|
| |
self.runCallbacks('preTaskStateChange', info, 'host_id', None)
|
| |
# access checks should be performed by calling function
|
| |
query = QueryProcessor(tables=['task'], columns=['state'], clauses=['id = %(id)i'],
|
| |
@@ -327,14 +330,13 @@
|
| |
if not oldstate:
|
| |
raise koji.GenericError("No such task: %i" % self.id)
|
| |
if koji.TASK_STATES[oldstate] in ['CLOSED', 'CANCELED', 'FAILED']:
|
| |
- raise koji.GenericError("Cannot free task %i, state is %s" %
|
| |
+ raise koji.GenericError("Cannot free/refuse task %i, state is %s" %
|
| |
(self.id, koji.TASK_STATES[oldstate]))
|
| |
- newstate = koji.TASK_STATES['FREE']
|
| |
newhost = None
|
| |
update = UpdateProcessor('task', clauses=['id=%(task_id)s'], values={'task_id': self.id},
|
| |
data={'state': newstate, 'host_id': newhost})
|
| |
update.execute()
|
| |
- self.runCallbacks('postTaskStateChange', info, 'state', koji.TASK_STATES['FREE'])
|
| |
+ self.runCallbacks('postTaskStateChange', info, 'state', newstate)
|
| |
self.runCallbacks('postTaskStateChange', info, 'host_id', None)
|
| |
return True
|
| |
|
| |
@@ -2537,44 +2539,6 @@
|
| |
update.execute()
|
| |
|
| |
|
| |
- def get_ready_hosts():
|
| |
- """Return information about hosts that are ready to build.
|
| |
-
|
| |
- Hosts set the ready flag themselves
|
| |
- Note: We ignore hosts that are late checking in (even if a host
|
| |
- is busy with tasks, it should be checking in quite often).
|
| |
- """
|
| |
- query = QueryProcessor(
|
| |
- tables=['host'],
|
| |
- columns=['host.id', 'name', 'arches', 'task_load', 'capacity'],
|
| |
- aliases=['id', 'name', 'arches', 'task_load', 'capacity'],
|
| |
- clauses=[
|
| |
- 'enabled IS TRUE',
|
| |
- 'ready IS TRUE',
|
| |
- 'expired IS FALSE',
|
| |
- 'master IS NULL',
|
| |
- 'active IS TRUE',
|
| |
- "update_time > NOW() - '5 minutes'::interval"
|
| |
- ],
|
| |
- joins=[
|
| |
- 'sessions USING (user_id)',
|
| |
- 'host_config ON host.id = host_config.host_id'
|
| |
- ]
|
| |
- )
|
| |
- hosts = query.execute()
|
| |
- for host in hosts:
|
| |
- query = QueryProcessor(
|
| |
- tables=['host_channels'],
|
| |
- columns=['channel_id'],
|
| |
- clauses=['host_id=%(id)s', 'active IS TRUE', 'enabled IS TRUE'],
|
| |
- joins=['channels ON host_channels.channel_id = channels.id'],
|
| |
- values=host
|
| |
- )
|
| |
- rows = query.execute()
|
| |
- host['channels'] = [row['channel_id'] for row in rows]
|
| |
- return hosts
|
| |
-
|
| |
-
|
| |
def get_all_arches():
|
| |
"""Return a list of all (canonical) arches available from hosts"""
|
| |
ret = {}
|
| |
@@ -2590,27 +2554,6 @@
|
| |
return list(ret.keys())
|
| |
|
| |
|
| |
- def get_active_tasks(host=None):
|
| |
- """Return data on tasks that are yet to be run"""
|
| |
- fields = ['id', 'state', 'channel_id', 'host_id', 'arch', 'method', 'priority', 'create_time']
|
| |
- values = dslice(koji.TASK_STATES, ('FREE', 'ASSIGNED'))
|
| |
- if host:
|
| |
- values['arches'] = host['arches'].split() + ['noarch']
|
| |
- values['channels'] = host['channels']
|
| |
- values['host_id'] = host['id']
|
| |
- clause = '(state = %(ASSIGNED)i AND host_id = %(host_id)i)'
|
| |
- if values['channels']:
|
| |
- clause += ''' OR (state = %(FREE)i AND arch IN %(arches)s \
|
| |
- AND channel_id IN %(channels)s)'''
|
| |
- clauses = [clause]
|
| |
- else:
|
| |
- clauses = ['state IN (%(FREE)i,%(ASSIGNED)i)']
|
| |
- queryOpts = {'limit': 100, 'order': 'priority,create_time'}
|
| |
- query = QueryProcessor(columns=fields, tables=['task'], clauses=clauses,
|
| |
- values=values, opts=queryOpts)
|
| |
- return query.execute()
|
| |
-
|
| |
-
|
| |
def get_task_descendents(task, childMap=None, request=False):
|
| |
if childMap is None:
|
| |
childMap = {}
|
| |
@@ -14095,16 +14038,9 @@
|
| |
|
| |
This data is relatively small and the necessary load analysis is
|
| |
relatively complex, so we let the host machines crunch it."""
|
| |
- hosts = get_ready_hosts()
|
| |
- for host in hosts:
|
| |
- if host['id'] == self.id:
|
| |
- break
|
| |
- else:
|
| |
- # this host not in ready list
|
| |
- return [[], []]
|
| |
- # host is the host making the call
|
| |
- tasks = get_active_tasks(host)
|
| |
- return [hosts, tasks]
|
| |
+ host = get_host(self.id)
|
| |
+ tasks = scheduler.getTaskRuns(hostID=self.id)
|
| |
+ return [[host], tasks]
|
| |
|
| |
def getTask(self):
|
| |
"""Open next available task and return it"""
|
| |
@@ -14222,6 +14158,66 @@
|
| |
task.assertHost(host.id)
|
| |
return task.setWeight(weight)
|
| |
|
| |
def setHostData(self, hostdata):
    """Builder will update all its resources

    Initial implementation contains:
      - available task methods
      - maxjobs
      - host readiness

    The calling host is verified first. Its row in ``scheduler_host_data``
    is then updated if one already exists, or inserted otherwise.

    :param hostdata: value stored in the ``data`` column for this host
    """
    host = Host()
    host.verify()
    clauses = ['host_id = %(host_id)i']
    values = {'host_id': host.id}
    table = 'scheduler_host_data'
    # Count existing rows for this host to choose between UPDATE and INSERT.
    query = QueryProcessor(tables=[table], clauses=clauses, values=values,
                           opts={'countOnly': True})
    if query.singleValue() > 0:
        update = UpdateProcessor(table=table, data={'data': hostdata},
                                 clauses=clauses, values=values)
        update.execute()
    else:
        # An INSERT has no WHERE clause: the host id must be part of the new
        # row itself, otherwise the row could never be matched by the
        # host_id lookup above on the next call.
        insert = InsertProcessor(table=table,
                                 data={'host_id': host.id, 'data': hostdata})
        insert.execute()
    sched_logger.debug(f"Updating host data with: {hostdata}",
                       host_id=host.id, location='setHostData')
|
| |
+
|
| |
def getTasks(self):
    """Return the task runs queued for the calling host.

    The caller is verified as a host, then ``scheduler_task_runs`` is
    queried for rows belonging to that host whose state is SCHEDULED or
    ASSIGNED. One debug entry is written to the scheduler log per row.

    :returns: the rows produced by the query
    """
    caller = Host()
    caller.verify()

    wanted_states = [koji.TASK_STATES[name] for name in ('SCHEDULED', 'ASSIGNED')]
    conditions = ['host_id = %(host_id)s', 'state in %(states)s']
    params = {'host_id': caller.id, 'states': wanted_states}
    # NOTE(review): no ``columns`` are passed to QueryProcessor although
    # ``run['id']`` is read below -- confirm the returned rows carry an 'id' key.
    runs = QueryProcessor(tables=['scheduler_task_runs'], clauses=conditions,
                          values=params).execute()
    for run in runs:
        sched_logger.debug("Sending task", host_id=caller.id, task_id=run['id'],
                           location="getTasks")
    return runs
|
| |
+
|
| |
def refuseTask(self, task_id):
    """Refuse a task on behalf of the calling host.

    The caller is verified as a host and must be the host the task is
    assigned to. The task is then released via ``Task.free`` with the
    REFUSED state, and a warning is written to the scheduler log.

    :param task_id: id of the task being refused
    :returns: True
    :raises: whatever ``Task.assertHost`` raises when the task does not
             belong to the calling host, and ``koji.GenericError`` from
             ``Task.free`` for a missing or already finished task
    """
    host = Host()
    host.verify()

    task = Task(task_id)
    # Task.free() leaves access checks to its caller. Without this check any
    # verified host could strip another host's task of its host_id and state.
    task.assertHost(host.id)
    task.free(newstate=koji.TASK_STATES['REFUSED'])
    sched_logger.warning("Refusing task", host_id=host.id, task_id=task_id,
                         location="refuseTask")
    return True
|
| |
+
|
| |
def getHostTasks(self):
|
| |
host = Host()
|
| |
host.verify()
|
| |
Basic scheduler API stub (non-working). Related to #3603 #3588