Split the top-level `tests`...
Marco Ricci authored 5 months ago
|
1) # SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
2) #
3) # SPDX-License-Identifier: Zlib
4)
5) from __future__ import annotations
6)
7) import contextlib
8) import errno
9) import logging
10) import os
11) import re
12) import sys
13) from typing import TYPE_CHECKING, TypedDict
14)
15) import click.testing
16) from typing_extensions import NamedTuple
17)
18) import tests.data
19) from derivepassphrase import _types, cli, ssh_agent, vault
20) from derivepassphrase.ssh_agent import socketprovider
21)
22) __all__ = ()
23)
24) if TYPE_CHECKING:
25) from collections.abc import Callable, Iterator, Mapping, Sequence
26) from contextlib import AbstractContextManager
27) from typing import IO, NotRequired
28)
29) from typing_extensions import Any, Buffer, Self
30)
31)
32) # Test suite settings
33) # ===================
34)
35) MIN_CONCURRENCY = 4
36) """
37) The minimum amount of concurrent threads used for testing.
38) """
39)
40)
41) def get_concurrency_limit() -> int:
42) """Return the imposed limit on the number of concurrent threads.
43)
44) We use [`os.process_cpu_count`][] as the limit on Python 3.13 and
45) higher, and [`os.cpu_count`][] on Python 3.12 and below. On
46) Python 3.12 and below, we explicitly support the `PYTHON_CPU_COUNT`
47) environment variable. We guarantee at least [`MIN_CONCURRENCY`][]
48) many threads in any case.
49)
50) """ # noqa: RUF002
51) result: int | None = None
52) if sys.version_info >= (3, 13):
53) result = os.process_cpu_count()
54) else:
55) with contextlib.suppress(KeyError, ValueError):
56) result = result or int(os.environ["PYTHON_CPU_COUNT"], 10)
57) with contextlib.suppress(AttributeError):
58) result = result or len(os.sched_getaffinity(os.getpid()))
59) return max(result if result is not None else 0, MIN_CONCURRENCY)
60)
61)
62) # Log/Error message searching
63) # ===========================
64)
65)
66) def message_emitted_factory(
67) level: int,
68) *,
69) logger_name: str = cli.PROG_NAME,
70) ) -> Callable[[str | re.Pattern[str], Sequence[tuple[str, int, str]]], bool]:
71) """Return a function to test if a matching message was emitted.
72)
73) Args:
74) level: The level to match messages at.
75) logger_name: The name of the logger to match against.
76)
77) """
78)
79) def message_emitted(
80) text: str | re.Pattern[str],
81) record_tuples: Sequence[tuple[str, int, str]],
82) ) -> bool:
83) """Return true if a matching message was emitted.
84)
85) Args:
86) text: Substring or pattern to match against.
87) record_tuples: Items to match.
88)
89) """
90)
91) def check_record(record: tuple[str, int, str]) -> bool:
92) if record[:2] != (logger_name, level):
93) return False
94) if isinstance(text, str):
95) return text in record[2]
96) return text.match(record[2]) is not None # pragma: no cover
97)
98) return any(map(check_record, record_tuples))
99)
100) return message_emitted
101)
102)
103) # No need to assert debug messages as of yet.
104) info_emitted = message_emitted_factory(logging.INFO)
105) warning_emitted = message_emitted_factory(logging.WARNING)
106) deprecation_warning_emitted = message_emitted_factory(
107) logging.WARNING, logger_name=f"{cli.PROG_NAME}.deprecation"
108) )
109) deprecation_info_emitted = message_emitted_factory(
110) logging.INFO, logger_name=f"{cli.PROG_NAME}.deprecation"
111) )
112) error_emitted = message_emitted_factory(logging.ERROR)
113)
114)
115) # click.testing.CliRunner handling
116) # ================================
117)
118)
119) class ReadableResult(NamedTuple):
120) """Helper class for formatting and testing click.testing.Result objects."""
121)
122) exception: BaseException | None
123) exit_code: int
124) stdout: str
125) stderr: str
126)
127) def clean_exit(
128) self, *, output: str = "", empty_stderr: bool = False
129) ) -> bool:
130) """Return whether the invocation exited cleanly.
131)
132) Args:
133) output:
134) An expected output string.
135)
136) """
137) return (
138) (
139) not self.exception
140) or (
141) isinstance(self.exception, SystemExit)
142) and self.exit_code == 0
143) )
144) )
145) and (not output or output in self.stdout)
146) and (not empty_stderr or not self.stderr)
147) )
148)
149) def error_exit(
150) self,
151) *,
152) error: str | re.Pattern[str] | type[BaseException] = BaseException,
153) record_tuples: Sequence[tuple[str, int, str]] = (),
154) ) -> bool:
155) """Return whether the invocation exited uncleanly.
156)
157) Args:
158) error:
159) An expected error message, or an expected numeric error
160) code, or an expected exception type.
161)
162) """
163)
164) def error_match(error: str | re.Pattern[str], line: str) -> bool:
165) return (
166) error in line
167) if isinstance(error, str)
168) else error.match(line) is not None
169) )
170)
171) # TODO(the-13th-letter): Rewrite using structural pattern matching.
172) # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
173) if isinstance(error, type):
174) return isinstance(self.exception, error)
175) else: # noqa: RET505
176) assert isinstance(error, (str, re.Pattern))
177) return (
178) isinstance(self.exception, SystemExit)
179) and self.exit_code > 0
180) and (
181) not error
182) or any(
183) error_match(error, line)
184) for line in self.stderr.splitlines(True)
185) )
186) or tests.machinery.error_emitted(error, record_tuples)
187) )
188) )
189)
190)
191) class CliRunner:
192) """An abstracted CLI runner class.
193)
194) Intended to provide similar functionality and scope as the
195) [`click.testing.CliRunner`][] class, though not necessarily
196) `click`-specific. Also allows for seamless migration away from
197) `click`, if/when we decide this.
198)
199) """
200)
201) _SUPPORTS_MIX_STDERR_ATTRIBUTE = not hasattr(click.testing, "StreamMixer")
202) """
203) True if and only if [`click.testing.CliRunner`][] supports the
204) `mix_stderr` attribute. It was removed in 8.2.0 in favor of the
205) `click.testing.StreamMixer` class.
206)
207) See also
208) [`pallets/click#2523`](https://github.com/pallets/click/pull/2523).
209) """
210)
211) def __init__(
212) self,
213) *,
214) mix_stderr: bool = False,
215) color: bool | None = None,
216) ) -> None:
217) self.color = color
218) self.mix_stderr = mix_stderr
219)
220) class MixStderrAttribute(TypedDict):
221) mix_stderr: NotRequired[bool]
222)
223) mix_stderr_args: MixStderrAttribute = (
224) {"mix_stderr": mix_stderr}
225) if self._SUPPORTS_MIX_STDERR_ATTRIBUTE
226) else {}
227) )
|