Calculate MD5 for each S3 multipart upload incrementally
If ContentMD5 is not passed, botocore processes the entire file to work
out the MD5 for the upload to S3:

https://github.com/boto/botocore/blob/1ae092ed95b39c3336ba48c5cbd71b7c37924d05/botocore/handlers.py#L200

It's likely better in many situations to calculate the MD5 for each set
of bytes as they come in: the data is already in memory/caches.

For event-loop-based applications, where file I/O is often still
blocking, this is especially valuable: botocore would otherwise block
the event loop while it calculated the MD5.
michalc committed Feb 24, 2019
1 parent efcad7f commit 1b31762
Showing 3 changed files with 25 additions and 3 deletions.
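
As a rough, standalone sketch of the idea in the commit message (the chunk contents below are made up for illustration, and the real code feeds the hash inside write() while the bytes are also written to the buffer):

import base64
import hashlib

# Hash the bytes as they arrive, instead of letting botocore re-read the
# whole part body afterwards to compute ContentMD5.
chunks = [b'first chunk of data', b'second chunk of data']

incremental = hashlib.md5()
for chunk in chunks:
    # ... in the real backend, chunk is also written to the upload buffer here ...
    incremental.update(chunk)

# S3 expects ContentMD5 as the base64-encoded MD5 digest of the part body.
content_md5 = base64.b64encode(incremental.digest())

# Same value botocore would compute by re-reading the whole body at upload time.
assert content_md5 == base64.b64encode(hashlib.md5(b''.join(chunks)).digest())
print(content_md5)
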
1 change: 1 addition & 0 deletions AUTHORS
@@ -36,6 +36,7 @@ By order of apparition, thanks:
* Alex Watt (Google Cloud Storage patch)
* Jumpei Yoshimura (S3 docs)
* Jon Dufresne
* Michal Charemza



13 changes: 11 additions & 2 deletions storages/backends/s3boto3.py
@@ -1,3 +1,5 @@
import base64
import hashlib
import io
import mimetypes
import os
@@ -72,6 +74,7 @@ def __init__(self, name, mode, storage, buffer_size=None):
self.obj.load()
self._is_dirty = False
self._file = None
self._file_md5 = hashlib.md5()
self._multipart = None
# 5 MB is the minimum part size (if there is more than one part).
# Amazon allows up to 10,000 parts. The default supports uploads
@@ -127,7 +130,9 @@ def write(self, content):
self._multipart = self.obj.initiate_multipart_upload(**parameters)
if self.buffer_size <= self._buffer_file_size:
self._flush_write_buffer()
return super(S3Boto3StorageFile, self).write(force_bytes(content))
content_bytes = force_bytes(content)
self._file_md5.update(content_bytes)
return super(S3Boto3StorageFile, self).write(content_bytes)

@property
def _buffer_file_size(self):
@@ -145,9 +150,13 @@ def _flush_write_buffer(self):
self._write_counter += 1
self.file.seek(0)
part = self._multipart.Part(self._write_counter)
part.upload(Body=self.file.read())
part.upload(
Body=self.file.read(),
ContentMD5=base64.b64encode(self._file_md5.digest())
)
self.file.seek(0)
self.file.truncate()
self._file_md5 = hashlib.md5()

def close(self):
if self._is_dirty:
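
A toy sketch of the accumulate-then-reset pattern the change above relies on (PartHasher is a hypothetical stand-in, not the real S3Boto3StorageFile): write() feeds the running hash, and flushing a part emits the digest and starts a fresh hash, so each part's ContentMD5 covers only that part's bytes.

import base64
import hashlib

class PartHasher:
    def __init__(self):
        self._md5 = hashlib.md5()

    def write(self, content):
        # Mirrors self._file_md5.update(content_bytes) in write() above.
        self._md5.update(content)

    def flush_part(self):
        content_md5 = base64.b64encode(self._md5.digest())
        # Mirrors self._file_md5 = hashlib.md5() at the end of _flush_write_buffer.
        self._md5 = hashlib.md5()
        return content_md5

hasher = PartHasher()
hasher.write(b'part one bytes')
first = hasher.flush_part()
hasher.write(b'part two bytes')
second = hasher.flush_part()

# Each part's ContentMD5 matches the digest of that part alone.
assert first == base64.b64encode(hashlib.md5(b'part one bytes').digest())
assert second == base64.b64encode(hashlib.md5(b'part two bytes').digest())
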
14 changes: 13 additions & 1 deletion tests/test_s3boto3.py
@@ -1,7 +1,9 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import base64
import gzip
import hashlib
import pickle
import threading
import warnings
@@ -269,7 +271,8 @@ def test_storage_open_write(self):
file.close()
multipart.Part.assert_called_with(1)
part = multipart.Part.return_value
part.upload.assert_called_with(Body=content.encode('utf-8'))
content_md5 = base64.b64encode(hashlib.md5(content.encode('utf-8')).digest())
part.upload.assert_called_with(Body=content.encode('utf-8'), ContentMD5=content_md5)
multipart.complete.assert_called_once_with(
MultipartUpload={'Parts': [{'ETag': '123', 'PartNumber': 1}]})

@@ -328,6 +331,15 @@ def test_storage_write_beyond_buffer_size(self):
for args_list in part.upload.call_args_list)
)
self.assertEqual(uploaded_content, written_content)
uploaded_md5s = [
args_list[1]['ContentMD5']
for args_list in part.upload.call_args_list
]
correct_md5s = [
base64.b64encode(hashlib.md5(args_list[1]['Body']).digest())
for args_list in part.upload.call_args_list
]
self.assertListEqual(uploaded_md5s, correct_md5s)
multipart.complete.assert_called_once_with(
MultipartUpload={'Parts': [
{'ETag': '123', 'PartNumber': 1},
