# SPDX-FileCopyrightText: 2024 Marco Ricci # # SPDX-License-Identifier: MIT """A bare-bones SSH agent client supporting signing and key listing.""" from __future__ import annotations import collections import errno import os import socket from typing import TYPE_CHECKING, overload from typing_extensions import Self from derivepassphrase import _types if TYPE_CHECKING: from collections.abc import Iterable, Sequence from types import TracebackType __all__ = ('SSHAgentClient',) __author__ = 'Marco Ricci ' # In SSH bytestrings, the "length" of the byte string is stored as # a 4-byte/32-bit unsigned integer at the beginning. HEAD_LEN = 4 _socket = socket class TrailingDataError(RuntimeError): """The result contained trailing data.""" def __init__(self) -> None: super().__init__('Overlong response from SSH agent') class SSHAgentFailedError(RuntimeError): """The SSH agent failed to complete the requested operation.""" def __str__(self) -> str: match self.args: case (_types.SSH_AGENT.FAILURE.value, b''): # pragma: no branch return 'The SSH agent failed to complete the request' case (_, _msg) if _msg: # pragma: no cover code = self.args[0] msg = self.args[1].decode('utf-8', 'surrogateescape') return f'[Code {code:d}] {msg:s}' case _: # pragma: no cover return repr(self) def __repr__(self) -> str: # pragma: no cover return f'{self.__class__.__name__}{self.args!r}' class SSHAgentClient: """A bare-bones SSH agent client supporting signing and key listing. The main use case is requesting the agent sign some data, after checking that the necessary key is already loaded. The main fleshed out methods are `list_keys` and `sign`, which implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests. If you *really* wanted to, there is enough infrastructure in place to issue other requests as defined in the protocol---it's merely the wrapper functions and the protocol numbers table that are missing. """ _connection: socket.socket def __init__( self, /, *, socket: socket.socket | None = None, timeout: int = 125 ) -> None: """Initialize the client. Args: socket: An optional socket, connected to the SSH agent. If not given, we query the `SSH_AUTH_SOCK` environment variable to auto-discover the correct socket address. timeout: A connection timeout for the SSH agent. Only used if the socket is not yet connected. The default value gives ample time for agent connections forwarded via SSH on high-latency networks (e.g. Tor). Raises: KeyError: The `SSH_AUTH_SOCK` environment was not found. OSError: There was an error setting up a socket connection to the agent. """ if socket is not None: self._connection = socket else: self._connection = _socket.socket(family=_socket.AF_UNIX) try: # Test whether the socket is connected. self._connection.getpeername() except OSError as e: # This condition is hard to test purposefully, so exclude # from coverage. if e.errno != errno.ENOTCONN: # pragma: no cover raise if 'SSH_AUTH_SOCK' not in os.environ: msg = 'SSH_AUTH_SOCK environment variable' raise KeyError(msg) from None ssh_auth_sock = os.environ['SSH_AUTH_SOCK'] self._connection.settimeout(timeout) self._connection.connect(ssh_auth_sock) def __enter__(self) -> Self: """Close socket connection upon context manager completion. Returns: Self. """ self._connection.__enter__() return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool: """Close socket connection upon context manager completion. Args: exc_type: An optional exception type. exc_val: An optional exception value. exc_tb: An optional exception traceback. Returns: True if the exception was handled, false if it should propagate. """ return bool( self._connection.__exit__(exc_type, exc_val, exc_tb) # type: ignore[func-returns-value] ) @staticmethod def uint32(num: int, /) -> bytes: r"""Format the number as a `uint32`, as per the agent protocol. Args: num: A number. Returns: The number in SSH agent wire protocol format, i.e. as a 32-bit big endian number. Raises: OverflowError: As per [`int.to_bytes`][]. Examples: >>> SSHAgentClient.uint32(16777216) b'\x01\x00\x00\x00' """ return int.to_bytes(num, 4, 'big', signed=False) @classmethod def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray: r"""Format the payload as an SSH string, as per the agent protocol. Args: payload: A byte string. Returns: The payload, framed in the SSH agent wire protocol format. Examples: >>> bytes(SSHAgentClient.string(b'ssh-rsa')) b'\x00\x00\x00\x07ssh-rsa' """ try: ret = bytearray() ret.extend(cls.uint32(len(payload))) ret.extend(payload) except Exception as e: msg = 'invalid payload type' raise TypeError(msg) from e # noqa: DOC501 return ret @classmethod def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray: r"""Unpack an SSH string. Args: bytestring: A framed byte string. Returns: The unframed byte string, i.e., the payload. Raises: ValueError: The byte string is not an SSH string. Examples: >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa')) b'ssh-rsa' >>> bytes( ... SSHAgentClient.unstring(SSHAgentClient.string(b'ssh-ed25519')) ... ) b'ssh-ed25519' """ # noqa: E501 n = len(bytestring) msg = 'malformed SSH byte string' if n < HEAD_LEN or n != HEAD_LEN + int.from_bytes( bytestring[:HEAD_LEN], 'big', signed=False ): raise ValueError(msg) return bytestring[HEAD_LEN:] @classmethod def unstring_prefix( cls, bytestring: bytes | bytearray, / ) -> tuple[bytes | bytearray, bytes | bytearray]: r"""Unpack an SSH string at the beginning of the byte string. Args: bytestring: A (general) byte string, beginning with a framed/SSH byte string. Returns: A 2-tuple `(a, b)`, where `a` is the unframed byte string/payload at the beginning of input byte string, and `b` is the remainder of the input byte string. Raises: ValueError: The byte string does not begin with an SSH string. Examples: >>> a, b = SSHAgentClient.unstring_prefix( ... b'\x00\x00\x00\x07ssh-rsa____trailing data' ... ) >>> (bytes(a), bytes(b)) (b'ssh-rsa', b'____trailing data') >>> a, b = SSHAgentClient.unstring_prefix( ... SSHAgentClient.string(b'ssh-ed25519') ... ) >>> (bytes(a), bytes(b)) (b'ssh-ed25519', b'') """ n = len(bytestring) msg = 'malformed SSH byte string' if n < HEAD_LEN: raise ValueError(msg) m = int.from_bytes(bytestring[:HEAD_LEN], 'big', signed=False) if m + HEAD_LEN > n: raise ValueError(msg) return ( bytestring[HEAD_LEN : m + HEAD_LEN], bytestring[m + HEAD_LEN :], ) @overload def request( # pragma: no cover self, code: int | _types.SSH_AGENTC, payload: bytes | bytearray, /, *, response_code: None = None, ) -> tuple[int, bytes | bytearray]: ... @overload def request( # pragma: no cover self, code: int | _types.SSH_AGENTC, payload: bytes | bytearray, /, *, response_code: Iterable[_types.SSH_AGENT | int] = frozenset({ _types.SSH_AGENT.SUCCESS }), ) -> tuple[int, bytes | bytearray]: ... @overload def request( # pragma: no cover self, code: int | _types.SSH_AGENTC, payload: bytes | bytearray, /, *, response_code: _types.SSH_AGENT | int = _types.SSH_AGENT.SUCCESS, ) -> bytes | bytearray: ... def request( self, code: int | _types.SSH_AGENTC, payload: bytes | bytearray, /, *, response_code: ( Iterable[_types.SSH_AGENT | int] | _types.SSH_AGENT | int | None ) = None, ) -> tuple[int, bytes | bytearray] | bytes | bytearray: """Issue a generic request to the SSH agent. Args: code: The request code. See the SSH agent protocol for protocol numbers to use here (and which protocol numbers to expect in a response). payload: A byte string containing the payload, or "contents", of the request. Request-specific. `request` will add any necessary wire framing around the request code and the payload. response_code: An optional response code, or a set of response codes, that we expect. If given, and the actual response code does not match, raise an error. Returns: A 2-tuple consisting of the response code and the payload, with all wire framing removed. Raises: EOFError: The response from the SSH agent is truncated or missing. OSError: There was a communication error with the SSH agent. SSHAgentFailedError: We expected specific response codes, but did not receive any of them. """ if isinstance( # pragma: no branch response_code, int | _types.SSH_AGENT ): response_code = frozenset({response_code}) if response_code is not None: # pragma: no branch response_code = frozenset({ c if isinstance(c, int) else c.value for c in response_code }) request_message = bytearray([ code if isinstance(code, int) else code.value ]) request_message.extend(payload) self._connection.sendall(self.string(request_message)) chunk = self._connection.recv(HEAD_LEN) if len(chunk) < HEAD_LEN: msg = 'cannot read response length' raise EOFError(msg) response_length = int.from_bytes(chunk, 'big', signed=False) response = self._connection.recv(response_length) if len(response) < response_length: msg = 'truncated response from SSH agent' raise EOFError(msg) if not response_code: # pragma: no cover return response[0], response[1:] if response[0] not in response_code: raise SSHAgentFailedError(response[0], response[1:]) return response[1:] def list_keys(self) -> Sequence[_types.KeyCommentPair]: """Request a list of keys known to the SSH agent. Returns: A read-only sequence of key/comment pairs. Raises: EOFError: The response from the SSH agent is truncated or missing. OSError: There was a communication error with the SSH agent. TrailingDataError: The response from the SSH agent is too long. SSHAgentFailedError: The agent failed to complete the request. """ response = self.request( _types.SSH_AGENTC.REQUEST_IDENTITIES.value, b'', response_code=_types.SSH_AGENT.IDENTITIES_ANSWER, ) response_stream = collections.deque(response) def shift(num: int) -> bytes: buf = collections.deque(b'') for _ in range(num): try: val = response_stream.popleft() except IndexError: response_stream.extendleft(reversed(buf)) msg = 'truncated response from SSH agent' raise EOFError(msg) from None buf.append(val) return bytes(buf) key_count = int.from_bytes(shift(4), 'big') keys: collections.deque[_types.KeyCommentPair] keys = collections.deque() for _ in range(key_count): key_size = int.from_bytes(shift(4), 'big') key = shift(key_size) comment_size = int.from_bytes(shift(4), 'big') comment = shift(comment_size) # Both `key` and `comment` are not wrapped as SSH strings. keys.append(_types.KeyCommentPair(key, comment)) if response_stream: raise TrailingDataError return keys def sign( self, /, key: bytes | bytearray, payload: bytes | bytearray, *, flags: int = 0, check_if_key_loaded: bool = False, ) -> bytes | bytearray: """Request the SSH agent sign the payload with the key. Args: key: The public SSH key to sign the payload with, in the same format as returned by, e.g., the `list_keys` method. The corresponding private key must have previously been loaded into the agent to successfully issue a signature. payload: A byte string of data to sign. flags: Optional flags for the signing request. Currently passed on as-is to the agent. In real-world usage, this could be used, e.g., to request more modern hash algorithms when signing with RSA keys. (No such real-world usage is currently implemented.) check_if_key_loaded: If true, check beforehand (via `list_keys`) if the corresponding key has been loaded into the agent. Returns: The binary signature of the payload under the given key. Raises: EOFError: The response from the SSH agent is truncated or missing. OSError: There was a communication error with the SSH agent. TrailingDataError: The response from the SSH agent is too long. SSHAgentFailedError: The agent failed to complete the request. KeyError: `check_if_key_loaded` is true, and the `key` was not loaded into the agent. """ if check_if_key_loaded: loaded_keys = frozenset({pair.key for pair in self.list_keys()}) if bytes(key) not in loaded_keys: msg = 'target SSH key not loaded into agent' raise KeyError(msg) request_data = bytearray(self.string(key)) request_data.extend(self.string(payload)) request_data.extend(self.uint32(flags)) return self.unstring( self.request( _types.SSH_AGENTC.SIGN_REQUEST.value, request_data, response_code=_types.SSH_AGENT.SIGN_RESPONSE, ) )