#58 Fix parse_qsl entry to use specific separator
Closed 2 years ago by smooge. Opened 2 years ago by smooge.
prod into main

file added
+12
@@ -0,0 +1,12 @@ 

+ [run]

+ branch = True

+ source = countme

+ omit = countme/version.py

+ 

+ [report]

+ precision = 2

+ #fail_under = 99

+ exclude_lines =

+     pragma: no cover

+     def __repr__

+ show_missing = True

file added
+6
@@ -0,0 +1,6 @@ 

+ [flake8]

+ show_source = True

+ max_line_length = 100

+ ignore = E203,W503

+ exclude = .git,.tox,dist,*egg,build,files

+ 

file added
+7
@@ -0,0 +1,7 @@ 

+ /*.egg-info/

+ **/__pycache__/

+ 

+ /.coverage

+ /coverage.xml

+ /htmlcov

+ 

file added
+12
@@ -0,0 +1,12 @@ 

+ ---

+ - job:

+     name: tox-f33

+     run: ci/tox.yaml

+     nodeset:

+       nodes:

+         name: test-node

+         label: pod-python-f33

+ - project:

+     check:

+       jobs:

+         - tox-f33

file added
+3
@@ -0,0 +1,3 @@ 

+ exclude .zuul.yaml

+ prune ci

+ prune test_data

file added
+15
@@ -0,0 +1,15 @@ 

+ ---

+ - hosts: all

+   tasks:

+     - name: List project directory on the test system

+       command: ls -al {{ansible_user_dir}}/{{zuul.project.src_dir}}

+     - name: install dependencies

+       become: yes

+       package:

+         name:

+           - python3-tox

+         state: present

+     - name: run pytest

+       command:

+         chdir: '{{ansible_user_dir}}/{{zuul.project.src_dir}}'

+         cmd: python -m tox

file modified
+25 -221
@@ -1,245 +1,49 @@ 

  #!/usr/bin/python3

  

- import sys

- import sqlite3

  import argparse

- import datetime

- from collections import Counter

- from typing import NamedTuple

- from countme import CountmeItem, weeknum, SQLiteWriter, SQLiteReader, CSVWriter

+ from countme.totals import totals

  

- # NOTE: log timestamps do not move monotonically forward, but they don't

- # seem to ever jump backwards more than 241 seconds. I assume this is

- # some timeout that's set to 4 minutes, and the log entry shows up after

- # expiry, or something. Anyway, what this means is that once we've seen

- # a timestamp that's 241 seconds past the end of a week, we can assume that

- # there will be no further entries whose timestamps belong to the previous

- # week.

- # We could probably watch the max jitter between log lines and adjust

- # this if needed, but for now I'm just gonna pad it to 600 seconds.

- # The difference between 241 and 600 is kind of irrelevant - since we get logs

- # in 24-hour chunks, any window that extends into the next day means we have to

- # wait 24 hours until we can be sure we have all the data for the previous

- # week, so the effect would be the same if this was 3600 or 43200 or whatever.

- # TODO: this should probably move into the module somewhere..

- LOG_JITTER_WINDOW = 600

- 

- # Feb 11 2020 was the date that we branched F32 from Rawhide, so we've decided

- # to use that as the starting week for countme data.

- COUNTME_START_TIME=1581292800 # =Mon Feb 10 00:00:00 2020 (UTC)

- COUNTME_START_WEEKNUM=2614

- 

- DAY_LEN = 24*60*60

- WEEK_LEN = 7*DAY_LEN

- COUNTME_EPOCH = 345600  # =00:00:00 Mon Jan 5 00:00:00 1970 (UTC)

- 

- # And here's how you convert a weeknum to a human-readable date

- COUNTME_EPOCH_ORDINAL=719167

- def weekdate(weeknum, weekday=0):

-     if weekday < 0 or weekday > 6:

-         raise ValueError("weekday must be between 0 (Mon) and 6 (Sun)")

-     ordinal = COUNTME_EPOCH_ORDINAL + 7*weeknum + weekday

-     return datetime.date.fromordinal(ordinal)

- 

- def daterange(weeknum):

-     return weekdate(weeknum,0), weekdate(weeknum,6)

  

  # ===========================================================================

- # ====== Count Buckets & Items ==============================================

+ # ====== CLI parser & __main__ ==============================================

  # ===========================================================================

  

- class CountBucket(NamedTuple):

-     weeknum: int

-     os_name: str

-     os_version: str

-     os_variant: str

-     os_arch: str

-     sys_age: int

-     repo_tag: str

-     repo_arch: str

- 

-     @classmethod

-     def from_item(cls, item):

-         return cls._make((weeknum(item.timestamp),) + item[2:])

- 

- BucketSelect = CountBucket(

-     weeknum = f"((timestamp-{COUNTME_EPOCH})/{WEEK_LEN}) as weeknum",

-     os_name = "os_name",

-     os_version = "os_version",

-     os_variant = "os_variant",

-     os_arch = "os_arch",

-     sys_age = "sys_age",

-     repo_tag = "repo_tag",

-     repo_arch = "repo_arch"

- )

- 

- TotalsItem = NamedTuple("TotalsItem", [("hits", int)] + list(CountBucket.__annotations__.items()))

- TotalsItem.__doc__ = '''TotalsItem is CountBucket with a "hits" count on the front.'''

- 

- class CSVCountItem(NamedTuple):

-     '''

-     Represents one row in a countme_totals.csv file.

-     In the interest of human-readability, we replace 'weeknum' with the

-     start and end dates of that week.

-     '''

-     week_start: str

-     week_end: str

-     hits: int

-     os_name: str

-     os_version: str

-     os_variant: str

-     os_arch: str

-     sys_age: int

-     repo_tag: str

-     repo_arch: str

- 

-     @classmethod

-     def from_totalitem(cls, item):

-         '''Use this method to convert a CountItem to a CSVCountItem.'''

-         hits, weeknum, *rest = item

-         week_start, week_end = daterange(weeknum)

-         return cls._make([week_start, week_end, hits] + rest)

- 

- # ===========================================================================

- # ====== SQL + Progress helpers =============================================

- # ===========================================================================

- 

- class RawDB(SQLiteReader):

-     def __init__(self, fp, **kwargs):

-         super().__init__(fp, CountmeItem, tablename='countme_raw', **kwargs)

- 

-     def _minmax(self, column):

-         cur = self._con.execute(f"SELECT min({column}),max({column}) FROM {self._tablename}")

-         return cur.fetchone()

- 

-     def complete_weeks(self):

-         '''Return a range(startweek, provweek) that covers (valid + complete)

-         weeknums contained in this database. The database may contain some

-         data for `provweek`, but since it's provisional/incomplete it's

-         outside the range.'''

-         # startweek can't be earlier than the first week of data

-         startweek = max(weeknum(self.mintime()), COUNTME_START_WEEKNUM)

-         # A week is provisional until the LOG_JITTER_WINDOW expires, so once

-         # tsmax minus LOG_JITTER_WINDOW ticks over into a new weeknum, that

-         # weeknum is the provisional one. So...

-         provweek = weeknum(self.maxtime() - LOG_JITTER_WINDOW)

-         return range(startweek, provweek)

- 

-     def week_count(self, weeknum):

-         start_ts = weeknum*WEEK_LEN+COUNTME_EPOCH

-         end_ts = start_ts + WEEK_LEN

-         cur = self._con.execute(

-             f"SELECT COUNT(*)"

-             f" FROM {self._tablename}"

-             f" WHERE timestamp >= {start_ts} AND timestamp < {end_ts}")

-         return cur.fetchone()[0]

- 

-     def week_iter(self, weeknum, select='*'):

-         if isinstance(select, (tuple, list)):

-             item_select = ','.join(select)

-         elif isinstance(select, str):

-             item_select = select

-         else:

-             raise ValueError(f"select should be a string or tuple, not {select.__class__.__name__}")

-         start_ts = weeknum*WEEK_LEN+COUNTME_EPOCH

-         end_ts = start_ts + WEEK_LEN

-         return self._con.execute(

-             f"SELECT {item_select}"

-             f" FROM {self._tablename}"

-             f" WHERE timestamp >= {start_ts} AND timestamp < {end_ts}")

- 

- try:

-     from tqdm import tqdm as Progress

- except ImportError:

-     from countme.progress import diyprog as Progress

- 

- 

- 

- # ===========================================================================

- # ====== CLI parser & main() ================================================

- # ===========================================================================

  

  def parse_args(argv=None):

      p = argparse.ArgumentParser(

-         description = "Aggregate 'countme' log records to weekly totals.",

+         description="Aggregate 'countme' log records to weekly totals.",

      )

-     p.add_argument("-V", "--version", action='version',

-         version='%(prog)s 0.0.1')

+     p.add_argument("-V", "--version", action="version", version="%(prog)s 0.0.1")

  

-     p.add_argument("countme_totals",

-         help="Database containing countme_totals")

+     p.add_argument("countme_totals", help="Database containing countme_totals")

  

-     p.add_argument("--update-from",

-         metavar="COUNTME_RAW_DB", dest="countme_raw",

-         help="Update totals from raw data (from ./parse-access-log.py)")

+     p.add_argument(

+         "--update-from",

+         metavar="COUNTME_RAW_DB",

+         dest="countme_raw",

+         help="Update totals from raw data (from ./parse-access-log.py)",

+     )

  

-     p.add_argument("--csv-dump",

-         type=argparse.FileType('wt', encoding='utf-8'),

-         help="File to dump CSV-formatted totals data")

+     p.add_argument(

+         "--csv-dump",

+         type=argparse.FileType("wt", encoding="utf-8"),

+         help="File to dump CSV-formatted totals data",

+     )

  

-     p.add_argument("--progress", action="store_true",

-         help="Show progress while reading and counting data.")

+     p.add_argument(

+         "--progress",

+         action="store_true",

+         help="Show progress while reading and counting data.",

+     )

  

      args = p.parse_args(argv)

  

      return args

  

  

- def main():

-     args = parse_args()

- 

-     # Initialize the writer (better to fail early..)

-     totals = SQLiteWriter(args.countme_totals, TotalsItem,

-                           timefield='weeknum',

-                           tablename='countme_totals')

-     totals.write_header()

- 

-     # Are we doing an update?

-     if args.countme_raw:

-         rawdb = RawDB(args.countme_raw)

- 

-         # Check to see if there's any new weeks to get data for

-         complete_weeks = sorted(rawdb.complete_weeks())

-         newest_totals = totals.maxtime() or -1

-         new_weeks = [w for w in complete_weeks if w > newest_totals]

- 

-         # Count week by week

-         for week in new_weeks:

- 

-             # Set up a progress meter and counter

-             mon, sun = daterange(week)

-             desc = f"week {week} ({mon} -- {sun})"

-             total = rawdb.week_count(week)

-             prog = Progress(total=total, desc=desc, disable=True if not args.progress else None, unit="row", unit_scale=False)

-             hitcount = Counter()

- 

-             # Select raw items into their buckets and count 'em up

-             for bucket in rawdb.week_iter(week, select=BucketSelect):

-                 hitcount[bucket] += 1

-                 prog.update()

- 

-             # Write the resulting totals into countme_totals

-             totals.write_items((hits,)+bucket for bucket,hits in hitcount.items())

-             prog.close()

- 

-         # Oh and make sure we index them by time.

-         totals.write_index()

- 

- 

-     # Was a CSV dump requested?

-     if args.csv_dump:

-         totalreader = SQLiteReader(args.countme_totals, TotalsItem,

-                                    timefield='weeknum',

-                                    tablename='countme_totals')

-         writer = CSVWriter(args.csv_dump, CSVCountItem,

-                            timefield='week_start')

-         writer.write_header()

-         for item in totalreader:

-             writer.write_item(CSVCountItem.from_totalitem(item))

- 

- 

- if __name__ == '__main__':

+ if __name__ == "__main__":

      try:

-         main()

+         args = parse_args()

+         totals(args)

      except KeyboardInterrupt:

-         raise SystemExit(3) # You know, 3, like 'C', like Ctrl-C!

+         raise SystemExit(3)  # You know, 3, like 'C', like Ctrl-C!
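
A side note on the rewritten entry point (illustration only, not part of the patch): the script now just builds the parser and hands the namespace to countme.totals.totals(), and --csv-dump uses argparse.FileType, so the destination file is opened at parse time and the writer receives an already-open text handle. A minimal sketch with placeholder file names:

    import argparse

    p = argparse.ArgumentParser()
    p.add_argument("countme_totals")
    p.add_argument("--csv-dump", type=argparse.FileType("wt", encoding="utf-8"))
    args = p.parse_args(["totals.db", "--csv-dump", "totals.csv"])
    # args.csv_dump is an open handle: name='totals.csv', mode='wt', encoding='utf-8'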

file modified
+191 -92
@@ -25,44 +25,65 @@ 

  

  # TODO: this should probably get cleaned up?

  __all__ = (

-     'weeknum', 'parse_logtime', 'parse_querydict',

- 

-     'ItemWriter', 'CSVWriter', 'JSONWriter', 'AWKWriter', 'SQLiteWriter',

-     'ItemReader', 'CSVReader',                            'SQLiteReader',

- 

-     'make_writer', 'guessreader', 'autoreader',

- 

-     'LogItem',    'MirrorItem',    'CountmeItem',

-     'LogMatcher', 'MirrorMatcher', 'CountmeMatcher',

+     "weeknum",

+     "parse_logtime",

+     "parse_querydict",

+     "ItemWriter",

+     "CSVWriter",

+     "JSONWriter",

+     "AWKWriter",

+     "SQLiteWriter",

+     "ItemReader",

+     "CSVReader",

+     "SQLiteReader",

+     "make_writer",

+     "guessreader",

+     "autoreader",

+     "LogItem",

+     "MirrorItem",

+     "CountmeItem",

+     "LogMatcher",

+     "MirrorMatcher",

+     "CountmeMatcher",

  )

  

- import os

- import re

- from datetime import date, time, datetime, timezone, timedelta

+ from datetime import datetime, timezone, timedelta

  from urllib.parse import parse_qsl

- from typing import NamedTuple, Optional

+ from typing import NamedTuple, Optional, Type, Union

  

  from .regex import COUNTME_LOG_RE, MIRRORS_LOG_RE

- from .version import __version__, __version_info__

  

  # ===========================================================================

  # ====== Output item definitions and helpers ================================

  # ===========================================================================

  

- DAY_LEN = 24*60*60

- WEEK_LEN = 7*DAY_LEN

+ DAY_LEN = 24 * 60 * 60

+ WEEK_LEN = 7 * DAY_LEN

  COUNTME_EPOCH = 345600  # =00:00:00 Mon Jan 5 00:00:00 1970 (UTC)

  MONTHIDX = {

-     'Jan':1, 'Feb':2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6,

-     'Jul':7, 'Aug':8, 'Sep':9, 'Oct':10, 'Nov':11, 'Dec':12

+     "Jan": 1,

+     "Feb": 2,

+     "Mar": 3,

+     "Apr": 4,

+     "May": 5,

+     "Jun": 6,

+     "Jul": 7,

+     "Aug": 8,

+     "Sep": 9,

+     "Oct": 10,

+     "Nov": 11,

+     "Dec": 12,

  }

  

+ 

  def weeknum(timestamp):

      return (int(timestamp) - COUNTME_EPOCH) // WEEK_LEN

  

+ 

  def strptime_logtime(logtime):

      return datetime.strptime(logtime, "%d/%b/%Y:%H:%M:%S %z")

  

+ 

  def logtime_to_isoformat(logtime):

      # logtime: '29/Mar/2020:16:04:28 +0000'

      # ISO8601: '2020-03-29T16:04:28+00:00'
@@ -74,13 +95,15 @@ 

      offm = logtime[24:26]

      return f"{y}-{m:02}-{d}T{time}{offh}:{offm}"

  

+ 

  def offset_to_timezone(offset):

-     '''Convert a UTC offset like -0400 to a datetime.timezone instance'''

-     offmin = 60*int(offset[1:3]) + int(offset[3:5])

-     if offset[0] == '-':

+     """Convert a UTC offset like -0400 to a datetime.timezone instance"""

+     offmin = 60 * int(offset[1:3]) + int(offset[3:5])

+     if offset[0] == "-":

          offmin = -offmin

      return timezone(timedelta(minutes=offmin))

  

+ 

  def parse_logtime(logtime):

      # Equivalent to - but faster than - strptime_logtime.

      # It's like ~1.5usec vs 11usec, which might seem trivial but in my tests
@@ -89,21 +112,25 @@ 

      # (btw, slicing logtime by hand and using re.split are both marginally

      # slower. datetime.fromisoformat is slightly faster but not available

      # in Python 3.6 or earlier.)

-     dt, off = logtime.split(' ',1)

-     date, hour, minute, second = dt.split(':',3)

-     day, month, year = date.split('/',2)

-     tz = timezone.utc if off in {"+0000","-0000"} else offset_to_timezone(off)

-     return datetime(int(year), MONTHIDX[month], int(day),

-                     int(hour), int(minute), int(second), 0, tz)

+     dt, off = logtime.split(" ", 1)

+     date, hour, minute, second = dt.split(":", 3)

+     day, month, year = date.split("/", 2)

+     tz = timezone.utc if off in {"+0000", "-0000"} else offset_to_timezone(off)

+     return datetime(

+         int(year), MONTHIDX[month], int(day), int(hour), int(minute), int(second), 0, tz

+     )

+ 

  

  def parse_querydict(querystr):

-     '''Parse request query the way mirrormanager does (last value wins)'''

-     return dict(parse_qsl(querystr))

+     """Parse request query the way mirrormanager does (last value wins)"""

+     return dict(parse_qsl(querystr, separator="&"))

+ 

  

  class LogItem(NamedTuple):

-     '''

+     """

      Generic access.log data holder.

-     '''

+     """

+ 

      host: str

      identity: str

      time: str
@@ -123,31 +150,35 @@ 

          return parse_logtime(self.time).timestamp()

  

      def queryitems(self):

-         return parse_qsl(self.query)

+         return parse_qsl(self.query, separator="&")

  

      def querydict(self):

          return parse_querydict(self.query)

  

+ 

  # TODO: would be kinda nice if there was a clear subclass / translation

  # between item classes... or if compile_log_regex made the class for you?

  # Or something? It feels like these things should be more closely bound.

  

  

  class MirrorItem(NamedTuple):

-     '''

+     """

      A basic mirrorlist/metalink metadata item.

      Each item has a timestamp, IP, and the requested repo= and arch= values.

-     '''

+     """

+ 

      timestamp: int

      host: str

      repo_tag: Optional[str]

      repo_arch: Optional[str]

  

+ 

  class CountmeItem(NamedTuple):

-     '''

+     """

      A "countme" match item.

      Includes the countme value and libdnf User-Agent fields.

-     '''

+     """

+ 

      timestamp: int

      host: str

      os_name: str
@@ -160,11 +191,14 @@ 

  

  

  class LogMatcher:

-     '''Base class for a LogMatcher, which iterates through a log file'''

+     """Base class for a LogMatcher, which iterates through a log file"""

+ 

      regex = NotImplemented

-     itemtuple = NotImplemented

+     itemtuple: Union[Type[MirrorItem], Type[CountmeItem]]

+ 

      def __init__(self, fileobj):

          self.fileobj = fileobj

+ 

      def iteritems(self):

          # TODO: at this point we're single-threaded and CPU-bound;

          # multithreading would speed things up here.
@@ -172,94 +206,126 @@ 

              match = self.regex.match(line)

              if match:

                  yield self.make_item(match)

+ 

      __iter__ = iteritems

+ 

      @classmethod

      def make_item(cls, match):

          raise NotImplementedError

  

+ 

  class MirrorMatcher(LogMatcher):

-     '''Match all mirrorlist/metalink items, like mirrorlist.py does.'''

+     """Match all mirrorlist/metalink items, like mirrorlist.py does."""

+ 

      regex = MIRRORS_LOG_RE

      itemtuple = MirrorItem

+ 

      @classmethod

      def make_item(cls, match):

-         timestamp = parse_logtime(match['time']).timestamp()

-         query = parse_querydict(match['query'])

-         return cls.itemtuple(timestamp = int(timestamp),

-                              host      = match['host'],

-                              repo_tag  = query.get('repo'),

-                              repo_arch = query.get('arch'))

+         timestamp = parse_logtime(match["time"]).timestamp()

+         query = parse_querydict(match["query"])

+         return cls.itemtuple(

+             timestamp=int(timestamp),

+             host=match["host"],

+             repo_tag=query.get("repo"),

+             repo_arch=query.get("arch"),

+         )

+ 

  

  class CountmeMatcher(LogMatcher):

-     '''Match the libdnf-style "countme" requests.'''

+     """Match the libdnf-style "countme" requests."""

+ 

      regex = COUNTME_LOG_RE

      itemtuple = CountmeItem

+ 

      @classmethod

      def make_item(cls, match):

-         timestamp = parse_logtime(match['time']).timestamp()

-         query = parse_querydict(match['query'])

-         return cls.itemtuple(timestamp  = int(timestamp),

-                              host       = match['host'],

-                              os_name    = match['os_name'],

-                              os_version = match['os_version'],

-                              os_variant = match['os_variant'],

-                              os_arch    = match['os_arch'],

-                              sys_age    = int(query.get('countme')),

-                              repo_tag   = query.get('repo'),

-                              repo_arch  = query.get('arch'))

+         timestamp = parse_logtime(match["time"]).timestamp()

+         query = parse_querydict(match["query"])

+         return cls.itemtuple(

+             timestamp=int(timestamp),

+             host=match["host"],

+             os_name=match["os_name"],

+             os_version=match["os_version"],

+             os_variant=match["os_variant"],

+             os_arch=match["os_arch"],

+             sys_age=int(query.get("countme")),

+             repo_tag=query.get("repo"),

+             repo_arch=query.get("arch"),

+         )

+ 

  

  # ===========================================================================

  # ====== ItemWriters - output formatting classes ============================

  # ===========================================================================

  

+ 

  class ItemWriter:

-     def __init__(self, fp, itemtuple, timefield='timestamp', **kwargs):

+     def __init__(self, fp, itemtuple, timefield="timestamp", **kwargs):

          self._fp = fp

          self._itemtuple = itemtuple

          self._fields = itemtuple._fields

          assert timefield in self._fields, f"{itemtuple.__name__!r} has no time field {timefield!r}"

          self._timefield = timefield

          self._get_writer(**kwargs)

+ 

      def _get_writer(self, **kwargs):

          raise NotImplementedError

+ 

      def write_item(self, item):

          raise NotImplementedError

+ 

      def write_items(self, items):

          for item in items:

              self.write_item(item)

+ 

      def write_header(self):

          pass

+ 

      def write_index(self):

          pass

  

+ 

  class JSONWriter(ItemWriter):

      def _get_writer(self, **kwargs):

          import json

+ 

          self._dump = json.dump

+ 

      def write_item(self, item):

          self._dump(item._asdict(), self._fp)

  

+ 

  class CSVWriter(ItemWriter):

      def _get_writer(self, **kwargs):

          import csv

+ 

          self._writer = csv.writer(self._fp)

+ 

      def write_header(self):

          self._writer.writerow(self._fields)

+ 

      def write_item(self, item):

          self._writer.writerow(item)

  

+ 

  class AWKWriter(ItemWriter):

-     def _get_writer(self, field_separator='\t', **kwargs):

+     def _get_writer(self, field_separator="\t", **kwargs):

          self._fieldsep = field_separator

+ 

      def _write_row(self, vals):

-         self._fp.write(self._fieldsep.join(str(v) for v in vals) + '\n')

+         self._fp.write(self._fieldsep.join(str(v) for v in vals) + "\n")

+ 

      def write_header(self):

          self._write_row(self._fields)

+ 

      def write_item(self, item):

          self._write_row(item)

  

+ 

  class SQLiteWriter(ItemWriter):

-     '''Write each item as a new row in a SQLite database table.'''

+     """Write each item as a new row in a SQLite database table."""

+ 

      # We have to get a little fancier with types here since SQL tables expect

      # typed values. Good thing Python has types now, eh?

      SQL_TYPE = {
@@ -272,11 +338,14 @@ 

          Optional[float]: "REAL",

          Optional[bytes]: "BLOB",

      }

+ 

      def _sqltype(self, fieldname):

          typehint = self._itemtuple.__annotations__[fieldname]

          return self.SQL_TYPE.get(typehint, "TEXT")

-     def _get_writer(self, tablename='countme_raw', **kwargs):

+ 

+     def _get_writer(self, tablename="countme_raw", **kwargs):

          import sqlite3

+ 

          if hasattr(self._fp, "name"):

              filename = self._fp.name

          else:
@@ -288,52 +357,55 @@ 

          # Generate SQL commands so we can use them later.

          # self._create_table creates the table, with column names and types

          # matching the names and types of the fields in self._itemtuple.

-         self._create_table = (

-             "CREATE TABLE IF NOT EXISTS {table} ({coldefs})".format(

-                 table=tablename,

-                 coldefs=",".join(f"{f} {self._sqltype(f)}" for f in self._fields),

-             )

+         self._create_table = "CREATE TABLE IF NOT EXISTS {table} ({coldefs})".format(

+             table=tablename,

+             coldefs=",".join(f"{f} {self._sqltype(f)}" for f in self._fields),

          )

          # self._insert_item is an "INSERT" command with '?' placeholders.

-         self._insert_item = (

-             "INSERT INTO {table} ({colnames}) VALUES ({colvals})".format(

-                 table=tablename,

-                 colnames=",".join(self._fields),

-                 colvals=",".join("?" for f in self._fields),

-             )

+         self._insert_item = "INSERT INTO {table} ({colnames}) VALUES ({colvals})".format(

+             table=tablename,

+             colnames=",".join(self._fields),

+             colvals=",".join("?" for f in self._fields),

          )

          # self._create_time_index creates an index on 'timestamp' or whatever

          # the time-series field is.

          self._create_time_index = (

              "CREATE INDEX IF NOT EXISTS {timefield}_idx on {table} ({timefield})".format(

-                 table=tablename,

-                 timefield=self._timefield

+                 table=tablename, timefield=self._timefield

              )

          )

+ 

      def write_header(self):

          self._cur.execute(self._create_table)

+ 

      def write_item(self, item):

          self._cur.execute(self._insert_item, item)

+ 

      def write_items(self, items):

          with self._con:

              self._con.executemany(self._insert_item, items)

+ 

      def write_index(self):

          self._cur.execute(self._create_time_index)

          self._con.commit()

+ 

      def has_item(self, item):

-         '''Return True if a row matching `item` exists in this database.'''

+         """Return True if a row matching `item` exists in this database."""

          condition = " AND ".join(f"{field}=?" for field in self._fields)

-         cur = self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}",item)

+         cur = self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}", item)

          return bool(cur.fetchone()[0])

+ 

      def mintime(self):

          cur = self._cur.execute(f"SELECT MIN({self._timefield}) FROM {self._tablename}")

          return cur.fetchone()[0]

+ 

      def maxtime(self):

          cur = self._cur.execute(f"SELECT MAX({self._timefield}) FROM {self._tablename}")

          return cur.fetchone()[0]

  

+ 

  def make_writer(name, *args, **kwargs):

-     '''Convenience function to grab/instantiate the right writer'''

+     """Convenience function to grab/instantiate the right writer"""

      if name == "csv":

          writer = CSVWriter

      elif name == "json":
@@ -346,13 +418,16 @@ 

          raise ValueError(f"Unknown writer '{name}'")

      return writer(*args, **kwargs)

  

+ 

  # ===========================================================================

  # ====== ItemReaders - counterpart to ItemWriter ============================

  # ===========================================================================

  

+ 

  class ReaderError(RuntimeError):

      pass

  

+ 

  class ItemReader:

      def __init__(self, fp, itemtuple, **kwargs):

          self._fp = fp
@@ -365,88 +440,112 @@ 

              raise ReaderError("no field names found")

          if filefields != self._itemfields:

              raise ReaderError(f"field mismatch: expected {self._itemfields}, got {filefields}")

+ 

      @property

      def fields(self):

          return self._itemfields

+ 

      def _get_reader(self):

-         '''Set up the ItemReader.'''

+         """Set up the ItemReader."""

          raise NotImplementedError

+ 

      def _get_fields(self):

-         '''Called immediately after _get_reader().

-         Should return a tuple of the fieldnames found in self._fp.'''

+         """Called immediately after _get_reader().

+         Should return a tuple of the fieldnames found in self._fp."""

          raise NotImplementedError

+ 

      def _iter_rows(self):

-         '''Return an iterator/generator that produces a row for each item.'''

+         """Return an iterator/generator that produces a row for each item."""

          raise NotImplementedError

+ 

      def _find_item(self, item):

-         '''Return True if the given item is in this file'''

+         """Return True if the given item is in this file"""

          raise NotImplementedError

+ 

      def __iter__(self):

          for item in self._iter_rows():

              yield self._itemfactory(item)

+ 

      def __contains__(self, item):

          return self._find_item(item)

  

+ 

  class CSVReader(ItemReader):

      def _get_reader(self, **kwargs):

          import csv

+ 

          self._reader = csv.reader(self._fp)

+ 

      def _get_fields(self):

          filefields = tuple(next(self._reader))

          # Sanity check: if any fieldname is a number... this isn't a header

          if any(name.isnumeric() for name in filefields):

-             header = ','.join(filefields)

+             header = ",".join(filefields)

              raise ReaderError(f"header bad/missing: expected {self._itemfields}, got {header!r}")

          return filefields

+ 

      def _iter_rows(self):

          return self._reader

+ 

      def _dup(self):

          # This is pretty gross, but then, so's CSV

-         return self.__class__(open(self._fp.name, 'rt'), self._itemtuple)

+         return self.__class__(open(self._fp.name, "rt"), self._itemtuple)

+ 

      def _find_item(self, item):

          stritem = self._itemfactory(str(v) for v in item)

-         return (stritem in self._dup()) # O(n) worst case. Again: gross.

+         return stritem in self._dup()  # O(n) worst case. Again: gross.

+ 

  

  class AWKReader(CSVReader):

-     def _get_reader(self, field_separator='\t', **kwargs):

+     def _get_reader(self, field_separator="\t", **kwargs):

          self._reader = (line.split(field_separator) for line in self._fp)

  

+ 

  class JSONReader(CSVReader):

      def _get_reader(self, **kwargs):

          import json

+ 

          self._reader = (json.loads(line) for line in self._fp)

  

+ 

  class SQLiteReader(ItemReader):

-     def _get_reader(self, tablename='countme_raw', timefield="timestamp", **kwargs):

+     def _get_reader(self, tablename="countme_raw", timefield="timestamp", **kwargs):

          import sqlite3

+ 

          if hasattr(self._fp, "name"):

              filename = self._fp.name

          else:

              filename = self._fp

-         #self._con = sqlite3.connect(f"file:{filename}?mode=ro", uri=True)

+         # self._con = sqlite3.connect(f"file:{filename}?mode=ro", uri=True)

          self._con = sqlite3.connect(filename)

          self._cur = self._con.cursor()

          self._tablename = tablename

          self._timefield = timefield

          self._filename = filename

+ 

      def _get_fields(self):

          fields_sql = f"PRAGMA table_info('{self._tablename}')"

          filefields = tuple(r[1] for r in self._cur.execute(fields_sql))

          return filefields

+ 

      def _find_item(self, item):

          condition = " AND ".join(f"{field}=?" for field in self.fields)

-         self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}",item)

+         self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}", item)

          return bool(self._cur.fetchone()[0])

+ 

      def _iter_rows(self):

          fields = ",".join(self._itemfields)

          return self._cur.execute(f"SELECT {fields} FROM {self._tablename}")

+ 

      def mintime(self):

          cur = self._cur.execute(f"SELECT MIN({self._timefield}) FROM {self._tablename}")

          return cur.fetchone()[0]

+ 

      def maxtime(self):

          cur = self._cur.execute(f"SELECT MAX({self._timefield}) FROM {self._tablename}")

          return cur.fetchone()[0]

  

+ 

  # Guess the right reader based on the filename.

  def guessreader(fp):

      if fp.name.endswith(".csv"):
@@ -459,9 +558,9 @@ 

          reader = None

      return reader

  

+ 

  # TODO: should have name/args more like make_writer...

  def autoreader(fp, itemtuple, **kwargs):

-     '''Convenience function to guess & instantiate the right writer'''

+     """Convenience function to guess & instantiate the right writer"""

      reader = guessreader(fp)

      return reader(fp, itemtuple, **kwargs)

- 
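
The parse_qsl changes above are the core of this PR: recent Python security updates gave urllib.parse.parse_qsl a separator argument and stopped splitting on ";" by default, so the query parsing here pins the separator to "&" explicitly. A minimal sketch of the call (assuming a Python build that includes the separator parameter), using the query string from the sample log line in the regex module:

    from urllib.parse import parse_qsl

    query = "repo=fedora-modular-32&arch=x86_64&countme=1"
    print(dict(parse_qsl(query, separator="&")))
    # {'repo': 'fedora-modular-32', 'arch': 'x86_64', 'countme': '1'}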

file added
+62

@@ -0,0 +1,62 @@ 

+ from contextlib import contextmanager

+ from pathlib import Path

+ from tempfile import NamedTemporaryFile

+ from typing import Iterator, Union

+ 

+ from countme.progress import ReadProgress

+ 

+ 

+ @contextmanager

+ def pre_process(filepath: Union[str, Path]) -> Iterator[str]:

+     filepath = Path(filepath)

+     with NamedTemporaryFile(

+         prefix=f"mirrors-countme-{filepath.name}-",

+         suffix=".preprocessed",

+     ) as tmpfile:

+         import subprocess

+ 

+         print(f"Preprocessing file: {filepath}")

+         cmd = ["grep", "countme", str(filepath)]

+         r = subprocess.run(cmd, stdout=tmpfile)

+         if r.returncode != 0:

+             print(f"Preprocessing file failed, returning original: {filepath}")

+             yield str(filepath)

+         else:

+             yield tmpfile.name

+ 

+ 

+ def parse_from_iterator(args, lines):

+     if args.header or args.sqlite:

+         args.writer.write_header()

+ 

+     for logf in lines:

+         # Make an iterator object for the matching log lines

+         match_iter = iter(args.matcher(logf))

+ 

+         # TEMP WORKAROUND: filter out match items with missing values

+         if args.matchmode == "countme":

+             match_iter = filter(lambda i: None not in i, match_iter)

+ 

+         # Duplicate data check (for sqlite output)

+         if args.dupcheck:

+             try:

+                 item = next(match_iter)  # grab first matching item

+             except StopIteration:

+                 # If there is no next match, keep going

+                 continue

+             if args.writer.has_item(item):  # if it's already in the db...

+                 continue  # skip to next log

+             else:  # otherwise

+                 args.writer.write_item(item)  # insert it into the db

+ 

+         # Write matching items (sqlite does commit at end, or rollback on error)

+         args.writer.write_items(match_iter)

+ 

+     if args.index:

+         args.writer.write_index()

+ 

+ 

+ def parse(args=None):

+     parse_from_iterator(

+         args, ReadProgress(args.logs, display=args.progress, pre_process=pre_process)

+     )
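
A hedged illustration of the new pre_process() helper above (assuming this module lands as countme/parse.py; "access.log" is a placeholder path): it shells out to grep to keep only lines containing "countme" in a temporary file and yields that file's path, falling back to the original file if grep fails.

    from countme.parse import pre_process  # assumed module path

    with pre_process("access.log") as filtered:
        with open(filtered) as f:
            print(sum(1 for _ in f), "countme lines kept")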

file modified
+69 -40
@@ -19,45 +19,58 @@ 

  

  import os

  import sys

- from .regex import compile_log_regex, LOG_DATE_RE

+ from contextlib import contextmanager

+ from pathlib import Path

+ from typing import Iterator, Union

+ 

+ from .regex import LOG_DATE_RE

  

  __all__ = (

-     'ReadProgress', 'TQDMReadProgress', 'DIYReadProgress',

+     "ReadProgress",

+     "DIYReadProgress",

  )

  

  # ===========================================================================

  # ====== Progress meters & helpers ==========================================

  # ===========================================================================

  

+ 

  def log_date(line):

      match = LOG_DATE_RE.match(line)

      if match:

-         return match['date']

+         return match["date"]

      return "??/??/????"

  

+ 

  def log_reader(logfn):

      if logfn.endswith(".xz"):

          import lzma

-         return lzma.open(logfn, mode='rt')

+ 

+         return lzma.open(logfn, mode="rt")

      elif logfn.endswith(".gz"):

          import gzip

-         return gzip.open(logfn, mode='rt')

+ 

+         return gzip.open(logfn, mode="rt")

      else:

-         return open(logfn, mode='rt')

+         return open(logfn, mode="rt")

+ 

  

  def xz_log_size(xz_filename):

      import subprocess

+ 

      cmd = ["xz", "--list", "--robot", xz_filename]

      r = subprocess.run(cmd, stdout=subprocess.PIPE)

      if r.returncode != 0:

          return None

-     for line in r.stdout.split(b'\n'):

-         f = line.split(b'\t')

-         if f[0] == b'totals':

+     for line in r.stdout.split(b"\n"):

+         f = line.split(b"\t")

+         if f[0] == b"totals":

              return int(f[4])

  

+ 

  def gz_log_size(gz_filename):

      import subprocess

+ 

      cmd = ["gzip", "--quiet", "--list", gz_filename]

      r = subprocess.run(cmd, stdout=subprocess.PIPE)

      if r.returncode != 0:
@@ -65,6 +78,7 @@ 

      csize, uncsize, ratio, name = r.stdout.split()

      return int(uncsize)

  

+ 

  def log_total_size(logfn):

      if logfn.endswith(".xz"):

          return xz_log_size(logfn)
@@ -74,25 +88,36 @@ 

          return os.stat(logfn).st_size

  

  

+ @contextmanager

+ def no_preprocess(filepath: Union[str, Path]) -> Iterator[str]:

+     yield str(filepath)

+ 

+ 

  class ReadProgressBase:

-     def __init__(self, logs, display=True):

-         '''logs should be a sequence of line-iterable file-like objects.

-         if display is False, no progress output will be printed.'''

+     def __init__(self, logs, display=True, pre_process=no_preprocess):

+         """logs should be a sequence of line-iterable file-like objects.

+         if display is False, no progress output will be printed."""

          self.logs = logs

          self.display = display

+         self.pre_process = pre_process

  

      def __iter__(self):

-         '''Iterator for ReadProgress; yields a sequence of line-iterable

-         file-like objects (one for each log in logs).'''

+         """Iterator for ReadProgress; yields a sequence of line-iterable

+         file-like objects (one for each log in logs)."""

          for num, logfn in enumerate(self.logs):

-             logf = log_reader(logfn)

-             total = log_total_size(logfn)

-             yield self._iter_log_lines(logf, num, total)

+             with self.pre_process(logfn) as processed_log:

+                 logf = log_reader(processed_log)

+                 total = log_total_size(processed_log)

+                 yield self._iter_log_lines(logf, num, total)

  

      def _iter_log_lines(self, logf, num, total):

          # Make a progress meter for this file

-         prog = self._progress_obj(unit='b', unit_scale=True, total=total,

-                                   disable=True if not self.display else None)

+         prog = self._progress_obj(

+             unit="b",

+             unit_scale=True,

+             total=total,

+             disable=True if not self.display else None,

+         )

          # Get the first line manually so we can get logdate

          line = next(logf)

          desc = f"log {num+1}/{len(self.logs)}, date={log_date(line)}"
@@ -107,19 +132,23 @@ 

          prog.close()

  

  

- # Here's how we use the tqdm progress module to show read progress.

- class TQDMReadProgress(ReadProgressBase):

-     def _progress_obj(self, *args, **kwargs):

-         return tqdm(*args, **kwargs)

- 

  # No TQDM? Use our little do-it-yourself knockoff version.

- class DIYReadProgress(TQDMReadProgress):

+ class DIYReadProgress(ReadProgressBase):

      def _progress_obj(self, *args, **kwargs):

          return diyprog(*args, **kwargs)

  

+ 

  class diyprog:

-     def __init__(self, desc=None, total=None, file=None, disable=False,

-                  unit='b', unit_scale=True, barchar='_-=#'):

+     def __init__(

+         self,

+         desc=None,

+         total=None,

+         file=None,

+         disable=False,

+         unit="b",

+         unit_scale=True,

+         barchar="_-=#",

+     ):

          # COMPAT NOTE: tqdm objects with disable=True have no .desc attribute

          self.desc = desc

          self.total = total
@@ -133,11 +162,12 @@ 

  

      def set_description(self, desc=None, refresh=True):

          self.desc = desc

-         if refresh:

+         if refresh and not self.disable:

              self.display()

  

      def update(self, n=1):

-         if self.disable: return

+         if self.disable:

+             return

          self.count += n

          if self.count >= self.showat:

              self.showat = min(self.total, self.showat + self.total // 100)
@@ -150,7 +180,7 @@ 

  

      @staticmethod

      def hrsize(n):

-         for suffix in 'kmgtp':

+         for suffix in "kmgtp":

              n /= 1000

              if n < 1000:

                  break
@@ -180,18 +210,17 @@ 

          bar = (pct // 4) * self.barchar[-1]

          if pct < 100:

              bar += self.barchar[pct % 4]

-         print(f"{desc}: {pct:>3}% [{bar:<25}] {count:>7}/{total:<7}",

-               flush=True, file=self.file, end='\r')

+         print(

+             f"{desc}: {pct:>3}% [{bar:<25}] {count:>7}/{total:<7}",

+             flush=True,

+             file=self.file,

+             end="\r",

+         )

  

      def close(self):

-         if self.disable: return

+         if self.disable:

+             return

          print(flush=True, file=self.file)

  

- # Default ReadProgress: use tqdm if possible, else use the DIY one

- try:

-     # TODO: make this work with a local tqdm checkout/git submodule

-     from tqdm import tqdm

-     ReadProgress = TQDMReadProgress

- except ImportError:

-     ReadProgress = DIYReadProgress

  

+ ReadProgress = DIYReadProgress
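
With the tqdm code path removed, ReadProgress is simply DIYReadProgress. A short sketch of how it is consumed, mirroring parse_from_iterator in the new parse module above ("access.log" is a placeholder path):

    from countme.progress import ReadProgress

    for logf in ReadProgress(["access.log"], display=True):
        # each logf is a line iterator with a progress meter attached
        for _ in logf:
            pass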

file modified
+54 -49
@@ -20,9 +20,13 @@ 

  import re

  

  __all__ = (

-     'compile_log_regex',

-     'LOG_RE', 'LIBDNF_USER_AGENT_RE', 'COUNTME_USER_AGENT_RE',

-     'MIRRORS_LOG_RE', 'COUNTME_LOG_RE', 'LOG_DATE_RE',

+     "compile_log_regex",

+     "LOG_RE",

+     "LIBDNF_USER_AGENT_RE",

+     "COUNTME_USER_AGENT_RE",

+     "MIRRORS_LOG_RE",

+     "COUNTME_LOG_RE",

+     "LOG_DATE_RE",

  )

  

  # ===========================================================================
@@ -34,7 +38,7 @@ 

  # That's the standard Combined Log Format, with numeric IPs (%a).

  #

  # Example log line:

- #   240.159.140.173 - - [29/Mar/2020:16:04:28 +0000] "GET /metalink?repo=fedora-modular-32&arch=x86_64&countme=1 HTTP/2.0" 200 18336 "-" "libdnf (Fedora 32; workstation; Linux.x86_64)"

+ #   240.159.140.173 - - [29/Mar/2020:16:04:28 +0000] "GET /metalink?repo=fedora-modular-32&arch=x86_64&countme=1 HTTP/2.0" 200 18336 "-" "libdnf (Fedora 32; workstation; Linux.x86_64)" # noqa

  #

  # Here it is as a Python regex, with a format placeholder for the actual field

  # contents. Default field regexes are in LOG_PATTERN_FIELDS, below, and
@@ -44,43 +48,43 @@ 

  # present but 'query' may be absent, depending on the value of 'query_match'.

  # 'query_match' should be '?' (optional), '' (required), or '{0}' (absent).

  LOG_PATTERN_FORMAT = (

-     r'^'

-     r'(?P<host>{host})\s'

-     r'(?P<identity>{identity})\s'

-     r'(?P<user>{user})\s'

-     r'\[(?P<time>{time})\]\s'

+     r"^"

+     r"(?P<host>{host})\s"

+     r"(?P<identity>{identity})\s"

+     r"(?P<user>{user})\s"

+     r"\[(?P<time>{time})\]\s"

      r'"(?P<method>{method})\s'

-     r'(?P<path>{path})(?:\?(?P<query>{query})){query_match}'

+     r"(?P<path>{path})(?:\?(?P<query>{query})){query_match}"

      r'\s(?P<protocol>{protocol})"\s'

-     r'(?P<status>{status})\s'

-     r'(?P<nbytes>{nbytes})\s'

+     r"(?P<status>{status})\s"

+     r"(?P<nbytes>{nbytes})\s"

      r'"(?P<referrer>{referrer})"\s'

      r'"(?P<user_agent>{user_agent})"\s*'

-     r'$'

+     r"$"

  )

  

  # Pattern for a HTTP header token, as per RFC7230.

  # Basically: all printable ASCII chars except '"(),/:;<=>?@[\]{}'

  # (see https://tools.ietf.org/html/rfc7230#section-3.2.6)

- HTTP_TOKEN_PATTERN=r"[\w\#$%^!&'*+.`|~-]+"

+ HTTP_TOKEN_PATTERN = r"[\w\#$%^!&'*+.`|~-]+"

  

  # Here's the default/fallback patterns for each field.

  # Note that all fields are non-zero width except query, which is optional,

  # and query_match, which should be '?', '', or '{0}', as described above.

  LOG_PATTERN_FIELDS = {

-     'host':       '\S+',

-     'identity':   '\S+',

-     'user':       '\S+',

-     'time':       '.+?',

-     'method':     HTTP_TOKEN_PATTERN,

-     'path':       '[^\s\?]+',

-     'query':      '\S*',

-     'query_match':'?',

-     'protocol':   'HTTP/\d\.\d',

-     'status':     '\d+',

-     'nbytes':     '\d+|-',

-     'referrer':   '[^"]+',

-     'user_agent': '.+?',

+     "host": "\\S+",

+     "identity": "\\S+",

+     "user": "\\S+",

+     "time": ".+?",

+     "method": HTTP_TOKEN_PATTERN,

+     "path": "[^\\s\\?]+",

+     "query": "\\S*",

+     "query_match": "?",

+     "protocol": "HTTP/\\d\\.\\d",

+     "status": "\\d+",

+     "nbytes": "\\d+|-",

+     "referrer": '[^"]+',

+     "user_agent": ".+?",

  }

  

  # A regex for libdnf/rpm-ostree user-agent strings.
@@ -105,21 +109,21 @@ 

  # For more info on the User-Agent header, see RFC7231, Section 5.5.3:

  #   https://tools.ietf.org/html/rfc7231#section-5.5.3)

  COUNTME_USER_AGENT_PATTERN = (

-     r'(?P<product>(libdnf|rpm-ostree)(?:/(?P<product_version>\S+))?)\s+'

-     r'\('

-       r'(?P<os_name>.*)\s'

-       r'(?P<os_version>[0-9a-z._-]*?);\s'

-       r'(?P<os_variant>[0-9a-z._-]*);\s'

-       r'(?P<os_canon>[\w./]+)\.'

-       r'(?P<os_arch>\w+)'

-     r'\)'

+     r"(?P<product>(?:libdnf|rpm-ostree)(?:/(?P<product_version>\S+))?)\s+"

+     r"\("

+     r"(?P<os_name>.*)\s"

+     r"(?P<os_version>[0-9a-z._-]*?);\s"

+     r"(?P<os_variant>[0-9a-z._-]*);\s"

+     r"(?P<os_canon>[\w./]+)\."

+     r"(?P<os_arch>\w+)"

+     r"\)"

  )

  COUNTME_USER_AGENT_RE = re.compile(COUNTME_USER_AGENT_PATTERN)

  LIBDNF_USER_AGENT_RE = re.compile(COUNTME_USER_AGENT_PATTERN)

  

- # Helper function for making compiled log-matching regexes.

+ 

  def compile_log_regex(flags=0, ascii=True, query_present=None, **kwargs):

-     '''

+     """

      Return a compiled re.Pattern object that should match lines in access_log,

      capturing each field (as listed in LOG_PATTERN_FIELDS) in its own group.

  
@@ -136,25 +140,26 @@ 

      target resource has a query string - i.e. query is required.

      If False, it only matches lines *without* a query string.

      If None (the default), the query string is optional.

-     '''

-     if ascii:

+     """

+     if ascii:  # pragma: no branch

          flags |= re.ASCII

  

-     fields      = LOG_PATTERN_FIELDS.copy()

+     fields = LOG_PATTERN_FIELDS.copy()

      fields.update(kwargs)

  

      if query_present is not None:

-         fields['query_match'] = '' if query_present else '{0}'

+         fields["query_match"] = "" if query_present else "{0}"

  

      pattern = LOG_PATTERN_FORMAT.format(**fields)

  

      return re.compile(pattern, flags=flags)

  

+ 

  # Default matcher that should match any access.log line

  LOG_RE = compile_log_regex()

  

  # Compiled pattern to match all mirrorlist/metalink hits, like mirrorlist.py

- MIRRORS_LOG_RE = compile_log_regex(path=r'/metalink|/mirrorlist')

+ MIRRORS_LOG_RE = compile_log_regex(path=r"/metalink|/mirrorlist")

  

  # Compiled pattern for countme lines.

  # We only count:
@@ -162,13 +167,13 @@ 

  #   * that have a query string containing "&countme=\d+",

  #   * with libdnf's User-Agent string (see above).

  COUNTME_LOG_RE = compile_log_regex(

-     method        = "GET|HEAD",

-     query_present = True,

-     path          = r'/metalink|/mirrorlist',

-     query         = r'\S+&countme=\d+\S*',

-     status        = r'200|302',

-     user_agent    = COUNTME_USER_AGENT_PATTERN,

+     method="GET|HEAD",

+     query_present=True,

+     path=r"/metalink|/mirrorlist",

+     query=r"\S+&countme=\d+\S*",

+     status=r"200|302",

+     user_agent=COUNTME_USER_AGENT_PATTERN,

  )

  

  # Regex for pulling the date out of a log line

- LOG_DATE_RE = compile_log_regex(time=r'(?P<date>[^:]+):.*?')

+ LOG_DATE_RE = compile_log_regex(time=r"(?P<date>[^:]+):.*?")

file added
+232
@@ -0,0 +1,232 @@ 

+ import datetime

+ from countme.progress import diyprog as Progress

+ from collections import Counter

+ from typing import NamedTuple

+ from countme import CountmeItem, weeknum, SQLiteWriter, SQLiteReader, CSVWriter

+ 

+ 

+ # NOTE: log timestamps do not move monotonically forward, but they don't

+ # seem to ever jump backwards more than 241 seconds. I assume this is

+ # some timeout that's set to 4 minutes, and the log entry shows up after

+ # expiry, or something. Anyway, what this means is that once we've seen

+ # a timestamp that's 241 seconds past the end of a week, we can assume that

+ # there will be no further entries whose timestamps belong to the previous

+ # week.

+ # We could probably watch the max jitter between log lines and adjust

+ # this if needed, but for now I'm just gonna pad it to 600 seconds.

+ # The difference between 241 and 600 is kind of irrelevant - since we get logs

+ # in 24-hour chunks, any window that extends into the next day means we have to

+ # wait 24 hours until we can be sure we have all the data for the previous

+ # week, so the effect would be the same if this was 3600 or 43200 or whatever.

+ # TODO: this should probably move into the module somewhere..

+ LOG_JITTER_WINDOW = 600

+ 

+ # Feb 11 2020 was the date that we branched F32 from Rawhide, so we've decided

+ # to use that as the starting week for countme data.

+ COUNTME_START_TIME = 1581292800  # =Mon Feb 10 00:00:00 2020 (UTC)

+ COUNTME_START_WEEKNUM = 2614

+ 

+ DAY_LEN = 24 * 60 * 60

+ WEEK_LEN = 7 * DAY_LEN

+ COUNTME_EPOCH = 345600  # =00:00:00 Mon Jan 5 00:00:00 1970 (UTC)

+ 

+ # And here's how you convert a weeknum to a human-readable date

+ COUNTME_EPOCH_ORDINAL = 719167

+ 

+ 

+ def weekdate(weeknum, weekday=0):

+     if weekday < 0 or weekday > 6:

+         raise ValueError("weekday must be between 0 (Mon) and 6 (Sun)")

+     ordinal = COUNTME_EPOCH_ORDINAL + 7 * int(weeknum) + weekday

+     return datetime.date.fromordinal(ordinal)

+ 

+ 

+ def daterange(weeknum):

+     return weekdate(weeknum, 0), weekdate(weeknum, 6)

+ 

+ 

+ # ===========================================================================

+ # ====== Count Buckets & Items ==============================================

+ # ===========================================================================

+ 

+ 

+ class CountBucket(NamedTuple):

+     weeknum: str  # this is a query

+     os_name: str

+     os_version: str

+     os_variant: str

+     os_arch: str

+     sys_age: str  # this is a key

+     repo_tag: str

+     repo_arch: str

+ 

+     @classmethod

+     def from_item(cls, item):

+         return cls._make((weeknum(item.timestamp),) + item[2:])

+ 

+ 

+ BucketSelect = CountBucket(

+     weeknum=f"((timestamp-{COUNTME_EPOCH})/{WEEK_LEN}) as weeknum",

+     os_name="os_name",

+     os_version="os_version",

+     os_variant="os_variant",

+     os_arch="os_arch",

+     sys_age="sys_age",

+     repo_tag="repo_tag",

+     repo_arch="repo_arch",

+ )

+ 

+ 

+ class TotalsItem(NamedTuple):

+     hits: int

+     weeknum: str  # this is a query

+     os_name: str

+     os_version: str

+     os_variant: str

+     os_arch: str

+     sys_age: str  # this is a key

+     repo_tag: str

+     repo_arch: str

+ 

+     @classmethod

+     def from_item(cls, item):

+         return cls._make((weeknum(item.timestamp),) + item[2:])

+ 

+ 

+ TotalsItem.__doc__ = """TotalsItem is CountBucket with a "hits" count on the front."""

+ 

+ 

+ class CSVCountItem(NamedTuple):

+     """

+     Represents one row in a countme_totals.csv file.

+     In the interest of human-readability, we replace 'weeknum' with the

+     start and end dates of that week.

+     """

+ 

+     week_start: str

+     week_end: str

+     hits: int

+     os_name: str

+     os_version: str

+     os_variant: str

+     os_arch: str

+     sys_age: int

+     repo_tag: str

+     repo_arch: str

+ 

+     @classmethod

+     def from_totalitem(cls, item):

+         """Use this method to convert a CountItem to a CSVCountItem."""

+         hits, weeknum, *rest = item

+         week_start, week_end = daterange(weeknum)

+         return cls._make([week_start, week_end, hits] + rest)

+ 

+ 

+ # ===========================================================================

+ # ====== SQL + Progress helpers =============================================

+ # ===========================================================================

+ 

+ 

+ class RawDB(SQLiteReader):

+     def __init__(self, fp, **kwargs):

+         super().__init__(fp, CountmeItem, tablename="countme_raw", **kwargs)

+ 

+     def _minmax(self, column):

+         cur = self._con.execute(f"SELECT min({column}),max({column}) FROM {self._tablename}")

+         return cur.fetchone()

+ 

+     def complete_weeks(self):

+         """Return a range(startweek, provweek) that covers (valid + complete)

+         weeknums contained in this database. The database may contain some

+         data for `provweek`, but since it's provisional/incomplete it's

+         outside the range."""

+         # startweek can't be earlier than the first week of data

+         startweek = max(weeknum(self.mintime()), COUNTME_START_WEEKNUM)

+         # A week is provisional until the LOG_JITTER_WINDOW expires, so once

+         # tsmax minus LOG_JITTER_WINDOW ticks over into a new weeknum, that

+         # weeknum is the provisional one. So...

+         provweek = weeknum(self.maxtime() - LOG_JITTER_WINDOW)

+         return range(startweek, provweek)

+ 

+     def week_count(self, weeknum):

+         start_ts = weeknum * WEEK_LEN + COUNTME_EPOCH

+         end_ts = start_ts + WEEK_LEN

+         cur = self._con.execute(

+             f"SELECT COUNT(*)"

+             f" FROM {self._tablename}"

+             f" WHERE timestamp >= {start_ts} AND timestamp < {end_ts}"

+         )

+         return cur.fetchone()[0]

+ 

+     def week_iter(self, weeknum, select="*"):

+         if isinstance(select, (tuple, list)):

+             item_select = ",".join(select)

+         elif isinstance(select, str):

+             item_select = select

+         else:

+             raise ValueError(f"select should be a string or tuple, not {select.__class__.__name__}")

+         start_ts = weeknum * WEEK_LEN + COUNTME_EPOCH

+         end_ts = start_ts + WEEK_LEN

+         return self._con.execute(

+             f"SELECT {item_select}"

+             f" FROM {self._tablename}"

+             f" WHERE timestamp >= {start_ts} AND timestamp < {end_ts}"

+         )

+ 

+ 

+ def totals(args):

+     # Initialize the writer (better to fail early..)

+     totals = SQLiteWriter(

+         args.countme_totals, TotalsItem, timefield="weeknum", tablename="countme_totals"

+     )

+     totals.write_header()

+ 

+     # Are we doing an update?

+     if args.countme_raw:

+         rawdb = RawDB(args.countme_raw)

+ 

+         # Check to see if there's any new weeks to get data for

+         complete_weeks = sorted(rawdb.complete_weeks())

+         newest_totals = totals.maxtime() or -1

+         new_weeks = [w for w in complete_weeks if w > int(newest_totals)]

+ 

+         # Count week by week

+         for week in new_weeks:

+ 

+             # Set up a progress meter and counter

+             mon, sun = daterange(week)

+             desc = f"week {week} ({mon} -- {sun})"

+             total = rawdb.week_count(week)

+             prog = Progress(

+                 total=total,

+                 desc=desc,

+                 disable=True if not args.progress else None,

+                 unit="row",

+                 unit_scale=False,

+             )

+             hitcount = Counter()

+ 

+             # Select raw items into their buckets and count 'em up

+             for bucket in rawdb.week_iter(week, select=BucketSelect):

+                 hitcount[bucket] += 1

+                 prog.update()

+ 

+             # Write the resulting totals into countme_totals

+             totals.write_items((hits,) + bucket for bucket, hits in hitcount.items())

+             prog.close()

+ 

+         # Oh and make sure we index them by time.

+         totals.write_index()

+ 

+     # Was a CSV dump requested?

+     if args.csv_dump:

+         totalreader = SQLiteReader(

+             args.countme_totals,

+             TotalsItem,

+             timefield="weeknum",

+             tablename="countme_totals",

+         )

+         writer = CSVWriter(args.csv_dump, CSVCountItem, timefield="week_start")

+         writer.write_header()

+         for item in totalreader:

+             writer.write_item(CSVCountItem.from_totalitem(item))

file modified
+2 -2
@@ -1,2 +1,2 @@ 

- __version__ = "0.0.4"

- __version_info__ = tuple(__version__.split('.'))

+ __version__ = "0.0.7"

+ __version_info__ = tuple(int(x) for x in __version__.split("."))

file removed
-64
@@ -1,64 +0,0 @@ 

- %global srcname mirrors-countme

- %global pkgname mirrors_countme

- %global modname countme

- 

- Name:    python-%{srcname}

- Version: 0.0.4

- Release: 1%{?dist}

- Summary: access_log parsing & host counting for DNF mirrors

- URL:     https://pagure.io/mirrors-countme

- License: GPLv3+

- Source0: https://pagure.io/%{srcname}/archive/%{version}/%{srcname}-%{version}.tar.gz

- BuildArch: noarch

- # Not quite sure what minimum sqlite version we need, but scripts use

- # /usr/bin/sqlite3 and the python module is "sqlite3", so...

- Requires: sqlite >= 3.0.0

- 

- %global _description %{expand:

- A python module and scripts for parsing httpd access_log to find requests

- including `countme=N`, parse the data included with those requests, and

- compile weekly counts of DNF clients broken out by OS name, OS version,

- system arch, etc.}

- 

- # This is for the toplevel metapackage.

- %description %_description

- 

- 

- # This section defines the python3-mirrors-countme subpackage.

- %package -n python3-%{srcname}

- Summary: %{summary}

- BuildRequires: python3-devel python3-setuptools

- #Recommends: python3-%%{srcname}+fancy_progress