From 7acaf4ce77ba799282990990f97a58f416c797ec Mon Sep 17 00:00:00 2001 From: Mike McLean Date: Nov 07 2017 14:58:27 +0000 Subject: PR#646: Improve test coverage in koji/util Merges #646 https://pagure.io/koji/pull-request/646 Fixes: #642 https://pagure.io/koji/issue/642 Goal: 90% coverage for koji.util --- diff --git a/koji/util.py b/koji/util.py index cbf75b8..c224dab 100644 --- a/koji/util.py +++ b/koji/util.py @@ -85,7 +85,8 @@ def parseTime(val): result = TIME_RE.search(rest) if result: time = [int(r) for r in result.groups()] - return calendar.timegm(date + time + [0, 0, 0]) + return calendar.timegm( + datetime.datetime(*(date + time)).timetuple()) def checkForBuilds(session, tag, builds, event, latest=False): """Check that the builds existed in tag at the time of the event. diff --git a/tests/test_lib/test_utils.py b/tests/test_lib/test_utils.py index f18269f..fa94afe 100644 --- a/tests/test_lib/test_utils.py +++ b/tests/test_lib/test_utils.py @@ -2,9 +2,13 @@ from __future__ import absolute_import import mock import unittest from mock import call, patch +from datetime import datetime import os +import time +import resource import optparse +import calendar import six.moves.configparser import koji import koji.util @@ -488,6 +492,462 @@ class MavenUtilTestCase(unittest.TestCase): config.readfp(conf_file) return config + def test_formatChangelog(self): + """Test formatChangelog function""" + entries = {'date': datetime(2017, 10, 10, 12, 34, 56), + 'author': 'koji ', + 'text': 'This is a test release'} + sample = "* Tue Oct 10 2017 {0}\n{1}\n\n".format( + entries['author'].encode('utf-8'), entries['text'].encode('utf-8')) + self.assertEqual(sample, koji.util.formatChangelog((entries,))) + + def test_parseTime(self): + """Test parseTime function""" + now = datetime.now() + now_ts = int(calendar.timegm(now.timetuple())) + self.assertEqual(1507593600, koji.util.parseTime('2017-10-10')) + self.assertEqual(1507638896, koji.util.parseTime('2017-10-10 12:34:56')) + self.assertEqual(0, koji.util.parseTime('1970-01-01 00:00:00')) + self.assertNotEqual(now_ts, koji.util.parseTime(now.strftime("%Y-%m-%d"))) + self.assertEqual(now_ts, koji.util.parseTime(now.strftime("%Y-%m-%d %H:%M:%S"))) + + # non time format string + self.assertEqual(None, koji.util.parseTime('not-a-time-format')) + + time_tests = { + # invalid month + '2000-13-32': 'month must be in 1..12', + # invalid day + '2000-12-32': 'day is out of range for month', + # invalid hour + '2000-12-31 24:61:61': 'hour must be in 0..23', + # invalid minute + '2000-12-31 23:61:61': 'minute must be in 0..59', + # invalid second + '2000-12-31 23:59:61': 'second must be in 0..59', + # corner case, leap day + '1969-2-29': 'day is out of range for month' + } + + # invalid date test + for args, err in time_tests.items(): + six.assertRaisesRegex( + self, ValueError, err, koji.util.parseTime, args) + + def test_duration(self): + """Test duration function""" + start = time.time() + self.assertEqual('0:00', koji.util.duration(start)) + + # wait for 2 seconds + time.sleep(2) + self.assertEqual('0:02', koji.util.duration(start)) + + def test_printList(self): + """Test printList function""" + distro = ['fedora', 'rhel', 'centos', 'opensuse'] + self.assertEqual('', koji.util.printList([])) + self.assertEqual('fedora', koji.util.printList(distro[0:1])) + self.assertEqual('fedora and rhel', koji.util.printList(distro[0:2])) + self.assertEqual('fedora, rhel, and centos', koji.util.printList(distro[0:3])) + + def test_multi_fnmatch(self): + """Test multi_fnmatch function""" + patterns = "example.py example*.py [0-9]*.py [0-9]_*_exmple.py" + self.assertTrue(koji.util.multi_fnmatch('example.py', patterns)) + self.assertTrue(koji.util.multi_fnmatch('example.py', patterns.split())) + self.assertTrue(koji.util.multi_fnmatch('01.py', patterns.split())) + self.assertTrue(koji.util.multi_fnmatch('01_koji:util_example.py', patterns.split())) + self.assertTrue(koji.util.multi_fnmatch('example_01.py', patterns.split())) + self.assertFalse(koji.util.multi_fnmatch('sample.py', patterns.split())) + + def test_filedigestAlgo(self): + """Test filedigestAlgo function""" + hdr = {koji.RPM_TAG_FILEDIGESTALGO: None} + self.assertEqual('md5', koji.util.filedigestAlgo(hdr)) + + hdr = {koji.RPM_TAG_FILEDIGESTALGO: 2} + self.assertEqual('sha1', koji.util.filedigestAlgo(hdr)) + + hdr = {koji.RPM_TAG_FILEDIGESTALGO: 4} + self.assertEqual('unknown', koji.util.filedigestAlgo(hdr)) + + @mock.patch('os.WEXITSTATUS', return_value=255) + @mock.patch('os.WTERMSIG', return_value=19) + @mock.patch('os.WIFEXITED') + @mock.patch('os.WIFSIGNALED') + def test_parseStatus(self, m_signaled, m_exited, m_termsig, m_exit): + """Test parseStatus function""" + self.assertEqual('%s was killed by signal %i' % ('test-proc', 19), + koji.util.parseStatus(0, 'test-proc')) + + m_signaled.return_value = False + self.assertEqual('%s exited with status %i' % ('test-proc', 255), + koji.util.parseStatus(0, 'test-proc')) + + m_exited.return_value = False + self.assertEqual('%s terminated for unknown reasons' % ('test-proc'), + koji.util.parseStatus(0, 'test-proc')) + + for prefix in [['test', 'proc'], ('test', 'proc')]: + self.assertEqual( + '%s terminated for unknown reasons' % (' '.join(prefix)), + koji.util.parseStatus(0, prefix)) + + def test_isSuccess(self): + """Test isSuccess function""" + with mock.patch('os.WIFEXITED') as m_exit, \ + mock.patch('os.WEXITSTATUS') as m_exitst: + # True case + m_exit.return_value, m_exitst.return_value = True, 0 + self.assertTrue(koji.util.isSuccess(0)) + + # False cases + m_exit.return_value, m_exitst.return_value = True, 1 + self.assertFalse(koji.util.isSuccess(0)) + m_exit.return_value, m_exitst.return_value = False, 255 + self.assertFalse(koji.util.isSuccess(0)) + + def test_call_with_argcheck(self): + """Test call_wit_argcheck function""" + func = lambda *args, **kargs: True + self.assertTrue( + koji.util.call_with_argcheck( + func, [1, 2, 3], {'para1': 1, 'para2': 2})) + + # exception tests + func = lambda *args, **kargs: \ + (_ for _ in ()).throw(TypeError('fake-type-error')) + six.assertRaisesRegex(self, TypeError, 'fake-type-error', + koji.util.call_with_argcheck, + func, [1, 2, 3], {'para1': 1, 'para2': 2}) + + with mock.patch('sys.exc_info') as m_info: + m_info.side_effect = lambda: \ + [None, None, mock.MagicMock(tb_next=None)] + six.assertRaisesRegex(self, koji.ParameterError, 'fake-type-error', + koji.util.call_with_argcheck, + func, [1, 2, 3]) + + def test_dslice(self): + """Test dslice function""" + distro = {'fedora': 1, 'rhel': 2, 'centos': 3} + self.assertEqual({'fedora': 1}, koji.util.dslice(distro, ['fedora'])) + + # slice with non exist key, + # if strict bit is not set, empty dict should be returned. + self.assertEqual({}, koji.util.dslice(distro, ['debian'], False)) + # if strict bit is set, KeyError should be raised + self.assertRaises(KeyError, koji.util.dslice, distro, ['debian']) + + def test_dslice_ex(self): + """Test dslice_ex function""" + distro = {'fedora': 1, 'rhel': 2, 'centos': 3} + self.assertEqual({'rhel': 2, 'centos': 3}, + koji.util.dslice_ex(distro, ['fedora'])) + + # slice with non exist key, + # if strict bit is not set, original dict should be returned + self.assertEqual(distro, koji.util.dslice_ex(distro, ['debian'], False)) + # if strict bit is set, KeyError should be raised + self.assertRaises(KeyError, koji.util.dslice_ex, distro, ['debian']) + + def test_checkForBuilds(self): + """Test checkForBuilds function""" + builds = [koji.parse_NVR("pkg-1-r1"), + koji.parse_NVR("pkg-1-r2"), + koji.parse_NVR("pkg-1.1-r1")] + latest_builds = [koji.parse_NVR("pkg-1.1-r1")] + + session = mock.MagicMock() + session.getLatestBuilds = mock.Mock(return_value=latest_builds) + session.listTagged = mock.Mock(return_value=builds) + event = mock.MagicMock() + + # latest bit check + self.assertTrue(koji.util.checkForBuilds( + session, 'fedora', (koji.parse_NVR('pkg-1.1-r1'),), + event, latest=True)) + self.assertFalse(koji.util.checkForBuilds( + session, 'fedora', (koji.parse_NVR('pkg-1.0-r2'),), + event, latest=True)) + + # all elemnts in builds should exist. + for b in builds: + self.assertTrue( + koji.util.checkForBuilds(session, "pkg-build", (b,), event)) + + # non exist build test. + self.assertEqual(False, koji.util.checkForBuilds( + session, "pkg-build", + (koji.parse_NVR("pkg-1.0-r1"),), event)) + + def test_LazyValue(self): + """Test LazyValue object""" + init, base, incr = 0, 1, 0 + lv = koji.util.LazyValue( + lambda x, offset=0: base + x + offset, + (init,), + {'offset': incr}) + self.assertEqual(init + base + incr, lv.get()) + + base = 2 + self.assertEqual(init + base + incr, lv.get()) + + # cache bit test + init, base, incr = 1, 2, 3 + lv = koji.util.LazyValue( + lambda x, offset=0: base + x + offset, + (init,), + {'offset': incr}, + cache=True) + self.assertEqual(init + base + incr, lv.get()) + + base = 3 + + # lv.get should return cached value: 6 + self.assertNotEqual(init + base + incr, lv.get()) + + def test_LazyString(self): + """Test LazyString object""" + fmt = '[{timestamp}] {greeting} {0}' + timestamp = int(time.time()) + + lstr = koji.util.LazyString( + lambda fmt, *args, **kwargs: + fmt.format(*args, timestamp=timestamp, **kwargs), + (fmt, 'koji'), + {'greeting': 'hello'}) + + self.assertEqual( + fmt.format('koji', timestamp=timestamp, greeting='hello'), + str(lstr)) + + # non cached string should be different + prev_str = str(lstr) + timestamp += 100 + self.assertNotEqual(prev_str, str(lstr)) + + # enable caching + lstr = koji.util.LazyString( + lambda fmt, *args, **kwargs: + fmt.format(*args, timestamp=timestamp, **kwargs), + (fmt, 'koji'), + {'greeting': 'hello'}, + cache=True) + + prev_str = str(lstr) + timestamp += 10 + self.assertEqual(prev_str, str(lstr)) + + def test_LazyDict(self): + """Test LazyDict object""" + name = None + release = None + date = None + + # Testing on cache bit enabled. + ldict = koji.util.LazyDict({}) + ldict.lazyset('name', lambda: name, (), cache=True) + + name = 'fedora' + self.assertEqual(name, ldict['name']) + + # cached, ldict['name'] should not be changed + name = 'rhel' + self.assertNotEqual(name, ldict.get('name')) + + # Testing on cahce bit disabled. + ldict['name'] = koji.util.LazyValue(lambda: name, ()) + ldict['release'] = koji.util.LazyValue(lambda: release, ()) + ldict['date'] = koji.util.LazyValue(lambda: date, ()) + + name, release, date = 'fedora', 26, datetime.now().strftime('%Y%m%d') + data = {'name': name, 'release': release, 'date': date} + six.assertCountEqual(self, data.items(), ldict.items()) + six.assertCountEqual(self, data.items(), [v for v in ldict.iteritems()]) + + name, release, date = 'rhel', 7, '20171012' + six.assertCountEqual(self, [name, release, date], ldict.values()) + six.assertCountEqual(self, [name, release, date], [v for v in ldict.itervalues()]) + + data = {'name': name, 'release': release, 'date': date} + self.assertEqual(name, ldict.pop('name')) + data.pop('name') + six.assertCountEqual(self, data.items(), ldict.items()) + + (key, value) = ldict.popitem() + data.pop(key) + six.assertCountEqual(self, data.items(), ldict.items()) + + ldict_copy = ldict.copy() + six.assertCountEqual(self, data.items(), ldict_copy.items()) + + def test_LazyRecord(self): + """Test LazyRecord object""" + # create a list object with lazy attribute + lobj = koji.util.LazyRecord(list) + six.assertRaisesRegex( + self, TypeError, 'object does not support lazy attributes', + koji.util.lazysetattr, self, 'value', lambda x: x, (100,)) + + base, init, inc = 10, 1, 0 + koji.util.lazysetattr( + lobj, 'lz_value', + lambda x, offset=0: base + x + inc, + (init, ), + {'offset': inc}, + cache=True) + + self.assertEqual(base + init + inc, lobj.lz_value) + + # try to access non exist attribute data, AttributeError should raise + self.assertRaises(AttributeError, getattr, lobj, 'data') + + def test_HiddenValue(self): + """Test Hidd object""" + hv = koji.util.HiddenValue('the plain text message') + self.assertEqual('[value hidden]', str(hv)) + self.assertEqual('HiddenValue()', repr(hv)) + + hv2 = koji.util.HiddenValue(hv) + self.assertEqual(hv2.value, hv.value) + self.assertEqual('[value hidden]', str(hv2)) + self.assertEqual('HiddenValue()', repr(hv2)) + + def test_relpath(self): + """Test _relpath function""" + self.assertRaises(ValueError, koji.util._relpath, None) + self.assertRaises(ValueError, koji.util._relpath, "") + + # _relpath is a backport of os.path.relpath + # their behaviors and outputs should be the same. + for p in ["/", ".", "..", "/bin", "\0", "\n", "\t/tmp"]: + for s in [os.curdir, '/tmp']: + self.assertEqual(os.path.relpath(p, s), koji.util._relpath(p, s)) + + def test_eventFromOpts(self): + """Test eventFromOpts function""" + timestamp = datetime.now().strftime('%s') + session = mock.MagicMock() + event = mock.MagicMock(event=20171010, ts=timestamp, repo=1) + + repo_info = {'create_event': 20171010, + 'create_ts': timestamp} + + session.getEvent = lambda *args, **kwargs: event if args[0] == 20171010 else None + session.getLastEvent = lambda *args, **kwargs: event + session.repoInfo = lambda *args, **kwargs: repo_info if args[0] == 1 else None + + # opts.event = 20171010 + opts = mock.MagicMock(event=20171010) + self.assertEqual(event, koji.util.eventFromOpts(session, opts)) + + # opts.event = 12345678, non exist event + opts = mock.MagicMock(event=12345678) + self.assertEqual(None, koji.util.eventFromOpts(session, opts)) + + # opts.ts = timestamp + opts = mock.MagicMock(event='', ts=timestamp) + self.assertEqual(event, koji.util.eventFromOpts(session, opts)) + + # opts.repo = '1' + opts = mock.MagicMock(event='', ts='', repo=1) + expect = {'id': repo_info['create_event'], + 'ts': repo_info['create_ts']} + + actual = koji.util.eventFromOpts(session, opts) + self.assertNotEqual(None, actual) + six.assertCountEqual(self, expect.items(), actual.items()) + + # no event is matched case + opts = mock.MagicMock(event=0, ts=0, repo=0) + self.assertEqual(None, koji.util.eventFromOpts(session, opts)) + + def test_setup_rlimits(self): + """Test test_setup_rlimits function""" + logger = mock.MagicMock() + options = { + 'RLIMIT_AS': '', + 'RLIMIT_CORE': '0', + 'RLIMIT_CPU': '', + 'RLIMIT_DATA': '4194304', + 'RLIMIT_FSIZE': '0', + 'RLIMIT_MEMLOCK': '', + 'RLIMIT_NOFILE': '768', + 'RLIMIT_NPROC': '3', + 'RLIMIT_OFILE': '', + 'RLIMIT_RSS': '', + 'RLIMIT_STACK': '4194304' + } + + # create a resource token <--> id lookup table + rlimit_lookup = {getattr(resource, k): k for k in options} + + def _getrlimit(res): + return (options.get(rlimit_lookup[res], None), 0) + + def _setrlimit(res, limits): + results[rlimit_lookup[res]] = str(limits[0]) + + results = {k: '' for k, v in options.items()} + with mock.patch('resource.setrlimit') as m_set, \ + mock.patch('resource.getrlimit') as m_get: + m_get.side_effect = ValueError('resource.getrlimit-value-error') + six.assertRaisesRegex(self, ValueError, 'resource.getrlimit-value-error', + koji.util.setup_rlimits, options, logger) + + m_get.side_effect = _getrlimit + + # logger.error test + koji.util.setup_rlimits({'RLIMIT_AS': 'abcde'}, logger) + logger.error.assert_called_with('Invalid resource limit: %s=%s', + 'RLIMIT_AS', + 'abcde') + + koji.util.setup_rlimits({'RLIMIT_AS': '1 2 3 4 5'}, logger) + logger.error.assert_called_with('Invalid resource limit: %s=%s', + 'RLIMIT_AS', + '1 2 3 4 5') + + # exception and logger.error test + m_set.side_effect = ValueError('resource.setrlimit-value-error') + koji.util.setup_rlimits({'RLIMIT_AS': '0'}, logger) + logger.error.assert_called_with('Unable to set %s: %s', + 'RLIMIT_AS', + m_set.side_effect) + + # run setrlimit test, the results should be equal to options + m_set.side_effect = _setrlimit + + # make some noise in options + test_opt = dict(options) + test_opt.update({ + 'RLIMIT_CUSTOM': 'fake_rlimit_key', + 'DBName': 'koji', + 'DBUser': 'koji', + 'KojiDir': '/mnt/koji', + 'KojiDebug': True}) + + koji.util.setup_rlimits(test_opt, logger) + six.assertCountEqual(self, results, options) + + def test_adler32_constructor(self): + """Test adler32_constructor function""" + chksum = koji.util.adler32_constructor('Wikipedia') # checksum is 300286872 + self.assertEqual(300286872, chksum.digest()) + self.assertEqual('%08x' % (300286872), chksum.hexdigest()) + + copy = chksum.copy() + self.assertEqual(copy.digest(), chksum.digest()) + self.assertNotEqual(copy, chksum) + + chksum.update('test') # checksum is equal to adler32(b'test', 300286872) + self.assertNotEqual(300286872, chksum.digest()) + self.assertNotEqual(copy.digest(), chksum.digest()) + self.assertEqual(614401368, chksum.digest()) + + class TestRmtree(unittest.TestCase): @patch('koji.util._rmtree') @patch('os.rmdir') @@ -593,7 +1053,6 @@ class TestRmtree(unittest.TestCase): rmdir.assert_has_calls([call('b'), call('a')]) chdir.assert_has_calls([call('b'), call('..'), call('a'), call('..')]) - @patch('os.listdir') @patch('os.lstat') @patch('stat.S_ISDIR')