b315e5bfd56052306d724f7dfa2f9c3b09300595
Marco Ricci Split the top-level `tests`...

Marco Ricci authored 5 months ago

1) # SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2) #
3) # SPDX-License-Identifier: Zlib
4) 
5) from __future__ import annotations
6) 
7) import contextlib
8) import errno
9) import logging
10) import os
11) import re
12) import sys
13) from typing import TYPE_CHECKING, TypedDict
14) 
15) import click.testing
16) from typing_extensions import NamedTuple
17) 
18) import tests.data
19) from derivepassphrase import _types, cli, ssh_agent, vault
20) from derivepassphrase.ssh_agent import socketprovider
21) 
22) __all__ = ()
23) 
24) if TYPE_CHECKING:
25)     from collections.abc import Callable, Iterator, Mapping, Sequence
26)     from contextlib import AbstractContextManager
27)     from typing import IO, NotRequired
28) 
29)     from typing_extensions import Any, Buffer, Self
30) 
31) 
32) # Test suite settings
33) # ===================
34) 
35) MIN_CONCURRENCY = 4
36) """
37) The minimum amount of concurrent threads used for testing.
38) """
39) 
40) 
41) def get_concurrency_limit() -> int:
42)     """Return the imposed limit on the number of concurrent threads.
43) 
44)     We use [`os.process_cpu_count`][] as the limit on Python 3.13 and
45)     higher, and [`os.cpu_count`][] on Python 3.12 and below.  On
46)     Python 3.12 and below, we explicitly support the `PYTHON_CPU_COUNT`
47)     environment variable.  We guarantee at least [`MIN_CONCURRENCY`][]
48)     many threads in any case.
49) 
50)     """  # noqa: RUF002
51)     result: int | None = None
52)     if sys.version_info >= (3, 13):
53)         result = os.process_cpu_count()
54)     else:
55)         with contextlib.suppress(KeyError, ValueError):
56)             result = result or int(os.environ["PYTHON_CPU_COUNT"], 10)
57)         with contextlib.suppress(AttributeError):
58)             result = result or len(os.sched_getaffinity(os.getpid()))
59)     return max(result if result is not None else 0, MIN_CONCURRENCY)
60) 
61) 
62) # Log/Error message searching
63) # ===========================
64) 
65) 
66) def message_emitted_factory(
67)     level: int,
68)     *,
69)     logger_name: str = cli.PROG_NAME,
70) ) -> Callable[[str | re.Pattern[str], Sequence[tuple[str, int, str]]], bool]:
71)     """Return a function to test if a matching message was emitted.
72) 
73)     Args:
74)         level: The level to match messages at.
75)         logger_name: The name of the logger to match against.
76) 
77)     """
78) 
79)     def message_emitted(
80)         text: str | re.Pattern[str],
81)         record_tuples: Sequence[tuple[str, int, str]],
82)     ) -> bool:
83)         """Return true if a matching message was emitted.
84) 
85)         Args:
86)             text: Substring or pattern to match against.
87)             record_tuples: Items to match.
88) 
89)         """
90) 
91)         def check_record(record: tuple[str, int, str]) -> bool:
92)             if record[:2] != (logger_name, level):
93)                 return False
94)             if isinstance(text, str):
95)                 return text in record[2]
96)             return text.match(record[2]) is not None  # pragma: no cover
97) 
98)         return any(map(check_record, record_tuples))
99) 
100)     return message_emitted
101) 
102) 
103) # No need to assert debug messages as of yet.
104) info_emitted = message_emitted_factory(logging.INFO)
105) warning_emitted = message_emitted_factory(logging.WARNING)
106) deprecation_warning_emitted = message_emitted_factory(
107)     logging.WARNING, logger_name=f"{cli.PROG_NAME}.deprecation"
108) )
109) deprecation_info_emitted = message_emitted_factory(
110)     logging.INFO, logger_name=f"{cli.PROG_NAME}.deprecation"
111) )
112) error_emitted = message_emitted_factory(logging.ERROR)
113) 
114) 
115) # click.testing.CliRunner handling
116) # ================================
117) 
118) 
119) class ReadableResult(NamedTuple):
120)     """Helper class for formatting and testing click.testing.Result objects."""
121) 
122)     exception: BaseException | None
123)     exit_code: int
124)     stdout: str
125)     stderr: str
126) 
127)     def clean_exit(
128)         self, *, output: str = "", empty_stderr: bool = False
129)     ) -> bool:
130)         """Return whether the invocation exited cleanly.
131) 
132)         Args:
133)             output:
134)                 An expected output string.
135) 
136)         """
137)         return (
138)             (
139)                 not self.exception
140)                 or (
141)                     isinstance(self.exception, SystemExit)
142)                     and self.exit_code == 0
143)                 )
144)             )
145)             and (not output or output in self.stdout)
146)             and (not empty_stderr or not self.stderr)
147)         )
148) 
149)     def error_exit(
150)         self,
151)         *,
152)         error: str | re.Pattern[str] | type[BaseException] = BaseException,
153)         record_tuples: Sequence[tuple[str, int, str]] = (),
154)     ) -> bool:
155)         """Return whether the invocation exited uncleanly.
156) 
157)         Args:
158)             error:
159)                 An expected error message, or an expected numeric error
160)                 code, or an expected exception type.
161) 
162)         """
163) 
164)         def error_match(error: str | re.Pattern[str], line: str) -> bool:
165)             return (
166)                 error in line
167)                 if isinstance(error, str)
168)                 else error.match(line) is not None
169)             )
170) 
171)         # TODO(the-13th-letter): Rewrite using structural pattern matching.
172)         # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
173)         if isinstance(error, type):
174)             return isinstance(self.exception, error)
175)         else:  # noqa: RET505
176)             assert isinstance(error, (str, re.Pattern))
177)             return (
178)                 isinstance(self.exception, SystemExit)
179)                 and self.exit_code > 0
180)                 and (
181)                     not error
182)                     or any(
183)                         error_match(error, line)
184)                         for line in self.stderr.splitlines(True)
185)                     )
186)                     or tests.machinery.error_emitted(error, record_tuples)
187)                 )
188)             )
189) 
190) 
191) class CliRunner:
192)     """An abstracted CLI runner class.
193) 
194)     Intended to provide similar functionality and scope as the
195)     [`click.testing.CliRunner`][] class, though not necessarily
196)     `click`-specific.  Also allows for seamless migration away from
197)     `click`, if/when we decide this.
198) 
199)     """
200) 
201)     _SUPPORTS_MIX_STDERR_ATTRIBUTE = not hasattr(click.testing, "StreamMixer")
202)     """
203)     True if and only if [`click.testing.CliRunner`][] supports the
204)     `mix_stderr` attribute.  It was removed in 8.2.0 in favor of the
205)     `click.testing.StreamMixer` class.
206) 
207)     See also
208)     [`pallets/click#2523`](https://github.com/pallets/click/pull/2523).
209)     """
210) 
211)     def __init__(
212)         self,
213)         *,
214)         mix_stderr: bool = False,
215)         color: bool | None = None,
216)     ) -> None:
217)         self.color = color
218)         self.mix_stderr = mix_stderr
219) 
220)         class MixStderrAttribute(TypedDict):
221)             mix_stderr: NotRequired[bool]
222) 
223)         mix_stderr_args: MixStderrAttribute = (
224)             {"mix_stderr": mix_stderr}
225)             if self._SUPPORTS_MIX_STDERR_ATTRIBUTE
226)             else {}
227)         )
Marco Ricci Fix type errors due to clic...

Marco Ricci authored 2 months ago

228)         # The "mix_stderr" argument, mandatory for our use case, was
229)         # removed in click 8.2.0 without a transition period.  Since we
230)         # cannot branch on the click version available to the type
231)         # checker, we disable static type checking for this call in
232)         # particular; our test suite will have to uncover any breakage
233)         # dynamically instead.
234)         self.click_testing_clirunner = click.testing.CliRunner(  # type: ignore[misc]
Marco Ricci Split the top-level `tests`...

Marco Ricci authored 5 months ago

235)             **mix_stderr_args
236)         )
237) 
238)     def invoke(
239)         self,
Marco Ricci Fix type errors due to clic...

Marco Ricci authored 2 months ago

240)         # The click.BaseCommand abstract base class, previously the base
241)         # class of click.Command and click.Group, was removed in click
242)         # 8.2.0 without a transition period, and click.Command instated
243)         # as the common base class instead.  To keep some degree of
244)         # compatibility with both old click and new click, we explicitly
245)         # list the (somewhat) concrete base classes we actually care
246)         # about here.
247)         cli: click.Command | click.Group,