| |
@@ -1,6 +1,7 @@
|
| |
#!/usr/bin/python
|
| |
|
| |
- import Queue # for exceptions
|
| |
+ from __future__ import absolute_import
|
| |
+ import six.moves.queue # for exceptions
|
| |
import cProfile
|
| |
from functools import partial
|
| |
import hashlib
|
| |
@@ -15,12 +16,15 @@
|
| |
import dateutil
|
| |
import koji as _koji # koji declared using profile module in main()
|
| |
import rpm
|
| |
+ from six.moves import range
|
| |
+ from six.moves import zip
|
| |
|
| |
|
| |
logger = logging.getLogger('koji.checkbuilds')
|
| |
|
| |
# an event to indicate that the feeder is done
|
| |
feeder_done = multiprocessing.Event()
|
| |
+ workers_done = multiprocessing.Event()
|
| |
|
| |
# queue to hold actions in the pipeline
|
| |
queue = multiprocessing.Queue()
|
| |
@@ -128,13 +132,12 @@
|
| |
|
| |
# wait for the queue to be empty
|
| |
queue.close()
|
| |
- queue.join_thread() # XXX is this right?
|
| |
-
|
| |
- assert queue.empty()
|
| |
+ queue.join_thread()
|
| |
|
| |
logger.info('Finished. Waiting for workers to stop.')
|
| |
for worker in workers:
|
| |
worker.join()
|
| |
+ workers_done.set()
|
| |
logger.info('Workers finished')
|
| |
|
| |
s_thread.join()
|
| |
@@ -181,8 +184,8 @@
|
| |
while True:
|
| |
try:
|
| |
method, args, kw = stats_queue.get(block=True, timeout=5)
|
| |
- except Queue.Empty:
|
| |
- if feeder_done.is_set() and queue.empty():
|
| |
+ except six.moves.queue.Empty:
|
| |
+ if workers_done.is_set():
|
| |
# is this enough?
|
| |
break
|
| |
continue
|
| |
@@ -378,7 +381,7 @@
|
| |
while True:
|
| |
try:
|
| |
build, opts = queue.get(block=True, timeout=5)
|
| |
- except Queue.Empty:
|
| |
+ except six.moves.queue.Empty:
|
| |
if feeder_done.is_set():
|
| |
# is this enough?
|
| |
break
|
| |
@@ -535,7 +538,7 @@
|
| |
def verify_rpm(self, fn, ts, n_bytes):
|
| |
logger.debug('Verifying rpm %s', fn)
|
| |
start = time.time()
|
| |
- with open(fn, 'r') as fp:
|
| |
+ with open(fn, 'rb') as fp:
|
| |
try:
|
| |
ts.hdrFromFdno(fp.fileno())
|
| |
except rpm.error as ex:
|
| |
@@ -560,7 +563,7 @@
|
| |
for rpminfo, [sigs] in zip(self.rpms, session.multiCall(strict=True)):
|
| |
for sig in sigs:
|
| |
sig_idx.setdefault(sig['sigkey'], []).append([rpminfo, sig])
|
| |
- logger.debug('Keys for %s: %s', build['nvr'], sig_idx.keys())
|
| |
+ logger.debug('Keys for %s: %s', build['nvr'], list(sig_idx.keys()))
|
| |
for sigkey in sig_idx:
|
| |
cachedir = os.path.join(self.build_dir, 'data/sigcache/%s' % sigkey)
|
| |
if not os.path.isdir(cachedir):
|
| |
@@ -575,7 +578,7 @@
|
| |
logger.warn("Cached signature missing: %s", cachefile)
|
| |
self.fail('sigcache.missing')
|
| |
continue
|
| |
- sighash = hashlib.md5(file(cachefile).read()).hexdigest()
|
| |
+ sighash = hashlib.md5(open(cachefile, 'rb').read()).hexdigest()
|
| |
if sighash != sig['sighash']:
|
| |
logger.warn('Cached signature mismatch for %s\n'
|
| |
' db: %s, file:%s',
|
| |
@@ -677,18 +680,22 @@
|
| |
sumtype = koji.CHECKSUM_TYPES[sumtype]
|
| |
except KeyError:
|
| |
logger.error('Unknown sum type %s for %s', sumtype, fn)
|
| |
+ stats.increment('checksum.unsupported')
|
| |
return
|
| |
if sumtype == 'md5':
|
| |
chk = hashlib.md5()
|
| |
elif sumtype == 'sha1':
|
| |
chk = hashlib.sha1()
|
| |
+ elif sumtype == 'sha256':
|
| |
+ chk = hashlib.sha256()
|
| |
else:
|
| |
logger.error('Unsupported sum type %s for %s', sumtype, fn)
|
| |
+ stats.increment('checksum.unsupported')
|
| |
return
|
| |
logger.debug('Checking %s for %s', sumtype, fn)
|
| |
stats.increment('checksum.checked')
|
| |
start = time.time()
|
| |
- with file(fn, 'r') as fp:
|
| |
+ with open(fn, 'rb') as fp:
|
| |
chunks = iter(partial(fp.read, 819200), b'')
|
| |
[chk.update(b) for b in chunks]
|
| |
n_bytes = fp.tell()
|
| |
Add Python 3 support