Add prototype implementation
Marco Ricci

Marco Ricci commited on 2024-05-19 00:24:09
Zeige 15 geänderte Dateien mit 930 Einfügungen und 1 Löschungen.


Plumbing only; there is no user interface yet.  The code type-checks,
but has been only superficially tested for correctness in the Python
interactive prompt.  API documentation is written, but probably also
of "first draft" quality, and other kinds of documentation (usage,
design) are missing completely.
... ...
@@ -5,6 +5,9 @@ div.doc-contents:not(.first) {
5 5
 }
6 6
 
7 7
 /* Mark external links as such. */
8
+div.doc-contents a[href^="https://"]::after,
9
+div.doc-contents a[href^="http://"]::after,
10
+div.doc-contents a[href^="//"]::after,
8 11
 a.external::after,
9 12
 a.autorefs-external::after {
10 13
   /* https://primer.style/octicons/arrow-up-right-24 */
... ...
@@ -1 +1,13 @@
1 1
 # API reference
2
+
3
+Top-level modules:
4
+
5
+Module                  | Description
6
+----------------------- | --------------------------------------------------------------------
7
+[`derivepassphrase`][]  | Work-alike for vault(1) - deterministic, stateless password manager.
8
+[`sequin`][]            | Python port of Sequin, a pseudorandom number generator.
9
+[`ssh_agent_client`][]  | A bare-bones SSH agent client supporting signing and key listing.
10
+
11
+  [derivepassphrase]: reference/derivepassphrase.md
12
+  [sequin]: reference/sequin.md
13
+  [ssh_agent_client]: reference/ssh_agent_client.md
... ...
@@ -0,0 +1,4 @@
1
+::: derivepassphrase
2
+    heading_level: 1
3
+    options:
4
+      show_submodules: true
... ...
@@ -0,0 +1,2 @@
1
+::: sequin
2
+    heading_level: 1
... ...
@@ -0,0 +1,4 @@
1
+::: ssh_agent_client
2
+    heading_level: 1
3
+    options:
4
+      show_submodules: true
... ...
@@ -56,6 +56,8 @@ plugins:
56 56
             docstring_options:
57 57
               ignore_init_summary: true
58 58
               returns_multiple_items: false
59
+            merge_init_into_class: true
60
+            show_source: false
59 61
             heading_level: 2
60 62
             show_object_full_path: false
61 63
             show_root_members_full_path: false
... ...
@@ -75,7 +77,11 @@ nav:
75 77
   - derivepassphrase documentation: index.md
76 78
   #- tutorials.md
77 79
   #- How-Tos: how-tos.md
78
-  - Reference: reference.md
80
+  - Reference:
81
+    - API overview: reference.md
82
+    - Module derivepassphrase: reference/derivepassphrase.md
83
+    - Module sequin: reference/sequin.md
84
+    - Module ssh_agent_client: reference/ssh_agent_client.md
79 85
   #- explanation.md
80 86
 
81 87
 markdown_extensions:
... ...
@@ -1,3 +1,305 @@
1 1
 # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2 2
 #
3 3
 # SPDX-License-Identifier: MIT
4
+
5
+"""Work-alike of vault(1) – a deterministic, stateless password manager
6
+
7
+"""
8
+
9
+from __future__ import annotations
10
+
11
+import collections
12
+import hashlib
13
+import math
14
+import warnings
15
+
16
+from typing import assert_type, reveal_type
17
+
18
+import sequin
19
+import ssh_agent_client
20
+
21
+class Vault:
22
+    """A work-alike of James Coglan's vault.
23
+
24
+    Store settings for generating (actually: deriving) passphrases for
25
+    named services, with various constraints, given only a master
26
+    passphrase.  Also, actually generate the passphrase.  The derivation
27
+    is deterministic and non-secret; only the master passphrase need be
28
+    kept secret.  The implementation is compatible with [vault][].
29
+
30
+    [James Coglan explains the passphrase derivation algorithm in great
31
+    detail][ALGORITHM] in his blog post on said topic: A principally
32
+    infinite bit stream is obtained by running a key-derivation function
33
+    on the master passphrase and the service name, then this bit stream
34
+    is fed into a [sequin][] to generate random numbers in the correct
35
+    range, and finally these random numbers select passphrase characters
36
+    until the desired length is reached.
37
+
38
+    [vault]: https://getvau.lt
39
+    [ALGORITHM]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/
40
+
41
+    """
42
+    _UUID = b'e87eb0f4-34cb-46b9-93ad-766c5ab063e7'
43
+    """A tag used by vault in the bit stream generation."""
44
+    _CHARSETS: collections.OrderedDict[str, bytes]
45
+    """
46
+        Known character sets from which to draw passphrase characters.
47
+        Relies on a certain, fixed order for their definition and their
48
+        contents.
49
+
50
+    """
51
+    _CHARSETS = collections.OrderedDict([
52
+        ('lower', b'abcdefghijklmnopqrstuvwxyz'),
53
+        ('upper', b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'),
54
+        ('alpha', b''),  # Placeholder.
55
+        ('number', b'0123456789'),
56
+        ('alphanum', b''),  # Placeholder.
57
+        ('space', b' '),
58
+        ('dash', b'-_'),
59
+        ('symbol', b'!"#$%&\'()*+,./:;<=>?@[\\]^{|}~-_'),
60
+        ('all', b''),  # Placeholder.
61
+    ])
62
+    _CHARSETS['alpha'] = _CHARSETS['lower'] + _CHARSETS['upper']
63
+    _CHARSETS['alphanum'] = _CHARSETS['alpha'] + _CHARSETS['number']
64
+    _CHARSETS['all'] = (_CHARSETS['alphanum'] + _CHARSETS['space']
65
+                        + _CHARSETS['symbol'])
66
+
67
+    def __init__(
68
+        self, *, phrase: bytes | bytearray = b'', length: int = 20,
69
+        repeat: int = 0, lower: int | None = None,
70
+        upper: int | None = None, number: int | None = None,
71
+        space: int | None = None, dash: int | None = None,
72
+        symbol: int | None = None,
73
+    ) -> None:
74
+        """Initialize the Vault object.
75
+
76
+        Args:
77
+            phrase:
78
+                The master passphrase from which to derive the service
79
+                passphrases.
80
+            length:
81
+                Desired passphrase length.
82
+            repeat:
83
+                The maximum number of immediate character repetitions
84
+                allowed in the passphrase.  Disabled if set to 0.
85
+            lower:
86
+                Optional constraint on lowercase characters.  If
87
+                positive, include this many lowercase characters
88
+                somewhere in the passphrase.  If 0, avoid lowercase
89
+                characters altogether.
90
+            upper:
91
+                Same as `lower`, but for uppercase characters.
92
+            number:
93
+                Same as `lower`, but for ASCII digits.
94
+            space:
95
+                Same as `lower`, but for the space character.
96
+            dash:
97
+                Same as `lower`, but for the hyphen-minus and underscore
98
+                characters.
99
+            symbol:
100
+                Same as `lower`, but for all other hitherto unlisted
101
+                ASCII printable characters (except backquote).
102
+
103
+        """
104
+        self._phrase = bytes(phrase)
105
+        self._length = length
106
+        self._repeat = repeat
107
+        self._allowed = bytearray(self._CHARSETS['all'])
108
+        self._required: list[bytes] = []
109
+        def subtract_or_require(
110
+            count: int | None, characters: bytes | bytearray
111
+        ) -> None:
112
+            if not isinstance(count, int):
113
+                return
114
+            elif count <= 0:
115
+                self._allowed = self._subtract(characters, self._allowed)
116
+            else:
117
+                for _ in range(count):
118
+                    self._required.append(characters)
119
+        subtract_or_require(lower, self._CHARSETS['lower'])
120
+        subtract_or_require(upper, self._CHARSETS['upper'])
121
+        subtract_or_require(number, self._CHARSETS['number'])
122
+        subtract_or_require(space, self._CHARSETS['space'])
123
+        subtract_or_require(dash, self._CHARSETS['dash'])
124
+        subtract_or_require(symbol, self._CHARSETS['symbol'])
125
+        if len(self._required) < self._length:
126
+            raise ValueError('requested passphrase length too short')
127
+        if not self._allowed:
128
+            raise ValueError('no allowed characters left')
129
+        for _ in range(len(self._required), self._length):
130
+            self._required.append(bytes(self._allowed))
131
+
132
+    def _entropy_upper_bound(self) -> int:
133
+        """Estimate the passphrase entropy, given the current settings.
134
+
135
+        The entropy is the base 2 logarithm of the amount of
136
+        possibilities.  We operate directly on the logarithms, and round
137
+        each summand up, overestimating the true entropy.
138
+
139
+        """
140
+        factors: list[int] = []
141
+        for i, charset in enumerate(self._required):
142
+            factors.append(i + 1)
143
+            factors.append(len(charset))
144
+        return sum(int(math.ceil(math.log2(f))) for f in factors)
145
+
146
+    @classmethod
147
+    def create_hash(
148
+        cls, key: bytes | bytearray, message: bytes | bytearray, *,
149
+        length: int = 32,
150
+    ) -> bytes:
151
+        """Create a pseudorandom byte stream from key and message.
152
+
153
+        Args:
154
+            key:
155
+                A cryptographic key.  Typically a master passphrase, or
156
+                an SSH signature.
157
+            message:
158
+                A message.  Typically a vault service name.
159
+            length:
160
+                The length of the byte stream to generate.
161
+
162
+        Returns:
163
+            A pseudorandom byte string of length `length`.
164
+
165
+        Note:
166
+            Shorter values returned from this method (with the same key
167
+            and message) are prefixes of longer values returned from
168
+            this method.  (This property is inherited from the
169
+            underlying PBKDF2 function.)  It is thus safe (if slow) to
170
+            call this method with the same input with ever-increasing
171
+            target lengths.
172
+
173
+        """
174
+        return hashlib.pbkdf2_hmac(hash_name='sha1', password=key,
175
+                                   salt=message, iterations=8, dklen=length)
176
+
177
+    def generate(
178
+        self, service_name: str, /, *, phrase: bytes | bytearray = b'',
179
+    ) -> bytes | bytearray:
180
+        """Generate a service passphrase.
181
+
182
+        Args:
183
+            service_name:
184
+                The service name.
185
+            phrase:
186
+                If given, override the passphrase given during
187
+                construction.
188
+
189
+        """
190
+        entropy_bound = self._entropy_upper_bound()
191
+        # Use a safety factor, because a sequin will potentially throw
192
+        # bits away and we cannot rely on having generated a hash of
193
+        # exactly the right length.
194
+        safety_factor = 2
195
+        hash_length = int(math.ceil(safety_factor * entropy_bound / 8))
196
+        if not phrase:
197
+            phrase = self._phrase
198
+        # Repeat the passphrase generation with ever-increasing hash
199
+        # lengths, until the passphrase can be formed without exhausting
200
+        # the sequin.  See the guarantee in the create_hash method for
201
+        # why this works.
202
+        while True:
203
+            try:
204
+                required = self._required[:]
205
+                seq = sequin.Sequin(self.create_hash(
206
+                    key=phrase,
207
+                    message=(service_name.encode('utf-8') + self._UUID),
208
+                    length=hash_length))
209
+                result = bytearray()
210
+                while len(result) < self._length:
211
+                    pos = seq.generate(len(required))
212
+                    charset = required.pop(pos)
213
+                    # Determine if an unlucky choice right now might
214
+                    # violate the restriction on repeated characters.
215
+                    # That is, check if the current partial passphrase
216
+                    # ends with r - 1 copies of the same character
217
+                    # (where r is the repeat limit that must not be
218
+                    # reached), and if so, remove this same character
219
+                    # from the current character's allowed set.
220
+                    previous = result[-1] if result else None
221
+                    i = self._repeat - 1
222
+                    same = (i >= 0) if previous is not None else False
223
+                    while same and i > 0:
224
+                        i -= 1
225
+                        if same:
226
+                            other_pos = -(self._repeat - i)
227
+                            same = (result[other_pos] == previous)
228
+                    if same:
229
+                        assert previous is not None  # for the type checker
230
+                        charset = self._subtract(bytes([previous]), charset)
231
+                    # End checking for repeated characters.
232
+                    index = seq.generate(len(charset))
233
+                    result.extend(charset[index:index+1])
234
+            except sequin.SequinExhaustedException:
235
+                hash_length *= 2
236
+            else:
237
+                return result
238
+
239
+    @classmethod
240
+    def phrase_from_signature(
241
+        cls, key: bytes | bytearray, /
242
+    ) -> bytes | bytearray:
243
+        """Obtain the master passphrase from a configured SSH key.
244
+
245
+        vault allows the usage of certain SSH keys to derive a master
246
+        passphrase, by signing the vault UUID with the SSH key.  The key
247
+        type must ensure that signatures are deterministic.
248
+
249
+        Args:
250
+            key: The (public) SSH key to use for signing.
251
+
252
+        Returns:
253
+            The signature of the vault UUID under this key.
254
+
255
+        Raises:
256
+            ValueError:
257
+                The SSH key is principally unsuitable for this use case.
258
+                Usually this means that the signature is not
259
+                deterministic.
260
+
261
+        """
262
+        deterministic_signature_types = {
263
+            'ssh-ed25519':
264
+                lambda k: k.startswith(b'\x00\x00\x00\x0bssh-ed25519'),
265
+            'ssh-rsa':
266
+                lambda k: k.startswith(b'\x00\x00\x00\x07ssh-rsa'),
267
+        }
268
+        if not any(v(key) for v in deterministic_signature_types.values()):
269
+            raise ValueError(
270
+                'unsuitable SSH key: bad key, or signature not deterministic')
271
+        with ssh_agent_client.SSHAgentClient() as client:
272
+            ret = client.sign(key, cls._UUID)
273
+        return ret
274
+
275
+    def _subtract(
276
+        self, charset: bytes | bytearray, allowed: bytes | bytearray,
277
+    ) -> bytearray:
278
+        """Remove the characters in charset from allowed.
279
+
280
+        This preserves the relative order of characters in `allowed`.
281
+
282
+        Args:
283
+            charset: Characters to remove.
284
+            allowed: Character set to remove the other characters from.
285
+
286
+        Returns:
287
+            The pruned character set.
288
+
289
+        Raises:
290
+            ValueError: `charset` contained duplicate characters.
291
+
292
+        """
293
+        allowed = (allowed if isinstance(allowed, bytearray)
294
+                   else bytearray(allowed))
295
+        assert_type(allowed, bytearray)
296
+        if len(frozenset(charset)) != len(charset):
297
+            raise ValueError('duplicate characters in set')
298
+        for c in charset:
299
+            try:
300
+                pos = allowed.index(c)
301
+            except LookupError:
302
+                pass
303
+            else:
304
+                allowed[pos:pos+1] = []
305
+        return allowed
... ...
@@ -0,0 +1,5 @@
1
+# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2
+#
3
+# SPDX-License-Identifier: MIT
4
+__version__ = "0.0.1"
5
+__author__ = "Marco Ricci <m@the13thletter.info>"
... ...
@@ -0,0 +1,277 @@
1
+# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2
+#
3
+# SPDX-License-Identifier: MIT
4
+
5
+"""A Python reimplementation of James Coglan's "sequin" Node.js module.
6
+
7
+James Coglan's "sequin" Node.js module provides a pseudorandom number
8
+generator (using rejection sampling on a stream of input numbers) that
9
+attempts to minimize the amount of information it throws away:
10
+(non-degenerate) rejected samples are fed into a stream of higher-order
11
+numbers from which the next random number generation request will be
12
+served.  The sequin module is used in Coglan's "vault" module (a
13
+deterministic, stateless password manager that recomputes passwords
14
+instead of storing them), and this reimplementation is used for
15
+a similar purpose.
16
+
17
+The main API is the [`Sequin`] [sequin.Sequin] class, which is
18
+thoroughly documented.
19
+
20
+"""
21
+
22
+from __future__ import annotations
23
+
24
+import collections
25
+import math
26
+
27
+from collections.abc import Sequence, MutableSequence
28
+from typing import assert_type, Literal, TypeAlias
29
+
30
+__all__ = ('Sequin', 'SequinExhaustedException')
31
+
32
+class Sequin:
33
+    """Generate pseudorandom non-negative numbers in different ranges.
34
+
35
+    Given a (presumed high-quality) uniformly random sequence of input
36
+    bits, generate pseudorandom non-negative integers in a certain range
37
+    on each call of the `generate` method.  (It is permissible to
38
+    specify a different range per call to `generate`; this is the main
39
+    use case.)  We use a modified version of rejection sampling, where
40
+    rejected values are stored in "rejection queues" if possible, and
41
+    these rejection queues re-seed the next round of rejection sampling.
42
+
43
+    This is a Python reimplementation of James Coglan's [Node.js sequin
44
+    module][JS_SEQUIN], as introduced in [his blog post][BLOG_POST].  It
45
+    uses a [technique by Christian Lawson-Perfect][SEQUIN_TECHNIQUE].
46
+    I do not know why the original module is called "sequin"; I presume
47
+    it to be a pun on "sequence".
48
+
49
+    [JS_SEQUIN]: https://www.npmjs.com/package/sequin
50
+    [BLOG_POST]: https://blog.jcoglan.com/2012/07/16/designing-vaults-generator-algorithm/
51
+    [SEQUIN_TECHNIQUE]: https://checkmyworking.com/2012/06/converting-a-stream-of-binary-digits-to-a-stream-of-base-n-digits/
52
+
53
+    """
54
+    def __init__(
55
+        self,
56
+        sequence: str | bytes | bytearray | Sequence[int],
57
+    ):
58
+        """Initialize the Sequin.
59
+
60
+        Args:
61
+            sequence:
62
+                A sequence of bits, or things convertible to bits, to
63
+                seed the pseudorandom number generator.  Byte and text
64
+                strings are converted to 8-bit integer sequences.
65
+                (Conversion will fail if the text string contains
66
+                non-ISO-8859-1 characters.)  The numbers are then
67
+                converted to bits.
68
+
69
+        """
70
+        def uint8_to_bits(value):
71
+            """Yield individual bits of an 8-bit number, MSB first."""
72
+            for i in (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01):
73
+                yield 1 if value | i == value else 0
74
+        if isinstance(sequence, str):
75
+            sequence = tuple(sequence.encode('iso-8859-1'))
76
+        else:
77
+            sequence = tuple(sequence)
78
+        assert_type(sequence, tuple[int, ...])
79
+        self.bases: dict[int, MutableSequence[int]] = {}
80
+        gen = (bit for num in sequence for bit in uint8_to_bits(num))
81
+        self.bases[2] = collections.deque(gen)
82
+
83
+    def _all_or_nothing_shift(
84
+        self, count: int, /, *, base: int = 2
85
+    ) -> Sequence[int]:
86
+        """Shift and return items if and only if there are enough.
87
+
88
+        Args:
89
+            count: Number of items to shift/consume.
90
+            base: Use the base `base` sequence.
91
+
92
+        Returns:
93
+            If there are sufficient items in the sequence left, then
94
+            consume them from the sequence and return them.  Otherwise,
95
+            consume nothing, and return nothing.
96
+
97
+        Notes:
98
+            We currently remove now-empty sequences from the registry of
99
+            sequences.
100
+
101
+        Examples:
102
+            >>> seq = Sequin([1, 0, 1, 0, 0, 1, 0, 0, 0, 1])
103
+            >>> seq.bases
104
+            {2: deque([1, 0, 1, 0, 0, 1, 0, 0, 0, 1])}
105
+            >>> seq._all_or_nothing_shift(3)
106
+            (1, 0, 1)
107
+            >>> seq._all_or_nothing_shift(3)
108
+            (0, 0, 1)
109
+            >>> seq.bases[2]
110
+            deque([0, 0, 0, 1])
111
+            >>> seq._all_or_nothing_shift(5)
112
+            ()
113
+            >>> seq.bases[2]
114
+            deque([0, 0, 0, 1])
115
+            >>> seq._all_or_nothing_shift(4)
116
+            (0, 0, 0, 1)
117
+            >>> 2 in seq.bases  # now-empty sequences are removed
118
+            False
119
+
120
+        """
121
+        try:
122
+            seq = self.bases[base]
123
+        except KeyError:
124
+            return ()
125
+        else:
126
+            chunk = tuple(seq[:count])
127
+            if len(chunk) == count:
128
+                del seq[:count]
129
+                # Clean up queues.
130
+                if not seq:
131
+                    del self.bases[base]
132
+                return chunk
133
+            return ()
134
+
135
+    @staticmethod
136
+    def _big_endian_number(
137
+        digits: Sequence[int], /, *, base: int = 2
138
+    ) -> int:
139
+        """Evaluate the given integer sequence as a big endian number.
140
+
141
+        Args:
142
+            digits: A sequence of integers to evaluate.
143
+            base: The number base to evaluate those integers in.
144
+
145
+        Raises:
146
+            ValueError: `base` is an invalid base.
147
+            ValueError: Not all integers are valid base `base` digits.
148
+
149
+        Examples:
150
+            >>> Sequin._big_endian_number([1, 2, 3, 4, 5, 6, 7, 8], base=10)
151
+            12345678
152
+            >>> Sequin._big_endian_number([0, 0, 0, 0, 1, 4, 9, 7], base=10)
153
+            1497
154
+            >>> Sequin._big_endian_number([1, 0, 0, 1, 0, 0, 0, 0], base=2)
155
+            144
156
+            >>> Sequin._big_endian_number([1, 7, 5, 5], base=8) == 0o1755
157
+            True
158
+            >>> Sequin._big_endian_number([-1], base=3)  # doctest: +ELLIPSIS
159
+            Traceback (most recent call last):
160
+                ...
161
+            ValueError: ...
162
+            >>> Sequin._big_endian_number([0], base=1)  # doctest: +ELLIPSIS
163
+            Traceback (most recent call last):
164
+                ...
165
+            ValueError: ...
166
+
167
+        """
168
+        if base < 2:
169
+            raise ValueError(f'invalid base: {base!r}')
170
+        ret = 0
171
+        allowed_range = range(base)
172
+        n = len(digits)
173
+        for i in range(n):
174
+            i2 = (n - 1) - i
175
+            x = digits[i]
176
+            if not isinstance(x, int):
177
+                raise TypeError(f'not an integer: {x!r}')
178
+            if x not in allowed_range:
179
+                raise ValueError(f'invalid base {base!r} digit: {x!r}')
180
+            ret += (base ** i2) * x
181
+        return ret
182
+
183
+    def generate(self, n: int, /) -> int:
184
+        """Generate a base `n` non-negative integer.
185
+
186
+        We attempt to generate a value using rejection sampling.  If the
187
+        generated sample is outside the desired range (i.e., is
188
+        rejected), then attempt to reuse the sample by seeding
189
+        a "higher-order" input sequence of uniformly random numbers (for
190
+        a different base).
191
+
192
+        Args:
193
+            n:
194
+                Generate numbers in the range 0, ..., `n` - 1.
195
+                (Inclusive.)
196
+
197
+        Returns:
198
+            A pseudorandom number in the range 0, ..., `n` - 1.
199
+
200
+        Raises:
201
+            SequinExhaustedException:
202
+                The sequin is exhausted.
203
+
204
+        """
205
+        value = self._generate_inner(n, base=2)
206
+        if value == n:
207
+            raise SequinExhaustedException('Sequin is exhausted')
208
+        return value
209
+
210
+    def _generate_inner(
211
+        self, n: int, /, *, base: int = 2
212
+    ) -> int:
213
+        """Recursive call to generate a base `n` non-negative integer.
214
+
215
+        We first determine the correct exponent `k` to generate base `n`
216
+        numbers from a stream of base `base` numbers, then attempt to
217
+        take `k` numbers from the base `base` sequence (or bail if not
218
+        possible).  If the resulting number `v` is out of range for
219
+        base `n`, then push `v - n` onto the rejection queue for
220
+        base `r` = `base` ** `k` - `n`, and attempt to generate the
221
+        requested base `n` integer from the sequence of base `r` numbers
222
+        next.  (This recursion is not attempted if `r` = 1.)  Otherwise,
223
+        return the number.
224
+
225
+        Args:
226
+            n:
227
+                Generate numbers in the range 0, ..., `n` - 1.
228
+                (Inclusive.)
229
+            base:
230
+                Use the base `base` sequence as a source for
231
+                pseudorandom numbers.
232
+
233
+        Returns:
234
+            A pseudorandom number in the range 0, ..., `n` - 1 if
235
+            possible, or `n` if the stream is exhausted.
236
+
237
+        """
238
+        # p = base ** k, where k is the smallest integer such that
239
+        # p >= n.  We determine p and k inductively.
240
+        p = 1
241
+        k = 0
242
+        while p < n:
243
+            p *= base
244
+            k += 1
245
+        # The remainder r of p and n is used as the base for rejection
246
+        # queue.
247
+        r = p - n
248
+        # The generated number v is initialized to n because of the
249
+        # while loop below.
250
+        v = n
251
+        while v > n - 1:
252
+            list_slice = self._all_or_nothing_shift(k, base=base)
253
+            if not list_slice:
254
+                return n
255
+            v = self._big_endian_number(list_slice, base=base)
256
+            if v > n - 1:
257
+                # If r is 0, then p == n, so v < n, or rather
258
+                # v <= n - 1.
259
+                assert r > 0
260
+                if r == 1:
261
+                    continue
262
+                self._stash(v - n, base=r)
263
+                v = self._generate_inner(n, base=r)
264
+        return v
265
+
266
+    def _stash(self, value: int, /, *, base: int = 2) -> None:
267
+        """Stash `value` on the base `base` sequence."""
268
+        if base not in self.bases:
269
+            self.bases[base] = collections.deque()
270
+        self.bases[base].append(value)
271
+
272
+class SequinExhaustedException(Exception):
273
+    """The sequin is exhausted.
274
+
275
+    No more values can be generated from this sequin.
276
+
277
+    """
... ...
@@ -0,0 +1,5 @@
1
+# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2
+#
3
+# SPDX-License-Identifier: MIT
4
+__version__ = "0.0.1"
5
+__author__ = "Marco Ricci <m@the13thletter.info>"
... ...
@@ -0,0 +1,259 @@
1
+# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2
+#
3
+# SPDX-License-Identifier: MIT
4
+
5
+"""A bare-bones SSH agent client supporting signing and key listing."""
6
+
7
+from __future__ import annotations
8
+
9
+import collections
10
+import enum
11
+import errno
12
+import os
13
+import pathlib
14
+import socket
15
+
16
+from collections.abc import Sequence, MutableSequence
17
+from typing import Any, NamedTuple, Self, TypeAlias
18
+from ssh_agent_client.types import KeyCommentPair, SSH_AGENT, SSH_AGENTC
19
+
20
+__all__ = ('SSHAgentClient',)
21
+
22
+_socket = socket
23
+
24
+class SSHAgentClient:
25
+    """A bare-bones SSH agent client supporting signing and key listing.
26
+
27
+    The main use case is requesting the agent sign some data, after
28
+    checking that the necessary key is already loaded.
29
+
30
+    The main fleshed out methods are `list_keys` and `sign`, which
31
+    implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
32
+    you *really* wanted to, there is enough infrastructure in place to
33
+    issue other requests as defined in the protocol---it's merely the
34
+    wrapper functions and the protocol numbers table that are missing.
35
+
36
+    Attributes:
37
+        connection:
38
+            The socket connected to the SSH agent.
39
+        ssh_auth_sock:
40
+            The UNIX domain socket the SSH agent is listening on.  Unset
41
+            if socket auto-discovery is not used.
42
+
43
+    """
44
+    connection: socket.socket
45
+    ssh_auth_sock: str | None
46
+    def __init__(
47
+        self, /, *, socket: socket.socket | None = None, timeout: int = 125
48
+    ) -> None:
49
+        """Initialize the client.
50
+
51
+        Args:
52
+            socket:
53
+                An optional socket, connected to the SSH agent.  If not
54
+                given, we query the `SSH_AUTH_SOCK` environment
55
+                variable to auto-discover the correct socket address.
56
+            timeout:
57
+                A connection timeout for the SSH agent.  Only used if
58
+                the socket is not yet connected.  The default value
59
+                gives ample time for agent connections forwarded via
60
+                SSH on high-latency networks (e.g. Tor).
61
+
62
+        """
63
+        self.ssh_auth_sock = None
64
+        if socket is not None:
65
+            self.connection = socket
66
+        else:
67
+            self.connection = _socket.socket(family=_socket.AF_UNIX)
68
+        try:
69
+            self.connection.getpeername()
70
+        except OSError as e:
71
+            if e.errno != errno.ENOTCONN:
72
+                raise
73
+            try:
74
+                self.ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
75
+            except KeyError as e:
76
+                raise RuntimeError(
77
+                    "Can't find running ssh-agent: missing SSH_AUTH_SOCK"
78
+                ) from e
79
+            self.connection.settimeout(timeout)
80
+            try:
81
+                self.connection.connect(self.ssh_auth_sock)
82
+            except FileNotFoundError as e:
83
+                raise RuntimeError(
84
+                    "Can't find running ssh-agent: unusable SSH_AUTH_SOCK"
85
+                ) from e
86
+
87
+    def __enter__(self) -> Self:
88
+        """Context management: defer to `self.connection`."""
89
+        self.connection.__enter__()
90
+        return self
91
+
92
+    def __exit__(
93
+        self, exc_type: Any, exc_val: Any, exc_tb: Any
94
+    ) -> bool:
95
+        """Context management: defer to `self.connection`."""
96
+        return bool(
97
+            self.connection.__exit__(
98
+                exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
99
+        )
100
+
101
+    @staticmethod
102
+    def uint32(num, /) -> bytes:
103
+        """Format the number as a `uint32`, as per the agent protocol."""
104
+        return int.to_bytes(num, 4, 'big', signed=False)
105
+
106
+    @classmethod
107
+    def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
108
+        """Format the payload as an SSH string, as per the agent protocol."""
109
+        ret = bytearray()
110
+        ret.extend(cls.uint32(len(payload)))
111
+        ret.extend(payload)
112
+        return ret
113
+
114
+    @classmethod
115
+    def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
116
+        """Unpack an SSH string."""
117
+        n = len(bytestring)
118
+        if n < 4:
119
+            raise ValueError('malformed SSH byte string')
120
+        elif n != 4 + int.from_bytes(bytestring[:4], 'big', signed=False):
121
+            raise ValueError('malformed SSH byte string')
122
+        return bytestring[4:]
123
+
124
+    def request(
125
+        self, code: int, payload: bytes | bytearray, /
126
+    ) -> tuple[int, bytes | bytearray]:
127
+        """Issue a generic request to the SSH agent.
128
+
129
+        Args:
130
+            code:
131
+                The request code.  See the SSH agent protocol for
132
+                protocol numbers to use here (and which protocol numbers
133
+                to expect in a response).
134
+            payload:
135
+                A byte string containing the payload, or "contents", of
136
+                the request.  Request-specific.  `request` will add any
137
+                necessary wire framing around the request code and the
138
+                payload.
139
+
140
+        Returns:
141
+            A 2-tuple consisting of the response code and the payload,
142
+            with all wire framing removed.
143
+
144
+        Raises:
145
+            EOFError:
146
+                The response from the SSH agent is truncated or missing.
147
+
148
+        """
149
+        request_message = bytearray([code])
150
+        request_message.extend(payload)
151
+        self.connection.sendall(self.string(request_message))
152
+        chunk = self.connection.recv(4)
153
+        if len(chunk) < 4:
154
+            raise EOFError('cannot read response length')
155
+        response_length = int.from_bytes(chunk, 'big', signed=False)
156
+        response = self.connection.recv(response_length)
157
+        if len(response) < response_length:
158
+            raise EOFError('truncated response from SSH agent')
159
+        return response[0], response[1:]
160
+
161
+    def list_keys(self) -> Sequence[KeyCommentPair]:
162
+        """Request a list of keys known to the SSH agent.
163
+
164
+        Returns:
165
+            A read-only sequence of key/comment pairs.
166
+
167
+        Raises:
168
+            EOFError:
169
+                The response from the SSH agent is truncated or missing.
170
+            RuntimeError:
171
+                The response from the SSH agent is too long.
172
+
173
+        """
174
+        response_code, response = self.request(
175
+            SSH_AGENTC.REQUEST_IDENTITIES.value, b'')
176
+        if response_code != SSH_AGENT.IDENTITIES_ANSWER.value:
177
+            raise RuntimeError(
178
+                f'error return from SSH agent: '
179
+                f'{response_code = }, {response = }'
180
+            )
181
+        response_stream = collections.deque(response)
182
+        def shift(num: int) -> bytes:
183
+            buf = collections.deque(bytes())
184
+            for i in range(num):
185
+                try:
186
+                    val = response_stream.popleft()
187
+                except IndexError:
188
+                    response_stream.extendleft(reversed(buf))
189
+                    raise EOFError(
190
+                        'truncated response from SSH agent'
191
+                    ) from None
192
+                buf.append(val)
193
+            return bytes(buf)
194
+        key_count = int.from_bytes(shift(4), 'big')
195
+        keys: collections.deque[KeyCommentPair] = collections.deque()
196
+        for i in range(key_count):
197
+            key_size = int.from_bytes(shift(4), 'big')
198
+            key = shift(key_size)
199
+            comment_size = int.from_bytes(shift(4), 'big')
200
+            comment = shift(comment_size)
201
+            # Both `key` and `comment` are not wrapped as SSH strings.
202
+            keys.append(KeyCommentPair(key, comment))
203
+        if response_stream:
204
+            raise RuntimeError('overlong response from SSH agent')
205
+        return keys
206
+
207
+    def sign(
208
+        self, /, key: bytes | bytearray, payload: bytes | bytearray,
209
+        *, flags: int = 0, check_if_key_loaded: bool = False,
210
+    ) -> bytes | bytearray:
211
+        """Request the SSH agent sign the payload with the key.
212
+
213
+        Args:
214
+            key:
215
+                The public SSH key to sign the payload with, in the same
216
+                format as returned by, e.g., the `list_keys` method.
217
+                The corresponding private key must have previously been
218
+                loaded into the agent to successfully issue a signature.
219
+            payload:
220
+                A byte string of data to sign.
221
+            flags:
222
+                Optional flags for the signing request.  Currently
223
+                passed on as-is to the agent.  In real-world usage, this
224
+                could be used, e.g., to request more modern hash
225
+                algorithms when signing with RSA keys.  (No such
226
+                real-world usage is currently implemented.)
227
+            check_if_key_loaded:
228
+                If true, check beforehand (via `list_keys`) if the
229
+                corresponding key has been loaded into the agent.
230
+
231
+        Returns:
232
+            The binary signature of the payload under the given key.
233
+
234
+        Raises:
235
+            EOFError:
236
+                The response from the SSH agent is truncated or missing.
237
+            RuntimeError:
238
+                The response from the SSH agent is too long.
239
+            RuntimeError:
240
+                The agent failed to complete the request.
241
+            RuntimeError:
242
+                `check_if_key_loaded` is true, and the `key` was not
243
+                loaded into the agent.
244
+
245
+        """
246
+        if check_if_key_loaded:
247
+            loaded_keys = frozenset({pair.key for pair in self.list_keys()})
248
+            if bytes(key) not in loaded_keys:
249
+                raise RuntimeError('target SSH key not loaded into agent')
250
+        request_data = bytearray(self.string(key))
251
+        request_data.extend(self.string(payload))
252
+        request_data.extend(self.uint32(flags))
253
+        response_code, response = self.request(
254
+            SSH_AGENTC.SIGN_REQUEST.value, request_data)
255
+        if response_code != SSH_AGENT.SIGN_RESPONSE.value:
256
+            raise RuntimeError(
257
+                f'signing data failed: {response_code = }, {response = }'
258
+            )
259
+        return self.unstring(response)
... ...
@@ -0,0 +1,50 @@
1
+# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2
+#
3
+# SPDX-License-Identifier: MIT
4
+
5
+"""Common typing declarations for the parent module."""
6
+
7
+from __future__ import annotations
8
+
9
+import enum
10
+
11
+from typing import NamedTuple
12
+
13
+__all__ = ('KeyCommentPair', 'SSH_AGENT', 'SSH_AGENTC')
14
+
15
+class KeyCommentPair(NamedTuple):
16
+    """SSH key plus comment pair.  For typing purposes.
17
+
18
+    Attributes:
19
+        key: SSH key.
20
+        comment: SSH key comment.
21
+
22
+    """
23
+    key: bytes | bytearray
24
+    comment: bytes | bytearray
25
+
26
+class SSH_AGENTC(enum.Enum):
27
+    """SSH agent protocol numbers: client requests.
28
+
29
+    Attributes:
30
+        REQUEST_IDENTITIES:
31
+            List identities.  Expecting `SSH_AGENT.IDENTITIES_ANSWER`.
32
+        SIGN_REQUEST:
33
+            Sign data.  Expecting `SSH_AGENT.SIGN_RESPONSE`.
34
+
35
+    """
36
+    REQUEST_IDENTITIES: int = 11
37
+    SIGN_REQUEST: int = 13
38
+
39
+class SSH_AGENT(enum.Enum):
40
+    """SSH agent protocol numbers: server replies.
41
+
42
+    Attributes:
43
+        IDENTITIES_ANSWER:
44
+            Successful answer to `SSH_AGENTC.REQUEST_IDENTITIES`.
45
+        SIGN_RESPONSE:
46
+            Successful answer to `SSH_AGENTC.SIGN_REQUEST`.
47
+
48
+    """
49
+    IDENTITIES_ANSWER: int = 12
50
+    SIGN_RESPONSE: int = 14
0 51