From b6189ddeaa7366b76f37379724d3077e4569d899 Mon Sep 17 00:00:00 2001 From: Yuming Zhu Date: Oct 22 2024 15:55:11 +0000 Subject: [PATCH 1/4] resort tests from lib to hub for py2 --- diff --git a/tests/test_hub/test_auth.py b/tests/test_hub/test_auth.py new file mode 100644 index 0000000..fb5386c --- /dev/null +++ b/tests/test_hub/test_auth.py @@ -0,0 +1,767 @@ +from __future__ import absolute_import + +import mock + +import unittest + +import koji +import datetime +import kojihub.auth + +UP = kojihub.auth.UpdateProcessor +QP = kojihub.auth.QueryProcessor + + +class TestAuthSession(unittest.TestCase): + def getUpdate(self, *args, **kwargs): + update = UP(*args, **kwargs) + update.execute = mock.MagicMock() + self.updates.append(update) + return update + + def getQuery(self, *args, **kwargs): + query = QP(*args, **kwargs) + query.execute = self.query_execute + query.executeOne = self.query_executeOne + query.singleValue = self.query_singleValue + self.queries.append(query) + return query + + def setUp(self): + self.context = mock.patch('kojihub.auth.context').start() + kojihub.db.context = self.context + kojihub.auth.context = self.context + self.UpdateProcessor = mock.patch('kojihub.auth.UpdateProcessor', + side_effect=self.getUpdate).start() + self.updates = [] + self.query_execute = mock.MagicMock() + self.query_executeOne = mock.MagicMock() + self.query_singleValue = mock.MagicMock() + self.QueryProcessor = mock.patch('kojihub.auth.QueryProcessor', + side_effect=self.getQuery).start() + self.queries = [] + # It seems MagicMock will not automatically handle attributes that + # start with "assert" + self.context.session.assertLogin = mock.MagicMock() + + def tearDown(self): + mock.patch.stopall() + + def test_instance(self): + """Simple kojihub.auth.Session instance""" + self.context.opts = { + 'CheckClientIP': True, + 'DisableURLSessions': False, + 'SessionRenewalTimeout': 0, + } + self.context.environ = {'QUERY_STRING': 'non_session_key=1'} + with self.assertRaises(koji.GenericError) as cm: + kojihub.auth.Session() + # no args in request/environment + self.assertEqual(cm.exception.args[0], "'session-id' not specified in session args") + + def get_session_old(self): + """auth.Session instance""" + # base session from test_basic_instance + # url-based kojihub.auth - will be dropped in 1.34 + self.context.opts = { + 'CheckClientIP': True, + 'DisableURLSessions': False, + 'SessionRenewalTimeout': 0, + } + self.context.environ = { + 'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345', + 'REMOTE_ADDR': 'remote-addr', + } + + self.query_executeOne.side_effect = [ + {'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002, + "date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None, + 'expired': False, 'master': None, + 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002, + tzinfo=datetime.timezone.utc), + 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308, + tzinfo=datetime.timezone.utc), + 'user_id': 1}, + {'name': 'kojiadmin', 'status': 0, 'usertype': 0}] + self.query_singleValue.return_value = 123 + s = kojihub.auth.Session() + return s, self.context + + def get_session(self): + # base session from test_basic_instance + # header-based auth + self.context.opts = { + 'CheckClientIP': True, + 'DisableURLSessions': True, + 'SessionRenewalTimeout': 0, + } + self.context.environ = { + 'HTTP_KOJI_SESSION_ID': '123', + 'HTTP_KOJI_SESSION_KEY': 'xyz', + 'HTTP_KOJI_CALLNUM': '345', + 'REMOTE_ADDR': 'remote-addr', + } + + self.query_executeOne.side_effect = [ + {'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002, + "date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None, + 'expired': False, 'master': None, + 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002, + tzinfo=datetime.timezone.utc), + 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308, + tzinfo=datetime.timezone.utc), + 'user_id': 1}, + {'name': 'kojiadmin', 'status': 0, 'usertype': 0}] + self.query_singleValue.return_value = 123 + s = kojihub.auth.Session() + return s, self.context + + def test_session_old(self): + self.get_session_old() + + def test_renewal_timeout(self): + """Simple kojihub.auth.Session instance""" + self.context.opts = { + 'CheckClientIP': True, + 'DisableURLSessions': False, + 'SessionRenewalTimeout': 1440, + } + self.context.environ = { + 'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345', + 'REMOTE_ADDR': 'remote-addr', + } + + self.query_executeOne.side_effect = [ + {'authtype': 2, 'callnum': 1, "start_ts": 1666599426.227002, + "update_ts": 1666599426.254308, 'exclusive': None, + 'expired': False, 'master': None, + 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002, + tzinfo=datetime.timezone.utc), + 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308, + tzinfo=datetime.timezone.utc), + 'renew_ts': None, + 'user_id': 1}, + {'name': 'kojiadmin', 'status': 0, 'usertype': 0}] + with self.assertRaises(koji.GenericError) as cm: + kojihub.auth.Session() + # no args in request/environment + self.assertEqual(cm.exception.args[0], 'session "123" has expired') + + self.assertEqual(len(self.updates), 1) + self.assertEqual(len(self.queries), 1) + + update = self.updates[0] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values['id'], 123) + self.assertEqual(update.clauses, ['id = %(id)s OR master = %(id)s']) + self.assertEqual(update.data, {'expired': True}) + self.assertEqual(update.rawdata, {}) + + query = self.queries[0] + self.assertEqual(query.tables, ['sessions']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i', + 'key = %(key)s']) + self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master', + 'renew_time', "date_part('epoch', renew_time)", + 'start_time', "date_part('epoch', start_time)", + 'update_time', "date_part('epoch', update_time)", + 'user_id']) + self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master', + 'renew_time', 'renew_ts', 'start_time', 'start_ts', + 'update_time', 'update_ts', 'user_id']) + self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'}) + + def test_basic_instance(self): + """auth.Session instance""" + s, cntext = self.get_session() + self.assertEqual(len(self.updates), 2) + update = self.updates[0] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values['id'], 123) + self.assertEqual(update.clauses, ['id = %(id)i']) + self.assertEqual(update.data, {}) + self.assertEqual(update.rawdata, {'update_time': 'NOW()'}) + + update = self.updates[1] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values['id'], 123) + self.assertEqual(update.clauses, ['id = %(id)i']) + self.assertEqual(update.data, {'callnum': 345}) + self.assertEqual(update.rawdata, {}) + + self.assertEqual(len(self.queries), 3) + + query = self.queries[0] + self.assertEqual(query.tables, ['sessions']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i', + 'key = %(key)s']) + self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master', + 'renew_time', "date_part('epoch', renew_time)", + 'start_time', "date_part('epoch', start_time)", + 'update_time', "date_part('epoch', update_time)", + 'user_id']) + self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master', + 'renew_time', 'renew_ts', 'start_time', 'start_ts', + 'update_time', 'update_ts', 'user_id']) + self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'}) + + query = self.queries[1] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id=%(user_id)s']) + self.assertEqual(query.columns, ['name', 'status', 'usertype']) + self.assertEqual(query.values, {'user_id': 1}) + + query = self.queries[2] + self.assertEqual(query.tables, ['sessions']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE', + 'user_id=%(user_id)s']) + self.assertEqual(query.columns, ['id']) + + def test_getattr(self): + """auth.Session instance""" + s, cntext = self.get_session() + + self.assertEqual(len(self.updates), 2) + update = self.updates[0] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values['id'], 123) + self.assertEqual(update.clauses, ['id = %(id)i']) + self.assertEqual(update.data, {}) + self.assertEqual(update.rawdata, {'update_time': 'NOW()'}) + + update = self.updates[1] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values['id'], 123) + self.assertEqual(update.clauses, ['id = %(id)i']) + self.assertEqual(update.data, {'callnum': 345}) + self.assertEqual(update.rawdata, {}) + + self.assertEqual(len(self.queries), 3) + + query = self.queries[0] + self.assertEqual(query.tables, ['sessions']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i', + 'key = %(key)s']) + self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master', + 'renew_time', "date_part('epoch', renew_time)", + 'start_time', "date_part('epoch', start_time)", + 'update_time', "date_part('epoch', update_time)", + 'user_id']) + self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master', + 'renew_time', 'renew_ts', 'start_time', 'start_ts', + 'update_time', 'update_ts', 'user_id']) + self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'}) + + query = self.queries[1] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id=%(user_id)s']) + self.assertEqual(query.columns, ['name', 'status', 'usertype']) + self.assertEqual(query.values, {'user_id': 1}) + + query = self.queries[2] + self.assertEqual(query.tables, ['sessions']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE', + 'user_id=%(user_id)s']) + self.assertEqual(query.columns, ['id']) + + # all other names should raise error + with self.assertRaises(AttributeError): + s.non_existing_attribute + + def test_str(self): + """auth.Session string representation""" + s, cntext = self.get_session() + self.context.cnx = cntext.cnx + + s.logged_in = False + s.message = 'msg' + self.assertEqual(str(s), 'session: not logged in (msg)') + s.logged_in = True + self.assertNotEqual(str(s), 'session: not logged in') + + def test_validate(self): + """Session.validate""" + s, cntext = self.get_session() + self.context.cnx = cntext.cnx + + s.lockerror = True + with self.assertRaises(koji.AuthLockError): + s.validate() + + s.lockerror = False + self.assertTrue(s.validate()) + + def test_makeShared(self): + """Session.makeShared""" + s, _ = self.get_session() + s.makeShared() + self.assertEqual(len(self.updates), 3) + # check only last update query, first two are tested in test_basic_instance + update = self.updates[2] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values['session_id'], 123) + self.assertEqual(update.clauses, ['id=%(session_id)s']) + self.assertEqual(update.data, {'exclusive': None}) + self.assertEqual(update.rawdata, {}) + + self.assertEqual(len(self.queries), 3) + # all queries are tested in test_basic_instance + + @mock.patch('socket.gethostbyname') + def test_get_remote_ip(self, gethostbyname): + """Session.get_remote_ip""" + s, _ = self.get_session() + + self.context.opts = {'CheckClientIP': False} + self.assertEqual(s.get_remote_ip(), '-') + + self.context.opts = {'CheckClientIP': True} + self.assertEqual(s.get_remote_ip(override='xoverride'), 'xoverride') + + self.context.environ = {'REMOTE_ADDR': '123.123.123.123'} + self.assertEqual(s.get_remote_ip(), '123.123.123.123') + + gethostbyname.return_value = 'ip' + self.context.environ = {'REMOTE_ADDR': '127.0.0.1'} + self.assertEqual(s.get_remote_ip(), 'ip') + + def test_login(self): + s, _ = self.get_session() + + # already logged in + with self.assertRaises(koji.GenericError): + s.login('user', 'password') + + s.logged_in = False + with self.assertRaises(koji.AuthError): + s.login('user', 123) + with self.assertRaises(koji.AuthError): + s.login('user', '') + + # correct + s.get_remote_ip = mock.MagicMock() + s.get_remote_ip.return_value = 'hostip' + s.checkLoginAllowed = mock.MagicMock() + s.checkLoginAllowed.return_value = True + s.createSession = mock.MagicMock() + s.createSession.return_value = {'session-id': 'session-id'} + self.query_singleValue.return_value = 123 + + result = s.login('user', 'password') + + self.assertEqual(len(self.queries), 4) + + # check only last update query, first three are tested in test_basic_instance + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['name = %(user)s', 'password = %(password)s']) + self.assertEqual(query.columns, ['id']) + self.assertEqual(query.values, {'user': 'user', 'password': 'password'}) + + self.assertEqual(len(self.updates), 2) + # all updates are tested in test_basic_instance + + self.assertEqual(s.get_remote_ip.call_count, 1) + self.assertEqual(s.checkLoginAllowed.call_args, mock.call(123)) + self.assertEqual(result, s.createSession.return_value) + + # one more try for non-existing user + self.query_singleValue.return_value = None + with self.assertRaises(koji.AuthError): + s.login('user', 'password') + + def test_checkKrbPrincipal(self): + s, cntext = self.get_session() + self.assertIsNone(s.checkKrbPrincipal(None)) + self.context.opts = {'AllowedKrbRealms': '*'} + self.assertIsNone(s.checkKrbPrincipal('any')) + self.context.opts = {'AllowedKrbRealms': 'example.com'} + with self.assertRaises(koji.AuthError) as cm: + s.checkKrbPrincipal('any') + self.assertEqual(cm.exception.args[0], + 'invalid Kerberos principal: any') + with self.assertRaises(koji.AuthError) as cm: + s.checkKrbPrincipal('any@') + self.assertEqual(cm.exception.args[0], + 'invalid Kerberos principal: any@') + with self.assertRaises(koji.AuthError) as cm: + s.checkKrbPrincipal('any@bannedrealm') + self.assertEqual(cm.exception.args[0], + "Kerberos principal's realm:" + " bannedrealm is not allowed") + self.assertIsNone(s.checkKrbPrincipal('user@example.com')) + self.context.opts = {'AllowedKrbRealms': 'example.com,example.net,example.org'} + self.assertIsNone(s.checkKrbPrincipal('user@example.net')) + + def test_getUserIdFromKerberos(self): + krb_principal = 'test-krb-principal' + self.query_singleValue.return_value = 135 + s, cntext = self.get_session() + s.checkKrbPrincipal = mock.MagicMock() + s.checkKrbPrincipal.return_value = True + + s.getUserIdFromKerberos(krb_principal) + + self.assertEqual(len(self.queries), 4) + # check only last update query, first three are tested in test_basic_instance + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, ['user_krb_principals ON ' + 'users.id = user_krb_principals.user_id']) + self.assertEqual(query.clauses, ['krb_principal = %(krb_principal)s']) + self.assertEqual(query.columns, ['id']) + self.assertEqual(query.values, {'krb_principal': krb_principal}) + + self.assertEqual(len(self.updates), 2) + # all updates are tested in test_basic_instance + + def test_getUserId(self): + self.query_singleValue.return_value = 135 + s, cntext = self.get_session() + username = 'test-user' + + s.getUserId(username) + + self.assertEqual(len(self.queries), 4) + # check only last update query, first three are tested in test_basic_instance + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['name = %(username)s']) + self.assertEqual(query.columns, ['id']) + self.assertEqual(query.values, {'username': username}) + + self.assertEqual(len(self.updates), 2) + # all updates are tested in test_basic_instance + + def test_getHostId(self): + self.query_singleValue.return_value = 199 + s, cntext = self.get_session() + + s._getHostId() + + self.assertEqual(len(self.queries), 4) + # check only last update query, first three are tested in test_basic_instance + query = self.queries[3] + self.assertEqual(query.tables, ['host']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['user_id = %(uid)d']) + self.assertEqual(query.columns, ['id']) + self.assertEqual(query.values, {'uid': 1}) + + self.assertEqual(len(self.updates), 2) + # all updates are tested in test_basic_instance + + def test_logout_not_logged(self): + s, cntext = self.get_session() + s.logged_in = False + with self.assertRaises(koji.AuthError) as cm: + s.logout() + self.assertEqual(cm.exception.args[0], 'Not logged in') + + def test_logout_logged(self): + s, _ = self.get_session() + s.logged_in = True + s.logout() + + self.assertEqual(len(self.queries), 3) + # all queries are tested in test_basic_instance + + self.assertEqual(len(self.updates), 3) + # check only last update query, first two are tested in test_basic_instance + update = self.updates[2] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values, {'id': 123, 'id': 123}) + self.assertEqual(update.clauses, ['id = %(id)i OR master = %(id)i']) + self.assertEqual(update.data, {'closed': True, 'expired': True, 'exclusive': None}) + self.assertEqual(update.rawdata, {}) + + def test_logoutChild_not_logged(self): + s, cntext = self.get_session() + s.logged_in = False + with self.assertRaises(koji.AuthError) as cm: + s.logoutChild(111) + self.assertEqual(cm.exception.args[0], 'Not logged in') + + def test_logoutChild_logged(self): + s, _ = self.get_session() + s.logged_in = True + s.logoutChild(111) + + self.assertEqual(len(self.queries), 3) + # all queries are tested in test_basic_instance + + self.assertEqual(len(self.updates), 3) + # check only last update query, first two are tested in test_basic_instance + update = self.updates[2] + + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values, {'session_id': 111, 'master': 123}) + self.assertEqual(update.clauses, ['id = %(session_id)i', 'master = %(master)i']) + self.assertEqual(update.data, {'expired': True, 'exclusive': None, 'closed': True}) + self.assertEqual(update.rawdata, {}) + + def test_makeExclusive_not_master(self): + s, cntext = self.get_session() + s.master = 333 + with self.assertRaises(koji.GenericError) as cm: + s.makeExclusive() + self.assertEqual(cm.exception.args[0], 'subsessions cannot become exclusive') + + def test_makeExclusive_already_exclusive(self): + s, cntext = self.get_session() + s.master = None + s.exclusive = True + with self.assertRaises(koji.GenericError) as cm: + s.makeExclusive() + self.assertEqual(cm.exception.args[0], 'session is already exclusive') + + def test_makeExclusive_without_force(self): + s, cntext = self.get_session() + s.master = None + s.exclusive = False + self.query_singleValue.return_value = 123 + + with self.assertRaises(koji.AuthLockError) as cm: + s.makeExclusive() + self.assertEqual(cm.exception.args[0], 'Cannot get exclusive session') + + self.assertEqual(len(self.queries), 5) + self.assertEqual(len(self.updates), 2) + + def test_makeExclusive(self): + s, _ = self.get_session() + s.master = None + s.exclusive = False + self.query_singleValue.return_value = 123 + + s.makeExclusive(force=True) + + self.assertEqual(len(self.queries), 5) + # check only last two queries, first two are tested in test_basic_instance + + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id=%(user_id)s']) + self.assertEqual(query.columns, ['id']) + self.assertEqual(query.values, {'user_id': 1}) + + query = self.queries[4] + self.assertEqual(query.tables, ['sessions']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE', + 'user_id=%(user_id)s']) + self.assertEqual(query.columns, ['id']) + self.assertEqual(query.values, {'user_id': 1}) + + self.assertEqual(len(self.updates), 4) + # check only last two update queries, first two are tested in test_basic_instance + + update = self.updates[2] + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values, {'excl_id': 123}) + self.assertEqual(update.clauses, ['id=%(excl_id)s']) + self.assertEqual(update.data, {'expired': True, 'exclusive': None, 'closed': True}) + self.assertEqual(update.rawdata, {}) + + update = self.updates[3] + self.assertEqual(update.table, 'sessions') + self.assertEqual(update.values, {'session_id': 123}) + self.assertEqual(update.clauses, ['id=%(session_id)s']) + self.assertEqual(update.data, {'exclusive': True}) + self.assertEqual(update.rawdata, {}) + + def test_checkLoginAllowed(self): + s, cntext = self.get_session() + self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 0, 'usertype': 0}] + s.checkLoginAllowed(2) + + self.assertEqual(len(self.queries), 4) + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id = %(user_id)i']) + self.assertEqual(query.columns, ['name', 'status', 'usertype']) + self.assertEqual(query.values, {'user_id': 2}) + + self.assertEqual(len(self.updates), 2) + + def test_checkLoginAllowed_not_normal_status(self): + s, cntext = self.get_session() + self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 1, 'usertype': 0}] + + with self.assertRaises(koji.AuthError) as cm: + s.checkLoginAllowed(2) + self.assertEqual(cm.exception.args[0], 'logins by testuser are not allowed') + + self.assertEqual(len(self.queries), 4) + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id = %(user_id)i']) + self.assertEqual(query.columns, ['name', 'status', 'usertype']) + self.assertEqual(query.values, {'user_id': 2}) + + self.assertEqual(len(self.updates), 2) + + def test_checkLoginAllowed_not_exist_user(self): + s, cntext = self.get_session() + self.query_executeOne.side_effect = [None] + + with self.assertRaises(koji.AuthError) as cm: + s.checkLoginAllowed(2) + self.assertEqual(cm.exception.args[0], 'invalid user_id: 2') + + self.assertEqual(len(self.queries), 4) + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id = %(user_id)i']) + self.assertEqual(query.columns, ['name', 'status', 'usertype']) + self.assertEqual(query.values, {'user_id': 2}) + + self.assertEqual(len(self.updates), 2) + + def test_createUserFromKerberos_invalid_krb(self): + s, cntext = self.get_session() + krb_principal = 'test-krb-princ' + with self.assertRaises(koji.AuthError) as cm: + s.createUserFromKerberos(krb_principal) + self.assertEqual(cm.exception.args[0], 'invalid Kerberos principal: %s' % krb_principal) + + def test_createUserFromKerberos_user_not_exists(self): + self.query_execute.return_value = None + s, cntext = self.get_session() + krb_principal = 'test-krb-princ@redhat.com' + s.createUser = mock.MagicMock() + s.createUser.return_value = 3 + s.createUserFromKerberos(krb_principal) + self.assertEqual(len(self.queries), 4) + self.assertEqual(len(self.updates), 2) + + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON ' + 'users.id = user_krb_principals.user_id']) + self.assertEqual(query.clauses, ['name = %(user_name)s']) + self.assertEqual(query.columns, ['id', 'krb_principal']) + self.assertEqual(query.values, {'user_name': 'test-krb-princ'}) + + def test_createUserFromKerberos_valid(self): + self.query_execute.return_value = [{'id': 1, 'krb_principal': 'krb-user-1@redhat.com'}, + {'id': 1, 'krb_principal': 'krb-user-2@redhat.com'}] + s, cntext = self.get_session() + krb_principal = 'test-krb-princ@redhat.com' + s.setKrbPrincipal = mock.MagicMock() + s.setKrbPrincipal.return_value = 1 + s.createUserFromKerberos(krb_principal) + self.assertEqual(len(self.queries), 4) + self.assertEqual(len(self.updates), 2) + + query = self.queries[3] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON ' + 'users.id = user_krb_principals.user_id']) + self.assertEqual(query.clauses, ['name = %(user_name)s']) + self.assertEqual(query.columns, ['id', 'krb_principal']) + self.assertEqual(query.values, {'user_name': 'test-krb-princ'}) + + # functions outside Session object + + def test_get_user_data(self): + """auth.get_user_data""" + self.query_executeOne.return_value = None + self.assertEqual(len(self.queries), 0) + + self.query_executeOne.return_value = {'name': 'name', 'status': 'status', + 'usertype': 'usertype'} + kojihub.auth.get_user_data(1) + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['users']) + self.assertEqual(query.joins, None) + self.assertEqual(query.clauses, ['id=%(user_id)s']) + self.assertEqual(query.columns, ['name', 'status', 'usertype']) + + def test_get_user_groups(self): + """auth.get_user_groups""" + kojihub.auth.get_user_groups(1) + self.assertEqual(len(self.queries), 1) + query = self.queries[0] + self.assertEqual(query.tables, ['user_groups']) + self.assertEqual(query.joins, ['users ON group_id = users.id']) + self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)i', + 'users.usertype=%(t_group)i']) + self.assertEqual(query.columns, ['group_id', 'name']) + + def test_get_user_perms(self): + """auth.get_user_perms""" + kojihub.auth.get_user_perms(1) + self.assertEqual(len(self.queries), 2) + query = self.queries[0] + self.assertEqual(query.tables, ['user_perms']) + self.assertEqual(query.joins, ['permissions ON perm_id = permissions.id']) + self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)s']) + self.assertEqual(query.columns, ['name']) + query = self.queries[1] + self.assertEqual(query.tables, ['user_groups']) + self.assertEqual(query.joins, [ + 'user_perms ON user_perms.user_id = user_groups.group_id', + 'permissions ON perm_id = permissions.id']) + self.assertEqual(sorted(query.clauses), sorted([ + 'user_groups.active IS TRUE', + 'user_perms.active IS TRUE', + 'user_groups.user_id=%(user_id)s'])) + self.assertEqual(query.columns, ['permissions.name']) + + def test_get_user_perms_inherited(self): + self.query_execute.side_effect = [ + [{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}], + [{'name': 'perm3'}] + ] + result = kojihub.auth.get_user_perms(1) + self.assertEqual(set(result), {'perm1', 'perm2', 'perm3'}) + + def test_get_user_perms_inherited_data(self): + self.query_execute.side_effect = [ + [{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}], + [{'name': 'perm3', 'group': 'group_a'}, + {'name': 'perm4', 'group': 'group_b'}, + {'name': 'perm4', 'group': 'group_c'}] + ] + result = kojihub.auth.get_user_perms(1, inheritance_data=True) + self.assertEqual(result, { + 'perm1': [None], + 'perm2': [None], + 'perm3': ['group_a'], + 'perm4': ['group_b', 'group_c'], + }) + + def test_logout_logged_not_owner(self): + s, _ = self.get_session() + + s.logged_in = True + # session_id without admin perms and not owner + self.context.session.hasPerm.return_value = False + self.context.session.user_id.return_value = 123 + self.query_singleValue.return_value = None + with self.assertRaises(koji.ActionNotAllowed) as ex: + s.logout(session_id=1) + self.assertEqual("only admins or owner may logout other session", str(ex.exception)) diff --git a/tests/test_hub/test_bulkupdate_processor.py b/tests/test_hub/test_bulkupdate_processor.py deleted file mode 100644 index 25809d9..0000000 --- a/tests/test_hub/test_bulkupdate_processor.py +++ /dev/null @@ -1,91 +0,0 @@ -import mock -import unittest - -from kojihub import db - - -class TestUpdateProcessor(unittest.TestCase): - - maxDiff = None - - def setUp(self): - self.context = mock.patch('kojihub.db.context').start() - pass - - def tearDown(self): - mock.patch.stopall() - - def test_basic_instantiation(self): - proc = db.BulkUpdateProcessor('sometable') - repr(proc) - # No exception! - - def test_basic_bulk_update(self): - data = [{'id': n, 'field': f'value {n}'} for n in range(2)] - proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) - - # check sql - actual = str(proc) - expected_sql = ('UPDATE sometable SET field = __kojibulk_sometable.field\n' - 'FROM (VALUES (%(val_field_0)s, %(val_id_0)s), (%(val_field_1)s, %(val_id_1)s))\n' - 'AS __kojibulk_sometable (field, id)\n' - 'WHERE (sometable.id = __kojibulk_sometable.id)') - self.assertEqual(actual, expected_sql) - - # check values - expected_values = {'val_field_0': 'value 0', - 'val_field_1': 'value 1', - 'val_id_0': 0, - 'val_id_1': 1} - self.assertEqual(proc._values, expected_values) - - # verify execution - cursor = mock.MagicMock() - self.context.cnx.cursor.return_value = cursor - proc.execute() - cursor.execute.assert_called_once_with( - expected_sql, - expected_values, - log_errors=True, - ) - - def test_incomplete(self): - proc = db.BulkUpdateProcessor('sometable') - expected = '-- incomplete bulk update' - self.assertEqual(str(proc), expected) - - with self.assertRaises(ValueError) as ex: - proc.get_keys() - expected = 'no update data' - self.assertEqual(str(ex.exception), expected) - - def test_bad_key(self): - data = [{'id': n, 100: f'value {n}'} for n in range(2)] - proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) - with self.assertRaises(TypeError) as ex: - str(proc) - expected = 'update data must use string keys' - self.assertEqual(str(ex.exception), expected) - - def test_key_mismatch(self): - # extra key in later row - data = [ - {'id': 1, 'A': 1}, - {'id': 2, 'A': 1, 'B': 2}, - ] - proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) - with self.assertRaises(ValueError) as ex: - str(proc) - expected = 'mismatched update keys' - self.assertEqual(str(ex.exception), expected) - - # missing key in later row - data = [ - {'id': 1, 'A': 1}, - {'id': 2}, - ] - proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) - with self.assertRaises(ValueError) as ex: - str(proc) - expected = 'mismatched update keys' - self.assertEqual(str(ex.exception), expected) diff --git a/tests/test_hub/test_db/__init__.py b/tests/test_hub/test_db/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/test_hub/test_db/__init__.py diff --git a/tests/test_hub/test_db/test_bulkupdate_processor.py b/tests/test_hub/test_db/test_bulkupdate_processor.py new file mode 100644 index 0000000..25809d9 --- /dev/null +++ b/tests/test_hub/test_db/test_bulkupdate_processor.py @@ -0,0 +1,91 @@ +import mock +import unittest + +from kojihub import db + + +class TestUpdateProcessor(unittest.TestCase): + + maxDiff = None + + def setUp(self): + self.context = mock.patch('kojihub.db.context').start() + pass + + def tearDown(self): + mock.patch.stopall() + + def test_basic_instantiation(self): + proc = db.BulkUpdateProcessor('sometable') + repr(proc) + # No exception! + + def test_basic_bulk_update(self): + data = [{'id': n, 'field': f'value {n}'} for n in range(2)] + proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) + + # check sql + actual = str(proc) + expected_sql = ('UPDATE sometable SET field = __kojibulk_sometable.field\n' + 'FROM (VALUES (%(val_field_0)s, %(val_id_0)s), (%(val_field_1)s, %(val_id_1)s))\n' + 'AS __kojibulk_sometable (field, id)\n' + 'WHERE (sometable.id = __kojibulk_sometable.id)') + self.assertEqual(actual, expected_sql) + + # check values + expected_values = {'val_field_0': 'value 0', + 'val_field_1': 'value 1', + 'val_id_0': 0, + 'val_id_1': 1} + self.assertEqual(proc._values, expected_values) + + # verify execution + cursor = mock.MagicMock() + self.context.cnx.cursor.return_value = cursor + proc.execute() + cursor.execute.assert_called_once_with( + expected_sql, + expected_values, + log_errors=True, + ) + + def test_incomplete(self): + proc = db.BulkUpdateProcessor('sometable') + expected = '-- incomplete bulk update' + self.assertEqual(str(proc), expected) + + with self.assertRaises(ValueError) as ex: + proc.get_keys() + expected = 'no update data' + self.assertEqual(str(ex.exception), expected) + + def test_bad_key(self): + data = [{'id': n, 100: f'value {n}'} for n in range(2)] + proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) + with self.assertRaises(TypeError) as ex: + str(proc) + expected = 'update data must use string keys' + self.assertEqual(str(ex.exception), expected) + + def test_key_mismatch(self): + # extra key in later row + data = [ + {'id': 1, 'A': 1}, + {'id': 2, 'A': 1, 'B': 2}, + ] + proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) + with self.assertRaises(ValueError) as ex: + str(proc) + expected = 'mismatched update keys' + self.assertEqual(str(ex.exception), expected) + + # missing key in later row + data = [ + {'id': 1, 'A': 1}, + {'id': 2}, + ] + proc = db.BulkUpdateProcessor('sometable', data=data, match_keys=('id',)) + with self.assertRaises(ValueError) as ex: + str(proc) + expected = 'mismatched update keys' + self.assertEqual(str(ex.exception), expected) diff --git a/tests/test_hub/test_db/test_insert_processor.py b/tests/test_hub/test_db/test_insert_processor.py new file mode 100644 index 0000000..2c63450 --- /dev/null +++ b/tests/test_hub/test_db/test_insert_processor.py @@ -0,0 +1,208 @@ +import mock +import unittest + +import koji +import kojihub + + +class TestInsertProcessor(unittest.TestCase): + def setUp(self): + self.context_db = mock.patch('kojihub.db.context').start() + + def tearDown(self): + mock.patch.stopall() + + def test_basic_instantiation(self): + proc = kojihub.InsertProcessor('sometable') + actual = str(proc) + expected = '-- incomplete update: no assigns' + self.assertEqual(actual, expected) + + def test_to_string_with_data(self): + proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) + actual = str(proc) + expected = 'INSERT INTO sometable (foo) VALUES (%(foo)s)' + self.assertEqual(actual, expected) + + def test_simple_execution_with_iterate(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo)s)', + {'foo': 'bar'}, log_errors=True) + + def test_make_create(self,): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + self.context_db.session.assertLogin = mock.MagicMock() + proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) + proc.make_create(event_id=1, user_id=2) + self.assertEqual(proc.data['create_event'], 1) + self.assertEqual(proc.data['creator_id'], 2) + + proc.make_create(user_id=2) + self.assertEqual(proc.data['create_event'], self.context_db.event_id) + self.assertEqual(proc.data['creator_id'], 2) + + proc.make_create(event_id=1) + self.assertEqual(proc.data['create_event'], 1) + self.assertEqual(proc.data['creator_id'], self.context_db.session.user_id) + + proc.make_create() + self.assertEqual(proc.data['create_event'], self.context_db.event_id) + self.assertEqual(proc.data['creator_id'], self.context_db.session.user_id) + + def test_dup_check(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + self.context_db.session.assertLogin = mock.MagicMock() + proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) + proc.dup_check() + + args = cursor.execute.call_args + actual = ' '.join(args[0][0].split()) + expected = 'SELECT foo FROM sometable WHERE (foo = %(foo)s)' + self.assertEqual(actual, expected) + + proc.make_create() + proc.dup_check() + args = cursor.execute.call_args + actual = ' '.join(args[0][0].split()) + expected = 'SELECT active, foo FROM sometable WHERE ' + \ + '(active = %(active)s) AND (foo = %(foo)s)' + self.assertEqual(actual, expected) + + proc.set(onething='another') + proc.rawset(something='something else') + result = proc.dup_check() + self.assertEqual(result, None) + + def test_raw_data(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + proc = kojihub.InsertProcessor('sometable', rawdata={'foo': '\'bar\''}) + result = proc.dup_check() + self.assertEqual(result, None) + actual = str(proc) + expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data + self.assertEqual(actual, expected) + + +class TestBulkInsertProcessor(unittest.TestCase): + def setUp(self): + self.context_db = mock.patch('kojihub.db.context').start() + + def tearDown(self): + mock.patch.stopall() + + def test_basic_instantiation(self): + proc = kojihub.BulkInsertProcessor('sometable') + actual = str(proc) + expected = '-- incomplete insert: no data' + self.assertEqual(actual, expected) + + def test_to_string_with_single_row(self): + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) + actual = str(proc) + expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)' + self.assertEqual(actual, expected) + + proc = kojihub.BulkInsertProcessor('sometable') + proc.add_record(foo='bar') + actual = str(proc) + self.assertEqual(actual, expected) + + def test_simple_execution(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar'}, + log_errors=True + ) + + cursor.reset_mock() + proc = kojihub.BulkInsertProcessor('sometable') + proc.add_record(foo='bar') + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar'}, + log_errors=True + ) + + def test_bulk_execution(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}]) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + cursor.execute.assert_called_once_with( + 'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', + {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, + log_errors=True + ) + + def test_missing_values(self): + proc = kojihub.BulkInsertProcessor('sometable') + proc.add_record(foo='bar') + proc.add_record(foo2='bar2') + with self.assertRaises(koji.GenericError) as cm: + str(proc) + self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + + def test_missing_values_nostrict(self): + proc = kojihub.BulkInsertProcessor('sometable', strict=False) + proc.add_record(foo='bar') + proc.add_record(foo2='bar2') + actual = str(proc) + expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)' + self.assertEqual(actual, expected) + + def test_missing_values_explicit_columns(self): + proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2']) + proc.add_record(foo='bar') + with self.assertRaises(koji.GenericError) as cm: + str(proc) + self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert') + + def test_batch_execution(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + calls = cursor.execute.mock_calls + # list of (name, positional args, keyword args) + self.assertEqual(len(calls), 2) + self.assertEqual(calls[0], + mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)', + {'foo0': 'bar1', 'foo1': 'bar2'}, log_errors=True)) + self.assertEqual(calls[1], + mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s)', + {'foo0': 'bar3'}, log_errors=True)) + + def test_no_batch_execution(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + + proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=0) + proc.add_record(foo='bar2') + proc.add_record(foo='bar3') + proc.execute() + calls = cursor.execute.mock_calls + # list of (name, positional args, keyword args) + self.assertEqual(len(calls), 1) + self.assertEqual( + calls[0], + mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', + {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, log_errors=True) + ) diff --git a/tests/test_hub/test_db/test_query_processor.py b/tests/test_hub/test_db/test_query_processor.py new file mode 100644 index 0000000..2e7513b --- /dev/null +++ b/tests/test_hub/test_db/test_query_processor.py @@ -0,0 +1,142 @@ +import mock +import unittest + +import kojihub + + +class TestQueryProcessor(unittest.TestCase): + def setUp(self): + self.simple_arguments = dict( + columns=['something'], + tables=['awesome'], + ) + self.complex_arguments = dict( + columns=['something'], + aliases=['other'], + tables=['awesome'], + joins=['morestuff'], + # values=... + # transform=... + opts={ + # 'countOnly': True, + 'order': 'other', + 'offset': 10, + 'limit': 3, + 'group': 'awesome.aha' + # 'rowlock': True, + }, + enable_group=True + ) + self.original_chunksize = kojihub.QueryProcessor.iterchunksize + kojihub.QueryProcessor.iterchunksize = 2 + self.context_db = mock.patch('kojihub.db.context').start() + + def tearDown(self): + kojihub.QueryProcessor.iterchunksize = self.original_chunksize + mock.patch.stopall() + + def test_basic_instantiation(self): + kojihub.QueryProcessor() # No exception! + + def test_instantiation_with_cols_and_aliases(self): + proc = kojihub.QueryProcessor(columns=['wat'], aliases=['zap']) + assert 'zap' in proc.colsByAlias + assert proc.colsByAlias['zap'] == 'wat' + assert len(proc.colsByAlias) == 1 + + def test_empty_as_string(self): + proc = kojihub.QueryProcessor() + actual = str(proc) + self.assertIn("SELECT", actual) + self.assertIn("FROM", actual) + + def test_simple_as_string(self): + proc = kojihub.QueryProcessor( + columns=['something'], + tables=['awesome'], + ) + actual = " ".join([token for token in str(proc).split() if token]) + expected = "SELECT something FROM awesome" + self.assertEqual(actual, expected) + + def test_complex_as_string(self): + proc = kojihub.QueryProcessor(**self.complex_arguments) + actual = " ".join([token for token in str(proc).split() if token]) + expected = "SELECT something FROM awesome JOIN morestuff" \ + " GROUP BY awesome.aha ORDER BY something OFFSET 10 LIMIT 3" + self.assertEqual(actual, expected) + args2 = self.complex_arguments.copy() + args2['enable_group'] = False + proc = kojihub.QueryProcessor(**args2) + actual = " ".join([token for token in str(proc).split() if token]) + expected = "SELECT something FROM awesome JOIN morestuff" \ + " ORDER BY something OFFSET 10 LIMIT 3" + self.assertEqual(actual, expected) + + def test_simple_with_execution(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + proc = kojihub.QueryProcessor(**self.simple_arguments) + proc.execute() + cursor.execute.assert_called_once_with( + '\nSELECT something\n FROM awesome\n\n\n \n \n\n \n', {}) + + def test_simple_count_with_execution(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + cursor.fetchall.return_value = [('some count',)] + args = self.simple_arguments.copy() + args['opts'] = {'countOnly': True} + proc = kojihub.QueryProcessor(**args) + results = proc.execute() + cursor.execute.assert_called_once_with( + '\nSELECT count(*)\n FROM awesome\n\n\n \n \n\n \n', {}) + self.assertEqual(results, 'some count') + + cursor.reset_mock() + args['opts']['group'] = 'id' + args['enable_group'] = True + proc = kojihub.QueryProcessor(**args) + results = proc.execute() + cursor.execute.assert_called_once_with( + 'SELECT count(*)\nFROM (\nSELECT 1\n' + ' FROM awesome\n\n\n GROUP BY id\n \n\n \n) numrows', {}) + self.assertEqual(results, 'some count') + + def test_simple_execution_with_iterate(self): + cursor = mock.MagicMock() + self.context_db.cnx.cursor.return_value = cursor + cursor.fetchall.return_value = [ + ('value number 1',), + ('value number 2',), + ('value number 3',), + ] + proc = kojihub.QueryProcessor(**self.simple_arguments) + generator = proc.iterate() + calls = cursor.execute.mock_calls + result = next(generator) + # two calls so far.. + self.assertEqual(result, {'something': 'value number 1'}) + self.assertEqual(len(calls), 2) + result = next(generator) + # still two. + self.assertEqual(result, {'something': 'value number 2'}) + self.assertEqual(len(calls), 2) + # now three. + result = next(generator) + self.assertEqual(result, {'something': 'value number 3'}) + + @mock.patch('kojihub.db._multiRow') + def test_execution_as_list_transform(self, multirow): + multirow.return_value = [{'col1': 'result_1_col_1', 'col2': 'result_1_col_2'}, + {'col1': 'result_2_col_1', 'col2': 'result_2_col_2'}] + args = dict( + columns=['col1', 'col2'], + tables=['table'], + opts={'asList': True}, + transform=lambda x: x, + ) + proc = kojihub.QueryProcessor(**args) + results = proc.execute() + self.assertEqual( + results, [['result_1_col_1', 'result_1_col_2'], ['result_2_col_1', 'result_2_col_2']]) diff --git a/tests/test_hub/test_db/test_savepoint.py b/tests/test_hub/test_db/test_savepoint.py new file mode 100644 index 0000000..87a2e2c --- /dev/null +++ b/tests/test_hub/test_db/test_savepoint.py @@ -0,0 +1,24 @@ +import mock + +import unittest + +import kojihub + + +class TestSavepoint(unittest.TestCase): + + def setUp(self): + self.dml = mock.patch('kojihub.db._dml').start() + self.context_db = mock.patch('kojihub.db.context').start() + + def tearDown(self): + mock.patch.stopall() + + def test_savepoint(self): + sp = kojihub.Savepoint('some_name') + self.assertEqual(sp.name, 'some_name') + self.dml.assert_called_once_with('SAVEPOINT some_name', {}) + + self.dml.reset_mock() + sp.rollback() + self.dml.assert_called_once_with('ROLLBACK TO SAVEPOINT some_name', {}) diff --git a/tests/test_hub/test_db/test_update_processor.py b/tests/test_hub/test_db/test_update_processor.py new file mode 100644 index 0000000..aadfeab --- /dev/null +++ b/tests/test_hub/test_db/test_update_processor.py @@ -0,0 +1,34 @@ +import mock +import unittest + +import kojihub + + +class TestUpdateProcessor(unittest.TestCase): + + def test_basic_instantiation(self): + kojihub.UpdateProcessor('sometable') # No exception! + + def test_to_string_with_data(self): + proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'}) + actual = str(proc) + expected = 'UPDATE sometable SET foo = %(data.foo)s' + self.assertEqual(actual, expected) + + def test_to_values_from_data(self): + proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'}) + actual = proc.get_values() + expected = {'data.foo': 'bar'} + self.assertEqual(actual, expected) + + @mock.patch('kojihub.db.context') + def test_simple_execution_with_iterate(self, context_db): + cursor = mock.MagicMock() + context_db.cnx.cursor.return_value = cursor + proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'}) + proc.execute() + cursor.execute.assert_called_once_with( + 'UPDATE sometable SET foo = %(data.foo)s', + {'data.foo': 'bar'}, + log_errors=True, + ) diff --git a/tests/test_hub/test_db/test_upsert_processor.py b/tests/test_hub/test_db/test_upsert_processor.py new file mode 100644 index 0000000..cf7aee8 --- /dev/null +++ b/tests/test_hub/test_db/test_upsert_processor.py @@ -0,0 +1,38 @@ +import mock +import unittest + +import koji +import kojihub + + +class TestUpsertProcessor(unittest.TestCase): + def setUp(self): + self.context_db = mock.patch('kojihub.db.context').start() + + def tearDown(self): + mock.patch.stopall() + + def test_required_args(self): + with self.assertRaises(ValueError) as e: + proc = kojihub.UpsertProcessor('sometable') + self.assertEqual(e.msg, 'either keys or skip_dup must be set') + + def test_skip_dup(self): + proc = kojihub.UpsertProcessor('sometable', data={'foo': 'bar'}, skip_dup=True) + actual = str(proc) + expected = 'INSERT INTO sometable (foo) VALUES (%(foo)s) ON CONFLICT DO NOTHING' + self.assertEqual(actual, expected) + + def test_key(self): + proc = kojihub.UpsertProcessor('sometable', data={'id': 1, 'foo': 'bar'}, keys=['id']) + actual = str(proc) + expected = 'INSERT INTO sometable (foo, id) VALUES (%(foo)s, %(id)s) ON CONFLICT (id) DO UPDATE SET foo = %(foo)s' + self.assertEqual(actual, expected) + + def test_keys(self): + proc = kojihub.UpsertProcessor('sometable', data={'id': 1, 'package': 'koji', 'foo': 'bar'}, keys=['id', 'package']) + actual = str(proc) + expected = 'INSERT INTO sometable (foo, id, package) VALUES (%(foo)s, %(id)s, %(package)s) ' \ + 'ON CONFLICT (id,package) DO UPDATE SET foo = %(foo)s' + self.assertEqual(actual, expected) + diff --git a/tests/test_hub/test_upsert_processor.py b/tests/test_hub/test_upsert_processor.py deleted file mode 100644 index cf7aee8..0000000 --- a/tests/test_hub/test_upsert_processor.py +++ /dev/null @@ -1,38 +0,0 @@ -import mock -import unittest - -import koji -import kojihub - - -class TestUpsertProcessor(unittest.TestCase): - def setUp(self): - self.context_db = mock.patch('kojihub.db.context').start() - - def tearDown(self): - mock.patch.stopall() - - def test_required_args(self): - with self.assertRaises(ValueError) as e: - proc = kojihub.UpsertProcessor('sometable') - self.assertEqual(e.msg, 'either keys or skip_dup must be set') - - def test_skip_dup(self): - proc = kojihub.UpsertProcessor('sometable', data={'foo': 'bar'}, skip_dup=True) - actual = str(proc) - expected = 'INSERT INTO sometable (foo) VALUES (%(foo)s) ON CONFLICT DO NOTHING' - self.assertEqual(actual, expected) - - def test_key(self): - proc = kojihub.UpsertProcessor('sometable', data={'id': 1, 'foo': 'bar'}, keys=['id']) - actual = str(proc) - expected = 'INSERT INTO sometable (foo, id) VALUES (%(foo)s, %(id)s) ON CONFLICT (id) DO UPDATE SET foo = %(foo)s' - self.assertEqual(actual, expected) - - def test_keys(self): - proc = kojihub.UpsertProcessor('sometable', data={'id': 1, 'package': 'koji', 'foo': 'bar'}, keys=['id', 'package']) - actual = str(proc) - expected = 'INSERT INTO sometable (foo, id, package) VALUES (%(foo)s, %(id)s, %(package)s) ' \ - 'ON CONFLICT (id,package) DO UPDATE SET foo = %(foo)s' - self.assertEqual(actual, expected) - diff --git a/tests/test_lib/test_auth.py b/tests/test_lib/test_auth.py deleted file mode 100644 index fb5386c..0000000 --- a/tests/test_lib/test_auth.py +++ /dev/null @@ -1,767 +0,0 @@ -from __future__ import absolute_import - -import mock - -import unittest - -import koji -import datetime -import kojihub.auth - -UP = kojihub.auth.UpdateProcessor -QP = kojihub.auth.QueryProcessor - - -class TestAuthSession(unittest.TestCase): - def getUpdate(self, *args, **kwargs): - update = UP(*args, **kwargs) - update.execute = mock.MagicMock() - self.updates.append(update) - return update - - def getQuery(self, *args, **kwargs): - query = QP(*args, **kwargs) - query.execute = self.query_execute - query.executeOne = self.query_executeOne - query.singleValue = self.query_singleValue - self.queries.append(query) - return query - - def setUp(self): - self.context = mock.patch('kojihub.auth.context').start() - kojihub.db.context = self.context - kojihub.auth.context = self.context - self.UpdateProcessor = mock.patch('kojihub.auth.UpdateProcessor', - side_effect=self.getUpdate).start() - self.updates = [] - self.query_execute = mock.MagicMock() - self.query_executeOne = mock.MagicMock() - self.query_singleValue = mock.MagicMock() - self.QueryProcessor = mock.patch('kojihub.auth.QueryProcessor', - side_effect=self.getQuery).start() - self.queries = [] - # It seems MagicMock will not automatically handle attributes that - # start with "assert" - self.context.session.assertLogin = mock.MagicMock() - - def tearDown(self): - mock.patch.stopall() - - def test_instance(self): - """Simple kojihub.auth.Session instance""" - self.context.opts = { - 'CheckClientIP': True, - 'DisableURLSessions': False, - 'SessionRenewalTimeout': 0, - } - self.context.environ = {'QUERY_STRING': 'non_session_key=1'} - with self.assertRaises(koji.GenericError) as cm: - kojihub.auth.Session() - # no args in request/environment - self.assertEqual(cm.exception.args[0], "'session-id' not specified in session args") - - def get_session_old(self): - """auth.Session instance""" - # base session from test_basic_instance - # url-based kojihub.auth - will be dropped in 1.34 - self.context.opts = { - 'CheckClientIP': True, - 'DisableURLSessions': False, - 'SessionRenewalTimeout': 0, - } - self.context.environ = { - 'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345', - 'REMOTE_ADDR': 'remote-addr', - } - - self.query_executeOne.side_effect = [ - {'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002, - "date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None, - 'expired': False, 'master': None, - 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002, - tzinfo=datetime.timezone.utc), - 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308, - tzinfo=datetime.timezone.utc), - 'user_id': 1}, - {'name': 'kojiadmin', 'status': 0, 'usertype': 0}] - self.query_singleValue.return_value = 123 - s = kojihub.auth.Session() - return s, self.context - - def get_session(self): - # base session from test_basic_instance - # header-based auth - self.context.opts = { - 'CheckClientIP': True, - 'DisableURLSessions': True, - 'SessionRenewalTimeout': 0, - } - self.context.environ = { - 'HTTP_KOJI_SESSION_ID': '123', - 'HTTP_KOJI_SESSION_KEY': 'xyz', - 'HTTP_KOJI_CALLNUM': '345', - 'REMOTE_ADDR': 'remote-addr', - } - - self.query_executeOne.side_effect = [ - {'authtype': 2, 'callnum': 1, "date_part('epoch', start_time)": 1666599426.227002, - "date_part('epoch', update_time)": 1666599426.254308, 'exclusive': None, - 'expired': False, 'master': None, - 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002, - tzinfo=datetime.timezone.utc), - 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308, - tzinfo=datetime.timezone.utc), - 'user_id': 1}, - {'name': 'kojiadmin', 'status': 0, 'usertype': 0}] - self.query_singleValue.return_value = 123 - s = kojihub.auth.Session() - return s, self.context - - def test_session_old(self): - self.get_session_old() - - def test_renewal_timeout(self): - """Simple kojihub.auth.Session instance""" - self.context.opts = { - 'CheckClientIP': True, - 'DisableURLSessions': False, - 'SessionRenewalTimeout': 1440, - } - self.context.environ = { - 'QUERY_STRING': 'session-id=123&session-key=xyz&callnum=345', - 'REMOTE_ADDR': 'remote-addr', - } - - self.query_executeOne.side_effect = [ - {'authtype': 2, 'callnum': 1, "start_ts": 1666599426.227002, - "update_ts": 1666599426.254308, 'exclusive': None, - 'expired': False, 'master': None, - 'start_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 227002, - tzinfo=datetime.timezone.utc), - 'update_time': datetime.datetime(2022, 10, 24, 8, 17, 6, 254308, - tzinfo=datetime.timezone.utc), - 'renew_ts': None, - 'user_id': 1}, - {'name': 'kojiadmin', 'status': 0, 'usertype': 0}] - with self.assertRaises(koji.GenericError) as cm: - kojihub.auth.Session() - # no args in request/environment - self.assertEqual(cm.exception.args[0], 'session "123" has expired') - - self.assertEqual(len(self.updates), 1) - self.assertEqual(len(self.queries), 1) - - update = self.updates[0] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values['id'], 123) - self.assertEqual(update.clauses, ['id = %(id)s OR master = %(id)s']) - self.assertEqual(update.data, {'expired': True}) - self.assertEqual(update.rawdata, {}) - - query = self.queries[0] - self.assertEqual(query.tables, ['sessions']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i', - 'key = %(key)s']) - self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master', - 'renew_time', "date_part('epoch', renew_time)", - 'start_time', "date_part('epoch', start_time)", - 'update_time', "date_part('epoch', update_time)", - 'user_id']) - self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master', - 'renew_time', 'renew_ts', 'start_time', 'start_ts', - 'update_time', 'update_ts', 'user_id']) - self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'}) - - def test_basic_instance(self): - """auth.Session instance""" - s, cntext = self.get_session() - self.assertEqual(len(self.updates), 2) - update = self.updates[0] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values['id'], 123) - self.assertEqual(update.clauses, ['id = %(id)i']) - self.assertEqual(update.data, {}) - self.assertEqual(update.rawdata, {'update_time': 'NOW()'}) - - update = self.updates[1] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values['id'], 123) - self.assertEqual(update.clauses, ['id = %(id)i']) - self.assertEqual(update.data, {'callnum': 345}) - self.assertEqual(update.rawdata, {}) - - self.assertEqual(len(self.queries), 3) - - query = self.queries[0] - self.assertEqual(query.tables, ['sessions']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i', - 'key = %(key)s']) - self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master', - 'renew_time', "date_part('epoch', renew_time)", - 'start_time', "date_part('epoch', start_time)", - 'update_time', "date_part('epoch', update_time)", - 'user_id']) - self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master', - 'renew_time', 'renew_ts', 'start_time', 'start_ts', - 'update_time', 'update_ts', 'user_id']) - self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'}) - - query = self.queries[1] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id=%(user_id)s']) - self.assertEqual(query.columns, ['name', 'status', 'usertype']) - self.assertEqual(query.values, {'user_id': 1}) - - query = self.queries[2] - self.assertEqual(query.tables, ['sessions']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE', - 'user_id=%(user_id)s']) - self.assertEqual(query.columns, ['id']) - - def test_getattr(self): - """auth.Session instance""" - s, cntext = self.get_session() - - self.assertEqual(len(self.updates), 2) - update = self.updates[0] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values['id'], 123) - self.assertEqual(update.clauses, ['id = %(id)i']) - self.assertEqual(update.data, {}) - self.assertEqual(update.rawdata, {'update_time': 'NOW()'}) - - update = self.updates[1] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values['id'], 123) - self.assertEqual(update.clauses, ['id = %(id)i']) - self.assertEqual(update.data, {'callnum': 345}) - self.assertEqual(update.rawdata, {}) - - self.assertEqual(len(self.queries), 3) - - query = self.queries[0] - self.assertEqual(query.tables, ['sessions']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['closed IS FALSE', 'hostip = %(hostip)s', 'id = %(id)i', - 'key = %(key)s']) - self.assertEqual(query.columns, ['authtype', 'callnum', 'exclusive', 'expired', 'master', - 'renew_time', "date_part('epoch', renew_time)", - 'start_time', "date_part('epoch', start_time)", - 'update_time', "date_part('epoch', update_time)", - 'user_id']) - self.assertEqual(query.aliases, ['authtype', 'callnum', 'exclusive', 'expired', 'master', - 'renew_time', 'renew_ts', 'start_time', 'start_ts', - 'update_time', 'update_ts', 'user_id']) - self.assertEqual(query.values, {'id': 123, 'key': 'xyz', 'hostip': 'remote-addr'}) - - query = self.queries[1] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id=%(user_id)s']) - self.assertEqual(query.columns, ['name', 'status', 'usertype']) - self.assertEqual(query.values, {'user_id': 1}) - - query = self.queries[2] - self.assertEqual(query.tables, ['sessions']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE', - 'user_id=%(user_id)s']) - self.assertEqual(query.columns, ['id']) - - # all other names should raise error - with self.assertRaises(AttributeError): - s.non_existing_attribute - - def test_str(self): - """auth.Session string representation""" - s, cntext = self.get_session() - self.context.cnx = cntext.cnx - - s.logged_in = False - s.message = 'msg' - self.assertEqual(str(s), 'session: not logged in (msg)') - s.logged_in = True - self.assertNotEqual(str(s), 'session: not logged in') - - def test_validate(self): - """Session.validate""" - s, cntext = self.get_session() - self.context.cnx = cntext.cnx - - s.lockerror = True - with self.assertRaises(koji.AuthLockError): - s.validate() - - s.lockerror = False - self.assertTrue(s.validate()) - - def test_makeShared(self): - """Session.makeShared""" - s, _ = self.get_session() - s.makeShared() - self.assertEqual(len(self.updates), 3) - # check only last update query, first two are tested in test_basic_instance - update = self.updates[2] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values['session_id'], 123) - self.assertEqual(update.clauses, ['id=%(session_id)s']) - self.assertEqual(update.data, {'exclusive': None}) - self.assertEqual(update.rawdata, {}) - - self.assertEqual(len(self.queries), 3) - # all queries are tested in test_basic_instance - - @mock.patch('socket.gethostbyname') - def test_get_remote_ip(self, gethostbyname): - """Session.get_remote_ip""" - s, _ = self.get_session() - - self.context.opts = {'CheckClientIP': False} - self.assertEqual(s.get_remote_ip(), '-') - - self.context.opts = {'CheckClientIP': True} - self.assertEqual(s.get_remote_ip(override='xoverride'), 'xoverride') - - self.context.environ = {'REMOTE_ADDR': '123.123.123.123'} - self.assertEqual(s.get_remote_ip(), '123.123.123.123') - - gethostbyname.return_value = 'ip' - self.context.environ = {'REMOTE_ADDR': '127.0.0.1'} - self.assertEqual(s.get_remote_ip(), 'ip') - - def test_login(self): - s, _ = self.get_session() - - # already logged in - with self.assertRaises(koji.GenericError): - s.login('user', 'password') - - s.logged_in = False - with self.assertRaises(koji.AuthError): - s.login('user', 123) - with self.assertRaises(koji.AuthError): - s.login('user', '') - - # correct - s.get_remote_ip = mock.MagicMock() - s.get_remote_ip.return_value = 'hostip' - s.checkLoginAllowed = mock.MagicMock() - s.checkLoginAllowed.return_value = True - s.createSession = mock.MagicMock() - s.createSession.return_value = {'session-id': 'session-id'} - self.query_singleValue.return_value = 123 - - result = s.login('user', 'password') - - self.assertEqual(len(self.queries), 4) - - # check only last update query, first three are tested in test_basic_instance - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['name = %(user)s', 'password = %(password)s']) - self.assertEqual(query.columns, ['id']) - self.assertEqual(query.values, {'user': 'user', 'password': 'password'}) - - self.assertEqual(len(self.updates), 2) - # all updates are tested in test_basic_instance - - self.assertEqual(s.get_remote_ip.call_count, 1) - self.assertEqual(s.checkLoginAllowed.call_args, mock.call(123)) - self.assertEqual(result, s.createSession.return_value) - - # one more try for non-existing user - self.query_singleValue.return_value = None - with self.assertRaises(koji.AuthError): - s.login('user', 'password') - - def test_checkKrbPrincipal(self): - s, cntext = self.get_session() - self.assertIsNone(s.checkKrbPrincipal(None)) - self.context.opts = {'AllowedKrbRealms': '*'} - self.assertIsNone(s.checkKrbPrincipal('any')) - self.context.opts = {'AllowedKrbRealms': 'example.com'} - with self.assertRaises(koji.AuthError) as cm: - s.checkKrbPrincipal('any') - self.assertEqual(cm.exception.args[0], - 'invalid Kerberos principal: any') - with self.assertRaises(koji.AuthError) as cm: - s.checkKrbPrincipal('any@') - self.assertEqual(cm.exception.args[0], - 'invalid Kerberos principal: any@') - with self.assertRaises(koji.AuthError) as cm: - s.checkKrbPrincipal('any@bannedrealm') - self.assertEqual(cm.exception.args[0], - "Kerberos principal's realm:" - " bannedrealm is not allowed") - self.assertIsNone(s.checkKrbPrincipal('user@example.com')) - self.context.opts = {'AllowedKrbRealms': 'example.com,example.net,example.org'} - self.assertIsNone(s.checkKrbPrincipal('user@example.net')) - - def test_getUserIdFromKerberos(self): - krb_principal = 'test-krb-principal' - self.query_singleValue.return_value = 135 - s, cntext = self.get_session() - s.checkKrbPrincipal = mock.MagicMock() - s.checkKrbPrincipal.return_value = True - - s.getUserIdFromKerberos(krb_principal) - - self.assertEqual(len(self.queries), 4) - # check only last update query, first three are tested in test_basic_instance - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, ['user_krb_principals ON ' - 'users.id = user_krb_principals.user_id']) - self.assertEqual(query.clauses, ['krb_principal = %(krb_principal)s']) - self.assertEqual(query.columns, ['id']) - self.assertEqual(query.values, {'krb_principal': krb_principal}) - - self.assertEqual(len(self.updates), 2) - # all updates are tested in test_basic_instance - - def test_getUserId(self): - self.query_singleValue.return_value = 135 - s, cntext = self.get_session() - username = 'test-user' - - s.getUserId(username) - - self.assertEqual(len(self.queries), 4) - # check only last update query, first three are tested in test_basic_instance - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['name = %(username)s']) - self.assertEqual(query.columns, ['id']) - self.assertEqual(query.values, {'username': username}) - - self.assertEqual(len(self.updates), 2) - # all updates are tested in test_basic_instance - - def test_getHostId(self): - self.query_singleValue.return_value = 199 - s, cntext = self.get_session() - - s._getHostId() - - self.assertEqual(len(self.queries), 4) - # check only last update query, first three are tested in test_basic_instance - query = self.queries[3] - self.assertEqual(query.tables, ['host']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['user_id = %(uid)d']) - self.assertEqual(query.columns, ['id']) - self.assertEqual(query.values, {'uid': 1}) - - self.assertEqual(len(self.updates), 2) - # all updates are tested in test_basic_instance - - def test_logout_not_logged(self): - s, cntext = self.get_session() - s.logged_in = False - with self.assertRaises(koji.AuthError) as cm: - s.logout() - self.assertEqual(cm.exception.args[0], 'Not logged in') - - def test_logout_logged(self): - s, _ = self.get_session() - s.logged_in = True - s.logout() - - self.assertEqual(len(self.queries), 3) - # all queries are tested in test_basic_instance - - self.assertEqual(len(self.updates), 3) - # check only last update query, first two are tested in test_basic_instance - update = self.updates[2] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values, {'id': 123, 'id': 123}) - self.assertEqual(update.clauses, ['id = %(id)i OR master = %(id)i']) - self.assertEqual(update.data, {'closed': True, 'expired': True, 'exclusive': None}) - self.assertEqual(update.rawdata, {}) - - def test_logoutChild_not_logged(self): - s, cntext = self.get_session() - s.logged_in = False - with self.assertRaises(koji.AuthError) as cm: - s.logoutChild(111) - self.assertEqual(cm.exception.args[0], 'Not logged in') - - def test_logoutChild_logged(self): - s, _ = self.get_session() - s.logged_in = True - s.logoutChild(111) - - self.assertEqual(len(self.queries), 3) - # all queries are tested in test_basic_instance - - self.assertEqual(len(self.updates), 3) - # check only last update query, first two are tested in test_basic_instance - update = self.updates[2] - - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values, {'session_id': 111, 'master': 123}) - self.assertEqual(update.clauses, ['id = %(session_id)i', 'master = %(master)i']) - self.assertEqual(update.data, {'expired': True, 'exclusive': None, 'closed': True}) - self.assertEqual(update.rawdata, {}) - - def test_makeExclusive_not_master(self): - s, cntext = self.get_session() - s.master = 333 - with self.assertRaises(koji.GenericError) as cm: - s.makeExclusive() - self.assertEqual(cm.exception.args[0], 'subsessions cannot become exclusive') - - def test_makeExclusive_already_exclusive(self): - s, cntext = self.get_session() - s.master = None - s.exclusive = True - with self.assertRaises(koji.GenericError) as cm: - s.makeExclusive() - self.assertEqual(cm.exception.args[0], 'session is already exclusive') - - def test_makeExclusive_without_force(self): - s, cntext = self.get_session() - s.master = None - s.exclusive = False - self.query_singleValue.return_value = 123 - - with self.assertRaises(koji.AuthLockError) as cm: - s.makeExclusive() - self.assertEqual(cm.exception.args[0], 'Cannot get exclusive session') - - self.assertEqual(len(self.queries), 5) - self.assertEqual(len(self.updates), 2) - - def test_makeExclusive(self): - s, _ = self.get_session() - s.master = None - s.exclusive = False - self.query_singleValue.return_value = 123 - - s.makeExclusive(force=True) - - self.assertEqual(len(self.queries), 5) - # check only last two queries, first two are tested in test_basic_instance - - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id=%(user_id)s']) - self.assertEqual(query.columns, ['id']) - self.assertEqual(query.values, {'user_id': 1}) - - query = self.queries[4] - self.assertEqual(query.tables, ['sessions']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['closed = FALSE', 'exclusive = TRUE', - 'user_id=%(user_id)s']) - self.assertEqual(query.columns, ['id']) - self.assertEqual(query.values, {'user_id': 1}) - - self.assertEqual(len(self.updates), 4) - # check only last two update queries, first two are tested in test_basic_instance - - update = self.updates[2] - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values, {'excl_id': 123}) - self.assertEqual(update.clauses, ['id=%(excl_id)s']) - self.assertEqual(update.data, {'expired': True, 'exclusive': None, 'closed': True}) - self.assertEqual(update.rawdata, {}) - - update = self.updates[3] - self.assertEqual(update.table, 'sessions') - self.assertEqual(update.values, {'session_id': 123}) - self.assertEqual(update.clauses, ['id=%(session_id)s']) - self.assertEqual(update.data, {'exclusive': True}) - self.assertEqual(update.rawdata, {}) - - def test_checkLoginAllowed(self): - s, cntext = self.get_session() - self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 0, 'usertype': 0}] - s.checkLoginAllowed(2) - - self.assertEqual(len(self.queries), 4) - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id = %(user_id)i']) - self.assertEqual(query.columns, ['name', 'status', 'usertype']) - self.assertEqual(query.values, {'user_id': 2}) - - self.assertEqual(len(self.updates), 2) - - def test_checkLoginAllowed_not_normal_status(self): - s, cntext = self.get_session() - self.query_executeOne.side_effect = [{'name': 'testuser', 'status': 1, 'usertype': 0}] - - with self.assertRaises(koji.AuthError) as cm: - s.checkLoginAllowed(2) - self.assertEqual(cm.exception.args[0], 'logins by testuser are not allowed') - - self.assertEqual(len(self.queries), 4) - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id = %(user_id)i']) - self.assertEqual(query.columns, ['name', 'status', 'usertype']) - self.assertEqual(query.values, {'user_id': 2}) - - self.assertEqual(len(self.updates), 2) - - def test_checkLoginAllowed_not_exist_user(self): - s, cntext = self.get_session() - self.query_executeOne.side_effect = [None] - - with self.assertRaises(koji.AuthError) as cm: - s.checkLoginAllowed(2) - self.assertEqual(cm.exception.args[0], 'invalid user_id: 2') - - self.assertEqual(len(self.queries), 4) - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id = %(user_id)i']) - self.assertEqual(query.columns, ['name', 'status', 'usertype']) - self.assertEqual(query.values, {'user_id': 2}) - - self.assertEqual(len(self.updates), 2) - - def test_createUserFromKerberos_invalid_krb(self): - s, cntext = self.get_session() - krb_principal = 'test-krb-princ' - with self.assertRaises(koji.AuthError) as cm: - s.createUserFromKerberos(krb_principal) - self.assertEqual(cm.exception.args[0], 'invalid Kerberos principal: %s' % krb_principal) - - def test_createUserFromKerberos_user_not_exists(self): - self.query_execute.return_value = None - s, cntext = self.get_session() - krb_principal = 'test-krb-princ@redhat.com' - s.createUser = mock.MagicMock() - s.createUser.return_value = 3 - s.createUserFromKerberos(krb_principal) - self.assertEqual(len(self.queries), 4) - self.assertEqual(len(self.updates), 2) - - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON ' - 'users.id = user_krb_principals.user_id']) - self.assertEqual(query.clauses, ['name = %(user_name)s']) - self.assertEqual(query.columns, ['id', 'krb_principal']) - self.assertEqual(query.values, {'user_name': 'test-krb-princ'}) - - def test_createUserFromKerberos_valid(self): - self.query_execute.return_value = [{'id': 1, 'krb_principal': 'krb-user-1@redhat.com'}, - {'id': 1, 'krb_principal': 'krb-user-2@redhat.com'}] - s, cntext = self.get_session() - krb_principal = 'test-krb-princ@redhat.com' - s.setKrbPrincipal = mock.MagicMock() - s.setKrbPrincipal.return_value = 1 - s.createUserFromKerberos(krb_principal) - self.assertEqual(len(self.queries), 4) - self.assertEqual(len(self.updates), 2) - - query = self.queries[3] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, ['LEFT JOIN user_krb_principals ON ' - 'users.id = user_krb_principals.user_id']) - self.assertEqual(query.clauses, ['name = %(user_name)s']) - self.assertEqual(query.columns, ['id', 'krb_principal']) - self.assertEqual(query.values, {'user_name': 'test-krb-princ'}) - - # functions outside Session object - - def test_get_user_data(self): - """auth.get_user_data""" - self.query_executeOne.return_value = None - self.assertEqual(len(self.queries), 0) - - self.query_executeOne.return_value = {'name': 'name', 'status': 'status', - 'usertype': 'usertype'} - kojihub.auth.get_user_data(1) - self.assertEqual(len(self.queries), 1) - query = self.queries[0] - self.assertEqual(query.tables, ['users']) - self.assertEqual(query.joins, None) - self.assertEqual(query.clauses, ['id=%(user_id)s']) - self.assertEqual(query.columns, ['name', 'status', 'usertype']) - - def test_get_user_groups(self): - """auth.get_user_groups""" - kojihub.auth.get_user_groups(1) - self.assertEqual(len(self.queries), 1) - query = self.queries[0] - self.assertEqual(query.tables, ['user_groups']) - self.assertEqual(query.joins, ['users ON group_id = users.id']) - self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)i', - 'users.usertype=%(t_group)i']) - self.assertEqual(query.columns, ['group_id', 'name']) - - def test_get_user_perms(self): - """auth.get_user_perms""" - kojihub.auth.get_user_perms(1) - self.assertEqual(len(self.queries), 2) - query = self.queries[0] - self.assertEqual(query.tables, ['user_perms']) - self.assertEqual(query.joins, ['permissions ON perm_id = permissions.id']) - self.assertEqual(query.clauses, ['active IS TRUE', 'user_id=%(user_id)s']) - self.assertEqual(query.columns, ['name']) - query = self.queries[1] - self.assertEqual(query.tables, ['user_groups']) - self.assertEqual(query.joins, [ - 'user_perms ON user_perms.user_id = user_groups.group_id', - 'permissions ON perm_id = permissions.id']) - self.assertEqual(sorted(query.clauses), sorted([ - 'user_groups.active IS TRUE', - 'user_perms.active IS TRUE', - 'user_groups.user_id=%(user_id)s'])) - self.assertEqual(query.columns, ['permissions.name']) - - def test_get_user_perms_inherited(self): - self.query_execute.side_effect = [ - [{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}], - [{'name': 'perm3'}] - ] - result = kojihub.auth.get_user_perms(1) - self.assertEqual(set(result), {'perm1', 'perm2', 'perm3'}) - - def test_get_user_perms_inherited_data(self): - self.query_execute.side_effect = [ - [{'id': 1, 'name': 'perm1'}, {'id': 2, 'name': 'perm2'}], - [{'name': 'perm3', 'group': 'group_a'}, - {'name': 'perm4', 'group': 'group_b'}, - {'name': 'perm4', 'group': 'group_c'}] - ] - result = kojihub.auth.get_user_perms(1, inheritance_data=True) - self.assertEqual(result, { - 'perm1': [None], - 'perm2': [None], - 'perm3': ['group_a'], - 'perm4': ['group_b', 'group_c'], - }) - - def test_logout_logged_not_owner(self): - s, _ = self.get_session() - - s.logged_in = True - # session_id without admin perms and not owner - self.context.session.hasPerm.return_value = False - self.context.session.user_id.return_value = 123 - self.query_singleValue.return_value = None - with self.assertRaises(koji.ActionNotAllowed) as ex: - s.logout(session_id=1) - self.assertEqual("only admins or owner may logout other session", str(ex.exception)) diff --git a/tests/test_lib/test_insert_processor.py b/tests/test_lib/test_insert_processor.py deleted file mode 100644 index 2c63450..0000000 --- a/tests/test_lib/test_insert_processor.py +++ /dev/null @@ -1,208 +0,0 @@ -import mock -import unittest - -import koji -import kojihub - - -class TestInsertProcessor(unittest.TestCase): - def setUp(self): - self.context_db = mock.patch('kojihub.db.context').start() - - def tearDown(self): - mock.patch.stopall() - - def test_basic_instantiation(self): - proc = kojihub.InsertProcessor('sometable') - actual = str(proc) - expected = '-- incomplete update: no assigns' - self.assertEqual(actual, expected) - - def test_to_string_with_data(self): - proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) - actual = str(proc) - expected = 'INSERT INTO sometable (foo) VALUES (%(foo)s)' - self.assertEqual(actual, expected) - - def test_simple_execution_with_iterate(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) - proc.execute() - cursor.execute.assert_called_once_with( - 'INSERT INTO sometable (foo) VALUES (%(foo)s)', - {'foo': 'bar'}, log_errors=True) - - def test_make_create(self,): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - self.context_db.session.assertLogin = mock.MagicMock() - proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) - proc.make_create(event_id=1, user_id=2) - self.assertEqual(proc.data['create_event'], 1) - self.assertEqual(proc.data['creator_id'], 2) - - proc.make_create(user_id=2) - self.assertEqual(proc.data['create_event'], self.context_db.event_id) - self.assertEqual(proc.data['creator_id'], 2) - - proc.make_create(event_id=1) - self.assertEqual(proc.data['create_event'], 1) - self.assertEqual(proc.data['creator_id'], self.context_db.session.user_id) - - proc.make_create() - self.assertEqual(proc.data['create_event'], self.context_db.event_id) - self.assertEqual(proc.data['creator_id'], self.context_db.session.user_id) - - def test_dup_check(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - self.context_db.session.assertLogin = mock.MagicMock() - proc = kojihub.InsertProcessor('sometable', data={'foo': 'bar'}) - proc.dup_check() - - args = cursor.execute.call_args - actual = ' '.join(args[0][0].split()) - expected = 'SELECT foo FROM sometable WHERE (foo = %(foo)s)' - self.assertEqual(actual, expected) - - proc.make_create() - proc.dup_check() - args = cursor.execute.call_args - actual = ' '.join(args[0][0].split()) - expected = 'SELECT active, foo FROM sometable WHERE ' + \ - '(active = %(active)s) AND (foo = %(foo)s)' - self.assertEqual(actual, expected) - - proc.set(onething='another') - proc.rawset(something='something else') - result = proc.dup_check() - self.assertEqual(result, None) - - def test_raw_data(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - proc = kojihub.InsertProcessor('sometable', rawdata={'foo': '\'bar\''}) - result = proc.dup_check() - self.assertEqual(result, None) - actual = str(proc) - expected = "INSERT INTO sometable (foo) VALUES (('bar'))" # raw data - self.assertEqual(actual, expected) - - -class TestBulkInsertProcessor(unittest.TestCase): - def setUp(self): - self.context_db = mock.patch('kojihub.db.context').start() - - def tearDown(self): - mock.patch.stopall() - - def test_basic_instantiation(self): - proc = kojihub.BulkInsertProcessor('sometable') - actual = str(proc) - expected = '-- incomplete insert: no data' - self.assertEqual(actual, expected) - - def test_to_string_with_single_row(self): - proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) - actual = str(proc) - expected = 'INSERT INTO sometable (foo) VALUES (%(foo0)s)' - self.assertEqual(actual, expected) - - proc = kojihub.BulkInsertProcessor('sometable') - proc.add_record(foo='bar') - actual = str(proc) - self.assertEqual(actual, expected) - - def test_simple_execution(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar'}]) - proc.execute() - cursor.execute.assert_called_once_with( - 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', - {'foo0': 'bar'}, - log_errors=True - ) - - cursor.reset_mock() - proc = kojihub.BulkInsertProcessor('sometable') - proc.add_record(foo='bar') - proc.execute() - cursor.execute.assert_called_once_with( - 'INSERT INTO sometable (foo) VALUES (%(foo0)s)', - {'foo0': 'bar'}, - log_errors=True - ) - - def test_bulk_execution(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - - proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}]) - proc.add_record(foo='bar2') - proc.add_record(foo='bar3') - proc.execute() - cursor.execute.assert_called_once_with( - 'INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', - {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, - log_errors=True - ) - - def test_missing_values(self): - proc = kojihub.BulkInsertProcessor('sometable') - proc.add_record(foo='bar') - proc.add_record(foo2='bar2') - with self.assertRaises(koji.GenericError) as cm: - str(proc) - self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert') - - def test_missing_values_nostrict(self): - proc = kojihub.BulkInsertProcessor('sometable', strict=False) - proc.add_record(foo='bar') - proc.add_record(foo2='bar2') - actual = str(proc) - expected = 'INSERT INTO sometable (foo, foo2) VALUES (%(foo0)s, NULL), (NULL, %(foo21)s)' - self.assertEqual(actual, expected) - - def test_missing_values_explicit_columns(self): - proc = kojihub.BulkInsertProcessor('sometable', strict=True, columns=['foo', 'foo2']) - proc.add_record(foo='bar') - with self.assertRaises(koji.GenericError) as cm: - str(proc) - self.assertEqual(cm.exception.args[0], 'Missing value foo2 in BulkInsert') - - def test_batch_execution(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - - proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=2) - proc.add_record(foo='bar2') - proc.add_record(foo='bar3') - proc.execute() - calls = cursor.execute.mock_calls - # list of (name, positional args, keyword args) - self.assertEqual(len(calls), 2) - self.assertEqual(calls[0], - mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s)', - {'foo0': 'bar1', 'foo1': 'bar2'}, log_errors=True)) - self.assertEqual(calls[1], - mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s)', - {'foo0': 'bar3'}, log_errors=True)) - - def test_no_batch_execution(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - - proc = kojihub.BulkInsertProcessor('sometable', data=[{'foo': 'bar1'}], batch=0) - proc.add_record(foo='bar2') - proc.add_record(foo='bar3') - proc.execute() - calls = cursor.execute.mock_calls - # list of (name, positional args, keyword args) - self.assertEqual(len(calls), 1) - self.assertEqual( - calls[0], - mock.call('INSERT INTO sometable (foo) VALUES (%(foo0)s), (%(foo1)s), (%(foo2)s)', - {'foo0': 'bar1', 'foo1': 'bar2', 'foo2': 'bar3'}, log_errors=True) - ) diff --git a/tests/test_lib/test_query_processor.py b/tests/test_lib/test_query_processor.py deleted file mode 100644 index 2e7513b..0000000 --- a/tests/test_lib/test_query_processor.py +++ /dev/null @@ -1,142 +0,0 @@ -import mock -import unittest - -import kojihub - - -class TestQueryProcessor(unittest.TestCase): - def setUp(self): - self.simple_arguments = dict( - columns=['something'], - tables=['awesome'], - ) - self.complex_arguments = dict( - columns=['something'], - aliases=['other'], - tables=['awesome'], - joins=['morestuff'], - # values=... - # transform=... - opts={ - # 'countOnly': True, - 'order': 'other', - 'offset': 10, - 'limit': 3, - 'group': 'awesome.aha' - # 'rowlock': True, - }, - enable_group=True - ) - self.original_chunksize = kojihub.QueryProcessor.iterchunksize - kojihub.QueryProcessor.iterchunksize = 2 - self.context_db = mock.patch('kojihub.db.context').start() - - def tearDown(self): - kojihub.QueryProcessor.iterchunksize = self.original_chunksize - mock.patch.stopall() - - def test_basic_instantiation(self): - kojihub.QueryProcessor() # No exception! - - def test_instantiation_with_cols_and_aliases(self): - proc = kojihub.QueryProcessor(columns=['wat'], aliases=['zap']) - assert 'zap' in proc.colsByAlias - assert proc.colsByAlias['zap'] == 'wat' - assert len(proc.colsByAlias) == 1 - - def test_empty_as_string(self): - proc = kojihub.QueryProcessor() - actual = str(proc) - self.assertIn("SELECT", actual) - self.assertIn("FROM", actual) - - def test_simple_as_string(self): - proc = kojihub.QueryProcessor( - columns=['something'], - tables=['awesome'], - ) - actual = " ".join([token for token in str(proc).split() if token]) - expected = "SELECT something FROM awesome" - self.assertEqual(actual, expected) - - def test_complex_as_string(self): - proc = kojihub.QueryProcessor(**self.complex_arguments) - actual = " ".join([token for token in str(proc).split() if token]) - expected = "SELECT something FROM awesome JOIN morestuff" \ - " GROUP BY awesome.aha ORDER BY something OFFSET 10 LIMIT 3" - self.assertEqual(actual, expected) - args2 = self.complex_arguments.copy() - args2['enable_group'] = False - proc = kojihub.QueryProcessor(**args2) - actual = " ".join([token for token in str(proc).split() if token]) - expected = "SELECT something FROM awesome JOIN morestuff" \ - " ORDER BY something OFFSET 10 LIMIT 3" - self.assertEqual(actual, expected) - - def test_simple_with_execution(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - proc = kojihub.QueryProcessor(**self.simple_arguments) - proc.execute() - cursor.execute.assert_called_once_with( - '\nSELECT something\n FROM awesome\n\n\n \n \n\n \n', {}) - - def test_simple_count_with_execution(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - cursor.fetchall.return_value = [('some count',)] - args = self.simple_arguments.copy() - args['opts'] = {'countOnly': True} - proc = kojihub.QueryProcessor(**args) - results = proc.execute() - cursor.execute.assert_called_once_with( - '\nSELECT count(*)\n FROM awesome\n\n\n \n \n\n \n', {}) - self.assertEqual(results, 'some count') - - cursor.reset_mock() - args['opts']['group'] = 'id' - args['enable_group'] = True - proc = kojihub.QueryProcessor(**args) - results = proc.execute() - cursor.execute.assert_called_once_with( - 'SELECT count(*)\nFROM (\nSELECT 1\n' - ' FROM awesome\n\n\n GROUP BY id\n \n\n \n) numrows', {}) - self.assertEqual(results, 'some count') - - def test_simple_execution_with_iterate(self): - cursor = mock.MagicMock() - self.context_db.cnx.cursor.return_value = cursor - cursor.fetchall.return_value = [ - ('value number 1',), - ('value number 2',), - ('value number 3',), - ] - proc = kojihub.QueryProcessor(**self.simple_arguments) - generator = proc.iterate() - calls = cursor.execute.mock_calls - result = next(generator) - # two calls so far.. - self.assertEqual(result, {'something': 'value number 1'}) - self.assertEqual(len(calls), 2) - result = next(generator) - # still two. - self.assertEqual(result, {'something': 'value number 2'}) - self.assertEqual(len(calls), 2) - # now three. - result = next(generator) - self.assertEqual(result, {'something': 'value number 3'}) - - @mock.patch('kojihub.db._multiRow') - def test_execution_as_list_transform(self, multirow): - multirow.return_value = [{'col1': 'result_1_col_1', 'col2': 'result_1_col_2'}, - {'col1': 'result_2_col_1', 'col2': 'result_2_col_2'}] - args = dict( - columns=['col1', 'col2'], - tables=['table'], - opts={'asList': True}, - transform=lambda x: x, - ) - proc = kojihub.QueryProcessor(**args) - results = proc.execute() - self.assertEqual( - results, [['result_1_col_1', 'result_1_col_2'], ['result_2_col_1', 'result_2_col_2']]) diff --git a/tests/test_lib/test_savepoint.py b/tests/test_lib/test_savepoint.py deleted file mode 100644 index 87a2e2c..0000000 --- a/tests/test_lib/test_savepoint.py +++ /dev/null @@ -1,24 +0,0 @@ -import mock - -import unittest - -import kojihub - - -class TestSavepoint(unittest.TestCase): - - def setUp(self): - self.dml = mock.patch('kojihub.db._dml').start() - self.context_db = mock.patch('kojihub.db.context').start() - - def tearDown(self): - mock.patch.stopall() - - def test_savepoint(self): - sp = kojihub.Savepoint('some_name') - self.assertEqual(sp.name, 'some_name') - self.dml.assert_called_once_with('SAVEPOINT some_name', {}) - - self.dml.reset_mock() - sp.rollback() - self.dml.assert_called_once_with('ROLLBACK TO SAVEPOINT some_name', {}) diff --git a/tests/test_lib/test_update_processor.py b/tests/test_lib/test_update_processor.py deleted file mode 100644 index aadfeab..0000000 --- a/tests/test_lib/test_update_processor.py +++ /dev/null @@ -1,34 +0,0 @@ -import mock -import unittest - -import kojihub - - -class TestUpdateProcessor(unittest.TestCase): - - def test_basic_instantiation(self): - kojihub.UpdateProcessor('sometable') # No exception! - - def test_to_string_with_data(self): - proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'}) - actual = str(proc) - expected = 'UPDATE sometable SET foo = %(data.foo)s' - self.assertEqual(actual, expected) - - def test_to_values_from_data(self): - proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'}) - actual = proc.get_values() - expected = {'data.foo': 'bar'} - self.assertEqual(actual, expected) - - @mock.patch('kojihub.db.context') - def test_simple_execution_with_iterate(self, context_db): - cursor = mock.MagicMock() - context_db.cnx.cursor.return_value = cursor - proc = kojihub.UpdateProcessor('sometable', data={'foo': 'bar'}) - proc.execute() - cursor.execute.assert_called_once_with( - 'UPDATE sometable SET foo = %(data.foo)s', - {'data.foo': 'bar'}, - log_errors=True, - ) From 46e93bb469c55499ce445863e9d5d291a6383c96 Mon Sep 17 00:00:00 2001 From: Yuming Zhu Date: Oct 22 2024 15:55:11 +0000 Subject: [PATCH 2/4] fix removeNonprintable for PY2 --- diff --git a/koji/__init__.py b/koji/__init__.py index 7a8111e..b0a6e4b 100644 --- a/koji/__init__.py +++ b/koji/__init__.py @@ -3820,13 +3820,14 @@ if six.PY3: def removeNonprintable(value): + # TODO: it's no more used in PY2 # expects raw-encoded string, not unicode if six.PY2: - value = value.translate(None, NONPRINTABLE_CHARS) + return value.translate(None, NONPRINTABLE_CHARS) else: value = value.translate(NONPRINTABLE_CHARS_TABLE) - # remove broken unicode chars (some changelogs, etc.) - return value.encode('utf-8', errors='replace').decode() + # remove broken unicode chars (some changelogs, etc.) + return value.encode('utf-8', errors='replace').decode() def _fix_print(value): From 8e2b4a0e1560046cba5b3fe611864a79f89ca70e Mon Sep 17 00:00:00 2001 From: Yuming Zhu Date: Oct 22 2024 15:55:11 +0000 Subject: [PATCH 3/4] use OSError instead of FileNotFoundError due to PY2 support --- diff --git a/koji/util.py b/koji/util.py index 750bbda..1b5e669 100644 --- a/koji/util.py +++ b/koji/util.py @@ -803,9 +803,12 @@ def _rmtree_nofork(path, logger=None): logger = logger or logging.getLogger('koji') try: st = os.lstat(path) - except FileNotFoundError: - logger.warning("No such file/dir %s for removal" % path) - return + except OSError as e: + # FileNotFoundError is N/A in py2 + if e.errno == errno.ENOENT: + logger.warning("No such file/dir %s for removal" % path) + return + raise if not stat.S_ISDIR(st.st_mode): raise koji.GenericError("Not a directory: %s" % path) dev = st.st_dev From 0e0318fbdb22cec32d54bb0c6ec1109f06e8b97a Mon Sep 17 00:00:00 2001 From: Yuming Zhu Date: Oct 22 2024 15:58:14 +0000 Subject: [PATCH 4/4] unittest: enabling tests/test_lib for py2 --- diff --git a/tests/test_lib/test_client_session.py b/tests/test_lib/test_client_session.py index fb3de76..9347e9d 100644 --- a/tests/test_lib/test_client_session.py +++ b/tests/test_lib/test_client_session.py @@ -234,6 +234,10 @@ class TestMultiCall(unittest.TestCase): def test_MultiCallHack_weakref_validation(self): expected_exc = 'The session parameter must be a weak reference' + if six.PY2: + with self.assertRaisesRegexp(TypeError, expected_exc): + koji.MultiCallHack(self.ksession) + return with self.assertRaisesRegex(TypeError, expected_exc): koji.MultiCallHack(self.ksession) diff --git a/tests/test_lib/test_gen_mock_config.py b/tests/test_lib/test_gen_mock_config.py index ee1b895..af699d5 100644 --- a/tests/test_lib/test_gen_mock_config.py +++ b/tests/test_lib/test_gen_mock_config.py @@ -18,10 +18,10 @@ class TestGenMockConfig(unittest.TestCase): if not fn.endswith('.data'): continue path = os.path.join(datadir, fn) - with open(path, 'rt', encoding='utf-8') as fo: + with open(path, 'rt') as fo: s = fo.read() params = ast.literal_eval(s) - with open(path[:-5] + '.out', 'rt', encoding='utf-8') as fo: + with open(path[:-5] + '.out', 'rt') as fo: expected = fo.read() output = koji.genMockConfig(**params) self.assertMultiLineEqual(output, expected) diff --git a/tests/test_lib/test_plugin.py b/tests/test_lib/test_plugin.py index d76bbeb..3e794af 100644 --- a/tests/test_lib/test_plugin.py +++ b/tests/test_lib/test_plugin.py @@ -7,11 +7,11 @@ import sys import unittest try: - import importlib - imp = None -except ImportError: import imp importlib = None +except ImportError: + import importlib + imp = None import koji import koji.util diff --git a/tests/test_lib/test_spliced_sig.py b/tests/test_lib/test_spliced_sig.py index 107bcd8..bcb7b50 100644 --- a/tests/test_lib/test_spliced_sig.py +++ b/tests/test_lib/test_spliced_sig.py @@ -5,6 +5,8 @@ import shutil import tempfile import unittest +import six + import koji @@ -31,6 +33,7 @@ class TestCheckSigMD5(unittest.TestCase): self.assertEqual(contents_signed, contents_spliced) self.assertNotEqual(contents_signed, contents_orig) + @unittest.skipIf(six.PY2, "Python 2 not supported") def test_splice_rpm_sighdr(self): contents_signed = open(self.signed, 'rb').read() sighdr = koji.rip_rpm_sighdr(self.signed) diff --git a/tests/test_lib/test_tasks.py b/tests/test_lib/test_tasks.py index 7cb897c..0c80983 100644 --- a/tests/test_lib/test_tasks.py +++ b/tests/test_lib/test_tasks.py @@ -316,9 +316,8 @@ class TasksTestCase(unittest.TestCase): @patch('time.time') @patch('time.sleep') - @patch('signal.sigtimedwait') @patch('signal.pause') - def test_BaseTaskHandler_wait_timeout(self, pause, sigtimedwait, sleep, time): + def test_BaseTaskHandler_wait_timeout(self, pause, sleep, time): """Tests timeout behavior in the wait function""" temp_path = self.get_tmp_dir_path('TaskTest') obj = TaskTest(95, 'some_method', ['random_arg'], None, None, temp_path) @@ -326,11 +325,18 @@ class TasksTestCase(unittest.TestCase): obj.session = MagicMock() obj.session.host.taskWait.return_value = [[], [99, 100, 101]] time.side_effect = list(range(0, 4000, 60)) + if six.PY3: + sigtimedwait = patch('signal.sigtimedwait').start() + try: obj.wait([99, 100, 101], timeout=3600) raise Exception('A GenericError was not raised.') except koji.GenericError as e: self.assertEqual(e.args[0][:24], 'Subtasks timed out after') + + if six.PY3: + sigtimedwait.stop() + obj.session.host.taskSetWait.assert_called_once_with(95, [99, 100, 101]) obj.session.cancelTaskChildren.assert_called_once_with(95) obj.session.getTaskResult.assert_not_called() @@ -338,9 +344,8 @@ class TasksTestCase(unittest.TestCase): @patch('time.time') @patch('time.sleep') - @patch('signal.sigtimedwait') @patch('signal.pause') - def test_BaseTaskHandler_wait_avoid_timeout(self, pause, sigtimedwait, sleep, time): + def test_BaseTaskHandler_wait_avoid_timeout(self, pause, sleep, time): """Tests that timeout does not happen if tasks finish in time""" temp_path = self.get_tmp_dir_path('TaskTest') obj = TaskTest(95, 'some_method', ['random_arg'], None, None, temp_path) @@ -355,8 +360,15 @@ class TasksTestCase(unittest.TestCase): # and then report all done taskWait_returns.append([[99, 100, 101], []]) obj.session.host.taskWait.side_effect = taskWait_returns + + if six.PY3: + sigtimedwait = patch('signal.sigtimedwait').start() + obj.wait([99, 100, 101], timeout=3600) + if six.PY3: + sigtimedwait.stop() + obj.session.host.taskSetWait.assert_called_once_with(95, [99, 100, 101]) obj.session.cancelTaskChildren.assert_not_called() pause.assert_not_called() diff --git a/tests/test_lib/test_utils.py b/tests/test_lib/test_utils.py index 79aa414..fefe764 100644 --- a/tests/test_lib/test_utils.py +++ b/tests/test_lib/test_utils.py @@ -739,7 +739,7 @@ class MavenUtilTestCase(unittest.TestCase): def _read_conf(self, cfile): path = os.path.dirname(__file__) - with open(path + cfile, 'rt', encoding='utf-8') as conf_file: + with open(path + cfile, 'rt') as conf_file: if six.PY2: config = six.moves.configparser.SafeConfigParser() config.readfp(conf_file) @@ -1496,7 +1496,10 @@ class TestRmtree(unittest.TestCase): rmtree_nofork.assert_called_once() self.assertEqual(rmtree_nofork.call_args[0][0], path) _exit.assert_called_once() - logger = rmtree_nofork.call_args.kwargs['logger'] + if mock.__package__ == 'unittest': + logger = rmtree_nofork.call_args.kwargs['logger'] + else: + logger = rmtree_nofork.call_args[1]['logger'] @mock.patch('tempfile.mkstemp') # avoid stray temp file @mock.patch('koji.util._rmtree_nofork') diff --git a/tests/test_lib/test_xmlrpcplus.py b/tests/test_lib/test_xmlrpcplus.py index af7ff30..f7454e0 100644 --- a/tests/test_lib/test_xmlrpcplus.py +++ b/tests/test_lib/test_xmlrpcplus.py @@ -1,10 +1,16 @@ # coding=utf-8 from __future__ import absolute_import -from six.moves import range -import unittest + import re +import sys +import unittest +try: + from unittest.mock import ANY +except ImportError: + from mock import ANY from six.moves import xmlrpc_client + from koji import xmlrpcplus @@ -98,6 +104,9 @@ class TestDump(unittest.TestCase): 'RegexNameInternal.compiled': re.compile('^[A-Za-z0-9/_.+-]+$')} dist_data_output = ({'MaxNameLengthInternal': 15, 'RegexNameInternal.compiled': "re.compile('^[A-Za-z0-9/_.+-]+$')"},) + if sys.version_info < (3, 7): + dist_data_output[0]['RegexNameInternal.compiled'] = ANY + dict_data = (dict_data,) enc = xmlrpcplus.dumps(dict_data, methodresponse=1) params, method = xmlrpc_client.loads(enc) diff --git a/tox.ini b/tox.ini index 5b56a3a..4cb79cb 100644 --- a/tox.ini +++ b/tox.ini @@ -57,7 +57,7 @@ commands_pre = {envbindir}/coverage2 erase commands = {envbindir}/coverage2 run --source . -m pytest {posargs:\ - tests/test_builder tests/test_cli \ + tests/test_builder tests/test_cli tests/test_lib \ tests/test_plugins/test_runroot_builder.py \ tests/test_plugins/test_save_failed_tree_builder.py \ tests/test_plugins/test_runroot_cli.py \