Marco Ricci commited on 2024-07-28 21:32:08
Zeige 15 geänderte Dateien mit 612 Einfügungen und 619 Löschungen.
These modules were already strongly coupled to `derivepassphrase`, and prone to namespace collisions (particularly `ssh_agent_client`). Furthermore, maintaining the author and version information in three top-level modules is error-prone. A direct inclusion of `sequin` and `ssh_agent_client` revealed a circular import, so the bulk of `derivepassphrase` was delegated to a new submodule `derivepassphrase.vault`. This also yields a cleaner import graph. Also, because the shorter name makes more sense in this context, rename `derivepassphrase.ssh_agent_client` to `derivepassphrase.ssh_agent`. The two `types` submodules are not yet consolidated.
| ... | ... |
@@ -63,7 +63,7 @@ exclude = [ |
| 63 | 63 |
] |
| 64 | 64 |
|
| 65 | 65 |
[tool.hatch.build.targets.wheel] |
| 66 |
-packages = ['src/derivepassphrase', 'src/sequin', 'src/ssh_agent_client'] |
|
| 66 |
+packages = ['src/derivepassphrase'] |
|
| 67 | 67 |
|
| 68 | 68 |
[tool.hatch.envs.hatch-test] |
| 69 | 69 |
default-args = ['src', 'tests'] |
| ... | ... |
@@ -99,7 +99,7 @@ check = "mypy --install-types --non-interactive {args:src/derivepassphrase tests
|
| 99 | 99 |
directory = "html/coverage" |
| 100 | 100 |
|
| 101 | 101 |
[tool.coverage.run] |
| 102 |
-source_pkgs = ["derivepassphrase", "sequin", "ssh_agent_client", "tests"] |
|
| 102 |
+source_pkgs = ["derivepassphrase", "tests"] |
|
| 103 | 103 |
branch = true |
| 104 | 104 |
parallel = true |
| 105 | 105 |
omit = [ |
| ... | ... |
@@ -4,537 +4,5 @@ |
| 4 | 4 |
|
| 5 | 5 |
"""Work-alike of vault(1) – a deterministic, stateless password manager""" # noqa: RUF002 |
| 6 | 6 |
|
| 7 |
-from __future__ import annotations |
|
| 8 |
- |
|
| 9 |
-import base64 |
|
| 10 |
-import collections |
|
| 11 |
-import hashlib |
|
| 12 |
-import math |
|
| 13 |
-import unicodedata |
|
| 14 |
-from collections.abc import Callable |
|
| 15 |
-from typing import TypeAlias |
|
| 16 |
- |
|
| 17 |
-from typing_extensions import assert_type |
|
| 18 |
- |
|
| 19 |
-import sequin |
|
| 20 |
-import ssh_agent_client |
|
| 21 |
- |
|
| 22 | 7 |
__author__ = 'Marco Ricci <m@the13thletter.info>' |
| 23 | 8 |
__version__ = '0.1.3' |
| 24 |
- |
|
| 25 |
- |
|
| 26 |
-class AmbiguousByteRepresentationError(ValueError): |
|
| 27 |
- """The object has an ambiguous byte representation.""" |
|
| 28 |
- |
|
| 29 |
- def __init__(self) -> None: |
|
| 30 |
- super().__init__('text string has ambiguous byte representation')
|
|
| 31 |
- |
|
| 32 |
- |
|
| 33 |
-_CHARSETS = collections.OrderedDict([ |
|
| 34 |
- ('lower', b'abcdefghijklmnopqrstuvwxyz'),
|
|
| 35 |
- ('upper', b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'),
|
|
| 36 |
- ('alpha', b''), # Placeholder.
|
|
| 37 |
- ('number', b'0123456789'),
|
|
| 38 |
- ('alphanum', b''), # Placeholder.
|
|
| 39 |
- ('space', b' '),
|
|
| 40 |
- ('dash', b'-_'),
|
|
| 41 |
- ('symbol', b'!"#$%&\'()*+,./:;<=>?@[\\]^{|}~-_'),
|
|
| 42 |
- ('all', b''), # Placeholder.
|
|
| 43 |
-]) |
|
| 44 |
-_CHARSETS['alpha'] = _CHARSETS['lower'] + _CHARSETS['upper'] |
|
| 45 |
-_CHARSETS['alphanum'] = _CHARSETS['alpha'] + _CHARSETS['number'] |
|
| 46 |
-_CHARSETS['all'] = ( |
|
| 47 |
- _CHARSETS['alphanum'] + _CHARSETS['space'] + _CHARSETS['symbol'] |
|
| 48 |
-) |
|
| 49 |
- |
|
| 50 |
- |
|
| 51 |
-class Vault: |
|
| 52 |
- """A work-alike of James Coglan's vault. |
|
| 53 |
- |
|
| 54 |
- Store settings for generating (actually: deriving) passphrases for |
|
| 55 |
- named services, with various constraints, given only a master |
|
| 56 |
- passphrase. Also, actually generate the passphrase. The derivation |
|
| 57 |
- is deterministic and non-secret; only the master passphrase need be |
|
| 58 |
- kept secret. The implementation is compatible with [vault][]. |
|
| 59 |
- |
|
| 60 |
- [James Coglan explains the passphrase derivation algorithm in great |
|
| 61 |
- detail][ALGORITHM] in his blog post on said topic: A principally |
|
| 62 |
- infinite bit stream is obtained by running a key-derivation function |
|
| 63 |
- on the master passphrase and the service name, then this bit stream |
|
| 64 |
- is fed into a [Sequin][sequin.Sequin] to generate random numbers in |
|
| 65 |
- the correct range, and finally these random numbers select |
|
| 66 |
- passphrase characters until the desired length is reached. |
|
| 67 |
- |
|
| 68 |
- [vault]: https://getvau.lt |
|
| 69 |
- [ALGORITHM]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/ |
|
| 70 |
- |
|
| 71 |
- """ |
|
| 72 |
- |
|
| 73 |
- _UUID = b'e87eb0f4-34cb-46b9-93ad-766c5ab063e7' |
|
| 74 |
- """A tag used by vault in the bit stream generation.""" |
|
| 75 |
- _CHARSETS = _CHARSETS |
|
| 76 |
- """ |
|
| 77 |
- Known character sets from which to draw passphrase characters. |
|
| 78 |
- Relies on a certain, fixed order for their definition and their |
|
| 79 |
- contents. |
|
| 80 |
- |
|
| 81 |
- """ |
|
| 82 |
- |
|
| 83 |
- def __init__( |
|
| 84 |
- self, |
|
| 85 |
- *, |
|
| 86 |
- phrase: bytes | bytearray | str = b'', |
|
| 87 |
- length: int = 20, |
|
| 88 |
- repeat: int = 0, |
|
| 89 |
- lower: int | None = None, |
|
| 90 |
- upper: int | None = None, |
|
| 91 |
- number: int | None = None, |
|
| 92 |
- space: int | None = None, |
|
| 93 |
- dash: int | None = None, |
|
| 94 |
- symbol: int | None = None, |
|
| 95 |
- ) -> None: |
|
| 96 |
- """Initialize the Vault object. |
|
| 97 |
- |
|
| 98 |
- Args: |
|
| 99 |
- phrase: |
|
| 100 |
- The master passphrase from which to derive the service |
|
| 101 |
- passphrases. If a text string, then the byte |
|
| 102 |
- representation must be unique. |
|
| 103 |
- length: |
|
| 104 |
- Desired passphrase length. |
|
| 105 |
- repeat: |
|
| 106 |
- The maximum number of immediate character repetitions |
|
| 107 |
- allowed in the passphrase. Disabled if set to 0. |
|
| 108 |
- lower: |
|
| 109 |
- Optional constraint on ASCII lowercase characters. If |
|
| 110 |
- positive, include this many lowercase characters |
|
| 111 |
- somewhere in the passphrase. If 0, avoid lowercase |
|
| 112 |
- characters altogether. |
|
| 113 |
- upper: |
|
| 114 |
- Same as `lower`, but for ASCII uppercase characters. |
|
| 115 |
- number: |
|
| 116 |
- Same as `lower`, but for ASCII digits. |
|
| 117 |
- space: |
|
| 118 |
- Same as `lower`, but for the space character. |
|
| 119 |
- dash: |
|
| 120 |
- Same as `lower`, but for the hyphen-minus and underscore |
|
| 121 |
- characters. |
|
| 122 |
- symbol: |
|
| 123 |
- Same as `lower`, but for all other hitherto unlisted |
|
| 124 |
- ASCII printable characters (except backquote). |
|
| 125 |
- |
|
| 126 |
- Raises: |
|
| 127 |
- AmbiguousByteRepresentationError: |
|
| 128 |
- The phrase is a text string with differing NFC- and |
|
| 129 |
- NFD-normalized UTF-8 byte representations. |
|
| 130 |
- |
|
| 131 |
- """ |
|
| 132 |
- self._phrase = self._get_binary_string(phrase) |
|
| 133 |
- self._length = length |
|
| 134 |
- self._repeat = repeat |
|
| 135 |
- self._allowed = bytearray(self._CHARSETS['all']) |
|
| 136 |
- self._required: list[bytes] = [] |
|
| 137 |
- |
|
| 138 |
- def subtract_or_require( |
|
| 139 |
- count: int | None, characters: bytes | bytearray |
|
| 140 |
- ) -> None: |
|
| 141 |
- if not isinstance(count, int): |
|
| 142 |
- return |
|
| 143 |
- if count <= 0: |
|
| 144 |
- self._allowed = self._subtract(characters, self._allowed) |
|
| 145 |
- else: |
|
| 146 |
- for _ in range(count): |
|
| 147 |
- self._required.append(characters) |
|
| 148 |
- |
|
| 149 |
- subtract_or_require(lower, self._CHARSETS['lower']) |
|
| 150 |
- subtract_or_require(upper, self._CHARSETS['upper']) |
|
| 151 |
- subtract_or_require(number, self._CHARSETS['number']) |
|
| 152 |
- subtract_or_require(space, self._CHARSETS['space']) |
|
| 153 |
- subtract_or_require(dash, self._CHARSETS['dash']) |
|
| 154 |
- subtract_or_require(symbol, self._CHARSETS['symbol']) |
|
| 155 |
- if len(self._required) > self._length: |
|
| 156 |
- msg = 'requested passphrase length too short' |
|
| 157 |
- raise ValueError(msg) |
|
| 158 |
- if not self._allowed: |
|
| 159 |
- msg = 'no allowed characters left' |
|
| 160 |
- raise ValueError(msg) |
|
| 161 |
- for _ in range(len(self._required), self._length): |
|
| 162 |
- self._required.append(bytes(self._allowed)) |
|
| 163 |
- |
|
| 164 |
- def _entropy(self) -> float: |
|
| 165 |
- """Estimate the passphrase entropy, given the current settings. |
|
| 166 |
- |
|
| 167 |
- The entropy is the base 2 logarithm of the amount of |
|
| 168 |
- possibilities. We operate directly on the logarithms, and use |
|
| 169 |
- sorting and [`math.fsum`][] to keep high accuracy. |
|
| 170 |
- |
|
| 171 |
- Note: |
|
| 172 |
- We actually overestimate the entropy here because of poor |
|
| 173 |
- handling of character repetitions. In the extreme, assuming |
|
| 174 |
- that only one character were allowed, then because there is |
|
| 175 |
- only one possible string of each given length, the entropy |
|
| 176 |
- of that string `s` is always be zero. However, we calculate |
|
| 177 |
- the entropy as `math.log2(math.factorial(len(s)))`, i.e. we |
|
| 178 |
- assume the characters at the respective string position are |
|
| 179 |
- distinguishable from each other. |
|
| 180 |
- |
|
| 181 |
- Returns: |
|
| 182 |
- A valid (and somewhat close) upper bound to the entropy. |
|
| 183 |
- |
|
| 184 |
- """ |
|
| 185 |
- factors: list[int] = [] |
|
| 186 |
- if not self._required or any(not x for x in self._required): |
|
| 187 |
- return float('-inf')
|
|
| 188 |
- for i, charset in enumerate(self._required): |
|
| 189 |
- factors.extend([i + 1, len(charset)]) |
|
| 190 |
- factors.sort() |
|
| 191 |
- return math.fsum(math.log2(f) for f in factors) |
|
| 192 |
- |
|
| 193 |
- def _estimate_sufficient_hash_length( |
|
| 194 |
- self, |
|
| 195 |
- safety_factor: float = 2.0, |
|
| 196 |
- ) -> int: |
|
| 197 |
- """Estimate the sufficient hash length, given the current settings. |
|
| 198 |
- |
|
| 199 |
- Using the entropy (via `_entropy`) and a safety factor, give an |
|
| 200 |
- initial estimate of the length to use for `create_hash` such |
|
| 201 |
- that using a `Sequin` with this hash will not exhaust it during |
|
| 202 |
- passphrase generation. |
|
| 203 |
- |
|
| 204 |
- Args: |
|
| 205 |
- safety_factor: The safety factor. Must be at least 1. |
|
| 206 |
- |
|
| 207 |
- Returns: |
|
| 208 |
- The estimated sufficient hash length. |
|
| 209 |
- |
|
| 210 |
- Warning: |
|
| 211 |
- This is a heuristic, not an exact computation; it may |
|
| 212 |
- underestimate the true necessary hash length. It is |
|
| 213 |
- intended as a starting point for searching for a sufficient |
|
| 214 |
- hash length, usually by doubling the hash length each time |
|
| 215 |
- it does not yet prove so. |
|
| 216 |
- |
|
| 217 |
- """ |
|
| 218 |
- try: |
|
| 219 |
- safety_factor = float(safety_factor) |
|
| 220 |
- except TypeError as e: |
|
| 221 |
- msg = f'invalid safety factor: not a float: {safety_factor!r}'
|
|
| 222 |
- raise TypeError(msg) from e |
|
| 223 |
- if not math.isfinite(safety_factor) or safety_factor < 1.0: |
|
| 224 |
- msg = f'invalid safety factor {safety_factor!r}'
|
|
| 225 |
- raise ValueError(msg) |
|
| 226 |
- # Ensure the bound is strictly positive. |
|
| 227 |
- entropy_bound = max(1, self._entropy()) |
|
| 228 |
- return int(math.ceil(safety_factor * entropy_bound / 8)) |
|
| 229 |
- |
|
| 230 |
- @staticmethod |
|
| 231 |
- def _get_binary_string(s: bytes | bytearray | str, /) -> bytes: |
|
| 232 |
- """Convert the input string to a read-only, binary string. |
|
| 233 |
- |
|
| 234 |
- If it is a text string, then test for an unambiguous UTF-8 |
|
| 235 |
- representation, otherwise abort. (That is, check whether the |
|
| 236 |
- NFC and NFD forms of the string coincide.) |
|
| 237 |
- |
|
| 238 |
- Args: |
|
| 239 |
- s: The string to (check and) convert. |
|
| 240 |
- |
|
| 241 |
- Returns: |
|
| 242 |
- A read-only, binary copy of the string. |
|
| 243 |
- |
|
| 244 |
- Raises: |
|
| 245 |
- AmbiguousByteRepresentationError: |
|
| 246 |
- The text string has differing NFC- and NFD-normalized |
|
| 247 |
- UTF-8 byte representations. |
|
| 248 |
- |
|
| 249 |
- """ |
|
| 250 |
- if isinstance(s, str): |
|
| 251 |
- norm = unicodedata.normalize |
|
| 252 |
- if norm('NFC', s) != norm('NFD', s):
|
|
| 253 |
- raise AmbiguousByteRepresentationError |
|
| 254 |
- return s.encode('UTF-8')
|
|
| 255 |
- return bytes(s) |
|
| 256 |
- |
|
| 257 |
- @classmethod |
|
| 258 |
- def create_hash( |
|
| 259 |
- cls, |
|
| 260 |
- phrase: bytes | bytearray | str, |
|
| 261 |
- service: bytes | bytearray, |
|
| 262 |
- *, |
|
| 263 |
- length: int = 32, |
|
| 264 |
- ) -> bytes: |
|
| 265 |
- r"""Create a pseudorandom byte stream from phrase and service. |
|
| 266 |
- |
|
| 267 |
- Create a pseudorandom byte stream from `phrase` and `service` by |
|
| 268 |
- feeding them into the key-derivation function PBKDF2 |
|
| 269 |
- (8 iterations, using SHA-1). |
|
| 270 |
- |
|
| 271 |
- Args: |
|
| 272 |
- phrase: |
|
| 273 |
- A master passphrase, or sometimes an SSH signature. |
|
| 274 |
- Used as the key for PBKDF2, the underlying cryptographic |
|
| 275 |
- primitive. |
|
| 276 |
- |
|
| 277 |
- If a text string, then the byte representation must be |
|
| 278 |
- unique. |
|
| 279 |
- service: |
|
| 280 |
- A vault service name. Will be suffixed with |
|
| 281 |
- `Vault._UUID`, and then used as the salt value for |
|
| 282 |
- PBKDF2. |
|
| 283 |
- length: |
|
| 284 |
- The length of the byte stream to generate. |
|
| 285 |
- |
|
| 286 |
- Returns: |
|
| 287 |
- A pseudorandom byte string of length `length`. |
|
| 288 |
- |
|
| 289 |
- Raises: |
|
| 290 |
- AmbiguousByteRepresentationError: |
|
| 291 |
- The phrase is a text string with differing NFC- and |
|
| 292 |
- NFD-normalized UTF-8 byte representations. |
|
| 293 |
- |
|
| 294 |
- Note: |
|
| 295 |
- Shorter values returned from this method (with the same key |
|
| 296 |
- and message) are prefixes of longer values returned from |
|
| 297 |
- this method. (This property is inherited from the |
|
| 298 |
- underlying PBKDF2 function.) It is thus safe (if slow) to |
|
| 299 |
- call this method with the same input with ever-increasing |
|
| 300 |
- target lengths. |
|
| 301 |
- |
|
| 302 |
- Examples: |
|
| 303 |
- >>> # See also Vault.phrase_from_key examples. |
|
| 304 |
- >>> phrase = bytes.fromhex('''
|
|
| 305 |
- ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 |
|
| 306 |
- ... 00 00 00 40 |
|
| 307 |
- ... f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86 |
|
| 308 |
- ... 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd |
|
| 309 |
- ... 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c |
|
| 310 |
- ... 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02 |
|
| 311 |
- ... ''') |
|
| 312 |
- >>> Vault.create_hash(phrase, b'some_service', length=4) |
|
| 313 |
- b'M\xb1<S' |
|
| 314 |
- >>> Vault.create_hash(phrase, b'some_service', length=16) |
|
| 315 |
- b'M\xb1<S\x827E\xd1M\xaf\xf8~\xc8n\x10\xcc' |
|
| 316 |
- >>> Vault.create_hash(phrase, b'NOSUCHSERVICE', length=16) |
|
| 317 |
- b'\x1c\xc3\x9c\xd9\xb6\x1a\x99CS\x07\xc41\xf4\x85#s' |
|
| 318 |
- |
|
| 319 |
- """ |
|
| 320 |
- phrase = cls._get_binary_string(phrase) |
|
| 321 |
- assert not isinstance(phrase, str) |
|
| 322 |
- salt = bytes(service) + cls._UUID |
|
| 323 |
- return hashlib.pbkdf2_hmac( |
|
| 324 |
- hash_name='sha1', |
|
| 325 |
- password=phrase, |
|
| 326 |
- salt=salt, |
|
| 327 |
- iterations=8, |
|
| 328 |
- dklen=length, |
|
| 329 |
- ) |
|
| 330 |
- |
|
| 331 |
- def generate( |
|
| 332 |
- self, |
|
| 333 |
- service_name: str | bytes | bytearray, |
|
| 334 |
- /, |
|
| 335 |
- *, |
|
| 336 |
- phrase: bytes | bytearray | str = b'', |
|
| 337 |
- ) -> bytes: |
|
| 338 |
- r"""Generate a service passphrase. |
|
| 339 |
- |
|
| 340 |
- Args: |
|
| 341 |
- service_name: |
|
| 342 |
- The service name. |
|
| 343 |
- phrase: |
|
| 344 |
- If given, override the passphrase given during |
|
| 345 |
- construction. |
|
| 346 |
- |
|
| 347 |
- If a text string, then the byte representation must be |
|
| 348 |
- unique. |
|
| 349 |
- |
|
| 350 |
- Returns: |
|
| 351 |
- The service passphrase. |
|
| 352 |
- |
|
| 353 |
- Raises: |
|
| 354 |
- AmbiguousByteRepresentationError: |
|
| 355 |
- The phrase is a text string with differing NFC- and |
|
| 356 |
- NFD-normalized UTF-8 byte representations. |
|
| 357 |
- |
|
| 358 |
- Examples: |
|
| 359 |
- >>> phrase = b'She cells C shells bye the sea shoars' |
|
| 360 |
- >>> # Using default options in constructor. |
|
| 361 |
- >>> Vault(phrase=phrase).generate(b'google') |
|
| 362 |
- b': 4TVH#5:aZl8LueOT\\{'
|
|
| 363 |
- >>> # Also possible: |
|
| 364 |
- >>> Vault().generate(b'google', phrase=phrase) |
|
| 365 |
- b': 4TVH#5:aZl8LueOT\\{'
|
|
| 366 |
- |
|
| 367 |
- """ |
|
| 368 |
- hash_length = self._estimate_sufficient_hash_length() |
|
| 369 |
- assert hash_length >= 1 |
|
| 370 |
- # Ensure the phrase is a bytes object. Needed later for safe |
|
| 371 |
- # concatenation. |
|
| 372 |
- if isinstance(service_name, str): |
|
| 373 |
- service_name = service_name.encode('utf-8')
|
|
| 374 |
- elif not isinstance(service_name, bytes): |
|
| 375 |
- service_name = bytes(service_name) |
|
| 376 |
- assert_type(service_name, bytes) |
|
| 377 |
- if not phrase: |
|
| 378 |
- phrase = self._phrase |
|
| 379 |
- phrase = self._get_binary_string(phrase) |
|
| 380 |
- # Repeat the passphrase generation with ever-increasing hash |
|
| 381 |
- # lengths, until the passphrase can be formed without exhausting |
|
| 382 |
- # the sequin. See the guarantee in the create_hash method for |
|
| 383 |
- # why this works. |
|
| 384 |
- while True: |
|
| 385 |
- try: |
|
| 386 |
- required = self._required[:] |
|
| 387 |
- seq = sequin.Sequin( |
|
| 388 |
- self.create_hash( |
|
| 389 |
- phrase=phrase, service=service_name, length=hash_length |
|
| 390 |
- ) |
|
| 391 |
- ) |
|
| 392 |
- result = bytearray() |
|
| 393 |
- while len(result) < self._length: |
|
| 394 |
- pos = seq.generate(len(required)) |
|
| 395 |
- charset = required.pop(pos) |
|
| 396 |
- # Determine if an unlucky choice right now might |
|
| 397 |
- # violate the restriction on repeated characters. |
|
| 398 |
- # That is, check if the current partial passphrase |
|
| 399 |
- # ends with r - 1 copies of the same character |
|
| 400 |
- # (where r is the repeat limit that must not be |
|
| 401 |
- # reached), and if so, remove this same character |
|
| 402 |
- # from the current character's allowed set. |
|
| 403 |
- if self._repeat and result: |
|
| 404 |
- bad_suffix = bytes(result[-1:]) * (self._repeat - 1) |
|
| 405 |
- if result.endswith(bad_suffix): |
|
| 406 |
- charset = self._subtract( |
|
| 407 |
- bytes(result[-1:]), charset |
|
| 408 |
- ) |
|
| 409 |
- pos = seq.generate(len(charset)) |
|
| 410 |
- result.extend(charset[pos : pos + 1]) |
|
| 411 |
- except sequin.SequinExhaustedError: |
|
| 412 |
- hash_length *= 2 |
|
| 413 |
- else: |
|
| 414 |
- return bytes(result) |
|
| 415 |
- |
|
| 416 |
- @staticmethod |
|
| 417 |
- def _is_suitable_ssh_key(key: bytes | bytearray, /) -> bool: |
|
| 418 |
- """Check whether the key is suitable for passphrase derivation. |
|
| 419 |
- |
|
| 420 |
- Currently, this only checks whether signatures with this key |
|
| 421 |
- type are deterministic. |
|
| 422 |
- |
|
| 423 |
- Args: |
|
| 424 |
- key: SSH public key to check. |
|
| 425 |
- |
|
| 426 |
- Returns: |
|
| 427 |
- True if and only if the key is suitable for use in deriving |
|
| 428 |
- a passphrase deterministically. |
|
| 429 |
- |
|
| 430 |
- """ |
|
| 431 |
- TestFunc: TypeAlias = Callable[[bytes | bytearray], bool] |
|
| 432 |
- deterministic_signature_types: dict[str, TestFunc] |
|
| 433 |
- deterministic_signature_types = {
|
|
| 434 |
- 'ssh-ed25519': lambda k: k.startswith( |
|
| 435 |
- b'\x00\x00\x00\x0bssh-ed25519' |
|
| 436 |
- ), |
|
| 437 |
- 'ssh-ed448': lambda k: k.startswith(b'\x00\x00\x00\x09ssh-ed448'), |
|
| 438 |
- 'ssh-rsa': lambda k: k.startswith(b'\x00\x00\x00\x07ssh-rsa'), |
|
| 439 |
- } |
|
| 440 |
- return any(v(key) for v in deterministic_signature_types.values()) |
|
| 441 |
- |
|
| 442 |
- @classmethod |
|
| 443 |
- def phrase_from_key(cls, key: bytes | bytearray, /) -> bytes: |
|
| 444 |
- """Obtain the master passphrase from a configured SSH key. |
|
| 445 |
- |
|
| 446 |
- vault allows the usage of certain SSH keys to derive a master |
|
| 447 |
- passphrase, by signing the vault UUID with the SSH key. The key |
|
| 448 |
- type must ensure that signatures are deterministic. |
|
| 449 |
- |
|
| 450 |
- Args: |
|
| 451 |
- key: The (public) SSH key to use for signing. |
|
| 452 |
- |
|
| 453 |
- Returns: |
|
| 454 |
- The signature of the vault UUID under this key, unframed but |
|
| 455 |
- encoded in base64. |
|
| 456 |
- |
|
| 457 |
- Raises: |
|
| 458 |
- ValueError: |
|
| 459 |
- The SSH key is principally unsuitable for this use case. |
|
| 460 |
- Usually this means that the signature is not |
|
| 461 |
- deterministic. |
|
| 462 |
- |
|
| 463 |
- Examples: |
|
| 464 |
- >>> import base64 |
|
| 465 |
- >>> # Actual Ed25519 test public key. |
|
| 466 |
- >>> public_key = bytes.fromhex('''
|
|
| 467 |
- ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 |
|
| 468 |
- ... 00 00 00 20 |
|
| 469 |
- ... 81 78 81 68 26 d6 02 48 5f 0f ff 32 48 6f e4 c1 |
|
| 470 |
- ... 30 89 dc 1c 6a 45 06 09 e9 09 0f fb c2 12 69 76 |
|
| 471 |
- ... ''') |
|
| 472 |
- >>> expected_sig_raw = bytes.fromhex('''
|
|
| 473 |
- ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 |
|
| 474 |
- ... 00 00 00 40 |
|
| 475 |
- ... f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86 |
|
| 476 |
- ... 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd |
|
| 477 |
- ... 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c |
|
| 478 |
- ... 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02 |
|
| 479 |
- ... ''') |
|
| 480 |
- >>> # Raw Ed25519 signatures are 64 bytes long. |
|
| 481 |
- >>> signature_blob = expected_sig_raw[-64:] |
|
| 482 |
- >>> phrase = base64.standard_b64encode(signature_blob) |
|
| 483 |
- >>> Vault.phrase_from_key(phrase) == expected # doctest:+SKIP |
|
| 484 |
- True |
|
| 485 |
- |
|
| 486 |
- """ |
|
| 487 |
- if not cls._is_suitable_ssh_key(key): |
|
| 488 |
- msg = ( |
|
| 489 |
- 'unsuitable SSH key: bad key, or ' |
|
| 490 |
- 'signature not deterministic' |
|
| 491 |
- ) |
|
| 492 |
- raise ValueError(msg) |
|
| 493 |
- with ssh_agent_client.SSHAgentClient() as client: |
|
| 494 |
- raw_sig = client.sign(key, cls._UUID) |
|
| 495 |
- _keytype, trailer = client.unstring_prefix(raw_sig) |
|
| 496 |
- signature_blob = client.unstring(trailer) |
|
| 497 |
- return bytes(base64.standard_b64encode(signature_blob)) |
|
| 498 |
- |
|
| 499 |
- @staticmethod |
|
| 500 |
- def _subtract( |
|
| 501 |
- charset: bytes | bytearray, |
|
| 502 |
- allowed: bytes | bytearray, |
|
| 503 |
- ) -> bytearray: |
|
| 504 |
- """Remove the characters in charset from allowed. |
|
| 505 |
- |
|
| 506 |
- This preserves the relative order of characters in `allowed`. |
|
| 507 |
- |
|
| 508 |
- Args: |
|
| 509 |
- charset: |
|
| 510 |
- Characters to remove. Must not contain duplicate |
|
| 511 |
- characters. |
|
| 512 |
- allowed: |
|
| 513 |
- Character set to remove the other characters from. Must |
|
| 514 |
- not contain duplicate characters. |
|
| 515 |
- |
|
| 516 |
- Returns: |
|
| 517 |
- The pruned "allowed" character set. |
|
| 518 |
- |
|
| 519 |
- Raises: |
|
| 520 |
- ValueError: |
|
| 521 |
- `allowed` or `charset` contained duplicate characters. |
|
| 522 |
- |
|
| 523 |
- """ |
|
| 524 |
- allowed = ( |
|
| 525 |
- allowed if isinstance(allowed, bytearray) else bytearray(allowed) |
|
| 526 |
- ) |
|
| 527 |
- assert_type(allowed, bytearray) |
|
| 528 |
- msg_dup_characters = 'duplicate characters in set' |
|
| 529 |
- if len(frozenset(allowed)) != len(allowed): |
|
| 530 |
- raise ValueError(msg_dup_characters) |
|
| 531 |
- if len(frozenset(charset)) != len(charset): |
|
| 532 |
- raise ValueError(msg_dup_characters) |
|
| 533 |
- for c in charset: |
|
| 534 |
- try: |
|
| 535 |
- pos = allowed.index(c) |
|
| 536 |
- except ValueError: |
|
| 537 |
- pass |
|
| 538 |
- else: |
|
| 539 |
- allowed[pos : pos + 1] = [] |
|
| 540 |
- return allowed |
| ... | ... |
@@ -27,8 +27,7 @@ from typing_extensions import ( |
| 27 | 27 |
) |
| 28 | 28 |
|
| 29 | 29 |
import derivepassphrase as dpp |
| 30 |
-import ssh_agent_client |
|
| 31 |
-import ssh_agent_client.types |
|
| 30 |
+from derivepassphrase import ssh_agent, vault |
|
| 32 | 31 |
from derivepassphrase import types as dpp_types |
| 33 | 32 |
|
| 34 | 33 |
if TYPE_CHECKING: |
| ... | ... |
@@ -128,12 +127,12 @@ def _save_config(config: dpp_types.VaultConfig, /) -> None: |
| 128 | 127 |
|
| 129 | 128 |
|
| 130 | 129 |
def _get_suitable_ssh_keys( |
| 131 |
- conn: ssh_agent_client.SSHAgentClient | socket.socket | None = None, / |
|
| 132 |
-) -> Iterator[ssh_agent_client.types.KeyCommentPair]: |
|
| 130 |
+ conn: ssh_agent.SSHAgentClient | socket.socket | None = None, / |
|
| 131 |
+) -> Iterator[ssh_agent.types.KeyCommentPair]: |
|
| 133 | 132 |
"""Yield all SSH keys suitable for passphrase derivation. |
| 134 | 133 |
|
| 135 | 134 |
Suitable SSH keys are queried from the running SSH agent (see |
| 136 |
- [`ssh_agent_client.SSHAgentClient.list_keys`][]). |
|
| 135 |
+ [`ssh_agent.SSHAgentClient.list_keys`][]). |
|
| 137 | 136 |
|
| 138 | 137 |
Args: |
| 139 | 138 |
conn: |
| ... | ... |
@@ -165,14 +164,14 @@ def _get_suitable_ssh_keys( |
| 165 | 164 |
There was an error communicating with the SSH agent. |
| 166 | 165 |
|
| 167 | 166 |
""" |
| 168 |
- client: ssh_agent_client.SSHAgentClient |
|
| 167 |
+ client: ssh_agent.SSHAgentClient |
|
| 169 | 168 |
client_context: contextlib.AbstractContextManager[Any] |
| 170 | 169 |
match conn: |
| 171 |
- case ssh_agent_client.SSHAgentClient(): |
|
| 170 |
+ case ssh_agent.SSHAgentClient(): |
|
| 172 | 171 |
client = conn |
| 173 | 172 |
client_context = contextlib.nullcontext() |
| 174 | 173 |
case socket.socket() | None: |
| 175 |
- client = ssh_agent_client.SSHAgentClient(socket=conn) |
|
| 174 |
+ client = ssh_agent.SSHAgentClient(socket=conn) |
|
| 176 | 175 |
client_context = client |
| 177 | 176 |
case _: # pragma: no cover |
| 178 | 177 |
assert_never(conn) |
| ... | ... |
@@ -186,7 +185,7 @@ def _get_suitable_ssh_keys( |
| 186 | 185 |
suitable_keys = copy.copy(all_key_comment_pairs) |
| 187 | 186 |
for pair in all_key_comment_pairs: |
| 188 | 187 |
key, _comment = pair |
| 189 |
- if dpp.Vault._is_suitable_ssh_key(key): # noqa: SLF001 |
|
| 188 |
+ if vault.Vault._is_suitable_ssh_key(key): # noqa: SLF001 |
|
| 190 | 189 |
yield pair |
| 191 | 190 |
if not suitable_keys: # pragma: no cover |
| 192 | 191 |
raise IndexError(_NO_USABLE_KEYS) |
| ... | ... |
@@ -262,12 +261,12 @@ def _prompt_for_selection( |
| 262 | 261 |
|
| 263 | 262 |
|
| 264 | 263 |
def _select_ssh_key( |
| 265 |
- conn: ssh_agent_client.SSHAgentClient | socket.socket | None = None, / |
|
| 264 |
+ conn: ssh_agent.SSHAgentClient | socket.socket | None = None, / |
|
| 266 | 265 |
) -> bytes | bytearray: |
| 267 | 266 |
"""Interactively select an SSH key for passphrase derivation. |
| 268 | 267 |
|
| 269 | 268 |
Suitable SSH keys are queried from the running SSH agent (see |
| 270 |
- [`ssh_agent_client.SSHAgentClient.list_keys`][]), then the user is |
|
| 269 |
+ [`ssh_agent.SSHAgentClient.list_keys`][]), then the user is |
|
| 271 | 270 |
prompted interactively (see [`click.prompt`][]) for a selection. |
| 272 | 271 |
|
| 273 | 272 |
Args: |
| ... | ... |
@@ -302,7 +301,7 @@ def _select_ssh_key( |
| 302 | 301 |
""" |
| 303 | 302 |
suitable_keys = list(_get_suitable_ssh_keys(conn)) |
| 304 | 303 |
key_listing: list[str] = [] |
| 305 |
- unstring_prefix = ssh_agent_client.SSHAgentClient.unstring_prefix |
|
| 304 |
+ unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix |
|
| 306 | 305 |
for key, comment in suitable_keys: |
| 307 | 306 |
keytype = unstring_prefix(key)[0].decode('ASCII')
|
| 308 | 307 |
key_str = base64.standard_b64encode(key).decode('ASCII')
|
| ... | ... |
@@ -1055,7 +1054,7 @@ def derivepassphrase( |
| 1055 | 1054 |
def key_to_phrase( |
| 1056 | 1055 |
key: str | bytes | bytearray, |
| 1057 | 1056 |
) -> bytes | bytearray: |
| 1058 |
- return dpp.Vault.phrase_from_key( |
|
| 1057 |
+ return vault.Vault.phrase_from_key( |
|
| 1059 | 1058 |
base64.standard_b64decode(key) |
| 1060 | 1059 |
) |
| 1061 | 1060 |
|
| ... | ... |
@@ -1065,7 +1064,7 @@ def derivepassphrase( |
| 1065 | 1064 |
# service-specific (use that one). Otherwise, if only one of |
| 1066 | 1065 |
# key and phrase is set in the config, use that one. In all |
| 1067 | 1066 |
# these above cases, set the phrase via |
| 1068 |
- # derivepassphrase.Vault.phrase_from_key if a key is |
|
| 1067 |
+ # derivepassphrase.vault.Vault.phrase_from_key if a key is |
|
| 1069 | 1068 |
# given. Finally, if nothing is set, error out. |
| 1070 | 1069 |
if use_key or use_phrase: |
| 1071 | 1070 |
kwargs['phrase'] = key_to_phrase(key) if use_key else phrase |
| ... | ... |
@@ -1083,8 +1082,7 @@ def derivepassphrase( |
| 1083 | 1082 |
) |
| 1084 | 1083 |
raise click.UsageError(msg) |
| 1085 | 1084 |
kwargs.pop('key', '')
|
| 1086 |
- vault = dpp.Vault(**kwargs) |
|
| 1087 |
- result = vault.generate(service) |
|
| 1085 |
+ result = vault.Vault(**kwargs).generate(service) |
|
| 1088 | 1086 |
click.echo(result.decode('ASCII'))
|
| 1089 | 1087 |
|
| 1090 | 1088 |
|
| ... | ... |
@@ -14,8 +14,8 @@ deterministic, stateless password manager that recomputes passwords |
| 14 | 14 |
instead of storing them), and this reimplementation is used for |
| 15 | 15 |
a similar purpose. |
| 16 | 16 |
|
| 17 |
-The main API is the [`Sequin`] [sequin.Sequin] class, which is |
|
| 18 |
-thoroughly documented. |
|
| 17 |
+The main API is the [`Sequin`] [derivepassphrase.sequin.Sequin] class, |
|
| 18 |
+which is thoroughly documented. |
|
| 19 | 19 |
|
| 20 | 20 |
""" |
| 21 | 21 |
|
| ... | ... |
@@ -33,7 +33,6 @@ if TYPE_CHECKING: |
| 33 | 33 |
|
| 34 | 34 |
__all__ = ('Sequin', 'SequinExhaustedError')
|
| 35 | 35 |
__author__ = 'Marco Ricci <m@the13thletter.info>' |
| 36 |
-__version__ = '0.1.3' |
|
| 37 | 36 |
|
| 38 | 37 |
|
| 39 | 38 |
class Sequin: |
| ... | ... |
@@ -14,7 +14,7 @@ from typing import TYPE_CHECKING |
| 14 | 14 |
|
| 15 | 15 |
from typing_extensions import Self |
| 16 | 16 |
|
| 17 |
-from ssh_agent_client import types |
|
| 17 |
+from derivepassphrase.ssh_agent import types |
|
| 18 | 18 |
|
| 19 | 19 |
if TYPE_CHECKING: |
| 20 | 20 |
from collections.abc import Sequence |
| ... | ... |
@@ -22,7 +22,6 @@ if TYPE_CHECKING: |
| 22 | 22 |
|
| 23 | 23 |
__all__ = ('SSHAgentClient',)
|
| 24 | 24 |
__author__ = 'Marco Ricci <m@the13thletter.info>' |
| 25 |
-__version__ = '0.1.3' |
|
| 26 | 25 |
|
| 27 | 26 |
# In SSH bytestrings, the "length" of the byte string is stored as |
| 28 | 27 |
# a 4-byte/32-bit unsigned integer at the beginning. |
| ... | ... |
@@ -0,0 +1,538 @@ |
| 1 |
+# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info> |
|
| 2 |
+# |
|
| 3 |
+# SPDX-License-Identifier: MIT |
|
| 4 |
+ |
|
| 5 |
+"""Work-alike of vault(1) – a deterministic, stateless password manager""" # noqa: RUF002 |
|
| 6 |
+ |
|
| 7 |
+from __future__ import annotations |
|
| 8 |
+ |
|
| 9 |
+import base64 |
|
| 10 |
+import collections |
|
| 11 |
+import hashlib |
|
| 12 |
+import math |
|
| 13 |
+import unicodedata |
|
| 14 |
+from collections.abc import Callable |
|
| 15 |
+from typing import TypeAlias |
|
| 16 |
+ |
|
| 17 |
+from typing_extensions import assert_type |
|
| 18 |
+ |
|
| 19 |
+from derivepassphrase import sequin, ssh_agent |
|
| 20 |
+ |
|
| 21 |
+__author__ = 'Marco Ricci <m@the13thletter.info>' |
|
| 22 |
+ |
|
| 23 |
+ |
|
| 24 |
+class AmbiguousByteRepresentationError(ValueError): |
|
| 25 |
+ """The object has an ambiguous byte representation.""" |
|
| 26 |
+ |
|
| 27 |
+ def __init__(self) -> None: |
|
| 28 |
+ super().__init__('text string has ambiguous byte representation')
|
|
| 29 |
+ |
|
| 30 |
+ |
|
| 31 |
+_CHARSETS = collections.OrderedDict([ |
|
| 32 |
+ ('lower', b'abcdefghijklmnopqrstuvwxyz'),
|
|
| 33 |
+ ('upper', b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'),
|
|
| 34 |
+ ('alpha', b''), # Placeholder.
|
|
| 35 |
+ ('number', b'0123456789'),
|
|
| 36 |
+ ('alphanum', b''), # Placeholder.
|
|
| 37 |
+ ('space', b' '),
|
|
| 38 |
+ ('dash', b'-_'),
|
|
| 39 |
+ ('symbol', b'!"#$%&\'()*+,./:;<=>?@[\\]^{|}~-_'),
|
|
| 40 |
+ ('all', b''), # Placeholder.
|
|
| 41 |
+]) |
|
| 42 |
+_CHARSETS['alpha'] = _CHARSETS['lower'] + _CHARSETS['upper'] |
|
| 43 |
+_CHARSETS['alphanum'] = _CHARSETS['alpha'] + _CHARSETS['number'] |
|
| 44 |
+_CHARSETS['all'] = ( |
|
| 45 |
+ _CHARSETS['alphanum'] + _CHARSETS['space'] + _CHARSETS['symbol'] |
|
| 46 |
+) |
|
| 47 |
+ |
|
| 48 |
+ |
|
| 49 |
+class Vault: |
|
| 50 |
+ """A work-alike of James Coglan's vault. |
|
| 51 |
+ |
|
| 52 |
+ Store settings for generating (actually: deriving) passphrases for |
|
| 53 |
+ named services, with various constraints, given only a master |
|
| 54 |
+ passphrase. Also, actually generate the passphrase. The derivation |
|
| 55 |
+ is deterministic and non-secret; only the master passphrase need be |
|
| 56 |
+ kept secret. The implementation is compatible with [vault][]. |
|
| 57 |
+ |
|
| 58 |
+ [James Coglan explains the passphrase derivation algorithm in great |
|
| 59 |
+ detail][ALGORITHM] in his blog post on said topic: A principally |
|
| 60 |
+ infinite bit stream is obtained by running a key-derivation function |
|
| 61 |
+ on the master passphrase and the service name, then this bit stream |
|
| 62 |
+ is fed into a [Sequin][sequin.Sequin] to generate random numbers in |
|
| 63 |
+ the correct range, and finally these random numbers select |
|
| 64 |
+ passphrase characters until the desired length is reached. |
|
| 65 |
+ |
|
| 66 |
+ [vault]: https://getvau.lt |
|
| 67 |
+ [ALGORITHM]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/ |
|
| 68 |
+ |
|
| 69 |
+ """ |
|
| 70 |
+ |
|
| 71 |
+ _UUID = b'e87eb0f4-34cb-46b9-93ad-766c5ab063e7' |
|
| 72 |
+ """A tag used by vault in the bit stream generation.""" |
|
| 73 |
+ _CHARSETS = _CHARSETS |
|
| 74 |
+ """ |
|
| 75 |
+ Known character sets from which to draw passphrase characters. |
|
| 76 |
+ Relies on a certain, fixed order for their definition and their |
|
| 77 |
+ contents. |
|
| 78 |
+ |
|
| 79 |
+ """ |
|
| 80 |
+ |
|
| 81 |
+ def __init__( |
|
| 82 |
+ self, |
|
| 83 |
+ *, |
|
| 84 |
+ phrase: bytes | bytearray | str = b'', |
|
| 85 |
+ length: int = 20, |
|
| 86 |
+ repeat: int = 0, |
|
| 87 |
+ lower: int | None = None, |
|
| 88 |
+ upper: int | None = None, |
|
| 89 |
+ number: int | None = None, |
|
| 90 |
+ space: int | None = None, |
|
| 91 |
+ dash: int | None = None, |
|
| 92 |
+ symbol: int | None = None, |
|
| 93 |
+ ) -> None: |
|
| 94 |
+ """Initialize the Vault object. |
|
| 95 |
+ |
|
| 96 |
+ Args: |
|
| 97 |
+ phrase: |
|
| 98 |
+ The master passphrase from which to derive the service |
|
| 99 |
+ passphrases. If a text string, then the byte |
|
| 100 |
+ representation must be unique. |
|
| 101 |
+ length: |
|
| 102 |
+ Desired passphrase length. |
|
| 103 |
+ repeat: |
|
| 104 |
+ The maximum number of immediate character repetitions |
|
| 105 |
+ allowed in the passphrase. Disabled if set to 0. |
|
| 106 |
+ lower: |
|
| 107 |
+ Optional constraint on ASCII lowercase characters. If |
|
| 108 |
+ positive, include this many lowercase characters |
|
| 109 |
+ somewhere in the passphrase. If 0, avoid lowercase |
|
| 110 |
+ characters altogether. |
|
| 111 |
+ upper: |
|
| 112 |
+ Same as `lower`, but for ASCII uppercase characters. |
|
| 113 |
+ number: |
|
| 114 |
+ Same as `lower`, but for ASCII digits. |
|
| 115 |
+ space: |
|
| 116 |
+ Same as `lower`, but for the space character. |
|
| 117 |
+ dash: |
|
| 118 |
+ Same as `lower`, but for the hyphen-minus and underscore |
|
| 119 |
+ characters. |
|
| 120 |
+ symbol: |
|
| 121 |
+ Same as `lower`, but for all other hitherto unlisted |
|
| 122 |
+ ASCII printable characters (except backquote). |
|
| 123 |
+ |
|
| 124 |
+ Raises: |
|
| 125 |
+ AmbiguousByteRepresentationError: |
|
| 126 |
+ The phrase is a text string with differing NFC- and |
|
| 127 |
+ NFD-normalized UTF-8 byte representations. |
|
| 128 |
+ |
|
| 129 |
+ """ |
|
| 130 |
+ self._phrase = self._get_binary_string(phrase) |
|
| 131 |
+ self._length = length |
|
| 132 |
+ self._repeat = repeat |
|
| 133 |
+ self._allowed = bytearray(self._CHARSETS['all']) |
|
| 134 |
+ self._required: list[bytes] = [] |
|
| 135 |
+ |
|
| 136 |
+ def subtract_or_require( |
|
| 137 |
+ count: int | None, characters: bytes | bytearray |
|
| 138 |
+ ) -> None: |
|
| 139 |
+ if not isinstance(count, int): |
|
| 140 |
+ return |
|
| 141 |
+ if count <= 0: |
|
| 142 |
+ self._allowed = self._subtract(characters, self._allowed) |
|
| 143 |
+ else: |
|
| 144 |
+ for _ in range(count): |
|
| 145 |
+ self._required.append(characters) |
|
| 146 |
+ |
|
| 147 |
+ subtract_or_require(lower, self._CHARSETS['lower']) |
|
| 148 |
+ subtract_or_require(upper, self._CHARSETS['upper']) |
|
| 149 |
+ subtract_or_require(number, self._CHARSETS['number']) |
|
| 150 |
+ subtract_or_require(space, self._CHARSETS['space']) |
|
| 151 |
+ subtract_or_require(dash, self._CHARSETS['dash']) |
|
| 152 |
+ subtract_or_require(symbol, self._CHARSETS['symbol']) |
|
| 153 |
+ if len(self._required) > self._length: |
|
| 154 |
+ msg = 'requested passphrase length too short' |
|
| 155 |
+ raise ValueError(msg) |
|
| 156 |
+ if not self._allowed: |
|
| 157 |
+ msg = 'no allowed characters left' |
|
| 158 |
+ raise ValueError(msg) |
|
| 159 |
+ for _ in range(len(self._required), self._length): |
|
| 160 |
+ self._required.append(bytes(self._allowed)) |
|
| 161 |
+ |
|
| 162 |
+ def _entropy(self) -> float: |
|
| 163 |
+ """Estimate the passphrase entropy, given the current settings. |
|
| 164 |
+ |
|
| 165 |
+ The entropy is the base 2 logarithm of the amount of |
|
| 166 |
+ possibilities. We operate directly on the logarithms, and use |
|
| 167 |
+ sorting and [`math.fsum`][] to keep high accuracy. |
|
| 168 |
+ |
|
| 169 |
+ Note: |
|
| 170 |
+ We actually overestimate the entropy here because of poor |
|
| 171 |
+ handling of character repetitions. In the extreme, assuming |
|
| 172 |
+ that only one character were allowed, then because there is |
|
| 173 |
+ only one possible string of each given length, the entropy |
|
| 174 |
+ of that string `s` is always be zero. However, we calculate |
|
| 175 |
+ the entropy as `math.log2(math.factorial(len(s)))`, i.e. we |
|
| 176 |
+ assume the characters at the respective string position are |
|
| 177 |
+ distinguishable from each other. |
|
| 178 |
+ |
|
| 179 |
+ Returns: |
|
| 180 |
+ A valid (and somewhat close) upper bound to the entropy. |
|
| 181 |
+ |
|
| 182 |
+ """ |
|
| 183 |
+ factors: list[int] = [] |
|
| 184 |
+ if not self._required or any(not x for x in self._required): |
|
| 185 |
+ return float('-inf')
|
|
| 186 |
+ for i, charset in enumerate(self._required): |
|
| 187 |
+ factors.extend([i + 1, len(charset)]) |
|
| 188 |
+ factors.sort() |
|
| 189 |
+ return math.fsum(math.log2(f) for f in factors) |
|
| 190 |
+ |
|
| 191 |
+ def _estimate_sufficient_hash_length( |
|
| 192 |
+ self, |
|
| 193 |
+ safety_factor: float = 2.0, |
|
| 194 |
+ ) -> int: |
|
| 195 |
+ """Estimate the sufficient hash length, given the current settings. |
|
| 196 |
+ |
|
| 197 |
+ Using the entropy (via `_entropy`) and a safety factor, give an |
|
| 198 |
+ initial estimate of the length to use for `create_hash` such |
|
| 199 |
+ that using a `Sequin` with this hash will not exhaust it during |
|
| 200 |
+ passphrase generation. |
|
| 201 |
+ |
|
| 202 |
+ Args: |
|
| 203 |
+ safety_factor: The safety factor. Must be at least 1. |
|
| 204 |
+ |
|
| 205 |
+ Returns: |
|
| 206 |
+ The estimated sufficient hash length. |
|
| 207 |
+ |
|
| 208 |
+ Warning: |
|
| 209 |
+ This is a heuristic, not an exact computation; it may |
|
| 210 |
+ underestimate the true necessary hash length. It is |
|
| 211 |
+ intended as a starting point for searching for a sufficient |
|
| 212 |
+ hash length, usually by doubling the hash length each time |
|
| 213 |
+ it does not yet prove so. |
|
| 214 |
+ |
|
| 215 |
+ """ |
|
| 216 |
+ try: |
|
| 217 |
+ safety_factor = float(safety_factor) |
|
| 218 |
+ except TypeError as e: |
|
| 219 |
+ msg = f'invalid safety factor: not a float: {safety_factor!r}'
|
|
| 220 |
+ raise TypeError(msg) from e |
|
| 221 |
+ if not math.isfinite(safety_factor) or safety_factor < 1.0: |
|
| 222 |
+ msg = f'invalid safety factor {safety_factor!r}'
|
|
| 223 |
+ raise ValueError(msg) |
|
| 224 |
+ # Ensure the bound is strictly positive. |
|
| 225 |
+ entropy_bound = max(1, self._entropy()) |
|
| 226 |
+ return int(math.ceil(safety_factor * entropy_bound / 8)) |
|
| 227 |
+ |
|
| 228 |
+ @staticmethod |
|
| 229 |
+ def _get_binary_string(s: bytes | bytearray | str, /) -> bytes: |
|
| 230 |
+ """Convert the input string to a read-only, binary string. |
|
| 231 |
+ |
|
| 232 |
+ If it is a text string, then test for an unambiguous UTF-8 |
|
| 233 |
+ representation, otherwise abort. (That is, check whether the |
|
| 234 |
+ NFC and NFD forms of the string coincide.) |
|
| 235 |
+ |
|
| 236 |
+ Args: |
|
| 237 |
+ s: The string to (check and) convert. |
|
| 238 |
+ |
|
| 239 |
+ Returns: |
|
| 240 |
+ A read-only, binary copy of the string. |
|
| 241 |
+ |
|
| 242 |
+ Raises: |
|
| 243 |
+ AmbiguousByteRepresentationError: |
|
| 244 |
+ The text string has differing NFC- and NFD-normalized |
|
| 245 |
+ UTF-8 byte representations. |
|
| 246 |
+ |
|
| 247 |
+ """ |
|
| 248 |
+ if isinstance(s, str): |
|
| 249 |
+ norm = unicodedata.normalize |
|
| 250 |
+ if norm('NFC', s) != norm('NFD', s):
|
|
| 251 |
+ raise AmbiguousByteRepresentationError |
|
| 252 |
+ return s.encode('UTF-8')
|
|
| 253 |
+ return bytes(s) |
|
| 254 |
+ |
|
| 255 |
+ @classmethod |
|
| 256 |
+ def create_hash( |
|
| 257 |
+ cls, |
|
| 258 |
+ phrase: bytes | bytearray | str, |
|
| 259 |
+ service: bytes | bytearray, |
|
| 260 |
+ *, |
|
| 261 |
+ length: int = 32, |
|
| 262 |
+ ) -> bytes: |
|
| 263 |
+ r"""Create a pseudorandom byte stream from phrase and service. |
|
| 264 |
+ |
|
| 265 |
+ Create a pseudorandom byte stream from `phrase` and `service` by |
|
| 266 |
+ feeding them into the key-derivation function PBKDF2 |
|
| 267 |
+ (8 iterations, using SHA-1). |
|
| 268 |
+ |
|
| 269 |
+ Args: |
|
| 270 |
+ phrase: |
|
| 271 |
+ A master passphrase, or sometimes an SSH signature. |
|
| 272 |
+ Used as the key for PBKDF2, the underlying cryptographic |
|
| 273 |
+ primitive. |
|
| 274 |
+ |
|
| 275 |
+ If a text string, then the byte representation must be |
|
| 276 |
+ unique. |
|
| 277 |
+ service: |
|
| 278 |
+ A vault service name. Will be suffixed with |
|
| 279 |
+ `Vault._UUID`, and then used as the salt value for |
|
| 280 |
+ PBKDF2. |
|
| 281 |
+ length: |
|
| 282 |
+ The length of the byte stream to generate. |
|
| 283 |
+ |
|
| 284 |
+ Returns: |
|
| 285 |
+ A pseudorandom byte string of length `length`. |
|
| 286 |
+ |
|
| 287 |
+ Raises: |
|
| 288 |
+ AmbiguousByteRepresentationError: |
|
| 289 |
+ The phrase is a text string with differing NFC- and |
|
| 290 |
+ NFD-normalized UTF-8 byte representations. |
|
| 291 |
+ |
|
| 292 |
+ Note: |
|
| 293 |
+ Shorter values returned from this method (with the same key |
|
| 294 |
+ and message) are prefixes of longer values returned from |
|
| 295 |
+ this method. (This property is inherited from the |
|
| 296 |
+ underlying PBKDF2 function.) It is thus safe (if slow) to |
|
| 297 |
+ call this method with the same input with ever-increasing |
|
| 298 |
+ target lengths. |
|
| 299 |
+ |
|
| 300 |
+ Examples: |
|
| 301 |
+ >>> # See also Vault.phrase_from_key examples. |
|
| 302 |
+ >>> phrase = bytes.fromhex('''
|
|
| 303 |
+ ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 |
|
| 304 |
+ ... 00 00 00 40 |
|
| 305 |
+ ... f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86 |
|
| 306 |
+ ... 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd |
|
| 307 |
+ ... 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c |
|
| 308 |
+ ... 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02 |
|
| 309 |
+ ... ''') |
|
| 310 |
+ >>> Vault.create_hash(phrase, b'some_service', length=4) |
|
| 311 |
+ b'M\xb1<S' |
|
| 312 |
+ >>> Vault.create_hash(phrase, b'some_service', length=16) |
|
| 313 |
+ b'M\xb1<S\x827E\xd1M\xaf\xf8~\xc8n\x10\xcc' |
|
| 314 |
+ >>> Vault.create_hash(phrase, b'NOSUCHSERVICE', length=16) |
|
| 315 |
+ b'\x1c\xc3\x9c\xd9\xb6\x1a\x99CS\x07\xc41\xf4\x85#s' |
|
| 316 |
+ |
|
| 317 |
+ """ |
|
| 318 |
+ phrase = cls._get_binary_string(phrase) |
|
| 319 |
+ assert not isinstance(phrase, str) |
|
| 320 |
+ salt = bytes(service) + cls._UUID |
|
| 321 |
+ return hashlib.pbkdf2_hmac( |
|
| 322 |
+ hash_name='sha1', |
|
| 323 |
+ password=phrase, |
|
| 324 |
+ salt=salt, |
|
| 325 |
+ iterations=8, |
|
| 326 |
+ dklen=length, |
|
| 327 |
+ ) |
|
| 328 |
+ |
|
| 329 |
+ def generate( |
|
| 330 |
+ self, |
|
| 331 |
+ service_name: str | bytes | bytearray, |
|
| 332 |
+ /, |
|
| 333 |
+ *, |
|
| 334 |
+ phrase: bytes | bytearray | str = b'', |
|
| 335 |
+ ) -> bytes: |
|
| 336 |
+ r"""Generate a service passphrase. |
|
| 337 |
+ |
|
| 338 |
+ Args: |
|
| 339 |
+ service_name: |
|
| 340 |
+ The service name. |
|
| 341 |
+ phrase: |
|
| 342 |
+ If given, override the passphrase given during |
|
| 343 |
+ construction. |
|
| 344 |
+ |
|
| 345 |
+ If a text string, then the byte representation must be |
|
| 346 |
+ unique. |
|
| 347 |
+ |
|
| 348 |
+ Returns: |
|
| 349 |
+ The service passphrase. |
|
| 350 |
+ |
|
| 351 |
+ Raises: |
|
| 352 |
+ AmbiguousByteRepresentationError: |
|
| 353 |
+ The phrase is a text string with differing NFC- and |
|
| 354 |
+ NFD-normalized UTF-8 byte representations. |
|
| 355 |
+ |
|
| 356 |
+ Examples: |
|
| 357 |
+ >>> phrase = b'She cells C shells bye the sea shoars' |
|
| 358 |
+ >>> # Using default options in constructor. |
|
| 359 |
+ >>> Vault(phrase=phrase).generate(b'google') |
|
| 360 |
+ b': 4TVH#5:aZl8LueOT\\{'
|
|
| 361 |
+ >>> # Also possible: |
|
| 362 |
+ >>> Vault().generate(b'google', phrase=phrase) |
|
| 363 |
+ b': 4TVH#5:aZl8LueOT\\{'
|
|
| 364 |
+ |
|
| 365 |
+ """ |
|
| 366 |
+ hash_length = self._estimate_sufficient_hash_length() |
|
| 367 |
+ assert hash_length >= 1 |
|
| 368 |
+ # Ensure the phrase is a bytes object. Needed later for safe |
|
| 369 |
+ # concatenation. |
|
| 370 |
+ if isinstance(service_name, str): |
|
| 371 |
+ service_name = service_name.encode('utf-8')
|
|
| 372 |
+ elif not isinstance(service_name, bytes): |
|
| 373 |
+ service_name = bytes(service_name) |
|
| 374 |
+ assert_type(service_name, bytes) |
|
| 375 |
+ if not phrase: |
|
| 376 |
+ phrase = self._phrase |
|
| 377 |
+ phrase = self._get_binary_string(phrase) |
|
| 378 |
+ # Repeat the passphrase generation with ever-increasing hash |
|
| 379 |
+ # lengths, until the passphrase can be formed without exhausting |
|
| 380 |
+ # the sequin. See the guarantee in the create_hash method for |
|
| 381 |
+ # why this works. |
|
| 382 |
+ while True: |
|
| 383 |
+ try: |
|
| 384 |
+ required = self._required[:] |
|
| 385 |
+ seq = sequin.Sequin( |
|
| 386 |
+ self.create_hash( |
|
| 387 |
+ phrase=phrase, service=service_name, length=hash_length |
|
| 388 |
+ ) |
|
| 389 |
+ ) |
|
| 390 |
+ result = bytearray() |
|
| 391 |
+ while len(result) < self._length: |
|
| 392 |
+ pos = seq.generate(len(required)) |
|
| 393 |
+ charset = required.pop(pos) |
|
| 394 |
+ # Determine if an unlucky choice right now might |
|
| 395 |
+ # violate the restriction on repeated characters. |
|
| 396 |
+ # That is, check if the current partial passphrase |
|
| 397 |
+ # ends with r - 1 copies of the same character |
|
| 398 |
+ # (where r is the repeat limit that must not be |
|
| 399 |
+ # reached), and if so, remove this same character |
|
| 400 |
+ # from the current character's allowed set. |
|
| 401 |
+ if self._repeat and result: |
|
| 402 |
+ bad_suffix = bytes(result[-1:]) * (self._repeat - 1) |
|
| 403 |
+ if result.endswith(bad_suffix): |
|
| 404 |
+ charset = self._subtract( |
|
| 405 |
+ bytes(result[-1:]), charset |
|
| 406 |
+ ) |
|
| 407 |
+ pos = seq.generate(len(charset)) |
|
| 408 |
+ result.extend(charset[pos : pos + 1]) |
|
| 409 |
+ except sequin.SequinExhaustedError: |
|
| 410 |
+ hash_length *= 2 |
|
| 411 |
+ else: |
|
| 412 |
+ return bytes(result) |
|
| 413 |
+ |
|
| 414 |
+ @staticmethod |
|
| 415 |
+ def _is_suitable_ssh_key(key: bytes | bytearray, /) -> bool: |
|
| 416 |
+ """Check whether the key is suitable for passphrase derivation. |
|
| 417 |
+ |
|
| 418 |
+ Currently, this only checks whether signatures with this key |
|
| 419 |
+ type are deterministic. |
|
| 420 |
+ |
|
| 421 |
+ Args: |
|
| 422 |
+ key: SSH public key to check. |
|
| 423 |
+ |
|
| 424 |
+ Returns: |
|
| 425 |
+ True if and only if the key is suitable for use in deriving |
|
| 426 |
+ a passphrase deterministically. |
|
| 427 |
+ |
|
| 428 |
+ """ |
|
| 429 |
+ TestFunc: TypeAlias = Callable[[bytes | bytearray], bool] |
|
| 430 |
+ deterministic_signature_types: dict[str, TestFunc] |
|
| 431 |
+ deterministic_signature_types = {
|
|
| 432 |
+ 'ssh-ed25519': lambda k: k.startswith( |
|
| 433 |
+ b'\x00\x00\x00\x0bssh-ed25519' |
|
| 434 |
+ ), |
|
| 435 |
+ 'ssh-ed448': lambda k: k.startswith(b'\x00\x00\x00\x09ssh-ed448'), |
|
| 436 |
+ 'ssh-rsa': lambda k: k.startswith(b'\x00\x00\x00\x07ssh-rsa'), |
|
| 437 |
+ } |
|
| 438 |
+ return any(v(key) for v in deterministic_signature_types.values()) |
|
| 439 |
+ |
|
| 440 |
+ @classmethod |
|
| 441 |
+ def phrase_from_key(cls, key: bytes | bytearray, /) -> bytes: |
|
| 442 |
+ """Obtain the master passphrase from a configured SSH key. |
|
| 443 |
+ |
|
| 444 |
+ vault allows the usage of certain SSH keys to derive a master |
|
| 445 |
+ passphrase, by signing the vault UUID with the SSH key. The key |
|
| 446 |
+ type must ensure that signatures are deterministic. |
|
| 447 |
+ |
|
| 448 |
+ Args: |
|
| 449 |
+ key: The (public) SSH key to use for signing. |
|
| 450 |
+ |
|
| 451 |
+ Returns: |
|
| 452 |
+ The signature of the vault UUID under this key, unframed but |
|
| 453 |
+ encoded in base64. |
|
| 454 |
+ |
|
| 455 |
+ Raises: |
|
| 456 |
+ ValueError: |
|
| 457 |
+ The SSH key is principally unsuitable for this use case. |
|
| 458 |
+ Usually this means that the signature is not |
|
| 459 |
+ deterministic. |
|
| 460 |
+ |
|
| 461 |
+ Examples: |
|
| 462 |
+ >>> import base64 |
|
| 463 |
+ >>> # Actual Ed25519 test public key. |
|
| 464 |
+ >>> public_key = bytes.fromhex('''
|
|
| 465 |
+ ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 |
|
| 466 |
+ ... 00 00 00 20 |
|
| 467 |
+ ... 81 78 81 68 26 d6 02 48 5f 0f ff 32 48 6f e4 c1 |
|
| 468 |
+ ... 30 89 dc 1c 6a 45 06 09 e9 09 0f fb c2 12 69 76 |
|
| 469 |
+ ... ''') |
|
| 470 |
+ >>> expected_sig_raw = bytes.fromhex('''
|
|
| 471 |
+ ... 00 00 00 0b 73 73 68 2d 65 64 32 35 35 31 39 |
|
| 472 |
+ ... 00 00 00 40 |
|
| 473 |
+ ... f0 98 19 80 6c 1a 97 d5 26 03 6e cc e3 65 8f 86 |
|
| 474 |
+ ... 66 07 13 19 13 09 21 33 33 f9 e4 36 53 1d af fd |
|
| 475 |
+ ... 0d 08 1f ec f8 73 9b 8c 5f 55 39 16 7c 53 54 2c |
|
| 476 |
+ ... 1e 52 bb 30 ed 7f 89 e2 2f 69 51 55 d8 9e a6 02 |
|
| 477 |
+ ... ''') |
|
| 478 |
+ >>> # Raw Ed25519 signatures are 64 bytes long. |
|
| 479 |
+ >>> signature_blob = expected_sig_raw[-64:] |
|
| 480 |
+ >>> phrase = base64.standard_b64encode(signature_blob) |
|
| 481 |
+ >>> Vault.phrase_from_key(phrase) == expected # doctest:+SKIP |
|
| 482 |
+ True |
|
| 483 |
+ |
|
| 484 |
+ """ |
|
| 485 |
+ if not cls._is_suitable_ssh_key(key): |
|
| 486 |
+ msg = ( |
|
| 487 |
+ 'unsuitable SSH key: bad key, or ' |
|
| 488 |
+ 'signature not deterministic' |
|
| 489 |
+ ) |
|
| 490 |
+ raise ValueError(msg) |
|
| 491 |
+ with ssh_agent.SSHAgentClient() as client: |
|
| 492 |
+ raw_sig = client.sign(key, cls._UUID) |
|
| 493 |
+ _keytype, trailer = client.unstring_prefix(raw_sig) |
|
| 494 |
+ signature_blob = client.unstring(trailer) |
|
| 495 |
+ return bytes(base64.standard_b64encode(signature_blob)) |
|
| 496 |
+ |
|
| 497 |
+ @staticmethod |
|
| 498 |
+ def _subtract( |
|
| 499 |
+ charset: bytes | bytearray, |
|
| 500 |
+ allowed: bytes | bytearray, |
|
| 501 |
+ ) -> bytearray: |
|
| 502 |
+ """Remove the characters in charset from allowed. |
|
| 503 |
+ |
|
| 504 |
+ This preserves the relative order of characters in `allowed`. |
|
| 505 |
+ |
|
| 506 |
+ Args: |
|
| 507 |
+ charset: |
|
| 508 |
+ Characters to remove. Must not contain duplicate |
|
| 509 |
+ characters. |
|
| 510 |
+ allowed: |
|
| 511 |
+ Character set to remove the other characters from. Must |
|
| 512 |
+ not contain duplicate characters. |
|
| 513 |
+ |
|
| 514 |
+ Returns: |
|
| 515 |
+ The pruned "allowed" character set. |
|
| 516 |
+ |
|
| 517 |
+ Raises: |
|
| 518 |
+ ValueError: |
|
| 519 |
+ `allowed` or `charset` contained duplicate characters. |
|
| 520 |
+ |
|
| 521 |
+ """ |
|
| 522 |
+ allowed = ( |
|
| 523 |
+ allowed if isinstance(allowed, bytearray) else bytearray(allowed) |
|
| 524 |
+ ) |
|
| 525 |
+ assert_type(allowed, bytearray) |
|
| 526 |
+ msg_dup_characters = 'duplicate characters in set' |
|
| 527 |
+ if len(frozenset(allowed)) != len(allowed): |
|
| 528 |
+ raise ValueError(msg_dup_characters) |
|
| 529 |
+ if len(frozenset(charset)) != len(charset): |
|
| 530 |
+ raise ValueError(msg_dup_characters) |
|
| 531 |
+ for c in charset: |
|
| 532 |
+ try: |
|
| 533 |
+ pos = allowed.index(c) |
|
| 534 |
+ except ValueError: |
|
| 535 |
+ pass |
|
| 536 |
+ else: |
|
| 537 |
+ allowed[pos : pos + 1] = [] |
|
| 538 |
+ return allowed |
| ... | ... |
@@ -14,9 +14,9 @@ import pytest |
| 14 | 14 |
|
| 15 | 15 |
import derivepassphrase |
| 16 | 16 |
import derivepassphrase.cli |
| 17 |
+import derivepassphrase.ssh_agent |
|
| 18 |
+import derivepassphrase.ssh_agent.types |
|
| 17 | 19 |
import derivepassphrase.types |
| 18 |
-import ssh_agent_client |
|
| 19 |
-import ssh_agent_client.types |
|
| 20 | 20 |
|
| 21 | 21 |
__all__ = () |
| 22 | 22 |
|
| ... | ... |
@@ -357,9 +357,9 @@ skip_if_no_agent = pytest.mark.skipif( |
| 357 | 357 |
|
| 358 | 358 |
def list_keys( |
| 359 | 359 |
self: Any = None, |
| 360 |
-) -> list[ssh_agent_client.types.KeyCommentPair]: |
|
| 360 |
+) -> list[derivepassphrase.ssh_agent.types.KeyCommentPair]: |
|
| 361 | 361 |
del self # Unused. |
| 362 |
- Pair = ssh_agent_client.types.KeyCommentPair # noqa: N806 |
|
| 362 |
+ Pair = derivepassphrase.ssh_agent.types.KeyCommentPair # noqa: N806 |
|
| 363 | 363 |
list1 = [ |
| 364 | 364 |
Pair(value['public_key_data'], f'{key} test key'.encode('ASCII'))
|
| 365 | 365 |
for key, value in SUPPORTED_KEYS.items() |
| ... | ... |
@@ -373,9 +373,9 @@ def list_keys( |
| 373 | 373 |
|
| 374 | 374 |
def list_keys_singleton( |
| 375 | 375 |
self: Any = None, |
| 376 |
-) -> list[ssh_agent_client.types.KeyCommentPair]: |
|
| 376 |
+) -> list[derivepassphrase.ssh_agent.types.KeyCommentPair]: |
|
| 377 | 377 |
del self # Unused. |
| 378 |
- Pair = ssh_agent_client.types.KeyCommentPair # noqa: N806 |
|
| 378 |
+ Pair = derivepassphrase.ssh_agent.types.KeyCommentPair # noqa: N806 |
|
| 379 | 379 |
list1 = [ |
| 380 | 380 |
Pair(value['public_key_data'], f'{key} test key'.encode('ASCII'))
|
| 381 | 381 |
for key, value in SUPPORTED_KEYS.items() |
| ... | ... |
@@ -385,11 +385,12 @@ def list_keys_singleton( |
| 385 | 385 |
|
| 386 | 386 |
def suitable_ssh_keys( |
| 387 | 387 |
conn: Any, |
| 388 |
-) -> Iterator[ssh_agent_client.types.KeyCommentPair]: |
|
| 388 |
+) -> Iterator[derivepassphrase.ssh_agent.types.KeyCommentPair]: |
|
| 389 | 389 |
del conn # Unused. |
| 390 |
+ Pair = derivepassphrase.ssh_agent.types.KeyCommentPair # noqa: N806 |
|
| 390 | 391 |
yield from [ |
| 391 |
- ssh_agent_client.types.KeyCommentPair(DUMMY_KEY1, b'no comment'), |
|
| 392 |
- ssh_agent_client.types.KeyCommentPair(DUMMY_KEY2, b'a comment'), |
|
| 392 |
+ Pair(DUMMY_KEY1, b'no comment'), |
|
| 393 |
+ Pair(DUMMY_KEY2, b'a comment'), |
|
| 393 | 394 |
] |
| 394 | 395 |
|
| 395 | 396 |
|
| ... | ... |
@@ -2,7 +2,7 @@ |
| 2 | 2 |
# |
| 3 | 3 |
# SPDX-License-Identifier: MIT |
| 4 | 4 |
|
| 5 |
-"""Test passphrase generation via derivepassphrase.Vault.""" |
|
| 5 |
+"""Test passphrase generation via derivepassphrase.vault.Vault.""" |
|
| 6 | 6 |
|
| 7 | 7 |
from __future__ import annotations |
| 8 | 8 |
|
| ... | ... |
@@ -13,7 +13,7 @@ import pytest |
| 13 | 13 |
|
| 14 | 14 |
import derivepassphrase |
| 15 | 15 |
|
| 16 |
-Vault = derivepassphrase.Vault |
|
| 16 |
+Vault = derivepassphrase.vault.Vault |
|
| 17 | 17 |
|
| 18 | 18 |
|
| 19 | 19 |
class TestVault: |
| ... | ... |
@@ -218,9 +218,9 @@ class TestVault: |
| 218 | 218 |
def test_224_binary_strings( |
| 219 | 219 |
self, s: str | bytes | bytearray, raises: bool |
| 220 | 220 |
) -> None: |
| 221 |
- binstr = derivepassphrase.Vault._get_binary_string |
|
| 221 |
+ binstr = Vault._get_binary_string |
|
| 222 | 222 |
AmbiguousByteRepresentationError = ( # noqa: N806 |
| 223 |
- derivepassphrase.AmbiguousByteRepresentationError |
|
| 223 |
+ derivepassphrase.vault.AmbiguousByteRepresentationError |
|
| 224 | 224 |
) |
| 225 | 225 |
if raises: |
| 226 | 226 |
with pytest.raises(AmbiguousByteRepresentationError): |
| ... | ... |
@@ -16,9 +16,8 @@ import pytest |
| 16 | 16 |
from typing_extensions import NamedTuple |
| 17 | 17 |
|
| 18 | 18 |
import derivepassphrase as dpp |
| 19 |
-import ssh_agent_client |
|
| 20 | 19 |
import tests |
| 21 |
-from derivepassphrase import cli |
|
| 20 |
+from derivepassphrase import cli, ssh_agent |
|
| 22 | 21 |
|
| 23 | 22 |
if TYPE_CHECKING: |
| 24 | 23 |
from collections.abc import Callable |
| ... | ... |
@@ -226,7 +225,7 @@ class TestCLI: |
| 226 | 225 |
) -> None: |
| 227 | 226 |
monkeypatch.setattr(cli, '_prompt_for_passphrase', tests.auto_prompt) |
| 228 | 227 |
option = f'--{charset_name}'
|
| 229 |
- charset = dpp.Vault._CHARSETS[charset_name].decode('ascii')
|
|
| 228 |
+ charset = dpp.vault.Vault._CHARSETS[charset_name].decode('ascii')
|
|
| 230 | 229 |
runner = click.testing.CliRunner(mix_stderr=False) |
| 231 | 230 |
with tests.isolated_config( |
| 232 | 231 |
monkeypatch=monkeypatch, |
| ... | ... |
@@ -316,7 +315,7 @@ class TestCLI: |
| 316 | 315 |
monkeypatch=monkeypatch, runner=runner, config=config |
| 317 | 316 |
): |
| 318 | 317 |
monkeypatch.setattr( |
| 319 |
- dpp.Vault, 'phrase_from_key', tests.phrase_from_key |
|
| 318 |
+ dpp.vault.Vault, 'phrase_from_key', tests.phrase_from_key |
|
| 320 | 319 |
) |
| 321 | 320 |
result = runner.invoke( |
| 322 | 321 |
cli.derivepassphrase, [DUMMY_SERVICE], catch_exceptions=False |
| ... | ... |
@@ -343,7 +342,7 @@ class TestCLI: |
| 343 | 342 |
cli, '_get_suitable_ssh_keys', tests.suitable_ssh_keys |
| 344 | 343 |
) |
| 345 | 344 |
monkeypatch.setattr( |
| 346 |
- dpp.Vault, 'phrase_from_key', tests.phrase_from_key |
|
| 345 |
+ dpp.vault.Vault, 'phrase_from_key', tests.phrase_from_key |
|
| 347 | 346 |
) |
| 348 | 347 |
result = runner.invoke( |
| 349 | 348 |
cli.derivepassphrase, |
| ... | ... |
@@ -403,9 +402,9 @@ class TestCLI: |
| 403 | 402 |
raise AssertionError |
| 404 | 403 |
|
| 405 | 404 |
monkeypatch.setattr( |
| 406 |
- ssh_agent_client.SSHAgentClient, 'list_keys', tests.list_keys |
|
| 405 |
+ ssh_agent.SSHAgentClient, 'list_keys', tests.list_keys |
|
| 407 | 406 |
) |
| 408 |
- monkeypatch.setattr(ssh_agent_client.SSHAgentClient, 'sign', sign) |
|
| 407 |
+ monkeypatch.setattr(ssh_agent.SSHAgentClient, 'sign', sign) |
|
| 409 | 408 |
runner = click.testing.CliRunner(mix_stderr=False) |
| 410 | 409 |
with tests.isolated_config( |
| 411 | 410 |
monkeypatch=monkeypatch, runner=runner, config=config |
| ... | ... |
@@ -1304,7 +1303,7 @@ Boo. |
| 1304 | 1303 |
|
| 1305 | 1304 |
def test_204_phrase_from_key_manually(self) -> None: |
| 1306 | 1305 |
assert ( |
| 1307 |
- dpp.Vault( |
|
| 1306 |
+ dpp.vault.Vault( |
|
| 1308 | 1307 |
phrase=DUMMY_PHRASE_FROM_KEY1, **DUMMY_CONFIG_SETTINGS |
| 1309 | 1308 |
).generate(DUMMY_SERVICE) |
| 1310 | 1309 |
== DUMMY_RESULT_KEY1 |
| ... | ... |
@@ -1334,12 +1333,12 @@ Boo. |
| 1334 | 1333 |
conn_hint: str, |
| 1335 | 1334 |
) -> None: |
| 1336 | 1335 |
monkeypatch.setattr( |
| 1337 |
- ssh_agent_client.SSHAgentClient, 'list_keys', tests.list_keys |
|
| 1336 |
+ ssh_agent.SSHAgentClient, 'list_keys', tests.list_keys |
|
| 1338 | 1337 |
) |
| 1339 |
- hint: ssh_agent_client.SSHAgentClient | socket.socket | None |
|
| 1338 |
+ hint: ssh_agent.SSHAgentClient | socket.socket | None |
|
| 1340 | 1339 |
match conn_hint: |
| 1341 | 1340 |
case 'client': |
| 1342 |
- hint = ssh_agent_client.SSHAgentClient() |
|
| 1341 |
+ hint = ssh_agent.SSHAgentClient() |
|
| 1343 | 1342 |
case 'socket': |
| 1344 | 1343 |
hint = socket.socket(family=socket.AF_UNIX) |
| 1345 | 1344 |
hint.connect(os.environ['SSH_AUTH_SOCK']) |
| ... | ... |
@@ -18,10 +18,8 @@ import click.testing |
| 18 | 18 |
import pytest |
| 19 | 19 |
from typing_extensions import Any |
| 20 | 20 |
|
| 21 |
-import derivepassphrase |
|
| 22 |
-import derivepassphrase.cli |
|
| 23 |
-import ssh_agent_client |
|
| 24 | 21 |
import tests |
| 22 |
+from derivepassphrase import cli, ssh_agent, vault |
|
| 25 | 23 |
|
| 26 | 24 |
if TYPE_CHECKING: |
| 27 | 25 |
from collections.abc import Iterator |
| ... | ... |
@@ -49,7 +47,7 @@ class TestStaticFunctionality: |
| 49 | 47 |
with pytest.raises( |
| 50 | 48 |
KeyError, match='SSH_AUTH_SOCK environment variable' |
| 51 | 49 |
): |
| 52 |
- ssh_agent_client.SSHAgentClient(socket=sock) |
|
| 50 |
+ ssh_agent.SSHAgentClient(socket=sock) |
|
| 53 | 51 |
|
| 54 | 52 |
@pytest.mark.parametrize( |
| 55 | 53 |
['input', 'expected'], |
| ... | ... |
@@ -58,7 +56,7 @@ class TestStaticFunctionality: |
| 58 | 56 |
], |
| 59 | 57 |
) |
| 60 | 58 |
def test_210_uint32(self, input: int, expected: bytes | bytearray) -> None: |
| 61 |
- uint32 = ssh_agent_client.SSHAgentClient.uint32 |
|
| 59 |
+ uint32 = ssh_agent.SSHAgentClient.uint32 |
|
| 62 | 60 |
assert uint32(input) == expected |
| 63 | 61 |
|
| 64 | 62 |
@pytest.mark.parametrize( |
| ... | ... |
@@ -67,7 +65,7 @@ class TestStaticFunctionality: |
| 67 | 65 |
(b'ssh-rsa', b'\x00\x00\x00\x07ssh-rsa'), |
| 68 | 66 |
(b'ssh-ed25519', b'\x00\x00\x00\x0bssh-ed25519'), |
| 69 | 67 |
( |
| 70 |
- ssh_agent_client.SSHAgentClient.string(b'ssh-ed25519'), |
|
| 68 |
+ ssh_agent.SSHAgentClient.string(b'ssh-ed25519'), |
|
| 71 | 69 |
b'\x00\x00\x00\x0f\x00\x00\x00\x0bssh-ed25519', |
| 72 | 70 |
), |
| 73 | 71 |
], |
| ... | ... |
@@ -75,7 +73,7 @@ class TestStaticFunctionality: |
| 75 | 73 |
def test_211_string( |
| 76 | 74 |
self, input: bytes | bytearray, expected: bytes | bytearray |
| 77 | 75 |
) -> None: |
| 78 |
- string = ssh_agent_client.SSHAgentClient.string |
|
| 76 |
+ string = ssh_agent.SSHAgentClient.string |
|
| 79 | 77 |
assert bytes(string(input)) == expected |
| 80 | 78 |
|
| 81 | 79 |
@pytest.mark.parametrize( |
| ... | ... |
@@ -83,7 +81,7 @@ class TestStaticFunctionality: |
| 83 | 81 |
[ |
| 84 | 82 |
(b'\x00\x00\x00\x07ssh-rsa', b'ssh-rsa'), |
| 85 | 83 |
( |
| 86 |
- ssh_agent_client.SSHAgentClient.string(b'ssh-ed25519'), |
|
| 84 |
+ ssh_agent.SSHAgentClient.string(b'ssh-ed25519'), |
|
| 87 | 85 |
b'ssh-ed25519', |
| 88 | 86 |
), |
| 89 | 87 |
], |
| ... | ... |
@@ -91,8 +89,8 @@ class TestStaticFunctionality: |
| 91 | 89 |
def test_212_unstring( |
| 92 | 90 |
self, input: bytes | bytearray, expected: bytes | bytearray |
| 93 | 91 |
) -> None: |
| 94 |
- unstring = ssh_agent_client.SSHAgentClient.unstring |
|
| 95 |
- unstring_prefix = ssh_agent_client.SSHAgentClient.unstring_prefix |
|
| 92 |
+ unstring = ssh_agent.SSHAgentClient.unstring |
|
| 93 |
+ unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix |
|
| 96 | 94 |
assert bytes(unstring(input)) == expected |
| 97 | 95 |
assert tuple(bytes(x) for x in unstring_prefix(input)) == ( |
| 98 | 96 |
expected, |
| ... | ... |
@@ -109,7 +107,7 @@ class TestStaticFunctionality: |
| 109 | 107 |
def test_310_uint32_exceptions( |
| 110 | 108 |
self, value: int, exc_type: type[Exception], exc_pattern: str |
| 111 | 109 |
) -> None: |
| 112 |
- uint32 = ssh_agent_client.SSHAgentClient.uint32 |
|
| 110 |
+ uint32 = ssh_agent.SSHAgentClient.uint32 |
|
| 113 | 111 |
with pytest.raises(exc_type, match=exc_pattern): |
| 114 | 112 |
uint32(value) |
| 115 | 113 |
|
| ... | ... |
@@ -122,7 +120,7 @@ class TestStaticFunctionality: |
| 122 | 120 |
def test_311_string_exceptions( |
| 123 | 121 |
self, input: Any, exc_type: type[Exception], exc_pattern: str |
| 124 | 122 |
) -> None: |
| 125 |
- string = ssh_agent_client.SSHAgentClient.string |
|
| 123 |
+ string = ssh_agent.SSHAgentClient.string |
|
| 126 | 124 |
with pytest.raises(exc_type, match=exc_pattern): |
| 127 | 125 |
string(input) |
| 128 | 126 |
|
| ... | ... |
@@ -154,8 +152,8 @@ class TestStaticFunctionality: |
| 154 | 152 |
has_trailer: bool, |
| 155 | 153 |
parts: tuple[bytes | bytearray, bytes | bytearray] | None, |
| 156 | 154 |
) -> None: |
| 157 |
- unstring = ssh_agent_client.SSHAgentClient.unstring |
|
| 158 |
- unstring_prefix = ssh_agent_client.SSHAgentClient.unstring_prefix |
|
| 155 |
+ unstring = ssh_agent.SSHAgentClient.unstring |
|
| 156 |
+ unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix |
|
| 159 | 157 |
with pytest.raises(exc_type, match=exc_pattern): |
| 160 | 158 |
unstring(input) |
| 161 | 159 |
if has_trailer: |
| ... | ... |
@@ -189,7 +187,7 @@ class TestAgentInteraction: |
| 189 | 187 |
) |
| 190 | 188 |
else: |
| 191 | 189 |
try: |
| 192 |
- client = ssh_agent_client.SSHAgentClient() |
|
| 190 |
+ client = ssh_agent.SSHAgentClient() |
|
| 193 | 191 |
except OSError: # pragma: no cover |
| 194 | 192 |
pytest.skip('communication error with the SSH agent')
|
| 195 | 193 |
with client: |
| ... | ... |
@@ -202,19 +200,15 @@ class TestAgentInteraction: |
| 202 | 200 |
if public_key_data not in key_comment_pairs: # pragma: no cover |
| 203 | 201 |
pytest.skip('prerequisite SSH key not loaded')
|
| 204 | 202 |
signature = bytes( |
| 205 |
- client.sign( |
|
| 206 |
- payload=derivepassphrase.Vault._UUID, key=public_key_data |
|
| 207 |
- ) |
|
| 203 |
+ client.sign(payload=vault.Vault._UUID, key=public_key_data) |
|
| 208 | 204 |
) |
| 209 | 205 |
assert signature == expected_signature, 'SSH signature mismatch' |
| 210 | 206 |
signature2 = bytes( |
| 211 |
- client.sign( |
|
| 212 |
- payload=derivepassphrase.Vault._UUID, key=public_key_data |
|
| 213 |
- ) |
|
| 207 |
+ client.sign(payload=vault.Vault._UUID, key=public_key_data) |
|
| 214 | 208 |
) |
| 215 | 209 |
assert signature2 == expected_signature, 'SSH signature mismatch' |
| 216 | 210 |
assert ( |
| 217 |
- derivepassphrase.Vault.phrase_from_key(public_key_data) |
|
| 211 |
+ vault.Vault.phrase_from_key(public_key_data) |
|
| 218 | 212 |
== derived_passphrase |
| 219 | 213 |
), 'SSH signature mismatch' |
| 220 | 214 |
|
| ... | ... |
@@ -240,7 +234,7 @@ class TestAgentInteraction: |
| 240 | 234 |
) |
| 241 | 235 |
else: |
| 242 | 236 |
try: |
| 243 |
- client = ssh_agent_client.SSHAgentClient() |
|
| 237 |
+ client = ssh_agent.SSHAgentClient() |
|
| 244 | 238 |
except OSError: # pragma: no cover |
| 245 | 239 |
pytest.skip('communication error with the SSH agent')
|
| 246 | 240 |
with client: |
| ... | ... |
@@ -252,18 +246,14 @@ class TestAgentInteraction: |
| 252 | 246 |
if public_key_data not in key_comment_pairs: # pragma: no cover |
| 253 | 247 |
pytest.skip('prerequisite SSH key not loaded')
|
| 254 | 248 |
signature = bytes( |
| 255 |
- client.sign( |
|
| 256 |
- payload=derivepassphrase.Vault._UUID, key=public_key_data |
|
| 257 |
- ) |
|
| 249 |
+ client.sign(payload=vault.Vault._UUID, key=public_key_data) |
|
| 258 | 250 |
) |
| 259 | 251 |
signature2 = bytes( |
| 260 |
- client.sign( |
|
| 261 |
- payload=derivepassphrase.Vault._UUID, key=public_key_data |
|
| 262 |
- ) |
|
| 252 |
+ client.sign(payload=vault.Vault._UUID, key=public_key_data) |
|
| 263 | 253 |
) |
| 264 | 254 |
assert signature != signature2, 'SSH signature repeatable?!' |
| 265 | 255 |
with pytest.raises(ValueError, match='unsuitable SSH key'): |
| 266 |
- derivepassphrase.Vault.phrase_from_key(public_key_data) |
|
| 256 |
+ vault.Vault.phrase_from_key(public_key_data) |
|
| 267 | 257 |
|
| 268 | 258 |
@staticmethod |
| 269 | 259 |
def _params() -> Iterator[tuple[bytes, bool]]: |
| ... | ... |
@@ -287,7 +277,7 @@ class TestAgentInteraction: |
| 287 | 277 |
|
| 288 | 278 |
if single: |
| 289 | 279 |
monkeypatch.setattr( |
| 290 |
- ssh_agent_client.SSHAgentClient, |
|
| 280 |
+ ssh_agent.SSHAgentClient, |
|
| 291 | 281 |
'list_keys', |
| 292 | 282 |
tests.list_keys_singleton, |
| 293 | 283 |
) |
| ... | ... |
@@ -300,7 +290,7 @@ class TestAgentInteraction: |
| 300 | 290 |
text = 'Use this key? yes\n' |
| 301 | 291 |
else: |
| 302 | 292 |
monkeypatch.setattr( |
| 303 |
- ssh_agent_client.SSHAgentClient, 'list_keys', tests.list_keys |
|
| 293 |
+ ssh_agent.SSHAgentClient, 'list_keys', tests.list_keys |
|
| 304 | 294 |
) |
| 305 | 295 |
keys = [ |
| 306 | 296 |
pair.key |
| ... | ... |
@@ -314,7 +304,7 @@ class TestAgentInteraction: |
| 314 | 304 |
|
| 315 | 305 |
@click.command() |
| 316 | 306 |
def driver() -> None: |
| 317 |
- key = derivepassphrase.cli._select_ssh_key() |
|
| 307 |
+ key = cli._select_ssh_key() |
|
| 318 | 308 |
click.echo(base64.standard_b64encode(key).decode('ASCII'))
|
| 319 | 309 |
|
| 320 | 310 |
runner = click.testing.CliRunner(mix_stderr=True) |
| ... | ... |
@@ -339,7 +329,7 @@ class TestAgentInteraction: |
| 339 | 329 |
monkeypatch.setenv('SSH_AUTH_SOCK', os.environ['SSH_AUTH_SOCK'] + '~')
|
| 340 | 330 |
sock = socket.socket(family=socket.AF_UNIX) |
| 341 | 331 |
with pytest.raises(OSError): # noqa: PT011 |
| 342 |
- ssh_agent_client.SSHAgentClient(socket=sock) |
|
| 332 |
+ ssh_agent.SSHAgentClient(socket=sock) |
|
| 343 | 333 |
|
| 344 | 334 |
@pytest.mark.parametrize( |
| 345 | 335 |
'response', |
| ... | ... |
@@ -351,7 +341,7 @@ class TestAgentInteraction: |
| 351 | 341 |
def test_310_truncated_server_response( |
| 352 | 342 |
self, monkeypatch: Any, response: bytes |
| 353 | 343 |
) -> None: |
| 354 |
- client = ssh_agent_client.SSHAgentClient() |
|
| 344 |
+ client = ssh_agent.SSHAgentClient() |
|
| 355 | 345 |
response_stream = io.BytesIO(response) |
| 356 | 346 |
|
| 357 | 347 |
class PseudoSocket: |
| ... | ... |
@@ -375,7 +365,7 @@ class TestAgentInteraction: |
| 375 | 365 |
( |
| 376 | 366 |
12, |
| 377 | 367 |
b'\x00\x00\x00\x00abc', |
| 378 |
- ssh_agent_client.TrailingDataError, |
|
| 368 |
+ ssh_agent.TrailingDataError, |
|
| 379 | 369 |
'Overlong response', |
| 380 | 370 |
), |
| 381 | 371 |
], |
| ... | ... |
@@ -388,7 +378,7 @@ class TestAgentInteraction: |
| 388 | 378 |
exc_type: type[Exception], |
| 389 | 379 |
exc_pattern: str, |
| 390 | 380 |
) -> None: |
| 391 |
- client = ssh_agent_client.SSHAgentClient() |
|
| 381 |
+ client = ssh_agent.SSHAgentClient() |
|
| 392 | 382 |
monkeypatch.setattr( |
| 393 | 383 |
client, |
| 394 | 384 |
'request', |
| ... | ... |
@@ -426,9 +416,9 @@ class TestAgentInteraction: |
| 426 | 416 |
exc_type: type[Exception], |
| 427 | 417 |
exc_pattern: str, |
| 428 | 418 |
) -> None: |
| 429 |
- client = ssh_agent_client.SSHAgentClient() |
|
| 419 |
+ client = ssh_agent.SSHAgentClient() |
|
| 430 | 420 |
monkeypatch.setattr(client, 'request', lambda a, b: response) # noqa: ARG005 |
| 431 |
- KeyCommentPair = ssh_agent_client.types.KeyCommentPair # noqa: N806 |
|
| 421 |
+ KeyCommentPair = ssh_agent.types.KeyCommentPair # noqa: N806 |
|
| 432 | 422 |
loaded_keys = [ |
| 433 | 423 |
KeyCommentPair(v['public_key_data'], b'no comment') |
| 434 | 424 |
for v in tests.SUPPORTED_KEYS.values() |