Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MAINT: Simplify file identifiers generation #2003

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
49 changes: 28 additions & 21 deletions pypdf/_writer.py
Expand Up @@ -33,6 +33,7 @@
import enum
import hashlib
import re
import time
import uuid
import warnings
from io import BytesIO, FileIO, IOBase
Expand Down Expand Up @@ -136,13 +137,6 @@ class ObjectDeletionFlag(enum.IntFlag):
ALL_ANNOTATIONS = enum.auto()


def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
hash = hashlib.md5()
for block in iter(lambda: stream.read(blocksize), b""):
hash.update(block)
return hash.hexdigest()


class PdfWriter:
"""
Write a PDF file out, given pages produced by another class.
Expand Down Expand Up @@ -206,6 +200,7 @@ def __init__(

self._encryption: Optional[Encryption] = None
self._encrypt_entry: Optional[DictionaryObject] = None
self._ID: Union[ArrayObject, None] = None

def __enter__(self) -> "PdfWriter":
"""Store that writer is initialized by 'with'."""
Expand Down Expand Up @@ -1128,25 +1123,35 @@ def cloneDocumentFromReader(
)
self.clone_document_from_reader(reader, after_page_append)

def _compute_document_identifier_from_content(self) -> ByteStringObject:
stream = BytesIO()
self._write_pdf_structure(stream)
stream.seek(0)
return ByteStringObject(_rolling_checksum(stream).encode("utf8"))
def _compute_document_identifier(self) -> ByteStringObject:
md5 = hashlib.md5()
md5.update(str(time.time()).encode("utf-8"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes document-generation non-deterministic, right?

md5.update(str(self.fileobj).encode("utf-8"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is self.fileobj equivalent to self._write_pdf_structure(stream)?

md5.update(str(len(self._objects)).encode("utf-8"))
if hasattr(self, "_info"):
for k, v in cast(DictionaryObject, self._info.get_object()).items():
md5.update(f"{k}={v}".encode())
return ByteStringObject(md5.hexdigest().encode("utf-8"))

def generate_file_identifiers(self) -> None:
"""
Generate an identifier for the PDF that will be written.

The only point of this is ensuring uniqueness. Reproducibility is not
required; see 14.4 "File Identifiers".
"""
if hasattr(self, "_ID") and self._ID and len(self._ID) == 2:
ID_1 = self._ID[0]
required;
When a file is first written, both identifiers shall be set to the same value.
If both identifiers match when a file reference is resolved, it is very
likely that the correct and unchanged file has been found. If only the first
identifier matches, a different version of the correct file has been found.
see 14.4 "File Identifiers".
"""
if self._ID:
id1 = self._ID[0]
id2 = self._compute_document_identifier()
else:
ID_1 = self._compute_document_identifier_from_content()
ID_2 = self._compute_document_identifier_from_content()
self._ID = ArrayObject((ID_1, ID_2))
id1 = self._compute_document_identifier()
id2 = ByteStringObject(id1.original_bytes)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id1 is a ByteStringObject already. So .original_bytes just returns id1. Then wrapping it in ByteStringObject doesn't do anything, right?

self._ID = ArrayObject((id1, id2))

def encrypt(
self,
Expand Down Expand Up @@ -1230,7 +1235,9 @@ def encrypt(
if not use_128bit:
alg = EncryptAlgorithm.RC4_40
self.generate_file_identifiers()
self._encryption = Encryption.make(alg, permissions_flag, self._ID[0])
self._encryption = Encryption.make(
alg, permissions_flag, cast(ArrayObject, self._ID)[0]
)
MartinThoma marked this conversation as resolved.
Show resolved Hide resolved
# in case call `encrypt` again
entry = self._encryption.write_entry(user_password, owner_password)
if self._encrypt_entry:
Expand Down Expand Up @@ -1331,7 +1338,7 @@ def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
NameObject(TK.INFO): self._info,
}
)
if hasattr(self, "_ID"):
if self._ID:
trailer[NameObject(TK.ID)] = self._ID
if self._encrypt_entry:
trailer[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
Expand Down