Add more coverage and linting exclusions related to Windows named pipes
Marco Ricci

Marco Ricci commited on 2026-01-04 22:40:37
Zeige 3 geänderte Dateien mit 62 Einfügungen und 23 Löschungen.


Some of the missing exclusions are typos.

Others are accidental omissions due to writing the code for one platform
(POSIX-ish systems) without actually *testing* on that platform, because
the main logic is geared towards a different platform (here, The
Annoying OS) and was developed there.  This especially concerns code
that is designed to never be called and which immediately throws an
exception upon being called: it is easy to forget the coverage exclusion
if the corresponding code path is never actually run.

Finally, other exclusions and code refactorings are due to the fact that
the static type checking interface for `ctypes` (i.e., the typing stubs
from the `typeshed` project) is vastly different for different operating
systems.  So, type checking code on POSIX-ish systems that uses `ctypes`
for The Annoying OS's facilities causes the type checker to complain
about *a lot* of undefined symbols that `ctypes` supposedly does not
expose.  For some commonly-used symbols with many call sites, it is
worth defining a dual wrapper/stub function (wrapper for supported OSes,
stub for others) to both silence the type checker's complaints about
"undefined symbols" and to not sacrifice too much readability.

(While the type checker supports branching on the current OS, this
drastically increases the number of branches to cover, or alternatively,
the number of coverage branch exceptions/exclusions to mark up.  We
would be trading type checking exclusions for coverage branch
exclusions, gaining nothing.)

Finally finally, because The Annoying OS uses a very incompatible naming
scheme in its standard library, we need many linting exceptions for
variable, function and function argument naming.
... ...
@@ -32,6 +32,7 @@ if TYPE_CHECKING:
32 32
         Any,
33 33
         Buffer,
34 34
         Literal,
35
+        Protocol,
35 36
         Self,
36 37
         TypeAlias,
37 38
         TypeVar,
... ...
@@ -66,6 +67,12 @@ if TYPE_CHECKING:
66 67
     GetOverlappedResult: Callable[[HANDLE, LPOVERLAPPED, LPDWORD, BOOL], BOOL]
67 68
     CreateEvent: Callable[[None, BOOL, BOOL, None], HANDLE]
68 69
 
70
+    class WinErrorProtocol(Protocol):
71
+        def __call__(self, err: int | None = None, /) -> OSError: ...
72
+
73
+    GetLastError: Callable[[], int]
74
+    WinError: WinErrorProtocol
75
+
69 76
 
70 77
 class _OverlappedDummyStruct(ctypes.Structure):
71 78
     """The DUMMYSTRUCTNAME structure in the definition of OVERLAPPED."""
... ...
@@ -170,13 +177,29 @@ ERROR_IO_PENDING = 997
170 177
 try:  # pragma: unless the-annoying-os no cover
171 178
     import ctypes
172 179
 
180
+    GetLastError = ctypes.GetLastError  # type: ignore[attr-defined]
181
+    WinError = ctypes.WinError  # type: ignore[attr-defined]
182
+except (
183
+    AttributeError,
184
+    FileNotFoundError,
185
+    ImportError,
186
+):  # pragma: unless posix no cover
187
+
188
+    def GetLastError() -> int:  # noqa: N802  # pragma: no cover [failsafe]
189
+        raise NotImplementedError
190
+
191
+    def WinError(err: int | None = None, /) -> OSError:  # noqa: N802  # pragma: no cover [failsafe]
192
+        raise NotImplementedError
193
+
194
+
195
+try:  # pragma: unless the-annoying-os no cover
173 196
     crypt32 = ctypes.WinDLL("crypt32.dll")  # type: ignore[attr-defined]
174 197
     CryptProtectMemory = crypt32.CryptProtectMemory  # type: ignore[attr-defined]
175 198
 except (
176 199
     AttributeError,
177 200
     FileNotFoundError,
178 201
     ImportError,
179
-):  # pragma unless posix no cover
202
+):  # pragma: unless posix no cover
180 203
 
181 204
     def CryptProtectMemory(  # noqa: N802
182 205
         pDataIn: LPVOID,  # noqa: N803
... ...
@@ -204,7 +227,7 @@ def _errcheck(
204 227
     result_value = HANDLE(result).value
205 228
     invalid_value = HANDLE(INVALID_HANDLE_VALUE).value
206 229
     if result_value == invalid_value:
207
-        raise ctypes.WinError()  # type: ignore[attr-defined]
230
+        raise WinError()
208 231
     assert result_value is not None  # for the type checker
209 232
     return result_value
210 233
 
... ...
@@ -295,7 +318,7 @@ except (
295 318
         bManualReset: BOOL,  # noqa: N803
296 319
         bInitialState: BOOL,  # noqa: N803
297 320
         lpName: None,  # noqa: N803
298
-    ) -> HANDLE:
321
+    ) -> HANDLE:  # pragma: no cover [external]
299 322
         del lpEventAttributes, bManualReset, bInitialState, lpName
300 323
         raise OSError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
301 324
 
... ...
@@ -313,7 +336,7 @@ else:  # pragma: unless the-annoying-os no cover
313 336
         # We always pass None/NULL.
314 337
         HANDLE,
315 338
     ]
316
-    CreateEvent.argtypes = [
339
+    CreateEvent.argtypes = [  # type: ignore[attr-defined]
317 340
         # Actually, LPSECURITY_ATTRIBUTES, but we always pass
318 341
         # None/NULL.
319 342
         ctypes.c_void_p,
... ...
@@ -341,7 +364,7 @@ else:  # pragma: unless the-annoying-os no cover
341 364
         LPOVERLAPPED,
342 365
     ]
343 366
     CloseHandle.argtypes = [HANDLE]  # type: ignore[attr-defined]
344
-    GetOverlappedResult.argtypes = [
367
+    GetOverlappedResult.argtypes = [  # type: ignore[attr-defined]
345 368
         HANDLE,
346 369
         LPOVERLAPPED,
347 370
         LPDWORD,
... ...
@@ -366,7 +389,9 @@ class WindowsNamedPipeHandle:
366 389
 
367 390
     _IO_ON_CLOSED_FILE = "I/O operation on closed file."
368 391
 
369
-    def __init__(self, name: str) -> None:
392
+    def __init__(
393
+        self, name: str
394
+    ) -> None:  # pragma: unless the-annoying-os no cover
370 395
         """Create a named pipe handle.
371 396
 
372 397
         Args:
... ...
@@ -404,10 +429,12 @@ class WindowsNamedPipeHandle:
404 429
                     continue  # retry
405 430
                 raise
406 431
 
407
-    def __enter__(self) -> Self:
432
+    def __enter__(self) -> Self:  # pragma: unless the-annoying-os no cover
408 433
         return self
409 434
 
410
-    def __exit__(self, *args: object) -> Literal[False]:
435
+    def __exit__(
436
+        self, *args: object
437
+    ) -> Literal[False]:  # pragma: unless the-annoying-os no cover
411 438
         try:
412 439
             if self.handle is not None:
413 440
                 CloseHandle(self.handle)
... ...
@@ -415,7 +442,9 @@ class WindowsNamedPipeHandle:
415 442
         finally:
416 443
             self.handle = None
417 444
 
418
-    def recv(self, data: int, flags: int = 0, /) -> bytes:
445
+    def recv(
446
+        self, data: int, flags: int = 0, /
447
+    ) -> bytes:  # pragma: unless the-annoying-os no cover
419 448
         if self.handle is None:
420 449
             raise ValueError(self._IO_ON_CLOSED_FILE)
421 450
         del flags
... ...
@@ -434,7 +463,7 @@ class WindowsNamedPipeHandle:
434 463
                     ctypes.cast(ctypes.byref(read_count), LPDWORD),
435 464
                     ctypes.cast(ctypes.byref(overlapped_struct), LPOVERLAPPED),
436 465
                 )
437
-                if not success and ctypes.GetLastError() == ERROR_IO_PENDING:
466
+                if not success and GetLastError() == ERROR_IO_PENDING:
438 467
                     success = GetOverlappedResult(
439 468
                         self.handle,
440 469
                         ctypes.cast(
... ...
@@ -446,7 +475,7 @@ class WindowsNamedPipeHandle:
446 475
                 if (
447 476
                     not success or read_count.value == 0
448 477
                 ):  # pragma: no cover [external]
449
-                    raise ctypes.WinError()  # type: ignore[attr-defined]
478
+                    raise WinError()
450 479
             finally:
451 480
                 CloseHandle(wait_event)
452 481
             result.extend(buffer.raw[:block_size])
... ...
@@ -454,7 +483,9 @@ class WindowsNamedPipeHandle:
454 483
             read_count.value = 0
455 484
         return bytes(result)
456 485
 
457
-    def sendall(self, data: Buffer, flags: int = 0, /) -> None:
486
+    def sendall(
487
+        self, data: Buffer, flags: int = 0, /
488
+    ) -> None:  # pragma: unless the-annoying-os no cover
458 489
         if self.handle is None:
459 490
             raise ValueError(self._IO_ON_CLOSED_FILE)
460 491
         del flags
... ...
@@ -473,7 +504,7 @@ class WindowsNamedPipeHandle:
473 504
                 ctypes.cast(ctypes.byref(write_count), LPDWORD),
474 505
                 ctypes.cast(ctypes.byref(overlapped_struct), LPOVERLAPPED),
475 506
             )
476
-            if not success and ctypes.GetLastError() == ERROR_IO_PENDING:
507
+            if not success and GetLastError() == ERROR_IO_PENDING:
477 508
                 success = GetOverlappedResult(
478 509
                     self.handle,
479 510
                     ctypes.cast(ctypes.byref(overlapped_struct), LPOVERLAPPED),
... ...
@@ -483,12 +514,12 @@ class WindowsNamedPipeHandle:
483 514
             if (
484 515
                 not success or write_count.value == 0
485 516
             ):  # pragma: no cover [external]
486
-                raise ctypes.WinError()  # type: ignore[attr-defined]
517
+                raise WinError()
487 518
         finally:
488 519
             CloseHandle(wait_event)
489 520
 
490 521
     @classmethod
491
-    def for_openssh(cls) -> Self:
522
+    def for_openssh(cls) -> Self:  # pragma: unless the-annoying-os no cover
492 523
         """Construct a named pipe for use with OpenSSH on The Annoying OS.
493 524
 
494 525
         Returns:
... ...
@@ -498,7 +529,7 @@ class WindowsNamedPipeHandle:
498 529
         return cls(f"{PIPE_PREFIX}openssh-ssh-agent")
499 530
 
500 531
     @classmethod
501
-    def for_pageant(cls) -> Self:
532
+    def for_pageant(cls) -> Self:  # pragma: unless the-annoying-os no cover
502 533
         """Construct a named pipe for use with Pageant.
503 534
 
504 535
         Returns:
... ...
@@ -511,7 +542,7 @@ class WindowsNamedPipeHandle:
511 542
     def pageant_named_pipe_name(
512 543
         *,
513 544
         require_cryptprotectmemory: bool = False,
514
-    ) -> str:
545
+    ) -> str:  # pragma: unless the-annoying-os no cover
515 546
         """Return the pipe name that Pageant on The Annoying OS would use.
516 547
 
517 548
         Args:
... ...
@@ -232,7 +232,9 @@ class SocketAddressAction(str, enum.Enum):
232 232
         # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
233 233
         if self == self.MANGLE_ADDRESS:
234 234
 
235
-            def mangled_address(*_args: Any, **_kwargs: Any) -> NoReturn:
235
+            def mangled_address(
236
+                *_args: Any, **_kwargs: Any
237
+            ) -> NoReturn:  # pragma: no cover [failsafe]
236 238
                 raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
237 239
 
238 240
             monkeypatch.setattr(
... ...
@@ -253,8 +255,10 @@ class SocketAddressAction(str, enum.Enum):
253 255
             )
254 256
         elif self == self.UNSET_ADDRESS:
255 257
 
256
-            def no_address(*_args: Any, **_kwargs: Any) -> NoReturn:
257
-                raise KeyError("SSH_AUTH_SOCK environment variable")  # noqa: EM101, TRY003
258
+            def no_address(
259
+                *_args: Any, **_kwargs: Any
260
+            ) -> NoReturn:  # pragma: no cover [failsafe]
261
+                raise KeyError("SSH_AUTH_SOCK")  # noqa: EM101
258 262
 
259 263
             monkeypatch.setattr(
260 264
                 socketprovider.WindowsNamedPipeHandle,
... ...
@@ -798,8 +798,10 @@ class TestStoringConfigurationFailures:
798 798
             patch_suitable_ssh_keys=False,
799 799
         ) as monkeypatch:
800 800
 
801
-            def no_agent(*_args: Any, **_kwargs: Any) -> NoReturn:
802
-                raise KeyError("SSH_AUTH_SOCK environment variable")  # noqa: EM101, TRY003
801
+            def no_agent(
802
+                *_args: Any, **_kwargs: Any
803
+            ) -> NoReturn:  # pragma: no cover [failsafe]
804
+                raise KeyError("SSH_AUTH_SOCK")  # noqa: EM101
803 805
 
804 806
             monkeypatch.setattr(
805 807
                 socketprovider.WindowsNamedPipeHandle, "for_pageant", no_agent
... ...
@@ -822,7 +824,9 @@ class TestStoringConfigurationFailures:
822 824
             cwd = pathlib.Path.cwd().resolve()
823 825
             monkeypatch.setenv("SSH_AUTH_SOCK", str(cwd))
824 826
 
825
-            def mangled_address(*_args: Any, **_kwargs: Any) -> NoReturn:
827
+            def mangled_address(
828
+                *_args: Any, **_kwargs: Any
829
+            ) -> NoReturn:  # pragma: no cover [failsafe]
826 830
                 raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
827 831
 
828 832
             monkeypatch.setattr(
829 833