#3603 Logging for scheduler
Closed 2 years ago by tkopecek. Opened 2 years ago by tkopecek.
tkopecek/koji scheduler2  into  master

file modified
+59
@@ -7737,3 +7737,62 @@ 

          print("Number of tasks: %d" % tasks.result)

          print("Number of builds: %d" % builds.result)

          print('')

+ 

+ 

def handle_scheduler_logs(goptions, session, args):
    "[monitor] Query scheduler logs"
    usage = "usage: %prog scheduler-logs <options>"
    parser = OptionParser(usage=get_usage_str(usage))
    parser.add_option("--task", type="int", action="store",
                      help="Filter by task ID")
    parser.add_option("--host", type="str", action="store",
                      help="Filter by host (name/ID)")
    # optparse requires type="choice" when choices= is supplied; combining
    # choices with type="str" raises OptionError at parser construction time
    parser.add_option("--level", type="choice", action="store",
                      choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                      help="Filter by message level")
    parser.add_option("--from", type="float", action="store", dest="from_ts",
                      help="Logs from given timestamp")
    parser.add_option("--to", type="float", action="store", dest="to_ts",
                      help="Logs until given timestamp (included)")
    parser.add_option("--logger", type="str", action="store",
                      help="Filter by logger name")
    (options, args) = parser.parse_args(args)
    if len(args) != 0:
        parser.error("There are no arguments for this command")

    # build hub-call kwargs only from the options the user actually set
    kwargs = {}
    if options.task:
        kwargs['taskID'] = options.task
    if options.host:
        # --host accepts either a numeric ID or a host name
        try:
            kwargs['hostID'] = int(options.host)
        except ValueError:
            kwargs['hostID'] = session.getHost(options.host)['id']
    if options.level:
        kwargs['level'] = options.level
    # "is not None" so that a 0.0 (epoch) timestamp is still honored
    if options.from_ts is not None:
        kwargs['from_ts'] = options.from_ts
    if options.to_ts is not None:
        kwargs['to_ts'] = options.to_ts
    if options.logger:
        kwargs['logger_name'] = options.logger

    logs = session.scheduler.getLogs(**kwargs)

    # tab-separated output; header row is suppressed by --quiet
    mask = ("%(task_id)s\t%(host_name)s\t%(msg_time)s\t%(logger_name)s"
            "\t%(level)s\t%(location)s\t%(msg)s")
    if not goptions.quiet:
        h = mask % {
            'task_id': 'Task',
            'host_name': 'Host',
            'msg_time': 'Time',
            'logger_name': 'Logger',
            'level': 'Level',
            'location': 'Location',
            'msg': 'Message',
        }
        print(h)
        print('-' * len(h))

    for log in logs:
        print(mask % log)

file modified
+17
@@ -955,4 +955,21 @@ 

  ) WITHOUT OIDS;

  

  

-- Scheduler tables
-- Message levels mirror the Python logging module's standard level names
CREATE TYPE logger_level AS ENUM ('NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL');
-- One row per scheduler log message; task_id/host_id are optional links,
-- msg_time defaults to the insertion time
CREATE TABLE scheduler_log_messages (
        id SERIAL NOT NULL PRIMARY KEY,
        task_id INTEGER REFERENCES task (id),
        host_id INTEGER REFERENCES host (id),
        msg_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        logger_name VARCHAR(200) NOT NULL,
        level logger_level NOT NULL,
        location VARCHAR(200),
        msg TEXT NOT NULL
) WITHOUT OIDS;
-- Indexes for the filter columns exposed by the scheduler.getLogs hub call
CREATE INDEX scheduler_log_messages_task_id ON scheduler_log_messages(task_id);
CREATE INDEX scheduler_log_messages_host_id ON scheduler_log_messages(host_id);
CREATE INDEX scheduler_log_messages_msg_time ON scheduler_log_messages(msg_time);
CREATE INDEX scheduler_log_messages_level ON scheduler_log_messages(level);


  COMMIT WORK;

file modified
+4
@@ -42,6 +42,8 @@ 

  from koji.server import ServerError, BadRequest, RequestTimeout

  from koji.xmlrpcplus import ExtendedMarshaller, Fault, dumps, getparser

  

+ from . import scheduler

+ 

  

  class Marshaller(ExtendedMarshaller):

  
@@ -845,8 +847,10 @@ 

      registry = HandlerRegistry()

      functions = kojihub.RootExports()

      hostFunctions = kojihub.HostExports()

+     schedulerFunctions = scheduler.SchedulerExports()

      registry.register_instance(functions)

      registry.register_module(hostFunctions, "host")

+     registry.register_module(schedulerFunctions, "scheduler")

      registry.register_function(koji.auth.login)

      registry.register_function(koji.auth.sslLogin)

      registry.register_function(koji.auth.logout)

file added
+96
@@ -0,0 +1,96 @@ 

+ import functools

+ import logging

+ 

+ from koji.db import InsertProcessor, QueryProcessor

+ 

+ 

class SchedulerExports():
    """Hub exports registered under the "scheduler" namespace."""

    def getLogs(self, taskID=None, hostID=None, level=None,
                from_ts=None, to_ts=None, logger_name=None):
        """Return all related log messages

        :param int taskID: filter by task
        :param int hostID: filter by host
        :param str level: filter by message level
        :param float from_ts: filter from earliest time
        :param float to_ts: filter to latest time (from_ts < ts <= to_ts)
        :param str logger_name: filter by logger name
        :return [dict]: list of messages
        """
        fields = (
            ('scheduler_log_messages.id', 'id'),
            ('task_id', 'task_id'),
            ('host_id', 'host_id'),
            ('msg_time', 'msg_time'),
            ('logger_name', 'logger_name'),
            ('level', 'level'),
            ('location', 'location'),
            ('msg', 'msg'),
            # the schema names the table "host" (host_id REFERENCES host (id))
            ('host.name', 'host_name'),
        )
        clauses = []
        values = {}
        if taskID is not None:
            # column is task_id; placeholder needs the trailing "s" format spec
            clauses.append("task_id = %(taskID)s")
            values['taskID'] = taskID
        if hostID is not None:
            clauses.append("host_id = %(hostID)s")
            values['hostID'] = hostID
        if level is not None:
            clauses.append("level = %(level)s")
            values['level'] = level.upper()
        if from_ts is not None:
            # msg_time is TIMESTAMPTZ while from_ts/to_ts are epoch floats,
            # so convert via to_timestamp()
            clauses.append("msg_time > to_timestamp(%(from_ts)s)")
            values['from_ts'] = float(from_ts)
        if to_ts is not None:
            clauses.append("msg_time <= to_timestamp(%(to_ts)s)")
            values['to_ts'] = float(to_ts)
        if logger_name is not None:
            # original erroneously reused the %(to_ts)s placeholder here
            clauses.append("logger_name = %(logger_name)s")
            values['logger_name'] = logger_name

        columns, aliases = zip(*fields)
        # LEFT JOIN: host_id is nullable (DBLogger defaults it to None), an
        # inner join would silently drop messages without a host
        query = QueryProcessor(tables=['scheduler_log_messages'],
                               columns=columns, aliases=aliases,
                               joins=['LEFT JOIN host ON host_id = host.id'],
                               clauses=clauses, values=values,
                               opts={'order': 'msg_time'})
        return query.execute()

+ 

+ 

class DBLogger(object):
    """DBLogger class for encapsulating scheduler logging. It is thread-safe
    as both logging parts do this per se (logging + DB handler via context)"""

    def __init__(self, logger_name=None):
        """:param str logger_name: default logger name, falls back to
        'koji.scheduler' when not given"""
        if logger_name:
            self.logger = logger_name
        else:
            self.logger = 'koji.scheduler'

    def log(self, msg, logger_name=None, level=logging.NOTSET,
            task_id=None, host_id=None, location=None):
        """Write a message both to the regular logger and to the DB.

        :param str msg: message text
        :param str logger_name: override the instance's default logger
        :param int level: logging level (logging.DEBUG, ...)
        :param int task_id: related task (optional)
        :param int host_id: related host (optional)
        :param str location: message origin (optional)
        """
        if not logger_name:
            logger_name = self.logger
        # log to regular log
        text = f"task: {task_id}, host: {host_id}, location: {location}, message: {msg}"
        logging.getLogger(logger_name).log(level, text)
        # log to db; msg_time is filled in by the column default (NOW())
        insert = InsertProcessor(
            'scheduler_log_messages',
            data={
                'logger_name': logger_name,
                # public API instead of the private logging._levelToName map
                'level': logging.getLevelName(level),
                'task_id': task_id,
                'host_id': host_id,
                'location': location,
                'msg': msg,
            }
        )
        insert.execute()

    # convenience wrappers that pre-bind the level
    debug = functools.partialmethod(log, level=logging.DEBUG)
    info = functools.partialmethod(log, level=logging.INFO)
    warning = functools.partialmethod(log, level=logging.WARNING)
    error = functools.partialmethod(log, level=logging.ERROR)
    critical = functools.partialmethod(log, level=logging.CRITICAL)

@@ -0,0 +1,67 @@ 

+ import logging

+ import mock

+ import unittest

+ 

+ import scheduler

+ 

+ IP = scheduler.InsertProcessor

+ 

+ 

class TestDBLogger(unittest.TestCase):
    """Unit tests for scheduler.DBLogger with the DB insert stubbed out."""

    def setUp(self):
        self.InsertProcessor = mock.patch('scheduler.InsertProcessor',
                                          side_effect=self.getInsert).start()
        self.inserts = []

    def tearDown(self):
        mock.patch.stopall()

    def getInsert(self, *args, **kwargs):
        # build a real InsertProcessor (so .table/.data are populated) but
        # stub execute() and record the instance for inspection
        insert = IP(*args, **kwargs)
        insert.execute = mock.MagicMock()
        self.inserts.append(insert)
        return insert

    def test_defaults(self):
        logger = scheduler.DBLogger()
        self.assertEqual(logger.logger, 'koji.scheduler')
        self.assertEqual(len(self.inserts), 0)

    def test_basic(self):
        logger = scheduler.DBLogger()
        logger.log("text")
        self.assertEqual(len(self.inserts), 1)
        insert = self.inserts[0]
        self.assertEqual(insert.table, 'scheduler_log_messages')
        # DBLogger stores the message under the 'msg' key (the original test
        # expected a nonexistent 'text' key and could never pass)
        self.assertEqual(insert.data, {
            'task_id': None,
            'host_id': None,
            'logger_name': 'koji.scheduler',
            'level': 'NOTSET',
            'location': None,
            'msg': 'text',
        })

    def test_all(self):
        logger = scheduler.DBLogger()
        logger.log("text", logger_name="logger_name", level=logging.ERROR,
                   task_id=123, host_id=456, location="location")
        self.assertEqual(len(self.inserts), 1)
        insert = self.inserts[0]
        self.assertEqual(insert.data, {
            'task_id': 123,
            'host_id': 456,
            'logger_name': 'logger_name',
            'level': 'ERROR',
            'location': 'location',
            'msg': 'text',
        })

    def test_levels(self):
        logger = scheduler.DBLogger()
        for level in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
            m = getattr(logger, level.lower())
            m("")
            insert = self.inserts[0]
            self.assertEqual(insert.data['level'], level)
            self.inserts = []

file modified
+19
@@ -147,6 +147,21 @@ 

              cursor.execute("VACUUM ANALYZE buildroot")

  

  

def clean_scheduler_logs(cursor, vacuum, test, age):
    """Delete scheduler log messages older than `age` days.

    :param cursor: open DB cursor (used only for VACUUM)
    :param bool vacuum: run VACUUM ANALYZE after deleting
    :param bool test: dry run - report what would be deleted, delete nothing
    :param int age: retention period in days
    """
    # the query/delete processors expect a list of clause strings, not a
    # bare string; age is forced to int by the option parser and the :d
    # format, so the interpolation is injection-safe
    clauses = [f"(msg_time < NOW() - '{age:d} days'::interval)"]
    if options.verbose:  # NOTE: relies on the module-global `options`
        query = QueryProcessor(tables=["scheduler_log_messages"],
                               clauses=clauses,
                               opts={'countOnly': True})
        rows = query.execute()
        print(f"Deleting {rows} scheduler log messages")
    if not test:
        # original had a stray trailing backslash here (syntax error)
        delete = DeleteProcessor(table="scheduler_log_messages", clauses=clauses)
        delete.execute()
        if vacuum:
            cursor.execute("VACUUM ANALYZE scheduler_log_messages")

+ 

+ 

  if __name__ == "__main__":

      global options

      parser = OptionParser("%prog cleans koji database")
@@ -180,6 +195,9 @@ 

      parser.add_option('--scratch-builds-age', type=int, dest="scratch_age",

                        action="store", default=730, metavar="DAYS",

                        help="Delete scratch builds' tasks older than this (default: 2 years")

+     parser.add_option('--logs-age', type=int,

+                       action="store", default=7, metavar="DAYS",

+                       help="Delete scheduler log messages older than this (default: 7 days)")

      parser.add_option('--buildroots', action="store_true",

                        help="Delete unreferenced buildroots")

      parser.add_option('-f', '--force', action="store_true",
@@ -240,6 +258,7 @@ 

      clean_sessions(cursor, options.vacuum, options.test, options.sessions_age,

                     options.sessions_absolute_age)

      clean_reservations(cursor, options.vacuum, options.test, options.reservations_age)

+     clean_scheduler_logs(cursor, options.vacuum, options.test, options.logs_age)

      if options.tag_notifications:

          clean_notification_tasks(cursor, options.vacuum, options.test,

                                   age=options.tag_notifications_age)

Metadata Update from @tkopecek:
- Pull-request tagged with: scheduler

2 years ago

1 new commit added

  • basic CLI interface for scheduler logs
2 years ago

rebased onto 4085cde

2 years ago

Pull-Request has been closed by tkopecek

2 years ago