#3955 New command - list-users
Closed a year ago by alisboav. Opened a year ago by alisboav.
alisboav/koji perms  into  master

file modified
+21
@@ -2154,6 +2154,27 @@ 

      for volinfo in session.listVolumes():

          print(volinfo['name'])

  

+ def handle_list_users(goptions, session, args):

+     "[info] List of users with given a permission"

+     usage = "usage: %prog list-users [options]"

+     parser = OptionParser(usage=get_usage_str(usage))

+     parser.add_option("--perm", help="List users that have a given permission")

+     (options, args) = parser.parse_args(args)

+     if len(args) > 0:

+         parser.error("This command takes no arguments")

+     activate_session(session, goptions)

+     users = []

+     if options.perm:

+         users_list = session.getPermsUser(options.perm)

+         if users_list:

+             for p in users_list:

+                 users.append({'name': p})

+         else:

+             parser.error("No such permission: %s" % options.perm)

+     else:

+         parser.error("Please provide a permission with --perm")

+     for user in users:

+         print(user['name'])

  

  def handle_list_permissions(goptions, session, args):

      "[info] List user permissions"

file modified
+7
@@ -790,6 +790,13 @@ 

      result = query.execute()

      return [r['name'] for r in result]

  

+ def get_users_with_perm(perm_id):

+     query = QueryProcessor(tables=['user_perms'], columns=['name'],

+                            clauses=['active = TRUE', 'perm_id=%(perm_id)s'],

+                            joins=['users ON user_id = users.id'],

+                            values={'perm_id': perm_id})

+     result = query.execute()

+     return [r['name'] for r in result]

  

  def get_user_data(user_id):

      query = QueryProcessor(tables=['users'], columns=['name', 'status', 'usertype'],

file modified
+12 -1
@@ -74,7 +74,7 @@ 

      multi_fnmatch,

      safer_move,

  )

- from .auth import get_user_perms, get_user_groups

+ from .auth import get_user_perms, get_user_groups, get_users_with_perm

  from .db import (  # noqa: F401

      BulkInsertProcessor,

      DeleteProcessor,
@@ -13390,6 +13390,17 @@ 

          user_info = get_user(userID, strict=True)

          return get_user_perms(user_info['id'])

  

+     def getPermsUser(self, perm=None):

+         """Get a list of users with the given permission.

+         Options:

+         - perm: Permission name."""

+         if not context.session.hasPerm('admin'):

+             raise koji.ActionNotAllowed("This action requires admin privileges")

+         perm_id = get_perm_id(perm)

+         if not perm_id:

+             raise koji.GenericError('Permission %s not available' % perm)

+         return get_users_with_perm(perm_id)

+ 

      def getAllPerms(self):

          """Get a list of all permissions in the system.  Returns a list of maps.  Each

          map contains the following keys:

@@ -115,6 +115,7 @@ 

          list-targets              List the build targets

          list-tasks                Print the list of tasks

          list-untagged             List untagged builds

+         list-users                List users with a given permission

          list-volumes              List storage volumes

          mock-config               Create a mock config

          repoinfo                  Print basic information about a repo

@@ -0,0 +1,104 @@ 

+ from __future__ import absolute_import

+ import mock

+ import six

+ import unittest

+ 

+ from koji_cli.commands import handle_list_users

+ from . import utils

+ 

+ 

+ class TestListUsers(utils.CliTestCase):

+ 

+     def setUp(self):

+         self.error_format = """Usage: %s list-users [options]

+ (Specify the --help global option for a list of other help options)

+ 

+ %s: error: {message}

+ """ % (self.progname, self.progname)

+ 

+         self.session = mock.MagicMock()

+         self.activate_session_mock = mock.patch('koji_cli.commands.activate_session').start()

+         self.options = mock.MagicMock()

+         self.options.quiet = True

+         self.users = [{'id': 101, 'name': "user01"},

+                       {'id': 102, 'name': "user02"},

+                       {'id': 103, 'name': "user03"},

+                       {'id': 104, 'name': "user04"},

+                       {'id': 105, 'name': "user05"}

+         ]

+         self.perm='admin'

+                       

+ 

+     def test_handle_list_users_arg_error(self):

+         """Test handle_list_users argument error (no argument is required)"""

+         expected = self.format_error_message("This command takes no arguments")

+         self.assert_system_exit(

+             handle_list_users,

+             self.options,

+             self.session,

+             ['arg-1', 'arg-2'],

+             stderr=expected,

+             activate_session=None,

+             exit_code=2

+         )

+         self.activate_session_mock.assert_not_called()

+         self.session.getPermsUser.assert_not_called()

+     

+ 

+     def test_handle_list_users_perm_not_exist(self):

+         """Test handle_list_users when perm does not exist"""

+         self.session.getPermsUser.return_value = []

+         expected = self.format_error_message("No such permission: notperm")

+         self.assert_system_exit(

+             handle_list_users,

+             self.options,

+             self.session,

+             ['--perm', 'notperm'],

+             stderr=expected,

+             activate_session=None,

+             exit_code=2

+         )

+         self.activate_session_mock.assert_called_once()

+         self.session.getPermsUser.assert_called_once()

+ 

+     @mock.patch('sys.stdout', new_callable=six.StringIO)

+     def test_handle_list_users_no_perm(self, stdout):

+         """Test handle_list_users when --perm is not setting"""

+         expected = self.format_error_message("""Please provide a permission with --perm""")

+         self.assert_system_exit(

+             handle_list_users,

+             self.options,

+             self.session,

+             [],

+             stderr=expected,

+             activate_session=None,

+             exit_code=2

+         )

+         self.activate_session_mock.assert_called_once()

+         self.session.getPermsUser.assert_not_called()

+ 

+     @mock.patch('sys.stdout', new_callable=six.StringIO)

+     def test_handle_list_users_perm(self, stdout):

+         """Test handle_list_users user permissions"""

+         expected = "user01\nuser02\nuser03\nuser04\nuser05\n"

+         users = [p['name'] for p in self.users[::1]]

+         self.session.getPermsUser.return_value = users

+         handle_list_users(self.options, self.session, ['--perm', self.perm])

+         self.assert_console_message(stdout, expected)

+         self.activate_session_mock.assert_called_once()

+         self.session.getPermsUser.assert_called_once()

+ 

+     def test_handle_list_users_help(self):

+         self.assert_help(

+             handle_list_users,

+             """Usage: %s list-users [options]

+ (Specify the --help global option for a list of other help options)

+ 

+ Options:

+   -h, --help   show this help message and exit

+   --perm=PERM  List users that have a given permission

+ """ % self.progname)

+ 

+ 

+ if __name__ == '__main__':

+     unittest.main()

@@ -0,0 +1,32 @@ 

+ import mock

+ import unittest

+ import koji

+ import kojihub

+ 

+ 

+ class TestGetPermsUser(unittest.TestCase):

+     def setUp(self):

+         self.context = mock.patch('kojihub.kojihub.context').start()

+         self.get_perm_id = mock.patch('kojihub.kojihub.get_perm_id').start()

+         self.get_users_with_perm = mock.patch('kojihub.kojihub.get_users_with_perm').start()

+ 

+     def tearDown(self):

+         mock.patch.stopall()

+ 

+     def test_no_admin(self):

+         self.context.session.hasPerm.return_value = False

+         with self.assertRaises(koji.ActionNotAllowed) as ex:

+             kojihub.RootExports().getPermsUser('admin')

+         self.assertEqual("This action requires admin privileges", str(ex.exception))

+         self.get_users_with_perm.assert_not_called()

+ 

+     def test_no_perm(self):

+         self.get_perm_id.return_value = None

+         with self.assertRaises(koji.GenericError):

+             kojihub.RootExports().getPermsUser('noperm')

+         self.get_users_with_perm.assert_not_called()

+ 

+     def test_normal(self):

+         self.get_perm_id.return_value = 1

+         kojihub.RootExports().getPermsUser('admin')

+         self.get_users_with_perm.assert_called_once_with(1)

https://pagure.io/koji/issue/3950

Adding a new command - list-users.
It receives a permission and return the list of users with that permission.
This command requires admin privileges.

Example:

[root@mvf-alma9 ~]# koji -p kojitest list-users --perm=admin
user1
user2

[kojiuser@mvf-alma9 ~]$ koji -p kojitest list-users --perm=admin
2023-12-05 09:26:42,068 [ERROR] koji: ActionNotAllowed: This action requires admin privileges
[kojiuser@mvf-alma9 ~]$ koji -p kojitest list-permissions --user kojiuser
Permission name   
------------------
build   

Pull-Request has been closed by alisboav

a year ago