forked from streamlit/streamlit
/
file_uploader_test.py
164 lines (131 loc) · 6.43 KB
/
file_uploader_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# Copyright 2018-2022 Streamlit Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""file_uploader unit test."""
from unittest.mock import patch
import streamlit as st
from streamlit import config
from streamlit.script_run_context import get_script_run_ctx
from streamlit.uploaded_file_manager import UploadedFileRec, UploadedFile
from tests import testutil
class FileUploaderTest(testutil.DeltaGeneratorTestCase):
def test_just_label(self):
"""Test that it can be called with no other values."""
st.file_uploader("the label")
c = self.get_delta_from_queue().new_element.file_uploader
self.assertEqual(c.label, "the label")
self.assertEqual(c.disabled, False)
def test_just_disabled(self):
"""Test that it can be called with disabled param."""
st.file_uploader("the label", disabled=True)
c = self.get_delta_from_queue().new_element.file_uploader
self.assertEqual(c.disabled, True)
def test_single_type(self):
"""Test that it can be called using a string for type parameter."""
st.file_uploader("the label", type="png")
c = self.get_delta_from_queue().new_element.file_uploader
self.assertEqual(c.type, [".png"])
def test_multiple_types(self):
"""Test that it can be called using an array for type parameter."""
st.file_uploader("the label", type=["png", ".svg", "jpeg"])
c = self.get_delta_from_queue().new_element.file_uploader
self.assertEqual(c.type, [".png", ".svg", ".jpeg"])
@patch("streamlit.elements.file_uploader.FileUploaderMixin._get_file_recs")
def test_multiple_files(self, get_file_recs_patch):
"""Test the accept_multiple_files flag"""
# Patch UploadFileManager to return two files
file_recs = [
UploadedFileRec(1, "file1", "type", b"123"),
UploadedFileRec(2, "file2", "type", b"456"),
]
get_file_recs_patch.return_value = file_recs
for accept_multiple in [True, False]:
return_val = st.file_uploader(
"label", type="png", accept_multiple_files=accept_multiple
)
c = self.get_delta_from_queue().new_element.file_uploader
self.assertEqual(accept_multiple, c.multiple_files)
# If "accept_multiple_files" is True, then we should get a list of
# values back. Otherwise, we should just get a single value.
# Because file_uploader returns unique UploadedFile instances
# each time it's called, we convert the return value back
# from UploadedFile -> UploadedFileRec (which implements
# equals()) to test equality.
if accept_multiple:
results = [
UploadedFileRec(file.id, file.name, file.type, file.getvalue())
for file in return_val
]
self.assertEqual(file_recs, results)
else:
results = UploadedFileRec(
return_val.id,
return_val.name,
return_val.type,
return_val.getvalue(),
)
self.assertEqual(file_recs[0], results)
def test_max_upload_size_mb(self):
"""Test that the max upload size is the configuration value."""
st.file_uploader("the label")
c = self.get_delta_from_queue().new_element.file_uploader
self.assertEqual(
c.max_upload_size_mb, config.get_option("server.maxUploadSize")
)
@patch("streamlit.elements.file_uploader.FileUploaderMixin._get_file_recs")
def test_unique_uploaded_file_instance(self, get_file_recs_patch):
"""We should get a unique UploadedFile instance each time we access
the file_uploader widget."""
# Patch UploadFileManager to return two files
file_recs = [
UploadedFileRec(1, "file1", "type", b"123"),
UploadedFileRec(2, "file2", "type", b"456"),
]
get_file_recs_patch.return_value = file_recs
# These file_uploaders have different labels so that we don't cause
# a DuplicateKey error - but because we're patching the get_files
# function, both file_uploaders will refer to the same files.
file1: UploadedFile = st.file_uploader("a", accept_multiple_files=False)
file2: UploadedFile = st.file_uploader("b", accept_multiple_files=False)
self.assertNotEqual(id(file1), id(file2))
# Seeking in one instance should not impact the position in the other.
file1.seek(2)
self.assertEqual(b"3", file1.read())
self.assertEqual(b"123", file2.read())
@patch("streamlit.uploaded_file_manager.UploadedFileManager.remove_orphaned_files")
@patch("streamlit.elements.file_uploader.FileUploaderMixin._get_file_recs")
def test_remove_orphaned_files(
self, get_file_recs_patch, remove_orphaned_files_patch
):
"""When file_uploader is accessed, it should call
UploadedFileManager.remove_orphaned_files.
"""
ctx = get_script_run_ctx()
ctx.uploaded_file_mgr._file_id_counter = 101
file_recs = [
UploadedFileRec(1, "file1", "type", b"123"),
UploadedFileRec(2, "file2", "type", b"456"),
]
get_file_recs_patch.return_value = file_recs
st.file_uploader("foo", accept_multiple_files=True)
args, kwargs = remove_orphaned_files_patch.call_args
self.assertEqual(len(args), 0)
self.assertEqual(kwargs["session_id"], "test session id")
self.assertEqual(kwargs["newest_file_id"], 100)
self.assertEqual(kwargs["active_file_ids"], [1, 2])
# Patch _get_file_recs to return [] instead. remove_orphaned_files
# should not be called when file_uploader is accessed.
get_file_recs_patch.return_value = []
remove_orphaned_files_patch.reset_mock()
st.file_uploader("foo")
remove_orphaned_files_patch.assert_not_called()