Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG/ENH: make attachments compatible with kids, and allow list in RF #2197

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions pypdf/_protocols.py
Expand Up @@ -76,6 +76,9 @@ def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO]:
# Protocol stub: only declares the `_add_object` signature; the `...` body
# carries no behavior of its own.
# NOTE(review): presumably registers `obj` with the document and returns a
# reference to it -- confirm against the concrete implementation.
def _add_object(self, obj: Any) -> Any:
...

# Protocol stub: only declares the `_replace_object` signature; the `...`
# body carries no behavior of its own.
# NOTE(review): presumably swaps the object behind `indirect_reference` for
# `obj` -- confirm against the concrete implementation.
def _replace_object(self, indirect_reference: Any, obj: Any) -> Any:
...

# Protocol stub: read-only `pages` property annotated as a list; the `...`
# body carries no behavior of its own.
@property
def pages(self) -> List[Any]:
...
Expand Down
131 changes: 37 additions & 94 deletions pypdf/_reader.py
Expand Up @@ -39,9 +39,7 @@
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Tuple,
Union,
Expand Down Expand Up @@ -87,6 +85,7 @@
)
from .generic import (
ArrayObject,
AttachmentBytesDictionary,
BooleanObject,
ContentStream,
DecodedStreamObject,
Expand All @@ -98,6 +97,7 @@
FloatObject,
IndirectObject,
NameObject,
NameTree,
NullObject,
NumberObject,
PdfObject,
Expand Down Expand Up @@ -2206,107 +2206,50 @@ def rename_form_topname(self, name: str) -> Optional[DictionaryObject]:
interim[NameObject("/T")] = TextStringObject(name)
return interim

@property
def attachments(self) -> Mapping[str, List[bytes]]:
return LazyDict(
{
name: (self._get_attachment_list, name)
for name in self._list_attachments()
}
)

def _list_attachments(self) -> List[str]:
def _get_embedded_files_root(self) -> Optional[NameTree]:
"""
Retrieves the list of filenames of file attachments.

Returns:
list of filenames
Returns the EmbeddedFiles root as a NameTree object;
if the root does not exist, returns None
"""
catalog = cast(DictionaryObject, self.trailer["/Root"])
# From the catalog get the embedded file names
try:
filenames = cast(
ArrayObject,
cast(
DictionaryObject,
cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
)["/Names"],
)
except KeyError:
return []
attachments_names = [f for f in filenames if isinstance(f, str)]
return attachments_names

def _get_attachment_list(self, name: str) -> List[bytes]:
out = self._get_attachments(name)[name]
if isinstance(out, list):
return out
return [out]

def _get_attachments(
self, filename: Optional[str] = None
) -> Dict[str, Union[bytes, List[bytes]]]:
if "/Names" not in catalog:
return None
ef = cast(DictionaryObject, catalog["/Names"]).get("/EmbeddedFiles", None)
if ef is None:
return None
efo = ef.get_object()
# not for reader
"""
Retrieves all or selected file attachments of the PDF as a dictionary of file names
and the file data as a bytestring.

Args:
filename: If filename is None, then a dictionary of all attachments
will be returned, where the key is the filename and the value
is the content. Otherwise, a dictionary with just a single key
- the filename - and its content will be returned.
if not isinstance(efo,NameTree):
if isinstance(ef,IndirectObject):
ef.replace_object(efo)
else:
cast(DictionaryObject,catalog["/Names"])[
NameObject("/EmbeddedFiles")] = NameTree(efo)
"""
return NameTree(efo)

@property
def attachments_names(self) -> List[str]:
"""
Returns:
dictionary of filename -> Union[bytestring or List[ByteString]]
if the filename exists multiple times a List of the different version will be provided
List of names
"""
catalog = cast(DictionaryObject, self.trailer["/Root"])
# From the catalog get the embedded file names
try:
filenames = cast(
ArrayObject,
cast(
DictionaryObject,
cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
)["/Names"],
)
except KeyError:
return {}
attachments: Dict[str, Union[bytes, List[bytes]]] = {}
# Loop through attachments
for i in range(len(filenames)):
f = filenames[i]
if isinstance(f, str):
if filename is not None and f != filename:
continue
name = f
f_dict = filenames[i + 1].get_object()
f_data = f_dict["/EF"]["/F"].get_data()
if name in attachments:
if not isinstance(attachments[name], list):
attachments[name] = [attachments[name]] # type:ignore
attachments[name].append(f_data) # type:ignore
else:
attachments[name] = f_data
return attachments
return self.attachments.keys()
Comment on lines +2232 to +2238
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer not to have the new attachments_names property. I think users and pypdf itself should call self.attachments.keys() directly.


@property
def attachments(self) -> AttachmentBytesDictionary:
"""
extracts the /EF entries as bytes from the embedded files
Returns:
Dictionary with the filenames as keys and the file content as bytes,
extra data can be accessed with Attachmentbytes extra properties (.name,
.list_rf_names(), .get_embeddedfile(), .all_files)

class LazyDict(Mapping):
    """
    Read-only mapping whose values are computed on access.

    Every stored value is a ``(callable, argument)`` pair. Looking up a key
    calls ``callable(argument)`` and returns the result; nothing is cached,
    so each lookup re-runs the call.
    """

    def __init__(self, *args: Any, **kw: Any) -> None:
        # Accepts exactly the arguments that ``dict`` accepts.
        self._raw_dict = dict(*args, **kw)

    def __getitem__(self, key: str) -> Any:
        loader, argument = self._raw_dict[key]
        return loader(argument)

    def __iter__(self) -> Iterator[Any]:
        return iter(self._raw_dict)

    def __len__(self) -> int:
        return len(self._raw_dict)

    def __str__(self) -> str:
        key_list = list(self.keys())
        return f"LazyDict(keys={key_list})"
Note:
If you want to access /RF
"""
return AttachmentBytesDictionary(self._get_embedded_files_root())


class PdfFileReader(PdfReader): # deprecated
Expand Down
156 changes: 113 additions & 43 deletions pypdf/_writer.py
Expand Up @@ -95,6 +95,7 @@
from .generic import (
PAGE_FIT,
ArrayObject,
AttachmentBytesDictionary,
BooleanObject,
ByteStringObject,
ContentStream,
Expand All @@ -105,6 +106,7 @@
FloatObject,
IndirectObject,
NameObject,
NameTree,
NullObject,
NumberObject,
PdfObject,
Expand Down Expand Up @@ -702,7 +704,71 @@
deprecation_with_replacement("addJS", "add_js", "3.0.0")
return self.add_js(javascript)

def add_attachment(self, filename: str, data: Union[str, bytes]) -> None:
def _get_embedded_files_root(self) -> Optional[NameTree]:
"""
Returns the EmbeddedFiles root as a NameTree object;
if the root does not exist, returns None
"""
catalog = self._root_object
if "/Names" not in catalog:
return None
ef = cast(DictionaryObject, catalog["/Names"]).get("/EmbeddedFiles", None)
if ef is None:
return None

Check warning on line 717 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L717

Added line #L717 was not covered by tests
efo = ef.get_object()
if not isinstance(efo, NameTree):
efo = NameTree(efo)

Check warning on line 720 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L720

Added line #L720 was not covered by tests
if isinstance(ef, IndirectObject):
ef.replace_object(efo)

Check warning on line 722 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L722

Added line #L722 was not covered by tests
else:
cast(DictionaryObject, catalog["/Names"])[

Check warning on line 724 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L724

Added line #L724 was not covered by tests
NameObject("/EmbeddedFiles")
] = efo
return efo

def _create_attachment_root(self) -> NameTree:
if "/Names" not in self._root_object:
self._root_object[NameObject("/Names")] = self._add_object(
DictionaryObject()
)
node = cast(DictionaryObject, self._root_object["/Names"])
if "/EmbeddedFiles" not in node:
node[NameObject("/EmbeddedFiles")] = self._add_object(NameTree())
node = cast(NameTree, node["/EmbeddedFiles"])
if "/Kids" not in node and "/Names" not in node:
node[NameObject("/Names")] = ArrayObject()

Check warning on line 739 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L739

Added line #L739 was not covered by tests
return node

@property
def attachments_names(self) -> List[str]:
    """
    Names of the embedded files.

    Returns:
        A new list holding the keys of ``self.attachments``.
    """
    # ``keys()`` on a mapping is normally a view, not a list; materialize it
    # so the returned value matches the declared ``List[str]`` type.
    return list(self.attachments.keys())

@property
def attachments(self) -> AttachmentBytesDictionary:
    """
    Expose the /EF entries of the embedded files as bytes.

    Returns:
        Dictionary with the filenames as keys and the file content as bytes;
        extra data can be accessed through the extra properties of the
        values (.name, .list_rf_names(), .get_embeddedfile(), .all_files).

    Note:
        NOTE(review): the original note about accessing /RF was left
        unfinished; presumably /RF is reached through the helpers listed
        above (e.g. .list_rf_names()) -- confirm.
    """
    embedded_root = self._get_embedded_files_root()
    return AttachmentBytesDictionary(embedded_root)

def add_attachment(
self,
filename: str,
data: Union[str, bytes, List[Tuple[str, bytes]]],
overwrite: bool = True,
fname: Optional[str] = None,
desc: str = "",
) -> Optional[DictionaryObject]:
"""
Embed a file inside the PDF.

Expand All @@ -711,9 +777,22 @@
Section 7.11.3

Args:
filename: The filename to display.
filename: The filename to display (in UTF-16).
data: The data in the file.
if data is a list of (name, data) tuples, the entries are stored
under the "/RF" (related files) entry instead of "/EF"
fname: an old style name for the "/F" entry (should be ANSI); if None, one is derived from filename
desc: a description string

Returns:
The filespec DictionaryObject
"""
if not overwrite and filename in self.attachments_names:
return None
if fname is None:
st = filename.replace("/", "\\/").replace("\\\\/", "\\/")
fname = st.encode().decode("ascii", errors="xmlcharreplace")
fname = f"{fname}" # to escape string

# We need three entries:
# * The file's data
# * The /Filespec entry
Expand All @@ -731,9 +810,22 @@
# endstream
# endobj

file_entry = DecodedStreamObject()
file_entry.set_data(b_(data))
file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
if isinstance(data, list):
ef_entry = DictionaryObject()
a = ArrayObject()
ef_entry.update({NameObject("/F"): self._add_object(a)})

Check warning on line 816 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L814-L816

Added lines #L814 - L816 were not covered by tests
for fn, da in data:
a.append(TextStringObject(fn))
file_entry = DecodedStreamObject()
file_entry.set_data(b_(da))
file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
a.append(self._add_object(file_entry))

Check warning on line 822 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L818-L822

Added lines #L818 - L822 were not covered by tests
else:
file_entry = DecodedStreamObject()
file_entry.set_data(b_(data))
file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
ef_entry = DictionaryObject()
ef_entry.update({NameObject("/F"): self._add_object(file_entry)})

# The Filespec entry
# Sample:
Expand All @@ -744,51 +836,29 @@
# /EF << /F 8 0 R >>
# >>

ef_entry = DictionaryObject()
ef_entry.update({NameObject("/F"): self._add_object(file_entry)})

filespec = DictionaryObject()
filespec.update(
{
NameObject(PA.TYPE): NameObject("/Filespec"),
NameObject(FileSpecificationDictionaryEntries.F): create_string_object(
NameObject(FileSpecificationDictionaryEntries.UF): TextStringObject(
filename
), # Perhaps also try TextStringObject
NameObject(FileSpecificationDictionaryEntries.EF): ef_entry,
),
NameObject(FileSpecificationDictionaryEntries.F): TextStringObject(
fname
),
NameObject(FileSpecificationDictionaryEntries.DESC): TextStringObject(
desc
),
}
)

# Then create the entry for the root, as it needs
# a reference to the Filespec
# Sample:
# 1 0 obj
# <<
# /Type /Catalog
# /Outlines 2 0 R
# /Pages 3 0 R
# /Names << /EmbeddedFiles << /Names [(hello.txt) 7 0 R] >> >>
# >>
# endobj

if CA.NAMES not in self._root_object:
self._root_object[NameObject(CA.NAMES)] = self._add_object(
DictionaryObject()
)
if "/EmbeddedFiles" not in cast(DictionaryObject, self._root_object[CA.NAMES]):
embedded_files_names_dictionary = DictionaryObject(
{NameObject(CA.NAMES): ArrayObject()}
)
cast(DictionaryObject, self._root_object[CA.NAMES])[
NameObject("/EmbeddedFiles")
] = self._add_object(embedded_files_names_dictionary)
if isinstance(data, list):
filespec[NameObject(FileSpecificationDictionaryEntries.RF)] = ef_entry

Check warning on line 855 in pypdf/_writer.py

View check run for this annotation

Codecov / codecov/patch

pypdf/_writer.py#L855

Added line #L855 was not covered by tests
else:
embedded_files_names_dictionary = cast(
DictionaryObject,
cast(DictionaryObject, self._root_object[CA.NAMES])["/EmbeddedFiles"],
)
cast(ArrayObject, embedded_files_names_dictionary[CA.NAMES]).extend(
[create_string_object(filename), filespec]
)
filespec[NameObject(FileSpecificationDictionaryEntries.EF)] = ef_entry

nm = self._get_embedded_files_root() or self._create_attachment_root()
nm.list_add(filename, filespec, overwrite=True)
return filespec

def addAttachment(self, fname: str, fdata: Union[str, bytes]) -> None: # deprecated
"""
Expand All @@ -797,7 +867,7 @@
.. deprecated:: 1.28.0
"""
deprecation_with_replacement("addAttachment", "add_attachment", "3.0.0")
return self.add_attachment(fname, fdata)
self.add_attachment(fname, fdata)

def append_pages_from_reader(
self,
Expand Down