# SPDX-FileCopyrightText: 2025 Marco Ricci # # SPDX-License-Identifier: Zlib from __future__ import annotations import base64 import contextlib import copy import enum import errno import importlib.util import json import logging import os import pathlib import re import shlex import stat import sys import tempfile import types import zipfile from typing import TYPE_CHECKING, TypedDict import click.testing import hypothesis import pytest from hypothesis import strategies from typing_extensions import NamedTuple, assert_never, overload from derivepassphrase import _types, cli, ssh_agent, vault from derivepassphrase._internals import cli_helpers, cli_machinery from derivepassphrase.ssh_agent import socketprovider __all__ = () if TYPE_CHECKING: import socket from collections.abc import Callable, Iterator, Mapping, Sequence from contextlib import AbstractContextManager from typing import IO, NotRequired from typing_extensions import Any, Buffer, Self class SSHTestKeyDeterministicSignatureClass(str, enum.Enum): """The class of a deterministic signature from an SSH test key. Attributes: SPEC: A deterministic signature directly implied by the specification of the signature algorithm. RFC_6979: A deterministic signature as specified by RFC 6979. Only used with DSA and ECDSA keys (that aren't also EdDSA keys). Pageant_068_080: A deterministic signature as specified by Pageant 0.68. Only used with DSA and ECDSA keys (that aren't also EdDSA keys), and only used with Pageant from 0.68 up to and including 0.80. Usage of this signature class together with an ECDSA NIST P-521 key [turned out to leak enough information per signature to quickly compromise the entire private key (CVE-2024-31497)][PUTTY_CVE_2024_31497], so newer Pageant versions abandon this signature class in favor of RFC 6979. [PUTTY_CVE_2024_31497]: https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/vuln-p521-bias.html """ SPEC = enum.auto() """""" RFC_6979 = enum.auto() """""" Pageant_068_080 = enum.auto() """""" class SSHTestKeyDeterministicSignature(NamedTuple): """An SSH test key deterministic signature. Attributes: signature: The binary signature of the [vault UUID][vault.Vault.UUID] under this signature class. derived_passphrase: The equivalent master passphrase derived from this signature. signature_class: The [signature class][SSHTestKeyDeterministicSignatureClass]. """ signature: bytes """""" derived_passphrase: bytes """""" signature_class: SSHTestKeyDeterministicSignatureClass = ( SSHTestKeyDeterministicSignatureClass.SPEC ) """""" class SSHTestKey(NamedTuple): """An SSH test key. Attributes: public_key: The SSH public key string, as used e.g. by OpenSSH's `authorized_keys` file. Includes a comment. public_key_data: The SSH protocol wire format of the public key. private_key: A base64 encoded representation of the private key, in OpenSSH's v1 private key format. private_key_blob: The SSH protocol wire format of the private key. expected_signatures: A mapping of deterministic signature classes to the expected, deterministic signature (of that class) of the vault UUID for this key, together with the respective "equivalent master passphrase" derived from this signature. """ public_key: bytes """""" public_key_data: bytes """""" private_key: bytes """""" private_key_blob: bytes """""" expected_signatures: Mapping[ SSHTestKeyDeterministicSignatureClass, SSHTestKeyDeterministicSignature ] """""" def is_suitable( self, *, client: ssh_agent.SSHAgentClient | None = None, ) -> bool: """Return if this key is suitable for use with vault. Args: client: An optional SSH agent client to check for additional deterministic key types. If not given, assume no such types. """ return vault.Vault.is_suitable_ssh_key( self.public_key_data, client=client ) class ValidationSettings(NamedTuple): """Validation settings for [`VaultTestConfig`][]s. Attributes: allow_unknown_settings: See [`_types.validate_vault_config`][]. """ allow_unknown_settings: bool """""" class VaultTestConfig(NamedTuple): """A (not necessarily valid) sample vault config, plus metadata. Attributes: config: The actual configuration object. Usually a [`dict`][]. comment: An explanatory comment for what is wrong with this config, or empty if the config is valid. This is intended as a debugging message to be shown to the user (e.g. when an assertion fails), not as an error message to programmatically match against. validation_settings: See [`_types.validate_vault_config`][]. """ config: Any """""" comment: str """""" validation_settings: ValidationSettings | None """""" TEST_CONFIGS: list[VaultTestConfig] = [ VaultTestConfig(None, "not a dict", None), VaultTestConfig({}, "missing required keys", None), VaultTestConfig( {"global": None, "services": {}}, "bad config value: global", None ), VaultTestConfig( {"global": {"key": 123}, "services": {}}, "bad config value: global.key", None, ), VaultTestConfig( {"global": {"phrase": "abc", "key": "..."}, "services": {}}, "", None, ), VaultTestConfig({"services": None}, "bad config value: services", None), VaultTestConfig( {"services": {"1": {}, 2: {}}}, 'bad config value: services."2"', None ), VaultTestConfig( {"services": {"1": {}, "2": 2}}, 'bad config value: services."2"', None ), VaultTestConfig( {"services": {"sv": {"notes": ["sentinel", "list"]}}}, "bad config value: services.sv.notes", None, ), VaultTestConfig( {"services": {"sv": {"notes": "blah blah blah"}}}, "", None ), VaultTestConfig( {"services": {"sv": {"length": "200"}}}, "bad config value: services.sv.length", None, ), VaultTestConfig( {"services": {"sv": {"length": 0.5}}}, "bad config value: services.sv.length", None, ), VaultTestConfig( {"services": {"sv": {"length": ["sentinel", "list"]}}}, "bad config value: services.sv.length", None, ), VaultTestConfig( {"services": {"sv": {"length": -10}}}, "bad config value: services.sv.length", None, ), VaultTestConfig( {"services": {"sv": {"lower": "10"}}}, "bad config value: services.sv.lower", None, ), VaultTestConfig( {"services": {"sv": {"upper": -10}}}, "bad config value: services.sv.upper", None, ), VaultTestConfig( {"services": {"sv": {"number": ["sentinel", "list"]}}}, "bad config value: services.sv.number", None, ), VaultTestConfig( { "global": {"phrase": "my secret phrase"}, "services": {"sv": {"length": 10}}, }, "", None, ), VaultTestConfig( {"services": {"sv": {"length": 10, "phrase": "..."}}}, "", None ), VaultTestConfig( {"services": {"sv": {"length": 10, "key": "..."}}}, "", None ), VaultTestConfig( {"services": {"sv": {"upper": 10, "key": "..."}}}, "", None ), VaultTestConfig( {"services": {"sv": {"phrase": "abc", "key": "..."}}}, "", None ), VaultTestConfig( { "global": {"phrase": "abc"}, "services": {"sv": {"phrase": "abc", "length": 10}}, }, "", None, ), VaultTestConfig( { "global": {"key": "..."}, "services": {"sv": {"phrase": "abc", "length": 10}}, }, "", None, ), VaultTestConfig( { "global": {"key": "..."}, "services": {"sv": {"phrase": "abc", "key": "...", "length": 10}}, }, "", None, ), VaultTestConfig( { "global": {"key": "..."}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": {"length": 10, "repeat": 1, "lower": 1}, }, }, "", None, ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": "NFC"}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": {"length": 10, "repeat": 1, "lower": 1}, }, }, "", None, ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": True}, "services": {}, }, "bad config value: global.unicode_normalization_form", None, ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": "NFC"}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": {"length": 10, "repeat": 1, "lower": 1}, }, }, "", ValidationSettings(True), ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": "NFC"}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": {"length": 10, "repeat": 1, "lower": 1}, }, }, "extension/unknown key: .global.unicode_normalization_form", ValidationSettings(False), ), VaultTestConfig( { "global": {"key": "...", "unknown_key": True}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": {"length": 10, "repeat": 1, "lower": 1}, }, }, "", ValidationSettings(True), ), VaultTestConfig( { "global": {"key": "...", "unknown_key": True}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": {"length": 10, "repeat": 1, "lower": 1}, }, }, "unknown key: .global.unknown_key", ValidationSettings(False), ), VaultTestConfig( { "global": {"key": "..."}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": { "length": 10, "repeat": 1, "lower": 1, "unknown_key": True, }, }, }, "unknown key: .services.sv2.unknown_key", ValidationSettings(False), ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": "NFC"}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": { "length": 10, "repeat": 1, "lower": 1, "unknown_key": True, }, }, }, "", ValidationSettings(True), ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": "NFC"}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": { "length": 10, "repeat": 1, "lower": 1, "unknown_key": True, }, }, }, "", ValidationSettings(True), ), VaultTestConfig( { "global": {"key": "...", "unicode_normalization_form": "NFC"}, "services": { "sv1": {"phrase": "abc", "length": 10, "upper": 1}, "sv2": { "length": 10, "repeat": 1, "lower": 1, "unknown_key": True, }, }, }, "", ValidationSettings(True), ), ] """The master list of test configurations for vault.""" def is_valid_test_config(conf: VaultTestConfig, /) -> bool: """Return true if the test config is valid. Args: conf: The test config to check. """ return not conf.comment and conf.validation_settings in { None, (True,), } def _test_config_ids(val: VaultTestConfig) -> Any: # pragma: no cover """pytest id function for VaultTestConfig objects.""" assert isinstance(val, VaultTestConfig) return val[1] or (val[0], val[1], val[2]) @strategies.composite def vault_full_service_config(draw: strategies.DrawFn) -> dict[str, int]: """Hypothesis strategy for full vault service configurations. Returns a sample configuration with restrictions on length, repeat count, and all character classes, while ensuring the settings are not obviously unsatisfiable. Args: draw: The `draw` function, as provided for by hypothesis. """ repeat = draw(strategies.integers(min_value=0, max_value=10)) lower = draw(strategies.integers(min_value=0, max_value=10)) upper = draw(strategies.integers(min_value=0, max_value=10)) number = draw(strategies.integers(min_value=0, max_value=10)) space = draw(strategies.integers(min_value=0, max_value=repeat)) dash = draw(strategies.integers(min_value=0, max_value=10)) symbol = draw(strategies.integers(min_value=0, max_value=10)) length = draw( strategies.integers( min_value=max(1, lower + upper + number + space + dash + symbol), max_value=70, ) ) hypothesis.assume(lower + upper + number + dash + symbol > 0) hypothesis.assume(lower + upper + number + space + symbol > 0) hypothesis.assume(repeat >= space) return { "lower": lower, "upper": upper, "number": number, "space": space, "dash": dash, "symbol": symbol, "repeat": repeat, "length": length, } def is_smudgable_vault_test_config(conf: VaultTestConfig) -> bool: """Check whether this vault test config can be effectively smudged. A "smudged" test config is one where falsy values (in the JavaScript sense) can be replaced by other falsy values without changing the meaning of the config. Args: conf: A test config to check. Returns: True if the test config can be smudged, False otherwise. """ config = conf.config return bool( isinstance(config, dict) and ("global" not in config or isinstance(config["global"], dict)) and ("services" in config and isinstance(config["services"], dict)) and all(isinstance(x, dict) for x in config["services"].values()) and (config["services"] or config.get("global")) ) @strategies.composite def smudged_vault_test_config( draw: strategies.DrawFn, config: Any = strategies.sampled_from(TEST_CONFIGS).filter( # noqa: B008 is_smudgable_vault_test_config ), ) -> Any: """Hypothesis strategy to replace falsy values with other falsy values. Uses [`_types.js_truthiness`][] internally, which is tested separately by [`tests.test_derivepassphrase_types.test_100_js_truthiness`][]. Args: draw: The `draw` function, as provided for by hypothesis. config: A strategy which generates [`VaultTestConfig`][] objects. Returns: A new [`VaultTestConfig`][] where some falsy values have been replaced or added. """ falsy = (None, False, 0, 0.0, "", float("nan")) falsy_no_str = (None, False, 0, 0.0, float("nan")) falsy_no_zero = (None, False, "", float("nan")) conf = draw(config) hypothesis.assume(is_smudgable_vault_test_config(conf)) obj = copy.deepcopy(conf.config) services: list[dict[str, Any]] = list(obj["services"].values()) if "global" in obj: services.append(obj["global"]) assert all(isinstance(x, dict) for x in services), ( "is_smudgable_vault_test_config guard failed to " "ensure each settings dict is a dict" ) for service in services: for key in ("phrase",): value = service.get(key) if not _types.js_truthiness(value) and value != "": service[key] = draw(strategies.sampled_from(falsy_no_str)) for key in ( "notes", "key", "length", "repeat", ): value = service.get(key) if not _types.js_truthiness(value): service[key] = draw(strategies.sampled_from(falsy)) for key in ( "lower", "upper", "number", "space", "dash", "symbol", ): value = service.get(key) if not _types.js_truthiness(value) and value != 0: service[key] = draw(strategies.sampled_from(falsy_no_zero)) hypothesis.assume(obj != conf.config) return VaultTestConfig(obj, conf.comment, conf.validation_settings) class KnownSSHAgent(str, enum.Enum): """Known SSH agents. Attributes: UNKNOWN (str): Not a known agent, or not known statically. Pageant (str): The agent from Simon Tatham's PuTTY suite. OpenSSHAgent (str): The agent from OpenBSD's OpenSSH suite. StubbedSSHAgent (str): The stubbed, fake agent pseudo-socket defined in this test suite. """ UNKNOWN = "(unknown)" """""" Pageant = "Pageant" """""" OpenSSHAgent = "OpenSSHAgent" """""" StubbedSSHAgent = "StubbedSSHAgent" """""" class SpawnedSSHAgentInfo(NamedTuple): """Info about a spawned SSH agent, as provided by some fixtures. Differs from [`RunningSSHAgentInfo`][] in that this info object already provides a functional client connected to the agent, but not the address. Attributes: agent_type: The agent's type. client: An SSH agent client connected to this agent. isolated: Whether this agent was spawned specifically for this test suite, with attempts to isolate it from the user. If false, then the user may be interacting with the agent externally, meaning e.g. keys other than the test keys may be visible in this agent. """ agent_type: KnownSSHAgent """""" client: ssh_agent.SSHAgentClient """""" isolated: bool """""" class RunningSSHAgentInfo(NamedTuple): """Info about a running SSH agent, as provided by some fixtures. Differs from [`SpawnedSSHAgentInfo`][] in that this info object provides only an address of the agent, not a functional client already connected to it. The running SSH agent may or may not be isolated. Attributes: socket: A socket address to connect to the agent. agent_type: The agent's type. """ socket: str | type[_types.SSHAgentSocket] """""" agent_type: KnownSSHAgent """""" def require_external_address(self) -> str: # pragma: no cover if not isinstance(self.socket, str): pytest.skip( reason="This test requires a real, externally resolvable " "address for the SSH agent socket." ) return self.socket ALL_KEYS: Mapping[str, SSHTestKey] = { "ed25519": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW QyNTUxOQAAACCBeIFoJtYCSF8P/zJIb+TBMIncHGpFBgnpCQ/7whJpdgAAAKDweO7H8Hju xwAAAAtzc2gtZWQyNTUxOQAAACCBeIFoJtYCSF8P/zJIb+TBMIncHGpFBgnpCQ/7whJpdg AAAEAbM/A869nkWZbe2tp3Dm/L6gitvmpH/aRZt8sBII3ExYF4gWgm1gJIXw//Mkhv5MEw idwcakUGCekJD/vCEml2AAAAG3Rlc3Qga2V5IHdpdGhvdXQgcGFzc3BocmFzZQEC -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 00 00 00 20 81 78 81 68 26 d6 02 48 5f 0f ff 32 48 6f e4 c1 30 89 dc 1c 6a 45 06 09 e9 09 0f fb c2 12 69 76 00 00 00 40 1b 33 f0 3c eb d9 e4 59 96 de da da 77 0e 6f cb ea 08 ad be 6a 47 fd a4 59 b7 cb 01 20 8d c4 c5 81 78 81 68 26 d6 02 48 5f 0f ff 32 48 6f e4 c1 30 89 dc 1c 6a 45 06 09 e9 09 0f fb c2 12 69 76 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIIF4gWgm1gJIXw//Mkhv5MEwidwcakUGCekJD/vCEml2 test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 00 00 00 20 81 78 81 68 26 d6 02 48 5f 0f ff 32 48 6f e4 c1 30 89 dc 1c 6a 45 06 09 e9 09 0f fb c2 12 69 76 """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.SPEC: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 00 00 00 40 f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02 """), derived_passphrase=rb"""8JgZgGwal9UmA27M42WPhmYHExkTCSEzM/nkNlMdr/0NCB/s+HObjF9VORZ8U1QsHlK7MO1/ieIvaVFV2J6mAg==""", ), }, ), # Currently only supported by PuTTY (which is deficient in other # niceties of the SSH agent and the agent's client). "ed448": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAASgAAAAlz c2gtZWQ0NDgAAAA54vZy009Wu8wExjvEb3hqtLz1GO/+d5vmGUbErWQ4AUO9mYLT zHJHc2m4s+yWzP29Cc3EcxizLG8AAAAA8BdhfCcXYXwnAAAACXNzaC1lZDQ0OAAA ADni9nLTT1a7zATGO8RveGq0vPUY7/53m+YZRsStZDgBQ72ZgtPMckdzabiz7JbM /b0JzcRzGLMsbwAAAAByM7GIMRvWJB3YD6SIpAF2uudX4ozZe0X917wPwiBrs373 9TM1n94Nib6hrxGNmCk2iBQDe2KALPgA4vZy009Wu8wExjvEb3hqtLz1GO/+d5vm GUbErWQ4AUO9mYLTzHJHc2m4s+yWzP29Cc3EcxizLG8AAAAAG3Rlc3Qga2V5IHdp dGhvdXQgcGFzc3BocmFzZQECAwQFBgcICQ== -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 09 73 73 68 2d 65 64 34 34 38 00 00 00 39 e2 f6 72 d3 4f 56 bb cc 04 c6 3b c4 6f 78 6a b4 bc f5 18 ef fe 77 9b e6 19 46 c4 ad 64 38 01 43 bd 99 82 d3 cc 72 47 73 69 b8 b3 ec 96 cc fd bd 09 cd c4 73 18 b3 2c 6f 00 00 00 00 72 33 b1 88 31 1b d6 24 1d d8 0f a4 88 a4 01 76 ba e7 57 e2 8c d9 7b 45 fd d7 bc 0f c2 20 6b b3 7e f7 f5 33 35 9f de 0d 89 be a1 af 11 8d 98 29 36 88 14 03 7b 62 80 2c f8 00 e2 f6 72 d3 4f 56 bb cc 04 c6 3b c4 6f 78 6a b4 bc f5 18 ef fe 77 9b e6 19 46 c4 ad 64 38 01 43 bd 99 82 d3 cc 72 47 73 69 b8 b3 ec 96 cc fd bd 09 cd c4 73 18 b3 2c 6f 00 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ssh-ed448 AAAACXNzaC1lZDQ0OAAAADni9nLTT1a7zATGO8RveGq0vPUY7/53m+YZRsStZDgBQ72ZgtPMckdzabiz7JbM/b0JzcRzGLMsbwA= test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 09 73 73 68 2d 65 64 34 34 38 00 00 00 39 e2 f6 72 d3 4f 56 bb cc 04 c6 3b c4 6f 78 6a b4 bc f5 18 ef fe 77 9b e6 19 46 c4 ad 64 38 01 43 bd 99 82 d3 cc 72 47 73 69 b8 b3 ec 96 cc fd bd 09 cd c4 73 18 b3 2c 6f 00 """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.SPEC: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 09 73 73 68 2d 65 64 34 34 38 00 00 00 72 06 86 f4 64 a4 a6 ba d9 c3 22 c4 93 49 99 fc 11 de 67 97 08 f2 d8 b7 3c 2c 13 e7 c5 1c 1e 92 a6 0e d8 2f 6d 81 03 82 00 e3 72 e4 32 6d 72 d2 6d 32 84 3f cc a9 1e 57 2c 00 9a b3 99 de 45 da ce 2e d1 db e5 89 f3 35 be 24 58 90 c6 ca 04 f0 db 88 80 db bd 77 7c 80 20 7f 3a 48 61 f6 1f ae a9 5e 53 7b e0 9d 93 1e ea dc eb b5 cd 56 4c ea 8f 08 00 """), derived_passphrase=rb"""Bob0ZKSmutnDIsSTSZn8Ed5nlwjy2Lc8LBPnxRwekqYO2C9tgQOCAONy5DJtctJtMoQ/zKkeVywAmrOZ3kXazi7R2+WJ8zW+JFiQxsoE8NuIgNu9d3yAIH86SGH2H66pXlN74J2THurc67XNVkzqjwgA""", ), }, ), "rsa": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABlwAAAAdzc2gtcn NhAAAAAwEAAQAAAYEAsaHu6Xs4cVsuDSNJlMCqoPVgmDgEviI8TfXmHKqX3JkIqI3LsvV7 Ijf8WCdTveEq7CkuZhImtsR52AOEVAoU8mDXDNr+nJ5wUPzf1UIaRjDe0lcXW4SlF01hQs G4wYDuqxshwelraB/L3e0zhD7fjYHF8IbFsqGlFHWEwOtlfhhfbxJsTGguLm4A8/gdEJD5 2rkqDcZpIXCHtJbCzW9aQpWcs/PDw5ylwl/3dB7jfxyfrGz4O3QrzsqhWEsip97mOmwl6q CHbq8V8x9zu89D/H+bG5ijqxhijbjcVUW3lZfw/97gy9J6rG31HNar5H8GycLTFwuCFepD mTEpNgQLKoe8ePIEPq4WHhFUovBdwlrOByUKKqxreyvWt5gkpTARz+9Lt8OjBO3rpqK8sZ VKH3sE3de2RJM3V9PJdmZSs2b8EFK3PsUGdlMPM9pn1uk4uIItKWBmooOynuD8Ll6aPwuW AFn3l8nLLyWdrmmEYzHWXiRjQJxy1Bi5AbHMOWiPAAAFkDPkuBkz5LgZAAAAB3NzaC1yc2 EAAAGBALGh7ul7OHFbLg0jSZTAqqD1YJg4BL4iPE315hyql9yZCKiNy7L1eyI3/FgnU73h KuwpLmYSJrbEedgDhFQKFPJg1wza/pyecFD839VCGkYw3tJXF1uEpRdNYULBuMGA7qsbIc Hpa2gfy93tM4Q+342BxfCGxbKhpRR1hMDrZX4YX28SbExoLi5uAPP4HRCQ+dq5Kg3GaSFw h7SWws1vWkKVnLPzw8OcpcJf93Qe438cn6xs+Dt0K87KoVhLIqfe5jpsJeqgh26vFfMfc7 vPQ/x/mxuYo6sYYo243FVFt5WX8P/e4MvSeqxt9RzWq+R/BsnC0xcLghXqQ5kxKTYECyqH vHjyBD6uFh4RVKLwXcJazgclCiqsa3sr1reYJKUwEc/vS7fDowTt66aivLGVSh97BN3Xtk STN1fTyXZmUrNm/BBStz7FBnZTDzPaZ9bpOLiCLSlgZqKDsp7g/C5emj8LlgBZ95fJyy8l na5phGMx1l4kY0CcctQYuQGxzDlojwAAAAMBAAEAAAF/cNVYT+Om4x9+SItcz5bOByGIOj yWUH8f9rRjnr5ILuwabIDgvFaVG+xM1O1hWADqzMnSEcknHRkTYEsqYPykAtxFvjOFEh70 6qRUJ+fVZkqRGEaI3oWyWKTOhcCIYImtONvb0LOv/HQ2H2AXCoeqjST1qr/xSuljBtcB8u wxs3EqaO1yU7QoZpDcMX9plH7Rmc9nNfZcgrnktPk2deX2+Y/A5tzdVgG1IeqYp6CBMLNM uhL0OPdDehgBoDujx+rhkZ1gpo1wcULIM94NL7VSHBPX0Lgh9T+3j1HVP+YnMAvhfOvfct LlbJ06+TYGRAMuF2LPCAZM/m0FEyAurRgWxAjLXm+4kp2GAJXlw82deDkQ+P8cHNT6s9ZH R5YSy3lpZ35594ZMOLR8KqVvhgJGF6i9019BiF91SDxjE+sp6dNGfN8W+64tHdDv2a0Mso +8Qjyx7sTpi++EjLU8Iy73/e4B8qbXMyheyA/UUfgMtNKShh6sLlrD9h2Sm9RFTuEAAADA Jh3u7WfnjhhKZYbAW4TsPNXDMrB0/t7xyAQgFmko7JfESyrJSLg1cO+QMOiDgD7zuQ9RSp NIKdPsnIna5peh979mVjb2HgnikjyJECmBpLdwZKhX7MnIvgKw5lnQXHboEtWCa1N58l7f srzwbi9pFUuUp9dShXNffmlUCjDRsVLbK5C6+iaIQyCWFYK8mc6dpNkIoPKf+Xg+EJCIFQ oITqeu30Gc1+M+fdZc2ghq0b6XLthh/uHEry8b68M5KglMAAAAwQDw1i+IdcvPV/3u/q9O /kzLpKO3tbT89sc1zhjZsDNjDAGluNr6n38iq/XYRZu7UTL9BG+EgFVfIUV7XsYT5e+BPf 13VS94rzZ7maCsOlULX+VdMO2zBucHIoec9RUlRZrfB21B2W7YGMhbpoa5lN3lKJQ7afHo dXZUMp0cTFbOmbzJgSzO2/NE7BhVwmvcUzTDJGMMKuxBO6w99YKDKRKm0PNLFDz26rWm9L dNS2MVfVuPMTpzT26HQG4pFageq9cAAADBALzRBXdZF8kbSBa5MTUBVTTzgKQm1C772gJ8 T01DJEXZsVtOv7mUC1/m/by6Hk4tPyvDBuGj9hHq4N7dPqGutHb1q5n0ADuoQjRW7BXw5Q vC2EAD91xexdorIA5BgXU+qltBqzzBVzVtF7+jOZOjfzOlaTX9I5I5veyeTaTxZj1XXUzi btBNdMEJJp7ifucYmoYAAwE7K+VlWagDEK2y8Mte9y9E+N0uO2j+h85sQt/UIb2iE/vhcg Bgp6142WnSCQAAABt0ZXN0IGtleSB3aXRob3V0IHBhc3NwaHJhc2UB -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 72 73 61 00 00 01 81 00 b1 a1 ee e9 7b 38 71 5b 2e 0d 23 49 94 c0 aa a0 f5 60 98 38 04 be 22 3c 4d f5 e6 1c aa 97 dc 99 08 a8 8d cb b2 f5 7b 22 37 fc 58 27 53 bd e1 2a ec 29 2e 66 12 26 b6 c4 79 d8 03 84 54 0a 14 f2 60 d7 0c da fe 9c 9e 70 50 fc df d5 42 1a 46 30 de d2 57 17 5b 84 a5 17 4d 61 42 c1 b8 c1 80 ee ab 1b 21 c1 e9 6b 68 1f cb dd ed 33 84 3e df 8d 81 c5 f0 86 c5 b2 a1 a5 14 75 84 c0 eb 65 7e 18 5f 6f 12 6c 4c 68 2e 2e 6e 00 f3 f8 1d 10 90 f9 da b9 2a 0d c6 69 21 70 87 b4 96 c2 cd 6f 5a 42 95 9c b3 f3 c3 c3 9c a5 c2 5f f7 74 1e e3 7f 1c 9f ac 6c f8 3b 74 2b ce ca a1 58 4b 22 a7 de e6 3a 6c 25 ea a0 87 6e af 15 f3 1f 73 bb cf 43 fc 7f 9b 1b 98 a3 ab 18 62 8d b8 dc 55 45 b7 95 97 f0 ff de e0 cb d2 7a ac 6d f5 1c d6 ab e4 7f 06 c9 c2 d3 17 0b 82 15 ea 43 99 31 29 36 04 0b 2a 87 bc 78 f2 04 3e ae 16 1e 11 54 a2 f0 5d c2 5a ce 07 25 0a 2a ac 6b 7b 2b d6 b7 98 24 a5 30 11 cf ef 4b b7 c3 a3 04 ed eb a6 a2 bc b1 95 4a 1f 7b 04 dd d7 b6 44 93 37 57 d3 c9 76 66 52 b3 66 fc 10 52 b7 3e c5 06 76 53 0f 33 da 67 d6 e9 38 b8 82 2d 29 60 66 a2 83 b2 9e e0 fc 2e 5e 9a 3f 0b 96 00 59 f7 97 c9 cb 2f 25 9d ae 69 84 63 31 d6 5e 24 63 40 9c 72 d4 18 b9 01 b1 cc 39 68 8f 00 00 00 03 01 00 01 00 00 01 7f 70 d5 58 4f e3 a6 e3 1f 7e 48 8b 5c cf 96 ce 07 21 88 3a 3c 96 50 7f 1f f6 b4 63 9e be 48 2e ec 1a 6c 80 e0 bc 56 95 1b ec 4c d4 ed 61 58 00 ea cc c9 d2 11 c9 27 1d 19 13 60 4b 2a 60 fc a4 02 dc 45 be 33 85 12 1e f4 ea a4 54 27 e7 d5 66 4a 91 18 46 88 de 85 b2 58 a4 ce 85 c0 88 60 89 ad 38 db db d0 b3 af fc 74 36 1f 60 17 0a 87 aa 8d 24 f5 aa bf f1 4a e9 63 06 d7 01 f2 ec 31 b3 71 2a 68 ed 72 53 b4 28 66 90 dc 31 7f 69 94 7e d1 99 cf 67 35 f6 5c 82 b9 e4 b4 f9 36 75 e5 f6 f9 8f c0 e6 dc dd 56 01 b5 21 ea 98 a7 a0 81 30 b3 4c ba 12 f4 38 f7 43 7a 18 01 a0 3b a3 c7 ea e1 91 9d 60 a6 8d 70 71 42 c8 33 de 0d 2f b5 52 1c 13 d7 d0 b8 21 f5 3f b7 8f 51 d5 3f e6 27 30 0b e1 7c eb df 72 d2 e5 6c 9d 3a f9 36 06 44 03 2e 17 62 cf 08 06 4c fe 6d 05 13 20 2e ad 18 16 c4 08 cb 5e 6f b8 92 9d 86 00 95 e5 c3 cd 9d 78 39 10 f8 ff 1c 1c d4 fa b3 d6 47 47 96 12 cb 79 69 67 7e 79 f7 86 4c 38 b4 7c 2a a5 6f 86 02 46 17 a8 bd d3 5f 41 88 5f 75 48 3c 63 13 eb 29 e9 d3 46 7c df 16 fb ae 2d 1d d0 ef d9 ad 0c b2 8f bc 42 3c b1 ee c4 e9 8b ef 84 8c b5 3c 23 2e f7 fd ee 01 f2 a6 d7 33 28 5e c8 0f d4 51 f8 0c b4 d2 92 86 1e ac 2e 5a c3 f6 1d 92 9b d4 45 4e e1 00 00 00 c0 26 1d ee ed 67 e7 8e 18 4a 65 86 c0 5b 84 ec 3c d5 c3 32 b0 74 fe de f1 c8 04 20 16 69 28 ec 97 c4 4b 2a c9 48 b8 35 70 ef 90 30 e8 83 80 3e f3 b9 0f 51 4a 93 48 29 d3 ec 9c 89 da e6 97 a1 f7 bf 66 56 36 f6 1e 09 e2 92 3c 89 10 29 81 a4 b7 70 64 a8 57 ec c9 c8 be 02 b0 e6 59 d0 5c 76 e8 12 d5 82 6b 53 79 f2 5e df b2 bc f0 6e 2f 69 15 4b 94 a7 d7 52 85 73 5f 7e 69 54 0a 30 d1 b1 52 db 2b 90 ba fa 26 88 43 20 96 15 82 bc 99 ce 9d a4 d9 08 a0 f2 9f f9 78 3e 10 90 88 15 0a 08 4e a7 ae df 41 9c d7 e3 3e 7d d6 5c da 08 6a d1 be 97 2e d8 61 fe e1 c4 af 2f 1b eb c3 39 2a 09 4c 00 00 00 c1 00 f0 d6 2f 88 75 cb cf 57 fd ee fe af 4e fe 4c cb a4 a3 b7 b5 b4 fc f6 c7 35 ce 18 d9 b0 33 63 0c 01 a5 b8 da fa 9f 7f 22 ab f5 d8 45 9b bb 51 32 fd 04 6f 84 80 55 5f 21 45 7b 5e c6 13 e5 ef 81 3d fd 77 55 2f 78 af 36 7b 99 a0 ac 3a 55 0b 5f e5 5d 30 ed b3 06 e7 07 22 87 9c f5 15 25 45 9a df 07 6d 41 d9 6e d8 18 c8 5b a6 86 b9 94 dd e5 28 94 3b 69 f1 e8 75 76 54 32 9d 1c 4c 56 ce 99 bc c9 81 2c ce db f3 44 ec 18 55 c2 6b dc 53 34 c3 24 63 0c 2a ec 41 3b ac 3d f5 82 83 29 12 a6 d0 f3 4b 14 3c f6 ea b5 a6 f4 b7 4d 4b 63 15 7d 5b 8f 31 3a 73 4f 6e 87 40 6e 29 15 a8 1e ab d7 00 00 00 c1 00 bc d1 05 77 59 17 c9 1b 48 16 b9 31 35 01 55 34 f3 80 a4 26 d4 2e fb da 02 7c 4f 4d 43 24 45 d9 b1 5b 4e bf b9 94 0b 5f e6 fd bc ba 1e 4e 2d 3f 2b c3 06 e1 a3 f6 11 ea e0 de dd 3e a1 ae b4 76 f5 ab 99 f4 00 3b a8 42 34 56 ec 15 f0 e5 0b c2 d8 40 03 f7 5c 5e c5 da 2b 20 0e 41 81 75 3e aa 5b 41 ab 3c c1 57 35 6d 17 bf a3 39 93 a3 7f 33 a5 69 35 fd 23 92 39 bd ec 9e 4d a4 f1 66 3d 57 5d 4c e2 6e d0 4d 74 c1 09 26 9e e2 7e e7 18 9a 86 00 03 01 3b 2b e5 65 59 a8 03 10 ad b2 f0 cb 5e f7 2f 44 f8 dd 2e 3b 68 fe 87 ce 6c 42 df d4 21 bd a2 13 fb e1 72 00 60 a7 ad 78 d9 69 d2 09 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQCxoe7pezhxWy4NI0mUwKqg9WCYOAS+IjxN9eYcqpfcmQiojcuy9XsiN/xYJ1O94SrsKS5mEia2xHnYA4RUChTyYNcM2v6cnnBQ/N/VQhpGMN7SVxdbhKUXTWFCwbjBgO6rGyHB6WtoH8vd7TOEPt+NgcXwhsWyoaUUdYTA62V+GF9vEmxMaC4ubgDz+B0QkPnauSoNxmkhcIe0lsLNb1pClZyz88PDnKXCX/d0HuN/HJ+sbPg7dCvOyqFYSyKn3uY6bCXqoIdurxXzH3O7z0P8f5sbmKOrGGKNuNxVRbeVl/D/3uDL0nqsbfUc1qvkfwbJwtMXC4IV6kOZMSk2BAsqh7x48gQ+rhYeEVSi8F3CWs4HJQoqrGt7K9a3mCSlMBHP70u3w6ME7eumoryxlUofewTd17ZEkzdX08l2ZlKzZvwQUrc+xQZ2Uw8z2mfW6Ti4gi0pYGaig7Ke4PwuXpo/C5YAWfeXycsvJZ2uaYRjMdZeJGNAnHLUGLkBscw5aI8= test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 72 73 61 00 00 00 03 01 00 01 00 00 01 81 00 b1 a1 ee e9 7b 38 71 5b 2e 0d 23 49 94 c0 aa a0 f5 60 98 38 04 be 22 3c 4d f5 e6 1c aa 97 dc 99 08 a8 8d cb b2 f5 7b 22 37 fc 58 27 53 bd e1 2a ec 29 2e 66 12 26 b6 c4 79 d8 03 84 54 0a 14 f2 60 d7 0c da fe 9c 9e 70 50 fc df d5 42 1a 46 30 de d2 57 17 5b 84 a5 17 4d 61 42 c1 b8 c1 80 ee ab 1b 21 c1 e9 6b 68 1f cb dd ed 33 84 3e df 8d 81 c5 f0 86 c5 b2 a1 a5 14 75 84 c0 eb 65 7e 18 5f 6f 12 6c 4c 68 2e 2e 6e 00 f3 f8 1d 10 90 f9 da b9 2a 0d c6 69 21 70 87 b4 96 c2 cd 6f 5a 42 95 9c b3 f3 c3 c3 9c a5 c2 5f f7 74 1e e3 7f 1c 9f ac 6c f8 3b 74 2b ce ca a1 58 4b 22 a7 de e6 3a 6c 25 ea a0 87 6e af 15 f3 1f 73 bb cf 43 fc 7f 9b 1b 98 a3 ab 18 62 8d b8 dc 55 45 b7 95 97 f0 ff de e0 cb d2 7a ac 6d f5 1c d6 ab e4 7f 06 c9 c2 d3 17 0b 82 15 ea 43 99 31 29 36 04 0b 2a 87 bc 78 f2 04 3e ae 16 1e 11 54 a2 f0 5d c2 5a ce 07 25 0a 2a ac 6b 7b 2b d6 b7 98 24 a5 30 11 cf ef 4b b7 c3 a3 04 ed eb a6 a2 bc b1 95 4a 1f 7b 04 dd d7 b6 44 93 37 57 d3 c9 76 66 52 b3 66 fc 10 52 b7 3e c5 06 76 53 0f 33 da 67 d6 e9 38 b8 82 2d 29 60 66 a2 83 b2 9e e0 fc 2e 5e 9a 3f 0b 96 00 59 f7 97 c9 cb 2f 25 9d ae 69 84 63 31 d6 5e 24 63 40 9c 72 d4 18 b9 01 b1 cc 39 68 8f """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.SPEC: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 72 73 61 00 00 01 80 a2 10 7c 2e f6 bb 53 a8 74 2a a1 19 99 ad 81 be 79 9c ed d6 9d 09 4e 6e c5 18 48 33 90 77 99 68 f7 9e 03 5a cd 4e 18 eb 89 7d 85 a2 ee ae 4a 92 f6 6f ce b9 fe 86 7f 2a 6b 31 da 6e 1a fe a2 a5 88 b8 44 7f a1 76 73 b3 ec 75 b5 d0 a6 b9 15 97 65 09 13 7d 94 21 d1 fb 5d 0f 8b 23 04 77 c2 c3 55 22 b1 a0 09 8a f5 38 2a d6 7f 1b 87 29 a0 25 d3 25 6f cb 64 61 07 98 dc 14 c5 84 f8 92 24 5e 50 11 6b 49 e5 f0 cc 29 cb 29 a9 19 d8 a7 71 1f 91 0b 05 b1 01 4b c2 5f 00 a5 b6 21 bf f8 2c 9d 67 9b 47 3b 0a 49 6b 79 2d fc 1d ec 0c b0 e5 27 22 d5 a9 f8 d3 c3 f9 df 48 68 e9 fb ef 3c dc 26 bf cf ea 29 43 01 a6 e3 c5 51 95 f4 66 6d 8a 55 e2 47 ec e8 30 45 4c ae 47 e7 c9 a4 21 8b 64 ba b6 88 f6 21 f8 73 b9 cb 11 a1 78 75 92 c6 5a e5 64 fe ed 42 d9 95 99 e6 2b 6f 3c 16 3c 28 74 a4 72 2f 0d 3f 2c 33 67 aa 35 19 8e e7 b5 11 2f b3 f7 6a c5 02 e2 6f a3 42 e3 62 19 99 03 ea a5 20 e7 a1 e3 bc c8 06 a3 b5 7c d6 76 5d df 6f 60 46 83 2a 08 00 d6 d3 d9 a4 c1 41 8c f8 60 56 45 81 da 3b a2 16 1f 9e 4e 75 83 17 da c3 53 c3 3e 19 a4 1b bc d2 29 b8 78 61 2b 78 e6 b1 52 b0 d5 ec de 69 2c 48 62 d9 fd d1 9b 6b b0 49 db d3 ff 38 e7 10 d9 2d ce 9f 0d 5e 09 7b 37 d2 7b c3 bf ce """), derived_passphrase=rb"""ohB8Lva7U6h0KqEZma2Bvnmc7dadCU5uxRhIM5B3mWj3ngNazU4Y64l9haLurkqS9m/Ouf6GfyprMdpuGv6ipYi4RH+hdnOz7HW10Ka5FZdlCRN9lCHR+10PiyMEd8LDVSKxoAmK9Tgq1n8bhymgJdMlb8tkYQeY3BTFhPiSJF5QEWtJ5fDMKcspqRnYp3EfkQsFsQFLwl8ApbYhv/gsnWebRzsKSWt5Lfwd7Ayw5Sci1an408P530ho6fvvPNwmv8/qKUMBpuPFUZX0Zm2KVeJH7OgwRUyuR+fJpCGLZLq2iPYh+HO5yxGheHWSxlrlZP7tQtmVmeYrbzwWPCh0pHIvDT8sM2eqNRmO57URL7P3asUC4m+jQuNiGZkD6qUg56HjvMgGo7V81nZd329gRoMqCADW09mkwUGM+GBWRYHaO6IWH55OdYMX2sNTwz4ZpBu80im4eGEreOaxUrDV7N5pLEhi2f3Rm2uwSdvT/zjnENktzp8NXgl7N9J7w7/O""", ), }, ), "dsa1024": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAABsQAAAAdzc2gtZH NzAAAAgQC7KAZXqBGNVLBQPrcMYAoNW54BhD8aIhe7BDWYzJcsaMt72VKSkguZ8+XR7nRa 0C/ZsBi+uJp0dpxy9ZMTOWX4u5YPMeQcXEdGExZIfimGqSOAsy6fCld2IfJZJZExcCmhe9 Ssjsd3YSAPJRluOXFQc95MZoR5hMwlIDD8QzrE7QAAABUA99nOZOgd7aHMVGoXpUEBcn7H ossAAACALr2Ag3hxM3rKdxzVUw8fX0VVPXO+3+Kr8hGe0Kc/7NwVaBVL1GQ8fenBuWynpA UbH0wo3h1wkB/8hX6p+S8cnu5rIBlUuVNwLw/bIYohK98LfqTYK/V+g6KD+8m34wvEiXZm qywY54n2bksch1Nqvj/tNpLzExSx/XS0kSM1aigAAACAbQNRPcVEuGDrEcf+xg5tgAejPX BPXr/Jss+Chk64km3mirMYjAWyWYtVcgT+7hOYxtYRin8LyMLqKRmqa0Q5UrvDfChgLhvs G9YSb/Mpw5qm8PiHSafwhkaz/te3+8hKogqoe7sd+tCF06IpJr5k70ACiNtRGqssNF8Elr l1efYAAAH4swlfVrMJX1YAAAAHc3NoLWRzcwAAAIEAuygGV6gRjVSwUD63DGAKDVueAYQ/ GiIXuwQ1mMyXLGjLe9lSkpILmfPl0e50WtAv2bAYvriadHaccvWTEzll+LuWDzHkHFxHRh MWSH4phqkjgLMunwpXdiHyWSWRMXApoXvUrI7Hd2EgDyUZbjlxUHPeTGaEeYTMJSAw/EM6 xO0AAAAVAPfZzmToHe2hzFRqF6VBAXJ+x6LLAAAAgC69gIN4cTN6yncc1VMPH19FVT1zvt /iq/IRntCnP+zcFWgVS9RkPH3pwblsp6QFGx9MKN4dcJAf/IV+qfkvHJ7uayAZVLlTcC8P 2yGKISvfC36k2Cv1foOig/vJt+MLxIl2ZqssGOeJ9m5LHIdTar4/7TaS8xMUsf10tJEjNW ooAAAAgG0DUT3FRLhg6xHH/sYObYAHoz1wT16/ybLPgoZOuJJt5oqzGIwFslmLVXIE/u4T mMbWEYp/C8jC6ikZqmtEOVK7w3woYC4b7BvWEm/zKcOapvD4h0mn8IZGs/7Xt/vISqIKqH u7HfrQhdOiKSa+ZO9AAojbURqrLDRfBJa5dXn2AAAAFQDJHfenj4EJ9WkehpdJatPBlqCW 0gAAABt0ZXN0IGtleSB3aXRob3V0IHBhc3NwaHJhc2UBAgMEBQYH -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 64 73 73 00 00 00 81 00 bb 28 06 57 a8 11 8d 54 b0 50 3e b7 0c 60 0a 0d 5b 9e 01 84 3f 1a 22 17 bb 04 35 98 cc 97 2c 68 cb 7b d9 52 92 92 0b 99 f3 e5 d1 ee 74 5a d0 2f d9 b0 18 be b8 9a 74 76 9c 72 f5 93 13 39 65 f8 bb 96 0f 31 e4 1c 5c 47 46 13 16 48 7e 29 86 a9 23 80 b3 2e 9f 0a 57 76 21 f2 59 25 91 31 70 29 a1 7b d4 ac 8e c7 77 61 20 0f 25 19 6e 39 71 50 73 de 4c 66 84 79 84 cc 25 20 30 fc 43 3a c4 ed 00 00 00 15 00 f7 d9 ce 64 e8 1d ed a1 cc 54 6a 17 a5 41 01 72 7e c7 a2 cb 00 00 00 80 2e bd 80 83 78 71 33 7a ca 77 1c d5 53 0f 1f 5f 45 55 3d 73 be df e2 ab f2 11 9e d0 a7 3f ec dc 15 68 15 4b d4 64 3c 7d e9 c1 b9 6c a7 a4 05 1b 1f 4c 28 de 1d 70 90 1f fc 85 7e a9 f9 2f 1c 9e ee 6b 20 19 54 b9 53 70 2f 0f db 21 8a 21 2b df 0b 7e a4 d8 2b f5 7e 83 a2 83 fb c9 b7 e3 0b c4 89 76 66 ab 2c 18 e7 89 f6 6e 4b 1c 87 53 6a be 3f ed 36 92 f3 13 14 b1 fd 74 b4 91 23 35 6a 28 00 00 00 80 6d 03 51 3d c5 44 b8 60 eb 11 c7 fe c6 0e 6d 80 07 a3 3d 70 4f 5e bf c9 b2 cf 82 86 4e b8 92 6d e6 8a b3 18 8c 05 b2 59 8b 55 72 04 fe ee 13 98 c6 d6 11 8a 7f 0b c8 c2 ea 29 19 aa 6b 44 39 52 bb c3 7c 28 60 2e 1b ec 1b d6 12 6f f3 29 c3 9a a6 f0 f8 87 49 a7 f0 86 46 b3 fe d7 b7 fb c8 4a a2 0a a8 7b bb 1d fa d0 85 d3 a2 29 26 be 64 ef 40 02 88 db 51 1a ab 2c 34 5f 04 96 b9 75 79 f6 00 00 00 15 00 c9 1d f7 a7 8f 81 09 f5 69 1e 86 97 49 6a d3 c1 96 a0 96 d2 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ssh-dss AAAAB3NzaC1kc3MAAACBALsoBleoEY1UsFA+twxgCg1bngGEPxoiF7sENZjMlyxoy3vZUpKSC5nz5dHudFrQL9mwGL64mnR2nHL1kxM5Zfi7lg8x5BxcR0YTFkh+KYapI4CzLp8KV3Yh8lklkTFwKaF71KyOx3dhIA8lGW45cVBz3kxmhHmEzCUgMPxDOsTtAAAAFQD32c5k6B3tocxUahelQQFyfseiywAAAIAuvYCDeHEzesp3HNVTDx9fRVU9c77f4qvyEZ7Qpz/s3BVoFUvUZDx96cG5bKekBRsfTCjeHXCQH/yFfqn5Lxye7msgGVS5U3AvD9shiiEr3wt+pNgr9X6DooP7ybfjC8SJdmarLBjnifZuSxyHU2q+P+02kvMTFLH9dLSRIzVqKAAAAIBtA1E9xUS4YOsRx/7GDm2AB6M9cE9ev8myz4KGTriSbeaKsxiMBbJZi1VyBP7uE5jG1hGKfwvIwuopGaprRDlSu8N8KGAuG+wb1hJv8ynDmqbw+IdJp/CGRrP+17f7yEqiCqh7ux360IXToikmvmTvQAKI21Eaqyw0XwSWuXV59g== test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 64 73 73 00 00 00 81 00 bb 28 06 57 a8 11 8d 54 b0 50 3e b7 0c 60 0a 0d 5b 9e 01 84 3f 1a 22 17 bb 04 35 98 cc 97 2c 68 cb 7b d9 52 92 92 0b 99 f3 e5 d1 ee 74 5a d0 2f d9 b0 18 be b8 9a 74 76 9c 72 f5 93 13 39 65 f8 bb 96 0f 31 e4 1c 5c 47 46 13 16 48 7e 29 86 a9 23 80 b3 2e 9f 0a 57 76 21 f2 59 25 91 31 70 29 a1 7b d4 ac 8e c7 77 61 20 0f 25 19 6e 39 71 50 73 de 4c 66 84 79 84 cc 25 20 30 fc 43 3a c4 ed 00 00 00 15 00 f7 d9 ce 64 e8 1d ed a1 cc 54 6a 17 a5 41 01 72 7e c7 a2 cb 00 00 00 80 2e bd 80 83 78 71 33 7a ca 77 1c d5 53 0f 1f 5f 45 55 3d 73 be df e2 ab f2 11 9e d0 a7 3f ec dc 15 68 15 4b d4 64 3c 7d e9 c1 b9 6c a7 a4 05 1b 1f 4c 28 de 1d 70 90 1f fc 85 7e a9 f9 2f 1c 9e ee 6b 20 19 54 b9 53 70 2f 0f db 21 8a 21 2b df 0b 7e a4 d8 2b f5 7e 83 a2 83 fb c9 b7 e3 0b c4 89 76 66 ab 2c 18 e7 89 f6 6e 4b 1c 87 53 6a be 3f ed 36 92 f3 13 14 b1 fd 74 b4 91 23 35 6a 28 00 00 00 80 6d 03 51 3d c5 44 b8 60 eb 11 c7 fe c6 0e 6d 80 07 a3 3d 70 4f 5e bf c9 b2 cf 82 86 4e b8 92 6d e6 8a b3 18 8c 05 b2 59 8b 55 72 04 fe ee 13 98 c6 d6 11 8a 7f 0b c8 c2 ea 29 19 aa 6b 44 39 52 bb c3 7c 28 60 2e 1b ec 1b d6 12 6f f3 29 c3 9a a6 f0 f8 87 49 a7 f0 86 46 b3 fe d7 b7 fb c8 4a a2 0a a8 7b bb 1d fa d0 85 d3 a2 29 26 be 64 ef 40 02 88 db 51 1a ab 2c 34 5f 04 96 b9 75 79 f6 """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.RFC_6979: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 64 73 73 00 00 00 28 11 5f 4d 13 c2 ee 61 97 1e f6 23 14 3b 2b dd cf 06 c0 71 13 cc ac 34 19 ad 36 8d 79 aa 25 fb 5e 4f ea fe 6b 5b fa 57 42 """), derived_passphrase=rb"""EV9NE8LuYZce9iMUOyvdzwbAcRPMrDQZrTaNeaol+15P6v5rW/pXQg==""", signature_class=SSHTestKeyDeterministicSignatureClass.RFC_6979, ), SSHTestKeyDeterministicSignatureClass.Pageant_068_080: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 07 73 73 68 2d 64 73 73 00 00 00 28 0b f7 a8 ab 89 f5 b6 c4 1c 9b 78 2c 46 35 69 e2 88 b7 eb 55 37 48 7f 6d 49 a1 e6 de 58 1a 04 eb e6 28 99 0e 3c fd 3b 48 """), derived_passphrase=rb"""C/eoq4n1tsQcm3gsRjVp4oi361U3SH9tSaHm3lgaBOvmKJkOPP07SA==""", signature_class=SSHTestKeyDeterministicSignatureClass.Pageant_068_080, ), }, ), "ecdsa256": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAaAAAABNlY2RzYS 1zaGEyLW5pc3RwMjU2AAAACG5pc3RwMjU2AAAAQQTLbU0zDwsk2Dvp+VYIrsNVf5gWwz2S 3SZ8TbxiQRkpnGSVqyIoHJOJc+NQItAa7xlJ/8Z6gfz57Z3apUkaMJm6AAAAuKeY+YinmP mIAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBMttTTMPCyTYO+n5 Vgiuw1V/mBbDPZLdJnxNvGJBGSmcZJWrIigck4lz41Ai0BrvGUn/xnqB/PntndqlSRowmb oAAAAhAKIl/3n0pKVIxpZkXTGtii782Qr4yIcvHdpxjO/QsIqKAAAAG3Rlc3Qga2V5IHdp dGhvdXQgcGFzc3BocmFzZQECAwQ= -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 32 35 36 00 00 00 08 6e 69 73 74 70 32 35 36 00 00 00 41 04 cb 6d 4d 33 0f 0b 24 d8 3b e9 f9 56 08 ae c3 55 7f 98 16 c3 3d 92 dd 26 7c 4d bc 62 41 19 29 9c 64 95 ab 22 28 1c 93 89 73 e3 50 22 d0 1a ef 19 49 ff c6 7a 81 fc f9 ed 9d da a5 49 1a 30 99 ba 00 00 00 21 00 a2 25 ff 79 f4 a4 a5 48 c6 96 64 5d 31 ad 8a 2e fc d9 0a f8 c8 87 2f 1d da 71 8c ef d0 b0 8a 8a 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBMttTTMPCyTYO+n5Vgiuw1V/mBbDPZLdJnxNvGJBGSmcZJWrIigck4lz41Ai0BrvGUn/xnqB/PntndqlSRowmbo= test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 32 35 36 00 00 00 08 6e 69 73 74 70 32 35 36 00 00 00 41 04 cb 6d 4d 33 0f 0b 24 d8 3b e9 f9 56 08 ae c3 55 7f 98 16 c3 3d 92 dd 26 7c 4d bc 62 41 19 29 9c 64 95 ab 22 28 1c 93 89 73 e3 50 22 d0 1a ef 19 49 ff c6 7a 81 fc f9 ed 9d da a5 49 1a 30 99 ba """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.RFC_6979: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 32 35 36 00 00 00 49 00 00 00 20 22 ad 23 8a 9c 5d ca 4e ea 73 e7 29 77 ab a8 b2 2e 01 d8 de 11 ae c9 b3 57 ce d5 84 9c 85 73 eb 00 00 00 21 00 9b 1a cb dd 45 89 f0 37 95 9c a2 d8 ac c3 f7 71 55 33 50 86 9e cb 3a 95 e4 68 80 1a 9d d6 d5 bc """), derived_passphrase=rb"""AAAAICKtI4qcXcpO6nPnKXerqLIuAdjeEa7Js1fO1YSchXPrAAAAIQCbGsvdRYnwN5Wcotisw/dxVTNQhp7LOpXkaIAandbVvA==""", signature_class=SSHTestKeyDeterministicSignatureClass.RFC_6979, ), SSHTestKeyDeterministicSignatureClass.Pageant_068_080: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 32 35 36 00 00 00 49 00 00 00 21 00 b7 9e 4f ec ec 9b 77 dd 12 d9 43 a2 f5 bf b5 34 91 e0 89 44 e6 20 48 36 fa 75 22 77 86 38 de 21 00 00 00 20 3f d8 04 0f fa f5 bc d2 26 e0 4c 0c 77 5d 0e 08 ec 30 04 8e 42 58 41 96 f6 7e 4f d2 14 39 f4 87 """), derived_passphrase=rb"""AAAAIQC3nk/s7Jt33RLZQ6L1v7U0keCJROYgSDb6dSJ3hjjeIQAAACA/2AQP+vW80ibgTAx3XQ4I7DAEjkJYQZb2fk/SFDn0hw==""", signature_class=SSHTestKeyDeterministicSignatureClass.Pageant_068_080, ), }, ), "ecdsa384": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAiAAAABNlY2RzYS 1zaGEyLW5pc3RwMzg0AAAACG5pc3RwMzg0AAAAYQSgkOjkAvq7v5vHuj3KBL4/EAWcn5hZ DyKcbyV0eBMGFq7hKXQlZqIahLVqeMR0QqmkxNJ2rly2VHcXneq3vZ+9fIsWCOdYk5WP3N ZPzv911Xn7wbEkC7QndD5zKlm4pBUAAADomhj+IZoY/iEAAAATZWNkc2Etc2hhMi1uaXN0 cDM4NAAAAAhuaXN0cDM4NAAAAGEEoJDo5AL6u7+bx7o9ygS+PxAFnJ+YWQ8inG8ldHgTBh au4Sl0JWaiGoS1anjEdEKppMTSdq5ctlR3F53qt72fvXyLFgjnWJOVj9zWT87/ddV5+8Gx JAu0J3Q+cypZuKQVAAAAMQD5sTy8p+B1cn/DhOmXquui1BcxvASqzzevkBlbQoBa73y04B 2OdqVOVRkwZWRROz0AAAAbdGVzdCBrZXkgd2l0aG91dCBwYXNzcGhyYXNlAQIDBA== -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 33 38 34 00 00 00 08 6e 69 73 74 70 33 38 34 00 00 00 61 04 a0 90 e8 e4 02 fa bb bf 9b c7 ba 3d ca 04 be 3f 10 05 9c 9f 98 59 0f 22 9c 6f 25 74 78 13 06 16 ae e1 29 74 25 66 a2 1a 84 b5 6a 78 c4 74 42 a9 a4 c4 d2 76 ae 5c b6 54 77 17 9d ea b7 bd 9f bd 7c 8b 16 08 e7 58 93 95 8f dc d6 4f ce ff 75 d5 79 fb c1 b1 24 0b b4 27 74 3e 73 2a 59 b8 a4 15 00 00 00 31 00 f9 b1 3c bc a7 e0 75 72 7f c3 84 e9 97 aa eb a2 d4 17 31 bc 04 aa cf 37 af 90 19 5b 42 80 5a ef 7c b4 e0 1d 8e 76 a5 4e 55 19 30 65 64 51 3b 3d 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBKCQ6OQC+ru/m8e6PcoEvj8QBZyfmFkPIpxvJXR4EwYWruEpdCVmohqEtWp4xHRCqaTE0nauXLZUdxed6re9n718ixYI51iTlY/c1k/O/3XVefvBsSQLtCd0PnMqWbikFQ== test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 33 38 34 00 00 00 08 6e 69 73 74 70 33 38 34 00 00 00 61 04 a0 90 e8 e4 02 fa bb bf 9b c7 ba 3d ca 04 be 3f 10 05 9c 9f 98 59 0f 22 9c 6f 25 74 78 13 06 16 ae e1 29 74 25 66 a2 1a 84 b5 6a 78 c4 74 42 a9 a4 c4 d2 76 ae 5c b6 54 77 17 9d ea b7 bd 9f bd 7c 8b 16 08 e7 58 93 95 8f dc d6 4f ce ff 75 d5 79 fb c1 b1 24 0b b4 27 74 3e 73 2a 59 b8 a4 15 """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.RFC_6979: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 33 38 34 00 00 00 68 00 00 00 30 78 e1 a8 f5 8c d2 7a 21 e5 a2 ca e6 d0 1a 19 f8 3a 1c 39 7e 71 a0 e6 7e 93 83 49 95 05 01 d0 3e 23 22 cd 09 63 7f 7c 6c b0 97 44 6d 7e 48 39 87 00 00 00 30 10 ee 85 51 77 2b 91 2c e9 42 79 66 59 8a a2 c0 d2 c8 8a 8f 2f 8f 33 87 9e 12 54 e4 da 02 f9 e7 95 f5 82 6f 82 2b 38 6d 6e 5d 17 15 ac 12 e7 62 """), derived_passphrase=rb"""AAAAMHjhqPWM0noh5aLK5tAaGfg6HDl+caDmfpODSZUFAdA+IyLNCWN/fGywl0Rtfkg5hwAAADAQ7oVRdyuRLOlCeWZZiqLA0siKjy+PM4eeElTk2gL555X1gm+CKzhtbl0XFawS52I=""", signature_class=SSHTestKeyDeterministicSignatureClass.RFC_6979, ), SSHTestKeyDeterministicSignatureClass.Pageant_068_080: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 33 38 34 00 00 00 69 00 00 00 30 4b 3e b7 22 c2 87 77 6d e0 3e f5 05 75 36 b6 0f cd 9f a4 49 c7 48 ef 76 fd ea 4b 49 e3 b1 f2 22 d5 41 22 d7 96 b2 29 70 ff bb 81 97 27 e2 35 60 00 00 00 31 00 c8 a4 d8 62 fe f2 a6 63 97 98 08 c7 39 24 b2 55 0a b8 e7 79 ab a6 62 96 3e cc ea 73 e2 fb dc 46 d6 25 b9 c8 0c e8 3e 33 91 51 78 25 a8 c5 46 85 """), derived_passphrase=rb"""AAAAMEs+tyLCh3dt4D71BXU2tg/Nn6RJx0jvdv3qS0njsfIi1UEi15ayKXD/u4GXJ+I1YAAAADEAyKTYYv7ypmOXmAjHOSSyVQq453mrpmKWPszqc+L73EbWJbnIDOg+M5FReCWoxUaF""", signature_class=SSHTestKeyDeterministicSignatureClass.Pageant_068_080, ), }, ), "ecdsa521": SSHTestKey( private_key=rb"""-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAArAAAABNlY2RzYS 1zaGEyLW5pc3RwNTIxAAAACG5pc3RwNTIxAAAAhQQASVOdwDznmlcGqiLvFtYeVtrAEiVz iIfsL7jEM8Utu/m8WSkPFQtjwqdFw+WfZ0mi6qMbEFgi/ELzZSKVteCSbcMAhqAkOMFKiD u4bxvsM6bT02Ru7q2yT41ySyGhUD0QySBnI6Ckt/wnQ1TEpj8zDKiRErxs9e6QLGElNRkz LPMs+mMAAAEY2FXeh9hV3ocAAAATZWNkc2Etc2hhMi1uaXN0cDUyMQAAAAhuaXN0cDUyMQ AAAIUEAElTncA855pXBqoi7xbWHlbawBIlc4iH7C+4xDPFLbv5vFkpDxULY8KnRcPln2dJ ouqjGxBYIvxC82UilbXgkm3DAIagJDjBSog7uG8b7DOm09Nkbu6tsk+NckshoVA9EMkgZy OgpLf8J0NUxKY/MwyokRK8bPXukCxhJTUZMyzzLPpjAAAAQSFqUmKK7lGQzxT6GKZSLDju U3otwLYnuj+/5AdzuB/zotu95UdFv9I2DNXzd9E4WAyz6IqBBNcsMkxrzHAdqsYDAAAAG3 Rlc3Qga2V5IHdpdGhvdXQgcGFzc3BocmFzZQ== -----END OPENSSH PRIVATE KEY----- """, private_key_blob=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 35 32 31 00 00 00 08 6e 69 73 74 70 35 32 31 00 00 00 85 04 00 49 53 9d c0 3c e7 9a 57 06 aa 22 ef 16 d6 1e 56 da c0 12 25 73 88 87 ec 2f b8 c4 33 c5 2d bb f9 bc 59 29 0f 15 0b 63 c2 a7 45 c3 e5 9f 67 49 a2 ea a3 1b 10 58 22 fc 42 f3 65 22 95 b5 e0 92 6d c3 00 86 a0 24 38 c1 4a 88 3b b8 6f 1b ec 33 a6 d3 d3 64 6e ee ad b2 4f 8d 72 4b 21 a1 50 3d 10 c9 20 67 23 a0 a4 b7 fc 27 43 54 c4 a6 3f 33 0c a8 91 12 bc 6c f5 ee 90 2c 61 25 35 19 33 2c f3 2c fa 63 00 00 00 41 21 6a 52 62 8a ee 51 90 cf 14 fa 18 a6 52 2c 38 ee 53 7a 2d c0 b6 27 ba 3f bf e4 07 73 b8 1f f3 a2 db bd e5 47 45 bf d2 36 0c d5 f3 77 d1 38 58 0c b3 e8 8a 81 04 d7 2c 32 4c 6b cc 70 1d aa c6 03 00 00 00 1b 74 65 73 74 20 6b 65 79 20 77 69 74 68 6f 75 74 20 70 61 73 73 70 68 72 61 73 65 """), public_key=rb"""ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBABJU53APOeaVwaqIu8W1h5W2sASJXOIh+wvuMQzxS27+bxZKQ8VC2PCp0XD5Z9nSaLqoxsQWCL8QvNlIpW14JJtwwCGoCQ4wUqIO7hvG+wzptPTZG7urbJPjXJLIaFQPRDJIGcjoKS3/CdDVMSmPzMMqJESvGz17pAsYSU1GTMs8yz6Yw== test key without passphrase """, public_key_data=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 35 32 31 00 00 00 08 6e 69 73 74 70 35 32 31 00 00 00 85 04 00 49 53 9d c0 3c e7 9a 57 06 aa 22 ef 16 d6 1e 56 da c0 12 25 73 88 87 ec 2f b8 c4 33 c5 2d bb f9 bc 59 29 0f 15 0b 63 c2 a7 45 c3 e5 9f 67 49 a2 ea a3 1b 10 58 22 fc 42 f3 65 22 95 b5 e0 92 6d c3 00 86 a0 24 38 c1 4a 88 3b b8 6f 1b ec 33 a6 d3 d3 64 6e ee ad b2 4f 8d 72 4b 21 a1 50 3d 10 c9 20 67 23 a0 a4 b7 fc 27 43 54 c4 a6 3f 33 0c a8 91 12 bc 6c f5 ee 90 2c 61 25 35 19 33 2c f3 2c fa 63 """), expected_signatures={ SSHTestKeyDeterministicSignatureClass.RFC_6979: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 35 32 31 00 00 00 8b 00 00 00 42 01 d8 ea c2 1e 55 c6 9e dd 4b 00 ed 1b 93 19 cc 9b 74 27 44 c0 c0 e3 5b 3d 81 15 00 12 cc 07 89 54 97 ec 60 42 ad e6 40 c1 c6 5f c0 1b c3 0a 8e 58 6e da 3f a9 57 90 04 79 46 1d 48 bb 19 67 e9 65 19 00 00 00 41 7d 58 e0 2e d7 86 2e 36 8c 1a 44 23 af 19 e7 51 97 bb fb 32 90 a1 35 bb 88 d7 b5 22 37 b3 99 ba e4 a7 9d 2d 56 14 0a f5 68 f5 cc 38 84 e9 b6 c6 71 7a 3b 87 e7 7a b1 37 e7 1d e6 80 96 d1 a6 1e bc """), derived_passphrase=rb"""AAAAQgHY6sIeVcae3UsA7RuTGcybdCdEwMDjWz2BFQASzAeJVJfsYEKt5kDBxl/AG8MKjlhu2j+pV5AEeUYdSLsZZ+llGQAAAEF9WOAu14YuNowaRCOvGedRl7v7MpChNbuI17UiN7OZuuSnnS1WFAr1aPXMOITptsZxejuH53qxN+cd5oCW0aYevA==""", signature_class=SSHTestKeyDeterministicSignatureClass.RFC_6979, ), SSHTestKeyDeterministicSignatureClass.Pageant_068_080: SSHTestKeyDeterministicSignature( signature=bytes.fromhex(""" 00 00 00 13 65 63 64 73 61 2d 73 68 61 32 2d 6e 69 73 74 70 35 32 31 00 00 00 8c 00 00 00 42 01 ce fe 9d 66 b6 01 76 2e 86 c2 ab 68 62 73 44 05 23 fd d1 79 07 fc 45 f5 c0 83 36 88 61 d4 04 79 90 b0 ef 8b 3c b5 55 0e cc 26 6b a0 3e 6a 04 48 ca e4 6a a5 a0 cf 91 5f 71 6f 37 9a 0f 6b a9 fb 9b 00 00 00 42 01 6d 21 77 c6 13 fa ea ac de 90 19 24 5a d2 61 39 d9 66 9b 86 1a 41 04 58 a2 9b b8 93 b6 6f 82 23 f2 01 23 c7 ff 5a d3 86 95 0f da 28 f9 3b e3 9c 27 e7 b2 d7 66 4e 5f 38 36 4c 8c be 76 4e fa 0a 2d """), derived_passphrase=rb"""AAAAQgHO/p1mtgF2LobCq2hic0QFI/3ReQf8RfXAgzaIYdQEeZCw74s8tVUOzCZroD5qBEjK5GqloM+RX3FvN5oPa6n7mwAAAEIBbSF3xhP66qzekBkkWtJhOdlmm4YaQQRYopu4k7ZvgiPyASPH/1rThpUP2ij5O+OcJ+ey12ZOXzg2TIy+dk76Ci0=""", signature_class=SSHTestKeyDeterministicSignatureClass.Pageant_068_080, ), }, ), } """The master list of SSH test keys.""" SUPPORTED_KEYS: Mapping[str, SSHTestKey] = { k: v for k, v in ALL_KEYS.items() if v.is_suitable() } """The subset of SSH test keys suitable for use with vault.""" UNSUITABLE_KEYS: Mapping[str, SSHTestKey] = { k: v for k, v in ALL_KEYS.items() if not v.is_suitable() } """The subset of SSH test keys not suitable for use with vault.""" DUMMY_SERVICE = "service1" """A standard/sample service name.""" DUMMY_PASSPHRASE = "my secret passphrase" """A standard/sample passphrase.""" DUMMY_KEY1 = SUPPORTED_KEYS["ed25519"].public_key_data """A sample universally supported SSH test key (in wire format).""" DUMMY_KEY1_B64 = base64.standard_b64encode(DUMMY_KEY1).decode("ASCII") """ A sample universally supported SSH test key (in `authorized_keys` format). """ DUMMY_KEY2 = SUPPORTED_KEYS["rsa"].public_key_data """A second supported SSH test key (in wire format).""" DUMMY_KEY2_B64 = base64.standard_b64encode(DUMMY_KEY2).decode("ASCII") """A second supported SSH test key (in `authorized_keys` format).""" DUMMY_KEY3 = SUPPORTED_KEYS["ed448"].public_key_data """A third supported SSH test key (in wire format).""" DUMMY_KEY3_B64 = base64.standard_b64encode(DUMMY_KEY3).decode("ASCII") """A third supported SSH test key (in `authorized_keys` format).""" DUMMY_CONFIG_SETTINGS = { "length": 10, "upper": 1, "lower": 1, "repeat": 5, "number": 1, "space": 1, "dash": 1, "symbol": 1, } """Sample vault settings.""" DUMMY_RESULT_PASSPHRASE = b".2V_QJkd o" """ The passphrase derived from [`DUMMY_SERVICE`][] using [`DUMMY_PASSPHRASE`][]. """ DUMMY_RESULT_KEY1 = b"E int: """Return the imposed limit on the number of concurrent threads. We use [`os.process_cpu_count`][] as the limit on Python 3.13 and higher, and [`os.cpu_count`][] on Python 3.12 and below. On Python 3.12 and below, we explicitly support the `PYTHON_CPU_COUNT` environment variable. We guarantee at least [`MIN_CONCURRENCY`][] many threads in any case. """ # noqa: RUF002 result: int | None = None if sys.version_info >= (3, 13): result = os.process_cpu_count() else: with contextlib.suppress(KeyError, ValueError): result = result or int(os.environ["PYTHON_CPU_COUNT"], 10) with contextlib.suppress(AttributeError): result = result or len(os.sched_getaffinity(os.getpid())) return max(result if result is not None else 0, MIN_CONCURRENCY) def get_concurrency_step_count( settings: hypothesis.settings | None = None, ) -> int: """Return the desired step count for concurrency-related tests. This is the smaller of the [general concurrency limit][tests.get_concurrency_limit] and the step count from the current hypothesis settings. Args: settings: The hypothesis settings for a specific tests. If not given, then the current profile will be queried directly. """ if settings is None: # pragma: no cover settings = hypothesis.settings() return min(get_concurrency_limit(), settings.stateful_step_count) def xfail_on_the_annoying_os( f: Callable | None = None, /, *, reason: str = "", ) -> pytest.MarkDecorator | Any: # pragma: no cover """Annotate a test which fails on The Annoying OS. Annotate a test to indicate that it fails on The Annoying Operating System, a.k.a. Microsoft Windows. Usually this is due to differences in the design of OS internals, and usually, these differences are both unnecessary and stupid. Args: f: A callable to decorate. If not given, return the pytest mark directly. reason: An optional, more detailed reason stating why this test fails on The Annoying OS. Returns: The callable, marked as an expected failure on the Annoying OS, or alternatively a suitable pytest mark if no callable was passed. The reason will begin with the phrase "The Annoying OS behaves differently.", and the optional detailed reason, if not empty, will follow. """ base_reason = "The Annoying OS behaves differently." full_reason = base_reason if not reason else f"{base_reason} {reason}" mark = pytest.mark.xfail( sys.platform == "win32", reason=full_reason, raises=(AssertionError, hypothesis.errors.FailedHealthCheck), strict=True, ) return mark if f is None else mark(f) @socketprovider.SocketProvider.register("stub_agent") class StubbedSSHAgentSocket: """A stubbed SSH agent presenting an [`_types.SSHAgentSocket`][].""" _SOCKET_IS_CLOSED = "Socket is closed." _NO_FLAG_SUPPORT = "This stubbed SSH agent socket does not support flags." _PROTOCOL_VIOLATION = "SSH agent protocol violation." _INVALID_REQUEST = "Invalid request." _UNSUPPORTED_REQUEST = "Unsupported request." HEADER_SIZE = 4 CODE_SIZE = 1 KNOWN_EXTENSIONS = frozenset({ "query", "list-extended@putty.projects.tartarus.org", }) """Known and implemented protocol extensions.""" def __init__(self, *extensions: str) -> None: """Initialize the agent.""" self.send_to_client = bytearray() """ The buffered response to the client, read piecemeal by [`recv`][]. """ self.receive_from_client = bytearray() """The last request issued by the client.""" self.closed = False """True if the connection is closed, false otherwise.""" self.enabled_extensions = frozenset(extensions) & self.KNOWN_EXTENSIONS """ Extensions actually enabled in this particular stubbed SSH agent. """ self.try_rfc6979 = False """ Attempt to issue DSA and ECDSA signatures according to RFC 6979? """ self.try_pageant_068_080 = False """ Attempt to issue DSA and ECDSA signatures as per Pageant 0.68–0.80? """ # noqa: RUF001 def __enter__(self) -> Self: """Return self.""" return self def __exit__(self, *args: object) -> None: """Mark the agent's socket as closed.""" self.closed = True def sendall(self, data: Buffer, flags: int = 0, /) -> None: """Send data to the SSH agent. The signature, and behavior, is identical to [`socket.socket.sendall`][]. Upon successful sending, this agent will parse the request, call the appropriate handler, and buffer the result such that it can be read via [`recv`][], in accordance with the SSH agent protocol. Args: data: Binary data to send to the agent. flags: Reserved. Must be 0. Returns: Nothing. The result should be requested via [`recv`][], and interpreted in accordance with the SSH agent protocol. Raises: AssertionError: The flags argument, if specified, must be 0. ValueError: The agent's socket is already closed. No further requests can be sent. """ assert not flags, self._NO_FLAG_SUPPORT if self.closed: raise ValueError(self._SOCKET_IS_CLOSED) self.receive_from_client.extend(memoryview(data)) try: self.parse_client_request_and_dispatch() except ValueError: payload = int.to_bytes(_types.SSH_AGENT.FAILURE.value, 1, "big") self.send_to_client.extend(int.to_bytes(len(payload), 4, "big")) self.send_to_client.extend(payload) finally: self.receive_from_client.clear() def recv(self, count: int, flags: int = 0, /) -> bytes: """Read data from the SSH agent. As per the SSH agent protocol, data is only available to be read immediately after a request via [`sendall`][]. Calls to [`recv`][] at other points in time that attempt to read data violate the protocol, and will fail. Notwithstanding the last sentence, at any point in time, though pointless, it is additionally permissible to read 0 bytes from the agent, or any number of bytes from a closed socket. Args: count: Number of bytes to read from the agent. flags: Reserved. Must be 0. Returns: (A chunk of) the SSH agent's response to the most recent request. If reading 0 bytes, or if reading from a closed socket, the returned chunk is always an empty byte string. Raises: AssertionError: The flags argument, if specified, must be 0. Alternatively, `recv` was called when there was no response to be obtained, in violation of the SSH agent protocol. """ assert not flags, self._NO_FLAG_SUPPORT assert not count or self.closed or self.send_to_client, ( self._PROTOCOL_VIOLATION ) ret = bytes(self.send_to_client[:count]) del self.send_to_client[:count] return ret def parse_client_request_and_dispatch(self) -> None: """Parse the client request and call the matching handler. This agent supports the [`SSH_AGENTC_REQUEST_IDENTITIES`][_types.SSH_AGENTC.REQUEST_IDENTITIES], [`SSH_AGENTC_SIGN_REQUEST`][_types.SSH_AGENTC.SIGN_REQUEST] and the [`SSH_AGENTC_EXTENSION`][_types.SSH_AGENTC.EXTENSION] request types. """ if len(self.receive_from_client) < self.HEADER_SIZE + self.CODE_SIZE: raise ValueError(self._INVALID_REQUEST) target_header = ssh_agent.SSHAgentClient.uint32( len(self.receive_from_client) - self.HEADER_SIZE ) if target_header != self.receive_from_client[: self.HEADER_SIZE]: raise ValueError(self._INVALID_REQUEST) code = _types.SSH_AGENTC( int.from_bytes( self.receive_from_client[ self.HEADER_SIZE : self.HEADER_SIZE + self.CODE_SIZE ], "big", ) ) def is_enabled_extension(extension: str) -> bool: if ( extension not in self.enabled_extensions or code != _types.SSH_AGENTC.EXTENSION ): return False string = ssh_agent.SSHAgentClient.string extension_marker = b"\x1b" + string(extension.encode("ascii")) return self.receive_from_client.startswith(extension_marker, 4) result: Buffer | Iterator[int] if code == _types.SSH_AGENTC.REQUEST_IDENTITIES: result = self.request_identities(list_extended=False) elif code == _types.SSH_AGENTC.SIGN_REQUEST: result = self.sign() elif is_enabled_extension("query"): result = self.query_extensions() elif is_enabled_extension("list-extended@putty.projects.tartarus.org"): result = self.request_identities(list_extended=True) else: raise ValueError(self._UNSUPPORTED_REQUEST) self.send_to_client.extend( ssh_agent.SSHAgentClient.string(bytes(result)) ) def query_extensions(self) -> Iterator[int]: """Answer an `SSH_AGENTC_EXTENSION` request. Yields: The bytes payload of the response, without the protocol framing. The payload is yielded byte by byte, as an iterable of 8-bit integers. """ yield _types.SSH_AGENT.EXTENSION_RESPONSE.value yield from ssh_agent.SSHAgentClient.string(b"query") extension_answers = [ b"query", b"list-extended@putty.projects.tartarus.org", ] for a in extension_answers: yield from ssh_agent.SSHAgentClient.string(a) def request_identities( self, *, list_extended: bool = False ) -> Iterator[int]: """Answer an `SSH_AGENTC_REQUEST_IDENTITIES` request. Args: list_extended: If true, answer an `SSH_AGENTC_EXTENSION` request for the `list-extended@putty.projects.tartarus.org` extension. Otherwise, answer an `SSH_AGENTC_REQUEST_IDENTITIES` request. Yields: The bytes payload of the response, without the protocol framing. The payload is yielded byte by byte, as an iterable of 8-bit integers. """ if list_extended: yield _types.SSH_AGENT.SUCCESS.value else: yield _types.SSH_AGENT.IDENTITIES_ANSWER.value signature_classes = [ SSHTestKeyDeterministicSignatureClass.SPEC, ] if ( "list-extended@putty.projects.tartarus.org" in self.enabled_extensions ): signature_classes.append( SSHTestKeyDeterministicSignatureClass.RFC_6979 ) keys = [ v for v in ALL_KEYS.values() if any(cls in v.expected_signatures for cls in signature_classes) ] yield from ssh_agent.SSHAgentClient.uint32(len(keys)) for key in keys: yield from ssh_agent.SSHAgentClient.string(key.public_key_data) yield from ssh_agent.SSHAgentClient.string( b"test key without passphrase" ) if list_extended: yield from ssh_agent.SSHAgentClient.string( ssh_agent.SSHAgentClient.uint32(0) ) def sign(self) -> bytes: """Answer an `SSH_AGENTC_SIGN_REQUEST` request. Returns: The bytes payload of the response, without the protocol framing. """ try_rfc6979 = ( "list-extended@putty.projects.tartarus.org" in self.enabled_extensions ) spec = SSHTestKeyDeterministicSignatureClass.SPEC rfc6979 = SSHTestKeyDeterministicSignatureClass.RFC_6979 key_blob, rest = ssh_agent.SSHAgentClient.unstring_prefix( self.receive_from_client[self.HEADER_SIZE + self.CODE_SIZE :] ) sign_data, rest = ssh_agent.SSHAgentClient.unstring_prefix(rest) if len(rest) != 4: raise ValueError(self._INVALID_REQUEST) flags = int.from_bytes(rest, "big") if flags: raise ValueError(self._UNSUPPORTED_REQUEST) if sign_data != vault.Vault.UUID: raise ValueError(self._UNSUPPORTED_REQUEST) for key in ALL_KEYS.values(): if key.public_key_data == key_blob: if spec in key.expected_signatures: return int.to_bytes( _types.SSH_AGENT.SIGN_RESPONSE.value, 1, "big" ) + ssh_agent.SSHAgentClient.string( key.expected_signatures[spec].signature ) if try_rfc6979 and rfc6979 in key.expected_signatures: return int.to_bytes( _types.SSH_AGENT.SIGN_RESPONSE.value, 1, "big" ) + ssh_agent.SSHAgentClient.string( key.expected_signatures[rfc6979].signature ) raise ValueError(self._UNSUPPORTED_REQUEST) raise ValueError(self._UNSUPPORTED_REQUEST) @socketprovider.SocketProvider.register("stub_with_address") class StubbedSSHAgentSocketWithAddress(StubbedSSHAgentSocket): """A [`StubbedSSHAgentSocket`][] requiring a specific address.""" ADDRESS = "stub-ssh-agent:" """The correct address for connecting to this stubbed agent.""" def __init__(self, *extensions: str) -> None: """Initialize the agent, based on `SSH_AUTH_SOCK`. Socket addresses of the form `stub-ssh-agent:` will raise an [`OSError`][] (or the respective subclass) with the specified [`errno`][] value. For example, `stub-ssh-agent:EPERM` will raise a [`PermissionError`][]. Raises: KeyError: The `SSH_AUTH_SOCK` environment variable is not set. OSError: The address in `SSH_AUTH_SOCK` is unsuited. """ super().__init__(*extensions) try: orig_address = os.environ["SSH_AUTH_SOCK"] except KeyError as exc: msg = "SSH_AUTH_SOCK environment variable" raise KeyError(msg) from exc address = orig_address if not address.startswith(self.ADDRESS): address = self.ADDRESS + "ENOENT" errcode = address.removeprefix(self.ADDRESS) if errcode and not ( errcode.startswith("E") and hasattr(errno, errcode) ): errcode = "EINVAL" if errcode: errno_val = getattr(errno, errcode) raise OSError(errno_val, os.strerror(errno_val), orig_address) @socketprovider.SocketProvider.register( "stub_with_address_and_deterministic_dsa" ) class StubbedSSHAgentSocketWithAddressAndDeterministicDSA( StubbedSSHAgentSocketWithAddress ): """A [`StubbedSSHAgentSocketWithAddress`][] supporting deterministic DSA.""" def __init__(self) -> None: """Initialize the agent. Set the supported extensions, and try issuing RFC 6979 and Pageant 0.68–0.80 DSA/ECDSA signatures, if possible. See the [superclass constructor][StubbedSSHAgentSocketWithAddress] for other details. Raises: KeyError: See superclass. OSError: See superclass. """ # noqa: RUF002 super().__init__("query", "list-extended@putty.projects.tartarus.org") self.try_rfc6979 = True self.try_pageant_068_080 = True def list_keys(self: Any = None) -> list[_types.SSHKeyCommentPair]: """Return a list of all SSH test keys, as key/comment pairs. Intended as a monkeypatching replacement for [`ssh_agent.SSHAgentClient.list_keys`][]. """ del self # Unused. Pair = _types.SSHKeyCommentPair # noqa: N806 return [ Pair(value.public_key_data, f"{key} test key".encode("ASCII")) for key, value in ALL_KEYS.items() ] def sign( self: Any, key: bytes | bytearray, message: bytes | bytearray ) -> bytes: """Return the signature of `message` under `key`. Can only handle keys in [`SUPPORTED_KEYS`][], and only the vault UUID as the message. Intended as a monkeypatching replacement for [`ssh_agent.SSHAgentClient.sign`][]. """ del self # Unused. assert message == vault.Vault.UUID for value in SUPPORTED_KEYS.values(): if value.public_key_data == key: # pragma: no branch return value.expected_signatures[ SSHTestKeyDeterministicSignatureClass.SPEC ].signature raise AssertionError def list_keys_singleton(self: Any = None) -> list[_types.SSHKeyCommentPair]: """Return a singleton list of the first supported SSH test key. The key is returned as a key/comment pair. Intended as a monkeypatching replacement for [`ssh_agent.SSHAgentClient.list_keys`][]. """ del self # Unused. Pair = _types.SSHKeyCommentPair # noqa: N806 list1 = [ Pair(value.public_key_data, f"{key} test key".encode("ASCII")) for key, value in SUPPORTED_KEYS.items() ] return list1[:1] def suitable_ssh_keys(conn: Any) -> Iterator[_types.SSHKeyCommentPair]: """Return a two-item list of SSH test keys (key/comment pairs). Intended as a monkeypatching replacement for `cli_machinery.get_suitable_ssh_keys` to better script and test the interactive key selection. When used this way, `derivepassphrase` believes that only those two keys are loaded and suitable. """ del conn # Unused. Pair = _types.SSHKeyCommentPair # noqa: N806 yield from [ Pair(DUMMY_KEY1, b"no comment"), Pair(DUMMY_KEY2, b"a comment"), ] def phrase_from_key( key: bytes, /, *, conn: ssh_agent.SSHAgentClient | socket.socket | None = None, ) -> bytes: """Return the "equivalent master passphrase" for key. Only works for key [`DUMMY_KEY1`][]. Intended as a monkeypatching replacement for [`vault.Vault.phrase_from_key`][], bypassing communication with an actual SSH agent. """ del conn if key == DUMMY_KEY1: # pragma: no branch return DUMMY_PHRASE_FROM_KEY1 raise KeyError(key) # pragma: no cover def provider_entry_provider() -> _types.SSHAgentSocket: # pragma: no cover """A pseudo provider for a [`_types.SSHAgentSocketProviderEntry`][].""" msg = "We are not supposed to be called!" raise AssertionError(msg) provider_entry1 = _types.SSHAgentSocketProviderEntry( provider_entry_provider, "entry1", ("entry1a", "entry1b", "entry1c") ) """A sample [`_types.SSHAgentSocketProviderEntry`][].""" provider_entry2 = _types.SSHAgentSocketProviderEntry( provider_entry_provider, "entry2", ("entry2d", "entry2e") ) """A sample [`_types.SSHAgentSocketProviderEntry`][].""" posix_entry = _types.SSHAgentSocketProviderEntry( socketprovider.SocketProvider.resolve("posix"), "posix", () ) """ The standard [`_types.SSHAgentSocketProviderEntry`][] for the UNIX domain socket handler on POSIX systems. """ the_annoying_os_entry = _types.SSHAgentSocketProviderEntry( socketprovider.SocketProvider.resolve("the_annoying_os"), "the_annoying_os", (), ) """ The standard [`_types.SSHAgentSocketProviderEntry`][] for the named pipe handler on The Annoying Operating System. """ faulty_entry_callable = _types.SSHAgentSocketProviderEntry( (), # type: ignore[arg-type] "tuple", (), ) """ A faulty [`_types.SSHAgentSocketProviderEntry`][]: the indicated handler is not a callable. """ faulty_entry_name_exists = _types.SSHAgentSocketProviderEntry( socketprovider.SocketProvider.resolve("the_annoying_os"), "posix", () ) """ A faulty [`_types.SSHAgentSocketProviderEntry`][]: the indicated handler is already registered with a different callable. """ faulty_entry_alias_exists = _types.SSHAgentSocketProviderEntry( socketprovider.SocketProvider.resolve("posix"), "posix", ("unix_domain", "the_annoying_os"), ) """ A faulty [`_types.SSHAgentSocketProviderEntry`][]: the alias is already registered with a different callable. """ @contextlib.contextmanager def faked_entry_point_list( # noqa: C901 additional_entry_points: Sequence[importlib.metadata.EntryPoint], remove_conflicting_entries: bool = False, ) -> Iterator[Sequence[str]]: """Yield a context where additional entry points are visible. Args: additional_entry_points: A sequence of entry point objects that should additionally be visible. remove_conflicting_entries: If true, remove all names provided by the additional entry points, otherwise leave them untouched. Yields: A sequence of registry names that are newly available within the context. """ true_entry_points = importlib.metadata.entry_points() additional_entry_points = list(additional_entry_points) if sys.version_info >= (3, 12): new_entry_points = importlib.metadata.EntryPoints( list(true_entry_points) + additional_entry_points ) @overload def mangled_entry_points( *, group: None = None ) -> importlib.metadata.EntryPoints: ... @overload def mangled_entry_points( *, group: str ) -> importlib.metadata.EntryPoints: ... def mangled_entry_points( **params: Any, ) -> importlib.metadata.EntryPoints: return new_entry_points.select(**params) elif sys.version_info >= (3, 10): # Compatibility concerns within importlib.metadata: depending on # whether the .select() API is used, the result is either the dict # of groups of points (as in < 3.10), or the EntryPoints iterable # (as in >= 3.12). So our wrapper needs to duplicate that # interface. FUN. new_entry_points_dict = { k: list(v) for k, v in true_entry_points.items() } for ep in additional_entry_points: new_entry_points_dict.setdefault(ep.group, []).append(ep) new_entry_points = importlib.metadata.EntryPoints([ ep for group in new_entry_points_dict.values() for ep in group ]) @overload def mangled_entry_points( *, group: None = None ) -> dict[ str, list[importlib.metadata.EntryPoint] | tuple[importlib.metadata.EntryPoint, ...], ]: ... @overload def mangled_entry_points( *, group: str ) -> importlib.metadata.EntryPoints: ... def mangled_entry_points( **params: Any, ) -> ( importlib.metadata.EntryPoints | dict[ str, list[importlib.metadata.EntryPoint] | tuple[importlib.metadata.EntryPoint, ...], ] ): return ( new_entry_points.select(**params) if params else new_entry_points_dict ) else: new_entry_points: dict[ str, list[importlib.metadata.EntryPoint] | tuple[importlib.metadata.EntryPoint, ...], ] = { group_name: list(group) for group_name, group in true_entry_points.items() } for ep in additional_entry_points: new_entry_points.setdefault(ep.group, []) new_entry_points[ep.group].append(ep) new_entry_points = { group_name: tuple(group) for group_name, group in new_entry_points.items() } @overload def mangled_entry_points( *, group: None = None ) -> dict[str, tuple[importlib.metadata.EntryPoint, ...]]: ... @overload def mangled_entry_points( *, group: str ) -> tuple[importlib.metadata.EntryPoint, ...]: ... def mangled_entry_points( *, group: str | None = None ) -> ( dict[str, tuple[importlib.metadata.EntryPoint, ...]] | tuple[importlib.metadata.EntryPoint, ...] ): return ( new_entry_points.get(group, ()) if group is not None else new_entry_points ) registry = socketprovider.SocketProvider.registry new_registry = registry.copy() keys = [ep.load().key for ep in additional_entry_points] aliases = [a for ep in additional_entry_points for a in ep.load().aliases] if remove_conflicting_entries: # pragma: no cover [unused] for name in [*keys, *aliases]: new_registry.pop(name, None) with pytest.MonkeyPatch.context() as monkeypatch: monkeypatch.setattr( socketprovider.SocketProvider, "registry", new_registry ) monkeypatch.setattr( importlib.metadata, "entry_points", mangled_entry_points ) yield (*keys, *aliases) @contextlib.contextmanager def isolated_config( monkeypatch: pytest.MonkeyPatch, runner: CliRunner, main_config_str: str | None = None, ) -> Iterator[None]: """Provide an isolated configuration setup, as a context. This context manager sets up (and changes into) a temporary directory, which holds the user configuration specified in `main_config_str`, if any. The manager also ensures that the environment variables `HOME` and `USERPROFILE` are set, and that `DERIVEPASSPHRASE_PATH` is unset. Upon exiting the context, the changes are undone and the temporary directory is removed. Args: monkeypatch: A monkeypatch fixture object. runner: A `click` CLI runner harness. main_config_str: Optional TOML file contents, to be used as the user configuration. Returns: A context manager, without a return value. """ prog_name = cli_helpers.PROG_NAME env_name = prog_name.replace(" ", "_").upper() + "_PATH" # TODO(the-13th-letter): Rewrite using parenthesized with-statements. # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9 with contextlib.ExitStack() as stack: stack.enter_context(runner.isolated_filesystem()) stack.enter_context( cli_machinery.StandardCLILogging.ensure_standard_logging() ) stack.enter_context( cli_machinery.StandardCLILogging.ensure_standard_warnings_logging() ) cwd = str(pathlib.Path.cwd().resolve()) monkeypatch.setenv("HOME", cwd) monkeypatch.setenv("APPDATA", cwd) monkeypatch.setenv("LOCALAPPDATA", cwd) monkeypatch.delenv(env_name, raising=False) config_dir = cli_helpers.config_filename(subsystem=None) config_dir.mkdir(parents=True, exist_ok=True) if isinstance(main_config_str, str): cli_helpers.config_filename("user configuration").write_text( main_config_str, encoding="UTF-8" ) try: yield finally: cli_helpers.config_filename("write lock").unlink(missing_ok=True) @contextlib.contextmanager def isolated_vault_config( monkeypatch: pytest.MonkeyPatch, runner: CliRunner, vault_config: Any, main_config_str: str | None = None, ) -> Iterator[None]: """Provide an isolated vault configuration setup, as a context. Uses [`isolated_config`][] internally. Beyond those actions, this manager also loads the specified vault configuration into the context. Args: monkeypatch: A monkeypatch fixture object. runner: A `click` CLI runner harness. vault_config: A valid vault configuration, to be integrated into the context. main_config_str: Optional TOML file contents, to be used as the user configuration. Returns: A context manager, without a return value. """ with isolated_config( monkeypatch=monkeypatch, runner=runner, main_config_str=main_config_str ): config_filename = cli_helpers.config_filename(subsystem="vault") with config_filename.open("w", encoding="UTF-8") as outfile: json.dump(vault_config, outfile) yield @contextlib.contextmanager def isolated_vault_exporter_config( monkeypatch: pytest.MonkeyPatch, runner: CliRunner, vault_config: str | bytes | None = None, vault_key: str | None = None, ) -> Iterator[None]: """Provide an isolated vault configuration setup, as a context. Works similarly to [`isolated_config`][], except that no user configuration is accepted or integrated into the context. This manager also accepts a serialized vault-native configuration and a vault encryption key to integrate into the context. Args: monkeypatch: A monkeypatch fixture object. runner: A `click` CLI runner harness. vault_config: An optional serialized vault-native configuration, to be integrated into the context. If a text string, then the contents are written to the file `.vault`. If a byte string, then it is treated as base64-encoded zip file contents, which---once inside the `.vault` directory---will be extracted into the current directory. vault_key: An optional encryption key presumably for the stored vault-native configuration. If given, then the environment variable `VAULT_KEY` will be populated with this key while the context is active. Returns: A context manager, without a return value. """ # TODO(the-13th-letter): Remove the fallback implementation. # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.10 if TYPE_CHECKING: chdir: Callable[..., AbstractContextManager] else: try: chdir = contextlib.chdir # type: ignore[attr] except AttributeError: @contextlib.contextmanager def chdir( newpath: str | bytes | os.PathLike, ) -> Iterator[None]: # pragma: no branch oldpath = pathlib.Path.cwd().resolve() os.chdir(newpath) yield os.chdir(oldpath) with runner.isolated_filesystem(): cwd = str(pathlib.Path.cwd().resolve()) monkeypatch.setenv("HOME", cwd) monkeypatch.setenv("USERPROFILE", cwd) monkeypatch.delenv( cli_helpers.PROG_NAME.replace(" ", "_").upper() + "_PATH", raising=False, ) monkeypatch.delenv("VAULT_PATH", raising=False) monkeypatch.delenv("VAULT_KEY", raising=False) monkeypatch.delenv("LOGNAME", raising=False) monkeypatch.delenv("USER", raising=False) monkeypatch.delenv("USERNAME", raising=False) if vault_key is not None: monkeypatch.setenv("VAULT_KEY", vault_key) vault_config_path = pathlib.Path(".vault").resolve() # TODO(the-13th-letter): Rewrite using structural pattern matching. # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9 if isinstance(vault_config, str): vault_config_path.write_text(f"{vault_config}\n", encoding="UTF-8") elif isinstance(vault_config, bytes): vault_config_path.mkdir(parents=True, mode=0o700, exist_ok=True) # TODO(the-13th-letter): Rewrite using parenthesized # with-statements. # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9 with contextlib.ExitStack() as stack: stack.enter_context(chdir(vault_config_path)) tmpzipfile = stack.enter_context( tempfile.NamedTemporaryFile(suffix=".zip") ) for line in vault_config.splitlines(): tmpzipfile.write(base64.standard_b64decode(line)) tmpzipfile.flush() tmpzipfile.seek(0, 0) with zipfile.ZipFile(tmpzipfile.file) as zipfileobj: zipfileobj.extractall() elif vault_config is None: pass else: # pragma: no cover assert_never(vault_config) try: yield finally: cli_helpers.config_filename("write lock").unlink(missing_ok=True) def auto_prompt(*args: Any, **kwargs: Any) -> str: """Return [`DUMMY_PASSPHRASE`][]. Intended as a monkeypatching replacement for `cli.prompt_for_passphrase` to better script and test the interactive passphrase queries. """ del args, kwargs # Unused. return DUMMY_PASSPHRASE def make_file_readonly( pathname: str | bytes | os.PathLike[str], /, *, try_race_free_implementation: bool = True, ) -> None: """Mark a file as read-only. On POSIX, this entails removing the write permission bits for user, group and other, and ensuring the read permission bit for user is set. Unfortunately, The Annoying OS (a.k.a. Microsoft Windows) has its own rules: Set exactly(?) the read permission bit for user to make the file read-only, and set exactly(?) the write permission bit for user to make the file read/write; all other permission bit settings are ignored. The cross-platform procedure therefore is: 1. Call `os.stat` on the file, noting the permission bits. 2. Calculate the new permission bits POSIX-style. 3. Call `os.chmod` with permission bit `stat.S_IREAD`. 4. Call `os.chmod` with the correct POSIX-style permissions. If the platform supports it, we use a file descriptor instead of a path name. Otherwise, we use the same path name multiple times, and are susceptible to race conditions. """ fname: int | str | bytes | os.PathLike if try_race_free_implementation and {os.stat, os.chmod} <= os.supports_fd: # The Annoying OS (v11 at least) supports fstat and fchmod, but # does not support changing the file mode on file descriptors # for read-only files. fname = os.open( pathname, os.O_RDWR | getattr(os, "O_CLOEXEC", 0) | getattr(os, "O_NOCTTY", 0), ) else: fname = pathname try: orig_mode = os.stat(fname).st_mode # noqa: PTH116 new_mode = ( orig_mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH | stat.S_IREAD ) os.chmod(fname, stat.S_IREAD) # noqa: PTH101 os.chmod(fname, new_mode) # noqa: PTH101 finally: if isinstance(fname, int): os.close(fname) class ReadableResult(NamedTuple): """Helper class for formatting and testing click.testing.Result objects.""" exception: BaseException | None exit_code: int stdout: str stderr: str def clean_exit( self, *, output: str = "", empty_stderr: bool = False ) -> bool: """Return whether the invocation exited cleanly. Args: output: An expected output string. """ return ( ( not self.exception or ( isinstance(self.exception, SystemExit) and self.exit_code == 0 ) ) and (not output or output in self.stdout) and (not empty_stderr or not self.stderr) ) def error_exit( self, *, error: str | re.Pattern[str] | type[BaseException] = BaseException, record_tuples: Sequence[tuple[str, int, str]] = (), ) -> bool: """Return whether the invocation exited uncleanly. Args: error: An expected error message, or an expected numeric error code, or an expected exception type. """ def error_match(error: str | re.Pattern[str], line: str) -> bool: return ( error in line if isinstance(error, str) else error.match(line) is not None ) # TODO(the-13th-letter): Rewrite using structural pattern matching. # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9 if isinstance(error, type): return isinstance(self.exception, error) else: # noqa: RET505 assert isinstance(error, (str, re.Pattern)) return ( isinstance(self.exception, SystemExit) and self.exit_code > 0 and ( not error or any( error_match(error, line) for line in self.stderr.splitlines(True) ) or error_emitted(error, record_tuples) ) ) class CliRunner: """An abstracted CLI runner class. Intended to provide similar functionality and scope as the [`click.testing.CliRunner`][] class, though not necessarily `click`-specific. Also allows for seamless migration away from `click`, if/when we decide this. """ _SUPPORTS_MIX_STDERR_ATTRIBUTE = not hasattr(click.testing, "StreamMixer") """ True if and only if [`click.testing.CliRunner`][] supports the `mix_stderr` attribute. It was removed in 8.2.0 in favor of the `click.testing.StreamMixer` class. See also [`pallets/click#2523`](https://github.com/pallets/click/pull/2523). """ def __init__( self, *, mix_stderr: bool = False, color: bool | None = None, ) -> None: self.color = color self.mix_stderr = mix_stderr class MixStderrAttribute(TypedDict): mix_stderr: NotRequired[bool] mix_stderr_args: MixStderrAttribute = ( {"mix_stderr": mix_stderr} if self._SUPPORTS_MIX_STDERR_ATTRIBUTE else {} ) self.click_testing_clirunner = click.testing.CliRunner( **mix_stderr_args ) def invoke( self, cli: click.BaseCommand, args: Sequence[str] | str | None = None, input: str | bytes | IO[Any] | None = None, env: Mapping[str, str | None] | None = None, catch_exceptions: bool = True, color: bool | None = None, **extra: Any, ) -> ReadableResult: if color is None: # pragma: no cover color = self.color if self.color is not None else False raw_result = self.click_testing_clirunner.invoke( cli, args=args, input=input, env=env, catch_exceptions=catch_exceptions, color=color, **extra, ) # In 8.2.0, r.stdout is no longer a property aliasing the # `output` attribute, but rather the raw stdout value. try: stderr = raw_result.stderr except ValueError: stderr = raw_result.stdout return ReadableResult( raw_result.exception, raw_result.exit_code, (raw_result.stdout if not self.mix_stderr else raw_result.output) or "", stderr or "", ) return ReadableResult.parse(raw_result) def isolated_filesystem( self, temp_dir: str | os.PathLike[str] | None = None, ) -> AbstractContextManager[str]: return self.click_testing_clirunner.isolated_filesystem( temp_dir=temp_dir ) def parse_sh_export_line(line: str, *, env_name: str) -> str: """Parse the output of typical SSH agents' SSH_AUTH_SOCK lines. Intentionally parses only a small subset of sh(1) syntax which works with current OpenSSH and PuTTY output. We require exactly one variable setting, and one export instruction, both on the same line, and perhaps combined into one statement. Terminating semicolons after each command are ignored. Args: line: A line of sh(1) script to parse. env_name: The name of the environment variable to expect. Returns: The parsed environment variable value. Raises: ValueError: Cannot parse the sh script. Perhaps it is too complex, perhaps it is malformed. """ line = line.rstrip("\r\n") shlex_parser = shlex.shlex( instream=line, posix=True, punctuation_chars=True ) shlex_parser.whitespace = " \t" tokens = list(shlex_parser) orig_tokens = tokens.copy() if tokens[-1] == ";": tokens.pop() if tokens[-3:] == [";", "export", env_name]: tokens[-3:] = [] tokens[:0] = ["export"] if not ( len(tokens) == 2 and tokens[0] == "export" and tokens[1].startswith(f"{env_name}=") ): msg = f"Cannot parse sh line: {orig_tokens!r} -> {tokens!r}" raise ValueError(msg) return tokens[1].split("=", 1)[1] def message_emitted_factory( level: int, *, logger_name: str = cli.PROG_NAME, ) -> Callable[[str | re.Pattern[str], Sequence[tuple[str, int, str]]], bool]: """Return a function to test if a matching message was emitted. Args: level: The level to match messages at. logger_name: The name of the logger to match against. """ def message_emitted( text: str | re.Pattern[str], record_tuples: Sequence[tuple[str, int, str]], ) -> bool: """Return true if a matching message was emitted. Args: text: Substring or pattern to match against. record_tuples: Items to match. """ def check_record(record: tuple[str, int, str]) -> bool: if record[:2] != (logger_name, level): return False if isinstance(text, str): return text in record[2] return text.match(record[2]) is not None # pragma: no cover return any(map(check_record, record_tuples)) return message_emitted # No need to assert debug messages as of yet. info_emitted = message_emitted_factory(logging.INFO) warning_emitted = message_emitted_factory(logging.WARNING) deprecation_warning_emitted = message_emitted_factory( logging.WARNING, logger_name=f"{cli.PROG_NAME}.deprecation" ) deprecation_info_emitted = message_emitted_factory( logging.INFO, logger_name=f"{cli.PROG_NAME}.deprecation" ) error_emitted = message_emitted_factory(logging.ERROR) class Parametrize(types.SimpleNamespace): VAULT_CONFIG_FORMATS_DATA = pytest.mark.parametrize( ["config", "format", "config_data"], [ pytest.param( VAULT_V02_CONFIG, "v0.2", VAULT_V02_CONFIG_DATA, id="0.2", ), pytest.param( VAULT_V03_CONFIG, "v0.3", VAULT_V03_CONFIG_DATA, id="0.3", ), pytest.param( VAULT_STOREROOM_CONFIG_ZIPPED, "storeroom", VAULT_STOREROOM_CONFIG_DATA, id="storeroom", ), ], )