Marco Ricci commited on 2025-07-26 17:35:21
Zeige 11 geänderte Dateien mit 127 Einfügungen und 81 Löschungen.
Add shorthand codes for different types of coverage exclusions, explained in `pyproject.toml` in the `tool.coverage` section, because reasons for excluding code branches from coverage tend to fall into a handful of common categories. Also add explicit coverage exclusion patterns for dummy classes which are never intended to actually be called, such as the `_types._Omitted` class (for styling function signatures in the generated documentation) and the two `_DummyModule` classes (which stub out missing dependencies to appease the type checker). A very few coverage exclusions were actually unnecessary or nonsensical, and have been rectified.
... | ... |
@@ -248,8 +248,40 @@ exclude_also = [ |
248 | 248 |
'class .*\(Protocol\):', |
249 | 249 |
'pytest\.fail\(', |
250 | 250 |
'@(?:(?:pytest\.)?mark\.)?xfail\(', |
251 |
+ 'class _Omitted:', |
|
252 |
+ 'class _DummyModule:', |
|
251 | 253 |
] |
252 | 254 |
|
255 |
+# We use a bunch of "codes", similar to the "type: ignore" comments, as |
|
256 |
+# shorthand notation for why we are excluding a certain piece of code |
|
257 |
+# manually from coverage, instead of writing a verbose explanation at each |
|
258 |
+# call site: |
|
259 |
+# |
|
260 |
+# "external-api" and "unused": The code is defined by an external API not |
|
261 |
+# under our control, or it implements an external interface, and some of the |
|
262 |
+# branches of that code we do not yet use ("unused") or don't ever intend to |
|
263 |
+# use ("external-api"). In particular, in production and in testing use, we |
|
264 |
+# do not expect to hit these code branches unless there are errors in our |
|
265 |
+# code. |
|
266 |
+# |
|
267 |
+# "external": Whether or not we hit this code branch is non-deterministic, |
|
268 |
+# and dependent on external factors that are difficult or impossible for us |
|
269 |
+# to control, predict, or simulate. We therefore categorically exclude all |
|
270 |
+# branches from coverage. (Typically though, one particular branch *will* |
|
271 |
+# actually run. We just don't know ahead of time which one.) (While this |
|
272 |
+# applies to "choice of operating system" and "installed Python/library |
|
273 |
+# version" as well, these cases are usually tagged with more specific codes, |
|
274 |
+# or explained in long-form commentary.) |
|
275 |
+# |
|
276 |
+# "failsafe": Akin to assertions, these are used to catch "impossible" |
|
277 |
+# situations and to still fail in a controlled manner. We do not expect to |
|
278 |
+# hit these code branches during production or testing. |
|
279 |
+# |
|
280 |
+# "debug", "internal" and "interactive": These code branches are not |
|
281 |
+# intended to be called by the general public ("debug", "internal"), or they |
|
282 |
+# are only intended to be called interactively ("interactive"). We do not |
|
283 |
+# expect to hit these code branches during production or testing. |
|
284 |
+ |
|
253 | 285 |
[tool.coverage.run] |
254 | 286 |
source_pkgs = ["derivepassphrase", "tests"] |
255 | 287 |
branch = true |
... | ... |
@@ -241,7 +241,7 @@ class ConfigurationMutex: |
241 | 241 |
|
242 | 242 |
def __init__(self) -> None: |
243 | 243 |
"""Initialize self.""" |
244 |
- if sys.platform == 'win32': # pragma: no cover |
|
244 |
+ if sys.platform == 'win32': # pragma: unless the-annoying-os no cover |
|
245 | 245 |
import msvcrt # noqa: PLC0415 |
246 | 246 |
|
247 | 247 |
locking = msvcrt.locking |
... | ... |
@@ -254,7 +254,7 @@ class ConfigurationMutex: |
254 | 254 |
def unlock_fd(fd: int, /) -> None: |
255 | 255 |
locking(fd, LK_UNLCK, LOCK_SIZE) |
256 | 256 |
|
257 |
- else: |
|
257 |
+ else: # pragma: unless posix no cover |
|
258 | 258 |
import fcntl # noqa: PLC0415 |
259 | 259 |
|
260 | 260 |
flock = fcntl.flock |
... | ... |
@@ -440,7 +440,7 @@ def config_filename( |
440 | 440 |
return temp_path / filename_ |
441 | 441 |
try: |
442 | 442 |
filename = config_filename_table[subsystem] |
443 |
- except (KeyError, TypeError): # pragma: no cover |
|
443 |
+ except (KeyError, TypeError): # pragma: no cover [failsafe] |
|
444 | 444 |
msg = f'Unknown configuration subsystem: {subsystem!r}' |
445 | 445 |
raise AssertionError(msg) from None |
446 | 446 |
return path / filename |
... | ... |
@@ -596,14 +596,14 @@ def get_suitable_ssh_keys( |
596 | 596 |
with ssh_agent.SSHAgentClient.ensure_agent_subcontext(conn) as client: |
597 | 597 |
try: |
598 | 598 |
all_key_comment_pairs = list(client.list_keys()) |
599 |
- except EOFError as exc: # pragma: no cover |
|
599 |
+ except EOFError as exc: # pragma: no cover [failsafe] |
|
600 | 600 |
raise RuntimeError(AGENT_COMMUNICATION_ERROR) from exc |
601 | 601 |
suitable_keys = copy.copy(all_key_comment_pairs) |
602 | 602 |
for pair in all_key_comment_pairs: |
603 | 603 |
key, _comment = pair |
604 | 604 |
if vault.Vault.is_suitable_ssh_key(key, client=client): |
605 | 605 |
yield pair |
606 |
- if not suitable_keys: # pragma: no cover |
|
606 |
+ if not suitable_keys: |
|
607 | 607 |
raise LookupError(NO_SUITABLE_KEYS) |
608 | 608 |
|
609 | 609 |
|
... | ... |
@@ -666,7 +666,7 @@ def prompt_for_selection( |
666 | 666 |
show_default=False, |
667 | 667 |
default='', |
668 | 668 |
) |
669 |
- except click.Abort: # pragma: no cover |
|
669 |
+ except click.Abort: # pragma: no cover [external] |
|
670 | 670 |
# This branch will not be triggered during testing on |
671 | 671 |
# `click` versions < 8.2.1, due to (non-monkeypatch-able) |
672 | 672 |
# deficiencies in `click.testing.CliRunner`. Therefore, as |
... | ... |
@@ -784,7 +784,7 @@ def prompt_for_passphrase() -> str: |
784 | 784 |
err=True, |
785 | 785 |
), |
786 | 786 |
) |
787 |
- except click.Abort: # pragma: no cover |
|
787 |
+ except click.Abort: # pragma: no cover [external] |
|
788 | 788 |
# This branch will not be triggered during testing on `click` |
789 | 789 |
# versions < 8.2.1, due to (non-monkeypatch-able) deficiencies |
790 | 790 |
# in `click.testing.CliRunner`. Therefore, as an external source |
... | ... |
@@ -144,7 +144,7 @@ class CLIofPackageFormatter(logging.Formatter): |
144 | 144 |
""" |
145 | 145 |
preliminary_result = record.getMessage() |
146 | 146 |
prefix = f'{self.prog_name}: ' |
147 |
- if record.levelname == 'DEBUG': # pragma: no cover |
|
147 |
+ if record.levelname == 'DEBUG': # pragma: no cover [unused] |
|
148 | 148 |
level_indicator = 'Debug: ' |
149 | 149 |
elif record.levelname == 'INFO': |
150 | 150 |
level_indicator = '' |
... | ... |
@@ -156,7 +156,7 @@ class CLIofPackageFormatter(logging.Formatter): |
156 | 156 |
) |
157 | 157 |
elif record.levelname in {'ERROR', 'CRITICAL'}: |
158 | 158 |
level_indicator = '' |
159 |
- else: # pragma: no cover |
|
159 |
+ else: # pragma: no cover [failsafe] |
|
160 | 160 |
msg = f'Unsupported logging level: {record.levelname}' |
161 | 161 |
raise AssertionError(msg) |
162 | 162 |
parts = [ |
... | ... |
@@ -299,7 +299,7 @@ class StandardWarningsLoggingContextManager(StandardLoggingContextManager): |
299 | 299 |
file: TextIO | None = None, |
300 | 300 |
line: str | None = None, |
301 | 301 |
) -> None: |
302 |
- if file is not None: # pragma: no cover |
|
302 |
+ if file is not None: # pragma: no cover [external-api] |
|
303 | 303 |
self.stack[0][1]( |
304 | 304 |
message, category, filename, lineno, file, line |
305 | 305 |
) |
... | ... |
@@ -492,7 +492,9 @@ class CommandWithHelpGroups(click.Command): |
492 | 492 |
""" |
493 | 493 |
help_options = self.get_help_option_names(ctx) |
494 | 494 |
|
495 |
- if not help_options or not self.add_help_option: # pragma: no cover |
|
495 |
+ if ( |
|
496 |
+ not help_options or not self.add_help_option |
|
497 |
+ ): # pragma: no cover [external-api] |
|
496 | 498 |
return None |
497 | 499 |
|
498 | 500 |
def show_help( |
... | ... |
@@ -534,15 +536,15 @@ class CommandWithHelpGroups(click.Command): |
534 | 536 |
# to allow help texts to be general objects, not just strings. |
535 | 537 |
# Used to implement translatable strings, as objects that |
536 | 538 |
# stringify to the translation. |
537 |
- if self.short_help: # pragma: no cover |
|
539 |
+ if self.short_help: # pragma: no cover [external-api] |
|
538 | 540 |
text = inspect.cleandoc(self._text(self.short_help)) |
539 | 541 |
elif self.help: |
540 | 542 |
text = click.utils.make_default_short_help( |
541 | 543 |
self._text(self.help), limit |
542 | 544 |
) |
543 |
- else: # pragma: no cover |
|
545 |
+ else: # pragma: no cover [external-api] |
|
544 | 546 |
text = '' |
545 |
- if self.deprecated: # pragma: no cover |
|
547 |
+ if self.deprecated: # pragma: no cover [external-api] |
|
546 | 548 |
# Modification against click 8.1: The translated string is |
547 | 549 |
# looked up in the derivepassphrase message domain, not the |
548 | 550 |
# gettext default domain. |
... | ... |
@@ -577,7 +579,7 @@ class CommandWithHelpGroups(click.Command): |
577 | 579 |
if self.help is not None |
578 | 580 |
else '' |
579 | 581 |
) |
580 |
- if self.deprecated: # pragma: no cover |
|
582 |
+ if self.deprecated: # pragma: no cover [external-api] |
|
581 | 583 |
# Modification against click 8.1: The translated string is |
582 | 584 |
# looked up in the derivepassphrase message domain, not the |
583 | 585 |
# gettext default domain. |
... | ... |
@@ -638,7 +640,7 @@ class CommandWithHelpGroups(click.Command): |
638 | 640 |
if isinstance(param, OptionGroupOption): |
639 | 641 |
group_name = self._text(param.option_group_name) |
640 | 642 |
epilogs.setdefault(group_name, self._text(param.epilog)) |
641 |
- else: # pragma: no cover |
|
643 |
+ else: # pragma: no cover [external-api] |
|
642 | 644 |
group_name = default_group_name |
643 | 645 |
help_records.setdefault(group_name, []).append(rec) |
644 | 646 |
if default_group_name in help_records: # pragma: no branch |
... | ... |
@@ -688,7 +690,7 @@ class CommandWithHelpGroups(click.Command): |
688 | 690 |
commands: list[tuple[str, click.Command]] = [] |
689 | 691 |
for subcommand in self.list_commands(ctx): |
690 | 692 |
cmd = self.get_command(ctx, subcommand) |
691 |
- if cmd is None or cmd.hidden: # pragma: no cover |
|
693 |
+ if cmd is None or cmd.hidden: # pragma: no cover [external-api] |
|
692 | 694 |
continue |
693 | 695 |
commands.append((subcommand, cmd)) |
694 | 696 |
if commands: # pragma: no branch |
... | ... |
@@ -795,7 +797,7 @@ class DefaultToVaultGroup(CommandWithHelpGroups, click.Group): |
795 | 797 |
|
796 | 798 |
# If we can't find the command but there is a normalization |
797 | 799 |
# function available, we try with that one. |
798 |
- if ( # pragma: no cover |
|
800 |
+ if ( # pragma: no cover [external-api] |
|
799 | 801 |
cmd is None and ctx.token_normalize_func is not None |
800 | 802 |
): |
801 | 803 |
cmd_name = ctx.token_normalize_func(cmd_name) |
... | ... |
@@ -860,7 +862,7 @@ class TopLevelCLIEntryPoint(DefaultToVaultGroup): |
860 | 862 |
|
861 | 863 |
""" |
862 | 864 |
|
863 |
- def __call__( # pragma: no cover |
|
865 |
+ def __call__( # pragma: no cover [external-api] |
|
864 | 866 |
self, |
865 | 867 |
*args: Any, # noqa: ANN401 |
866 | 868 |
**kwargs: Any, # noqa: ANN401 |
... | ... |
@@ -1108,7 +1110,6 @@ def export_vault_version_option_callback( |
1108 | 1110 |
known_extras = { |
1109 | 1111 |
_types.PEP508Extra.EXPORT: False, |
1110 | 1112 |
} |
1111 |
- try: |
|
1112 | 1113 |
from derivepassphrase.exporter import storeroom, vault_native # noqa: I001,PLC0415 |
1113 | 1114 |
|
1114 | 1115 |
foreign_configuration_formats[ |
... | ... |
@@ -1123,8 +1124,6 @@ def export_vault_version_option_callback( |
1123 | 1124 |
known_extras[_types.PEP508Extra.EXPORT] = ( |
1124 | 1125 |
not storeroom.STUBBED and not vault_native.STUBBED |
1125 | 1126 |
) |
1126 |
- except ModuleNotFoundError: # pragma: no cover |
|
1127 |
- pass |
|
1128 | 1127 |
click.echo() |
1129 | 1128 |
version_info_types: dict[_msg.Label, list[str]] = { |
1130 | 1129 |
_msg.Label.SUPPORTED_FOREIGN_CONFIGURATION_FORMATS: [ |
... | ... |
@@ -1391,5 +1390,5 @@ fi |
1391 | 1390 |
""" |
1392 | 1391 |
if ( |
1393 | 1392 |
click.shell_completion.ZshComplete.source_template == _ORIG_SOURCE_TEMPLATE |
1394 |
-): # pragma: no cover |
|
1393 |
+): # pragma: no cover [external] |
|
1395 | 1394 |
click.shell_completion.add_completion_class(ZshComplete) |
... | ... |
@@ -51,7 +51,7 @@ def load_translations( |
51 | 51 |
localedirs: list[str | bytes | os.PathLike] | None = None, |
52 | 52 |
languages: Sequence[str] | None = None, |
53 | 53 |
class_: type[gettext.NullTranslations] | None = None, |
54 |
-) -> gettext.NullTranslations: # pragma: no cover |
|
54 |
+) -> gettext.NullTranslations: # pragma: no cover [external] |
|
55 | 55 |
"""Load a translation catalog for derivepassphrase. |
56 | 56 |
|
57 | 57 |
Runs [`gettext.translation`][] under the hood for multiple locale |
... | ... |
@@ -149,8 +149,7 @@ class DebugTranslations(gettext.NullTranslations): |
149 | 149 |
plural = v.plural |
150 | 150 |
context = v.l10n_context |
151 | 151 |
cache.setdefault((context, singular), (member, trimmed)) |
152 |
- # Currently no translatable messages use plural forms |
|
153 |
- if plural: # pragma: no cover |
|
152 |
+ if plural: # pragma: no cover [unused] |
|
154 | 153 |
cache.setdefault((context, plural), (member, trimmed)) |
155 | 154 |
|
156 | 155 |
@classmethod |
... | ... |
@@ -206,7 +205,8 @@ class DebugTranslations(gettext.NullTranslations): |
206 | 205 |
msgid2: str, |
207 | 206 |
n: int, |
208 | 207 |
/, |
209 |
- ) -> str: # pragma: no cover |
|
208 |
+ ) -> str: # pragma: no cover [unused] |
|
209 |
+ """""" # noqa: D419 |
|
210 | 210 |
return self._locate_message(msgid1, message_plural=msgid2, n=n) |
211 | 211 |
|
212 | 212 |
@override |
... | ... |
@@ -226,7 +226,8 @@ class DebugTranslations(gettext.NullTranslations): |
226 | 226 |
msgid2: str, |
227 | 227 |
n: int, |
228 | 228 |
/, |
229 |
- ) -> str: # pragma: no cover |
|
229 |
+ ) -> str: # pragma: no cover [unused] |
|
230 |
+ """""" # noqa: D419 |
|
230 | 231 |
return self._locate_message( |
231 | 232 |
msgid1, |
232 | 233 |
context=context, |
... | ... |
@@ -276,7 +277,7 @@ class TranslatableString(NamedTuple): |
276 | 277 |
implemented. |
277 | 278 |
|
278 | 279 |
""" |
279 |
- if 'python-format' in self.flags: # pragma: no cover |
|
280 |
+ if 'python-format' in self.flags: # pragma: no cover [unused] |
|
280 | 281 |
err_msg = ( |
281 | 282 |
'Replacement field discovery for %-formatting ' |
282 | 283 |
'is not implemented' |
... | ... |
@@ -334,8 +335,7 @@ class TranslatableString(NamedTuple): |
334 | 335 |
c, sep2, d = self.plural.partition(filename_str) |
335 | 336 |
if sep1: |
336 | 337 |
ret = ret._replace(singular=(a + b)) |
337 |
- # Currently no translatable messages use plural forms |
|
338 |
- if sep2: # pragma: no cover |
|
338 |
+ if sep2: # pragma: no cover [unused] |
|
339 | 339 |
ret = ret._replace(plural=(c + d)) |
340 | 340 |
return ret |
341 | 341 |
|
... | ... |
@@ -370,7 +370,7 @@ class TranslatableString(NamedTuple): |
370 | 370 |
""" |
371 | 371 |
if comments.strip() and not comments.lstrip().startswith( |
372 | 372 |
'TRANSLATORS:' |
373 |
- ): # pragma: no cover |
|
373 |
+ ): # pragma: no cover [unused] |
|
374 | 374 |
comments = 'TRANSLATORS: ' + comments.lstrip() |
375 | 375 |
comments = self._maybe_rewrap(comments, fix_sentence_endings=False) |
376 | 376 |
return self._replace(translator_comments=comments) |
... | ... |
@@ -511,15 +511,16 @@ class TranslatedString: |
511 | 511 |
"""Return true if the rendered string is truthy.""" |
512 | 512 |
return bool(str(self)) |
513 | 513 |
|
514 |
- def __eq__(self, other: object) -> bool: # pragma: no cover |
|
514 |
+ def __eq__(self, other: object) -> bool: # pragma: no cover [debug] |
|
515 | 515 |
"""Return true if the rendered string is equal to `other`.""" |
516 | 516 |
return str(self) == other |
517 | 517 |
|
518 |
- def __hash__(self) -> int: # pragma: no cover |
|
518 |
+ def __hash__(self) -> int: # pragma: no cover [debug] |
|
519 | 519 |
"""Return the hash of the rendered string.""" |
520 | 520 |
return hash(str(self)) |
521 | 521 |
|
522 |
- def __repr__(self) -> str: # pragma: no cover |
|
522 |
+ def __repr__(self) -> str: # pragma: no cover [debug] |
|
523 |
+ """""" # noqa: D419 |
|
523 | 524 |
return ( |
524 | 525 |
f'{self.__class__.__name__}({self.template!r}, ' |
525 | 526 |
f'{dict(self.kwargs)!r})' |
... | ... |
@@ -2348,7 +2349,7 @@ def _write_po_file( # noqa: C901,PLR0912 |
2348 | 2349 |
is_template: bool = True, |
2349 | 2350 |
version: str = VERSION, |
2350 | 2351 |
build_time: datetime.datetime | None = None, |
2351 |
-) -> None: # pragma: no cover |
|
2352 |
+) -> None: # pragma: no cover [interactive] |
|
2352 | 2353 |
r"""Write a .po file to the given file object. |
2353 | 2354 |
|
2354 | 2355 |
Assumes the file object is opened for writing and accepts string |
... | ... |
@@ -2483,7 +2484,8 @@ def _write_po_file( # noqa: C901,PLR0912 |
2483 | 2484 |
def _format_po_info( |
2484 | 2485 |
data: Mapping[str, Any], |
2485 | 2486 |
/, |
2486 |
-) -> Iterator[str]: # pragma: no cover |
|
2487 |
+) -> Iterator[str]: # pragma: no cover [internal] |
|
2488 |
+ """""" # noqa: D419 |
|
2487 | 2489 |
sortorder = [ |
2488 | 2490 |
'project-id-version', |
2489 | 2491 |
'report-msgid-bugs-to', |
... | ... |
@@ -2517,7 +2519,8 @@ def _format_po_entry( |
2517 | 2519 |
*, |
2518 | 2520 |
is_debug_translation: bool = False, |
2519 | 2521 |
transformed_string: TranslatableString | None = None, |
2520 |
-) -> tuple[str, ...]: # pragma: no cover |
|
2522 |
+) -> tuple[str, ...]: # pragma: no cover [internal] |
|
2523 |
+ """""" # noqa: D419 |
|
2521 | 2524 |
ret: list[str] = ['\n'] |
2522 | 2525 |
ts = transformed_string or cast('TranslatableString', enum_value.value) |
2523 | 2526 |
if ts.translator_comments: |
... | ... |
@@ -2542,7 +2545,9 @@ def _format_po_entry( |
2542 | 2545 |
return tuple(ret) |
2543 | 2546 |
|
2544 | 2547 |
|
2545 |
-def _cstr(s: str) -> str: # pragma: no cover |
|
2548 |
+def _cstr(s: str) -> str: # pragma: no cover [internal] |
|
2549 |
+ """""" # noqa: D419 |
|
2550 |
+ |
|
2546 | 2551 |
def escape(string: str) -> str: |
2547 | 2552 |
return string.translate({ |
2548 | 2553 |
0: r'\000', |
... | ... |
@@ -42,7 +42,7 @@ __all__ = ( |
42 | 42 |
) |
43 | 43 |
|
44 | 44 |
|
45 |
-class _Omitted: # pragma: no cover |
|
45 |
+class _Omitted: |
|
46 | 46 |
def __bool__(self) -> bool: |
47 | 47 |
return False |
48 | 48 |
|
... | ... |
@@ -331,14 +331,20 @@ class _VaultConfigValidator: |
331 | 331 |
or 'services' not in obj |
332 | 332 |
or not isinstance(obj['services'], dict) |
333 | 333 |
): |
334 |
- raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover |
|
334 |
+ raise ValueError( |
|
335 |
+ self.INVALID_CONFIG_ERROR |
|
336 |
+ ) # pragma: no cover [failsafe] |
|
335 | 337 |
if 'global' in obj and not isinstance(obj['global'], dict): |
336 |
- raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover |
|
338 |
+ raise ValueError( |
|
339 |
+ self.INVALID_CONFIG_ERROR |
|
340 |
+ ) # pragma: no cover [failsafe] |
|
337 | 341 |
if not all( |
338 | 342 |
isinstance(service_obj, dict) |
339 | 343 |
for service_obj in obj['services'].values() |
340 | 344 |
): |
341 |
- raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover |
|
345 |
+ raise ValueError( |
|
346 |
+ self.INVALID_CONFIG_ERROR |
|
347 |
+ ) # pragma: no cover [failsafe] |
|
342 | 348 |
|
343 | 349 |
def falsy(value: Any) -> bool: # noqa: ANN401 |
344 | 350 |
return not js_truthiness(value) |
... | ... |
@@ -460,7 +466,7 @@ def validate_vault_config( |
460 | 466 |
# then include this in coverage. |
461 | 467 |
if not isinstance( |
462 | 468 |
allow_derivepassphrase_extensions, _Omitted |
463 |
- ): # pragma: no cover |
|
469 |
+ ): # pragma: no cover [unused] |
|
464 | 470 |
warnings.warn( |
465 | 471 |
get_overloads(validate_vault_config)[0].__deprecated__, # type: ignore[attr-defined] |
466 | 472 |
DeprecationWarning, |
... | ... |
@@ -488,7 +494,7 @@ def is_vault_config(obj: Any) -> TypeIs[VaultConfig]: # noqa: ANN401 |
488 | 494 |
allow_unknown_settings=True, |
489 | 495 |
) |
490 | 496 |
except (TypeError, ValueError) as exc: |
491 |
- if 'vault config ' not in str(exc): # pragma: no cover |
|
497 |
+ if 'vault config ' not in str(exc): # pragma: no cover [failsafe] |
|
492 | 498 |
raise |
493 | 499 |
return False |
494 | 500 |
return True |
... | ... |
@@ -150,7 +150,7 @@ def derivepassphrase_export(ctx: click.Context, /) -> None: |
150 | 150 |
# Constructing the subcontext above will usually already |
151 | 151 |
# lead to a click.UsageError, so this block typically won't |
152 | 152 |
# actually be called. |
153 |
- with sub_ctx: # pragma: no cover |
|
153 |
+ with sub_ctx: # pragma: no cover [unused] |
|
154 | 154 |
return derivepassphrase_export_vault.invoke(sub_ctx) |
155 | 155 |
return None |
156 | 156 |
|
... | ... |
@@ -407,7 +407,7 @@ class _VaultContext: # noqa: PLR0904 |
407 | 407 |
if isinstance(param, class_): |
408 | 408 |
group = class_ |
409 | 409 |
break |
410 |
- else: # pragma: no cover |
|
410 |
+ else: # pragma: no cover [failsafe] |
|
411 | 411 |
assert False, f'Unknown option group for {param!r}' # noqa: B011,PT015 |
412 | 412 |
else: |
413 | 413 |
group = click.Option |
... | ... |
@@ -36,7 +36,8 @@ class NotAVaultConfigError(ValueError): |
36 | 36 |
self.path = os.fspath(path) |
37 | 37 |
self.format = format |
38 | 38 |
|
39 |
- def __str__(self) -> str: # pragma: no cover |
|
39 |
+ def __str__(self) -> str: # pragma: no cover [failsafe] |
|
40 |
+ """""" # noqa: D419 |
|
40 | 41 |
formatted_format = ( |
41 | 42 |
f'vault {self.format} configuration' |
42 | 43 |
if self.format |
... | ... |
@@ -64,10 +65,10 @@ def get_vault_key() -> bytes: |
64 | 65 |
|
65 | 66 |
""" |
66 | 67 |
|
67 |
- def getenv_environb(env_var: str) -> bytes: # pragma: no cover |
|
68 |
+ def getenv_environb(env_var: str) -> bytes: # pragma: no cover [external] |
|
68 | 69 |
return os.environb.get(env_var.encode('UTF-8'), b'') # type: ignore[attr-defined] |
69 | 70 |
|
70 |
- def getenv_environ(env_var: str) -> bytes: # pragma: no cover |
|
71 |
+ def getenv_environ(env_var: str) -> bytes: # pragma: no cover [external] |
|
71 | 72 |
return os.environ.get(env_var, '').encode('UTF-8') |
72 | 73 |
|
73 | 74 |
getenv: Callable[[str], bytes] = ( |
... | ... |
@@ -107,7 +108,7 @@ def get_vault_path() -> pathlib.Path: |
107 | 108 |
).expanduser() |
108 | 109 |
|
109 | 110 |
|
110 |
-class ExportVaultConfigDataFunction(Protocol): # pragma: no cover |
|
111 |
+class ExportVaultConfigDataFunction(Protocol): |
|
111 | 112 |
"""Typing protocol for vault config data export handlers.""" |
112 | 113 |
|
113 | 114 |
def __call__( |
... | ... |
@@ -53,7 +53,7 @@ else: |
53 | 53 |
importlib.import_module('cryptography') |
54 | 54 |
except ModuleNotFoundError as exc: |
55 | 55 |
|
56 |
- class _DummyModule: # pragma: no cover |
|
56 |
+ class _DummyModule: |
|
57 | 57 |
def __init__(self, exc: type[Exception]) -> None: |
58 | 58 |
self.exc = exc |
59 | 59 |
|
... | ... |
@@ -115,7 +115,7 @@ def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
115 | 115 |
path = pathlib.Path(os.fsdecode(path)) |
116 | 116 |
if key is None: |
117 | 117 |
key = exporter.get_vault_key() |
118 |
- if format != 'storeroom': # pragma: no cover |
|
118 |
+ if format != 'storeroom': # pragma: no cover [failsafe] |
|
119 | 119 |
msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
120 | 120 |
fmt=format |
121 | 121 |
) |
... | ... |
@@ -57,7 +57,7 @@ else: |
57 | 57 |
importlib.import_module('cryptography') |
58 | 58 |
except ModuleNotFoundError as exc: |
59 | 59 |
|
60 |
- class _DummyModule: # pragma: no cover |
|
60 |
+ class _DummyModule: |
|
61 | 61 |
def __init__(self, exc: type[Exception]) -> None: |
62 | 62 |
self.exc = exc |
63 | 63 |
|
... | ... |
@@ -121,7 +121,7 @@ def export_vault_native_data( # noqa: D417 |
121 | 121 |
'v0.2': VaultNativeV02ConfigParser, |
122 | 122 |
'v0.3': VaultNativeV03ConfigParser, |
123 | 123 |
}.get(format) |
124 |
- if parser_class is None: # pragma: no cover |
|
124 |
+ if parser_class is None: # pragma: no cover [failsafe] |
|
125 | 125 |
msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
126 | 126 |
fmt=format |
127 | 127 |
) |
... | ... |
@@ -50,14 +50,15 @@ class SSHAgentFailedError(RuntimeError): |
50 | 50 |
b'', |
51 | 51 |
): |
52 | 52 |
return 'The SSH agent failed to complete the request' |
53 |
- elif self.args[1]: # noqa: RET505 # pragma: no cover |
|
53 |
+ elif self.args[1]: # noqa: RET505 # pragma: no cover [failsafe] |
|
54 | 54 |
code = self.args[0] |
55 | 55 |
msg = self.args[1].decode('utf-8', 'surrogateescape') |
56 | 56 |
return f'[Code {code:d}] {msg:s}' |
57 |
- else: # pragma: no cover |
|
57 |
+ else: # pragma: no cover [failsafe] |
|
58 | 58 |
return repr(self) |
59 | 59 |
|
60 |
- def __repr__(self) -> str: # pragma: no cover |
|
60 |
+ def __repr__(self) -> str: # pragma: no cover [debug] |
|
61 |
+ """""" # noqa: D419 |
|
61 | 62 |
return f'{self.__class__.__name__}{self.args!r}' |
62 | 63 |
|
63 | 64 |
|
... | ... |
@@ -335,7 +336,7 @@ class SSHAgentClient: |
335 | 336 |
elif isinstance(conn, socket.socket) or conn is None: |
336 | 337 |
with SSHAgentClient(socket=conn) as client: |
337 | 338 |
yield client |
338 |
- else: # pragma: no cover |
|
339 |
+ else: # pragma: no cover [failsafe] |
|
339 | 340 |
assert_type(conn, Never) |
340 | 341 |
msg = f'invalid connection hint: {conn!r}' |
341 | 342 |
raise TypeError(msg) |
... | ... |
@@ -479,7 +480,7 @@ class SSHAgentClient: |
479 | 480 |
if len(response) < response_length: |
480 | 481 |
msg = 'truncated response from SSH agent' |
481 | 482 |
raise EOFError(msg) |
482 |
- if not response_code: # pragma: no cover |
|
483 |
+ if not response_code: # pragma: no cover [failsafe] |
|
483 | 484 |
return response[0], response[1:] |
484 | 485 |
if response[0] not in response_code: |
485 | 486 |
raise SSHAgentFailedError(response[0], response[1:]) |
... | ... |
@@ -50,7 +50,7 @@ def _hypothesis_settings_setup() -> None: |
50 | 50 |
importlib.util.find_spec('coverage') is not None |
51 | 51 |
and settings.deadline is not None |
52 | 52 |
and settings.deadline.total_seconds() < 1.0 |
53 |
- ): # pragma: no cover |
|
53 |
+ ): # pragma: no cover [external] |
|
54 | 54 |
ctracer_class = ( |
55 | 55 |
importlib.import_module('coverage.tracer').CTracer |
56 | 56 |
if importlib.util.find_spec('coverage.tracer') is not None |
... | ... |
@@ -112,7 +112,7 @@ _hypothesis_settings_setup() |
112 | 112 |
# https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup |
113 | 113 |
# https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595 |
114 | 114 |
@pytest.fixture(scope='session', autouse=False) |
115 |
-def term_handler() -> Iterator[None]: # pragma: no cover |
|
115 |
+def term_handler() -> Iterator[None]: # pragma: no cover [external] |
|
116 | 116 |
try: |
117 | 117 |
import signal # noqa: PLC0415 |
118 | 118 |
|
... | ... |
@@ -126,7 +126,7 @@ def term_handler() -> Iterator[None]: # pragma: no cover |
126 | 126 |
|
127 | 127 |
|
128 | 128 |
@pytest.fixture(scope='session') |
129 |
-def skip_if_no_af_unix_support() -> None: # pragma: no cover |
|
129 |
+def skip_if_no_af_unix_support() -> None: # pragma: no cover [external] |
|
130 | 130 |
"""Skip the test if Python does not support AF_UNIX. |
131 | 131 |
|
132 | 132 |
Implemented as a fixture instead of a mark because it has |
... | ... |
@@ -170,7 +170,7 @@ class SpawnFunc(Protocol): |
170 | 170 |
""" |
171 | 171 |
|
172 | 172 |
|
173 |
-def spawn_pageant( # pragma: no cover |
|
173 |
+def spawn_pageant( # pragma: no cover [external] |
|
174 | 174 |
executable: str | None, env: dict[str, str] |
175 | 175 |
) -> subprocess.Popen[str] | None: |
176 | 176 |
"""Spawn an isolated Pageant, if possible. |
... | ... |
@@ -195,7 +195,7 @@ def spawn_pageant( # pragma: no cover |
195 | 195 |
subprocess. |
196 | 196 |
|
197 | 197 |
""" |
198 |
- if executable is None: # pragma: no cover |
|
198 |
+ if executable is None: # pragma: no cover [external] |
|
199 | 199 |
return None |
200 | 200 |
|
201 | 201 |
# Apparently, Pageant 0.81 and lower running in debug mode does |
... | ... |
@@ -227,7 +227,7 @@ def spawn_pageant( # pragma: no cover |
227 | 227 |
v0_82 = packaging.version.Version('0.82') |
228 | 228 |
pageant_version = packaging.version.Version(pageant_version_string) |
229 | 229 |
|
230 |
- if pageant_version < v0_82: # pragma: no cover |
|
230 |
+ if pageant_version < v0_82: # pragma: no cover [external] |
|
231 | 231 |
return None |
232 | 232 |
|
233 | 233 |
return subprocess.Popen( |
... | ... |
@@ -242,7 +242,7 @@ def spawn_pageant( # pragma: no cover |
242 | 242 |
) |
243 | 243 |
|
244 | 244 |
|
245 |
-def spawn_openssh_agent( # pragma: no cover |
|
245 |
+def spawn_openssh_agent( # pragma: no cover [external] |
|
246 | 246 |
executable: str | None, env: dict[str, str] |
247 | 247 |
) -> subprocess.Popen[str] | None: |
248 | 248 |
"""Spawn an isolated OpenSSH agent, if possible. |
... | ... |
@@ -276,7 +276,7 @@ def spawn_openssh_agent( # pragma: no cover |
276 | 276 |
) |
277 | 277 |
|
278 | 278 |
|
279 |
-def spawn_noop( # pragma: no cover |
|
279 |
+def spawn_noop( # pragma: no cover [unused] |
|
280 | 280 |
executable: str | None, env: dict[str, str] |
281 | 281 |
) -> None: |
282 | 282 |
"""Placeholder function. Does nothing.""" |
... | ... |
@@ -323,7 +323,7 @@ def spawn_named_agent( |
323 | 323 |
exec_name: str, |
324 | 324 |
spawn_func: SpawnFunc, |
325 | 325 |
agent_type: tests.KnownSSHAgent, |
326 |
-) -> Iterator[tests.SpawnedSSHAgentInfo]: # pragma: no cover |
|
326 |
+) -> Iterator[tests.SpawnedSSHAgentInfo]: # pragma: no cover [external] |
|
327 | 327 |
"""Spawn the named SSH agent and check that it is operational. |
328 | 328 |
|
329 | 329 |
Using the correct agent-specific spawn function from the |
... | ... |
@@ -379,7 +379,7 @@ def spawn_named_agent( |
379 | 379 |
with exit_stack: |
380 | 380 |
if spawn_func is spawn_noop: |
381 | 381 |
ssh_auth_sock = os.environ['SSH_AUTH_SOCK'] |
382 |
- elif proc is None: # pragma: no cover |
|
382 |
+ elif proc is None: # pragma: no cover [external] |
|
383 | 383 |
err_msg = f'Cannot spawn usable {exec_name}' |
384 | 384 |
raise CannotSpawnError(err_msg) |
385 | 385 |
else: |
... | ... |
@@ -393,14 +393,14 @@ def spawn_named_agent( |
393 | 393 |
ssh_auth_sock = tests.parse_sh_export_line( |
394 | 394 |
ssh_auth_sock_line, env_name='SSH_AUTH_SOCK' |
395 | 395 |
) |
396 |
- except ValueError: # pragma: no cover |
|
396 |
+ except ValueError: # pragma: no cover [external] |
|
397 | 397 |
err_msg = f'Cannot parse agent output: {ssh_auth_sock_line!r}' |
398 | 398 |
raise CannotSpawnError(err_msg) from None |
399 | 399 |
pid_line = proc.stdout.readline() |
400 | 400 |
if ( |
401 | 401 |
'pid' not in pid_line.lower() |
402 | 402 |
and '_pid' not in pid_line.lower() |
403 |
- ): # pragma: no cover |
|
403 |
+ ): # pragma: no cover [external] |
|
404 | 404 |
err_msg = f'Cannot parse agent output: {pid_line!r}' |
405 | 405 |
raise CannotSpawnError(err_msg) |
406 | 406 |
monkeypatch = exit_stack.enter_context(pytest.MonkeyPatch.context()) |
... | ... |
@@ -418,7 +418,7 @@ def spawn_named_agent( |
418 | 418 |
|
419 | 419 |
|
420 | 420 |
@pytest.fixture |
421 |
-def running_ssh_agent( # pragma: no cover |
|
421 |
+def running_ssh_agent( # pragma: no cover [external] |
|
422 | 422 |
skip_if_no_af_unix_support: None, |
423 | 423 |
) -> Iterator[tests.RunningSSHAgentInfo]: |
424 | 424 |
"""Ensure a running SSH agent, if possible, as a pytest fixture. |
... | ... |
@@ -473,7 +473,7 @@ def running_ssh_agent( # pragma: no cover |
473 | 473 |
def spawn_ssh_agent( |
474 | 474 |
request: pytest.FixtureRequest, |
475 | 475 |
skip_if_no_af_unix_support: None, |
476 |
-) -> Iterator[tests.SpawnedSSHAgentInfo]: # pragma: no cover |
|
476 |
+) -> Iterator[tests.SpawnedSSHAgentInfo]: # pragma: no cover [external] |
|
477 | 477 |
"""Spawn an isolated SSH agent, if possible, as a pytest fixture. |
478 | 478 |
|
479 | 479 |
Spawn a new SSH agent isolated from other SSH use by other |
... | ... |
@@ -500,9 +500,9 @@ def spawn_ssh_agent( |
500 | 500 |
# fixtures of the same parameter set have run. So set |
501 | 501 |
# SSH_AUTH_SOCK explicitly to the value saved at interpreter |
502 | 502 |
# startup. |
503 |
- if startup_ssh_auth_sock: # pragma: no cover |
|
503 |
+ if startup_ssh_auth_sock: # pragma: no cover [external] |
|
504 | 504 |
monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock) |
505 |
- else: # pragma: no cover |
|
505 |
+ else: # pragma: no cover [external] |
|
506 | 506 |
monkeypatch.delenv('SSH_AUTH_SOCK', raising=False) |
507 | 507 |
try: |
508 | 508 |
yield from spawn_named_agent(*request.param) |
... | ... |
@@ -571,7 +571,7 @@ def ssh_agent_client_with_test_keys_loaded( # noqa: C901 |
571 | 571 |
try: |
572 | 572 |
for key_type, key_struct in tests.ALL_KEYS.items(): |
573 | 573 |
private_key_data = key_struct.private_key_blob |
574 |
- if private_key_data is None: # pragma: no cover |
|
574 |
+ if private_key_data is None: # pragma: no cover [failsafe] |
|
575 | 575 |
continue |
576 | 576 |
request_code, payload = prepare_payload( |
577 | 577 |
private_key_data, isolated=isolated, time_to_live=30 |
... | ... |
@@ -583,7 +583,9 @@ def ssh_agent_client_with_test_keys_loaded( # noqa: C901 |
583 | 583 |
payload, |
584 | 584 |
response_code=_types.SSH_AGENT.SUCCESS, |
585 | 585 |
) |
586 |
- except ssh_agent.SSHAgentFailedError: # pragma: no cover |
|
586 |
+ except ( |
|
587 |
+ ssh_agent.SSHAgentFailedError |
|
588 |
+ ): # pragma: no cover [external] |
|
587 | 589 |
# Pageant can fail to accept a key for two separate |
588 | 590 |
# reasons: |
589 | 591 |
# |
... | ... |
@@ -618,16 +620,16 @@ def ssh_agent_client_with_test_keys_loaded( # noqa: C901 |
618 | 620 |
EOFError, |
619 | 621 |
OSError, |
620 | 622 |
ssh_agent.SSHAgentFailedError, |
621 |
- ): # pragma: no cover |
|
623 |
+ ): # pragma: no cover [external] |
|
622 | 624 |
pass |
623 |
- else: # pragma: no cover |
|
625 |
+ else: # pragma: no cover [external] |
|
624 | 626 |
successfully_loaded_keys.add(key_type) |
625 | 627 |
yield client |
626 | 628 |
finally: |
627 | 629 |
for key_type, key_struct in tests.ALL_KEYS.items(): |
628 | 630 |
if not isolated and ( |
629 | 631 |
key_type in successfully_loaded_keys |
630 |
- ): # pragma: no cover |
|
632 |
+ ): # pragma: no cover [external] |
|
631 | 633 |
# The public key blob is the base64-encoded part in |
632 | 634 |
# the "public key line". |
633 | 635 |
public_key = base64.standard_b64decode( |
634 | 636 |