diff --git a/PyPDF2/_merger.py b/PyPDF2/_merger.py index 338584b5c..33e8725bf 100644 --- a/PyPDF2/_merger.py +++ b/PyPDF2/_merger.py @@ -702,6 +702,7 @@ def add_outline_item( title, page_number, parent, + None, color, bold, italic, diff --git a/PyPDF2/_page.py b/PyPDF2/_page.py index e944947b3..309874266 100644 --- a/PyPDF2/_page.py +++ b/PyPDF2/_page.py @@ -46,6 +46,7 @@ ) from ._cmap import build_char_map, unknown_char_map +from ._protocols import PdfReaderProtocol from ._utils import ( CompressedTransformationMatrix, File, @@ -288,16 +289,17 @@ class PageObject(DictionaryObject): this object in its source PDF """ + original_page: "PageObject" # very local use in writer when appending + def __init__( self, - pdf: Optional[Any] = None, # PdfReader + pdf: Optional[PdfReaderProtocol] = None, indirect_reference: Optional[IndirectObject] = None, indirect_ref: Optional[IndirectObject] = None, ) -> None: - from ._reader import PdfReader DictionaryObject.__init__(self) - self.pdf: Optional[PdfReader] = pdf + self.pdf: Optional[PdfReaderProtocol] = pdf if indirect_ref is not None: # deprecated warnings.warn( "Use indirect_reference instead of indirect_ref.", DeprecationWarning diff --git a/PyPDF2/_protocols.py b/PyPDF2/_protocols.py new file mode 100644 index 000000000..b83db961b --- /dev/null +++ b/PyPDF2/_protocols.py @@ -0,0 +1,65 @@ +"""Helpers for working with PDF types.""" + +from io import BufferedReader, BufferedWriter, BytesIO, FileIO +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +try: + # Python 3.8+: https://peps.python.org/pep-0544 + from typing import Protocol # type: ignore[attr-defined] +except ImportError: + from typing_extensions import Protocol # type: ignore[misc] + +from ._utils import StrByteType + + +class PdfObjectProtocol(Protocol): + indirect_reference: Any + + def clone( + self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> Any: +
... + + def _reference_clone(self, clone: Any, pdf_dest: Any) -> Any: + ... + + def get_object(self) -> Optional["PdfObjectProtocol"]: + ... + + +class PdfReaderProtocol(Protocol): # pragma: no cover + @property + def pdf_header(self) -> str: + ... + + @property + def strict(self) -> bool: + ... + + @property + def xref(self) -> Dict[int, Dict[int, Any]]: + ... + + @property + def pages(self) -> List[Any]: + ... + + def get_object(self, indirect_reference: Any) -> Optional[PdfObjectProtocol]: + ... + + +class PdfWriterProtocol(Protocol): # pragma: no cover + _objects: List[Any] + _id_translated: Dict[int, Dict[int, int]] + + def get_object(self, indirect_reference: Any) -> Optional[PdfObjectProtocol]: + ... + + def write( + self, stream: Union[Path, StrByteType] + ) -> Tuple[bool, Union[FileIO, BytesIO, BufferedReader, BufferedWriter]]: + ... diff --git a/PyPDF2/_reader.py b/PyPDF2/_reader.py index 914a55317..d7ccca91c 100644 --- a/PyPDF2/_reader.py +++ b/PyPDF2/_reader.py @@ -972,6 +972,7 @@ def _build_outline_item(self, node: DictionaryObject) -> Optional[Destination]: # absolute value = num. 
visible children # positive = open/unfolded, negative = closed/folded outline_item[NameObject("/Count")] = node["/Count"] + outline_item.node = node return outline_item @property @@ -1389,6 +1390,8 @@ def cache_indirect_object( raise PdfReadError(msg) logger_warning(msg, __name__) self.resolved_objects[(generation, idnum)] = obj + if obj is not None: + obj.indirect_reference = IndirectObject(idnum, generation, self) return obj def cacheIndirectObject( diff --git a/PyPDF2/_writer.py b/PyPDF2/_writer.py index 9f6361c20..db5e394ac 100644 --- a/PyPDF2/_writer.py +++ b/PyPDF2/_writer.py @@ -32,12 +32,13 @@ import decimal import logging import random +import re import struct import time import uuid import warnings from hashlib import md5 -from io import BufferedReader, BufferedWriter, BytesIO, FileIO +from io import BufferedReader, BufferedWriter, BytesIO, FileIO, IOBase from pathlib import Path from types import TracebackType from typing import ( @@ -45,14 +46,17 @@ Callable, Deque, Dict, + Iterable, List, Optional, + Pattern, Tuple, Type, Union, cast, ) +from ._encryption import Encryption from ._page import PageObject, _VirtualList from ._reader import PdfReader from ._security import _alg33, _alg34, _alg35 @@ -106,11 +110,13 @@ create_string_object, hex_to_rgb, ) +from .pagerange import PageRange, PageRangeSpec from .types import ( BorderArrayType, FitType, LayoutType, OutlineItemType, + OutlineType, PagemodeType, ZoomArgType, ) @@ -132,6 +138,7 @@ def __init__(self, fileobj: StrByteType = "") -> None: self._header = b"%PDF-1.3" self._objects: List[PdfObject] = [] # array of indirect objects self._idnum_hash: Dict[bytes, IndirectObject] = {} + self._id_translated: Dict[int, Dict[int, int]] = {} # The root of our page tree node. 
pages = DictionaryObject() @@ -156,15 +163,14 @@ def __init__(self, fileobj: StrByteType = "") -> None: self._info = self._add_object(info) # root object - root = DictionaryObject() - root.update( + self._root_object = DictionaryObject() + self._root_object.update( { NameObject(PA.TYPE): NameObject(CO.CATALOG), NameObject(CO.PAGES): self._pages, } ) - self._root: Optional[IndirectObject] = None - self._root_object = root + self._root = self._add_object(self._root_object) self.fileobj = fileobj self.with_as_usage = False @@ -199,8 +205,11 @@ def pdf_header(self, new_header: bytes) -> None: self._header = new_header def _add_object(self, obj: PdfObject) -> IndirectObject: + if getattr(obj, "indirect_reference", None) is not None and obj.indirect_reference.pdf == self: # type: ignore + return obj.indirect_reference # type: ignore self._objects.append(obj) - return IndirectObject(len(self._objects), 0, self) + obj.indirect_reference = IndirectObject(len(self._objects), 0, self) + return obj.indirect_reference def get_object( self, @@ -227,7 +236,9 @@ def get_object( raise ValueError("pdf must be self") return self._objects[indirect_reference.idnum - 1] # type: ignore - def getObject(self, ido: IndirectObject) -> PdfObject: # pragma: no cover + def getObject( + self, ido: Union[int, IndirectObject] + ) -> PdfObject: # pragma: no cover """ ..
deprecated:: 1.28.0 @@ -237,20 +248,38 @@ def getObject(self, ido: IndirectObject) -> PdfObject: # pragma: no cover return self.get_object(ido) def _add_page( - self, page: PageObject, action: Callable[[Any, IndirectObject], None] - ) -> None: + self, + page: PageObject, + action: Callable[[Any, IndirectObject], None], + excluded_keys: Iterable[str] = (), + ) -> PageObject: assert cast(str, page[PA.TYPE]) == CO.PAGE - if page.pdf is not None: - other = page.pdf.pdf_header + page_org = page + excluded_keys = list(excluded_keys) + excluded_keys += [PA.PARENT, "/StructParents"] + # acrobat does not accept to have two indirect ref pointing on the same page; + # therefore in order to add easily multiple copies of the same page, we need to create a new + # dictionary for the page, however the objects below (including content) is not duplicated + try: # delete an already existing page + del self._id_translated[id(page_org.indirect_reference.pdf)][ # type: ignore + page_org.indirect_reference.idnum # type: ignore + ] + except Exception: + pass + page = cast("PageObject", page_org.clone(self, False, excluded_keys)) + # page_ind = self._add_object(page) + if page_org.pdf is not None: + other = page_org.pdf.pdf_header if isinstance(other, str): other = other.encode() # type: ignore self.pdf_header = _get_max_pdf_version_header(self.pdf_header, other) # type: ignore page[NameObject(PA.PARENT)] = self._pages - page_ind = self._add_object(page) pages = cast(DictionaryObject, self.get_object(self._pages)) - action(pages[PA.KIDS], page_ind) + assert page.indirect_reference is not None + action(pages[PA.KIDS], page.indirect_reference) page_count = cast(int, pages[PA.COUNT]) pages[NameObject(PA.COUNT)] = NumberObject(page_count + 1) + return page def set_need_appearances_writer(self) -> None: # See 12.7.2 and 7.7.2 for more information: @@ -272,9 +301,14 @@ def set_need_appearances_writer(self) -> None: except Exception as exc: logger.error("set_need_appearances_writer() catch : ", 
repr(exc)) - def add_page(self, page: PageObject) -> None: + def add_page( + self, + page: PageObject, + excluded_keys: Iterable[str] = (), + ) -> PageObject: """ Add a page to this PDF file. + Recommended for advanced usage including the adequate excluded_keys The page is usually acquired from a :class:`PdfReader` instance. @@ -282,18 +316,27 @@ def add_page(self, page: PageObject) -> None: :param PageObject page: The page to add to the document. Should be an instance of :class:`PageObject` """ - self._add_page(page, list.append) + return self._add_page(page, list.append, excluded_keys) - def addPage(self, page: PageObject) -> None: # pragma: no cover + def addPage( + self, + page: PageObject, + excluded_keys: Iterable[str] = (), + ) -> PageObject: # pragma: no cover """ .. deprecated:: 1.28.0 Use :meth:`add_page` instead. """ deprecate_with_replacement("addPage", "add_page") - self.add_page(page) + return self.add_page(page, excluded_keys) - def insert_page(self, page: PageObject, index: int = 0) -> None: + def insert_page( + self, + page: PageObject, + index: int = 0, + excluded_keys: Iterable[str] = (), + ) -> PageObject: """ Insert a page in this PDF file. The page is usually acquired from a :class:`PdfReader` instance. @@ -301,16 +344,21 @@ def insert_page(self, page: PageObject, index: int = 0) -> None: :param PageObject page: The page to add to the document. :param int index: Position at which the page will be inserted. """ - self._add_page(page, lambda l, p: l.insert(index, p)) + return self._add_page(page, lambda l, p: l.insert(index, p), excluded_keys) - def insertPage(self, page: PageObject, index: int = 0) -> None: # pragma: no cover + def insertPage( + self, + page: PageObject, + index: int = 0, + excluded_keys: Iterable[str] = (), + ) -> PageObject: # pragma: no cover """ .. deprecated:: 1.28.0 Use :meth:`insert_page` instead.
""" deprecate_with_replacement("insertPage", "insert_page") - self.insert_page(page, index) + return self.insert_page(page, index, excluded_keys) def get_page( self, page_number: Optional[int] = None, pageNumber: Optional[int] = None @@ -640,13 +688,10 @@ def append_pages_from_reader( """ # Get page count from writer and reader reader_num_pages = len(reader.pages) - writer_num_pages = len(self.pages) - # Copy pages from reader to writer for reader_page_number in range(reader_num_pages): reader_page = reader.pages[reader_page_number] - self.add_page(reader_page) - writer_page = self.pages[writer_num_pages + reader_page_number] + writer_page = self.add_page(reader_page) # Trigger callback, pass writer page as parameter if callable(after_page_append): after_page_append(writer_page) @@ -780,6 +825,7 @@ def clone_document_from_reader( (delegates to append_pages_from_reader). The single parameter of the callback is a reference to the page just appended to the document. """ + # TODO : ppZZ may be limited because we do not copy all info... 
self.clone_reader_document_root(reader) self.append_pages_from_reader(reader, after_page_append) @@ -934,6 +980,7 @@ def write( if isinstance(stream, (str, Path)): stream = FileIO(stream, "wb") + self.with_as_usage = True # my_file = True self.write_stream(stream) @@ -1058,11 +1105,12 @@ def _sweep_indirect_references( ) ) elif isinstance(data, IndirectObject): - data = self._resolve_indirect_object(data) + if data.pdf != self: + data = self._resolve_indirect_object(data) - if str(data) not in discovered: - discovered.append(str(data)) - stack.append((data.get_object(), None, None, [])) + if str(data) not in discovered: + discovered.append(str(data)) + stack.append((data.get_object(), None, None, [])) # Check if data has a parent and if it is a dict or an array update the value if isinstance(parent, (DictionaryObject, ArrayObject)): @@ -1102,6 +1150,9 @@ def _resolve_indirect_object(self, data: IndirectObject) -> IndirectObject: if hasattr(data.pdf, "stream") and data.pdf.stream.closed: raise ValueError(f"I/O operation on closed file: {data.pdf.stream.name}") + if data.pdf == self: + return data + # Get real object indirect object real_obj = data.pdf.get_object(data) @@ -1194,15 +1245,11 @@ def get_named_dest_root(self) -> ArrayObject: self._root_object[CA.NAMES], DictionaryObject ): names = cast(DictionaryObject, self._root_object[CA.NAMES]) - idnum = self._objects.index(names) + 1 - names_ref = IndirectObject(idnum, 0, self) - assert names_ref.get_object() == names + names_ref = names.indirect_reference if CA.DESTS in names and isinstance(names[CA.DESTS], DictionaryObject): # 3.6.3 Name Dictionary (PDF spec 1.7) dests = cast(DictionaryObject, names[CA.DESTS]) - idnum = self._objects.index(dests) + 1 - dests_ref = IndirectObject(idnum, 0, self) - assert dests_ref.get_object() == dests + dests_ref = dests.indirect_reference if CA.NAMES in dests: # TABLE 3.33 Entries in a name tree node dictionary nd = cast(ArrayObject, dests[CA.NAMES]) @@ -1241,6 +1288,7 @@ def 
add_outline_item_destination( self, page_destination: Union[None, PageObject, TreeObject] = None, parent: Union[None, TreeObject, IndirectObject] = None, + before: Union[None, TreeObject, IndirectObject] = None, dest: Union[None, PageObject, TreeObject] = None, # deprecated ) -> IndirectObject: if page_destination is not None and dest is not None: # deprecated @@ -1265,7 +1313,9 @@ def add_outline_item_destination( parent = cast(TreeObject, parent.get_object()) page_destination_ref = self._add_object(page_destination) - parent.add_child(page_destination_ref, self) + if before is not None: + before = before.indirect_reference + parent.insert_child(page_destination_ref, before, self) return page_destination_ref @@ -1299,7 +1349,10 @@ def addBookmarkDestination( @deprecate_bookmark(bookmark="outline_item") def add_outline_item_dict( - self, outline_item: OutlineItemType, parent: Optional[TreeObject] = None + self, + outline_item: OutlineItemType, + parent: Union[None, TreeObject, IndirectObject] = None, + before: Union[None, TreeObject, IndirectObject] = None, ) -> IndirectObject: outline_item_object = TreeObject() for k, v in list(outline_item.items()): @@ -1314,7 +1367,7 @@ def add_outline_item_dict( action_ref = self._add_object(action) outline_item_object[NameObject("/A")] = action_ref - return self.add_outline_item_destination(outline_item_object, parent) + return self.add_outline_item_destination(outline_item_object, parent, before) @deprecate_bookmark(bookmark="outline_item") def add_bookmark_dict( @@ -1343,8 +1396,9 @@ def addBookmarkDict( def add_outline_item( self, title: str, - page_number: Optional[int] = None, + page_number: Union[None, PageObject, IndirectObject, int], parent: Union[None, TreeObject, IndirectObject] = None, + before: Union[None, TreeObject, IndirectObject] = None, color: Optional[Union[Tuple[float, float, float], str]] = None, bold: bool = False, italic: bool = False, @@ -1356,6 +1410,8 @@ def add_outline_item( :param str title: Title to 
use for this outline item. :param int page_number: Page number this outline item will point to. + :param before: A reference to an existing outline item that the new + item is inserted before (forwarded to the parent's insert_child). :param parent: A reference to a parent outline item to create nested outline items. :param tuple color: Color of the outline item's font as a red, green, blue tuple @@ -1364,42 +1420,54 @@ def add_outline_item( :param bool italic: Outline item font is italic :param Fit fit: The fit of the destination page. """ + page_ref: Union[None, NullObject, IndirectObject, NumberObject] + if isinstance(italic, Fit): # it means that we are on the old params + if fit is not None and page_number is None: + page_number = fit # type: ignore + return self.add_outline_item( + title, page_number, parent, None, before, color, bold, italic # type: ignore + ) if page_number is not None and pagenum is not None: raise ValueError( "The argument pagenum of add_outline_item is deprecated. Use page_number only." ) - if pagenum is not None: - old_term = "pagenum" - new_term = "page_number" - warnings.warn( - message=( - f"{old_term} is deprecated as an argument.
Use {new_term} instead" + if page_number is None: + action_ref = None + else: + if isinstance(page_number, IndirectObject): + page_ref = page_number + elif isinstance(page_number, PageObject): + page_ref = page_number.indirect_reference + elif isinstance(page_number, int): + try: + page_ref = self.pages[page_number].indirect_reference + except IndexError: + page_ref = NumberObject(page_number) + if page_ref is None: + logger_warning( + f"can not find reference of page {page_number}", + __name__, ) + page_ref = NullObject() + dest = Destination( + NameObject("/" + title + " outline item"), + page_ref, + fit, ) - page_number = pagenum - if page_number is None: - raise ValueError("page_number may not be None") - page_ref = NumberObject(page_number) - - dest = Destination( - NameObject("/" + title + " outline item"), - page_ref, - fit, - ) - action_ref = self._add_object( - DictionaryObject( - { - NameObject(GoToActionArguments.D): dest.dest_array, - NameObject(GoToActionArguments.S): NameObject("/GoTo"), - } + action_ref = self._add_object( + DictionaryObject( + { + NameObject(GoToActionArguments.D): dest.dest_array, + NameObject(GoToActionArguments.S): NameObject("/GoTo"), + } + ) ) - ) outline_item = _create_outline_item(action_ref, title, color, italic, bold) if parent is None: parent = self.get_outline_root() - return self.add_outline_item_destination(outline_item, parent) + return self.add_outline_item_destination(outline_item, parent, before) def add_bookmark( self, @@ -1449,6 +1517,7 @@ def addBookmark( title, pagenum, parent, + None, color, bold, italic, @@ -1460,6 +1529,21 @@ def add_outline(self) -> None: "This method is not yet implemented. Use :meth:`add_outline_item` instead." 
) + def add_named_destination_array( + self, title: TextStringObject, destination: Union[IndirectObject, ArrayObject] + ) -> None: + nd = self.get_named_dest_root() + i = 0 + while i < len(nd): + if title < nd[i]: + nd.insert(i, destination) + nd.insert(i, TextStringObject(title)) + return + else: + i += 2 + nd.extend([TextStringObject(title), destination]) + return + def add_named_destination_object( self, page_destination: Optional[PdfObject] = None, @@ -1481,14 +1565,15 @@ def add_named_destination_object( if page_destination is None: # deprecated raise ValueError("page_destination may not be None") - page_destination_ref = self._add_object(page_destination) + page_destination_ref = self._add_object(page_destination.dest_array) # type: ignore + self.add_named_destination_array( + cast("TextStringObject", page_destination["/Title"]), page_destination_ref # type: ignore + ) - nd = self.get_named_dest_root() - nd.extend([page_destination["/Title"], page_destination_ref]) # type: ignore return page_destination_ref def addNamedDestinationObject( - self, dest: PdfObject + self, dest: Destination ) -> IndirectObject: # pragma: no cover """ .. deprecated:: 1.28.0 @@ -1534,6 +1619,8 @@ def add_named_destination( dest_ref = self._add_object(dest) nd = self.get_named_dest_root() + if not isinstance(title, TextStringObject): + title = TextStringObject(str(title)) nd.extend([title, dest_ref]) return dest_ref @@ -1905,6 +1992,32 @@ def _set_page_layout(self, layout: Union[NameObject, LayoutType]) -> None: layout = NameObject(layout) self._root_object.update({NameObject("/PageLayout"): layout}) + def set_page_layout(self, layout: LayoutType) -> None: + """ + Set the page layout. + + :param str layout: The page layout to be used + + .. 
list-table:: Valid ``layout`` arguments + :widths: 50 200 + + * - /NoLayout + - Layout explicitly not specified + * - /SinglePage + - Show one page at a time + * - /OneColumn + - Show one column at a time + * - /TwoColumnLeft + - Show pages in two columns, odd-numbered pages on the left + * - /TwoColumnRight + - Show pages in two columns, odd-numbered pages on the right + * - /TwoPageLeft + - Show two pages at a time, odd-numbered pages on the left + * - /TwoPageRight + - Show two pages at a time, odd-numbered pages on the right + """ + self._set_page_layout(layout) + def setPageLayout(self, layout: LayoutType) -> None: # pragma: no cover """ .. deprecated:: 1.28.0 @@ -2086,6 +2199,533 @@ def add_annotation(self, page_number: int, annotation: Dict[str, Any]) -> None: page.annotations.append(ind_obj) + def clean_page(self, page: Union[PageObject, IndirectObject]) -> PageObject: + """ + Perform some clean up in the page. + Currently: convert NameObject nameddestination to TextStringObject (required for names/dests list) + """ + page = cast("PageObject", page.get_object()) + for a in page.get("/Annots", []): + a_obj = a.get_object() + d = a_obj.get("/Dest", None) + act = a_obj.get("/A", None) + if isinstance(d, NameObject): + a_obj[NameObject("/Dest")] = TextStringObject(d) + elif act is not None: + act = act.get_object() + d = act.get("/D", None) + if isinstance(d, NameObject): + act[NameObject("/D")] = TextStringObject(d) + return page + + def _create_stream( + self, fileobj: Union[Path, StrByteType, PdfReader] + ) -> Tuple[IOBase, Optional[Encryption]]: + # If the fileobj parameter is a string, assume it is a path + # and create a file object at that location. If it is a file, + # copy the file's contents into a BytesIO stream object; if + # it is a PdfReader, copy that reader's stream into a + # BytesIO stream. 
+ # If fileobj is none of the above types, it is not modified + encryption_obj = None + stream: IOBase + if isinstance(fileobj, (str, Path)): + with FileIO(fileobj, "rb") as f: + stream = BytesIO(f.read()) + elif isinstance(fileobj, PdfReader): + if fileobj._encryption: + encryption_obj = fileobj._encryption + orig_tell = fileobj.stream.tell() + fileobj.stream.seek(0) + stream = BytesIO(fileobj.stream.read()) + + # reset the stream to its original location + fileobj.stream.seek(orig_tell) + elif hasattr(fileobj, "seek") and hasattr(fileobj, "read"): + fileobj.seek(0) + filecontent = fileobj.read() + stream = BytesIO(filecontent) + else: + raise NotImplementedError( + "PdfMerger.merge requires an object that PdfReader can parse. " + "Typically, that is a Path or a string representing a Path, " + "a file object, or an object implementing .seek and .read. " + "Passing a PdfReader directly works as well." + ) + return stream, encryption_obj + + def append( + self, + fileobj: Union[StrByteType, PdfReader, Path], + outline_item: Union[ + str, None, PageRange, Tuple[int, int], Tuple[int, int, int], List[int] + ] = None, + pages: Union[ + None, PageRange, Tuple[int, int], Tuple[int, int, int], List[int] + ] = None, + import_outline: bool = True, + excluded_fields: Optional[Union[List[str], Tuple[str, ...]]] = None, + ) -> None: + """ + Identical to the :meth:`merge()` method, but assumes you want to + concatenate all pages onto the end of the file instead of specifying a + position. + + :param fileobj: A File Object or an object that supports the standard + read and seek methods similar to a File Object. Could also be a + string representing a path to a PDF file. + + :param str outline_item: Optionally, you may specify a string to build an outline + (aka 'bookmark') to identify the + beginning of the included file. 
+ + :param pages: can be a :class:`PageRange` + or a ``(start, stop[, step])`` tuple + or a list of pages to be processed + to merge only the specified range of pages from the source + document into the output document. + + :param bool import_outline: You may prevent the source document's + outline (collection of outline items, previously referred to as + 'bookmarks') from being imported by specifying this as ``False``. + + :param List excluded_fields: provide the list of fields/keys to be ignored + if "/Annots" is part of the list, the annotation will be ignored + if "/B" is part of the list, the articles will be ignored + """ + if excluded_fields is None: + excluded_fields = () + if isinstance(outline_item, (tuple, list, PageRange)): + if isinstance(pages, bool): + if not isinstance(import_outline, bool): + excluded_fields = import_outline + import_outline = pages + pages = outline_item + self.merge(None, fileobj, None, pages, import_outline, excluded_fields) + else: # if isinstance(outline_item,str): + self.merge( + None, fileobj, outline_item, pages, import_outline, excluded_fields + ) + + @deprecate_bookmark(bookmark="outline_item", import_bookmarks="import_outline") + def merge( + self, + position: Optional[int], + fileobj: Union[Path, StrByteType, PdfReader], + outline_item: Optional[str] = None, + pages: Optional[PageRangeSpec] = None, + import_outline: bool = True, + excluded_fields: Optional[Union[List[str], Tuple[str, ...]]] = (), + ) -> None: + """ + Merge the pages from the given file into the output file at the + specified page number. + + :param int position: The *page number* to insert this file. File will + be inserted after the given number. + + :param fileobj: A File Object or an object that supports the standard + read and seek methods similar to a File Object. Could also be a + string representing a path to a PDF file. 
+ + :param str outline_item: Optionally, you may specify a string to build an outline + (aka 'bookmark') to identify the + beginning of the included file. + + :param pages: can be a :class:`PageRange` + or a ``(start, stop[, step])`` tuple + or a list of pages to be processed + to merge only the specified range of pages from the source + document into the output document. + + :param bool import_outline: You may prevent the source document's + outline (collection of outline items, previously referred to as + 'bookmarks') from being imported by specifying this as ``False``. + + :param List excluded_fields: provide the list of fields/keys to be ignored + if "/Annots" is part of the list, the annotation will be ignored + if "/B" is part of the list, the articles will be ignored + """ + if isinstance(fileobj, PdfReader): + reader = fileobj + else: + stream, encryption_obj = self._create_stream(fileobj) + # Create a new PdfReader instance using the stream + # (either file or BytesIO or StringIO) created above + reader = PdfReader(stream, strict=False) # type: ignore[arg-type] + + if excluded_fields is None: + excluded_fields = () + # Find the range of pages to merge. 
+ if pages is None: + pages = list(range(0, len(reader.pages))) + elif isinstance(pages, PageRange): + pages = list(range(*pages.indices(len(reader.pages)))) + elif isinstance(pages, list): + pass # keep unchanged + elif isinstance(pages, tuple) and len(pages) <= 3: + pages = list(range(*pages)) + elif not isinstance(pages, tuple): + raise TypeError( + '"pages" must be a tuple of (start, stop[, step]) or a list' + ) + + srcpages = {} + for i in pages: + pg = reader.pages[i] + assert pg.indirect_reference is not None + if position is None: + srcpages[pg.indirect_reference.idnum] = self.add_page( + pg, list(excluded_fields) + ["/B", "/Annots"] # type: ignore + ) + else: + srcpages[pg.indirect_reference.idnum] = self.insert_page( + pg, position, list(excluded_fields) + ["/B", "/Annots"] # type: ignore + ) + position += 1 + srcpages[pg.indirect_reference.idnum].original_page = pg + + reader._namedDests = ( + reader.named_destinations + ) # need for the outline processing below + for dest in reader._namedDests.values(): + arr = dest.dest_array + # try: + if isinstance(dest["/Page"], NullObject): + pass # self.add_named_destination_array(dest["/Title"],arr) + elif dest["/Page"].indirect_reference.idnum in srcpages: + arr[NumberObject(0)] = srcpages[ + dest["/Page"].indirect_reference.idnum + ].indirect_reference + self.add_named_destination_array(dest["/Title"], arr) + # except Exception as e: + # logger_warning(f"can not insert {dest} : {e.msg}",__name__) + + outline_item_typ: TreeObject + if outline_item is not None: + outline_item_typ = cast( + "TreeObject", + self.add_outline_item( + TextStringObject(outline_item), + list(srcpages.values())[0].indirect_reference, + fit=PAGE_FIT, + ).get_object(), + ) + else: + outline_item_typ = self.get_outline_root() + + _ro = cast("DictionaryObject", reader.trailer[TK.ROOT]) + if import_outline and CO.OUTLINES in _ro: + outline = self._get_filtered_outline( + _ro.get(CO.OUTLINES, None), srcpages, reader + ) + 
self._insert_filtered_outline( + outline, outline_item_typ, None + ) # TODO : use before parameter + + if "/Annots" not in excluded_fields: + for pag in srcpages.values(): + lst = self._insert_filtered_annotations( + pag.original_page.get("/Annots", ()), pag, srcpages, reader + ) + if len(lst) > 0: + pag[NameObject("/Annots")] = lst + self.clean_page(pag) + + if "/B" not in excluded_fields: + self.add_filtered_articles("", srcpages, reader) + + return + + def _add_articles_thread( + self, + thread: DictionaryObject, # thread entry from the reader's array of threads + pages: Dict[int, PageObject], + reader: PdfReader, + ) -> IndirectObject: + """ + clone the thread with only the applicable articles + + """ + nthread = thread.clone( + self, force_duplicate=True, ignore_fields=("/F",) + ) # use of clone to keep link between reader and writer + self.threads.append(nthread.indirect_reference) + first_article = cast("DictionaryObject", thread["/F"]) + current_article: Optional[DictionaryObject] = first_article + new_article: Optional[DictionaryObject] = None + while current_article is not None: + pag = self._get_cloned_page( + cast("PageObject", current_article["/P"]), pages, reader + ) + if pag is not None: + if new_article is None: + new_article = cast( + "DictionaryObject", + self._add_object(DictionaryObject()).get_object(), + ) + new_first = new_article + nthread[NameObject("/F")] = new_article.indirect_reference + else: + new_article2 = cast( + "DictionaryObject", + self._add_object( + DictionaryObject( + {NameObject("/V"): new_article.indirect_reference} + ) + ).get_object(), + ) + new_article[NameObject("/N")] = new_article2.indirect_reference + new_article = new_article2 + new_article[NameObject("/P")] = pag + new_article[NameObject("/T")] = nthread.indirect_reference + new_article[NameObject("/R")] = current_article["/R"] + pag_obj = cast("PageObject", pag.get_object()) + if "/B" not in pag_obj: + pag_obj[NameObject("/B")] = ArrayObject() + cast("ArrayObject", 
pag_obj["/B"]).append(new_article.indirect_reference) + current_article = cast("DictionaryObject", current_article["/N"]) + if current_article == first_article: + new_article[NameObject("/N")] = new_first.indirect_reference # type: ignore + new_first[NameObject("/V")] = new_article.indirect_reference # type: ignore + current_article = None + assert nthread.indirect_reference is not None + return nthread.indirect_reference + + def add_filtered_articles( + self, + fltr: Union[Pattern, str], # thread entry from the reader's array of threads + pages: Dict[int, PageObject], + reader: PdfReader, + ) -> None: + """ + Add articles matching the defined criteria + """ + if isinstance(fltr, str): + fltr = re.compile(fltr) + elif not isinstance(fltr, Pattern): + fltr = re.compile("") + for p in pages.values(): + pp = p.original_page + for a in pp.get("/B", ()): + thr = a.get_object()["/T"] + if thr.indirect_reference.idnum not in self._id_translated[ + id(reader) + ] and fltr.search(thr["/I"]["/Title"]): + self._add_articles_thread(thr, pages, reader) + + def _get_cloned_page( + self, + page: Union[None, int, IndirectObject, PageObject, NullObject], + pages: Dict[int, PageObject], + reader: PdfReader, + ) -> Optional[IndirectObject]: + if isinstance(page, NullObject): + return None + if isinstance(page, int): + _i = reader.pages[page].indirect_reference + # elif isinstance(page, PageObject): + # _i = page.indirect_reference + elif isinstance(page, DictionaryObject) and page.get("/Type", "") == "/Page": + _i = page.indirect_reference + elif isinstance(page, IndirectObject): + _i = page + try: + return pages[_i.idnum].indirect_reference # type: ignore + except Exception: + return None + + def _insert_filtered_annotations( + self, + annots: Union[IndirectObject, List[DictionaryObject]], + page: PageObject, + pages: Dict[int, PageObject], + reader: PdfReader, + ) -> List[Destination]: + outlist = ArrayObject() + if isinstance(annots, IndirectObject): + annots = cast("List", 
annots.get_object()) + for an in annots: + ano = cast("DictionaryObject", an.get_object()) + if ( + ano["/Subtype"] != "/Link" + or "/A" not in ano + or cast("DictionaryObject", ano["/A"])["/S"] != "/GoTo" + or "/Dest" in ano + ): + if "/Dest" not in ano: + outlist.append(ano.clone(self).indirect_reference) + else: + d = ano["/Dest"] + if isinstance(d, str): + # it is a named dest + if str(d) in self.get_named_dest_root(): + outlist.append(ano.clone(self).indirect_reference) + else: + d = cast("ArrayObject", d) + p = self._get_cloned_page(d[0], pages, reader) + if p is not None: + anc = ano.clone(self, ignore_fields=("/Dest",)) + anc[NameObject("/Dest")] = ArrayObject([p] + d[1:]) + outlist.append(anc.indirect_reference) + else: + d = cast("DictionaryObject", ano["/A"])["/D"] + if isinstance(d, str): + # it is a named dest + if str(d) in self.get_named_dest_root(): + outlist.append(ano.clone(self).indirect_reference) + else: + d = cast("ArrayObject", d) + p = self._get_cloned_page(d[0], pages, reader) + if p is not None: + anc = ano.clone(self, ignore_fields=("/D",)) + anc = cast("DictionaryObject", anc) + cast("DictionaryObject", anc["/A"])[ + NameObject("/D") + ] = ArrayObject([p] + d[1:]) + outlist.append(anc.indirect_reference) + return outlist + + def _get_filtered_outline( + self, + node: Any, + pages: Dict[int, PageObject], + reader: PdfReader, + ) -> List[Destination]: + """Extract outline item entries that are part of the specified page set.""" + new_outline = [] + node = node.get_object() + if node.get("/Type", "") == "/Outlines" or "/Title" not in node: + node = node.get("/First", None) + if node is not None: + node = node.get_object() + new_outline += self._get_filtered_outline(node, pages, reader) + else: + v: Union[None, IndirectObject, NullObject] + while node is not None: + node = node.get_object() + o = cast("Destination", reader._build_outline_item(node)) + v = self._get_cloned_page(cast("PageObject", o["/Page"]), pages, reader) + if v is None: + 
v = NullObject() + o[NameObject("/Page")] = v + if "/First" in node: + o.childs = self._get_filtered_outline(node["/First"], pages, reader) + else: + o.childs = [] + if not isinstance(o["/Page"], NullObject) or len(o.childs) > 0: + new_outline.append(o) + node = node.get("/Next", None) + return new_outline + + def _clone_outline(self, dest: Destination) -> TreeObject: + n_ol = TreeObject() + self._add_object(n_ol) + n_ol[NameObject("/Title")] = TextStringObject(dest["/Title"]) + if not isinstance(dest["/Page"], NullObject): + if dest.node is not None and "/A" in dest.node: + n_ol[NameObject("/A")] = dest.node["/A"].clone(self) + # elif "/D" in dest.node: + # n_ol[NameObject("/Dest")] = dest.node["/D"].clone(self) + # elif "/Dest" in dest.node: + # n_ol[NameObject("/Dest")] = dest.node["/Dest"].clone(self) + else: + n_ol[NameObject("/Dest")] = dest.dest_array + # TODO: /SE + if dest.node is not None: + n_ol[NameObject("/F")] = NumberObject(dest.node.get("/F", 0)) + n_ol[NameObject("/C")] = ArrayObject( + dest.node.get( + "/C", [FloatObject(0.0), FloatObject(0.0), FloatObject(0.0)] + ) + ) + return n_ol + + def _insert_filtered_outline( + self, + outlines: List[Destination], + parent: Union[TreeObject, IndirectObject], + before: Union[None, TreeObject, IndirectObject] = None, + ) -> None: + for dest in outlines: + # TODO : can be improved to keep A and SE entries (ignored for the moment) + # np=self.add_outline_item_destination(dest,parent,before) + if dest.get("/Type", "") == "/Outlines" or "/Title" not in dest: + np = parent + else: + np = self._clone_outline(dest) + cast(TreeObject, parent.get_object()).insert_child(np, before, self) + self._insert_filtered_outline(dest.childs, np, None) + + def close(self) -> None: + """To match the functions from Merger""" + return + + # @deprecate_bookmark(bookmark="outline_item") + def find_outline_item( + self, + outline_item: Dict[str, Any], + root: Optional[OutlineType] = None, + ) -> Optional[List[int]]: + if root is None: 
+ o = self.get_outline_root() + else: + o = cast("TreeObject", root) + + i = 0 + while o is not None: + if o.indirect_reference == outline_item or o.get("/Title", None) == outline_item: + return [i] + else: + if "/First" in o: + res = self.find_outline_item( + outline_item, cast(OutlineType, o["/First"]) + ) + if res: + return ([i] if "/Title" in o else []) + res + if "/Next" in o: + i += 1 + o = cast(TreeObject, o["/Next"]) + else: + return None + + @deprecate_bookmark(bookmark="outline_item") + def find_bookmark( + self, + outline_item: Dict[str, Any], + root: Optional[OutlineType] = None, + ) -> Optional[List[int]]: # pragma: no cover + """ + .. deprecated:: 2.9.0 + Use :meth:`find_outline_item` instead. + """ + return self.find_outline_item(outline_item, root) + + def reset_translation( + self, reader: Union[None, PdfReader, IndirectObject] = None + ) -> None: + """ + reset the translation table between reader and the writer object. + late cloning will create new independent objects + + :param reader: PdfReader or IndirectObject refering a PdfReader object. + if set to None or omitted, all tables will be reset. 
+ """ + if reader is None: + self._id_translated = {} + elif isinstance(reader, PdfReader): + try: + del self._id_translated[id(reader)] + except Exception: + pass + elif isinstance(reader, IndirectObject): + try: + del self._id_translated[id(reader.pdf)] + except Exception: + pass + else: + raise Exception("invalid parameter {reader}") + def _pdf_objectify(obj: Union[Dict[str, Any], str, int, List[Any]]) -> PdfObject: if isinstance(obj, PdfObject): @@ -2116,16 +2756,17 @@ def _pdf_objectify(obj: Union[Dict[str, Any], str, int, List[Any]]) -> PdfObject def _create_outline_item( - action_ref: IndirectObject, + action_ref: Union[None, IndirectObject], title: str, color: Union[Tuple[float, float, float], str, None], italic: bool, bold: bool, ) -> TreeObject: outline_item = TreeObject() + if action_ref is not None: + outline_item[NameObject("/A")] = action_ref outline_item.update( { - NameObject("/A"): action_ref, NameObject("/Title"): create_string_object(title), } ) diff --git a/PyPDF2/generic/_base.py b/PyPDF2/generic/_base.py index fcfcbf275..872c529ad 100644 --- a/PyPDF2/generic/_base.py +++ b/PyPDF2/generic/_base.py @@ -30,9 +30,10 @@ import hashlib import re from binascii import unhexlify -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, List, Optional, Tuple, Union, cast from .._codecs import _pdfdoc_encoding_rev +from .._protocols import PdfObjectProtocol, PdfWriterProtocol from .._utils import ( StreamType, b_, @@ -50,9 +51,10 @@ __author_email__ = "biziqe@mathieu.fenniak.net" -class PdfObject: +class PdfObject(PdfObjectProtocol): # function for calculating a hash value hash_func: Callable[..., "hashlib._Hash"] = hashlib.sha1 + indirect_reference: Optional["IndirectObject"] def hash_value_data(self) -> bytes: return ("%s" % self).encode() @@ -66,6 +68,52 @@ def hash_value(self) -> bytes: ) ).encode() + def clone( + self, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], 
List[str], None] = (), + ) -> "PdfObject": + """ + clone object into pdf_dest (PdfWriterProtocol which is an interface for PdfWriter) + force_duplicate: in standard if the object has been already cloned and reference, + the copy is returned; when force_duplicate == True, a new copy is always performed + ignore_fields : list/tuple of Fields names (for dictionaries that will be ignored during cloning (apply also to childs duplication) + in standard, clone function call _reference_clone (see _reference) + """ + raise Exception("clone PdfObject") + + def _reference_clone( + self, clone: Any, pdf_dest: PdfWriterProtocol + ) -> PdfObjectProtocol: + """ + reference the object within the _objects of pdf_dest only if + indirect_reference attribute exists (which means the objects + was already identified in xref/xobjstm) + if object has been already referenced do nothing + """ + try: + if clone.indirect_reference.pdf == pdf_dest: + return clone + except Exception: + pass + if hasattr(self, "indirect_reference"): + ind = self.indirect_reference + i = len(pdf_dest._objects) + 1 + if ind is not None: + if id(ind.pdf) not in pdf_dest._id_translated: + pdf_dest._id_translated[id(ind.pdf)] = {} + if ind.idnum in pdf_dest._id_translated[id(ind.pdf)]: + obj = pdf_dest.get_object( + pdf_dest._id_translated[id(ind.pdf)][ind.idnum] + ) + assert obj is not None + return obj + pdf_dest._id_translated[id(ind.pdf)][ind.idnum] = i + pdf_dest._objects.append(clone) + clone.indirect_reference = IndirectObject(i, 0, pdf_dest) + return clone + def get_object(self) -> Optional["PdfObject"]: """Resolve indirect references.""" return self @@ -81,6 +129,15 @@ def write_to_stream( class NullObject(PdfObject): + def clone( + self, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "NullObject": + """clone object into pdf_dest""" + return cast("NullObject", self._reference_clone(NullObject(), pdf_dest)) + def 
write_to_stream( self, stream: StreamType, encryption_key: Union[None, str, bytes] ) -> None: @@ -112,6 +169,17 @@ class BooleanObject(PdfObject): def __init__(self, value: Any) -> None: self.value = value + def clone( + self, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "BooleanObject": + """clone object into pdf_dest""" + return cast( + "BooleanObject", self._reference_clone(BooleanObject(self.value), pdf_dest) + ) + def __eq__(self, __o: object) -> bool: if isinstance(__o, BooleanObject): return self.value == __o.value @@ -160,7 +228,34 @@ def __init__(self, idnum: int, generation: int, pdf: Any) -> None: # PdfReader self.generation = generation self.pdf = pdf - def get_object(self) -> Optional[PdfObject]: + def clone( + self, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "IndirectObject": + """clone object into pdf_dest""" + if self.pdf == pdf_dest and not force_duplicate: + # Already duplicated and no extra duplication required + return self + if id(self.pdf) not in pdf_dest._id_translated: + pdf_dest._id_translated[id(self.pdf)] = {} + + if not force_duplicate and self.idnum in pdf_dest._id_translated[id(self.pdf)]: + dup = pdf_dest.get_object(pdf_dest._id_translated[id(self.pdf)][self.idnum]) + else: + obj = self.get_object() + assert obj is not None + dup = obj.clone(pdf_dest, force_duplicate, ignore_fields) + assert dup is not None + assert dup.indirect_reference is not None + return dup.indirect_reference + + @property + def indirect_reference(self) -> "IndirectObject": # type: ignore[override] + return self + + def get_object(self) -> Optional["PdfObject"]: obj = self.pdf.get_object(self) if obj is None: return None @@ -239,6 +334,15 @@ def __new__( logger_warning(f"FloatObject ({value}) invalid; use 0.0 instead", __name__) return decimal.Decimal.__new__(cls, "0.0") + def clone( + 
self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "FloatObject": + """clone object into pdf_dest""" + return cast("FloatObject", self._reference_clone(FloatObject(self), pdf_dest)) + def __repr__(self) -> str: if self == self.to_integral(): # If this is an integer, format it with no decimal place. @@ -273,6 +377,15 @@ def __new__(cls, value: Any) -> "NumberObject": logger_warning(f"NumberObject({value}) invalid; use 0 instead", __name__) return int.__new__(cls, 0) + def clone( + self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "NumberObject": + """clone object into pdf_dest""" + return cast("NumberObject", self._reference_clone(NumberObject(self), pdf_dest)) + def as_numeric(self) -> int: return int(repr(self).encode("utf8")) @@ -288,7 +401,7 @@ def writeToStream( self.write_to_stream(stream, encryption_key) @staticmethod - def read_from_stream(stream: StreamType) -> Union["NumberObject", FloatObject]: + def read_from_stream(stream: StreamType) -> Union["NumberObject", "FloatObject"]: num = read_until_regex(stream, NumberObject.NumberPattern) if num.find(b".") != -1: return FloatObject(num) @@ -297,7 +410,7 @@ def read_from_stream(stream: StreamType) -> Union["NumberObject", FloatObject]: @staticmethod def readFromStream( stream: StreamType, - ) -> Union["NumberObject", FloatObject]: # pragma: no cover + ) -> Union["NumberObject", "FloatObject"]: # pragma: no cover deprecate_with_replacement("readFromStream", "read_from_stream") return NumberObject.read_from_stream(stream) @@ -310,6 +423,18 @@ class ByteStringObject(bytes, PdfObject): /O) is clearly not text, but is still stored in a "String" object. 
""" + def clone( + self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "ByteStringObject": + """clone object into pdf_dest""" + return cast( + "ByteStringObject", + self._reference_clone(ByteStringObject(bytes(self)), pdf_dest), + ) + @property def original_bytes(self) -> bytes: """For compatibility with TextStringObject.original_bytes.""" @@ -342,6 +467,18 @@ class TextStringObject(str, PdfObject): occur. """ + def clone( + self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "TextStringObject": + """clone object into pdf_dest""" + obj = TextStringObject(self) + obj.autodetect_pdfdocencoding = self.autodetect_pdfdocencoding + obj.autodetect_utf16 = self.autodetect_utf16 + return cast("TextStringObject", self._reference_clone(obj, pdf_dest)) + autodetect_pdfdocencoding = False autodetect_utf16 = False @@ -415,6 +552,15 @@ class NameObject(str, PdfObject): **{chr(i): f"#{i:02X}".encode() for i in range(33)}, } + def clone( + self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "NameObject": + """clone object into pdf_dest""" + return cast("NameObject", self._reference_clone(NameObject(self), pdf_dest)) + def write_to_stream( self, stream: StreamType, encryption_key: Union[None, str, bytes] ) -> None: diff --git a/PyPDF2/generic/_data_structures.py b/PyPDF2/generic/_data_structures.py index 0aafc28ce..a60133532 100644 --- a/PyPDF2/generic/_data_structures.py +++ b/PyPDF2/generic/_data_structures.py @@ -34,6 +34,7 @@ from io import BytesIO from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast +from .._protocols import PdfWriterProtocol from .._utils import ( WHITESPACES, StreamType, @@ -74,6 +75,33 @@ class ArrayObject(list, PdfObject): + def clone( + self, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool = False, + 
ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "ArrayObject": + """clone object into pdf_dest""" + try: + if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore + return self + except Exception: + pass + arr = cast("ArrayObject", self._reference_clone(ArrayObject(), pdf_dest)) + for data in self: + if isinstance(data, StreamObject): + # if not hasattr(data, "indirect_reference"): + # data.indirect_reference = None + dup = data._reference_clone( + data.clone(pdf_dest, force_duplicate, ignore_fields), pdf_dest + ) + arr.append(dup.indirect_reference) + elif hasattr(data, "clone"): + arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields)) + else: + arr.append(data) + return cast("ArrayObject", arr) + def items(self) -> Iterable[Any]: """ Emulate DictionaryObject.items for a list @@ -130,6 +158,92 @@ def readFromStream( class DictionaryObject(dict, PdfObject): + def clone( + self, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "DictionaryObject": + """clone object into pdf_dest""" + try: + if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore + return self + except Exception: + pass + + d__ = cast( + "DictionaryObject", self._reference_clone(self.__class__(), pdf_dest) + ) + if ignore_fields is None: + ignore_fields = [] + if len(d__.keys()) == 0: + d__._clone(self, pdf_dest, force_duplicate, ignore_fields) + return d__ + + def _clone( + self, + src: "DictionaryObject", + pdf_dest: PdfWriterProtocol, + force_duplicate: bool, + ignore_fields: Union[Tuple[str, ...], List[str]], + ) -> None: + """update the object from src""" + # First check if this is a chain list, we need to loop to prevent recur + if ( + ("/Next" not in ignore_fields and "/Next" in src) + or ("/Prev" not in ignore_fields and "/Prev" in src) + ) or ( + ("/N" not in ignore_fields and "/N" in src) + or ("/V" not in ignore_fields and 
"/V" in src) + ): + ignore_fields = list(ignore_fields) + for lst in (("/Next", "/Prev"), ("/N", "/V")): + for k in lst: + objs = [] + if ( + k in src + and k not in self + and isinstance(src.raw_get(k), IndirectObject) + ): + cur_obj: Optional["DictionaryObject"] = cast( + "DictionaryObject", src[k] + ) + prev_obj: Optional["DictionaryObject"] = self + while cur_obj is not None: + clon = cast( + "DictionaryObject", + cur_obj._reference_clone(cur_obj.__class__(), pdf_dest), + ) + objs.append((cur_obj, clon)) + assert prev_obj is not None + prev_obj[NameObject(k)] = clon.indirect_reference + prev_obj = clon + try: + if cur_obj == src: + cur_obj = None + else: + cur_obj = cast("DictionaryObject", cur_obj[k]) + except Exception: + cur_obj = None + for (s, c) in objs: + c._clone(s, pdf_dest, force_duplicate, ignore_fields + [k]) + + for k, v in src.items(): + if k not in ignore_fields: + if isinstance(v, StreamObject): + if not hasattr(v, "indirect_reference"): + v.indirect_reference = None + vv = v.clone(pdf_dest, force_duplicate, ignore_fields) + assert vv.indirect_reference is not None + self[k.clone(pdf_dest)] = vv.indirect_reference # type: ignore[attr-defined] + else: + if k not in self: + self[NameObject(k)] = ( + v.clone(pdf_dest, force_duplicate, ignore_fields) + if hasattr(v, "clone") + else v + ) + def raw_get(self, key: Any) -> Any: return dict.__getitem__(self, key) @@ -388,33 +502,63 @@ def addChild(self, child: Any, pdf: Any) -> None: # pragma: no cover deprecate_with_replacement("addChild", "add_child") self.add_child(child, pdf) - def add_child(self, child: Any, pdf: Any) -> None: # PdfWriter + def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None: + self.insert_child(child, None, pdf) + + def insert_child(self, child: Any, before: Any, pdf: PdfWriterProtocol) -> None: + def inc_parent_counter( + parent: Union[None, IndirectObject, TreeObject], n: int + ) -> None: + if parent is None: + return + parent = cast("TreeObject", 
parent.get_object()) + if "/Count" in parent: + parent[NameObject("/Count")] = NumberObject( + cast(int, parent[NameObject("/Count")]) + n + ) + inc_parent_counter(parent.get("/Parent", None), n) + child_obj = child.get_object() - child = pdf.get_reference(child_obj) - assert isinstance(child, IndirectObject) + child = child.indirect_reference # get_reference(child_obj) + # assert isinstance(child, IndirectObject) prev: Optional[DictionaryObject] - if "/First" not in self: + if "/First" not in self: # no child yet self[NameObject("/First")] = child self[NameObject("/Count")] = NumberObject(0) - prev = None + self[NameObject("/Last")] = child + child_obj[NameObject("/Parent")] = self.indirect_reference + inc_parent_counter(self, child_obj.get("/Count", 1)) + if "/Next" in child_obj: + del child_obj["/Next"] + if "/Prev" in child_obj: + del child_obj["/Prev"] + return else: - prev = cast( - DictionaryObject, self["/Last"] - ) # TABLE 8.3 Entries in the outline dictionary - - self[NameObject("/Last")] = child - self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] + 1) # type: ignore - - if prev: - prev_ref = pdf.get_reference(prev) - assert isinstance(prev_ref, IndirectObject) - child_obj[NameObject("/Prev")] = prev_ref - prev[NameObject("/Next")] = child - - parent_ref = pdf.get_reference(self) - assert isinstance(parent_ref, IndirectObject) - child_obj[NameObject("/Parent")] = parent_ref + prev = cast("DictionaryObject", self["/Last"]) + + while prev.indirect_reference != before: + if "/Next" in prev: + prev = cast("TreeObject", prev["/Next"]) + else: # append at the end + prev[NameObject("/Next")] = cast("TreeObject", child) + child_obj[NameObject("/Prev")] = prev.indirect_reference + child_obj[NameObject("/Parent")] = self.indirect_reference + if "/Next" in child_obj: + del child_obj["/Next"] + self[NameObject("/Last")] = child + inc_parent_counter(self, child_obj.get("/Count", 1)) + return + try: # insert as first or in the middle + assert 
isinstance(prev["/Prev"], DictionaryObject) + prev["/Prev"][NameObject("/Next")] = child + child_obj[NameObject("/Prev")] = prev["/Prev"] + except Exception: # it means we are inserting in first position + del child_obj["/Next"] + child_obj[NameObject("/Next")] = prev + prev[NameObject("/Prev")] = child + child_obj[NameObject("/Parent")] = self.indirect_reference + inc_parent_counter(self, child_obj.get("/Count", 1)) def removeChild(self, child: Any) -> None: # pragma: no cover deprecate_with_replacement("removeChild", "remove_child") @@ -457,6 +601,7 @@ def _remove_node_from_tree( def remove_child(self, child: Any) -> None: child_obj = child.get_object() + child = child_obj.indirect_reference if NameObject("/Parent") not in child_obj: raise ValueError("Removed child does not appear to be a tree item") @@ -533,7 +678,27 @@ def _reset_node_tree_relationship(child_obj: Any) -> None: class StreamObject(DictionaryObject): def __init__(self) -> None: self.__data: Optional[str] = None - self.decoded_self: Optional[DecodedStreamObject] = None + self.decoded_self: Optional["DecodedStreamObject"] = None + + def _clone( + self, + src: DictionaryObject, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool, + ignore_fields: Union[Tuple[str, ...], List[str]], + ) -> None: + """update the object from src""" + self._data = cast("StreamObject", src)._data + try: + decoded_self = cast("StreamObject", src).decoded_self + if decoded_self is None: + self.decoded_self = None + else: + self.decoded_self = decoded_self.clone(pdf_dest, True, ignore_fields) # type: ignore[assignment] + except Exception: + pass + super()._clone(src, pdf_dest, force_duplicate, ignore_fields) + return def hash_value_data(self) -> bytes: data = super().hash_value_data() @@ -636,7 +801,7 @@ def setData(self, data: Any) -> None: # pragma: no cover class EncodedStreamObject(StreamObject): def __init__(self) -> None: - self.decoded_self: Optional[DecodedStreamObject] = None + self.decoded_self: 
Optional["DecodedStreamObject"] = None @property def decodedSelf(self) -> Optional["DecodedStreamObject"]: # pragma: no cover @@ -693,21 +858,58 @@ def __init__( # stream may be a StreamObject or an ArrayObject containing # multiple StreamObjects to be cat'd together. - stream = stream.get_object() - if isinstance(stream, ArrayObject): - data = b"" - for s in stream: - data += b_(s.get_object().get_data()) - if len(data) == 0 or data[-1] != b"\n": - data += b"\n" - stream_bytes = BytesIO(data) - else: - stream_data = stream.get_data() - assert stream_data is not None - stream_data_bytes = b_(stream_data) - stream_bytes = BytesIO(stream_data_bytes) - self.forced_encoding = forced_encoding - self.__parse_content_stream(stream_bytes) + if stream is not None: + stream = stream.get_object() + if isinstance(stream, ArrayObject): + data = b"" + for s in stream: + data += b_(s.get_object().get_data()) + if len(data) == 0 or data[-1] != b"\n": + data += b"\n" + stream_bytes = BytesIO(data) + else: + stream_data = stream.get_data() + assert stream_data is not None + stream_data_bytes = b_(stream_data) + stream_bytes = BytesIO(stream_data_bytes) + self.forced_encoding = forced_encoding + self.__parse_content_stream(stream_bytes) + + def clone( + self, + pdf_dest: Any, + force_duplicate: bool = False, + ignore_fields: Union[Tuple[str, ...], List[str], None] = (), + ) -> "ContentStream": + """clone object into pdf_dest""" + try: + if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore + return self + except Exception: + pass + + d__ = cast( + "ContentStream", self._reference_clone(self.__class__(None, None), pdf_dest) + ) + if ignore_fields is None: + ignore_fields = [] + d__._clone(self, pdf_dest, force_duplicate, ignore_fields) + return d__ + + def _clone( + self, + src: DictionaryObject, + pdf_dest: PdfWriterProtocol, + force_duplicate: bool, + ignore_fields: Union[Tuple[str, ...], List[str]], + ) -> None: + """update the object from src""" + 
self.pdf = pdf_dest + self.operations = list(cast("ContentStream", src).operations) + self.forced_encoding = cast("ContentStream", src).forced_encoding + # no need to call DictionaryObjection or any + # super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields) + return def __parse_content_stream(self, stream: StreamType) -> None: stream.seek(0, 0) @@ -921,7 +1123,7 @@ def parent(self) -> Optional[DictionaryObject]: return self.get(FieldDictionaryAttributes.Parent) @property - def kids(self) -> Optional[ArrayObject]: + def kids(self) -> Optional["ArrayObject"]: """Read-only property accessing the kids of this field.""" return self.get(FieldDictionaryAttributes.Kids) @@ -1029,6 +1231,11 @@ class Destination(TreeObject): """ + node: Optional[ + DictionaryObject + ] = None # node provide access to the original Object + childs: List[Any] = [] # used in PdfWriter + def __init__( self, title: str, @@ -1073,7 +1280,7 @@ def __init__( raise PdfReadError(f"Unknown Destination Type: {typ!r}") @property - def dest_array(self) -> ArrayObject: + def dest_array(self) -> "ArrayObject": return ArrayObject( [self.raw_get("/Page"), self["/Type"]] + [ @@ -1083,7 +1290,7 @@ def dest_array(self) -> ArrayObject: ] ) - def getDestArray(self) -> ArrayObject: # pragma: no cover + def getDestArray(self) -> "ArrayObject": # pragma: no cover """ .. 
deprecated:: 1.28.3 @@ -1152,7 +1359,7 @@ def bottom(self) -> Optional[FloatObject]: return self.get("/Bottom", None) @property - def color(self) -> Optional[ArrayObject]: + def color(self) -> Optional["ArrayObject"]: """Read-only property accessing the color in (R, G, B) with values 0.0-1.0""" return self.get( "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)]) diff --git a/PyPDF2/types.py b/PyPDF2/types.py index 8b96e7e7e..9683c1edd 100644 --- a/PyPDF2/types.py +++ b/PyPDF2/types.py @@ -1,12 +1,12 @@ """Helpers for working with PDF types.""" -from typing import Any, Dict, List, Optional, Union +from typing import List, Union try: # Python 3.8+: https://peps.python.org/pep-0586 - from typing import Literal, Protocol # type: ignore[attr-defined] + from typing import Literal # type: ignore[attr-defined] except ImportError: - from typing_extensions import Literal, Protocol # type: ignore[misc] + from typing_extensions import Literal # type: ignore[misc] try: # Python 3.10+: https://www.python.org/dev/peps/pep-0484/ @@ -54,24 +54,3 @@ "/UseOC", "/UseAttachments", ] - - -class PdfReaderProtocol(Protocol): # pragma: no cover - @property - def pdf_header(self) -> str: - ... - - @property - def strict(self) -> bool: - ... - - @property - def xref(self) -> Dict[int, Dict[int, Any]]: - ... - - @property - def pages(self) -> List[Any]: - ... - - def get_object(self, indirect_reference: Any) -> Optional[Any]: - ... diff --git a/docs/user/merging-pdfs.md b/docs/user/merging-pdfs.md index 9cbc1e94b..ff5d20ff7 100644 --- a/docs/user/merging-pdfs.md +++ b/docs/user/merging-pdfs.md @@ -3,9 +3,9 @@ ## Basic Example ```python -from PyPDF2 import PdfMerger +from PyPDF2 import PdfWriter -merger = PdfMerger() +merger = PdfWriter() for pdf in ["file1.pdf", "file2.pdf", "file3.pdf"]: merger.append(pdf) @@ -21,9 +21,9 @@ by Paul Rooney. 
## Showing more merging options ```python -from PyPDF2 import PdfMerger +from PyPDF2 import PdfWriter -merger = PdfMerger() +merger = PdfWriter() input1 = open("document1.pdf", "rb") input2 = open("document2.pdf", "rb") @@ -46,3 +46,76 @@ merger.write(output) merger.close() output.close() ``` + +## append +`append` has been slightly extended in `PdfWriter`. + +See [pdfWriter.append](../modules/PdfWriter.html#PyPDF2.PdfWriter.append) for more details. + +**parameters:** + +*fileobj*: PdfReader or filename to merge +*outline_item*: string of an outline/bookmark pointing to the beginning of the inserted file. + if None, or omitted, no bookmark will be added. +*pages*: pages to merge; you can also provide a list of pages to merge + None (default) means that the full document will be merged. +*import_outline*: import/ignore the pertinent outlines from the source (default True) +*excluded_fields*: list of keys to be ignored for the imported objects; + if "/Annots" is part of the list, the annotations will be ignored + if "/B" is part of the list, the articles will be ignored + +examples: + +`writer.append("source.pdf",(0,10)) # append the first 10 pages of source.pdf` + +`writer.append(reader,"page 1 and 10",[0,9]) #append first and 10th page from reader and create an outline)` + +During the merging, the relevant named destinations will also be imported. + +If you want to insert pages in the middle of the destination, use `merge` (which provides an insert position). + +You can now insert the same page multiple times. You can also insert the same page many times at once with a list: + +e.g.: +`writer.append(reader,[0,1,0,2,0])` +will insert the pages (1), (2), with page (0) before, in the middle and after + +## add_page / insert_page +It is recommended to use `append` or `merge` instead. + +## reset_translation +During the cloning, if an object has already been cloned, it will not be cloned again; + a pointer to this previously cloned object is returned.
Because of that, if you add/merge a page that has + already been added, the same object will be added the second time. If you later modify either of these two pages, + both pages can be modified independently. + +To reset, call `writer.reset_translation(reader)` + +## Advanced cloning +In order to prevent side effects between pages/objects, objects and all linked objects are cloned during merging. + +This process will be automatically applied if you use PdfWriter.append/merge/add_page/insert_page. +If you want to clone an object before attaching it "manually", use the clone function of any PdfObject: +e.g.: + +`cloned_object = object.clone(writer)` + +If you try to clone an object already belonging to the writer, it will return the same object: + +`cloned_object == object.clone(writer) # -> returns True` + +Similarly, if you try to clone an object twice, it will return the previously cloned object: + +`object.clone(writer) == object.clone(writer) # -> returns True` + +Also, note that if you clone an object, you will clone all the objects below, +including the objects pointed to by IndirectObject. Because of that, if you clone +a page that includes some articles ("/B"), +not only the first article, but also all the chained articles, and the pages +where those articles can be read will be copied. +It means that you may copy lots of objects that will be saved in the output PDF.
+ +In order to prevent, that you can provide the list of defined the fields in the dictionaries to be ignored: + +eg: +`new_page = writer.add_page(reader.pages[0],excluded_fields=["/B"])` diff --git a/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf b/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf index 6ce0ca1bb..fea766cc7 100644 Binary files a/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf and b/resources/Seige_of_Vicksburg_Sample_OCR-crazyones-merged.pdf differ diff --git a/tests/test_generic.py b/tests/test_generic.py index 5f9e67dd3..cffacc964 100644 --- a/tests/test_generic.py +++ b/tests/test_generic.py @@ -22,7 +22,9 @@ NullObject, NumberObject, OutlineItem, + PdfObject, RectangleObject, + StreamObject, TextStringObject, TreeObject, create_string_object, @@ -473,8 +475,8 @@ def test_remove_child_not_in_that_tree(): tree = TreeObject() tree.indirect_reference = NullObject() - # child = ChildDummy(TreeObject()) child = TreeObject() + child.indirect_reference = NullObject() with pytest.raises(ValueError) as exc: child.remove_from_tree() assert exc.value.args[0] == "Removed child does not appear to be a tree item" @@ -943,3 +945,48 @@ def test_create_string_object_force(): ) def test_float_object_decimal_to_string(value, expected): assert repr(FloatObject(value)) == expected + + +def test_cloning(caplog): + # pdf_path = RESOURCE_ROOT / "crazyones.pdf" + # reader = PdfReader(pdf_path) + # page = reader.pages[0] + writer = PdfWriter() + with pytest.raises(Exception) as exc: + PdfObject().clone(writer) + assert "clone PdfObject" in exc.value.args[0] + + obj1 = DictionaryObject() + obj1.indirect_reference = None + n = len(writer._objects) + obj2 = obj1.clone(writer) + assert len(writer._objects) == n + 1 + obj3 = obj2.clone(writer) + assert len(writer._objects) == n + 1 + assert obj2.indirect_reference == obj3.indirect_reference + obj3 = obj2.indirect_reference.clone(writer) + assert len(writer._objects) == n + 1 + assert 
obj2.indirect_reference == obj3.indirect_reference + assert obj2.indirect_reference == obj2._reference_clone(obj2, writer).indirect_reference + assert len(writer._objects) == n + 1 + assert obj2.indirect_reference == obj3.indirect_reference + + obj3 = obj2.indirect_reference.clone(writer, True) + assert len(writer._objects) == n + 2 + assert obj2.indirect_reference != obj3.indirect_reference + + arr1 = ArrayObject([obj2]) + arr2 = arr1.clone(writer) + arr3 = arr2.clone(writer) + assert arr2 == arr3 + obj10 = StreamObject() + arr1 = ArrayObject([obj10]) + obj11 = obj10.clone(writer) + assert arr1[0] == obj11 + + obj20 = DictionaryObject( + {NameObject("/Test"): NumberObject(1), NameObject("/Test2"): StreamObject()} + ) + obj21 = obj20.clone(writer, ignore_fields=None) + assert "/Test" in obj21 + assert isinstance(obj21.get("/Test2"), IndirectObject) diff --git a/tests/test_merger.py b/tests/test_merger.py index 45eda8b07..f33c29d7e 100644 --- a/tests/test_merger.py +++ b/tests/test_merger.py @@ -6,7 +6,7 @@ import pytest import PyPDF2 -from PyPDF2 import PdfMerger, PdfReader +from PyPDF2 import PdfMerger, PdfReader, PdfWriter from PyPDF2.generic import Destination, Fit from . 
import get_pdf_from_url @@ -172,6 +172,19 @@ def test_merger_operations_by_traditional_usage(tmp_path): check_outline(path) +def test_merger_operations_by_traditional_usage_with_writer(tmp_path): + # Arrange + merger = PdfWriter() + merger_operate(merger) + path = tmp_path / tmp_filename + + # Act + merger.write(path) + merger.close() + # Assert + check_outline(path) + + def test_merger_operations_by_semi_traditional_usage(tmp_path): path = tmp_path / tmp_filename @@ -184,10 +197,31 @@ def test_merger_operations_by_semi_traditional_usage(tmp_path): check_outline(path) +def test_merger_operations_by_semi_traditional_usage_with_writer(tmp_path): + path = tmp_path / tmp_filename + + with PdfWriter() as merger: + merger_operate(merger) + merger.write(path) # Act + + # Assert + assert os.path.isfile(path) + check_outline(path) + + def test_merger_operation_by_new_usage(tmp_path): path = tmp_path / tmp_filename with PdfMerger(fileobj=path) as merger: merger_operate(merger) + # Assert + assert os.path.isfile(path) + check_outline(path) + + +def test_merger_operation_by_new_usage_with_writer(tmp_path): + path = tmp_path / tmp_filename + with PdfWriter(fileobj=path) as merger: + merger_operate(merger) # Assert assert os.path.isfile(path) @@ -203,6 +237,18 @@ def test_merge_page_exception(): merger.close() +def test_merge_page_exception_with_writer(): + merger = PyPDF2.PdfWriter() + pdf_path = RESOURCE_ROOT / "crazyones.pdf" + with pytest.raises(TypeError) as exc: + merger.merge(0, pdf_path, pages="a:b") + assert ( + exc.value.args[0] + == '"pages" must be a tuple of (start, stop[, step]) or a list' + ) + merger.close() + + def test_merge_page_tuple(): merger = PyPDF2.PdfMerger() pdf_path = RESOURCE_ROOT / "crazyones.pdf" @@ -210,6 +256,13 @@ def test_merge_page_tuple(): merger.close() +def test_merge_page_tuple_with_writer(): + merger = PyPDF2.PdfWriter() + pdf_path = RESOURCE_ROOT / "crazyones.pdf" + merger.merge(0, pdf_path, pages=(0, 1)) + merger.close() + + def 
test_merge_write_closed_fh(): merger = PyPDF2.PdfMerger() pdf_path = RESOURCE_ROOT / "crazyones.pdf" @@ -247,6 +300,43 @@ def test_merge_write_closed_fh(): assert exc.value.args[0] == err_closed +def test_merge_write_closed_fh_with_writer(): + merger = PyPDF2.PdfWriter() + pdf_path = RESOURCE_ROOT / "crazyones.pdf" + merger.append(pdf_path) + + # err_closed = "close() was called and thus the writer cannot be used anymore" + + merger.close() + # with pytest.raises(RuntimeError) as exc: + merger.write("stream.pdf") + # assert exc.value.args[0] == err_closed + + # with pytest.raises(RuntimeError) as exc: + merger.add_metadata({"author": "Martin Thoma"}) + # assert exc.value.args[0] == err_closed + + # with pytest.raises(RuntimeError) as exc: + merger.set_page_layout("/SinglePage") + # assert exc.value.args[0] == err_closed + + # with pytest.raises(RuntimeError) as exc: + merger.set_page_mode("/UseNone") + # assert exc.value.args[0] == err_closed + + # with pytest.raises(RuntimeError) as exc: + # merger._write_outline() + # assert exc.value.args[0] == err_closed + + # with pytest.raises(RuntimeError) as exc: + merger.add_outline_item("An outline item", 0) + # assert exc.value.args[0] == err_closed + + # with pytest.raises(RuntimeError) as exc: + # merger._write_dests() + # assert exc.value.args[0] == err_closed + + @pytest.mark.external def test_trim_outline_list(): url = "https://corpora.tika.apache.org/base/docs/govdocs1/995/995175.pdf" @@ -261,6 +351,20 @@ def test_trim_outline_list(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +def test_trim_outline_list_with_writer(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/995/995175.pdf" + name = "tika-995175.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external def test_zoom(): 
url = "https://corpora.tika.apache.org/base/docs/govdocs1/994/994759.pdf" @@ -275,6 +379,20 @@ def test_zoom(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +def test_zoom_with_writer(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/994/994759.pdf" + name = "tika-994759.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external def test_zoom_xyz_no_left(): url = "https://corpora.tika.apache.org/base/docs/govdocs1/933/933322.pdf" @@ -289,6 +407,20 @@ def test_zoom_xyz_no_left(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +def test_zoom_xyz_no_left_with_writer(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/933/933322.pdf" + name = "tika-933322.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external def test_outline_item(): url = "https://corpora.tika.apache.org/base/docs/govdocs1/997/997511.pdf" @@ -303,6 +435,21 @@ def test_outline_item(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +@pytest.mark.slow +def test_outline_item_with_writer(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/997/997511.pdf" + name = "tika-997511.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external @pytest.mark.slow def test_trim_outline(): @@ -318,6 +465,21 @@ def test_trim_outline(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +@pytest.mark.slow +def 
test_trim_outline_with_writer(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/982/982336.pdf" + name = "tika-982336.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external @pytest.mark.slow def test1(): @@ -333,6 +495,21 @@ def test1(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +@pytest.mark.slow +def test1_with_writer(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/923/923621.pdf" + name = "tika-923621.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external @pytest.mark.slow def test_sweep_recursion1(): @@ -352,6 +529,25 @@ def test_sweep_recursion1(): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +@pytest.mark.slow +def test_sweep_recursion1_with_writer(): + # TODO: This test looks like an infinite loop. + url = "https://corpora.tika.apache.org/base/docs/govdocs1/924/924546.pdf" + name = "tika-924546.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + reader2 = PdfReader("tmp-merger-do-not-commit.pdf") + reader2.pages + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external @pytest.mark.slow @pytest.mark.parametrize( @@ -383,7 +579,37 @@ def test_sweep_recursion2(url, name): @pytest.mark.external -def test_sweep_indirect_list_newobj_is_None(caplog): +@pytest.mark.slow +@pytest.mark.parametrize( + ("url", "name"), + [ + ( + # TODO: This test looks like an infinite loop. 
+ "https://corpora.tika.apache.org/base/docs/govdocs1/924/924794.pdf", + "tika-924794.pdf", + ), + ( + "https://corpora.tika.apache.org/base/docs/govdocs1/924/924546.pdf", + "tika-924546.pdf", + ), + ], +) +def test_sweep_recursion2_with_writer(url, name): + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfMerger() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + + reader2 = PdfReader("tmp-merger-do-not-commit.pdf") + reader2.pages + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + +@pytest.mark.external +def test_sweep_indirect_list_newobj_is_none(caplog): url = "https://corpora.tika.apache.org/base/docs/govdocs1/906/906769.pdf" name = "tika-906769.pdf" reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) @@ -400,6 +626,24 @@ def test_sweep_indirect_list_newobj_is_None(caplog): os.remove("tmp-merger-do-not-commit.pdf") +@pytest.mark.external +def test_sweep_indirect_list_newobj_is_none_with_writer(caplog): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/906/906769.pdf" + name = "tika-906769.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + merger = PdfWriter() + merger.append(reader) + merger.write("tmp-merger-do-not-commit.pdf") + merger.close() + # used to be: assert "Object 21 0 not defined." 
in caplog.text + + reader2 = PdfReader("tmp-merger-do-not-commit.pdf") + reader2.pages + + # cleanup + os.remove("tmp-merger-do-not-commit.pdf") + + @pytest.mark.external def test_iss1145(): # issue with FitH destination with null param @@ -410,6 +654,16 @@ def test_iss1145(): merger.close() +@pytest.mark.external +def test_iss1145_with_writer(): + # issue with FitH destination with null param + url = "https://github.com/py-pdf/PyPDF2/files/9164743/file-0.pdf" + name = "iss1145.pdf" + merger = PdfWriter() + merger.append(PdfReader(BytesIO(get_pdf_from_url(url, name=name)))) + merger.close() + + def test_deprecate_bookmark_decorator_warning(): reader = PdfReader(RESOURCE_ROOT / "outlines-with-invalid-destinations.pdf") merger = PdfMerger() @@ -420,6 +674,16 @@ def test_deprecate_bookmark_decorator_warning(): merger.merge(0, reader, import_bookmarks=True) +def test_deprecate_bookmark_decorator_warning_with_writer(): + reader = PdfReader(RESOURCE_ROOT / "outlines-with-invalid-destinations.pdf") + merger = PdfWriter() + with pytest.warns( + UserWarning, + match="import_bookmarks is deprecated as an argument. Use import_outline instead", + ): + merger.merge(0, reader, import_bookmarks=True) + + @pytest.mark.filterwarnings("ignore::UserWarning") def test_deprecate_bookmark_decorator_output(): reader = PdfReader(RESOURCE_ROOT / "outlines-with-invalid-destinations.pdf") @@ -429,6 +693,17 @@ def test_deprecate_bookmark_decorator_output(): assert merger.outline[0].title == first_oi_title +@pytest.mark.filterwarnings("ignore::UserWarning") +def test_deprecate_bookmark_decorator_output_with_writer(): + reader = PdfReader(RESOURCE_ROOT / "outlines-with-invalid-destinations.pdf") + merger = PdfWriter() + merger.merge(0, reader, import_bookmarks=True) + first_oi_title = 'Valid Destination: Action /GoTo Named Destination "section.1"' + # TODO? : add outline property ??? 
+ # assert merger.outline[0].title == first_oi_title + assert merger.find_outline_item(first_oi_title) == [0] + + @pytest.mark.external def test_iss1344(caplog): url = "https://github.com/py-pdf/PyPDF2/files/9549001/input.pdf" @@ -437,6 +712,34 @@ def test_iss1344(caplog): m.append(PdfReader(BytesIO(get_pdf_from_url(url, name=name)))) b = BytesIO() m.write(b) + r = PdfReader(b) + p = r.pages[0] + assert "/DIJMAC+Arial Black" in p._debug_for_extract() + assert "adresse où le malade peut être visité" in p.extract_text() + assert r.threads is None + + +@pytest.mark.external +def test_iss1344_with_writer(caplog): + url = "https://github.com/py-pdf/PyPDF2/files/9549001/input.pdf" + name = "iss1344.pdf" + m = PdfWriter() + m.append(PdfReader(BytesIO(get_pdf_from_url(url, name=name)))) + b = BytesIO() + m.write(b) p = PdfReader(b).pages[0] assert "/DIJMAC+Arial Black" in p._debug_for_extract() assert "adresse où le malade peut être visité" in p.extract_text() + + +@pytest.mark.external +def test_articles_with_writer(caplog): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/924/924666.pdf" + name = "924666.pdf" + m = PdfWriter() + m.append(PdfReader(BytesIO(get_pdf_from_url(url, name=name))), (2, 10)) + b = BytesIO() + m.write(b) + r = PdfReader(b) + assert len(r.threads) == 4 + assert r.threads[0].get_object()["/F"]["/P"] == r.pages[0] diff --git a/tests/test_writer.py b/tests/test_writer.py index 6f1219a08..f179a1443 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -512,11 +512,18 @@ def test_add_named_destination(): assert root[0] == "A named dest" assert root[1].pdf == writer assert root[1].get_object()["/S"] == NameObject("/GoTo") - assert root[1].get_object()["/D"][0].get_object() == writer.pages[2] + assert root[1].get_object()["/D"][0] == writer.pages[2].indirect_reference assert root[2] == "A named dest2" assert root[3].pdf == writer assert root[3].get_object()["/S"] == NameObject("/GoTo") - assert 
root[3].get_object()["/D"][0].get_object() == writer.pages[2] + assert root[3].get_object()["/D"][0] == writer.pages[2].indirect_reference + + # test get_object + + assert writer.get_object(root[1].idnum) == writer.get_object(root[1]) + with pytest.raises(ValueError) as exc: + writer.get_object(reader.pages[0].indirect_reference) + assert exc.value.args[0] == "pdf must be self" # write "output" to PyPDF2-output.pdf tmp_filename = "dont_commit_named_destination.pdf" @@ -732,14 +739,7 @@ def test_write_dict_stream_object(): # Writer will replace this stream object with indirect object page_object[NameObject("/Test")] = stream_object - writer.add_page(page_object) - - for k, v in page_object.items(): - if k == "/Test": - assert str(v) == str(stream_object) - break - else: - assert False, "/Test not found" + page_object = writer.add_page(page_object) with open("tmp-writer-do-not-commit.pdf", "wb") as fp: writer.write(fp) @@ -882,6 +882,44 @@ def test_startup_dest(): pdf_file_writer.open_destination = None +def test_iss471(): + url = "https://github.com/py-pdf/PyPDF2/files/9139245/book.pdf" + name = "book_471.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + + writer = PdfWriter() + writer.append(reader, excluded_fields=[]) + assert isinstance( + writer.pages[0]["/Annots"][0].get_object()["/Dest"], TextStringObject + ) + + +def test_reset_translation(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/924/924666.pdf" + name = "tika-924666.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + writer = PdfWriter() + writer.append(reader, (0, 10)) + nb = len(writer._objects) + writer.append(reader, (0, 10)) + assert ( + len(writer._objects) == nb + 11 + ) # +10 (pages) +1 because of the added outline + nb += 1 + writer.reset_translation(reader) + writer.append(reader, (0, 10)) + assert len(writer._objects) >= nb + 200 + nb = len(writer._objects) + writer.reset_translation(reader.pages[0].indirect_reference) + 
writer.append(reader, (0, 10)) + assert len(writer._objects) >= nb + 200 + nb = len(writer._objects) + writer.reset_translation() + writer.append(reader, (0, 10)) + assert len(writer._objects) >= nb + 200 + nb = len(writer._objects) + + def test_threads_empty(): writer = PdfWriter() thr = writer.threads @@ -889,3 +927,33 @@ def test_threads_empty(): assert len(thr) == 0 thr2 = writer.threads assert thr == thr2 + + +def test_append_without_annots_and_articles(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/924/924666.pdf" + name = "tika-924666.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + writer = PdfWriter() + writer.append(reader, None, (0, 10), True, ["/B"]) + assert writer.threads == [] + writer = PdfWriter() + writer.append(reader, None, (0, 10), True, ["/Annots"]) + assert "/Annots" not in writer.pages[5] + writer = PdfWriter() + writer.append(reader, None, (0, 10), True, []) + assert "/Annots" in writer.pages[5] + assert len(writer.threads) >= 1 + + +def test_append_multiple(): + url = "https://corpora.tika.apache.org/base/docs/govdocs1/924/924666.pdf" + name = "tika-924666.pdf" + reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name))) + writer = PdfWriter() + writer.append( + reader, [0, 0, 0] + ) # to demonstrate multiple insertion of same page at once + writer.append(reader, [0, 0, 0]) # second pack + pages = writer._root_object["/Pages"]["/Kids"] + assert pages[0] not in pages[1:] # page not repeated + assert pages[-1] not in pages[0:-1] # page not repeated