#4370 adjust upload offset and overwrite logic
Merged a month ago by tkopecek. Opened 2 months ago by mikem.
mikem/koji upload-fixes  into  master

file modified
+42 -4
@@ -16268,8 +16268,10 @@ 

      start = time.time()

      if not context.session.logged_in:

          raise koji.ActionNotAllowed('you must be logged-in to upload a file')

+ 

+     # read upload parameters

      args = parse_qs(environ.get('QUERY_STRING', ''), strict_parsing=True)

-     # XXX - already parsed by auth

+     # TODO - unify with earlier query parsing in auth?

      name = args['filename'][0]

      path = args.get('filepath', ('',))[0]

      verify = args.get('fileverify', ('',))[0]
@@ -16277,26 +16279,50 @@ 

      offset = args.get('offset', ('0',))[0]

      offset = int(offset)

      volume = args.get('volume', ('DEFAULT',))[0]

+ 

+     # check upload destination

      fn = get_upload_path(path, name, create=True, volume=volume)

-     if os.path.exists(fn):

-         if not os.path.isfile(fn):

+     try:

+         st = os.lstat(fn)

+     except FileNotFoundError:

+         st = None

+     if st:

+         if stat.S_ISLNK(st.st_mode):

+             # upload paths should never be symlinks

+             raise koji.GenericError("destination is a symlink: %s" % fn)

+         if not stat.S_ISREG(st.st_mode):

              raise koji.GenericError("destination not a file: %s" % fn)

          if offset == 0 and not overwrite:

              raise koji.GenericError("upload path exists: %s" % fn)

+ 

+     # handle the upload

      chksum = get_verify_class(verify)()

      size = 0

      inf = environ['wsgi.input']

      fd = os.open(fn, os.O_RDWR | os.O_CREAT, 0o666)

      try:

+         # acquire lock

          try:

              fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)

          except IOError as e:

              raise koji.LockError(e)

+ 

+         # handle offset

          if offset == -1:

+             # append to end of file

              offset = os.lseek(fd, 0, 2)

-         else:

+         elif overwrite:

+             # if we're overwriting an older upload (e.g. logs for restarted task)

+             # then truncate the file at the write offset

              os.ftruncate(fd, offset)

              os.lseek(fd, offset, 0)

+         else:

+             # if we're not overwriting, then do not truncate

+             # note that call may be a retry, so we may be writing over an existing chunk

+             os.lseek(fd, offset, 0)

+             # in this case we have an additional check after the loop

+ 

+         # i/o loop

          while True:

              try:

                  chunk = inf.read(65536)
@@ -16317,9 +16343,21 @@ 

              if verify:

                  chksum.update(chunk)

              os.write(fd, chunk)

+ 

+         # offset check

+         if not overwrite:

+             # end of file should match where we think we are

+             flen = os.lseek(fd, 0, 2)

+             expected = offset + size

+             if flen != expected:

+                 raise koji.GenericError(f"Incorrect upload length for {fn} - "

+                                         f"Expected {expected}, actual {flen}")

+ 

      finally:

          # this will also remove our lock

          os.close(fd)

+ 

+     # return some basic stats for client verification

      ret = {

          'size': size,

          'fileverify': verify,

@@ -0,0 +1,221 @@ 

+ import os

+ import io

+ from unittest import mock

+ import shutil

+ import tempfile

+ import urllib.parse

+ import unittest

+ 

+ from kojihub import kojihub

+ import koji

+ 

+ 

+ class TestHandleUpload(unittest.TestCase):

+ 

+     def setUp(self):

+         self.tempdir = tempfile.mkdtemp()

+         self.pathinfo = koji.PathInfo(self.tempdir)

+         mock.patch('koji.pathinfo', new=self.pathinfo).start()

+         self.lookup_name = mock.patch('kojihub.kojihub.lookup_name').start()

+         self.context = mock.patch('kojihub.kojihub.context').start()

+         self.context.session.logged_in = True

+         self.context.session.user_id = 1

+ 

+     def tearDown(self):

+         shutil.rmtree(self.tempdir)

+         mock.patch.stopall()

+ 

+     def test_simple_upload(self):

+         environ = {}

+         args = {

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         contents = b'hello world\n'

+         environ['wsgi.input'] = io.BytesIO(contents)

+ 

+         # upload

+         kojihub.handle_upload(environ)

+ 

+         # verify

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         self.assertEqual(contents, open(fn, 'rb').read())

+ 

+     def test_no_overwrite(self):

+         environ = {}

+         args = {

+             # overwrite should default to False

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         contents = b'hello world\n'

+         environ['wsgi.input'] = io.BytesIO(contents)

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         koji.ensuredir(os.path.dirname(fn))

+         with open(fn, 'wt') as fp:

+             fp.write('already exists')

+ 

+         # upload

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.handle_upload(environ)

+ 

+         # verify error

+         self.assertIn('upload path exists', str(ex.exception))

+ 

+     def test_no_symlink(self):

+         environ = {}

+         args = {

+             # overwrite should default to False

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         koji.ensuredir(os.path.dirname(fn))

+         os.symlink('link_target', fn)

+ 

+         # upload

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.handle_upload(environ)

+ 

+         # verify error

+         self.assertIn('destination is a symlink', str(ex.exception))

+ 

+     def test_no_nonfile(self):

+         environ = {}

+         args = {

+             # overwrite should default to False

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         koji.ensuredir(os.path.dirname(fn))

+         os.mkdir(fn)

+ 

+         # upload

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.handle_upload(environ)

+ 

+         # verify error

+         self.assertIn('destination not a file', str(ex.exception))

+ 

+     def test_login_required(self):

+         environ = {}

+         self.context.session.logged_in = False

+ 

+         with self.assertRaises(koji.ActionNotAllowed):

+             kojihub.handle_upload(environ)

+ 

+     def test_retry(self):

+         # uploading the same chunk twice should be fine

+         environ = {}

+         args = {

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         contents = b'hello world\nthis is line two'

+         chunks = contents.splitlines(keepends=True)

+ 

+         # chunk 0

+         environ['wsgi.input'] = io.BytesIO(chunks[0])

+         kojihub.handle_upload(environ)

+ 

+         # chunk 1

+         environ['wsgi.input'] = io.BytesIO(chunks[1])

+         args['offset'] = str(len(chunks[0]))

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         kojihub.handle_upload(environ)

+ 

+         # chunk 1, again

+         environ['wsgi.input'] = io.BytesIO(chunks[1])

+         kojihub.handle_upload(environ)

+ 

+         # verify

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         self.assertEqual(contents, open(fn, 'rb').read())

+ 

+     def test_no_truncate(self):

+         # uploading a chunk out of order without overwrite should:

+         # 1. not truncate

+         # 2. error

+         environ = {}

+         args = {

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         contents = b'hello world\nthis is line two\nthis is line three'

+         chunks = contents.splitlines(keepends=True)

+ 

+         # chunk 0

+         environ['wsgi.input'] = io.BytesIO(chunks[0])

+         kojihub.handle_upload(environ)

+ 

+         # chunk 1

+         environ['wsgi.input'] = io.BytesIO(chunks[1])

+         args['offset'] = str(len(chunks[0]))

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         kojihub.handle_upload(environ)

+ 

+         # chunk 2

+         environ['wsgi.input'] = io.BytesIO(chunks[2])

+         args['offset'] = str(len(chunks[0]) + len(chunks[1]))

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         kojihub.handle_upload(environ)

+ 

+         # chunk 1, again

+         environ['wsgi.input'] = io.BytesIO(chunks[1])

+         args['offset'] = str(len(chunks[0]))

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         with self.assertRaises(koji.GenericError) as ex:

+             kojihub.handle_upload(environ)

+ 

+         # verify

+         self.assertIn('Incorrect upload length', str(ex.exception))

+         # previous upload contents should still be there

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         self.assertEqual(contents, open(fn, 'rb').read())

+ 

+     def test_truncate(self):

+         # uploading a chunk with overwrite SHOULD truncate:

+         environ = {}

+         args = {

+             'filename': 'hello.txt',

+             'filepath': 'FOO',

+             'fileverify': 'adler32',

+             'offset': '0',

+         }

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         contents1 = b'hello world\nthis is line two\nthis is line three'

+         contents2 = b'hello world\n'

+ 

+         # pass1

+         environ['wsgi.input'] = io.BytesIO(contents1)

+         kojihub.handle_upload(environ)

+ 

+         # pass2

+         args['overwrite'] = '1'

+         environ['QUERY_STRING'] = urllib.parse.urlencode(args)

+         environ['wsgi.input'] = io.BytesIO(contents2)

+         kojihub.handle_upload(environ)

+ 

+         # verify

+         fn = f'{self.tempdir}/work/FOO/hello.txt'

+         self.assertEqual(contents2, open(fn, 'rb').read())

+ # the end

@@ -0,0 +1,94 @@ 

+ import io

+ import shutil

+ import tempfile

+ import urllib.parse

+ import unittest

+ from unittest import mock

+ 

+ import koji

+ from kojihub import kojihub

+ 

+ """

+ This test involves both client and hub code.

+ Since hub code has higher requirements, we group it there.

+ """

+ 

+ 

+ def get_callMethod(self):

+     # create a function to replace ClientSession._callMethod

+     # self is the session instance

+ 

+     def my_callMethod(name, args, kwargs=None, retry=True):

+         # we only handle the methods that fastUpload will use

+         handler, headers, request = self._prepCall(name, args, kwargs)

+         self.retries = 0

+         if name == 'rawUpload':

+             parts = urllib.parse.urlparse(handler)

+             query = parts[4]

+             environ = {

+                 'QUERY_STRING': query,

+                 'wsgi.input': io.BytesIO(request),

+                 'CONTENT_LENGTH': len(request),

+             }

+             return kojihub.handle_upload(environ)

+         elif name == 'checkUpload':

+             exports = kojihub.RootExports()

+             return exports.checkUpload(*args, **kwargs)

+         # else

+         raise ValueError(f'Unexected call {name}')

+ 

+     return my_callMethod

+ 

+ 

+ class TestHandleUpload(unittest.TestCase):

+ 

+     def setUp(self):

+         self.tempdir = tempfile.mkdtemp()

+         self.pathinfo = koji.PathInfo(self.tempdir)

+         mock.patch('koji.pathinfo', new=self.pathinfo).start()

+         self.lookup_name = mock.patch('kojihub.kojihub.lookup_name').start()

+         self.context = mock.patch('kojihub.kojihub.context').start()

+         self.context.session.logged_in = True

+         self.context.session.user_id = 1

+         self.session = koji.ClientSession('https://koji.example.com/NOTUSED')

+         self.session._callMethod = get_callMethod(self.session)

+ 

+     def tearDown(self):

+         shutil.rmtree(self.tempdir)

+         mock.patch.stopall()

+ 

+     def test_upload_client(self):

+         # write a test file

+         contents = b'Hello World. Upload me.\n' * 100

+         orig = f'{self.tempdir}/orig.txt'

+         with open(orig, 'wb') as fp:

+             fp.write(contents)

+         sinfo = {'session-id': '123', 'session-key': '456', 'callnum': 1}

+         self.session.setSession(sinfo)  # marks logged in

+ 

+         self.session.fastUpload(orig, 'testpath', blocksize=137)

+ 

+         # files should be identical

+         dup = f'{self.tempdir}/work/testpath/orig.txt'

+         with open(dup, 'rb') as fp:

+             check = fp.read()

+         self.assertEqual(check, contents)

+ 

+     def test_upload_client_overwrite(self):

+         # same as above, but with overwrite flag

+         contents = b'Hello World. Upload me.\n' * 100

+         orig = f'{self.tempdir}/orig.txt'

+         with open(orig, 'wb') as fp:

+             fp.write(contents)

+         sinfo = {'session-id': '123', 'session-key': '456', 'callnum': 1}

+         self.session.setSession(sinfo)  # marks logged in

+ 

+         self.session.fastUpload(orig, 'testpath', blocksize=137, overwrite=True)

+ 

+         # files should be identical

+         dup = f'{self.tempdir}/work/testpath/orig.txt'

+         with open(dup, 'rb') as fp:

+             check = fp.read()

+         self.assertEqual(check, contents)

+ 

+ # the end

The Fedora instance ran into a situation where the code managed to leave a truncated upload without generating an error. Their case is unusual for a few reasons (multiple hubs behind a reverse proxy, extremely high load from AI bots), but I think we can make the code somewhat better.

This PR makes a few adjustments to the overwrite logic and adds a number of tests.

Fixes https://pagure.io/koji/issue/4371

While I can't be 100% sure, I'm fairly confident that my second theory on the linked issue is what happened (duplicate, delayed upload call handler). If that theory is correct, then this code should prevent similar truncations (see test_no_truncate).

unused import (koji.GenericError is used instead)

Metadata Update from @tkopecek:
- Pull-request tagged with: testing-basic

2 months ago

1 new commit added

  • drop unused import in test
2 months ago

Metadata Update from @mfilip:
- Pull-request tagged with: testing-done

a month ago

Commit c44a33c fixes this pull-request

Pull-Request has been merged by tkopecek

a month ago