Skip to content

Commit

Permalink
MAINT: simplify file identifiers generation
Browse files Browse the repository at this point in the history
  • Loading branch information
exiledkingcc committed Jul 22, 2023
1 parent 6df64af commit 5fd1e91
Showing 1 changed file with 28 additions and 21 deletions.
49 changes: 28 additions & 21 deletions pypdf/_writer.py
Expand Up @@ -33,6 +33,7 @@
import enum
import hashlib
import re
import time
import uuid
import warnings
from io import BytesIO, FileIO, IOBase
Expand Down Expand Up @@ -136,13 +137,6 @@ class ObjectDeletionFlag(enum.IntFlag):
ALL_ANNOTATIONS = enum.auto()


def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
hash = hashlib.md5()
for block in iter(lambda: stream.read(blocksize), b""):
hash.update(block)
return hash.hexdigest()


class PdfWriter:
"""
Write a PDF file out, given pages produced by another class.
Expand Down Expand Up @@ -206,6 +200,7 @@ def __init__(

self._encryption: Optional[Encryption] = None
self._encrypt_entry: Optional[DictionaryObject] = None
self._ID: Union[ArrayObject, None] = None

def __enter__(self) -> "PdfWriter":
"""Store that writer is initialized by 'with'."""
Expand Down Expand Up @@ -1128,25 +1123,35 @@ def cloneDocumentFromReader(
)
self.clone_document_from_reader(reader, after_page_append)

def _compute_document_identifier_from_content(self) -> ByteStringObject:
stream = BytesIO()
self._write_pdf_structure(stream)
stream.seek(0)
return ByteStringObject(_rolling_checksum(stream).encode("utf8"))
def _compute_document_identifier(self) -> ByteStringObject:
md5 = hashlib.md5()
md5.update(str(time.time()).encode("utf-8"))
md5.update(str(self.fileobj).encode("utf-8"))
md5.update(str(len(self._objects)).encode("utf-8"))
if hasattr(self, "_info"):
for k, v in cast(DictionaryObject, self._info.get_object()).items():
md5.update(f"{k}={v}".encode())
return ByteStringObject(md5.hexdigest().encode("utf-8"))

def generate_file_identifiers(self) -> None:
"""
Generate an identifier for the PDF that will be written.
The only point of this is ensuring uniqueness. Reproducibility is not
required; see 14.4 "File Identifiers".
"""
if hasattr(self, "_ID") and self._ID and len(self._ID) == 2:
ID_1 = self._ID[0]
required;
When a file is first written, both identifiers shall be set to the same value.
If both identifiers match when a file reference is resolved, it is very
likely that the correct and unchanged file has been found. If only the first
identifier matches, a different version of the correct file has been found.
see 14.4 "File Identifiers".
"""
if self._ID:
id1 = self._ID[0]
id2 = self._compute_document_identifier()
else:
ID_1 = self._compute_document_identifier_from_content()
ID_2 = self._compute_document_identifier_from_content()
self._ID = ArrayObject((ID_1, ID_2))
id1 = self._compute_document_identifier()
id2 = ByteStringObject(id1.original_bytes)
self._ID = ArrayObject((id1, id2))

def encrypt(
self,
Expand Down Expand Up @@ -1230,7 +1235,9 @@ def encrypt(
if not use_128bit:
alg = EncryptAlgorithm.RC4_40
self.generate_file_identifiers()
self._encryption = Encryption.make(alg, permissions_flag, self._ID[0])
self._encryption = Encryption.make(
alg, permissions_flag, cast(ArrayObject, self._ID)[0]
)
# in case call `encrypt` again
entry = self._encryption.write_entry(user_password, owner_password)
if self._encrypt_entry:
Expand Down Expand Up @@ -1331,7 +1338,7 @@ def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
NameObject(TK.INFO): self._info,
}
)
if hasattr(self, "_ID"):
if self._ID:
trailer[NameObject(TK.ID)] = self._ID
if self._encrypt_entry:
trailer[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
Expand Down

0 comments on commit 5fd1e91

Please sign in to comment.