| |
@@ -1,13 +1,17 @@
|
| |
+ import datetime
|
| |
import tarfile
|
| |
+ import tempfile
|
| |
import sqlite3
|
| |
+ from hypothesis import given, strategies as st
|
| |
+ from hypothesis import settings, HealthCheck
|
| |
from pathlib import Path
|
| |
from typing import Any, List, NamedTuple
|
| |
|
| |
import pytest
|
| |
|
| |
from countme import CountmeMatcher, make_writer
|
| |
- from countme.parse import parse
|
| |
-
|
| |
+ from countme.parse import parse, parse_from_iterator
|
| |
+ from countme.totals import totals
|
| |
|
| |
HERE = Path(__file__).parent
|
| |
TEST_DATA_DIR = HERE.parent / "test_data"
|
| |
@@ -53,6 +57,51 @@
|
| |
logs: List[str]
|
| |
|
| |
|
| |
+ @pytest.fixture
|
| |
+ def raw_db_tar(tmp_path_cwd):
|
| |
+ yield from _test_tarfile_factory(TEST_DATA_DIR / "test_result_cmp.tar.xz")
|
| |
+
|
| |
+
|
| |
+ @pytest.fixture
|
| |
+ def totals_db_tar(tmp_path_cwd):
|
| |
+ yield from _test_tarfile_factory(TEST_DATA_DIR / "countme_totals.tar.xz")
|
| |
+
|
| |
+
|
| |
+ class ArgsTotal(NamedTuple):
|
| |
+ countme_totals: Any
|
| |
+ countme_raw: Any
|
| |
+ progress: bool
|
| |
+ csv_dump: Any
|
| |
+ sqlite: str
|
| |
+
|
| |
+
|
| |
+ def test_count_totals(tmp_path_cwd, raw_db_tar, totals_db_tar):
|
| |
+ if not raw_db_tar or not totals_db_tar:
|
| |
+ pytest.skip("Test data not found")
|
| |
+ args = ArgsTotal(
|
| |
+ countme_totals=str(tmp_path_cwd / "test_result_totals.db"),
|
| |
+ countme_raw=str(tmp_path_cwd / "test_result_cmp.db"),
|
| |
+ progress=False,
|
| |
+ csv_dump=False,
|
| |
+ sqlite=str(tmp_path_cwd / "test_result_totals"),
|
| |
+ )
|
| |
+ totals(args)
|
| |
+ db = sqlite3.connect(args.sqlite)
|
| |
+ tmp_db = tmp_path_cwd / "countme_totals"
|
| |
+ db.execute(f"ATTACH DATABASE '{tmp_db}' AS test_db;")
|
| |
+ rows_missing = db.execute(
|
| |
+ "select * from test_db.countme_totals except select * from countme_totals;"
|
| |
+ )
|
| |
+ missing = rows_missing.fetchone()
|
| |
+ rows_extra = db.execute(
|
| |
+ "select * from countme_totals except select * from test_db.countme_totals;"
|
| |
+ )
|
| |
+ extra = rows_extra.fetchone()
|
| |
+ assert (
|
| |
+ missing is None and extra is None
|
| |
+ ), f"When comparing db's\n {missing} was missing and\n {extra} was extra"
|
| |
+
|
| |
+
|
| |
def test_read_file(tmp_path_cwd, log_tar, db_tar):
|
| |
if not log_tar or not db_tar:
|
| |
pytest.skip("Test data not found")
|
| |
@@ -83,3 +132,57 @@
|
| |
assert (
|
| |
missing is None and extra is None
|
| |
), f"When comparing db's\n {missing} was missing and\n {extra} was extra"
|
| |
+
|
| |
+
|
| |
+ def create_logline(ip, date, repo):
|
| |
+ dstr = date.strftime("%d/%b/%Y:%H:%M:%S +0000")
|
| |
+ url = "/metalink?repo=updates-released-f33&arch=x86_64&countme=1"
|
| |
+ agent = "libdnf (Fedora 33; workstation; Linux.x86_64)"
|
| |
+ return f'{ip} - - [{dstr}] "GET {url} HTTP/1.1" 200 32015 "-" "{agent}"'
|
| |
+
|
| |
+
|
| |
+ @st.composite
|
| |
+ def log_data(draw):
|
| |
+ ip_sample = st.lists(st.ip_addresses(), 10, unique=True)
|
| |
+ repo = st.sampled_from(["Fedora", "epel-7", "centos8"])
|
| |
+ ips = draw(ip_sample)
|
| |
+ today = datetime.datetime.now()
|
| |
+ dates = [today - datetime.timedelta(days=d, hours=i) for i in range(1, 2) for d in range(1, 14)]
|
| |
+
|
| |
+ return list(
|
| |
+ sorted(((date, ip, draw(repo)) for ip in ips for date in dates), key=lambda x: x[0])
|
| |
+ )
|
| |
+
|
| |
+
|
| |
+ @settings(suppress_health_check=(HealthCheck.too_slow,))
|
| |
+ @given(log_data())
|
| |
+ def test_log(loglines):
|
| |
+ with tempfile.TemporaryDirectory() as tmp_dir:
|
| |
+ matcher = CountmeMatcher
|
| |
+ args = Args(
|
| |
+ writer=make_writer("sqlite", str(tmp_dir + "/test.db"), matcher.itemtuple),
|
| |
+ matcher=matcher,
|
| |
+ dupcheck=True,
|
| |
+ index=True,
|
| |
+ header=True,
|
| |
+ progress=False,
|
| |
+ matchmode="countme",
|
| |
+ format="csv",
|
| |
+ logs=[],
|
| |
+ sqlite=str(tmp_dir + "/test.db"),
|
| |
+ )
|
| |
+ parse_from_iterator(args, [(create_logline(ip, date, repo) for date, ip, repo in loglines)])
|
| |
+ db = sqlite3.connect(args.sqlite)
|
| |
+ rows_no = db.execute("select count(*) from countme_raw;").fetchone()[0]
|
| |
+ assert rows_no == len(loglines)
|
| |
+ args = ArgsTotal(
|
| |
+ countme_totals=str(tmp_dir + "/test_generated_totals.db"),
|
| |
+ countme_raw=str(tmp_dir + "/test.db"),
|
| |
+ progress=False,
|
| |
+ csv_dump=False,
|
| |
+ sqlite=str(tmp_dir + "/test_generated_totals.db"),
|
| |
+ )
|
| |
+ totals(args)
|
| |
+ db = sqlite3.connect(args.sqlite)
|
| |
+ rows_no = db.execute("select count(*) from countme_totals;").fetchone()[0]
|
| |
+ assert int(rows_no) > 0
|
| |