62317bc2b46db2cd7ea7686f742975f41bc1d507
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) """A bare-bones SSH agent client supporting signing and key listing."""
6) 
7) from __future__ import annotations
8) 
9) import collections
10) import errno
11) import os
12) import socket
13) 
Marco Ricci Support Python 3.10 and PyP...

Marco Ricci authored 2 months ago

14) from collections.abc import Sequence
15) from typing_extensions import Any, Self
16) 
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

17) from ssh_agent_client import types
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

18) 
19) __all__ = ('SSHAgentClient',)
Marco Ricci Restore __version__ attribu...

Marco Ricci authored 2 months ago

20) __author__ = "Marco Ricci <m@the13thletter.info>"
Marco Ricci Release 0.1.1

Marco Ricci authored 2 months ago

21) __version__ = "0.1.1"
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

22) 
23) _socket = socket
24) 
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

class TrailingDataError(RuntimeError):
    """Signal that a result carried unexpected data after its end."""
27) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

28) class SSHAgentClient:
29)     """A bare-bones SSH agent client supporting signing and key listing.
30) 
31)     The main use case is requesting the agent sign some data, after
32)     checking that the necessary key is already loaded.
33) 
34)     The main fleshed out methods are `list_keys` and `sign`, which
35)     implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
36)     you *really* wanted to, there is enough infrastructure in place to
37)     issue other requests as defined in the protocol---it's merely the
38)     wrapper functions and the protocol numbers table that are missing.
39) 
40)     """
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

41)     _connection: socket.socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

42)     def __init__(
43)         self, /, *, socket: socket.socket | None = None, timeout: int = 125
44)     ) -> None:
45)         """Initialize the client.
46) 
47)         Args:
48)             socket:
49)                 An optional socket, connected to the SSH agent.  If not
50)                 given, we query the `SSH_AUTH_SOCK` environment
51)                 variable to auto-discover the correct socket address.
52)             timeout:
53)                 A connection timeout for the SSH agent.  Only used if
54)                 the socket is not yet connected.  The default value
55)                 gives ample time for agent connections forwarded via
56)                 SSH on high-latency networks (e.g. Tor).
57) 
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

58)         Raises:
59)             KeyError:
60)                 The `SSH_AUTH_SOCK` environment was not found.
61)             OSError:
62)                 There was an error setting up a socket connection to the
63)                 agent.
64) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

65)         """
66)         if socket is not None:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

67)             self._connection = socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

68)         else:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

69)             self._connection = _socket.socket(family=_socket.AF_UNIX)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

70)         try:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

71)             # Test whether the socket is connected.
72)             self._connection.getpeername()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

73)         except OSError as e:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

74)             # This condition is hard to test purposefully, so exclude
75)             # from coverage.
76)             if e.errno != errno.ENOTCONN:  # pragma: no cover
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

77)                 raise
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

78)             if 'SSH_AUTH_SOCK' not in os.environ:
79)                 raise KeyError('SSH_AUTH_SOCK environment variable')
80)             ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

81)             self._connection.settimeout(timeout)
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

82)             self._connection.connect(ssh_auth_sock)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

83) 
84)     def __enter__(self) -> Self:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

85)         """Close socket connection upon context manager completion."""
86)         self._connection.__enter__()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

87)         return self
88) 
89)     def __exit__(
90)         self, exc_type: Any, exc_val: Any, exc_tb: Any
91)     ) -> bool:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

92)         """Close socket connection upon context manager completion."""
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

93)         return bool(
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

94)             self._connection.__exit__(
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

95)                 exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
96)         )
97) 
98)     @staticmethod
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

99)     def uint32(num: int, /) -> bytes:
100)         r"""Format the number as a `uint32`, as per the agent protocol.
101) 
102)         Args:
103)             num: A number.
104) 
105)         Returns:
106)             The number in SSH agent wire protocol format, i.e. as
107)             a 32-bit big endian number.
108) 
109)         Raises:
110)             OverflowError:
111)                 As per [`int.to_bytes`][].
112) 
113)         Examples:
114)             >>> SSHAgentClient.uint32(16777216)
115)             b'\x01\x00\x00\x00'
116) 
117)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

118)         return int.to_bytes(num, 4, 'big', signed=False)
119) 
120)     @classmethod
121)     def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

122)         r"""Format the payload as an SSH string, as per the agent protocol.
123) 
124)         Args:
125)             payload: A byte string.
126) 
127)         Returns:
128)             The payload, framed in the SSH agent wire protocol format.
129) 
130)         Examples:
131)             >>> bytes(SSHAgentClient.string(b'ssh-rsa'))
132)             b'\x00\x00\x00\x07ssh-rsa'
133) 
134)         """
Marco Ricci Fix numerous argument type...

Marco Ricci authored 3 months ago

135)         try:
136)             ret = bytearray()
137)             ret.extend(cls.uint32(len(payload)))
138)             ret.extend(payload)
139)             return ret
140)         except Exception as e:
141)             raise TypeError('invalid payload type') from e
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

142) 
143)     @classmethod
144)     def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

145)         r"""Unpack an SSH string.
146) 
147)         Args:
148)             bytestring: A framed byte string.
149) 
150)         Returns:
151)             The unframed byte string, i.e., the payload.
152) 
153)         Raises:
154)             ValueError:
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

155)                 The byte string is not an SSH string.
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

156) 
157)         Examples:
158)             >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa'))
159)             b'ssh-rsa'
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 2 months ago

160)             >>> bytes(SSHAgentClient.unstring(
161)             ...     SSHAgentClient.string(b'ssh-ed25519')))
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

162)             b'ssh-ed25519'
163) 
164)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

165)         n = len(bytestring)
166)         if n < 4:
167)             raise ValueError('malformed SSH byte string')
168)         elif n != 4 + int.from_bytes(bytestring[:4], 'big', signed=False):
169)             raise ValueError('malformed SSH byte string')
170)         return bytestring[4:]
171) 
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

172)     @classmethod
173)     def unstring_prefix(
174)         cls, bytestring: bytes | bytearray, /
175)     ) -> tuple[bytes | bytearray, bytes | bytearray]:
176)         r"""Unpack an SSH string at the beginning of the byte string.
177) 
178)         Args:
179)             bytestring:
180)                 A (general) byte string, beginning with a framed/SSH
181)                 byte string.
182) 
183)         Returns:
184)             A 2-tuple `(a, b)`, where `a` is the unframed byte
185)             string/payload at the beginning of input byte string, and
186)             `b` is the remainder of the input byte string.
187) 
188)         Raises:
189)             ValueError:
190)                 The byte string does not begin with an SSH string.
191) 
192)         Examples:
193)             >>> a, b = SSHAgentClient.unstring_prefix(
194)             ...     b'\x00\x00\x00\x07ssh-rsa____trailing data')
195)             >>> (bytes(a), bytes(b))
196)             (b'ssh-rsa', b'____trailing data')
197)             >>> a, b = SSHAgentClient.unstring_prefix(
198)             ...     SSHAgentClient.string(b'ssh-ed25519'))
199)             >>> (bytes(a), bytes(b))
200)             (b'ssh-ed25519', b'')
201) 
202)         """
203)         n = len(bytestring)
204)         if n < 4:
205)             raise ValueError('malformed SSH byte string')
206)         m = int.from_bytes(bytestring[:4], 'big', signed=False)
207)         if m + 4 > n:
208)             raise ValueError('malformed SSH byte string')
209)         return (bytestring[4:m + 4], bytestring[m + 4:])
210) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

211)     def request(
212)         self, code: int, payload: bytes | bytearray, /
213)     ) -> tuple[int, bytes | bytearray]:
214)         """Issue a generic request to the SSH agent.
215) 
216)         Args:
217)             code:
218)                 The request code.  See the SSH agent protocol for
219)                 protocol numbers to use here (and which protocol numbers
220)                 to expect in a response).
221)             payload:
222)                 A byte string containing the payload, or "contents", of
223)                 the request.  Request-specific.  `request` will add any
224)                 necessary wire framing around the request code and the
225)                 payload.
226) 
227)         Returns:
228)             A 2-tuple consisting of the response code and the payload,
229)             with all wire framing removed.
230) 
231)         Raises:
232)             EOFError:
233)                 The response from the SSH agent is truncated or missing.
234) 
235)         """
236)         request_message = bytearray([code])
237)         request_message.extend(payload)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

238)         self._connection.sendall(self.string(request_message))
239)         chunk = self._connection.recv(4)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

240)         if len(chunk) < 4:
241)             raise EOFError('cannot read response length')
242)         response_length = int.from_bytes(chunk, 'big', signed=False)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

243)         response = self._connection.recv(response_length)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

244)         if len(response) < response_length:
245)             raise EOFError('truncated response from SSH agent')
246)         return response[0], response[1:]
247) 
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

248)     def list_keys(self) -> Sequence[types.KeyCommentPair]:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

249)         """Request a list of keys known to the SSH agent.
250) 
251)         Returns:
252)             A read-only sequence of key/comment pairs.
253) 
254)         Raises:
255)             EOFError:
256)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

257)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

258)                 The response from the SSH agent is too long.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

259)             RuntimeError:
260)                 The agent failed to complete the request.
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

261) 
262)         """
263)         response_code, response = self.request(
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

264)             types.SSH_AGENTC.REQUEST_IDENTITIES.value, b'')
265)         if response_code != types.SSH_AGENT.IDENTITIES_ANSWER.value:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

266)             raise RuntimeError(
267)                 f'error return from SSH agent: '
268)                 f'{response_code = }, {response = }'
269)             )
270)         response_stream = collections.deque(response)
271)         def shift(num: int) -> bytes:
272)             buf = collections.deque(bytes())
273)             for i in range(num):
274)                 try:
275)                     val = response_stream.popleft()
276)                 except IndexError:
277)                     response_stream.extendleft(reversed(buf))
278)                     raise EOFError(
279)                         'truncated response from SSH agent'
280)                     ) from None
281)                 buf.append(val)
282)             return bytes(buf)
283)         key_count = int.from_bytes(shift(4), 'big')
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

284)         keys: collections.deque[types.KeyCommentPair]
285)         keys = collections.deque()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

286)         for i in range(key_count):
287)             key_size = int.from_bytes(shift(4), 'big')
288)             key = shift(key_size)
289)             comment_size = int.from_bytes(shift(4), 'big')
290)             comment = shift(comment_size)
291)             # Both `key` and `comment` are not wrapped as SSH strings.
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

292)             keys.append(types.KeyCommentPair(key, comment))
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

293)         if response_stream:
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

294)             raise TrailingDataError('overlong response from SSH agent')
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

295)         return keys
296) 
297)     def sign(
298)         self, /, key: bytes | bytearray, payload: bytes | bytearray,
299)         *, flags: int = 0, check_if_key_loaded: bool = False,
300)     ) -> bytes | bytearray:
301)         """Request the SSH agent sign the payload with the key.
302) 
303)         Args:
304)             key:
305)                 The public SSH key to sign the payload with, in the same
306)                 format as returned by, e.g., the `list_keys` method.
307)                 The corresponding private key must have previously been
308)                 loaded into the agent to successfully issue a signature.
309)             payload:
310)                 A byte string of data to sign.
311)             flags:
312)                 Optional flags for the signing request.  Currently
313)                 passed on as-is to the agent.  In real-world usage, this
314)                 could be used, e.g., to request more modern hash
315)                 algorithms when signing with RSA keys.  (No such
316)                 real-world usage is currently implemented.)
317)             check_if_key_loaded:
318)                 If true, check beforehand (via `list_keys`) if the
319)                 corresponding key has been loaded into the agent.
320) 
321)         Returns:
322)             The binary signature of the payload under the given key.
323) 
324)         Raises:
325)             EOFError:
326)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

327)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

328)                 The response from the SSH agent is too long.
329)             RuntimeError:
330)                 The agent failed to complete the request.
Marco Ricci Raise KeyError when signing...

Marco Ricci authored 2 months ago

331)             KeyError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

332)                 `check_if_key_loaded` is true, and the `key` was not
333)                 loaded into the agent.
334) 
335)         """
336)         if check_if_key_loaded:
337)             loaded_keys = frozenset({pair.key for pair in self.list_keys()})
338)             if bytes(key) not in loaded_keys:
Marco Ricci Raise KeyError when signing...

Marco Ricci authored 2 months ago

339)                 raise KeyError('target SSH key not loaded into agent')
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

340)         request_data = bytearray(self.string(key))
341)         request_data.extend(self.string(payload))
342)         request_data.extend(self.uint32(flags))
343)         response_code, response = self.request(
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

344)             types.SSH_AGENTC.SIGN_REQUEST.value, request_data)
345)         if response_code != types.SSH_AGENT.SIGN_RESPONSE.value: