Add tests for the configuration mutex
Marco Ricci

Marco Ricci committed on 2025-05-18 23:08:44
Showing 6 changed files with 900 insertions and 35 deletions.


Conceptually, we need two types of tests for the configuration mutex:
the first type asserts that the program logic works for *any* kind of
(potentially fake) configuration mutex, and the second type asserts that
the configuration mutex works for *any* set of (potentially fake)
operations.  In both cases, the fake parts usually impose a specific
order of locking or running, controllable from the outside.

We modify the `ConfigurationMutex` class to shift the management of the
lock file descriptor into the lock and unlock functions themselves,
instead of cramming it into `__enter__` and `__exit__`.  (This means
that subclasses need not deal with the lock file, or in fact, with any
of the platform-specific locking machinery.)  A custom subclass in the
`tests` namespace provides a faked mutex that imposes a specific locking
order, controllable from the outside via message passing.  This subclass
is then used with a state machine to test various sets of operations
together with the faked mutex, i.e. to provide tests of the first type.
(The standard tests, executed with the actual mutex implementation,
already provide tests of the second type.)
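
The following is a minimal sketch of that idea, assuming the revised
`ConfigurationMutex` interface from this commit (zero-argument
`lock`/`unlock` callables assigned in `__init__`); the class and
parameter names here are illustrative, and the actual test subclass
added below is considerably more elaborate:

```python
import queue

from derivepassphrase._internals import cli_helpers


class OrderedFakeMutex(cli_helpers.ConfigurationMutex):
    """A fake mutex whose locking order is dictated from outside."""

    def __init__(
        self,
        my_id: int,
        input_queue: queue.Queue,
        output_queue: queue.Queue,
    ) -> None:
        super().__init__()

        def lock() -> None:
            # Announce readiness, then block until the controlling
            # process signals that this client now "holds" the mutex.
            output_queue.put((my_id, 'ready'))
            assert input_queue.get() == (my_id, 'go')

        def unlock() -> None:
            # Report completion; the controller treats this as the
            # mutex being released and unblocks the next client.
            output_queue.put((my_id, 'done'))

        # Because `lock` and `unlock` are plain instance attributes
        # (not methods), overriding them suffices; no lock file or
        # other platform-specific locking machinery is involved.
        self.lock = lock
        self.unlock = unlock
```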

Regrettably, this set of tests requires concurrency support.  Because
the click testing machinery needs to reset process-wide properties for
each concurrent unit, and because the tests require this to be done in
a time-overlapped manner, the tests specifically require
multiprocessing support.  Care must also be taken to ensure that
coverage measurement keeps working: it must be enabled in the
subprocesses whenever it is enabled in the parent (a straightforward
configuration issue), and the parent process must not accidentally
discard the subprocesses' coverage measurements as junk files when the
subprocesses are spawned in specially prepared, automatically
cleaned-up temporary directories (a manual labor issue).
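
The configuration half is covered by the new `concurrency` and
`sigterm` coverage settings in the first hunk below.  The manual-labor
half boils down to rescuing the subprocesses' data files before the
temporary directory disappears; a rough sketch of that step (the
helper name is illustrative):

```python
import pathlib
import shutil


def rescue_coverage_files(
    temp_cwd: pathlib.Path,
    orig_cwd: pathlib.Path,
) -> None:
    """Move subprocess coverage data out of the temporary directory.

    The subprocesses start inside the isolated, automatically cleaned
    up temporary directory, so coverage writes its `.coverage.*` data
    files there.  Move them back to the original working directory so
    that they survive the cleanup and can be combined later.
    """
    for coverage_file in temp_cwd.glob('.coverage.*'):
        shutil.move(coverage_file, orig_cwd)
```
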
... ...
@@ -228,6 +228,8 @@ omit = [
228 228
     "__main__.py",
229 229
 ]
230 230
 dynamic_context = 'test_function'
231
+concurrency = ['thread', 'multiprocessing']
232
+sigterm = true
231 233
 
232 234
 [tool.hatch.build.targets.sdist]
233 235
 exclude = [
... ...
@@ -198,11 +198,13 @@ class ConfigurationMutex:
198 198
 
199 199
     """
200 200
 
201
-    lock: Callable[[int], None]
202
-    """A function to lock a given file descriptor exclusively.
201
+    lock: Callable[[], None]
202
+    """A function to lock the mutex exclusively.
203 203
 
204
-    On Windows, this uses [`msvcrt.locking`][], on other systems, this
205
-    uses [`fcntl.flock`][].
204
+    This implementation uses a file descriptor of a well-known file,
205
+    which is opened before locking and closed after unlocking (and on
206
+    error when locking). On Windows, we use [`msvcrt.locking`][], on
207
+    other systems, we use [`fcntl.flock`][].
206 208
 
207 209
     Note:
208 210
         This is a normal Python function, not a method.
... ...
@@ -213,11 +215,14 @@ class ConfigurationMutex:
213 215
         [`lock`][] and [`unlock`][] are still compatible.
214 216
 
215 217
     """
216
-    unlock: Callable[[int], None]
217
-    """A function to unlock a given file descriptor.
218
+    unlock: Callable[[], None]
219
+    """A function to unlock the mutex.
218 220
 
219
-    On Windows, this uses [`msvcrt.locking`][], on other systems, this
220
-    uses [`fcntl.flock`][].
221
+    This implementation uses a file descriptor of a well-known file,
222
+    which is opened before locking and closed after unlocking (and on
223
+    error when locking). It will fail if the file descriptor is
224
+    unavailable. On Windows, we use [`msvcrt.locking`][], on other
225
+    systems, we use [`fcntl.flock`][].
221 226
 
222 227
     Note:
223 228
         This is a normal Python function, not a method.
... ...
@@ -244,11 +249,11 @@ class ConfigurationMutex:
244 249
             LK_LOCK = msvcrt.LK_LOCK  # noqa: N806
245 250
             LK_UNLCK = msvcrt.LK_UNLCK  # noqa: N806
246 251
 
247
-            def lock_func(fileobj: int) -> None:
248
-                locking(fileobj, LK_LOCK, LOCK_SIZE)
252
+            def lock_fd(fd: int, /) -> None:
253
+                locking(fd, LK_LOCK, LOCK_SIZE)
249 254
 
250
-            def unlock_func(fileobj: int) -> None:
251
-                locking(fileobj, LK_UNLCK, LOCK_SIZE)
255
+            def unlock_fd(fd: int, /) -> None:
256
+                locking(fd, LK_UNLCK, LOCK_SIZE)
252 257
 
253 258
         else:
254 259
             import fcntl  # noqa: PLC0415
... ...
@@ -257,11 +262,31 @@ class ConfigurationMutex:
257 262
             LOCK_EX = fcntl.LOCK_EX  # noqa: N806
258 263
             LOCK_UN = fcntl.LOCK_UN  # noqa: N806
259 264
 
260
-            def lock_func(fileobj: int) -> None:
261
-                flock(fileobj, LOCK_EX)
265
+            def lock_fd(fd: int, /) -> None:
266
+                flock(fd, LOCK_EX)
262 267
 
263
-            def unlock_func(fileobj: int) -> None:
264
-                flock(fileobj, LOCK_UN)
268
+            def unlock_fd(fd: int, /) -> None:
269
+                flock(fd, LOCK_UN)
270
+
271
+        def lock_func() -> None:
272
+            with self.write_lock_condition:
273
+                self.write_lock_condition.wait_for(
274
+                    lambda: self.write_lock_fileobj is None
275
+                )
276
+                self.write_lock_condition.notify()
277
+                self.write_lock_file.touch()
278
+                self.write_lock_fileobj = self.write_lock_file.open('wb')
279
+                lock_fd(self.write_lock_fileobj.fileno())
280
+
281
+        def unlock_func() -> None:
282
+            with self.write_lock_condition:
283
+                assert self.write_lock_fileobj is not None, (
284
+                    'We lost track of the configuration write lock '
285
+                    'file object, so we cannot unlock it anymore!'
286
+                )
287
+                unlock_fd(self.write_lock_fileobj.fileno())
288
+                self.write_lock_fileobj.close()
289
+                self.write_lock_fileobj = None
265 290
 
266 291
         self.lock = lock_func
267 292
         self.unlock = unlock_func
... ...
@@ -271,13 +296,7 @@ class ConfigurationMutex:
271 296
 
272 297
     def __enter__(self) -> Self:
273 298
         """Enter the context, locking the configuration file."""  # noqa: DOC201
274
-        with self.write_lock_condition:
275
-            self.write_lock_condition.wait_for(
276
-                lambda: self.write_lock_fileobj is None
277
-            )
278
-            self.write_lock_file.touch()
279
-            self.write_lock_fileobj = self.write_lock_file.open('wb')
280
-            self.lock(self.write_lock_fileobj.fileno())
299
+        self.lock()
281 300
         return self
282 301
 
283 302
     def __exit__(
... ...
@@ -288,14 +307,7 @@ class ConfigurationMutex:
288 307
         /,
289 308
     ) -> Literal[False]:
290 309
         """Exit the context, releasing the lock on the configuration file."""  # noqa: DOC201
291
-        with self.write_lock_condition:
292
-            assert self.write_lock_fileobj is not None, (
293
-                'We lost track of the configuration write lock file object, '
294
-                'so we cannot unlock it anymore!'
295
-            )
296
-            self.unlock(self.write_lock_fileobj.fileno())
297
-            self.write_lock_fileobj.close()
298
-            self.write_lock_fileobj = None
310
+        self.unlock()
299 311
         return False
300 312
 
301 313
 
... ...
@@ -36,6 +36,7 @@ from derivepassphrase._internals import cli_messages as _msg
36 36
 if TYPE_CHECKING:
37 37
     from collections.abc import Sequence
38 38
     from collections.abc import Set as AbstractSet
39
+    from contextlib import AbstractContextManager
39 40
 
40 41
 __all__ = ('derivepassphrase',)
41 42
 
... ...
@@ -737,7 +738,10 @@ class _VaultContext:  # noqa: PLR0904
737 738
                 )
738 739
                 raise click.UsageError(str(err_msg))
739 740
 
740
-    def get_mutex(self, op: str) -> contextlib.AbstractContextManager[None]:
741
+    def get_mutex(
742
+        self,
743
+        op: str,
744
+    ) -> AbstractContextManager[AbstractContextManager | None]:
741 745
         """Return a mutex for accessing the configuration on disk.
742 746
 
743 747
         The mutex is a context manager, and will lock out other threads
... ...
@@ -16,6 +16,7 @@ import pathlib
16 16
 import re
17 17
 import shlex
18 18
 import stat
19
+import sys
19 20
 import tempfile
20 21
 import types
21 22
 import zipfile
... ...
@@ -1603,6 +1604,65 @@ available.  Usually this means that the test targets the
1603 1604
 `derivepassphrase export vault` subcommand, whose functionality depends
1604 1605
 on cryptography support being available.
1605 1606
 """
1607
+skip_if_no_multiprocessing_support = pytest.mark.skipif(
1608
+    importlib.util.find_spec('multiprocessing') is None,
1609
+    reason='no "multiprocessing" support',
1610
+)
1611
+"""
1612
+A cached pytest mark to skip this test if multiprocessing support is not
1613
+available.  Usually this means that the test targets the concurrency
1614
+features of `derivepassphrase`, which is generally only possible to test
1615
+in separate processes because the testing machinery operates on
1616
+process-global state.
1617
+"""
1618
+
1619
+MIN_CONCURRENCY = 4
1620
+"""
1621
+The minimum amount of concurrent threads used for testing.
1622
+"""
1623
+
1624
+
1625
+def get_concurrency_limit() -> int:
1626
+    """Return the imposed limit on the number of concurrent threads.
1627
+
1628
+    We use [`os.process_cpu_count`][] as the limit on Python 3.13 and
1629
+    higher, and [`os.cpu_count`][] on Python 3.12 and below.  On
1630
+    Python 3.12 and below, we explicitly support the `PYTHON_CPU_COUNT`
1631
+    environment variable.  We guarantee at least [`MIN_CONCURRENCY`][]
1632
+    many threads in any case.
1633
+
1634
+    """  # noqa: RUF002
1635
+    result: int | None = None
1636
+    if sys.version_info >= (3, 13):
1637
+        result = os.process_cpu_count()
1638
+    else:
1639
+        try:
1640
+            cpus = os.sched_getaffinity(os.getpid())
1641
+        except AttributeError:
1642
+            pass
1643
+        else:
1644
+            result = len(cpus)
1645
+    return max(result if result is not None else 0, MIN_CONCURRENCY)
1646
+
1647
+
1648
+def get_concurrency_step_count(
1649
+    settings: hypothesis.settings | None = None,
1650
+) -> int:
1651
+    """Return the desired step count for concurrency-related tests.
1652
+
1653
+    This is the smaller of the [general concurrency
1654
+    limit][tests.get_concurrency_limit] and the step count from the
1655
+    current hypothesis settings.
1656
+
1657
+    Args:
1658
+        settings:
1659
+            The hypothesis settings for a specific tests.  If not given,
1660
+            then the current profile will be queried directly.
1661
+
1662
+    """
1663
+    if settings is None:  # pragma: no cover
1664
+        settings = hypothesis.settings()
1665
+    return min(get_concurrency_limit(), settings.stateful_step_count)
1606 1666
 
1607 1667
 
1608 1668
 def list_keys(self: Any = None) -> list[_types.SSHKeyCommentPair]:
... ...
@@ -111,7 +111,7 @@ _hypothesis_settings_setup()
111 111
 
112 112
 # https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup
113 113
 # https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595
114
-@pytest.fixture(scope='session', autouse=True)
114
+@pytest.fixture(scope='session', autouse=False)
115 115
 def term_handler() -> Iterator[None]:  # pragma: no cover
116 116
     try:
117 117
         import signal  # noqa: PLC0415
... ...
@@ -14,6 +14,7 @@ import json
14 14
 import logging
15 15
 import os
16 16
 import pathlib
17
+import queue
17 18
 import re
18 19
 import shlex
19 20
 import shutil
... ...
@@ -22,13 +23,13 @@ import tempfile
22 23
 import textwrap
23 24
 import types
24 25
 import warnings
25
-from typing import TYPE_CHECKING
26
+from typing import TYPE_CHECKING, cast
26 27
 
27 28
 import click.testing
28 29
 import hypothesis
29 30
 import pytest
30 31
 from hypothesis import stateful, strategies
31
-from typing_extensions import Any, NamedTuple
32
+from typing_extensions import Any, NamedTuple, TypeAlias
32 33
 
33 34
 import tests
34 35
 from derivepassphrase import _types, cli, ssh_agent, vault
... ...
@@ -39,6 +40,7 @@ from derivepassphrase._internals import (
39 40
 )
40 41
 
41 42
 if TYPE_CHECKING:
43
+    import multiprocessing
42 44
     from collections.abc import Callable, Iterable, Iterator, Sequence
43 45
     from collections.abc import Set as AbstractSet
44 46
     from typing import NoReturn
... ...
@@ -6002,6 +6004,791 @@ TestConfigManagement = ConfigManagementStateMachine.TestCase
6002 6004
 """The [`unittest.TestCase`][] class that will actually be run."""
6003 6005
 
6004 6006
 
6007
+class FakeConfigurationMutexAction(NamedTuple):
6008
+    """An action/a step in the [`FakeConfigurationMutexStateMachine`][].
6009
+
6010
+    Attributes:
6011
+        command_line:
6012
+            The command-line for `derivepassphrase vault` to execute.
6013
+        input:
6014
+            The input to this command.
6015
+
6016
+    """
6017
+
6018
+    command_line: list[str]
6019
+    """"""
6020
+    input: str | bytes | None = None
6021
+    """"""
6022
+
6023
+
6024
+def run_actions_handler(
6025
+    id_num: int,
6026
+    action: FakeConfigurationMutexAction,
6027
+    *,
6028
+    input_queue: queue.Queue,
6029
+    output_queue: queue.Queue,
6030
+    timeout: int,
6031
+) -> None:
6032
+    """Prepare the faked mutex, then run `action`.
6033
+
6034
+    This is a top-level handler function -- to be used in a new
6035
+    [`multiprocessing.Process`][] -- to run a single action from the
6036
+    [`FakeConfigurationMutexStateMachine`][].  Output from this function
6037
+    must be sent down the output queue instead of relying on the call
6038
+    stack.  Additionally, because this runs in a separate process, we
6039
+    need to restart coverage tracking if it is currently running.
6040
+
6041
+    Args:
6042
+        id_num:
6043
+            The internal ID of this subprocess.
6044
+        action:
6045
+            The action to execute.
6046
+        input_queue:
6047
+            The queue for data passed from the manager/parent process to
6048
+            this subprocess.
6049
+        output_queue:
6050
+            The queue for data passed from this subprocess to the
6051
+            manager/parent process.
6052
+        timeout:
6053
+            The maximum amount of time to wait for a data transfer along
6054
+            the input or the output queue.  If exceeded, we exit
6055
+            immediately.
6056
+
6057
+    """
6058
+    with pytest.MonkeyPatch.context() as monkeypatch:
6059
+        monkeypatch.setattr(
6060
+            cli_helpers,
6061
+            'configuration_mutex',
6062
+            lambda: FakeConfigurationMutexStateMachine.ConfigurationMutexStub(
6063
+                my_id=id_num,
6064
+                input_queue=input_queue,
6065
+                output_queue=output_queue,
6066
+                timeout=timeout,
6067
+            ),
6068
+        )
6069
+        runner = tests.CliRunner(mix_stderr=False)
6070
+        try:
6071
+            result = runner.invoke(
6072
+                cli.derivepassphrase_vault,
6073
+                args=action.command_line,
6074
+                input=action.input,
6075
+                catch_exceptions=True,
6076
+            )
6077
+            output_queue.put(
6078
+                FakeConfigurationMutexStateMachine.IPCMessage(
6079
+                    id_num,
6080
+                    'result',
6081
+                    (
6082
+                        result.clean_exit(empty_stderr=False),
6083
+                        copy.copy(result.stdout),
6084
+                        copy.copy(result.stderr),
6085
+                    ),
6086
+                ),
6087
+                block=True,
6088
+                timeout=timeout,
6089
+            )
6090
+        except Exception as exc:  # pragma: no cover  # noqa: BLE001
6091
+            output_queue.put(
6092
+                FakeConfigurationMutexStateMachine.IPCMessage(
6093
+                    id_num, 'exception', exc
6094
+                ),
6095
+                block=False,
6096
+            )
6097
+
6098
+
6099
+@hypothesis.settings(
6100
+    stateful_step_count=tests.get_concurrency_step_count(),
6101
+    deadline=None,
6102
+)
6103
+class FakeConfigurationMutexStateMachine(stateful.RuleBasedStateMachine):
6104
+    """A state machine simulating the (faked) configuration mutex.
6105
+
6106
+    Generate an ordered set of concurrent writers to the
6107
+    derivepassphrase configuration, then test that the writers' accesses
6108
+    are serialized correctly, i.e., test that the writers correctly use
6109
+    the mutex to avoid concurrent accesses, under the assumption that
6110
+    the mutex itself is correctly implemented.
6111
+
6112
+    We use a custom mutex implementation to both ensure that all writers
6113
+    attempt to lock the configuration at the same time and that the lock
6114
+    is granted in our desired order.  This test is therefore independent
6115
+    of the actual (operating system-specific) mutex implementation in
6116
+    `derivepassphrase`.
6117
+
6118
+    Attributes:
6119
+        setting:
6120
+            A bundle for single-service settings.
6121
+        configuration:
6122
+            A bundle for full vault configurations.
6123
+
6124
+    """
6125
+
6126
+    class IPCMessage(NamedTuple):
6127
+        """A message for inter-process communication.
6128
+
6129
+        Used by the configuration mutex stub class to affect/signal the
6130
+        control flow amongst the linked mutex clients.
6131
+
6132
+        Attributes:
6133
+            child_id:
6134
+                The ID of the sending or receiving child process.
6135
+            message:
6136
+                One of "ready", "go", "config", "result" or "exception".
6137
+            payload:
6138
+                The (optional) message payload.
6139
+
6140
+        """
6141
+
6142
+        child_id: int
6143
+        """"""
6144
+        message: Literal['ready', 'go', 'config', 'result', 'exception']
6145
+        """"""
6146
+        payload: object | None
6147
+        """"""
6148
+
6149
+    class ConfigurationMutexStub(cli_helpers.ConfigurationMutex):
6150
+        """Configuration mutex subclass that enforces a locking order.
6151
+
6152
+        Each configuration mutex stub object ("mutex client") has an
6153
+        associated ID, and one read-only and one write-only pipe
6154
+        (actually: [`multiprocessing.Queue`][] objects) to the "manager"
6155
+        instance coordinating these stub objects.  First, the mutex
6156
+        client signals readiness, then the manager signals when the
6157
+        mutex shall be considered "acquired", then finally the mutex
6158
+        client sends the result back (simultaneously releasing the mutex
6159
+        again).  The manager may optionally send an abort signal if the
6160
+        operations take too long.
6161
+
6162
+        This subclass also copies the effective vault configuration
6163
+        to `intermediate_configs` upon releasing the lock.
6164
+
6165
+        """
6166
+
6167
+        def __init__(
6168
+            self,
6169
+            *,
6170
+            my_id: int,
6171
+            timeout: int,
6172
+            input_queue: queue.Queue[
6173
+                FakeConfigurationMutexStateMachine.IPCMessage
6174
+            ],
6175
+            output_queue: queue.Queue[
6176
+                FakeConfigurationMutexStateMachine.IPCMessage
6177
+            ],
6178
+        ) -> None:
6179
+            """Initialize this mutex client.
6180
+
6181
+            Args:
6182
+                my_id:
6183
+                    The ID of this client.
6184
+                timeout:
6185
+                    The timeout for each get and put operation on the
6186
+                    queues.
6187
+                input_queue:
6188
+                    The message queue for IPC messages from the manager
6189
+                    instance to this mutex client.
6190
+                output_queue:
6191
+                    The message queue for IPC messages from this mutex
6192
+                    client to the manager instance.
6193
+
6194
+            """
6195
+            super().__init__()
6196
+
6197
+            def lock() -> None:
6198
+                """Simulate locking of the mutex.
6199
+
6200
+                Issue a "ready" message, wait for a "go", then return.
6201
+                If an exception occurs, issue an "exception" message,
6202
+                then raise the exception.
6203
+
6204
+                """
6205
+                IPCMessage: TypeAlias = (
6206
+                    FakeConfigurationMutexStateMachine.IPCMessage
6207
+                )
6208
+                try:
6209
+                    output_queue.put(
6210
+                        IPCMessage(my_id, 'ready', None),
6211
+                        block=True,
6212
+                        timeout=timeout,
6213
+                    )
6214
+                    ok = input_queue.get(block=True, timeout=timeout)
6215
+                    if ok != IPCMessage(my_id, 'go', None):  # pragma: no cover
6216
+                        output_queue.put(
6217
+                            IPCMessage(my_id, 'exception', ok), block=False
6218
+                        )
6219
+                        raise (
6220
+                            ok[2]
6221
+                            if isinstance(ok[2], BaseException)
6222
+                            else RuntimeError(ok[2])
6223
+                        )
6224
+                except (queue.Empty, queue.Full) as exc:  # pragma: no cover
6225
+                    output_queue.put(
6226
+                        IPCMessage(my_id, 'exception', exc), block=False
6227
+                    )
6228
+                    return
6229
+
6230
+            def unlock() -> None:
6231
+                """Simulate unlocking of the mutex.
6232
+
6233
+                Issue a "config" message, then return.  If an exception
6234
+                occurs, issue an "exception" message, then raise the
6235
+                exception.
6236
+
6237
+                """
6238
+                IPCMessage: TypeAlias = (
6239
+                    FakeConfigurationMutexStateMachine.IPCMessage
6240
+                )
6241
+                try:
6242
+                    output_queue.put(
6243
+                        IPCMessage(
6244
+                            my_id,
6245
+                            'config',
6246
+                            copy.copy(cli_helpers.load_config()),
6247
+                        ),
6248
+                        block=True,
6249
+                        timeout=timeout,
6250
+                    )
6251
+                except (queue.Empty, queue.Full) as exc:  # pragma: no cover
6252
+                    output_queue.put(
6253
+                        IPCMessage(my_id, 'exception', exc), block=False
6254
+                    )
6255
+                    raise
6256
+
6257
+            self.lock = lock
6258
+            self.unlock = unlock
6259
+
6260
+    setting: stateful.Bundle[_types.VaultConfigServicesSettings] = (
6261
+        stateful.Bundle('setting')
6262
+    )
6263
+    """"""
6264
+    configuration: stateful.Bundle[_types.VaultConfig] = stateful.Bundle(
6265
+        'configuration'
6266
+    )
6267
+    """"""
6268
+
6269
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
6270
+        """Initialize the state machine."""
6271
+        super().__init__(*args, **kwargs)
6272
+        self.actions: list[FakeConfigurationMutexAction] = []
6273
+        # Determine the step count by poking around in the hypothesis
6274
+        # internals. As this isn't guaranteed to be stable, turn off
6275
+        # coverage.
6276
+        try:  # pragma: no cover
6277
+            settings: hypothesis.settings | None
6278
+            settings = FakeConfigurationMutexStateMachine.TestCase.settings
6279
+        except AttributeError:  # pragma: no cover
6280
+            settings = None
6281
+        self.step_count = tests.get_concurrency_step_count(settings)
6282
+
6283
+    @stateful.initialize(
6284
+        target=configuration,
6285
+        configs=strategies.lists(
6286
+            vault_full_config(),
6287
+            min_size=8,
6288
+            max_size=8,
6289
+        ),
6290
+    )
6291
+    def declare_initial_configs(
6292
+        self,
6293
+        configs: list[_types.VaultConfig],
6294
+    ) -> stateful.MultipleResults[_types.VaultConfig]:
6295
+        """Initialize the configuration bundle with eight configurations."""
6296
+        return stateful.multiple(*configs)
6297
+
6298
+    @stateful.initialize(
6299
+        target=setting,
6300
+        configs=strategies.lists(
6301
+            vault_full_config(),
6302
+            min_size=4,
6303
+            max_size=4,
6304
+        ),
6305
+    )
6306
+    def extract_initial_settings(
6307
+        self,
6308
+        configs: list[_types.VaultConfig],
6309
+    ) -> stateful.MultipleResults[_types.VaultConfigServicesSettings]:
6310
+        """Initialize the settings bundle with four service settings."""
6311
+        settings: list[_types.VaultConfigServicesSettings] = []
6312
+        for c in configs:
6313
+            settings.extend(c['services'].values())
6314
+        return stateful.multiple(*map(copy.deepcopy, settings))
6315
+
6316
+    @stateful.initialize(
6317
+        config=vault_full_config(),
6318
+    )
6319
+    def declare_initial_action(
6320
+        self,
6321
+        config: _types.VaultConfig,
6322
+    ) -> None:
6323
+        """Initialize the actions bundle from the configuration bundle.
6324
+
6325
+        This is roughly comparable to the
6326
+        [`add_import_configuration_action`][] general rule, but adding
6327
+        it as a separate initialize rule avoids having to guard every
6328
+        other action-amending rule against empty action sequences, which
6329
+        would discard huge portions of the rule selection search space
6330
+        and thus trigger loads of hypothesis health check warnings.
6331
+
6332
+        """
6333
+        command_line = ['--import', '-', '--overwrite-existing']
6334
+        input = json.dumps(config)  # noqa: A001
6335
+        hypothesis.note(f'# {command_line = }, {input = }')
6336
+        action = FakeConfigurationMutexAction(
6337
+            command_line=command_line, input=input
6338
+        )
6339
+        self.actions.append(action)
6340
+
6341
+    @stateful.rule(
6342
+        setting=setting.filter(bool),
6343
+        maybe_unset=strategies.sets(
6344
+            strategies.sampled_from(VALID_PROPERTIES),
6345
+            max_size=3,
6346
+        ),
6347
+        overwrite=strategies.booleans(),
6348
+    )
6349
+    def add_set_globals_action(
6350
+        self,
6351
+        setting: _types.VaultConfigGlobalSettings,
6352
+        maybe_unset: set[str],
6353
+        overwrite: bool,
6354
+    ) -> None:
6355
+        """Set the global settings of a configuration.
6356
+
6357
+        Args:
6358
+            setting:
6359
+                The new global settings.
6360
+            maybe_unset:
6361
+                Settings keys to additionally unset, if not already
6362
+                present in the new settings.  Corresponds to the
6363
+                `--unset` command-line argument.
6364
+            overwrite:
6365
+                Overwrite the settings object if true, or merge if
6366
+                false.  Corresponds to the `--overwrite-existing` and
6367
+                `--merge-existing` command-line arguments.
6368
+
6369
+        """
6370
+        maybe_unset = set(maybe_unset) - setting.keys()
6371
+        command_line = (
6372
+            [
6373
+                '--config',
6374
+                '--overwrite-existing' if overwrite else '--merge-existing',
6375
+            ]
6376
+            + [f'--unset={key}' for key in maybe_unset]
6377
+            + [
6378
+                f'--{key}={value}'
6379
+                for key, value in setting.items()
6380
+                if key in VALID_PROPERTIES
6381
+            ]
6382
+        )
6383
+        input = None  # noqa: A001
6384
+        hypothesis.note(f'# {command_line = }, {input = }')
6385
+        action = FakeConfigurationMutexAction(
6386
+            command_line=command_line, input=input
6387
+        )
6388
+        self.actions.append(action)
6389
+
6390
+    @stateful.rule(
6391
+        service=strategies.sampled_from(KNOWN_SERVICES),
6392
+        setting=setting.filter(bool),
6393
+        maybe_unset=strategies.sets(
6394
+            strategies.sampled_from(VALID_PROPERTIES),
6395
+            max_size=3,
6396
+        ),
6397
+        overwrite=strategies.booleans(),
6398
+    )
6399
+    def add_set_service_action(
6400
+        self,
6401
+        service: str,
6402
+        setting: _types.VaultConfigServicesSettings,
6403
+        maybe_unset: set[str],
6404
+        overwrite: bool,
6405
+    ) -> None:
6406
+        """Set the named service settings for a configuration.
6407
+
6408
+        Args:
6409
+            service:
6410
+                The name of the service to set.
6411
+            setting:
6412
+                The new service settings.
6413
+            maybe_unset:
6414
+                Settings keys to additionally unset, if not already
6415
+                present in the new settings.  Corresponds to the
6416
+                `--unset` command-line argument.
6417
+            overwrite:
6418
+                Overwrite the settings object if true, or merge if
6419
+                false.  Corresponds to the `--overwrite-existing` and
6420
+                `--merge-existing` command-line arguments.
6421
+
6422
+        """
6423
+        maybe_unset = set(maybe_unset) - setting.keys()
6424
+        command_line = (
6425
+            [
6426
+                '--config',
6427
+                '--overwrite-existing' if overwrite else '--merge-existing',
6428
+            ]
6429
+            + [f'--unset={key}' for key in maybe_unset]
6430
+            + [
6431
+                f'--{key}={value}'
6432
+                for key, value in setting.items()
6433
+                if key in VALID_PROPERTIES
6434
+            ]
6435
+            + ['--', service]
6436
+        )
6437
+        input = None  # noqa: A001
6438
+        hypothesis.note(f'# {command_line = }, {input = }')
6439
+        action = FakeConfigurationMutexAction(
6440
+            command_line=command_line, input=input
6441
+        )
6442
+        self.actions.append(action)
6443
+
6444
+    @stateful.rule()
6445
+    def add_purge_global_action(
6446
+        self,
6447
+    ) -> None:
6448
+        """Purge the globals of a configuration."""
6449
+        command_line = ['--delete-globals']
6450
+        input = None  # 'y'  # noqa: A001
6451
+        hypothesis.note(f'# {command_line = }, {input = }')
6452
+        action = FakeConfigurationMutexAction(
6453
+            command_line=command_line, input=input
6454
+        )
6455
+        self.actions.append(action)
6456
+
6457
+    @stateful.rule(
6458
+        service=strategies.sampled_from(KNOWN_SERVICES),
6459
+    )
6460
+    def add_purge_service_action(
6461
+        self,
6462
+        service: str,
6463
+    ) -> None:
6464
+        """Purge the settings of a named service in a configuration.
6465
+
6466
+        Args:
6467
+            service:
6468
+                The service name to purge.
6469
+
6470
+        """
6471
+        command_line = ['--delete', '--', service]
6472
+        input = None  # 'y'  # noqa: A001
6473
+        hypothesis.note(f'# {command_line = }, {input = }')
6474
+        action = FakeConfigurationMutexAction(
6475
+            command_line=command_line, input=input
6476
+        )
6477
+        self.actions.append(action)
6478
+
6479
+    @stateful.rule()
6480
+    def add_purge_all_action(
6481
+        self,
6482
+    ) -> None:
6483
+        """Purge the entire configuration."""
6484
+        command_line = ['--clear']
6485
+        input = None  # 'y'  # noqa: A001
6486
+        hypothesis.note(f'# {command_line = }, {input = }')
6487
+        action = FakeConfigurationMutexAction(
6488
+            command_line=command_line, input=input
6489
+        )
6490
+        self.actions.append(action)
6491
+
6492
+    @stateful.rule(
6493
+        config_to_import=configuration,
6494
+        overwrite=strategies.booleans(),
6495
+    )
6496
+    def add_import_configuration_action(
6497
+        self,
6498
+        config_to_import: _types.VaultConfig,
6499
+        overwrite: bool,
6500
+    ) -> None:
6501
+        """Import the given configuration.
6502
+
6503
+        Args:
6504
+            config_to_import:
6505
+                The configuration to import.
6506
+            overwrite:
6507
+                Overwrite the base configuration if true, or merge if
6508
+                false.  Corresponds to the `--overwrite-existing` and
6509
+                `--merge-existing` command-line arguments.
6510
+
6511
+        """
6512
+        command_line = ['--import', '-'] + (
6513
+            ['--overwrite-existing'] if overwrite else []
6514
+        )
6515
+        input = json.dumps(config_to_import)  # noqa: A001
6516
+        hypothesis.note(f'# {command_line = }, {input = }')
6517
+        action = FakeConfigurationMutexAction(
6518
+            command_line=command_line, input=input
6519
+        )
6520
+        self.actions.append(action)
6521
+
6522
+    @stateful.precondition(lambda self: len(self.actions) > 0)
6523
+    @stateful.invariant()
6524
+    def run_actions(  # noqa: C901
6525
+        self,
6526
+    ) -> None:
6527
+        """Run the actions, serially and concurrently.
6528
+
6529
+        Run the actions once serially, then once more concurrently with
6530
+        the faked configuration mutex, and assert that both runs yield
6531
+        identical intermediate and final results.
6532
+
6533
+        We must run the concurrent version in processes, not threads or
6534
+        Python async functions, because the `click` testing machinery
6535
+        manipulates global properties (e.g. the standard I/O streams,
6536
+        the current directory, and the environment), and we require this
6537
+        manipulation to happen in a time-overlapped manner.
6538
+
6539
+        However, running multiple processes increases the risk of the
6540
+        operating system imposing process count or memory limits on us.
6541
+        We therefore skip the test as a whole if we fail to start a new
6542
+        process due to lack of necessary resources (memory, processes,
6543
+        or open file descriptors).
6544
+
6545
+        """
6546
+        if not TYPE_CHECKING:  # pragma: no branch
6547
+            multiprocessing = pytest.importorskip('multiprocessing')
6548
+        IPCMessage: TypeAlias = FakeConfigurationMutexStateMachine.IPCMessage
6549
+        intermediate_configs: dict[int, _types.VaultConfig] = {}
6550
+        intermediate_results: dict[
6551
+            int, tuple[bool, str | None, str | None]
6552
+        ] = {}
6553
+        true_configs: dict[int, _types.VaultConfig] = {}
6554
+        true_results: dict[int, tuple[bool, str | None, str | None]] = {}
6555
+        timeout = 5
6556
+        actions = self.actions
6557
+        mp = multiprocessing.get_context()
6558
+        # Coverage tracking writes coverage data to the current working
6559
+        # directory, but because the subprocesses are spawned within the
6560
+        # `tests.isolated_vault_config` context manager, their starting
6561
+        # working directory is the isolated one, not the original one.
6562
+        orig_cwd = pathlib.Path.cwd()
6563
+
6564
+        fatal_process_creation_errnos = {
6565
+            # Specified by POSIX for fork(3).
6566
+            errno.ENOMEM,
6567
+            # Specified by POSIX for fork(3).
6568
+            errno.EAGAIN,
6569
+            # Specified by Linux/glibc for fork(3)
6570
+            getattr(errno, 'ENOSYS', errno.ENOMEM),
6571
+            # Specified by POSIX for posix_spawn(3).
6572
+            errno.EINVAL,
6573
+        }
6574
+
6575
+        hypothesis.note(f'# {actions = }')
6576
+
6577
+        stack = contextlib.ExitStack()
6578
+        with stack:
6579
+            runner = tests.CliRunner(mix_stderr=False)
6580
+            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
6581
+            stack.enter_context(
6582
+                tests.isolated_vault_config(
6583
+                    monkeypatch=monkeypatch,
6584
+                    runner=runner,
6585
+                    vault_config={'services': {}},
6586
+                )
6587
+            )
6588
+            for i, action in enumerate(actions):
6589
+                result = runner.invoke(
6590
+                    cli.derivepassphrase_vault,
6591
+                    args=action.command_line,
6592
+                    input=action.input,
6593
+                    catch_exceptions=True,
6594
+                )
6595
+                true_configs[i] = copy.copy(cli_helpers.load_config())
6596
+                true_results[i] = (
6597
+                    result.clean_exit(empty_stderr=False),
6598
+                    result.stdout,
6599
+                    result.stderr,
6600
+                )
6601
+
6602
+        with stack:  # noqa: PLR1702
6603
+            runner = tests.CliRunner(mix_stderr=False)
6604
+            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
6605
+            stack.enter_context(
6606
+                tests.isolated_vault_config(
6607
+                    monkeypatch=monkeypatch,
6608
+                    runner=runner,
6609
+                    vault_config={'services': {}},
6610
+                )
6611
+            )
6612
+
6613
+            child_output_queue: multiprocessing.Queue[IPCMessage] = mp.Queue()
6614
+            child_input_queues: list[
6615
+                multiprocessing.Queue[IPCMessage] | None
6616
+            ] = []
6617
+            processes: list[multiprocessing.process.BaseProcess] = []
6618
+            processes_pending: set[multiprocessing.process.BaseProcess] = set()
6619
+            ready_wait: set[int] = set()
6620
+
6621
+            try:
6622
+                for i, action in enumerate(actions):
6623
+                    q: multiprocessing.Queue[IPCMessage] | None = mp.Queue()
6624
+                    try:
6625
+                        p: multiprocessing.process.BaseProcess = mp.Process(
6626
+                            name=f'fake-mutex-action-{i:02d}',
6627
+                            target=run_actions_handler,
6628
+                            kwargs={
6629
+                                'id_num': i,
6630
+                                'timeout': timeout,
6631
+                                'action': action,
6632
+                                'input_queue': q,
6633
+                                'output_queue': child_output_queue,
6634
+                            },
6635
+                            daemon=False,
6636
+                        )
6637
+                        p.start()
6638
+                    except OSError as exc:  # pragma: no cover
6639
+                        if exc.errno in fatal_process_creation_errnos:
6640
+                            pytest.skip(
6641
+                                'cannot test mutex functionality due to '
6642
+                                'lack of system resources for '
6643
+                                'creating enough subprocesses'
6644
+                            )
6645
+                        raise
6646
+                    else:
6647
+                        processes.append(p)
6648
+                        processes_pending.add(p)
6649
+                        child_input_queues.append(q)
6650
+                        ready_wait.add(i)
6651
+
6652
+                while processes_pending:
6653
+                    try:
6654
+                        self.mainloop(
6655
+                            timeout=timeout,
6656
+                            child_output_queue=child_output_queue,
6657
+                            child_input_queues=child_input_queues,
6658
+                            ready_wait=ready_wait,
6659
+                            intermediate_configs=intermediate_configs,
6660
+                            intermediate_results=intermediate_results,
6661
+                            processes=processes,
6662
+                            processes_pending=processes_pending,
6663
+                            block=True,
6664
+                        )
6665
+                    except Exception as exc:  # pragma: no cover
6666
+                        for i, q in enumerate(child_input_queues):
6667
+                            if q:
6668
+                                q.put(IPCMessage(i, 'exception', exc))
6669
+                        for p in processes_pending:
6670
+                            p.join(timeout=timeout)
6671
+                        raise
6672
+            finally:
6673
+                try:
6674
+                    while True:
6675
+                        try:
6676
+                            self.mainloop(
6677
+                                timeout=timeout,
6678
+                                child_output_queue=child_output_queue,
6679
+                                child_input_queues=child_input_queues,
6680
+                                ready_wait=ready_wait,
6681
+                                intermediate_configs=intermediate_configs,
6682
+                                intermediate_results=intermediate_results,
6683
+                                processes=processes,
6684
+                                processes_pending=processes_pending,
6685
+                                block=False,
6686
+                            )
6687
+                        except queue.Empty:
6688
+                            break
6689
+                finally:
6690
+                    # The subprocesses have this
6691
+                    # `tests.isolated_vault_config` directory as their
6692
+                    # startup and working directory, so systems like
6693
+                    # coverage tracking write their data files to this
6694
+                    # directory.  We need to manually move them back to
6695
+                    # the starting working directory if they are to
6696
+                    # survive this test.
6697
+                    for coverage_file in pathlib.Path.cwd().glob(
6698
+                        '.coverage.*'
6699
+                    ):
6700
+                        shutil.move(coverage_file, orig_cwd)
6701
+        hypothesis.note(
6702
+            f'# {true_results = }, {intermediate_results = }, '
6703
+            f'identical = {true_results == intermediate_results}'
6704
+        )
6705
+        hypothesis.note(
6706
+            f'# {true_configs = }, {intermediate_configs = }, '
6707
+            f'identical = {true_configs == intermediate_configs}'
6708
+        )
6709
+        assert intermediate_results == true_results
6710
+        assert intermediate_configs == true_configs
6711
+
6712
+    @staticmethod
6713
+    def mainloop(
6714
+        *,
6715
+        timeout: int,
6716
+        child_output_queue: multiprocessing.Queue[
6717
+            FakeConfigurationMutexStateMachine.IPCMessage
6718
+        ],
6719
+        child_input_queues: list[
6720
+            multiprocessing.Queue[
6721
+                FakeConfigurationMutexStateMachine.IPCMessage
6722
+            ]
6723
+            | None
6724
+        ],
6725
+        ready_wait: set[int],
6726
+        intermediate_configs: dict[int, _types.VaultConfig],
6727
+        intermediate_results: dict[int, tuple[bool, str | None, str | None]],
6728
+        processes: list[multiprocessing.process.BaseProcess],
6729
+        processes_pending: set[multiprocessing.process.BaseProcess],
6730
+        block: bool = True,
6731
+    ) -> None:
6732
+        IPCMessage: TypeAlias = FakeConfigurationMutexStateMachine.IPCMessage
6733
+        msg = child_output_queue.get(block=block, timeout=timeout)
6734
+        # TODO(the-13th-letter): Rewrite using structural pattern
6735
+        # matching.
6736
+        # https://the13thletter.info/derivepassphrase/latest/pycompatibility/#after-eol-py3.9
6737
+        if (  # pragma: no cover
6738
+            isinstance(msg, IPCMessage)
6739
+            and msg[1] == 'exception'
6740
+            and isinstance(msg[2], Exception)
6741
+        ):
6742
+            e = msg[2]
6743
+            raise e
6744
+        if isinstance(msg, IPCMessage) and msg[1] == 'ready':
6745
+            n = msg[0]
6746
+            ready_wait.remove(n)
6747
+            if not ready_wait:
6748
+                assert child_input_queues
6749
+                assert child_input_queues[0]
6750
+                child_input_queues[0].put(
6751
+                    IPCMessage(0, 'go', None),
6752
+                    block=True,
6753
+                    timeout=timeout,
6754
+                )
6755
+        elif isinstance(msg, IPCMessage) and msg[1] == 'config':
6756
+            n = msg[0]
6757
+            config = msg[2]
6758
+            intermediate_configs[n] = cast('_types.VaultConfig', config)
6759
+        elif isinstance(msg, IPCMessage) and msg[1] == 'result':
6760
+            n = msg[0]
6761
+            result_ = msg[2]
6762
+            result_tuple: tuple[bool, str | None, str | None] = cast(
6763
+                'tuple[bool, str | None, str | None]', result_
6764
+            )
6765
+            intermediate_results[n] = result_tuple
6766
+            child_input_queues[n] = None
6767
+            p = processes[n]
6768
+            p.join(timeout=timeout)
6769
+            assert not p.is_alive()
6770
+            processes_pending.remove(p)
6771
+            assert result_tuple[0], (
6772
+                f'action #{n} exited with an error: {result_tuple!r}'
6773
+            )
6774
+            if n + 1 < len(processes):
6775
+                next_child_input_queue = child_input_queues[n + 1]
6776
+                assert next_child_input_queue
6777
+                next_child_input_queue.put(
6778
+                    IPCMessage(n + 1, 'go', None),
6779
+                    block=True,
6780
+                    timeout=timeout,
6781
+                )
6782
+        else:
6783
+            raise AssertionError()
6784
+
6785
+
6786
+TestFakedConfigurationMutex = tests.skip_if_no_multiprocessing_support(
6787
+    FakeConfigurationMutexStateMachine.TestCase
6788
+)
6789
+"""The [`unittest.TestCase`][] class that will actually be run."""
6790
+
6791
+
6005 6792
 def completion_item(
6006 6793
     item: str | click.shell_completion.CompletionItem,
6007 6794
 ) -> click.shell_completion.CompletionItem:
6008 6795