#45 Add preprocessing step
Merged 3 years ago by nphilipp. Opened 3 years ago by asaleh.
asaleh/mirrors-countme preprocess  into  main

file modified
+24 -1
@@ -1,11 +1,34 @@ 

+ from contextlib import contextmanager

+ from pathlib import Path

+ from tempfile import NamedTemporaryFile

+ from typing import Iterator, Union

+ 

  from countme.progress import ReadProgress

  

  

+ @contextmanager

+ def pre_process(filepath: Union[str, Path]) -> Iterator[str]:

+     """Yield the path of a copy of *filepath* keeping only lines with "countme".

+ 

+     ``grep countme`` writes the filtered copy into a NamedTemporaryFile, which

+     is deleted when the context exits. If grep exits non-zero (it exits 1 when

+     no line matches and >1 on error), the original path is yielded instead.

+     """

+     filepath = Path(filepath)

+     with NamedTemporaryFile(

+         prefix=f"mirrors-countme-{filepath.name}-",

+         suffix=".preprocessed",

+     ) as tmpfile:

+         import subprocess

+ 

+         print(f"Preprocessing file: {filepath}")

+         cmd = ["grep", "countme", str(filepath)]

+         r = subprocess.run(cmd, stdout=tmpfile)

+         if r.returncode != 0:

+             print(f"Preprocessing file failed, returning original: {filepath}")

+         # A @contextmanager generator must yield exactly once; yielding the

+         # original path and then tmpfile.name would make leaving the caller's

+         # with-block raise RuntimeError("generator didn't stop").

+         yield tmpfile.name if r.returncode == 0 else str(filepath)

+ 

+ 

  def parse(args=None):

      if args.header or args.sqlite:

          args.writer.write_header()

  

-     for logf in ReadProgress(args.logs, display=args.progress):

+     for logf in ReadProgress(args.logs, display=args.progress, pre_process=pre_process):

          # Make an iterator object for the matching log lines

          match_iter = iter(args.matcher(logf))

  

file modified
+15 -4
@@ -19,6 +19,10 @@ 

  

  import os

  import sys

+ from contextlib import contextmanager

+ from pathlib import Path

+ from typing import Iterator, Union

+ 

  from .regex import LOG_DATE_RE

  

  __all__ = (
@@ -84,20 +88,27 @@ 

          return os.stat(logfn).st_size

  

  

+ @contextmanager

+ def no_preprocess(filepath: Union[str, Path]) -> Iterator[str]:

+     """Default pre-processor: yield *filepath* unchanged, converted to str."""

+     unchanged = str(filepath)

+     yield unchanged

+ 

+ 

  class ReadProgressBase:

-     def __init__(self, logs, display=True):

+     def __init__(self, logs, display=True, pre_process=no_preprocess):

          """logs should be a sequence of line-iterable file-like objects.

          if display is False, no progress output will be printed."""

          self.logs = logs

          self.display = display

+         self.pre_process = pre_process

  

      def __iter__(self):

          """Iterator for ReadProgress; yields a sequence of line-iterable

          file-like objects (one for each log in logs)."""

          for num, logfn in enumerate(self.logs):

-             logf = log_reader(logfn)

-             total = log_total_size(logfn)

-             yield self._iter_log_lines(logf, num, total)

+             with self.pre_process(logfn) as processed_log:

+                 logf = log_reader(processed_log)

+                 total = log_total_size(processed_log)

+                 yield self._iter_log_lines(logf, num, total)

  

      def _iter_log_lines(self, logf, num, total):

          # Make a progress meter for this file

Add a preprocessing step to speed up parsing.

Build succeeded.

Metadata Update from @nphilipp:
- Request assigned

3 years ago

Some ideas/thoughts (here because it'd become too scattered inline, in no particular order):

  • The log files we process are on NFS, so we'd better pipe the output of grep into a local temporary file instead of a file right beside the unprocessed one.
  • We can also just use that file object in the stdout parameter in subprocess.run() and get rid of the intermediate shell process.
  • We should clean up after ourselves! Ideally, pre_process() would be a context manager which can then clean up the intermediate processed file when it's no longer needed.
  • Preprocessing a log file in plain Python[1] is roughly as fast (about 1 minute for about 8 GB) as using grep or fgrep to do the same with cold caches (which is what we'll most likely be dealing with). So I'd expect the bottleneck to be shoveling a couple of gigabytes of log data over the network, not filtering them… What do you think: why is processing all log lines so slow, and why does preprocessing them with grep speed things up? Does ReadProgress add that much overhead?

[1]: like this:

rawlog_path = "/mnt/fedora_stats/combined-http/latest/mirrors.fedoraproject.org-access.log"
processedlog_path = "/tmp/countme.py.out"


def filter_countme(src_path, dst_path, needle=b"countme"):
    """Copy the lines of *src_path* that contain *needle* to *dst_path*.

    Both files are handled as bytes, like grep does. Log lines that are not
    valid text in the locale encoding therefore cannot raise
    UnicodeDecodeError, and no time is spent decoding and re-encoding.
    """
    with open(src_path, "rb") as rawlog, open(dst_path, "wb") as processedlog:
        processedlog.writelines(line for line in rawlog if needle in line)


if __name__ == "__main__":
    filter_countme(rawlog_path, processedlog_path)

2 new commits added

  • Add pre-processing step that uses grep to speedup parsing
  • Added preprocessing step.
3 years ago

Build succeeded.

I changed it to use a local temporary file, with a context manager that cleans up after itself.

Regarding your point that Python is as fast as grep: I'm not sure bandwidth is the problem, because when I tested it locally there was no network access to slow things down.

Could the speedup I observed have been due to caching? I re-ran it several times, the file was on my local disk, etc.

rebased onto 86deda1cceea940eb335d1de007c38e64854d7c1

3 years ago

rebased onto 4bde590ea0ba3e97437ab6134e3cddffc2e57808

3 years ago

Build succeeded.

rebased onto 9cd422c43aaf0811292d4a1b8f78af16b838de57

3 years ago

Build succeeded.

rebased onto 07c43eb76b1a28b64399a85551aa47d6da9711b9

3 years ago

Build failed. More information on how to proceed and troubleshoot errors available at https://fedoraproject.org/wiki/Zuul-based-ci

Hmm, TIL that type-annotating generators/iterables isn't that straightforward:

mypy run-test: commands[0] | python -m mypy --config-file /workspace/src/pagure.io/mirrors-countme/mypy.cfg .
countme/progress.py:91: error: The return type of a generator function should be "Generator" or one of its supertypes
countme/progress.py:91: error: Argument 1 to "contextmanager" has incompatible type "Callable[[Union[str, Path]], str]"; expected "Callable[..., Iterator[<nothing>]]"
countme/parse.py:9: error: The return type of a generator function should be "Generator" or one of its supertypes
countme/parse.py:9: error: Argument 1 to "contextmanager" has incompatible type "Callable[[Union[str, Path]], str]"; expected "Callable[..., Iterator[<nothing>]]"
Found 4 errors in 2 files (checked 12 source files)
ERROR: InvocationError for command /workspace/src/pagure.io/mirrors-countme/.tox/mypy/bin/python -m mypy --config-file mypy.cfg . (exited with code 1)

rebased onto 58d56f1

3 years ago

Build succeeded.

Pull-Request has been merged by nphilipp

3 years ago