#4251 drop cgi import
Merged 6 months ago by mikem. Opened 6 months ago by mikem.
mikem/koji drop-cgi  into  master

file modified
+1
@@ -111,6 +111,7 @@ 

      # provide some needed info

      environ['SCRIPT_FILENAME'] = wsgi_publisher.__file__

      environ['REQUEST_URI'] = get_url(environ)

+     environ['REQUEST_SCHEME'] = environ['wsgi.url_scheme']

      set_config(environ)

      if FIRST:

          pprint.pprint(environ)

@@ -1,11 +1,10 @@ 

  from unittest import mock

  import unittest

- import cgi

  

  import koji

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestBuildTargetCreate(unittest.TestCase):
@@ -28,20 +27,12 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_buildtargetcreate_add_case_exception(self):

          """Test buildtargetcreate function raises exception when buildtarget is not created."""

-         urlencode_data = b"add=True&name=testname&buildTag=11&destTag=99"

+         urlencode_data = "add=True&name=testname&buildTag=11&destTag=99"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -63,7 +54,7 @@ 

  

      def test_buildtargetcreate_add_case_valid(self):

          """Test buildtargetcreate function valid case (add)."""

-         urlencode_data = b"add=True&name=testname&buildTag=11&destTag=99"

+         urlencode_data = "add=True&name=testname&buildTag=11&destTag=99"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -85,7 +76,7 @@ 

  

      def test_buildtargetcreate_cancel_case(self):

          """Test buildtargetcreate function valid case (cancel)."""

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -104,7 +95,7 @@ 

  

      def test_buildtargetcreate_another_case(self):

          """Test buildtargetcreate function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

@@ -1,11 +1,10 @@ 

  from unittest import mock

  import unittest

- import cgi

  

  import koji

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestBuildTargetEdit(unittest.TestCase):
@@ -28,20 +27,12 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_buildtargetedit_exception(self):

          """Test buildtargetedit function raises exception when build target not exists."""

-         urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"

+         urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -63,7 +54,7 @@ 

  

      def test_buildtargetedit_exception_build_tag(self):

          """Test buildtargetedit function raises exception when build tag not exists."""

-         urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"

+         urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -86,7 +77,7 @@ 

  

      def test_buildtargetedit_exception_dest_tag(self):

          """Test buildtargetedit function raises exception when destination tag not exists."""

-         urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"

+         urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -110,7 +101,7 @@ 

  

      def test_buildtargetedit_save_case_valid(self):

          """Test buildtargetedit function valid case (save)."""

-         urlencode_data = b"save=True&name=testname&buildTag=11&destTag=99"

+         urlencode_data = "save=True&name=testname&buildTag=11&destTag=99"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -140,7 +131,7 @@ 

      def test_buildtargetedit_cancel_case(self):

          """Test buildtargetedit function valid case (cancel)."""

          self.server.getBuildTarget.return_value = {'id': int(self.buildtarget_id)}

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -161,7 +152,7 @@ 

  

      def test_buildtargetedit_another_case(self):

          """Test buildtargetedit function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

          self.server.getBuildTarget.return_value = {'id': int(self.buildtarget_id)}

  

@@ -1,11 +1,10 @@ 

  import unittest

  import koji

- import cgi

  

  from unittest import mock

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestHostEdit(unittest.TestCase):
@@ -27,16 +26,8 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_hostedit_exception_host_not_existing(self):

          """Test hostedit function raises exception when host ID not exists."""
@@ -55,8 +46,8 @@ 

  

      def test_hostedit_save_case_valid(self):

          """Test hostedit function valid case (save)."""

-         urlencode_data = b"save=True&arches=x86_64&capacity=1.0&description=test-desc&" \

-                          b"comment=test-comment&enable=True&channels=default"

+         urlencode_data = "save=True&arches=x86_64&capacity=1.0&description=test-desc&" \

+                          "comment=test-comment&enable=True&channels=default"

          fs = self.get_fs(urlencode_data)

          self.server.getHost.return_value = self.host_info

          self.server.editHost.return_value = True
@@ -84,7 +75,7 @@ 

  

      def test_hostedit_cancel_case_valid(self):

          """Test hostedit function valid case (cancel)."""

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -107,7 +98,7 @@ 

  

      def test_hostedit_another_case(self):

          """Test hostedit function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

@@ -1,11 +1,10 @@ 

  from unittest import mock

  import unittest

- import cgi

  

  import koji

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestNotificationCreate(unittest.TestCase):
@@ -27,16 +26,8 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_notificationcreate_add_case_not_logged(self):

          """Test notificationcreate function raises exception when user is not logged."""
@@ -47,7 +38,7 @@ 

              },

              'koji.currentUser': None,

          }

-         urlencode_data = b"add=True&package=2&tag=11"

+         urlencode_data = "add=True&package=2&tag=11"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -66,7 +57,7 @@ 

  

      def test_notificationcreate_add_case_int(self):

          """Test notificationcreate function valid case (add)"""

-         urlencode_data = b"add=True&package=2&tag=11&success_only=True"

+         urlencode_data = "add=True&package=2&tag=11&success_only=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -87,7 +78,7 @@ 

  

      def test_notificationcreate_add_case_all(self):

          """Test notificationcreate function valid case (add)"""

-         urlencode_data = b"add=True&package=all&tag=all"

+         urlencode_data = "add=True&package=all&tag=all"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -107,7 +98,7 @@ 

  

      def test_notificationcreate_cancel_case(self):

          """Test notificationcreate function valid case (cancel)."""

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -126,7 +117,7 @@ 

  

      def test_notificationcreate_another_case(self):

          """Test notificationcreate function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

@@ -1,11 +1,10 @@ 

  import unittest

  import koji

- import cgi

  

  from unittest import mock

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestNotificationEdit(unittest.TestCase):
@@ -28,16 +27,8 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_notificationedit_exception(self):

          """Test notificationedit function raises exception when notification ID is not exists."""
@@ -54,7 +45,7 @@ 

  

      def test_notificationedit_save_case_int(self):

          """Test notificationedit function valid case (save)."""

-         urlencode_data = b"save=True&package=2&tag=11&success_only=True"

+         urlencode_data = "save=True&package=2&tag=11&success_only=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -77,7 +68,7 @@ 

  

      def test_notificationedit_save_case_all(self):

          """Test notificationedit function valid case (all)."""

-         urlencode_data = b"save=True&package=all&tag=all"

+         urlencode_data = "save=True&package=all&tag=all"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -100,7 +91,7 @@ 

  

      def test_notificationedit_cancel_case(self):

          """Test notificationedit function valid case (cancel)."""

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -121,7 +112,7 @@ 

  

      def test_notificationedit_another_case(self):

          """Test notificationedit function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

file modified
+3 -12
@@ -1,10 +1,9 @@ 

  from unittest import mock

  import unittest

- import cgi

  

  import koji

- from io import BytesIO

  from .loadwebindex import webidx

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestSearch(unittest.TestCase):
@@ -18,16 +17,8 @@ 

              },

              'koji.currentUser': None,

          }

-         urlencode_data = b"terms=test&type=package&match=testmatch"

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         self.fs = cgi.FieldStorage(fp=data, environ=urlencode_environ)

+         urlencode_data = "terms=test&type=package&match=testmatch"

+         self.fs = FieldStorageCompat({'QUERY_STRING': urlencode_data})

  

          def __get_server(env):

              env['koji.form'] = self.fs

@@ -1,10 +1,9 @@ 

  import unittest

- import cgi

  

  from unittest import mock

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestTagCreate(unittest.TestCase):
@@ -24,21 +23,13 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_tagcreate_add_case_valid(self):

          """Test tagcreate function valid case (add)"""

-         urlencode_data = b"add=True&name=testname&arches=x86_64&locked=True&permission=1" \

-                          b"&maven_support=True&maven_include_all=True"

+         urlencode_data = "add=True&name=testname&arches=x86_64&locked=True&permission=1" \

+                          "&maven_support=True&maven_include_all=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -60,7 +51,7 @@ 

  

      def test_tagcreate_cancel_case_valid(self):

          """Test tagcreate function valid cases (cancel)."""

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -79,7 +70,7 @@ 

  

      def test_tagedit_another_case_valid(self):

          """Test tagedit function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

@@ -1,11 +1,10 @@ 

  import unittest

  import koji

- import cgi

  

  from unittest import mock

- from io import BytesIO

  from .loadwebindex import webidx

  from koji.server import ServerRedirect

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestTagEdit(unittest.TestCase):
@@ -26,16 +25,9 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         environ = {'QUERY_STRING': query}

+         return FieldStorageCompat(environ)

  

      def test_tagedit_exception(self):

          """Test tagedit function raises exception when tag ID not exists."""
@@ -52,8 +44,8 @@ 

  

      def test_tagedit_add_case_valid(self):

          """Test tagedit function valid case (save)."""

-         urlencode_data = b"save=True&name=testname&arches=x86_64&locked=True&permission=1" \

-                          b"&maven_support=True&maven_include_all=True"

+         urlencode_data = "save=True&name=testname&arches=x86_64&locked=True&permission=1" \

+                          "&maven_support=True&maven_include_all=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -77,7 +69,7 @@ 

  

      def test_tagedit_cancel_case_valid(self):

          """Test tagedit function valid case (cancel)."""

-         urlencode_data = b"cancel=True"

+         urlencode_data = "cancel=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -99,7 +91,7 @@ 

  

      def test_tagedit_another_case_valid(self):

          """Test tagedit function valid case (another)."""

-         urlencode_data = b"another=True"

+         urlencode_data = "another=True"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

@@ -1,12 +1,11 @@ 

  from __future__ import absolute_import

  import unittest

  from unittest import mock

- import cgi

  

  import koji

- from io import BytesIO

  from koji.server import ServerRedirect

  from .loadwebindex import webidx

+ from kojiweb.util import FieldStorageCompat

  

  

  class TestActiveSessionDelete(unittest.TestCase):
@@ -33,16 +32,8 @@ 

      def tearDown(self):

          mock.patch.stopall()

  

-     def get_fs(self, urlencode_data):

-         urlencode_environ = {

-             'CONTENT_LENGTH': str(len(urlencode_data)),

-             'CONTENT_TYPE': 'application/x-www-form-urlencoded',

-             'QUERY_STRING': '',

-             'REQUEST_METHOD': 'POST',

-         }

-         data = BytesIO(urlencode_data)

-         data.seek(0)

-         return cgi.FieldStorage(fp=data, environ=urlencode_environ)

+     def get_fs(self, query):

+         return FieldStorageCompat({'QUERY_STRING': query})

  

      def test_tagparent_remove(self):

          """Test tagparent function with remove action."""
@@ -103,8 +94,8 @@ 

          parent_id = 123

          action = 'add'

          data = [{'parent_id': 111}]

-         urlencode_data = (b"add=true&priority=10&maxdepth=5&pkg_filter=pkg_filter&"

-                           b"intransitive=true&noconfig=false")

+         urlencode_data = ("add=true&priority=10&maxdepth=5&pkg_filter=pkg_filter&"

+                           "intransitive=true&noconfig=false")

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -130,7 +121,7 @@ 

          tag_id = 456

          parent_id = 123

          action = 'add'

-         urlencode_data = b"cancel=true"

+         urlencode_data = "cancel=true"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -154,7 +145,7 @@ 

          action = 'add'

          data = [{'parent_id': 111, 'priority': 1}]

  

-         urlencode_data = b"edit=true"

+         urlencode_data = "edit=true"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -177,7 +168,7 @@ 

          action = 'add'

          data = [{'parent_id': 123, 'priority': 1}]

  

-         urlencode_data = b"edit=true"

+         urlencode_data = "edit=true"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):
@@ -202,7 +193,7 @@ 

                  {'parent_id': 123, 'priority': 2},

                  {'parent_id': 123, 'priority': 3}]

  

-         urlencode_data = b"edit=true"

+         urlencode_data = "edit=true"

          fs = self.get_fs(urlencode_data)

  

          def __get_server(env):

file modified
+3 -19
@@ -19,7 +19,6 @@ 

  # Authors:

  #       Mike McLean <mikem@redhat.com>

  

- import cgi

  import inspect

  import logging

  import os.path
@@ -241,29 +240,14 @@ 

          func = self.handler_index.get(method)

          if not func:

              raise URLNotFound

-         # parse form args

-         data = {}

-         fs = cgi.FieldStorage(fp=environ['wsgi.input'],

-                               environ=environ.copy(),

-                               keep_blank_values=True)

-         for field in fs.list:

-             if field.filename:

-                 val = field

-             else:

-                 val = field.value

-             data.setdefault(field.name, []).append(val)

-         # replace singleton lists with single values

-         # XXX - this is a bad practice, but for now we strive to emulate mod_python.publisher

-         for arg in data:

-             val = data[arg]

-             if isinstance(val, list) and len(val) == 1:

-                 data[arg] = val[0]

+         # parse url args

+         fs = kojiweb.util.FieldStorageCompat(environ)

          environ['koji.form'] = fs

          args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, ann = \

              inspect.getfullargspec(func)

          if not varkw:

              # remove any unexpected args

-             data = dslice(data, args, strict=False)

+             data = dslice(fs.data, args, strict=False)

              # TODO (warning in header or something?)

          return func, data

  

file modified
+52 -2
@@ -26,10 +26,11 @@ 

  import ssl

  import stat

  import urllib

- # a bunch of exception classes that explainError needs

+ from collections.abc import Mapping

+ from functools import wraps

  from socket import error as socket_error

+ from urllib.parse import parse_qs

  from xml.parsers.expat import ExpatError

- from functools import wraps

  

  import Cheetah.Template

  
@@ -206,6 +207,55 @@ 

      return tokens

  

  

class FieldStorageCompat(Mapping):
    """Emulate the parts of cgi.FieldStorage that we need.

    Form values are parsed from ``environ['QUERY_STRING']`` only. The parsed
    values are kept in ``self.data``, a plain dict in which a field that
    appears once maps to its string value and a field that appears several
    times maps to the list of its values. Blank values are kept.

    Mapping access (``fs[name]``) returns a FieldCompat wrapper so that
    callers can keep using ``fs[name].value``.
    """

    def __init__(self, environ):
        """Parse the query string found in the given WSGI-style environ.

        :param environ: mapping that may contain a 'QUERY_STRING' entry
        :raises ValueError: if a non-empty query string is malformed
                            (parse_qs is called with strict_parsing=True)
        """
        query = environ.get('QUERY_STRING', '')
        if query:
            parsed = parse_qs(query, strict_parsing=True, keep_blank_values=True)
        else:
            # No query at all is not an error. Do not hand the empty string
            # to parse_qs: with strict_parsing=True some Python versions
            # reject it as a "bad query field".
            parsed = {}
        # replace singleton lists with single values
        self.data = {
            name: values[0] if isinstance(values, list) and len(values) == 1 else values
            for name, values in parsed.items()
        }

    # we need getitem, iter, and len for the Mapping inheritance to work

    def __getitem__(self, key):
        """Return the stored value wrapped in a FieldCompat (KeyError if absent)."""
        return FieldCompat(self.data[key])

    def __iter__(self):
        """Iterate over the field names."""
        # The iterator must be returned, otherwise every Mapping mixin that
        # iterates (keys(), items(), values(), dict(fs), ...) fails.
        return iter(self.data)

    def __len__(self):
        """Return the number of distinct field names."""
        return len(self.data)

    def getfirst(self, name, default=None):
        """Get first value from list entries

        If the field is missing, ``default`` is returned unchanged, even
        when it is itself a list or tuple.
        """
        if name not in self.data:
            return default
        value = self.data[name]
        if isinstance(value, (list, tuple)):
            return value[0]
        return value

    def getlist(self, name):
        """Get value, wrap in list if not already

        A missing field gives an empty list. The returned list is always a
        new object, so callers may modify it without altering ``self.data``.
        """
        if name not in self.data:
            return []
        value = self.data[name]
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]


class FieldCompat:
    """Minimal stand-in for a cgi field object: exposes only ``.value``."""

    def __init__(self, value):
        self.value = value

+ 

+ 

  def toggleOrder(template, sortKey, orderVar='order'):

      """

      If orderVar equals 'sortKey', return '-sortKey', else