git.schokokeks.org
Repositories
Help
Report an Issue
derivepassphrase.git
Code
Commits
Branches
Tags
Suche
Strukturansicht:
a637416
Branches
Tags
documentation-tree
master
unstable/annoying-os-named-pipes
wishlist
0.1.0
0.1.1
0.1.2
0.1.3
0.2.0
0.3.0
0.3.1
0.3.2
0.3.3
0.4.0
0.5.1
0.5.2
derivepassphrase.git
src
derivepassphrase
ssh_agent
socketprovider.py
Implement The Annoying Operating System named pipes
Marco Ricci
commited
a637416
at 2025-12-07 00:14:33
socketprovider.py
Blame
History
Raw
# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> # # SPDX-License-Identifier: Zlib """Machinery for providing sockets connected to SSH agents.""" from __future__ import annotations import collections import ctypes import ctypes.wintypes import errno import hashlib import os import socket from ctypes.wintypes import ( # type: ignore[attr-defined] BOOL, DWORD, HANDLE, LPCVOID, LPCWSTR, LPDWORD, LPVOID, ) from typing import TYPE_CHECKING, cast if TYPE_CHECKING: from collections.abc import Callable from typing import ClassVar from typing_extensions import Any, Buffer, Self from derivepassphrase import _types kernel32: ctypes.WinDLL # type: ignore[name-defined] crypt32: ctypes.WinDLL # type: ignore[name-defined] CryptProtectMemory: Callable[[LPVOID, DWORD, DWORD], BOOL] CreateFile: Callable[ [ LPCWSTR, DWORD, DWORD, None, DWORD, DWORD, HANDLE | None, ], HANDLE, ] ReadFile: Callable[[HANDLE, LPCVOID, DWORD, LPDWORD, None], BOOL] WriteFile: Callable[[HANDLE, LPCVOID, DWORD, LPDWORD, None], BOOL] CloseHandle: Callable[[HANDLE], BOOL] __all__ = ("SocketProvider",) class NoSuchProviderError(KeyError): """No such SSH agent socket provider is known.""" class AfUnixNotAvailableError(NotImplementedError): """This Python installation does not support socket.AF_UNIX.""" class TheAnnoyingOsNamedPipesNotAvailableError(NotImplementedError): """This Python installation does not support Windows named pipes.""" GENERIC_READ = 0x80000000 GENERIC_WRITE = 0x40000000 FILE_SHARE_READ = 0x00000001 FILE_SHARE_WRITE = 0x00000002 OPEN_EXISTING = 0x3 CRYPTPROTECTMEMORY_BLOCK_SIZE = 0x10 CRYPTPROTECTMEMORY_CROSS_PROCESS = 0x1 try: import ctypes import ctypes.wintypes try: crypt32 = ctypes.WinDLL("crypt32.dll") # type: ignore[attr-defined] CryptProtectMemory = crypt32.CryptProtectMemory # type: ignore[attr-defined] except (AttributeError, FileNotFoundError): def CryptProtectMemory( # noqa: N802 pDataIn: LPVOID, # noqa: N803 cbDataIn: DWORD, # noqa: N803 dwFlags: DWORD, # noqa: N803 ) -> BOOL: raise NotImplementedError else: CryptProtectMemory.argtypes = [LPVOID, DWORD, DWORD] # type: ignore[attr-defined] CryptProtectMemory.restype = BOOL # type: ignore[attr-defined] def errcheck( result: HANDLE, func: Callable, arguments: Any, # noqa: ANN401 ) -> HANDLE: del func del arguments invalid_handle = ctypes.wintypes.HANDLE(0) if result == invalid_handle: raise ctypes.WinError() # type: ignore[attr-defined] return result try: kernel32 = ctypes.WinDLL("Kernel32.dll") # type: ignore[attr-defined] CreateFile = kernel32.CreateFileW # type: ignore[attr-defined] ReadFile = kernel32.ReadFileW # type: ignore[attr-defined] WriteFile = kernel32.WriteFileW # type: ignore[attr-defined] CloseHandle = kernel32.CloseHandle # type: ignore[attr-defined] except (AttributeError, FileNotFoundError): def CreateFile( # noqa: N802, PLR0913, PLR0917 lpFilename: LPCWSTR, # noqa: N803 dwDesiredAccess: DWORD, # noqa: N803 dwShareMode: DWORD, # noqa: N803 lpSecurityAttributes: None, # noqa: N803 dwCreationDisposition: DWORD, # noqa: N803 dwFlagsAndAttributes: DWORD, # noqa: N803 hTemplateFile: HANDLE | None, # noqa: N803 ) -> HANDLE: del ( lpFilename, dwDesiredAccess, dwShareMode, lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile, ) msg = "This Python version does not support Windows named pipes." raise TheAnnoyingOsNamedPipesNotAvailableError(msg) def ReadFile( # noqa: N802 hFile: HANDLE, # noqa: N803 lpBuffer: LPVOID, # noqa: N803 nNumberOfBytesToRead: DWORD, # noqa: N803 lpNumberOfBytesRead: LPDWORD, # noqa: N803 lpOverlapped: None, # noqa: N803 ) -> BOOL: del ( hFile, lpBuffer, nNumberOfBytesToRead, lpNumberOfBytesRead, lpOverlapped, ) raise OSError(errno.ENOTSUP, os.strerror(errno.ENOTSUP)) def WriteFile( # noqa: N802 hFile: HANDLE, # noqa: N803 lpBuffer: LPVOID, # noqa: N803 nNumberOfBytesToWrite: DWORD, # noqa: N803 lpNumberOfBytesWritten: LPDWORD, # noqa: N803 lpOverlapped: None, # noqa: N803 ) -> BOOL: del ( hFile, lpBuffer, nNumberOfBytesToWrite, lpNumberOfBytesWritten, lpOverlapped, ) raise OSError(errno.ENOTSUP, os.strerror(errno.ENOTSUP)) else: CreateFile.argtypes = [ # type: ignore[attr-defined] LPCWSTR, DWORD, DWORD, None, DWORD, DWORD, HANDLE | None, ] CreateFile.restype = HANDLE # type: ignore[attr-defined] CreateFile.errcheck = errcheck # type: ignore[attr-defined] ReadFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, None] # type: ignore[attr-defined] WriteFile.argtypes = [HANDLE, LPVOID, DWORD, LPDWORD, None] # type: ignore[attr-defined] CloseHandle.argtypes = [HANDLE] # type: ignore[attr-defined] ReadFile.argtypes = [BOOL] # type: ignore[attr-defined] WriteFile.argtypes = [BOOL] # type: ignore[attr-defined] CloseHandle.argtypes = [BOOL] # type: ignore[attr-defined] except ImportError: pass class TheAnnoyingOsNamedPipeHandle: """A named pipe handle on The Annoying OS (Microsoft Windows). This handle implements the [`SSHAgentSocket`][_types.SSHAgentSocket] interface. It is only constructable if the Python installation can successfully call into The Annoying OS `kernel32.dll` library via [`ctypes`][]. """ def __init__(self, name: str) -> None: self.handle: HANDLE | None = CreateFile( ctypes.c_wchar_p(name), ctypes.c_ulong(GENERIC_READ | GENERIC_WRITE), ctypes.c_ulong(FILE_SHARE_READ | FILE_SHARE_WRITE), None, ctypes.c_ulong(OPEN_EXISTING), ctypes.c_ulong(0), None, ) def __enter__(self) -> Self: return self def __exit__(self, *args: object) -> bool | None: try: if self.handle is not None: return CloseHandle(self.handle).value != 0 return False finally: self.handle = None def recv(self, data: int, flags: int = 0, /) -> bytes: if self.handle is None: raise OSError(errno.EBADF, os.strerror(errno.EBADF)) del flags result = bytearray() read_count = DWORD(0) buffer = (ctypes.c_char * 65536)() while data > 0: block_size = min(max(0, data), 65536) success = ReadFile( self.handle, ctypes.cast(ctypes.byref(buffer), ctypes.c_void_p), DWORD(block_size), ctypes.cast(ctypes.byref(read_count), LPDWORD), None, ) if not success or read_count.value == 0: raise ctypes.WinError() # type: ignore[attr-defined] result.extend(buffer.raw[:block_size]) data -= read_count.value read_count.value = 0 return bytes(result) def sendall(self, data: Buffer, flags: int = 0, /) -> None: if self.handle is None: raise OSError(errno.EBADF, os.strerror(errno.EBADF)) del flags data = bytes(data) databuf = (ctypes.c_char * len(data))(data) write_count = DWORD(0) WriteFile( self.handle, ctypes.cast(ctypes.byref(databuf), ctypes.c_void_p), DWORD(len(data)), ctypes.cast(ctypes.byref(write_count), LPDWORD), None, ) @classmethod def for_pageant(cls) -> Self: """Construct a named pipe for use with Pageant. Returns: A new named pipe handle, using Pageant's pipe name. """ return cls(cls.pageant_named_pipe_name()) @staticmethod def pageant_named_pipe_name( *, require_cryptprotectmemory: bool = False, ) -> str: """Return The Annoying OS named pipe name that Pageant would use. Args: require_cryptprotectmemory: Pageant normally attempts to use the `CryptProtectMemory` system function from the `crypt32.dll` library on The Annoying OS, ignoring any errors. This in turn influences the resulting named pipe's filename. If true, and if we fail to call `CryptProtectMemory`, we abort; otherwise we ignore any errors from calling or failing to call `CryptProtectMemory`. Raises: NotImplementedError: This Python version cannot call `CryptProtectMemory` due to lack of system support. """ realname = "Pageant" c_realname = ctypes.create_string_buffer(realname.encode("UTF-8")) num_blocks = ( len(c_realname) + CRYPTPROTECTMEMORY_BLOCK_SIZE - 1 ) // CRYPTPROTECTMEMORY_BLOCK_SIZE assert num_blocks == 1 buf_decl = ctypes.c_char * (CRYPTPROTECTMEMORY_BLOCK_SIZE * num_blocks) buf = buf_decl(c_realname.raw) try: CryptProtectMemory( ctypes.cast(ctypes.byref(buf), ctypes.c_void_p), DWORD(len(buf.raw)), DWORD(CRYPTPROTECTMEMORY_CROSS_PROCESS), ) except NotImplementedError: if require_cryptprotectmemory: raise buf_as_ssh_string = bytearray() buf_as_ssh_string.extend(int.to_bytes(len(buf.raw), 4, "big")) buf_as_ssh_string.extend(buf.raw) digest = hashlib.sha256(buf_as_ssh_string).hexdigest() return f"//.pipe/pageant.{os.getlogin()}.{digest}" class SocketProvider: """Static functionality for providing sockets.""" @staticmethod def unix_domain_ssh_auth_sock(*, timeout: int = 125) -> socket.socket: """Return a UNIX domain socket connected to `SSH_AUTH_SOCK`. Args: timeout: A connection timeout for the SSH agent. Only used for "true" sockets, and only if the socket is not yet connected. The default value gives ample time for agent connections forwarded via SSH on high-latency networks (e.g. Tor). Returns: A connected UNIX domain socket. Raises: KeyError: The `SSH_AUTH_SOCK` environment variable was not found. AfUnixNotAvailableError: [This Python version does not support UNIX domain sockets][AF_UNIX], necessary to automatically connect to a running SSH agent via the `SSH_AUTH_SOCK` environment variable. [AF_UNIX]: https://docs.python.org/3/library/socket.html#socket.AF_UNIX OSError: There was an error setting up a socket connection to the agent. """ if not hasattr(socket, "AF_UNIX"): msg = "This Python version does not support UNIX domain sockets." raise AfUnixNotAvailableError(msg) else: # noqa: RET506 # pragma: unless posix no cover sock = socket.socket(family=socket.AF_UNIX) if "SSH_AUTH_SOCK" not in os.environ: msg = "SSH_AUTH_SOCK environment variable" raise KeyError(msg) ssh_auth_sock = os.environ["SSH_AUTH_SOCK"] sock.settimeout(timeout) sock.connect(ssh_auth_sock) return sock @staticmethod def the_annoying_os_named_pipes() -> _types.SSHAgentSocket: """Return a socket connected to Pageant/OpenSSH on The Annoying OS. This may be a write-through socket if the underlying connection to Pageant or OpenSSH does not use an actual network socket to communicate. Raises: TheAnnoyingOsNamedPipesNotAvailableError: This functionality is not implemented yet. Warning: Not implemented yet This functionality is not implemented yet. Specifically, [we do not yet support any of the communication mechanisms used by the leading SSH agent implementations.][windows-ssh-agent-support] [windows-ssh-agent-support]: https://the13thletter.info/derivepassphrase/0.x/wishlist/windows-ssh-agent-support/ """ try: # pragma: no cover [external] import ctypes # noqa: PLC0415 except ImportError as exc: # pragma: no cover [external] msg = "This Python version does not support Windows named pipes." raise TheAnnoyingOsNamedPipesNotAvailableError(msg) from exc if not hasattr(ctypes, "WinDLL"): msg = "This Python version does not support Windows named pipes." raise TheAnnoyingOsNamedPipesNotAvailableError(msg) else: # noqa: RET506 # pragma: unless the-annoying-os no cover return TheAnnoyingOsNamedPipeHandle.for_pageant() registry: ClassVar[ dict[str, _types.SSHAgentSocketProvider | str | None] ] = {} """A dictionary of callables that provide SSH agent sockets. Each entry in the dictionary points either to a callable, a string, or `None`: if a callable, then that callable returns a socket; if a string, then this entry is an alias for that other entry; if `None`, then the entry name is merely registered, but no implementation is available. If a callable is not applicable to this platform, Python installation or `derivepassphrase` installation, then it MUST raise [`NotImplementedError`][]. Conversely, if the callable returns a value, or raises any other kind of exception, then the caller MAY assume that this platform, Python installation and `derivepassphrase` installation are sufficient for the callable to be able to return a working socket on this platform. (The latter may still be dependent on further, external circumstances, such as required configuration settings, or environment variables, or sufficient system resources, etc.) (Interpretation of "MUST" and "MAY" as per IETF Best Current Practice #14; see [RFC 2119][] and [RFC 8174][].) [RFC 2119]: https://www.rfc-editor.org/info/rfc2119 [RFC 8174]: https://www.rfc-editor.org/info/rfc8174 """ @classmethod def register( cls, name: str, *aliases: str, ) -> Callable[ [_types.SSHAgentSocketProvider], _types.SSHAgentSocketProvider ]: """Register a callable as an SSH agent socket provider (decorator). Attempting to re-register an existing alias, or a name with an implementation, with a different implementation is an error. Args: name: The principal name under which to register the passed callable. aliases: Alternate names to register as aliases for the principal name. Returns: A decorator implementing the above. """ def decorator( f: _types.SSHAgentSocketProvider, ) -> _types.SSHAgentSocketProvider: """Register a callable as an SSH agent socket provider. Attempting to re-register an existing alias, or a name with an implementation, with a different implementation is an error. Args: f: The callable to decorate/register. Returns: The callable. Raises: ValueError: The name or alias is already in use. """ for alias in [name, *aliases]: try: existing = cls.lookup(alias) except (NoSuchProviderError, NotImplementedError): cls.registry[alias] = f if alias == name else name else: if existing is None: cls.registry[alias] = f if alias == name else name elif existing != f: msg = ( f"The SSH agent socket provider {alias!r} " f"is already registered." ) raise ValueError(msg) return f return decorator @classmethod def lookup( cls, provider: _types.SSHAgentSocketProvider | str | None, / ) -> _types.SSHAgentSocketProvider | None: """Look up a socket provider entry. Args: provider: The provider to look up. Returns: The callable indicated by this provider, if it is implemented, or `None`, if it is merely registered. Raises: NoSuchProviderError: The provider is not registered. """ ret = provider while isinstance(ret, str): try: ret = cls.registry[ret] except KeyError as exc: raise NoSuchProviderError(ret) from exc return ret @classmethod def resolve( cls, provider: _types.SSHAgentSocketProvider | str | None, / ) -> _types.SSHAgentSocketProvider: """Resolve a socket provider to a proper callable. Args: provider: The provider to resolve. Returns: The callable indicated by this provider. Raises: NoSuchProviderError: The provider is not registered. NotImplementedError: The provider is registered, but is not functional or not applicable to this `derivepassphrase` installation. """ ret = cls.lookup(provider) if ret is None: msg = ( f"The {ret!r} socket provider is not functional on or " "not applicable to this derivepassphrase installation." ) raise NotImplementedError(msg) return ret ENTRY_POINT_GROUP_NAME = "derivepassphrase.ssh_agent_socket_providers" """ The group name under which [entry points][importlib.metadata.entry_points] for the SSH agent socket provider registry should be recorded. Each target of such an entry point should be a [`_types.SSHAgentSocketProviderEntry`][] object. """ @classmethod def _find_all_ssh_agent_socket_providers(cls) -> None: """Find and load all declared SSH agent socket providers. Load all [entry points][importlib.metadata.entry_points] in the `derivepassphrase.ssh_agent_socket_providers` group as providers, then register them. The target of each entry point should be a [`_types.SSHAgentSocketProviderEntry`][] object. Raises: AssertionError: The declared SSH agent socket provider was not, in fact, an SSH agent socket provider. Alternatively, multiple distributions supplied the same SSH agent socket provider, but with different implementations. """ import importlib.metadata # noqa: PLC0415 origins: dict[str, str | None] = {} entries = collections.ChainMap({}, cls.registry) for entry_point in importlib.metadata.entry_points( group=cls.ENTRY_POINT_GROUP_NAME ): provider_entry = cast( "_types.SSHAgentSocketProviderEntry", entry_point.load() ) key = provider_entry.key value = entry_point.value dist = ( entry_point.dist.name # type: ignore[union-attr] if getattr(entry_point, "dist", None) is not None else None ) origin = origins.get(key, "derivepassphrase") if not callable(provider_entry.provider): msg = ( f"Not an SSHAgentSocketProvider: " f"{dist = }, {cls.ENTRY_POINT_GROUP_NAME = }, " f"{value = }, {provider_entry = }" ) raise AssertionError(msg) # noqa: TRY004 if key in entries: if entries[key] != provider_entry.provider: msg = ( f"Name clash in SSH agent socket providers " f"for entry {key!r}, both by {dist!r} " f"and by {origin!r}" ) raise AssertionError(msg) else: entries[key] = provider_entry.provider origins[key] = dist for alias in provider_entry.aliases: alias_origin = origins.get(alias, "derivepassphrase") if alias in entries: if entries[alias] != key: msg = ( f"Name clash in SSH agent socket providers " f"for entry {alias!r}, both by {dist!r} " f"and by {alias_origin!r}" ) raise AssertionError(msg) else: entries[alias] = key origins[key] = dist cls.registry.update(entries.maps[0]) SocketProvider.registry.update({ "posix": SocketProvider.unix_domain_ssh_auth_sock, "the_annoying_os": SocketProvider.the_annoying_os_named_pipes, # known instances "stub_agent": None, "stub_with_address": None, "stub_with_address_and_deterministic_dsa": None, # aliases "native": "the_annoying_os" if os.name == "nt" else "posix", "unix_domain": "posix", "ssh_auth_sock": "posix", "the_annoying_os_named_pipe": "the_annoying_os", "pageant_on_the_annoying_os": "the_annoying_os", "openssh_on_the_annoying_os": "the_annoying_os", "windows": "the_annoying_os", "windows_named_pipe": "the_annoying_os", "pageant_on_windows": "the_annoying_os", "openssh_on_windows": "the_annoying_os", })