Skip to content

Commit

Permalink
fix: cgi.FieldStorage not available in Python 3.13
Browse files Browse the repository at this point in the history
cgi.FieldStorage (used for multipart parsing) was deprecated
in Python 3.11 and removed in 3.13. We now have to ship our
own implementation (mostly a copy-and-paste from the `multipart` module).
  • Loading branch information
defnull committed Nov 4, 2023
1 parent 99341ff commit 2b3571c
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run_tests.yml
Expand Up @@ -7,7 +7,7 @@ jobs:
strategy:
max-parallel: 5
matrix:
python-version: ['2.7', '3.6', '3.7', '3.8', '3.9', 'pypy-2.7', 'pypy-3.6', 'pypy-3.7']
python-version: ['2.7', '3.6', '3.7', '3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -3,7 +3,7 @@ ALLFILES = $(shell echo bottle.py test/*.py test/views/*.tpl)
VENV = build/venv
TESTBUILD = build/python

.PHONY: venv release coverage install docs test test_27 test_36 test_37 test_38 test_39 clean
.PHONY: venv release coverage install docs test clean

release: clean test_all venv
$(VENV)/bin/python3 setup.py --version | egrep -q -v '[a-zA-Z]' # Fail on dev/rc versions
Expand Down
320 changes: 302 additions & 18 deletions bottle.py
Expand Up @@ -1378,34 +1378,52 @@ def chunked(self):
def POST(self):
    """ The values of :attr:`forms` and :attr:`files` combined into a single
        :class:`FormsDict`. Values are either strings (form values) or
        instances of :class:`FileUpload`.
    """
    post = FormsDict()
    content_type, options = _parse_http_header(self.content_type)[0]
    # We default to application/x-www-form-urlencoded for everything that
    # is not multipart and take the fast path (also: 3.1 workaround)
    if not content_type.startswith('multipart/'):
        body = tonat(self._get_body_string(self.MEMFILE_MAX), 'latin1')
        for key, value in _parse_qsl(body):
            post[key] = value
        return post

    if py >= (3, 11, 0):
        # cgi.FieldStorage was deprecated in 3.11 and removed in 3.13,
        # so use the bundled multipart parser on modern interpreters.
        charset = options.get("charset", "utf8")
        boundary = options.get("boundary", "")
        if not boundary:
            raise MultipartError("Invalid content type header, missing boundary")
        parser = _MultipartParser(self.body, boundary, self.content_length,
            mem_limit=self.MEMFILE_MAX, memfile_limit=self.MEMFILE_MAX,
            charset=charset)

        for part in parser:
            if not part.filename and part.is_buffered():
                # Small non-file fields are exposed as decoded strings.
                post[part.name] = part.value
            else:
                post[part.name] = FileUpload(part.file, part.name,
                                             part.filename, part.headerlist)

    else:
        safe_env = {'QUERY_STRING': ''}  # Build a safe environment for cgi
        for key in ('REQUEST_METHOD', 'CONTENT_TYPE', 'CONTENT_LENGTH'):
            if key in self.environ: safe_env[key] = self.environ[key]
        args = dict(fp=self.body, environ=safe_env, keep_blank_values=True)
        if py3k:
            args['encoding'] = 'utf8'
            post.recode_unicode = False
        data = cgi.FieldStorage(**args)
        # Keep a reference alive to work around http://bugs.python.org/issue18394
        self['_cgi.FieldStorage'] = data
        for item in data.list or []:
            if item.filename is None:
                post[item.name] = item.value
            else:
                post[item.name] = FileUpload(item.file, item.name,
                                             item.filename, item.headers)

    return post

@property
Expand Down Expand Up @@ -3197,6 +3215,272 @@ def wrapper(*a, **ka):
uninstall = make_default_app_wrapper('uninstall')
url = make_default_app_wrapper('get_url')


###############################################################################
# Multipart Handling ###########################################################
###############################################################################
# cgi.FieldStorage was deprecated in Python 3.11 and removed in 3.13
# This implementation is based on https://github.com/defnull/multipart/


class MultipartError(HTTPError):
    """ HTTP 400 error raised when a multipart/form-data body is malformed
        or exceeds a configured resource limit. """

    def __init__(self, msg):
        # Delegate to HTTPError with a fixed 400 status and prefixed message.
        super(MultipartError, self).__init__(400, "MultipartError: " + msg)

class _MultipartParser(object):
    # Streaming parser for multipart/form-data bodies (RFC 2046). Iterating
    # over an instance yields one _MultipartPart per part of the message;
    # already-parsed parts are cached so iteration can be repeated.

    def __init__(
        self,
        stream,
        boundary,
        content_length=-1,
        disk_limit=2 ** 30,
        mem_limit=2 ** 20,
        memfile_limit=2 ** 18,
        buffer_size=2 ** 16,
        charset="latin1",
    ):
        """ Parse a multipart/form-data byte stream. This object is an iterator
            over the parts of the message.

            :param stream: A file-like stream. Must implement ``.read(size)``.
            :param boundary: The multipart boundary as a byte string.
            :param content_length: The maximum number of bytes to read.
            :param disk_limit: Combined size limit for all parts spooled to
                disk (DoS protection).
            :param mem_limit: Combined size limit for all in-memory buffered
                parts (DoS protection); capped at ``disk_limit``.
            :param memfile_limit: Per-part size above which a part is moved
                from memory to a temporary file.
            :param buffer_size: Read chunk size; also the maximum line length,
                capped at ``mem_limit``.
            :param charset: Default charset passed on to each part.
        """
        self.stream = stream
        self.boundary = boundary
        self.content_length = content_length
        self.disk_limit = disk_limit
        self.memfile_limit = memfile_limit
        self.mem_limit = min(mem_limit, self.disk_limit)
        self.buffer_size = min(buffer_size, self.mem_limit)
        self.charset = charset

        # A boundary line must fit into a single read buffer, or the
        # line-based parsing below cannot recognize it.
        if self.buffer_size - 6 < len(boundary):  # "--boundary--\r\n"
            raise MultipartError("Boundary does not fit into buffer_size.")

        self._done = []  # parts already parsed (cache for repeated iteration)
        self._part_iter = None  # lazily-created generator over remaining parts

    def __iter__(self):
        """ Iterate over the parts of the multipart message. """
        if not self._part_iter:
            self._part_iter = self._iterparse()

        # Replay previously parsed parts first ...
        for part in self._done:
            yield part

        # ... then continue parsing the stream, caching new parts as we go.
        for part in self._part_iter:
            self._done.append(part)
            yield part

    def _lineiter(self):
        """ Iterate over a binary file-like object line by line. Each line is
            returned as a (line, line_ending) tuple. If the line does not fit
            into self.buffer_size, line_ending is empty and the rest of the line
            is returned with the next iteration.
        """
        read = self.stream.read
        maxread, maxbuf = self.content_length, self.buffer_size
        buffer = b""  # buffer for the last (partial) line

        while True:
            # A negative maxread means "no Content-Length known": read freely.
            data = read(maxbuf if maxread < 0 else min(maxbuf, maxread))
            maxread -= len(data)
            lines = (buffer + data).splitlines(True)
            len_first_line = len(lines[0])

            # be sure that the first line does not become too big
            if len_first_line > self.buffer_size:
                # at the same time don't split a '\r\n' accidentally
                if len_first_line == self.buffer_size + 1 and lines[0].endswith(b"\r\n"):
                    splitpos = self.buffer_size - 1
                else:
                    splitpos = self.buffer_size
                lines[:1] = [lines[0][:splitpos], lines[0][splitpos:]]

            if data:
                # Keep the last (possibly incomplete) line for the next round.
                buffer = lines[-1]
                lines = lines[:-1]

            for line in lines:
                # Yield (payload, terminator). An empty terminator marks a
                # line that was cut because it exceeded buffer_size.
                if line.endswith(b"\r\n"):
                    yield line[:-2], b"\r\n"
                elif line.endswith(b"\n"):
                    yield line[:-1], b"\n"
                elif line.endswith(b"\r"):
                    yield line[:-1], b"\r"
                else:
                    yield line, b""

            if not data:
                break

    def _iterparse(self):
        """ Generator yielding one _MultipartPart per part of the stream.
            Raises MultipartError on malformed input or exceeded limits. """
        lines, line = self._lineiter(), ""
        separator = b"--" + tob(self.boundary)
        terminator = b"--" + tob(self.boundary) + b"--"

        # Consume first boundary. Ignore any preamble, as required by RFC
        # 2046, section 5.1.1.
        for line, nl in lines:
            if line in (separator, terminator):
                break
        else:
            raise MultipartError("Stream does not contain boundary")

        # Check for empty data
        if line == terminator:
            for _ in lines:
                raise MultipartError("Data after end of stream")
            return

        # For each part in stream...
        mem_used, disk_used = 0, 0  # Track used resources to prevent DoS
        is_tail = False  # True if the last line was incomplete (cut short)

        opts = {
            "buffer_size": self.buffer_size,
            "memfile_limit": self.memfile_limit,
            "charset": self.charset,
        }

        part = _MultipartPart(**opts)

        for line, nl in lines:
            # A boundary only counts if the previous line was complete
            # (is_tail False); otherwise it is payload that merely looks
            # like a boundary after being cut at buffer_size.
            if line == terminator and not is_tail:
                part.file.seek(0)
                yield part
                break

            elif line == separator and not is_tail:
                # Account for the finished part before starting a new one.
                if part.is_buffered():
                    mem_used += part.size
                else:
                    disk_used += part.size
                part.file.seek(0)

                yield part

                part = _MultipartPart(**opts)

            else:
                is_tail = not nl  # The next line continues this one
                try:
                    part.feed(line, nl)

                    if part.is_buffered():
                        if part.size + mem_used > self.mem_limit:
                            raise MultipartError("Memory limit reached.")
                    elif part.size + disk_used > self.disk_limit:
                        raise MultipartError("Disk limit reached.")
                except MultipartError:
                    part.close()
                    raise
        else:
            # Stream ended without a terminator line: release the buffer of
            # the partially parsed part before reporting the error below.
            part.close()

        if line != terminator:
            raise MultipartError("Unexpected end of multipart stream.")


class _MultipartPart(object):
def __init__(self, buffer_size=2 ** 16, memfile_limit=2 ** 18, charset="latin1"):
self.headerlist = []
self.headers = None
self.file = False
self.size = 0
self._buf = b""
self.disposition = None
self.name = None
self.filename = None
self.content_type = None
self.charset = charset
self.memfile_limit = memfile_limit
self.buffer_size = buffer_size

def feed(self, line, nl=""):
if self.file:
return self.write_body(line, nl)
return self.write_header(line, nl)

def write_header(self, line, nl):
line = line.decode(self.charset)

if not nl:
raise MultipartError("Unexpected end of line in header.")

if not line.strip(): # blank line -> end of header segment
self.finish_header()
elif line[0] in " \t" and self.headerlist:
name, value = self.headerlist.pop()
self.headerlist.append((name, value + line.strip()))
else:
if ":" not in line:
raise MultipartError("Syntax error in header: No colon.")

name, value = line.split(":", 1)
self.headerlist.append((name.strip(), value.strip()))

def write_body(self, line, nl):
if not line and not nl:
return # This does not even flush the buffer

self.size += len(line) + len(self._buf)
self.file.write(self._buf + line)
self._buf = nl

if self.content_length > 0 and self.size > self.content_length:
raise MultipartError("Size of body exceeds Content-Length header.")

if self.size > self.memfile_limit and isinstance(self.file, BytesIO):
self.file, old = NamedTemporaryFile(mode="w+b"), self.file
old.seek(0)
copy_file(old, self.file, self.size, self.buffer_size)

def finish_header(self):
self.file = BytesIO()
self.headers = HeaderDict(self.headerlist)
content_disposition = self.headers.get("Content-Disposition", "")
content_type = self.headers.get("Content-Type", "")

if not content_disposition:
raise MultipartError("Content-Disposition header is missing.")

self.disposition, self.options = _parse_http_header(content_disposition)[0]
self.name = self.options.get("name")
if "filename" in self.options:
self.filename = self.options.get("filename")
if self.filename[1:3] == ":\\" or self.filename[:2] == "\\\\":
self.filename = self.filename.split("\\")[-1] # ie6 bug

self.content_type, options = _parse_http_header(content_type)[0]
self.charset = options.get("charset") or self.charset
self.content_length = int(self.headers.get("Content-Length", "-1"))

def is_buffered(self):
""" Return true if the data is fully buffered in memory."""
return isinstance(self.file, BytesIO)

@property
def value(self):
""" Data decoded with the specified charset """

return self.raw.decode(self.charset)

@property
def raw(self):
""" Data without decoding """
pos = self.file.tell()
self.file.seek(0)

try:
return self.file.read()
finally:
self.file.seek(pos)



###############################################################################
# Server Adapter ###############################################################
###############################################################################
Expand Down
1 change: 0 additions & 1 deletion test/test_environ.py
Expand Up @@ -349,7 +349,6 @@ def test_multipart(self):
self.assertEqual('value1', request.POST['field1'])
self.assertTrue('field1' not in request.files)
self.assertEqual('value1', request.forms['field1'])
print(request.forms.dict, request.forms.recode_unicode)
self.assertEqual('万难', request.forms['field2'])
self.assertEqual(touni('万难'), request.forms.field2)
# Field (multi)
Expand Down

0 comments on commit 2b3571c

Please sign in to comment.