-
Notifications
You must be signed in to change notification settings - Fork 7
/
keyring.py
985 lines (781 loc) · 35.7 KB
/
keyring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
# SPDX-License-Identifier: GPL-3.0-or-later
from collections import defaultdict
from collections.abc import Iterable
from itertools import chain
from logging import debug
from pathlib import Path
from re import escape
from re import match
from re import sub
from shutil import copytree
from subprocess import PIPE
from subprocess import Popen
from tempfile import NamedTemporaryFile
from tempfile import mkdtemp
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from .sequoia import inspect
from .sequoia import keyring_merge
from .sequoia import keyring_split
from .sequoia import latest_certification
from .sequoia import packet_dump_field
from .sequoia import packet_join
from .sequoia import packet_split
from .trust import certificate_trust
from .trust import certificate_trust_from_paths
from .trust import format_trust_label
from .types import Fingerprint
from .types import Trust
from .types import Uid
from .types import Username
from .util import filter_fingerprints_by_trust
from .util import get_cert_paths
from .util import system
from .util import transform_fd_to_tmpfile
def is_pgp_fingerprint(string: str) -> bool:
    """Returns whether the passed string looks like a PGP (long) fingerprint

    A string qualifies if it consists of exactly 16 or 40 uppercase hexadecimal characters.

    Parameters
    ----------
    string: Input to consider as a fingerprint

    Returns
    -------
    Whether string is a fingerprint
    """

    if len(string) not in (16, 40):
        return False
    # "\Z" only matches at the very end of the string, whereas "$" would also match right before a trailing
    # newline and thereby accept e.g. 39 hex characters followed by "\n"
    return match(r"^[A-F0-9]+\Z", string) is not None
def transform_username_to_keyring_path(keyring_dir: Path, paths: List[Path]) -> None:
    """Replace username shorthands in a list of paths with their keyring paths (in place)

    An entry is only replaced if it does not exist as given, but a directory entry of the same name exists below
    keyring_dir. All other entries are left untouched.

    Parameters
    ----------
    keyring_dir: The directory below which the per-username entries are looked up
    paths: The list of paths to mutate; matching usernames are replaced by their keyring path
    """

    for position, given in enumerate(paths):
        user_path = keyring_dir / given.name
        if not given.exists() and user_path.exists():
            paths[position] = user_path
def transform_fingerprint_to_keyring_path(keyring_root: Path, paths: List[Path]) -> None:
    """Replace fingerprint shorthands in a list of paths with their keyring paths (in place)

    An entry is only replaced if it does not exist as given, its name looks like a PGP fingerprint and the keyring
    contains an entry two directory levels below keyring_root whose name ends with that fingerprint. If several
    entries match, the first one yielded by the glob is used.

    Parameters
    ----------
    keyring_root: The keyring root directory to look up fingerprints in
    paths: The list of paths to mutate; matching fingerprints are replaced by their keyring path
    """

    for position, given in enumerate(paths):
        if given.exists() or not is_pgp_fingerprint(given.name):
            continue
        first_hit = next(iter(keyring_root.glob(f"*/*/*{given.name}")), None)
        if first_hit is not None:
            paths[position] = first_hit
# TODO: simplify to lower complexity
def convert_certificate(  # noqa: ignore=C901
    working_dir: Path,
    certificate: Path,
    keyring_dir: Path,
    name_override: Optional[Username] = None,
    fingerprint_filter: Optional[Set[Fingerprint]] = None,
) -> Path:
    """Convert a single file public key certificate into a decomposed directory structure of multiple PGP packets

    The output directory structure is created per user. The username is derived from the certificate via
    `derive_username_from_fingerprint` or overridden via `name_override`. If neither yields a name, the stem of the
    certificate file name is used.
    Below the username directory a directory tree describes the public keys components split up into certifications
    and revocations, as well as per subkey and per uid certifications and revocations.

    Parameters
    ----------
    working_dir: The path of the working directory below which to create split certificates
    certificate: The path to a public key certificate
    keyring_dir: The path of the keyring used to try to derive the username from the public key fingerprint
    name_override: An optional string to override the username in the to be created output directory structure
    fingerprint_filter: Optional set of fingerprints of PGP public keys that all User ID certifications will be
        filtered with. Only certifications whose issuer matches the end of one of these fingerprints are kept. If
        None, no User ID certification is kept.

    Raises
    ------
    Exception: If required PGP packets are not found or an unknown packet or signature type is encountered

    Returns
    -------
    The path of the key directory (which is located below working_dir below the user_dir)
    """

    # root packets
    certificate_fingerprint: Optional[Fingerprint] = None
    pubkey: Optional[Path] = None
    # TODO: direct key certifications are not yet selecting the latest sig, owner may have multiple
    # TODO: direct key certifications are not yet single packet per file
    direct_sigs: Dict[Fingerprint, List[Path]] = defaultdict(list)
    direct_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)

    # subkey packets
    subkeys: Dict[Fingerprint, Path] = {}
    subkey_bindings: Dict[Fingerprint, List[Path]] = defaultdict(list)
    subkey_revocations: Dict[Fingerprint, List[Path]] = defaultdict(list)

    # uid packets
    uids: Dict[Uid, Path] = {}
    certifications: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list))
    revocations: Dict[Uid, Dict[Fingerprint, List[Path]]] = defaultdict(lambda: defaultdict(list))

    # intermediate variables: signatures are attributed to the most recently seen key, User ID or subkey packet
    current_packet_mode: Optional[str] = None
    current_packet_fingerprint: Optional[Fingerprint] = None
    current_packet_uid: Optional[Uid] = None

    # XXX: PrimaryKeyBinding

    # TODO: remove 3rd party direct key signatures, seems to be leaked by export-clean

    debug(f"Processing certificate {certificate}")

    for packet in packet_split(working_dir=working_dir, certificate=certificate):
        debug(f"Processing packet {packet.name}")
        if packet.name.endswith("--PublicKey"):
            current_packet_mode = "pubkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            certificate_fingerprint = current_packet_fingerprint
            pubkey = packet
        elif packet.name.endswith("--UserID"):
            current_packet_mode = "uid"
            current_packet_fingerprint = None
            current_packet_uid = simplify_user_id(Uid(packet_dump_field(packet, "Value")))

            uids[current_packet_uid] = packet
        elif packet.name.endswith("--PublicSubkey"):
            current_packet_mode = "subkey"
            current_packet_fingerprint = Fingerprint(packet_dump_field(packet, "Fingerprint"))
            current_packet_uid = None

            subkeys[current_packet_fingerprint] = packet
        elif packet.name.endswith("--Signature"):
            if not certificate_fingerprint:
                raise Exception(f'missing certificate fingerprint for "{packet.name}"')

            issuer: Fingerprint = Fingerprint(packet_dump_field(packet, "Issuer"))
            signature_type = packet_dump_field(packet, "Type")

            if current_packet_mode == "pubkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                # only revocations issued by the certificate itself are accepted as key revocations
                if signature_type == "KeyRevocation" and certificate_fingerprint.endswith(issuer):
                    direct_revocations[issuer].append(packet)
                elif signature_type in ["DirectKey", "GenericCertification"]:
                    direct_sigs[issuer].append(packet)
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "uid":
                if not current_packet_uid:
                    raise Exception(f'missing current packet uid for "{packet.name}"')

                if signature_type == "CertificationRevocation":
                    revocations[current_packet_uid][issuer].append(packet)
                elif signature_type.endswith("Certification"):
                    # the issuer may be shorter than a full fingerprint, hence the suffix comparison
                    if fingerprint_filter is not None and any(fp.endswith(issuer) for fp in fingerprint_filter):
                        debug(f"The certification by issuer {issuer} is appended as it is found in the filter.")
                        certifications[current_packet_uid][issuer].append(packet)
                    else:
                        debug(f"The certification by issuer {issuer} is not appended because it is not in the filter")
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            elif current_packet_mode == "subkey":
                if not current_packet_fingerprint:
                    raise Exception(f'missing current packet fingerprint for "{packet.name}"')

                if signature_type == "SubkeyBinding":
                    subkey_bindings[current_packet_fingerprint].append(packet)
                elif signature_type == "SubkeyRevocation":
                    # key by the subkey the revocation follows, so that it is persisted below that subkey's
                    # directory (like its bindings) and revocations of different subkeys are kept apart
                    subkey_revocations[current_packet_fingerprint].append(packet)
                else:
                    raise Exception(f"unknown signature type: {signature_type}")
            else:
                raise Exception(f'unknown signature root for "{packet.name}"')
        else:
            raise Exception(f'unknown packet type "{packet.name}"')

    if not certificate_fingerprint:
        raise Exception("missing certificate fingerprint")

    if not pubkey:
        raise Exception("missing certificate public-key")

    name_override = (
        name_override
        or derive_username_from_fingerprint(keyring_dir=keyring_dir, certificate_fingerprint=certificate_fingerprint)
        or Username(certificate.stem)
    )

    user_dir = working_dir / name_override
    key_dir = user_dir / certificate_fingerprint
    key_dir.mkdir(parents=True, exist_ok=True)

    persist_public_key(
        certificate_fingerprint=certificate_fingerprint,
        pubkey=pubkey,
        key_dir=key_dir,
    )

    persist_direct_key_certifications(
        direct_key_certifications=direct_sigs,
        key_dir=key_dir,
    )

    persist_direct_key_revocations(
        direct_key_revocations=direct_revocations,
        key_dir=key_dir,
    )

    persist_subkeys(
        key_dir=key_dir,
        subkeys=subkeys,
    )

    persist_subkey_bindings(
        key_dir=key_dir,
        subkey_bindings=subkey_bindings,
    )

    persist_subkey_revocations(
        key_dir=key_dir,
        subkey_revocations=subkey_revocations,
    )

    persist_uids(
        key_dir=key_dir,
        uids=uids,
    )

    persist_uid_certifications(
        certifications=certifications,
        key_dir=key_dir,
    )

    persist_uid_revocations(
        revocations=revocations,
        key_dir=key_dir,
    )

    return key_dir
def persist_public_key(
    certificate_fingerprint: Fingerprint,
    pubkey: Path,
    key_dir: Path,
) -> None:
    """Write the Public-Key packet of a certificate to key_dir

    The packet is written to a file named after the fingerprint ("<fingerprint>.asc") directly below key_dir. An
    already existing file is overwritten.

    Parameters
    ----------
    certificate_fingerprint: The unique fingerprint of the public key
    pubkey: The path to the public key of the root key
    key_dir: The root directory below which the basic key material is persisted
    """

    target = key_dir / f"{certificate_fingerprint}.asc"
    target.parent.mkdir(parents=True, exist_ok=True)

    sources: List[Path] = [pubkey]
    debug(f"Writing file {target} from {[str(source) for source in sources]}")
    packet_join(sources, target, force=True)
def persist_uids(
    key_dir: Path,
    uids: Dict[Uid, Path],
) -> None:
    """Write every User ID packet of a PublicKey to its own file

    Each User ID consists of a single User ID packet, which ends up in key_dir/uid/<uid>/<uid>.asc. Already existing
    files are overwritten.

    Parameters
    ----------
    key_dir: The root directory below which the basic key material is persisted
    uids: The User IDs of a Public-Key (the root key), mapped to their packet files
    """

    uid_root = key_dir / "uid"
    for uid, packet in uids.items():
        uid_dir = uid_root / uid
        uid_dir.mkdir(parents=True, exist_ok=True)

        target = uid_dir / f"{uid}.asc"
        debug(f"Writing file {target} from {packet}")
        packet_join(packets=[packet], output=target, force=True)
def persist_subkeys(
    key_dir: Path,
    subkeys: Dict[Fingerprint, Path],
) -> None:
    """Write every Public-Subkey packet of a root key to its own file

    Each subkey ends up in key_dir/subkey/<fingerprint>/<fingerprint>.asc. Already existing files are overwritten.

    Parameters
    ----------
    key_dir: The root directory below which the basic key material is persisted
    subkeys: The PublicSubkeys of a key, mapped from their fingerprint to their packet file
    """

    subkey_root = key_dir / "subkey"
    for fingerprint, packet in subkeys.items():
        subkey_dir = subkey_root / fingerprint
        subkey_dir.mkdir(parents=True, exist_ok=True)

        target = subkey_dir / f"{fingerprint}.asc"
        debug(f"Writing file {target} from {packet!s}")
        packet_join(packets=[packet], output=target, force=True)
def persist_subkey_bindings(
    key_dir: Path,
    subkey_bindings: Dict[Fingerprint, List[Path]],
) -> None:
    """Write the SubkeyBinding signature of every subkey of a root key to file(s)

    Per subkey only the packet selected by `latest_certification()` is written. It ends up in
    key_dir/subkey/<fingerprint>/certification/<issuer>.asc, where the issuer is read from the selected packet.

    Parameters
    ----------
    key_dir: The root directory below which the basic key material is persisted
    subkey_bindings: The SubkeyBinding signatures of a Public-Subkey, keyed by the fingerprint of the subkey
    """

    subkey_root = key_dir / "subkey"
    for fingerprint, candidates in subkey_bindings.items():
        selected = latest_certification(candidates)
        issuer = packet_dump_field(selected, "Issuer")

        binding_dir = subkey_root / fingerprint / "certification"
        binding_dir.mkdir(parents=True, exist_ok=True)

        target = binding_dir / f"{issuer}.asc"
        debug(f"Writing file {target} from {selected!s}")
        packet_join(packets=[selected], output=target, force=True)
def persist_subkey_revocations(
    key_dir: Path,
    subkey_revocations: Dict[Fingerprint, List[Path]],
) -> None:
    """Write the SubkeyRevocation signatures of the Public-Subkeys of a root key to file(s)

    Per fingerprint only the packet selected by `latest_certification()` is written. It ends up in
    key_dir/subkey/<fingerprint>/revocation/<issuer>.asc, where the issuer is read from the selected packet.

    Parameters
    ----------
    key_dir: The root directory below which the basic key material is persisted
    subkey_revocations: The SubkeyRevocations of PublicSubkeys of a key, keyed by the fingerprint that names the
        directory below key_dir/subkey
    """

    subkey_root = key_dir / "subkey"
    for fingerprint, candidates in subkey_revocations.items():
        selected = latest_certification(candidates)
        issuer = packet_dump_field(selected, "Issuer")

        revocation_dir = subkey_root / fingerprint / "revocation"
        revocation_dir.mkdir(parents=True, exist_ok=True)

        target = revocation_dir / f"{issuer}.asc"
        debug(f"Writing file {target} from {selected}")
        packet_join(packets=[selected], output=target, force=True)
def persist_direct_key_certifications(
    direct_key_certifications: Dict[Fingerprint, List[Path]],
    key_dir: Path,
) -> None:
    """Write the signatures made directly on a root key (such as DirectKeys or *Certifications without a User ID)
    to file(s)

    All packets of one issuer are joined into a single file key_dir/certification/<issuer>.asc.

    Parameters
    ----------
    direct_key_certifications: The direct key certifications to write to file, keyed by issuer
    key_dir: The root directory below which the Directkeys are persisted
    """

    certification_dir = key_dir / "certification"
    for issuer, packets in direct_key_certifications.items():
        certification_dir.mkdir(parents=True, exist_ok=True)

        target = certification_dir / f"{issuer}.asc"
        debug(f"Writing file {target} from {[str(packet) for packet in packets]}")
        packet_join(packets=packets, output=target, force=True)
def persist_direct_key_revocations(
    direct_key_revocations: Dict[Fingerprint, List[Path]],
    key_dir: Path,
) -> None:
    """Write the revocations made directly on a root key (such as KeyRevocation) to file(s)

    All packets of one issuer are joined into a single file key_dir/revocation/<issuer>.asc.

    Parameters
    ----------
    direct_key_revocations: The direct key revocations to write to file, keyed by issuer
    key_dir: The root directory below which the Directkeys are persisted
    """

    revocation_dir = key_dir / "revocation"
    for issuer, packets in direct_key_revocations.items():
        revocation_dir.mkdir(parents=True, exist_ok=True)

        target = revocation_dir / f"{issuer}.asc"
        debug(f"Writing file {target} from {[str(packet) for packet in packets]}")
        packet_join(packets=packets, output=target, force=True)
def persist_uid_certifications(
    certifications: Dict[Uid, Dict[Fingerprint, List[Path]]],
    key_dir: Path,
) -> None:
    """Write the User ID certifications of a root key to file(s)

    The certifications include all CasualCertifications, GenericCertifications, PersonaCertifications and
    PositiveCertifications for all User IDs of the given root key.
    Per User ID and issuer only the packet selected by `latest_certification()` is written, to
    key_dir/uid/<uid>/certification/<issuer>.asc.

    Parameters
    ----------
    certifications: The certifications to write to file, keyed by User ID and then by issuer
    key_dir: The root directory below which certifications are persisted
    """

    uid_root = key_dir / "uid"
    for uid, by_issuer in certifications.items():
        target_dir = uid_root / uid / "certification"
        for issuer, candidates in by_issuer.items():
            target_dir.mkdir(parents=True, exist_ok=True)
            selected = latest_certification(candidates)

            target = target_dir / f"{issuer}.asc"
            debug(f"Writing file {target} from {selected}")
            packet_join(packets=[selected], output=target, force=True)
def persist_uid_revocations(
    revocations: Dict[Uid, Dict[Fingerprint, List[Path]]],
    key_dir: Path,
) -> None:
    """Write the User ID revocations of a root key to file(s)

    The revocations include all CertificationRevocations for all User IDs of the given root key.
    Per User ID and issuer only the packet selected by `latest_certification()` is written, to
    key_dir/uid/<uid>/revocation/<issuer>.asc.

    Parameters
    ----------
    revocations: The revocations to write to file, keyed by User ID and then by issuer
    key_dir: The root directory below which revocations will be persisted
    """

    uid_root = key_dir / "uid"
    for uid, by_issuer in revocations.items():
        target_dir = uid_root / uid / "revocation"
        for issuer, candidates in by_issuer.items():
            target_dir.mkdir(parents=True, exist_ok=True)
            selected = latest_certification(candidates)

            target = target_dir / f"{issuer}.asc"
            debug(f"Writing file {target} from {selected}")
            packet_join(packets=[selected], output=target, force=True)
def simplify_user_id(user_id: Uid) -> Uid:
    """Turn a User ID string into a representation with more filesystem friendly characters

    "@" becomes "_at_", angle brackets are dropped and every remaining space or punctuation character of a fixed
    set is replaced by an underscore.

    Parameters
    ----------
    user_id: A User ID string (e.g. 'Foobar McFooface <foobar@foo.face>')

    Returns
    -------
    The simplified representation of user_id
    """

    unfriendly_characters = "[" + escape(r" !@#$%^&*()_-+=[]{}\|;:,.<>/?") + "]"

    without_at = user_id.replace("@", "_at_")
    without_brackets = sub("[<>]", "", without_at)
    return Uid(sub(unfriendly_characters, "_", without_brackets))
def derive_username_from_fingerprint(keyring_dir: Path, certificate_fingerprint: Fingerprint) -> Optional[Username]:
    """Attempt to derive the username of a public key fingerprint from a keyring directory

    The keyring directory is searched for entries one level below it (i.e. keyring_dir/<username>/<entry>) whose
    name ends with the fingerprint. The name of the directory containing the match is the username.

    Parameters
    ----------
    keyring_dir: The directory in which to look up a username
    certificate_fingerprint: The public key fingerprint to derive the username from

    Raises
    ------
    Exception: If more than one username is found (a public key can only belong to one individual)

    Returns
    -------
    A string representing the username a public key certificate belongs to, None otherwise
    """

    matches = list(keyring_dir.glob(f"*/*{certificate_fingerprint}"))

    if len(matches) > 1:
        raise Exception(
            f"More than one username found in {keyring_dir} when probing for fingerprint '{certificate_fingerprint}': "
            f"{matches}"
        )

    if not matches:
        debug(f"Can not derive username from target directory for fingerprint {certificate_fingerprint}")
        return None

    # use the full directory name: `.stem` would cut off everything after the last dot of a username such as
    # "john.doe", while the other lookups in this module use `.parent.name` as the username
    username = matches[0].parent.name
    debug(
        f"Successfully derived username '{username}' from target directory for fingerprint "
        f"{certificate_fingerprint}"
    )
    return Username(username)
def convert(
    working_dir: Path,
    keyring_root: Path,
    sources: List[Path],
    target_dir: Path,
    name_override: Optional[Username] = None,
) -> Path:
    """Decompose PGP certificate material found in a set of paths into a directory structure below target_dir

    Any input is first split by `keyring_split()` into individual certificates, each of which is decomposed below
    working_dir by `convert_certificate()`. The resulting per-user directories are then copied to target_dir.
    User ID certifications are only kept if they are issued by a key found in the sources or in keyring_root.

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    keyring_root: The keyring root directory to look up accepted fingerprints for certifications
    sources: A list of paths to files or directories to decompose
    target_dir: A directory path to write the new directory structure to
    name_override: An optional username override for the call to `convert_certificate()`

    Returns
    -------
    The directory that contains the resulting directory structure (target_dir)
    """

    transform_fd_to_tmpfile(working_dir=working_dir, sources=sources)

    # directories contribute their direct children, plain files themselves
    keys: Set[Path] = set()
    for source in sources:
        keys.update(source.iterdir() if source.is_dir() else [source])

    known_roots = [keyring_root] if keyring_root.exists() else []
    known_fingerprints = get_fingerprints(working_dir=working_dir, sources=sources, paths=known_roots)
    fingerprint_filter = set(known_fingerprints.keys())

    directories: List[Path] = [
        convert_certificate(
            working_dir=working_dir,
            certificate=cert,
            keyring_dir=target_dir,
            name_override=name_override,
            fingerprint_filter=fingerprint_filter,
        )
        for key in keys
        for cert in keyring_split(working_dir=working_dir, keyring=key, preserve_filename=True)
    ]

    for key_dir in directories:
        # the parent of a key directory is the directory of the user owning the key
        user_dir = key_dir.parent
        destination = target_dir / user_dir.name
        destination.mkdir(parents=True, exist_ok=True)
        copytree(src=user_dir, dst=destination, dirs_exist_ok=True)

    return target_dir
def export_ownertrust(certs: List[Path], output: Path) -> List[Fingerprint]:
    """Export ownertrust from a set of keys and return the trusted fingerprints

    The output file format is compatible with `gpg --import-ownertrust` and lists the main fingerprint ID of all
    keys with full trust as fully trusted (one "<fingerprint>:4:" line per key, sorted and without duplicates).

    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: The certificates to trust
    output: The file path to write to

    Returns
    -------
    List of ownertrust fingerprints
    """

    main_keys = get_fingerprints_from_paths(sources=certs)
    main_trusts = certificate_trust_from_paths(sources=certs, main_keys=main_keys)
    trusted_certs: List[Fingerprint] = filter_fingerprints_by_trust(main_trusts, Trust.full)

    with open(file=output, mode="w") as ownertrust_file:
        for fingerprint in sorted(set(trusted_certs)):
            debug(f"Writing {fingerprint} to {output}")
            ownertrust_file.write(f"{fingerprint}:4:\n")

    return trusted_certs
def export_revoked(certs: List[Path], main_keys: Set[Fingerprint], output: Path) -> None:
    """Export the PGP revoked status from a set of keys

    The output file contains the fingerprints of all self-revoked keys and all keys for which at least two revocations
    by any main key exist. It holds one fingerprint per line, sorted and without duplicates.

    The exported file is used by pacman-key when importing a keyring (see
    https://man.archlinux.org/man/pacman-key.8#PROVIDING_A_KEYRING_FOR_IMPORT).

    Parameters
    ----------
    certs: A list of directories with keys to check for their revocation status
    main_keys: A set of strings representing the fingerprints of (current and/or revoked) main keys
    output: The file path to write to
    """

    trusts = certificate_trust_from_paths(sources=certs, main_keys=main_keys)
    revoked: List[Fingerprint] = filter_fingerprints_by_trust(trusts, Trust.revoked)

    with open(file=output, mode="w") as revoked_file:
        for fingerprint in sorted(set(revoked)):
            debug(f"Writing {fingerprint} to {output}")
            revoked_file.write(f"{fingerprint}\n")
def get_fingerprints_from_keyring_files(working_dir: Path, source: Iterable[Path]) -> Dict[Fingerprint, Username]:
    """Collect the fingerprints of all PGP public keys found in import file(s)

    Directories in source contribute their direct children, plain files themselves. Every keyring is split into
    certificates and packets; the username assigned to a fingerprint is the stem of the certificate file name.

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    source: The paths to source files or directories containing keyrings

    Returns
    -------
    A dict of all fingerprints and their usernames of PGP public keys below path
    """

    keyrings = set()
    for entry in source:
        keyrings.update(entry.iterdir() if entry.is_dir() else [entry])

    fingerprints: Dict[Fingerprint, Username] = {}
    for keyring in keyrings:
        for certificate in keyring_split(working_dir=working_dir, keyring=keyring, preserve_filename=True):
            public_keys = (
                packet
                for packet in packet_split(working_dir=working_dir, certificate=certificate)
                if packet.name.endswith("--PublicKey")
            )
            for packet in public_keys:
                fingerprints[Fingerprint(packet_dump_field(packet, "Fingerprint"))] = Username(certificate.stem)

    debug(f"Fingerprints of PGP public keys in {source}: {fingerprints}")
    return fingerprints
def get_fingerprints_from_certificate_directory(
    paths: List[Path], prefix: str = "", postfix: str = ""
) -> Dict[Fingerprint, Username]:
    """Collect the fingerprints of all PGP public keys found in decomposed directory structures

    The fingerprint is the name of a certificate directory, the username the name of its parent directory
    (optionally decorated with prefix and postfix).

    Parameters
    ----------
    paths: The paths to decomposed directory structures
    prefix: Prefix to add to each username
    postfix: Postfix to add to each username

    Returns
    -------
    A dict of all fingerprints and their usernames of PGP public keys below path
    """

    fingerprints: Dict[Fingerprint, Username] = {
        Fingerprint(cert_dir.name): Username(f"{prefix}{cert_dir.parent.name}{postfix}")
        for cert_dir in sorted(get_cert_paths(paths))
    }

    debug(f"Fingerprints of PGP public keys in {paths}: {fingerprints}")
    return fingerprints
def get_fingerprints(working_dir: Path, sources: Iterable[Path], paths: List[Path]) -> Dict[Fingerprint, Username]:
    """Collect the fingerprints of PGP public keys from input paths and decomposed directory structures

    If a fingerprint is found in both places, the username from the decomposed directory structure wins.

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    sources: A list of directories or files from which to read PGP keyring information
    paths: A list of paths that identify decomposed PGP data in directory structures

    Returns
    -------
    A dict of all fingerprints and their usernames of PGP public keys below path
    """

    from_keyrings = get_fingerprints_from_keyring_files(working_dir=working_dir, source=sources)
    from_directories = get_fingerprints_from_certificate_directory(paths=paths)

    combined: Dict[Fingerprint, Username] = {}
    for mapping in (from_keyrings, from_directories):
        combined.update(mapping)
    return combined
def get_packets_from_path(path: Path) -> List[Path]:
    """Collect the packets of one level: the root packets, then its certifications, then its revocations

    Only "*.asc" files are considered. Each of the three groups is sorted on its own; the "certification" and
    "revocation" sub-directories are skipped if they do not exist.

    Parameters
    ----------
    path: Filesystem path used to collect the packets from

    Returns
    -------
    A list of packets ordered by root, certification, revocation (empty if path does not exist)
    """

    if not path.exists():
        return []

    collected: List[Path] = sorted(path.glob("*.asc"))
    for section in ("certification", "revocation"):
        section_dir = path / section
        if section_dir.exists():
            collected.extend(sorted(section_dir.glob("*.asc")))
    return collected
def get_packets_from_listing(path: Path) -> List[Path]:
    """Collect packets from a listing of directories holding one level each by calling `get_packets_from_path`.

    The entries of path are visited in sorted order.

    Parameters
    ----------
    path: Filesystem path used as listing to collect the packets from

    Returns
    -------
    A list of packets ordered by root, certification, revocation for each level (empty if path does not exist)
    """

    if not path.exists():
        return []

    return [packet for level in sorted(path.iterdir()) for packet in get_packets_from_path(level)]
def export(
    working_dir: Path,
    keyring_root: Path,
    sources: Optional[List[Path]] = None,
    output: Optional[Path] = None,
) -> Optional[str]:
    """Join all provided PGP packet files into certificates and merge those into a single keyring

    If sources contains directories, any .asc files below them are considered.
    Every certificate directory is first joined into one file in a temporary directory below working_dir (root
    packets, then subkeys, then User IDs), afterwards all of these files are merged.

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    keyring_root: The keyring root directory to look up username shorthand sources
    sources: A list of username, fingerprint or directories from which to read PGP packet information
        (defaults to `keyring_root`)
    output: An output file that all PGP packet data is written to, return the result instead if None

    Returns
    -------
    The result if no output file has been used, None if no certificate has been found
    """

    sources = sources or [keyring_root]

    # transform shorthand paths to actual keyring paths
    transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources)
    transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources)

    temp_dir = Path(mkdtemp(dir=working_dir, prefix="arch-keyringctl-export-join-")).absolute()

    certificates: List[Path] = []
    for cert_dir in sorted(get_cert_paths(sources)):
        packets: List[Path] = (
            get_packets_from_path(cert_dir)
            + get_packets_from_listing(cert_dir / "subkey")
            + get_packets_from_listing(cert_dir / "uid")
        )

        joined = temp_dir / f"{cert_dir.name}.asc"
        debug(f"Joining {cert_dir} in {joined}")
        packet_join(packets=packets, output=joined, force=True)
        certificates.append(joined)

    return keyring_merge(certificates, output, force=True) if certificates else None
def build(
    working_dir: Path,
    keyring_root: Path,
    target_dir: Path,
) -> None:
    """Build keyring PGP artifacts alongside ownertrust and revoked status files

    Three files are written to target_dir: "archlinux.gpg" (the exported keyring), "archlinux-trusted" (the
    ownertrust of the keys below keyring_root/main) and "archlinux-revoked" (the revoked keys below keyring_root,
    judged against the trusted main keys).

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    keyring_root: The keyring root directory to build the artifacts from
    target_dir: Output directory that all artifacts are written to
    """

    target_dir.mkdir(parents=True, exist_ok=True)

    export(working_dir=working_dir, keyring_root=keyring_root, output=target_dir / "archlinux.gpg")

    trusted_main_keys = set(
        export_ownertrust(certs=[keyring_root / "main"], output=target_dir / "archlinux-trusted")
    )
    export_revoked(certs=[keyring_root], main_keys=trusted_main_keys, output=target_dir / "archlinux-revoked")
def list_keyring(keyring_root: Path, sources: Optional[List[Path]] = None, main_keys: bool = False) -> None:
    """List certificates in the keyring

    If sources contains directories, all certificate below them are considered.
    One line per certificate is printed, consisting of the username (padded to the longest username), the
    certificate name and a trust label. Nothing is printed if no certificate is found.

    Parameters
    ----------
    keyring_root: The keyring root directory to look up username shorthand sources
    sources: A list of username, fingerprint or directories from which to read PGP packet information
        (defaults to `keyring_root`)
    main_keys: List main keys instead of packager keys (defaults to False)
    """

    keyring_dir = keyring_root / ("main" if main_keys else "packager")

    if not sources:
        sources = list(sorted(keyring_dir.iterdir(), key=lambda path: path.name.casefold()))

    # transform shorthand paths to actual keyring paths
    transform_username_to_keyring_path(keyring_dir=keyring_dir, paths=sources)
    transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources)

    # resolve all sources to certificate paths
    sources = list(sorted(get_cert_paths(sources), key=lambda path: str(path).casefold()))

    # max() raises a ValueError on an empty sequence, so bail out if there is nothing to list
    if not sources:
        return

    username_length = max(len(source.parent.name) for source in sources)
    # the set of main key fingerprints is the same for every certificate, so look it up only once
    main_key_fingerprints = get_fingerprints_from_paths([keyring_root / "main"])

    for certificate in sources:
        username: Username = Username(certificate.parent.name)
        trust = certificate_trust(certificate=certificate, main_keys=main_key_fingerprints)
        trust_label = format_trust_label(trust=trust)
        print(f"{username:<{username_length}} {certificate.name} {trust_label}")
def inspect_keyring(working_dir: Path, keyring_root: Path, sources: Optional[List[Path]]) -> str:
    """Inspect certificates in the keyring and pretty print the data

    If sources contains directories, all certificate below them are considered.
    The sources are exported to a temporary keyring file below working_dir, which is then inspected. The
    fingerprints of all packager and main keys are passed along, the usernames of main keys carry a " (main)"
    postfix.

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    keyring_root: The keyring root directory to look up username shorthand sources
    sources: A list of username, fingerprint or directories from which to read PGP packet information
        (defaults to `keyring_root`)

    Returns
    -------
    The result of the inspect
    """

    sources = sources or [keyring_root]

    # transform shorthand paths to actual keyring paths
    transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources)
    transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources)

    with NamedTemporaryFile(dir=working_dir, prefix="packet-", suffix=".asc") as keyring:
        keyring_path = Path(keyring.name)
        export(working_dir=working_dir, keyring_root=keyring_root, sources=sources, output=keyring_path)

        packager_names = get_fingerprints_from_certificate_directory(paths=[keyring_root / "packager"])
        main_names = get_fingerprints_from_certificate_directory(paths=[keyring_root / "main"], postfix=" (main)")
        fingerprints: Dict[Fingerprint, Username] = packager_names | main_names

        return inspect(
            packet=keyring_path,
            certifications=True,
            fingerprints=fingerprints,
        )
def verify(
    working_dir: Path,
    keyring_root: Path,
    sources: Optional[List[Path]],
    lint_hokey: bool = True,
    lint_sq_keyring: bool = True,
) -> None:
    """Verify certificates against modern expectations using sq-keyring-linter and hokey

    Every certificate is exported to its own temporary keyring file below working_dir, which is then handed to
    the enabled linters. The output of the linters is printed.

    Parameters
    ----------
    working_dir: A directory to use for temporary files
    keyring_root: The keyring root directory to look up username shorthand sources
    sources: A list of username, fingerprint or directories from which to read PGP packet information
        (defaults to `keyring_root`)
    lint_hokey: Whether to run hokey lint
    lint_sq_keyring: Whether to run sq-keyring-linter
    """

    if not sources:
        sources = [keyring_root]

    # transform shorthand paths to actual keyring paths
    transform_username_to_keyring_path(keyring_dir=keyring_root / "packager", paths=sources)
    transform_fingerprint_to_keyring_path(keyring_root=keyring_root, paths=sources)

    cert_paths: Set[Path] = get_cert_paths(sources)

    for certificate in sorted(cert_paths):
        print(f"Verify {certificate.name} owned by {certificate.parent.name}")

        with NamedTemporaryFile(
            dir=working_dir, prefix=f"{certificate.parent.name}-{certificate.name}", suffix=".asc"
        ) as keyring:
            keyring_path = Path(keyring.name)
            export(
                working_dir=working_dir,
                keyring_root=keyring_root,
                sources=[certificate],
                output=keyring_path,
            )

            if lint_hokey:
                # hokey reads the dearmored keyring from the stdout of `sq dearmor`; the context manager closes
                # the pipe and waits for the child, so that neither a file descriptor nor a zombie process is
                # left behind per certificate
                with Popen(("sq", "dearmor", f"{str(keyring_path)}"), stdout=PIPE) as dearmor:
                    print(system(["hokey", "lint"], _stdin=dearmor.stdout), end="")
            if lint_sq_keyring:
                print(system(["sq-keyring-linter", f"{str(keyring_path)}"]), end="")
def get_fingerprints_from_paths(sources: Iterable[Path]) -> Set[Fingerprint]:
    """Collect the fingerprints of all certificates found in the sources paths.

    The fingerprint of a certificate is the name of its certificate path.

    Parameters
    ----------
    sources: A list of directories from which to get fingerprints of the certificates.

    Returns
    -------
    The set of all fingerprints obtained from the sources.
    """

    return {Fingerprint(cert_path.name) for cert_path in get_cert_paths(sources)}