6eab62ca9f46c2caf2172c6558e26f7cef8f02d5
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) """A bare-bones SSH agent client supporting signing and key listing."""
6) 
7) from __future__ import annotations
8) 
9) import collections
10) import enum
11) import errno
12) import os
13) import pathlib
14) import socket
15) 
16) from collections.abc import Sequence, MutableSequence
17) from typing import Any, NamedTuple, Self, TypeAlias
18) from ssh_agent_client.types import KeyCommentPair, SSH_AGENT, SSH_AGENTC
19) 
20) __all__ = ('SSHAgentClient',)
Marco Ricci Remove __about__.py files,...

Marco Ricci authored 3 months ago

21) __author__ = 'Marco Ricci <m@the13thletter.info>'
22) __version__ = "0.1.0"
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

23) 
24) _socket = socket
25) 
26) class SSHAgentClient:
27)     """A bare-bones SSH agent client supporting signing and key listing.
28) 
29)     The main use case is requesting the agent sign some data, after
30)     checking that the necessary key is already loaded.
31) 
32)     The main fleshed out methods are `list_keys` and `sign`, which
33)     implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
34)     you *really* wanted to, there is enough infrastructure in place to
35)     issue other requests as defined in the protocol---it's merely the
36)     wrapper functions and the protocol numbers table that are missing.
37) 
38)     Attributes:
39)         connection:
40)             The socket connected to the SSH agent.
41)         ssh_auth_sock:
42)             The UNIX domain socket the SSH agent is listening on.  Unset
43)             if socket auto-discovery is not used.
44) 
45)     """
46)     connection: socket.socket
47)     ssh_auth_sock: str | None
48)     def __init__(
49)         self, /, *, socket: socket.socket | None = None, timeout: int = 125
50)     ) -> None:
51)         """Initialize the client.
52) 
53)         Args:
54)             socket:
55)                 An optional socket, connected to the SSH agent.  If not
56)                 given, we query the `SSH_AUTH_SOCK` environment
57)                 variable to auto-discover the correct socket address.
58)             timeout:
59)                 A connection timeout for the SSH agent.  Only used if
60)                 the socket is not yet connected.  The default value
61)                 gives ample time for agent connections forwarded via
62)                 SSH on high-latency networks (e.g. Tor).
63) 
64)         """
65)         self.ssh_auth_sock = None
66)         if socket is not None:
67)             self.connection = socket
68)         else:
69)             self.connection = _socket.socket(family=_socket.AF_UNIX)
70)         try:
71)             self.connection.getpeername()
72)         except OSError as e:
73)             if e.errno != errno.ENOTCONN:
74)                 raise
75)             try:
76)                 self.ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
77)             except KeyError as e:
78)                 raise RuntimeError(
79)                     "Can't find running ssh-agent: missing SSH_AUTH_SOCK"
80)                 ) from e
81)             self.connection.settimeout(timeout)
82)             try:
83)                 self.connection.connect(self.ssh_auth_sock)
84)             except FileNotFoundError as e:
85)                 raise RuntimeError(
86)                     "Can't find running ssh-agent: unusable SSH_AUTH_SOCK"
87)                 ) from e
88) 
89)     def __enter__(self) -> Self:
90)         """Context management: defer to `self.connection`."""
91)         self.connection.__enter__()
92)         return self
93) 
94)     def __exit__(
95)         self, exc_type: Any, exc_val: Any, exc_tb: Any
96)     ) -> bool:
97)         """Context management: defer to `self.connection`."""
98)         return bool(
99)             self.connection.__exit__(
100)                 exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
101)         )
102) 
103)     @staticmethod
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

104)     def uint32(num: int, /) -> bytes:
105)         r"""Format the number as a `uint32`, as per the agent protocol.
106) 
107)         Args:
108)             num: A number.
109) 
110)         Returns:
111)             The number in SSH agent wire protocol format, i.e. as
112)             a 32-bit big endian number.
113) 
114)         Raises:
115)             OverflowError:
116)                 As per [`int.to_bytes`][].
117) 
118)         Examples:
119)             >>> SSHAgentClient.uint32(16777216)
120)             b'\x01\x00\x00\x00'
121) 
122)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

123)         return int.to_bytes(num, 4, 'big', signed=False)
124) 
125)     @classmethod
126)     def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

127)         r"""Format the payload as an SSH string, as per the agent protocol.
128) 
129)         Args:
130)             payload: A byte string.
131) 
132)         Returns:
133)             The payload, framed in the SSH agent wire protocol format.
134) 
135)         Examples:
136)             >>> bytes(SSHAgentClient.string(b'ssh-rsa'))
137)             b'\x00\x00\x00\x07ssh-rsa'
138) 
139)         """
Marco Ricci Fix numerous argument type...

Marco Ricci authored 3 months ago

140)         try:
141)             ret = bytearray()
142)             ret.extend(cls.uint32(len(payload)))
143)             ret.extend(payload)
144)             return ret
145)         except Exception as e:
146)             raise TypeError('invalid payload type') from e
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

147) 
148)     @classmethod
149)     def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

150)         r"""Unpack an SSH string.
151) 
152)         Args:
153)             bytestring: A framed byte string.
154) 
155)         Returns:
156)             The unframed byte string, i.e., the payload.
157) 
158)         Raises:
159)             ValueError:
160)                 The bytestring is not an SSH string.
161) 
162)         Examples:
163)             >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa'))
164)             b'ssh-rsa'
165)             >>> bytes(SSHAgentClient.unstring(SSHAgentClient.string(b'ssh-ed25519')))
166)             b'ssh-ed25519'
167) 
168)         """