| |
@@ -0,0 +1,312 @@
|
| |
+ import os
|
| |
+ import tempfile
|
| |
+ import shutil
|
| |
+ import unittest
|
| |
+
|
| |
+ from unittest import mock
|
| |
+
|
| |
+ import koji
|
| |
+ import kojihub
|
| |
+ from koji.util import joinpath
|
| |
+
|
| |
+ UP = kojihub.UpdateProcessor  # bound at import time, so getUpdate() can still build real processors after setUp patches kojihub.kojihub.UpdateProcessor
|
| |
+
|
| |
+
|
| |
class TestRenameRPMSig(unittest.TestCase):
    """Tests for ``kojihub.rename_rpm_sig``.

    Every test runs against a throwaway build directory that ``setUp``
    populates with one signed copy and one detached signature header per
    entry in ``self.rpmsigs``.  The hub lookups (``get_rpm``,
    ``query_rpm_sigs``, ``get_build``, ``get_user``,
    ``verify_name_internal``) are mocked, and ``UpdateProcessor`` is replaced
    by ``getUpdate`` so that database updates are recorded, not executed.
    """

    def getUpdate(self, *args, **kwargs):
        """Build a real UpdateProcessor with ``execute`` stubbed and record it.

        Installed as the ``side_effect`` of the patched
        ``kojihub.kojihub.UpdateProcessor`` so tests can inspect
        ``self.updates`` afterwards.
        """
        update = UP(*args, **kwargs)
        update.execute = mock.MagicMock()
        self.updates.append(update)
        return update

    def setUp(self):
        """Create the temp tree, start all patches and define the fixtures."""
        self.tempdir = tempfile.mkdtemp()
        # Registered cleanups run even if a later line of setUp raises (a
        # tearDown method would be skipped in that case).  They run in LIFO
        # order: patches are stopped first, then the temp tree is removed.
        self.addCleanup(shutil.rmtree, self.tempdir)
        self.addCleanup(mock.patch.stopall)

        self.pathinfo = koji.PathInfo(self.tempdir)
        mock.patch('koji.pathinfo', new=self.pathinfo).start()
        self.updates = []
        self.UpdateProcessor = mock.patch('kojihub.kojihub.UpdateProcessor',
                                          side_effect=self.getUpdate).start()
        self.get_rpm = mock.patch('kojihub.kojihub.get_rpm').start()
        self.query_rpm_sigs = mock.patch('kojihub.kojihub.query_rpm_sigs').start()
        self.get_build = mock.patch('kojihub.kojihub.get_build').start()
        self.get_user = mock.patch('kojihub.kojihub.get_user').start()
        self.verify_name_internal = mock.patch('kojihub.kojihub.verify_name_internal').start()
        self.buildinfo = {'build_id': 1,
                          'epoch': None,
                          'extra': None,
                          'id': 1,
                          'name': 'fs_mark',
                          'nvr': 'fs_mark-3.3-20.el8',
                          'owner_id': 1,
                          'owner_name': 'kojiadmin',
                          'package_id': 1,
                          'package_name': 'fs_mark',
                          'release': '20.el8',
                          'state': 1,
                          'task_id': None,
                          'version': '3.3'}
        self.rinfo = {'arch': 'x86_64',
                      'build_id': 1,
                      'buildroot_id': None,
                      'buildtime': 1564782768,
                      'epoch': None,
                      'external_repo_id': None,
                      'extra': None,
                      'id': 2,
                      'metadata_only': False,
                      'name': 'fs_mark',
                      'payloadhash': 'ed0690ab4b0508f2448d99a08e0a004a',
                      'release': '20.el8',
                      'size': 25644,
                      'version': '3.3'}
        self.rpmsigs = {
            '': {'rpm_id': 2, 'sighash': 'cb4d01bd3671b41ef51abc9be851e614', 'sigkey': ''},
            '2f86d6a1': {'rpm_id': 2, 'sighash': '78c245caa6deb70f0abc8b844c642cd6',
                         'sigkey': '2f86d6a1'}
        }
        # signature rows in a stable order (sorted by sigkey)
        self.queryrpmsigs = [self.rpmsigs[k] for k in sorted(self.rpmsigs)]
        self.userinfo = {'authtype': 2, 'id': 1, 'krb_principal': None, 'krb_principals': [],
                         'name': 'testuser', 'status': 0, 'usertype': 0}
        self.set_up_files()

    def set_up_files(self):
        """Create the build dir with a signed copy and a sighdr per signature.

        The resulting paths are stored in ``self.signed`` and ``self.sighdr``,
        keyed by sigkey.
        """
        self.builddir = self.pathinfo.build(self.buildinfo)
        os.makedirs(self.builddir)
        self.signed = {}
        self.sighdr = {}
        for sig in self.queryrpmsigs:
            key = sig['sigkey']
            self.signed[key] = self._write_build_file(
                self.pathinfo.signed(self.rinfo, key), 'SIGNED COPY\n')
            self.sighdr[key] = self._write_build_file(
                self.pathinfo.sighdr(self.rinfo, key), 'DETACHED SIGHDR\n')

    def _write_build_file(self, relpath, contents):
        """Write *contents* to *relpath* under the build dir; return the path."""
        path = joinpath(self.builddir, relpath)
        koji.ensuredir(os.path.dirname(path))
        with open(path, 'wt') as fo:
            fo.write(contents)
        return path

    def _nvra(self):
        """Return the name-version-release.arch string for ``self.rinfo``."""
        return "%(name)s-%(version)s-%(release)s.%(arch)s" % self.rinfo

    def _prime_lookups(self):
        """Prime the mocks for a call that gets past both signature lookups.

        The first ``query_rpm_sigs`` call returns the fixture rows and the
        second returns no rows.
        """
        self.get_rpm.return_value = self.rinfo
        self.get_build.return_value = self.buildinfo
        self.get_user.return_value = self.userinfo
        self.query_rpm_sigs.side_effect = [self.queryrpmsigs, []]

    def get_files(self, with_dirs=True):
        """Return a sorted snapshot of everything under the build dir.

        Files are reported as ``(path, contents)`` and, when *with_dirs* is
        true, directories as ``(path,)``.  Comparing two snapshots shows
        whether a call changed anything on disk.
        """
        data = []
        for root, dirs, files in os.walk(self.builddir):
            for name in files:
                fn = os.path.join(root, name)
                with open(fn, 'rt') as fp:
                    contents = fp.read()
                # tuple because we will sort later
                data.append((fn, contents))
            if with_dirs:
                data.extend((os.path.join(root, name),) for name in dirs)
        data.sort()
        return data

    def test_rpm_missing(self):
        """An error from get_rpm propagates and nothing is touched."""
        rpm_id = 1234
        expected_msg = 'No such rpm: %s' % rpm_id
        self.get_rpm.side_effect = koji.GenericError("No such rpm: %s" % rpm_id)

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpm_id, 'foo', 'bar')

        self.assertEqual(len(self.updates), 0)
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpm_id, strict=True)
        self.query_rpm_sigs.assert_not_called()
        self.assertEqual(before, self.get_files())

    def test_external_repo(self):
        """An rpm with an external_repo_id is rejected before any sig lookup."""
        rpminfo = 1234
        rinfo = self.rinfo.copy()
        rinfo.update({'external_repo_id': 1, 'external_repo_name': 'INTERNAL'})
        self.get_rpm.return_value = rinfo

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')

        self.assertEqual(len(self.updates), 0)
        expected_msg = ("Not an internal rpm: %s (from %s)"
                        % (rpminfo, rinfo['external_repo_name']))
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
        self.query_rpm_sigs.assert_not_called()
        self.assertEqual(before, self.get_files())

    def test_no_oldsig(self):
        """Renaming fails when no signature exists under the old key."""
        rpminfo = 1234
        expected_msg = "No foo signature for rpm %s" % self._nvra()
        self.get_rpm.return_value = self.rinfo
        self.query_rpm_sigs.return_value = []

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')

        self.assertEqual(len(self.updates), 0)
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
        self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='foo')
        self.assertEqual(before, self.get_files())

    def test_already_got_one(self):
        """Renaming fails when a signature already exists under the new key."""
        rpminfo = 1234
        expected_msg = "A bar signature already exists for rpm %s" % self._nvra()
        self.get_rpm.return_value = self.rinfo
        # first lookup: rows for the old key; second lookup: a row for the new key
        self.query_rpm_sigs.side_effect = [self.queryrpmsigs, [{'foo': 1}]]

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')

        self.assertEqual(len(self.updates), 0)
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
        self.assertEqual(before, self.get_files())

    def test_header_missing(self):
        """A missing signature header file is an error and changes nothing."""
        rpminfo = self.rinfo['id']
        self._prime_lookups()

        # a missing header should error
        sigkey = '2f86d6a1'
        os.remove(self.sighdr[sigkey])

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, sigkey, 'foobar')

        expected_msg = "Missing signature header file: %s" % self.sighdr[sigkey]
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.assertEqual(before, self.get_files())

    def test_valid(self):
        """A successful rename moves both files and issues one rpmsigs update."""
        rpminfo = 2
        self._prime_lookups()
        oldkey = '2f86d6a1'
        kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

        # the old files should be gone
        self.assertFalse(os.path.exists(self.signed[oldkey]), 'signed copy not deleted')
        self.assertFalse(os.path.exists(self.sighdr[oldkey]), 'header still in place')

        # the new file should be there
        sighdr = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, 'foobar'))
        signed = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, 'foobar'))
        with open(sighdr, 'rt') as fp:
            self.assertEqual(fp.read(), 'DETACHED SIGHDR\n')
        with open(signed, 'rt') as fp:
            self.assertEqual(fp.read(), 'SIGNED COPY\n')

        self.assertEqual(len(self.updates), 1)
        update = self.updates[0]
        self.assertEqual(update.table, 'rpmsigs')
        self.assertEqual(update.clauses, ["rpm_id=%(rpm_id)s", "sigkey=%(oldkey)s"])

        self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)

    def test_already_got_signed_copy(self):
        """A stray signed copy at the destination path blocks the rename."""
        rpminfo = 2
        self._prime_lookups()
        oldkey = '2f86d6a1'
        signed = self._write_build_file(self.pathinfo.signed(self.rinfo, 'foobar'),
                                        'STRAY SIGNED COPY\n')

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

        expected_msg = f'Signed copy already exists: {signed}'
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
        self.assertEqual(before, self.get_files())

    def test_already_got_sighdr(self):
        """A stray header file at the destination path blocks the rename."""
        rpminfo = 2
        self._prime_lookups()
        oldkey = '2f86d6a1'
        sighdr = self._write_build_file(self.pathinfo.sighdr(self.rinfo, 'foobar'),
                                        'STRAY HEADER\n')

        before = self.get_files()
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

        expected_msg = f'Signature header file already exists: {sighdr}'
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
        self.assertEqual(before, self.get_files())

    @mock.patch('os.rename')
    def test_first_rename_fails(self, rename):
        """A failure of the first os.rename is reported against the signed copy."""
        rpminfo = 2
        self._prime_lookups()
        oldkey = '2f86d6a1'
        rename.side_effect = FileNotFoundError('...')

        before = self.get_files(with_dirs=False)
        # this error case will leave a stray dir, so only files are compared
        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

        oldpath = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, oldkey))
        expected_msg = f'Failed to rename {oldpath}'
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
        self.assertEqual(before, self.get_files(with_dirs=False))

    @mock.patch('os.rename')
    def test_second_rename_fails(self, rename):
        """A failure of the second os.rename is reported against the header."""
        rpminfo = 2
        self._prime_lookups()
        oldkey = '2f86d6a1'
        # the first (mocked) rename returns None without moving anything;
        # the second raises
        rename.side_effect = [None, FileNotFoundError('...')]

        with self.assertRaises(koji.GenericError) as ex:
            kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

        oldpath = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, oldkey))
        expected_msg = f'Failed to rename {oldpath}'
        self.assertEqual(ex.exception.args[0], expected_msg)
        self.get_rpm.assert_called_once_with(rpminfo, strict=True)
|
| |
+
|
| |
+
|
| |
+ # the end
|
| |
More complex signature headers cannot be described by a single key. This PR takes the approach of treating the `sigkey` field as a descriptive label for the signature that can be specified at import or changed later. This gives us the ability to handle more complex signing without radically altering the API or file layout.