From 5e8157056cae39444f6b554fc49e4e3be2d32743 Mon Sep 17 00:00:00 2001
From: Stephen Smoogen
Date: Feb 16 2023 17:36:53 +0000
Subject: fix merge

---

diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 0000000..b4750b1
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,12 @@
+[run]
+branch = True
+source = countme
+omit = countme/version.py
+
+[report]
+precision = 2
+#fail_under = 99
+exclude_lines =
+    pragma: no cover
+    def __repr__
+show_missing = True
diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..ebbbfe0
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,6 @@
+[flake8]
+show_source = True
+max_line_length = 100
+ignore = E203,W503
+exclude = .git,.tox,dist,*egg,build,files
+
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..79b2f75
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+/*.egg-info/
+**/__pycache__/
+
+/.coverage
+/coverage.xml
+/htmlcov
+
diff --git a/.zuul.yaml b/.zuul.yaml
new file mode 100644
index 0000000..f86577b
--- /dev/null
+++ b/.zuul.yaml
@@ -0,0 +1,12 @@
+---
+- job:
+    name: tox-f33
+    run: ci/tox.yaml
+    nodeset:
+      nodes:
+        name: test-node
+        label: pod-python-f33
+- project:
+    check:
+      jobs:
+        - tox-f33
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..9ca4a89
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,3 @@
+exclude .zuul.yaml
+prune ci
+prune test_data
diff --git a/ci/tox.yaml b/ci/tox.yaml
new file mode 100644
index 0000000..69694ff
--- /dev/null
+++ b/ci/tox.yaml
@@ -0,0 +1,15 @@
+---
+- hosts: all
+  tasks:
+    - name: List project directory on the test system
+      command: ls -al {{ansible_user_dir}}/{{zuul.project.src_dir}}
+    - name: install dependencies
+      become: yes
+      package:
+        name:
+          - python3-tox
+        state: present
+    - name: run pytest
+      command:
+        chdir: '{{ansible_user_dir}}/{{zuul.project.src_dir}}'
+        cmd: python -m tox
diff --git a/countme-totals.py b/countme-totals.py
index c587569..c8ec1e3 100755
--- a/countme-totals.py
+++ b/countme-totals.py
@@ -1,245 +1,49 @@
 #!/usr/bin/python3
-import sys
-import sqlite3
 import argparse
-import datetime
-from collections import Counter
-from typing import NamedTuple
-from countme import CountmeItem, weeknum, SQLiteWriter, SQLiteReader, CSVWriter
+from countme.totals import totals

-# NOTE: log timestamps do not move monotonically forward, but they don't
-# seem to ever jump backwards more than 241 seconds. I assume this is
-# some timeout that's set to 4 minutes, and the log entry shows up after
-# expiry, or something. Anyway, what this means is that once we've seen
-# a timestamp that's 241 seconds past the end of a week, we can assume that
-# there will be no further entries whose timestamps belong to the previous
-# week.
-# We could probably watch the max jitter between log lines and adjust
-# this if needed, but for now I'm just gonna pad it to 600 seconds.
-# The difference between 241 and 600 is kind of irrelevant - since we get logs
-# in 24-hour chunks, any window that extends into the next day means we have to
-# wait 24 hours until we can be sure we have all the data for the previous
-# week, so the effect would be the same if this was 3600 or 43200 or whatever.
-# TODO: this should probably move into the module somewhere..
-LOG_JITTER_WINDOW = 600
-
-# Feb 11 2020 was the date that we branched F32 from Rawhide, so we've decided
-# to use that as the starting week for countme data.
-COUNTME_START_TIME=1581292800 # =Mon Feb 10 00:00:00 2020 (UTC) -COUNTME_START_WEEKNUM=2614 - -DAY_LEN = 24*60*60 -WEEK_LEN = 7*DAY_LEN -COUNTME_EPOCH = 345600 # =00:00:00 Mon Jan 5 00:00:00 1970 (UTC) - -# And here's how you convert a weeknum to a human-readable date -COUNTME_EPOCH_ORDINAL=719167 -def weekdate(weeknum, weekday=0): - if weekday < 0 or weekday > 6: - raise ValueError("weekday must be between 0 (Mon) and 6 (Sun)") - ordinal = COUNTME_EPOCH_ORDINAL + 7*weeknum + weekday - return datetime.date.fromordinal(ordinal) - -def daterange(weeknum): - return weekdate(weeknum,0), weekdate(weeknum,6) # =========================================================================== -# ====== Count Buckets & Items ============================================== +# ====== CLI parser & __main__ ============================================== # =========================================================================== -class CountBucket(NamedTuple): - weeknum: int - os_name: str - os_version: str - os_variant: str - os_arch: str - sys_age: int - repo_tag: str - repo_arch: str - - @classmethod - def from_item(cls, item): - return cls._make((weeknum(item.timestamp),) + item[2:]) - -BucketSelect = CountBucket( - weeknum = f"((timestamp-{COUNTME_EPOCH})/{WEEK_LEN}) as weeknum", - os_name = "os_name", - os_version = "os_version", - os_variant = "os_variant", - os_arch = "os_arch", - sys_age = "sys_age", - repo_tag = "repo_tag", - repo_arch = "repo_arch" -) - -TotalsItem = NamedTuple("TotalsItem", [("hits", int)] + list(CountBucket.__annotations__.items())) -TotalsItem.__doc__ = '''TotalsItem is CountBucket with a "hits" count on the front.''' - -class CSVCountItem(NamedTuple): - ''' - Represents one row in a countme_totals.csv file. - In the interest of human-readability, we replace 'weeknum' with the - start and end dates of that week. - ''' - week_start: str - week_end: str - hits: int - os_name: str - os_version: str - os_variant: str - os_arch: str - sys_age: int - repo_tag: str - repo_arch: str - - @classmethod - def from_totalitem(cls, item): - '''Use this method to convert a CountItem to a CSVCountItem.''' - hits, weeknum, *rest = item - week_start, week_end = daterange(weeknum) - return cls._make([week_start, week_end, hits] + rest) - -# =========================================================================== -# ====== SQL + Progress helpers ============================================= -# =========================================================================== - -class RawDB(SQLiteReader): - def __init__(self, fp, **kwargs): - super().__init__(fp, CountmeItem, tablename='countme_raw', **kwargs) - - def _minmax(self, column): - cur = self._con.execute(f"SELECT min({column}),max({column}) FROM {self._tablename}") - return cur.fetchone() - - def complete_weeks(self): - '''Return a range(startweek, provweek) that covers (valid + complete) - weeknums contained in this database. The database may contain some - data for `provweek`, but since it's provisional/incomplete it's - outside the range.''' - # startweek can't be earlier than the first week of data - startweek = max(weeknum(self.mintime()), COUNTME_START_WEEKNUM) - # A week is provisional until the LOG_JITTER_WINDOW expires, so once - # tsmax minus LOG_JITTER_WINDOW ticks over into a new weeknum, that - # weeknum is the provisional one. So... 
- provweek = weeknum(self.maxtime() - LOG_JITTER_WINDOW) - return range(startweek, provweek) - - def week_count(self, weeknum): - start_ts = weeknum*WEEK_LEN+COUNTME_EPOCH - end_ts = start_ts + WEEK_LEN - cur = self._con.execute( - f"SELECT COUNT(*)" - f" FROM {self._tablename}" - f" WHERE timestamp >= {start_ts} AND timestamp < {end_ts}") - return cur.fetchone()[0] - - def week_iter(self, weeknum, select='*'): - if isinstance(select, (tuple, list)): - item_select = ','.join(select) - elif isinstance(select, str): - item_select = select - else: - raise ValueError(f"select should be a string or tuple, not {select.__class__.__name__}") - start_ts = weeknum*WEEK_LEN+COUNTME_EPOCH - end_ts = start_ts + WEEK_LEN - return self._con.execute( - f"SELECT {item_select}" - f" FROM {self._tablename}" - f" WHERE timestamp >= {start_ts} AND timestamp < {end_ts}") - -try: - from tqdm import tqdm as Progress -except ImportError: - from countme.progress import diyprog as Progress - - - -# =========================================================================== -# ====== CLI parser & main() ================================================ -# =========================================================================== def parse_args(argv=None): p = argparse.ArgumentParser( - description = "Aggregate 'countme' log records to weekly totals.", + description="Aggregate 'countme' log records to weekly totals.", ) - p.add_argument("-V", "--version", action='version', - version='%(prog)s 0.0.1') + p.add_argument("-V", "--version", action="version", version="%(prog)s 0.0.1") - p.add_argument("countme_totals", - help="Database containing countme_totals") + p.add_argument("countme_totals", help="Database containing countme_totals") - p.add_argument("--update-from", - metavar="COUNTME_RAW_DB", dest="countme_raw", - help="Update totals from raw data (from ./parse-access-log.py)") + p.add_argument( + "--update-from", + metavar="COUNTME_RAW_DB", + dest="countme_raw", + help="Update totals from raw data (from ./parse-access-log.py)", + ) - p.add_argument("--csv-dump", - type=argparse.FileType('wt', encoding='utf-8'), - help="File to dump CSV-formatted totals data") + p.add_argument( + "--csv-dump", + type=argparse.FileType("wt", encoding="utf-8"), + help="File to dump CSV-formatted totals data", + ) - p.add_argument("--progress", action="store_true", - help="Show progress while reading and counting data.") + p.add_argument( + "--progress", + action="store_true", + help="Show progress while reading and counting data.", + ) args = p.parse_args(argv) return args -def main(): - args = parse_args() - - # Initialize the writer (better to fail early..) - totals = SQLiteWriter(args.countme_totals, TotalsItem, - timefield='weeknum', - tablename='countme_totals') - totals.write_header() - - # Are we doing an update? 
- if args.countme_raw: - rawdb = RawDB(args.countme_raw) - - # Check to see if there's any new weeks to get data for - complete_weeks = sorted(rawdb.complete_weeks()) - newest_totals = totals.maxtime() or -1 - new_weeks = [w for w in complete_weeks if w > newest_totals] - - # Count week by week - for week in new_weeks: - - # Set up a progress meter and counter - mon, sun = daterange(week) - desc = f"week {week} ({mon} -- {sun})" - total = rawdb.week_count(week) - prog = Progress(total=total, desc=desc, disable=True if not args.progress else None, unit="row", unit_scale=False) - hitcount = Counter() - - # Select raw items into their buckets and count 'em up - for bucket in rawdb.week_iter(week, select=BucketSelect): - hitcount[bucket] += 1 - prog.update() - - # Write the resulting totals into countme_totals - totals.write_items((hits,)+bucket for bucket,hits in hitcount.items()) - prog.close() - - # Oh and make sure we index them by time. - totals.write_index() - - - # Was a CSV dump requested? - if args.csv_dump: - totalreader = SQLiteReader(args.countme_totals, TotalsItem, - timefield='weeknum', - tablename='countme_totals') - writer = CSVWriter(args.csv_dump, CSVCountItem, - timefield='week_start') - writer.write_header() - for item in totalreader: - writer.write_item(CSVCountItem.from_totalitem(item)) - - -if __name__ == '__main__': +if __name__ == "__main__": try: - main() + args = parse_args() + totals(args) except KeyboardInterrupt: - raise SystemExit(3) # You know, 3, like 'C', like Ctrl-C! + raise SystemExit(3) # You know, 3, like 'C', like Ctrl-C! diff --git a/countme/__init__.py b/countme/__init__.py index b8da1bd..3452d7d 100644 --- a/countme/__init__.py +++ b/countme/__init__.py @@ -25,44 +25,65 @@ # TODO: this should probably get cleaned up? 
__all__ = ( - 'weeknum', 'parse_logtime', 'parse_querydict', - - 'ItemWriter', 'CSVWriter', 'JSONWriter', 'AWKWriter', 'SQLiteWriter', - 'ItemReader', 'CSVReader', 'SQLiteReader', - - 'make_writer', 'guessreader', 'autoreader', - - 'LogItem', 'MirrorItem', 'CountmeItem', - 'LogMatcher', 'MirrorMatcher', 'CountmeMatcher', + "weeknum", + "parse_logtime", + "parse_querydict", + "ItemWriter", + "CSVWriter", + "JSONWriter", + "AWKWriter", + "SQLiteWriter", + "ItemReader", + "CSVReader", + "SQLiteReader", + "make_writer", + "guessreader", + "autoreader", + "LogItem", + "MirrorItem", + "CountmeItem", + "LogMatcher", + "MirrorMatcher", + "CountmeMatcher", ) -import os -import re -from datetime import date, time, datetime, timezone, timedelta +from datetime import datetime, timezone, timedelta from urllib.parse import parse_qsl -from typing import NamedTuple, Optional +from typing import NamedTuple, Optional, Type, Union from .regex import COUNTME_LOG_RE, MIRRORS_LOG_RE -from .version import __version__, __version_info__ # =========================================================================== # ====== Output item definitions and helpers ================================ # =========================================================================== -DAY_LEN = 24*60*60 -WEEK_LEN = 7*DAY_LEN +DAY_LEN = 24 * 60 * 60 +WEEK_LEN = 7 * DAY_LEN COUNTME_EPOCH = 345600 # =00:00:00 Mon Jan 5 00:00:00 1970 (UTC) MONTHIDX = { - 'Jan':1, 'Feb':2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, - 'Jul':7, 'Aug':8, 'Sep':9, 'Oct':10, 'Nov':11, 'Dec':12 + "Jan": 1, + "Feb": 2, + "Mar": 3, + "Apr": 4, + "May": 5, + "Jun": 6, + "Jul": 7, + "Aug": 8, + "Sep": 9, + "Oct": 10, + "Nov": 11, + "Dec": 12, } + def weeknum(timestamp): return (int(timestamp) - COUNTME_EPOCH) // WEEK_LEN + def strptime_logtime(logtime): return datetime.strptime(logtime, "%d/%b/%Y:%H:%M:%S %z") + def logtime_to_isoformat(logtime): # logtime: '29/Mar/2020:16:04:28 +0000' # ISO8601: '2020-03-29T16:04:28+00:00' @@ -74,13 +95,15 @@ def logtime_to_isoformat(logtime): offm = logtime[24:26] return f"{y}-{m:02}-{d}T{time}{offh}:{offm}" + def offset_to_timezone(offset): - '''Convert a UTC offset like -0400 to a datetime.timezone instance''' - offmin = 60*int(offset[1:3]) + int(offset[3:5]) - if offset[0] == '-': + """Convert a UTC offset like -0400 to a datetime.timezone instance""" + offmin = 60 * int(offset[1:3]) + int(offset[3:5]) + if offset[0] == "-": offmin = -offmin return timezone(timedelta(minutes=offmin)) + def parse_logtime(logtime): # Equivalent to - but faster than - strptime_logtime. # It's like ~1.5usec vs 11usec, which might seem trivial but in my tests @@ -89,21 +112,25 @@ def parse_logtime(logtime): # (btw, slicing logtime by hand and using re.split are both marginally # slower. datetime.fromisoformat is slightly faster but not available # in Python 3.6 or earlier.) 
- dt, off = logtime.split(' ',1) - date, hour, minute, second = dt.split(':',3) - day, month, year = date.split('/',2) - tz = timezone.utc if off in {"+0000","-0000"} else offset_to_timezone(off) - return datetime(int(year), MONTHIDX[month], int(day), - int(hour), int(minute), int(second), 0, tz) + dt, off = logtime.split(" ", 1) + date, hour, minute, second = dt.split(":", 3) + day, month, year = date.split("/", 2) + tz = timezone.utc if off in {"+0000", "-0000"} else offset_to_timezone(off) + return datetime( + int(year), MONTHIDX[month], int(day), int(hour), int(minute), int(second), 0, tz + ) + def parse_querydict(querystr): - '''Parse request query the way mirrormanager does (last value wins)''' + """Parse request query the way mirrormanager does (last value wins)""" return dict(parse_qsl(querystr, seperator="&")) + class LogItem(NamedTuple): - ''' + """ Generic access.log data holder. - ''' + """ + host: str identity: str time: str @@ -128,26 +155,30 @@ class LogItem(NamedTuple): def querydict(self): return parse_querydict(self.query) + # TODO: would be kinda nice if there was a clear subclass / translation # between item classes... or if compile_log_regex made the class for you? # Or something? It feels like these things should be more closely bound. class MirrorItem(NamedTuple): - ''' + """ A basic mirrorlist/metalink metadata item. Each item has a timestamp, IP, and the requested repo= and arch= values. - ''' + """ + timestamp: int host: str repo_tag: Optional[str] repo_arch: Optional[str] + class CountmeItem(NamedTuple): - ''' + """ A "countme" match item. Includes the countme value and libdnf User-Agent fields. - ''' + """ + timestamp: int host: str os_name: str @@ -160,11 +191,14 @@ class CountmeItem(NamedTuple): class LogMatcher: - '''Base class for a LogMatcher, which iterates through a log file''' + """Base class for a LogMatcher, which iterates through a log file""" + regex = NotImplemented - itemtuple = NotImplemented + itemtuple: Union[Type[MirrorItem], Type[CountmeItem]] + def __init__(self, fileobj): self.fileobj = fileobj + def iteritems(self): # TODO: at this point we're single-threaded and CPU-bound; # multithreading would speed things up here. 
@@ -172,94 +206,126 @@ class LogMatcher: match = self.regex.match(line) if match: yield self.make_item(match) + __iter__ = iteritems + @classmethod def make_item(cls, match): raise NotImplementedError + class MirrorMatcher(LogMatcher): - '''Match all mirrorlist/metalink items, like mirrorlist.py does.''' + """Match all mirrorlist/metalink items, like mirrorlist.py does.""" + regex = MIRRORS_LOG_RE itemtuple = MirrorItem + @classmethod def make_item(cls, match): - timestamp = parse_logtime(match['time']).timestamp() - query = parse_querydict(match['query']) - return cls.itemtuple(timestamp = int(timestamp), - host = match['host'], - repo_tag = query.get('repo'), - repo_arch = query.get('arch')) + timestamp = parse_logtime(match["time"]).timestamp() + query = parse_querydict(match["query"]) + return cls.itemtuple( + timestamp=int(timestamp), + host=match["host"], + repo_tag=query.get("repo"), + repo_arch=query.get("arch"), + ) + class CountmeMatcher(LogMatcher): - '''Match the libdnf-style "countme" requests.''' + """Match the libdnf-style "countme" requests.""" + regex = COUNTME_LOG_RE itemtuple = CountmeItem + @classmethod def make_item(cls, match): - timestamp = parse_logtime(match['time']).timestamp() - query = parse_querydict(match['query']) - return cls.itemtuple(timestamp = int(timestamp), - host = match['host'], - os_name = match['os_name'], - os_version = match['os_version'], - os_variant = match['os_variant'], - os_arch = match['os_arch'], - sys_age = int(query.get('countme')), - repo_tag = query.get('repo'), - repo_arch = query.get('arch')) + timestamp = parse_logtime(match["time"]).timestamp() + query = parse_querydict(match["query"]) + return cls.itemtuple( + timestamp=int(timestamp), + host=match["host"], + os_name=match["os_name"], + os_version=match["os_version"], + os_variant=match["os_variant"], + os_arch=match["os_arch"], + sys_age=int(query.get("countme")), + repo_tag=query.get("repo"), + repo_arch=query.get("arch"), + ) + # =========================================================================== # ====== ItemWriters - output formatting classes ============================ # =========================================================================== + class ItemWriter: - def __init__(self, fp, itemtuple, timefield='timestamp', **kwargs): + def __init__(self, fp, itemtuple, timefield="timestamp", **kwargs): self._fp = fp self._itemtuple = itemtuple self._fields = itemtuple._fields assert timefield in self._fields, f"{itemtuple.__name__!r} has no time field {timefield!r}" self._timefield = timefield self._get_writer(**kwargs) + def _get_writer(self, **kwargs): raise NotImplementedError + def write_item(self, item): raise NotImplementedError + def write_items(self, items): for item in items: self.write_item(item) + def write_header(self): pass + def write_index(self): pass + class JSONWriter(ItemWriter): def _get_writer(self, **kwargs): import json + self._dump = json.dump + def write_item(self, item): self._dump(item._asdict(), self._fp) + class CSVWriter(ItemWriter): def _get_writer(self, **kwargs): import csv + self._writer = csv.writer(self._fp) + def write_header(self): self._writer.writerow(self._fields) + def write_item(self, item): self._writer.writerow(item) + class AWKWriter(ItemWriter): - def _get_writer(self, field_separator='\t', **kwargs): + def _get_writer(self, field_separator="\t", **kwargs): self._fieldsep = field_separator + def _write_row(self, vals): - self._fp.write(self._fieldsep.join(str(v) for v in vals) + '\n') + 
self._fp.write(self._fieldsep.join(str(v) for v in vals) + "\n") + def write_header(self): self._write_row(self._fields) + def write_item(self, item): self._write_row(item) + class SQLiteWriter(ItemWriter): - '''Write each item as a new row in a SQLite database table.''' + """Write each item as a new row in a SQLite database table.""" + # We have to get a little fancier with types here since SQL tables expect # typed values. Good thing Python has types now, eh? SQL_TYPE = { @@ -272,11 +338,14 @@ class SQLiteWriter(ItemWriter): Optional[float]: "REAL", Optional[bytes]: "BLOB", } + def _sqltype(self, fieldname): typehint = self._itemtuple.__annotations__[fieldname] return self.SQL_TYPE.get(typehint, "TEXT") - def _get_writer(self, tablename='countme_raw', **kwargs): + + def _get_writer(self, tablename="countme_raw", **kwargs): import sqlite3 + if hasattr(self._fp, "name"): filename = self._fp.name else: @@ -288,52 +357,55 @@ class SQLiteWriter(ItemWriter): # Generate SQL commands so we can use them later. # self._create_table creates the table, with column names and types # matching the names and types of the fields in self._itemtuple. - self._create_table = ( - "CREATE TABLE IF NOT EXISTS {table} ({coldefs})".format( - table=tablename, - coldefs=",".join(f"{f} {self._sqltype(f)}" for f in self._fields), - ) + self._create_table = "CREATE TABLE IF NOT EXISTS {table} ({coldefs})".format( + table=tablename, + coldefs=",".join(f"{f} {self._sqltype(f)}" for f in self._fields), ) # self._insert_item is an "INSERT" command with '?' placeholders. - self._insert_item = ( - "INSERT INTO {table} ({colnames}) VALUES ({colvals})".format( - table=tablename, - colnames=",".join(self._fields), - colvals=",".join("?" for f in self._fields), - ) + self._insert_item = "INSERT INTO {table} ({colnames}) VALUES ({colvals})".format( + table=tablename, + colnames=",".join(self._fields), + colvals=",".join("?" for f in self._fields), ) # self._create_time_index creates an index on 'timestamp' or whatever # the time-series field is. self._create_time_index = ( "CREATE INDEX IF NOT EXISTS {timefield}_idx on {table} ({timefield})".format( - table=tablename, - timefield=self._timefield + table=tablename, timefield=self._timefield ) ) + def write_header(self): self._cur.execute(self._create_table) + def write_item(self, item): self._cur.execute(self._insert_item, item) + def write_items(self, items): with self._con: self._con.executemany(self._insert_item, items) + def write_index(self): self._cur.execute(self._create_time_index) self._con.commit() + def has_item(self, item): - '''Return True if a row matching `item` exists in this database.''' + """Return True if a row matching `item` exists in this database.""" condition = " AND ".join(f"{field}=?" 
for field in self._fields) - cur = self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}",item) + cur = self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}", item) return bool(cur.fetchone()[0]) + def mintime(self): cur = self._cur.execute(f"SELECT MIN({self._timefield}) FROM {self._tablename}") return cur.fetchone()[0] + def maxtime(self): cur = self._cur.execute(f"SELECT MAX({self._timefield}) FROM {self._tablename}") return cur.fetchone()[0] + def make_writer(name, *args, **kwargs): - '''Convenience function to grab/instantiate the right writer''' + """Convenience function to grab/instantiate the right writer""" if name == "csv": writer = CSVWriter elif name == "json": @@ -346,13 +418,16 @@ def make_writer(name, *args, **kwargs): raise ValueError(f"Unknown writer '{name}'") return writer(*args, **kwargs) + # =========================================================================== # ====== ItemReaders - counterpart to ItemWriter ============================ # =========================================================================== + class ReaderError(RuntimeError): pass + class ItemReader: def __init__(self, fp, itemtuple, **kwargs): self._fp = fp @@ -365,88 +440,112 @@ class ItemReader: raise ReaderError("no field names found") if filefields != self._itemfields: raise ReaderError(f"field mismatch: expected {self._itemfields}, got {filefields}") + @property def fields(self): return self._itemfields + def _get_reader(self): - '''Set up the ItemReader.''' + """Set up the ItemReader.""" raise NotImplementedError + def _get_fields(self): - '''Called immediately after _get_reader(). - Should return a tuple of the fieldnames found in self._fp.''' + """Called immediately after _get_reader(). + Should return a tuple of the fieldnames found in self._fp.""" raise NotImplementedError + def _iter_rows(self): - '''Return an iterator/generator that produces a row for each item.''' + """Return an iterator/generator that produces a row for each item.""" raise NotImplementedError + def _find_item(self, item): - '''Return True if the given item is in this file''' + """Return True if the given item is in this file""" raise NotImplementedError + def __iter__(self): for item in self._iter_rows(): yield self._itemfactory(item) + def __contains__(self, item): return self._find_item(item) + class CSVReader(ItemReader): def _get_reader(self, **kwargs): import csv + self._reader = csv.reader(self._fp) + def _get_fields(self): filefields = tuple(next(self._reader)) # Sanity check: if any fieldname is a number... this isn't a header if any(name.isnumeric() for name in filefields): - header = ','.join(filefields) + header = ",".join(filefields) raise ReaderError(f"header bad/missing: expected {self._itemfields}, got {header!r}") return filefields + def _iter_rows(self): return self._reader + def _dup(self): # This is pretty gross, but then, so's CSV - return self.__class__(open(self._fp.name, 'rt'), self._itemtuple) + return self.__class__(open(self._fp.name, "rt"), self._itemtuple) + def _find_item(self, item): stritem = self._itemfactory(str(v) for v in item) - return (stritem in self._dup()) # O(n) worst case. Again: gross. + return stritem in self._dup() # O(n) worst case. Again: gross. 
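
For orientation, here is a minimal usage sketch, not part of the patch, of how the writer and reader classes above fit together; the calls mirror the ones made by the removed countme-totals.py, and the filenames and field values are purely illustrative (the item values are taken from the example request in countme/regex.py):

    from countme import CountmeItem, SQLiteWriter, SQLiteReader, make_writer

    # One parsed "countme" hit; values are illustrative.
    item = CountmeItem(
        timestamp=1585497868,
        host="240.159.140.173",
        os_name="Fedora",
        os_version="32",
        os_variant="workstation",
        os_arch="x86_64",
        sys_age=1,
        repo_tag="fedora-modular-32",
        repo_arch="x86_64",
    )

    # SQLiteWriter accepts a filename or a file-like object with a .name.
    writer = SQLiteWriter("countme_raw.db", CountmeItem)
    writer.write_header()       # CREATE TABLE IF NOT EXISTS countme_raw (...)
    writer.write_items([item])  # executemany() inside a single transaction
    writer.write_index()        # index the timestamp column, then commit

    # SQLiteReader checks that the table's columns match the item fields.
    reader = SQLiteReader("countme_raw.db", CountmeItem)
    print(reader.mintime(), reader.maxtime(), list(reader))

    # make_writer() is the name-based factory: "csv", "json", "awk" or "sqlite".
    csv_writer = make_writer("csv", open("items.csv", "wt"), CountmeItem)
    csv_writer.write_header()
    csv_writer.write_item(item)
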
+ class AWKReader(CSVReader): - def _get_reader(self, field_separator='\t', **kwargs): + def _get_reader(self, field_separator="\t", **kwargs): self._reader = (line.split(field_separator) for line in self._fp) + class JSONReader(CSVReader): def _get_reader(self, **kwargs): import json + self._reader = (json.loads(line) for line in self._fp) + class SQLiteReader(ItemReader): - def _get_reader(self, tablename='countme_raw', timefield="timestamp", **kwargs): + def _get_reader(self, tablename="countme_raw", timefield="timestamp", **kwargs): import sqlite3 + if hasattr(self._fp, "name"): filename = self._fp.name else: filename = self._fp - #self._con = sqlite3.connect(f"file:{filename}?mode=ro", uri=True) + # self._con = sqlite3.connect(f"file:{filename}?mode=ro", uri=True) self._con = sqlite3.connect(filename) self._cur = self._con.cursor() self._tablename = tablename self._timefield = timefield self._filename = filename + def _get_fields(self): fields_sql = f"PRAGMA table_info('{self._tablename}')" filefields = tuple(r[1] for r in self._cur.execute(fields_sql)) return filefields + def _find_item(self, item): condition = " AND ".join(f"{field}=?" for field in self.fields) - self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}",item) + self._cur.execute(f"SELECT COUNT(*) FROM {self._tablename} WHERE {condition}", item) return bool(self._cur.fetchone()[0]) + def _iter_rows(self): fields = ",".join(self._itemfields) return self._cur.execute(f"SELECT {fields} FROM {self._tablename}") + def mintime(self): cur = self._cur.execute(f"SELECT MIN({self._timefield}) FROM {self._tablename}") return cur.fetchone()[0] + def maxtime(self): cur = self._cur.execute(f"SELECT MAX({self._timefield}) FROM {self._tablename}") return cur.fetchone()[0] + # Guess the right reader based on the filename. def guessreader(fp): if fp.name.endswith(".csv"): @@ -459,9 +558,9 @@ def guessreader(fp): reader = None return reader + # TODO: should have name/args more like make_writer... 
def autoreader(fp, itemtuple, **kwargs): - '''Convenience function to guess & instantiate the right writer''' + """Convenience function to guess & instantiate the right writer""" reader = guessreader(fp) return reader(fp, itemtuple, **kwargs) - diff --git a/countme/parse.py b/countme/parse.py new file mode 100644 index 0000000..1fcf5f2 --- /dev/null +++ b/countme/parse.py @@ -0,0 +1,61 @@ +from contextlib import contextmanager +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Iterator, Union + +from countme.progress import ReadProgress + + +@contextmanager +def pre_process(filepath: Union[str, Path]) -> Iterator[str]: + filepath = Path(filepath) + with NamedTemporaryFile( + prefix=f"mirrors-countme-{filepath.name}-", + suffix=".preprocessed", + ) as tmpfile: + import subprocess + + print(f"Preprocessing file: {filepath}") + cmd = ["grep", "countme", str(filepath)] + r = subprocess.run(cmd, stdout=tmpfile) + if r.returncode != 0: + print(f"Preprocessing file failed, returning original: {filepath}") + yield str(filepath) + yield tmpfile.name + + +def parse_from_iterator(args, lines): + if args.header or args.sqlite: + args.writer.write_header() + + for logf in lines: + # Make an iterator object for the matching log lines + match_iter = iter(args.matcher(logf)) + + # TEMP WORKAROUND: filter out match items with missing values + if args.matchmode == "countme": + match_iter = filter(lambda i: None not in i, match_iter) + + # Duplicate data check (for sqlite output) + if args.dupcheck: + try: + item = next(match_iter) # grab first matching item + except StopIteration: + # If there is no next match, keep going + continue + if args.writer.has_item(item): # if it's already in the db... + continue # skip to next log + else: # otherwise + args.writer.write_item(item) # insert it into the db + + # Write matching items (sqlite does commit at end, or rollback on error) + args.writer.write_items(match_iter) + + if args.index: + args.writer.write_index() + + +def parse(args=None): + parse_from_iterator( + args, ReadProgress(args.logs, display=args.progress, pre_process=pre_process) + ) diff --git a/countme/progress.py b/countme/progress.py index 47a1344..2ac64f2 100644 --- a/countme/progress.py +++ b/countme/progress.py @@ -19,45 +19,58 @@ import os import sys -from .regex import compile_log_regex, LOG_DATE_RE +from contextlib import contextmanager +from pathlib import Path +from typing import Iterator, Union + +from .regex import LOG_DATE_RE __all__ = ( - 'ReadProgress', 'TQDMReadProgress', 'DIYReadProgress', + "ReadProgress", + "DIYReadProgress", ) # =========================================================================== # ====== Progress meters & helpers ========================================== # =========================================================================== + def log_date(line): match = LOG_DATE_RE.match(line) if match: - return match['date'] + return match["date"] return "??/??/????" 
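
The new countme/parse.py above is normally driven by parse-access-log.py, which is referenced but not shown in this patch. A rough, self-contained sketch of the options namespace it is expected to supply, using only the attribute names that parse_from_iterator() reads, might look like this; the log path and option values are illustrative, not the real CLI defaults:

    from argparse import Namespace

    from countme import CountmeItem, CountmeMatcher, make_writer
    from countme.parse import parse

    # Hypothetical stand-in for the namespace parse-access-log.py builds.
    args = Namespace(
        logs=["access.log"],        # illustrative, plain-text access log
        progress=True,              # show the ReadProgress meter
        matchmode="countme",        # enables the missing-value filter workaround
        matcher=CountmeMatcher,
        writer=make_writer("sqlite", "countme_raw.db", CountmeItem),
        sqlite=True,                # either sqlite or header triggers write_header()
        header=False,
        dupcheck=True,              # skip a log if its first item is already stored
        index=True,                 # write the timestamp index at the end
    )

    # pre-filters each log with grep, then matches lines and stores the items
    parse(args)
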
+ def log_reader(logfn): if logfn.endswith(".xz"): import lzma - return lzma.open(logfn, mode='rt') + + return lzma.open(logfn, mode="rt") elif logfn.endswith(".gz"): import gzip - return gzip.open(logfn, mode='rt') + + return gzip.open(logfn, mode="rt") else: - return open(logfn, mode='rt') + return open(logfn, mode="rt") + def xz_log_size(xz_filename): import subprocess + cmd = ["xz", "--list", "--robot", xz_filename] r = subprocess.run(cmd, stdout=subprocess.PIPE) if r.returncode != 0: return None - for line in r.stdout.split(b'\n'): - f = line.split(b'\t') - if f[0] == b'totals': + for line in r.stdout.split(b"\n"): + f = line.split(b"\t") + if f[0] == b"totals": return int(f[4]) + def gz_log_size(gz_filename): import subprocess + cmd = ["gzip", "--quiet", "--list", gz_filename] r = subprocess.run(cmd, stdout=subprocess.PIPE) if r.returncode != 0: @@ -65,6 +78,7 @@ def gz_log_size(gz_filename): csize, uncsize, ratio, name = r.stdout.split() return int(uncsize) + def log_total_size(logfn): if logfn.endswith(".xz"): return xz_log_size(logfn) @@ -74,25 +88,36 @@ def log_total_size(logfn): return os.stat(logfn).st_size +@contextmanager +def no_preprocess(filepath: Union[str, Path]) -> Iterator[str]: + yield str(filepath) + + class ReadProgressBase: - def __init__(self, logs, display=True): - '''logs should be a sequence of line-iterable file-like objects. - if display is False, no progress output will be printed.''' + def __init__(self, logs, display=True, pre_process=no_preprocess): + """logs should be a sequence of line-iterable file-like objects. + if display is False, no progress output will be printed.""" self.logs = logs self.display = display + self.pre_process = pre_process def __iter__(self): - '''Iterator for ReadProgress; yields a sequence of line-iterable - file-like objects (one for each log in logs).''' + """Iterator for ReadProgress; yields a sequence of line-iterable + file-like objects (one for each log in logs).""" for num, logfn in enumerate(self.logs): - logf = log_reader(logfn) - total = log_total_size(logfn) - yield self._iter_log_lines(logf, num, total) + with self.pre_process(logfn) as processed_log: + logf = log_reader(processed_log) + total = log_total_size(processed_log) + yield self._iter_log_lines(logf, num, total) def _iter_log_lines(self, logf, num, total): # Make a progress meter for this file - prog = self._progress_obj(unit='b', unit_scale=True, total=total, - disable=True if not self.display else None) + prog = self._progress_obj( + unit="b", + unit_scale=True, + total=total, + disable=True if not self.display else None, + ) # Get the first line manually so we can get logdate line = next(logf) desc = f"log {num+1}/{len(self.logs)}, date={log_date(line)}" @@ -107,19 +132,23 @@ class ReadProgressBase: prog.close() -# Here's how we use the tqdm progress module to show read progress. -class TQDMReadProgress(ReadProgressBase): - def _progress_obj(self, *args, **kwargs): - return tqdm(*args, **kwargs) - # No TQDM? Use our little do-it-yourself knockoff version. 
-class DIYReadProgress(TQDMReadProgress): +class DIYReadProgress(ReadProgressBase): def _progress_obj(self, *args, **kwargs): return diyprog(*args, **kwargs) + class diyprog: - def __init__(self, desc=None, total=None, file=None, disable=False, - unit='b', unit_scale=True, barchar='_-=#'): + def __init__( + self, + desc=None, + total=None, + file=None, + disable=False, + unit="b", + unit_scale=True, + barchar="_-=#", + ): # COMPAT NOTE: tqdm objects with disable=True have no .desc attribute self.desc = desc self.total = total @@ -133,11 +162,12 @@ class diyprog: def set_description(self, desc=None, refresh=True): self.desc = desc - if refresh: + if refresh and not self.disable: self.display() def update(self, n=1): - if self.disable: return + if self.disable: + return self.count += n if self.count >= self.showat: self.showat = min(self.total, self.showat + self.total // 100) @@ -150,7 +180,7 @@ class diyprog: @staticmethod def hrsize(n): - for suffix in 'kmgtp': + for suffix in "kmgtp": n /= 1000 if n < 1000: break @@ -180,18 +210,17 @@ class diyprog: bar = (pct // 4) * self.barchar[-1] if pct < 100: bar += self.barchar[pct % 4] - print(f"{desc}: {pct:>3}% [{bar:<25}] {count:>7}/{total:<7}", - flush=True, file=self.file, end='\r') + print( + f"{desc}: {pct:>3}% [{bar:<25}] {count:>7}/{total:<7}", + flush=True, + file=self.file, + end="\r", + ) def close(self): - if self.disable: return + if self.disable: + return print(flush=True, file=self.file) -# Default ReadProgress: use tqdm if possible, else use the DIY one -try: - # TODO: make this work with a local tqdm checkout/git submodule - from tqdm import tqdm - ReadProgress = TQDMReadProgress -except ImportError: - ReadProgress = DIYReadProgress +ReadProgress = DIYReadProgress diff --git a/countme/regex.py b/countme/regex.py index ee7fd29..3c8fa4d 100644 --- a/countme/regex.py +++ b/countme/regex.py @@ -20,9 +20,13 @@ import re __all__ = ( - 'compile_log_regex', - 'LOG_RE', 'LIBDNF_USER_AGENT_RE', 'COUNTME_USER_AGENT_RE', - 'MIRRORS_LOG_RE', 'COUNTME_LOG_RE', 'LOG_DATE_RE', + "compile_log_regex", + "LOG_RE", + "LIBDNF_USER_AGENT_RE", + "COUNTME_USER_AGENT_RE", + "MIRRORS_LOG_RE", + "COUNTME_LOG_RE", + "LOG_DATE_RE", ) # =========================================================================== @@ -34,7 +38,7 @@ __all__ = ( # That's the standard Combined Log Format, with numeric IPs (%a). # # Example log line: -# 240.159.140.173 - - [29/Mar/2020:16:04:28 +0000] "GET /metalink?repo=fedora-modular-32&arch=x86_64&countme=1 HTTP/2.0" 200 18336 "-" "libdnf (Fedora 32; workstation; Linux.x86_64)" +# 240.159.140.173 - - [29/Mar/2020:16:04:28 +0000] "GET /metalink?repo=fedora-modular-32&arch=x86_64&countme=1 HTTP/2.0" 200 18336 "-" "libdnf (Fedora 32; workstation; Linux.x86_64)" # noqa # # Here it is as a Python regex, with a format placeholder for the actual field # contents. Default field regexes are in LOG_PATTERN_FIELDS, below, and @@ -44,43 +48,43 @@ __all__ = ( # present but 'query' may be absent, depending on the value of 'query_match'. # 'query_match' should be '?' (optional), '' (required), or '{0}' (absent). LOG_PATTERN_FORMAT = ( - r'^' - r'(?P{host})\s' - r'(?P{identity})\s' - r'(?P{user})\s' - r'\[(?P
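
The example request above carries the timestamp 29/Mar/2020:16:04:28 +0000. As a small, self-contained sketch, not part of the patch, of how such a timestamp lands in a weekly bucket, here is the conversion using parse_logtime() and weeknum() from countme/__init__.py, with weekdate() reproduced from the removed countme-totals.py so the example stands alone:

    import datetime

    from countme import parse_logtime, weeknum

    COUNTME_EPOCH_ORDINAL = 719167  # proleptic ordinal of Mon 1970-01-05, weeknum 0

    def weekdate(week, weekday=0):
        # Same weeknum -> date conversion the removed countme-totals.py used.
        return datetime.date.fromordinal(COUNTME_EPOCH_ORDINAL + 7 * week + weekday)

    # Timestamp from the example log line above.
    ts = parse_logtime("29/Mar/2020:16:04:28 +0000").timestamp()  # 1585497868.0
    week = weeknum(ts)                                            # 2620
    print(week, weekdate(week, 0), weekdate(week, 6))             # 2620 2020-03-23 2020-03-29

    # Weeks before COUNTME_START_WEEKNUM (2614, starting Mon 2020-02-10) are ignored.
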