#4373 Draft: allow renaming rpm signatures
Opened 2 months ago by mikem. Modified 2 months ago
mikem/koji multi-signature  into  master

file modified
+123 -62
@@ -7496,7 +7496,7 @@ 

          fn = fileinfo['hub.path']

          rpminfo = import_rpm(fn, buildinfo, brinfo.id, fileinfo=fileinfo)

          import_rpm_file(fn, buildinfo, rpminfo)

-         add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn))

+         add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn), sigkey=fileinfo.get('sigkey'))

  

      def import_log(self, buildinfo, fileinfo):

          if fileinfo.get('metadata_only', False):
@@ -8171,7 +8171,7 @@ 

                      sumobj.write(sum.hexdigest())

  

  

- def add_rpm_sig(an_rpm, sighdr):

+ def add_rpm_sig(an_rpm, sighdr, sigkey=None):

      """Store a signature header for an rpm"""

      # calling function should perform permission checks, if applicable

      rinfo = get_rpm(an_rpm, strict=True)
@@ -8182,30 +8182,38 @@ 

      builddir = koji.pathinfo.build(binfo)

      if not os.path.isdir(builddir):

          raise koji.GenericError("No such directory: %s" % builddir)

+     if sigkey is not None:

+         verify_name_internal(sigkey)

+ 

+     # verify sigmd5 matches rpm and pick sigkey if needed

      rawhdr = koji.RawHeader(sighdr)

      sigmd5 = koji.hex_string(rawhdr.get(koji.RPM_SIGTAG_MD5))

-     if sigmd5 == rinfo['payloadhash']:

+     if sigmd5 != rinfo['payloadhash']:

          # note: payloadhash is a misnomer, that field is populated with sigmd5.

-         sigkey = rawhdr.get(koji.RPM_SIGTAG_GPG)

-         if not sigkey:

-             sigkey = rawhdr.get(koji.RPM_SIGTAG_PGP)

-         if not sigkey:

-             sigkey = rawhdr.get(koji.RPM_SIGTAG_DSA)

-         if not sigkey:

-             sigkey = rawhdr.get(koji.RPM_SIGTAG_RSA)

-     else:

          # Double check using rpm in case we have somehow misread

          rpm_path = "%s/%s" % (builddir, koji.pathinfo.rpm(rinfo))

-         sigmd5, sigkey = _scan_sighdr(sighdr, rpm_path)

+         sigmd5, rawsig = _scan_sighdr(sighdr, rpm_path)

          sigmd5 = koji.hex_string(sigmd5)

          if sigmd5 != rinfo['payloadhash']:

              nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo

              raise koji.GenericError("wrong md5 for %s: %s" % (nvra, sigmd5))

-     if not sigkey:

-         sigkey = ''

-         # we use the sigkey='' to represent unsigned in the db (so that uniqueness works)

-     else:

-         sigkey = koji.get_sigpacket_key_id(sigkey)

+     elif sigkey is None:

+         rawsig = rawhdr.get(koji.RPM_SIGTAG_GPG)

+         if not rawsig:

+             rawsig = rawhdr.get(koji.RPM_SIGTAG_PGP)

+         if not rawsig:

+             sigkey = rawhdr.get(koji.RPM_SIGTAG_DSA)

+         if not rawsig:

+             rawsig = rawhdr.get(koji.RPM_SIGTAG_RSA)

+ 

+     if sigkey is None:

+         if not rawsig:

+             sigkey = ''

+             # we use the sigkey='' to represent unsigned in the db (so that uniqueness works)

+         else:

+             sigkey = koji.get_sigpacket_key_id(rawsig)

+ 

+     # do the insert

      sighash = md5_constructor(sighdr).hexdigest()

      rpm_id = rinfo['id']

      koji.plugin.run_callbacks('preRPMSign', sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo)
@@ -8216,6 +8224,7 @@ 

      except IntegrityError:

          nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo

          raise koji.GenericError("Signature already exists for package %s, key %s" % (nvra, sigkey))

+ 

      # - write to fs

      sigpath = "%s/%s" % (builddir, koji.pathinfo.sighdr(rinfo, sigkey))

      koji.ensuredir(os.path.dirname(sigpath))
@@ -8225,6 +8234,80 @@ 

                                sigkey=sigkey, sighash=sighash, build=binfo, rpm=rinfo)

  

  

+ def rename_rpm_sig(rpminfo, oldkey, newkey):

+     """Change the sigkey for an rpm signature"""

+ 

+     verify_name_internal(newkey)

+     rinfo = get_rpm(rpminfo, strict=True)

+     nvra = "%(name)s-%(version)s-%(release)s.%(arch)s" % rinfo

+     if rinfo['external_repo_id']:

+         raise koji.GenericError("Not an internal rpm: %s (from %s)"

+                                 % (rpminfo, rinfo['external_repo_name']))

+ 

+     # Determine what signature we have

+     rows = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=oldkey)

+     if not rows:

+         raise koji.GenericError(f'No {oldkey} signature for rpm {nvra}')

+ 

+     # Check if newkey exists already

+     rows = query_rpm_sigs(rpm_id=rinfo['id'], sigkey=newkey)

+     if rows:

+         raise koji.GenericError(f'A {newkey} signature already exists for rpm {nvra}')

+ 

+     # Update db

+     update = UpdateProcessor(

+         table='rpmsigs',

+         data={'sigkey': newkey},

+         clauses=["rpm_id=%(rpm_id)s", "sigkey=%(oldkey)s"],

+         values={'rpm_id': rinfo['id'], 'oldkey': oldkey},

+     )

+     update.execute()

+ 

+     # Get the base build dir for our paths

+     binfo = get_build(rinfo['build_id'], strict=True)

+     builddir = koji.pathinfo.build(binfo)

+ 

+     # Check header file

+     old_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, oldkey))

+     if not os.path.exists(old_path):

+         raise koji.GenericError(f'Missing signature header file: {old_path}')

+     new_path = joinpath(builddir, koji.pathinfo.sighdr(rinfo, newkey))

+     if os.path.exists(new_path):

+         # shouldn't happen, newkey isn't in db

+         raise koji.GenericError(f'Signature header file already exists: {new_path}')

+ 

+     # Check signed copies

+     new_signed_path = joinpath(builddir, koji.pathinfo.signed(rinfo, newkey))

+     if os.path.exists(new_signed_path):

+         # shouldn't happen, newkey isn't in db

+         raise koji.GenericError(f'Signed copy already exists: {new_signed_path}')

+ 

+     # rename the signed copy first if present, lowest risk

+     old_signed_path = joinpath(builddir, koji.pathinfo.signed(rinfo, oldkey))

+     if os.path.exists(old_signed_path):

+         # signed copies might not exist

+         try:

+             koji.ensuredir(os.path.dirname(new_signed_path))

+             os.rename(old_signed_path, new_signed_path)

+         except Exception:

+             # shouldn't happen and may need cleanup, so log copiously

+             logger.error(f"Failed to rename {old_signed_path}", exc_info=True)

+             raise koji.GenericError(f"Failed to rename {old_signed_path}")

+ 

+     # rename the header file next

+     try:

+         koji.ensuredir(os.path.dirname(new_path))

+         os.rename(old_path, new_path)

+     except Exception:

+         # shouldn't happen and may need cleanup, so log copiously

+         logger.error(f"Failed to rename {old_path}", exc_info=True)

+         raise koji.GenericError(f"Failed to rename {old_path}")

+ 

+     # Note: we do not delete any empty parent dirs

+ 

+     logger.warning("Renamed signature for rpm %s: %s to %s", nvra, oldkey, newkey)

+ 

+ 

  def delete_rpm_sig(rpminfo, sigkey=None, all_sigs=False):

      """Delete rpm signature

  
@@ -8367,47 +8450,6 @@ 

      return koji.get_header_field(hdr, 'sigmd5'), sig

  

  

- def check_rpm_sig(an_rpm, sigkey, sighdr):

-     # verify that the provided signature header matches the key and rpm

-     rinfo = get_rpm(an_rpm, strict=True)

-     binfo = get_build(rinfo['build_id'])

-     builddir = koji.pathinfo.build(binfo)

-     rpm_path = "%s/%s" % (builddir, koji.pathinfo.rpm(rinfo))

-     if not os.path.exists(rpm_path):

-         raise koji.GenericError("No such path: %s" % rpm_path)

-     if not os.path.isfile(rpm_path):

-         raise koji.GenericError("Not a regular file: %s" % rpm_path)

-     fd, temp = tempfile.mkstemp()

-     os.close(fd)

-     try:

-         koji.splice_rpm_sighdr(sighdr, rpm_path, dst=temp)

-         ts = rpm.TransactionSet()

-         ts.setVSFlags(0)  # full verify

-         with open(temp, 'rb') as fo:

-             hdr = ts.hdrFromFdno(fo.fileno())

-     except Exception:

-         try:

-             os.unlink(temp)

-         except Exception:

-             pass

-         raise

-     raw_key = koji.get_header_field(hdr, 'siggpg')

-     if not raw_key:

-         raw_key = koji.get_header_field(hdr, 'sigpgp')

-     if not raw_key:

-         raw_key = koji.get_header_field(hdr, 'dsaheader')

-     if not raw_key:

-         raw_key = koji.get_header_field(hdr, 'rsaheader')

-     if not raw_key:

-         found_key = None

-     else:

-         found_key = koji.get_sigpacket_key_id(raw_key)

-     if sigkey.lower() != found_key:

-         raise koji.GenericError("Signature key mismatch: got %s, expected %s"

-                                 % (found_key, sigkey))

-     os.unlink(temp)

- 

- 

  def query_rpm_sigs(rpm_id=None, sigkey=None, queryOpts=None):

      """Queries db for rpm signatures

  
@@ -11587,7 +11629,7 @@ 

          reject_draft(build)

          new_image_build(build)

  

-     def importRPM(self, path, basename):

+     def importRPM(self, path, basename, sigkey=None):

          """Import an RPM into the database.

  

          The file must be uploaded first.
@@ -11599,7 +11641,7 @@ 

              raise koji.GenericError("No such file: %s" % fn)

          rpminfo = import_rpm(fn)

          import_rpm_file(fn, rpminfo['build'], rpminfo)

-         add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn))

+         add_rpm_sig(rpminfo['id'], koji.rip_rpm_sighdr(fn), sigkey=sigkey)

          for tag in list_tags(build=rpminfo['build_id']):

              set_tag_update(tag['id'], 'IMPORT')

          return rpminfo
@@ -13169,13 +13211,30 @@ 

          # XXX - still not sure if this is the right restriction

          return write_signed_rpm(an_rpm, sigkey, force)

  

-     def addRPMSig(self, an_rpm, data):

+     def addRPMSig(self, an_rpm, data, sigkey=None):

          """Store a signature header for an rpm

  

          data: the signature header encoded as base64

          """

          context.session.assertPerm('sign')

-         return add_rpm_sig(an_rpm, base64.b64decode(data))

+         return add_rpm_sig(an_rpm, base64.b64decode(data), sigkey=sigkey)

+ 

+     def renameRPMSig(self, rpminfo, oldkey, newkey):

+         """Rename rpm signature

+ 

+         This changes the 'sigkey' value for a stored rpm signature and renames

+         the files approriately.

+ 

+         This call requires ``admin`` permission (``sign`` is not sufficient).

+ 

+         :param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch'

+                                 string N-V-R.A

+                                 int ID

+         :param str oldkey: Old signature key

+         :param str newkey: New signature key

+         """

+         context.session.assertPerm('admin')

+         return rename_rpm_sig(rpminfo, oldkey, newkey)

  

      def deleteRPMSig(self, rpminfo, sigkey=None, all_sigs=False):

          """Delete rpm signature
@@ -13183,6 +13242,8 @@ 

          Only use this method in extreme situations, because it goes against

          Koji's design of immutable, auditable data.

  

+         In most cases, it is preferable to use renameRPMSig instead.

+ 

          This call requires ``admin`` permission (``sign`` is not sufficient).

  

          :param dict/str/id rpm: map containing 'name', 'version', 'release', and 'arch'

@@ -0,0 +1,312 @@ 

+ import os

+ import tempfile

+ import shutil

+ import unittest

+ 

+ from unittest import mock

+ 

+ import koji

+ import kojihub

+ from koji.util import joinpath

+ 

+ UP = kojihub.UpdateProcessor

+ 

+ 

+ class TestRenameRPMSig(unittest.TestCase):

+ 

+     def getUpdate(self, *args, **kwargs):

+         update = UP(*args, **kwargs)

+         update.execute = mock.MagicMock()

+         self.updates.append(update)

+         return update

+ 

+     def setUp(self):

+         self.tempdir = tempfile.mkdtemp()

+         self.pathinfo = koji.PathInfo(self.tempdir)

+         mock.patch('koji.pathinfo', new=self.pathinfo).start()

+         self.updates = []

+         self.UpdateProcessor = mock.patch('kojihub.kojihub.UpdateProcessor',

+                                           side_effect=self.getUpdate).start()

+         self.get_rpm = mock.patch('kojihub.kojihub.get_rpm').start()

+         self.query_rpm_sigs = mock.patch('kojihub.kojihub.query_rpm_sigs').start()

+         self.get_build = mock.patch('kojihub.kojihub.get_build').start()

+         self.get_user = mock.patch('kojihub.kojihub.get_user').start()

+         self.verify_name_internal = mock.patch('kojihub.kojihub.verify_name_internal').start()

+         self.buildinfo = {'build_id': 1,

+                           'epoch': None,

+                           'extra': None,

+                           'id': 1,

+                           'name': 'fs_mark',

+                           'nvr': 'fs_mark-3.3-20.el8',

+                           'owner_id': 1,

+                           'owner_name': 'kojiadmin',

+                           'package_id': 1,

+                           'package_name': 'fs_mark',

+                           'release': '20.el8',

+                           'state': 1,

+                           'task_id': None,

+                           'version': '3.3'}

+         self.rinfo = {'arch': 'x86_64',

+                       'build_id': 1,

+                       'buildroot_id': None,

+                       'buildtime': 1564782768,

+                       'epoch': None,

+                       'external_repo_id': None,

+                       'extra': None,

+                       'id': 2,

+                       'metadata_only': False,

+                       'name': 'fs_mark',

+                       'payloadhash': 'ed0690ab4b0508f2448d99a08e0a004a',

+                       'release': '20.el8',

+                       'size': 25644,

+                       'version': '3.3'}

+         self.rpmsigs = {

+             '': {'rpm_id': 2, 'sighash': 'cb4d01bd3671b41ef51abc9be851e614', 'sigkey': ''},

+             '2f86d6a1': {'rpm_id': 2, 'sighash': '78c245caa6deb70f0abc8b844c642cd6',

+                          'sigkey': '2f86d6a1'}

+         }

+         self.queryrpmsigs = [self.rpmsigs[k] for k in sorted(self.rpmsigs)]

+         self.userinfo = {'authtype': 2, 'id': 1, 'krb_principal': None, 'krb_principals': [],

+                          'name': 'testuser', 'status': 0, 'usertype': 0}

+         self.set_up_files()

+ 

+     def set_up_files(self):

+         builddir = self.pathinfo.build(self.buildinfo)

+         os.makedirs(builddir)

+         self.builddir = builddir

+         self.signed = {}

+         self.sighdr = {}

+         for sig in self.queryrpmsigs:

+             key = sig['sigkey']

+             signed = joinpath(builddir, self.pathinfo.signed(self.rinfo, key))

+             self.signed[key] = signed

+             koji.ensuredir(os.path.dirname(signed))

+             with open(signed, 'wt') as fo:

+                 fo.write('SIGNED COPY\n')

+ 

+             sighdr = joinpath(builddir, self.pathinfo.sighdr(self.rinfo, key))

+             self.sighdr[key] = sighdr

+             koji.ensuredir(os.path.dirname(sighdr))

+             with open(sighdr, 'wt') as fo:

+                 fo.write('DETACHED SIGHDR\n')

+ 

+     def get_files(self, with_dirs=True):

+         data = []

+         for root, dirs, files in os.walk(self.builddir):

+             for name in files:

+                 fn = os.path.join(root, name)

+                 with open(fn, 'rt') as fp:

+                     contents = fp.read()

+                 # tuple because we will sort later

+                 data.append((fn, contents))

+             if with_dirs:

+                 for name in dirs:

+                     fn = os.path.join(root, name)

+                     data.append((fn,))

+         data.sort()

+         return data

+ 

+     def tearDown(self):

+         mock.patch.stopall()

+         shutil.rmtree(self.tempdir)

+ 

+     def test_rpm_missing(self):

+         rpm_id = 1234

+         expected_msg = 'No such rpm: %s' % rpm_id

+         self.get_rpm.side_effect = koji.GenericError("No such rpm: %s" % rpm_id)

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpm_id, 'foo', 'bar')

+ 

+         self.assertEqual(len(self.updates), 0)

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpm_id, strict=True)

+         self.query_rpm_sigs.assert_not_called()

+         self.assertEqual(before, self.get_files())

+ 

+     def test_external_repo(self):

+         rpminfo = 1234

+         rinfo = self.rinfo.copy()

+         rinfo.update({'external_repo_id': 1, 'external_repo_name': 'INTERNAL'})

+         self.get_rpm.return_value = rinfo

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')

+ 

+         self.assertEqual(len(self.updates), 0)

+         expected_msg = "Not an internal rpm: %s (from %s)" % (rpminfo, rinfo['external_repo_name'])

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+         self.query_rpm_sigs.assert_not_called()

+         self.assertEqual(before, self.get_files())

+ 

+     def test_no_oldsig(self):

+         rpminfo = 1234

+         nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'],

+                                 self.rinfo['arch'])

+         expected_msg = "No foo signature for rpm %s" % nvra

+         self.get_rpm.return_value = self.rinfo

+         self.query_rpm_sigs.return_value = []

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')

+ 

+         self.assertEqual(len(self.updates), 0)

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+         self.query_rpm_sigs.assert_called_once_with(rpm_id=self.rinfo['id'], sigkey='foo')

+         self.assertEqual(before, self.get_files())

+ 

+     def test_already_got_one(self):

+         rpminfo = 1234

+         nvra = "%s-%s-%s.%s" % (self.rinfo['name'], self.rinfo['version'], self.rinfo['release'],

+                                 self.rinfo['arch'])

+         expected_msg = "A bar signature already exists for rpm %s" % nvra

+         self.get_rpm.return_value = self.rinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, [{'foo':1}]]

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, 'foo', 'bar')

+ 

+         self.assertEqual(len(self.updates), 0)

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+         self.assertEqual(before, self.get_files())

+ 

+     def test_header_missing(self):

+         rpminfo = self.rinfo['id']

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]

+ 

+         # a missing header should error

+         builddir = self.pathinfo.build(self.buildinfo)

+         sigkey = '2f86d6a1'

+         os.remove(self.sighdr[sigkey])

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, sigkey, 'foobar')

+ 

+         expected_msg = "Missing signature header file: %s" % self.sighdr[sigkey]

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.assertEqual(before, self.get_files())

+ 

+     def test_valid(self):

+         rpminfo = 2

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]

+         oldkey = '2f86d6a1'

+         kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

+ 

+         # the old files should be gone

+         if os.path.exists(self.signed[oldkey]):

+             raise Exception('signed copy not deleted')

+         if os.path.exists(self.sighdr[oldkey]):

+             raise Exception('header still in place')

+ 

+         # the new file should be there

+         sighdr = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, 'foobar'))

+         signed = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, 'foobar'))

+         with open(sighdr, 'rt') as fp:

+             self.assertEqual(fp.read(), 'DETACHED SIGHDR\n')

+         with open(signed, 'rt') as fp:

+             self.assertEqual(fp.read(), 'SIGNED COPY\n')

+ 

+         self.assertEqual(len(self.updates), 1)

+         update = self.updates[0]

+         self.assertEqual(update.table, 'rpmsigs')

+         self.assertEqual(update.clauses, ["rpm_id=%(rpm_id)s", "sigkey=%(oldkey)s"])

+ 

+         self.get_build.assert_called_once_with(self.rinfo['build_id'], strict=True)

+ 

+     def test_already_got_signed_copy(self):

+         rpminfo = 2

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]

+         oldkey = '2f86d6a1'

+         signed = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, 'foobar'))

+         koji.ensuredir(os.path.dirname(signed))

+         with open(signed, 'wt') as fp:

+             fp.write('STRAY SIGNED COPY\n')

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

+ 

+         expected_msg = f'Signed copy already exists: {signed}'

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+         self.assertEqual(before, self.get_files())

+ 

+     def test_already_got_sighdr(self):

+         rpminfo = 2

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]

+         oldkey = '2f86d6a1'

+         sighdr = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, 'foobar'))

+         koji.ensuredir(os.path.dirname(sighdr))

+         with open(sighdr, 'wt') as fp:

+             fp.write('STRAY HEADER\n')

+ 

+         before = self.get_files()

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

+ 

+         expected_msg = f'Signature header file already exists: {sighdr}'

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+         self.assertEqual(before, self.get_files())

+ 

+     @mock.patch('os.rename')

+     def test_first_rename_fails(self, rename):

+         rpminfo = 2

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]

+         oldkey = '2f86d6a1'

+         rename.side_effect = FileNotFoundError('...')

+ 

+         before = self.get_files(with_dirs=False)

+         # this error case will leave a stray dir

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

+ 

+         oldpath = joinpath(self.builddir, self.pathinfo.signed(self.rinfo, oldkey))

+         expected_msg = f'Failed to rename {oldpath}'

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+         self.assertEqual(before, self.get_files(with_dirs=False))

+ 

+     @mock.patch('os.rename')

+     def test_second_rename_fails(self, rename):

+         rpminfo = 2

+         self.get_rpm.return_value = self.rinfo

+         self.get_build.return_value = self.buildinfo

+         self.get_user.return_value = self.userinfo

+         self.query_rpm_sigs.side_effect = [self.query_rpm_sigs, []]

+         oldkey = '2f86d6a1'

+         rename.side_effect = [None, FileNotFoundError('...')]

+ 

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.rename_rpm_sig(rpminfo, oldkey, 'foobar')

+ 

+         oldpath = joinpath(self.builddir, self.pathinfo.sighdr(self.rinfo, oldkey))

+         expected_msg = f'Failed to rename {oldpath}'

+         self.assertEqual(ex.exception.args[0], expected_msg)

+         self.get_rpm.assert_called_once_with(rpminfo, strict=True)

+ 

+ 

+ # the end

More complex signature headers cannot be described by a single key.

This PR takes the approach of treating the sigkey field as a descriptive label for the signature that can be specified at import or changed later. This gives us the ability to handle more complex signing without radically altering the api or file layout.

Side note: this also drops the check_rpm_sig function from the hub code, which has never been used