RC: W8 D5 — Testing for serial execution
April 5, 2024
I have already implemented a few critical operations (compaction, flushing, freezing) that ultimately change the state of the engine. That is to say: after each of those operations, the list of memtables and/or SSTables that the engine refers to changes. If two such operations modify the same component at the same time, the engine could end up in a hybrid state with partial updates from both. Therefore, it is critical that they do not run concurrently. To that end, I have wrapped each of these operations in a mutex lock.
Here is the pseudocode showing how the mutex lock is used to handle the flush and freeze operations:
class LsmEngine:
    # ~~~~~~~~~~~~~~~~~ #
    # Freeze pseudocode #
    # ~~~~~~~~~~~~~~~~~ #
    def _try_freeze(self):
        # Check if the freeze operation should occur
        should_freeze = ...
        # Do nothing if freeze is not needed
        if not should_freeze:
            return
        # Otherwise, take the lock before performing the freeze
        with self.mutex_lock:
            self._freeze()

    def _freeze(self):
        # Actually performs the freeze operation and makes changes in the state
        ...

    # ~~~~~~~~~~~~~~~~ #
    # Flush pseudocode #
    # ~~~~~~~~~~~~~~~~ #
    def _launch_flush(self):
        with self.mutex_lock:
            self._flush()
        # Other things that don't require the lock
        ...

    def _flush(self):
        # Actually performs the flush operation and makes changes in the state
        ...
The mutex lock is acquired when entering each with self.mutex_lock statement, and is released upon leaving the corresponding scope.
As it is, the behavior should work as intended: the _freeze and _flush methods should not be executed concurrently since both need to have acquired the mutex lock before being executed.
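To see the acquire/release behavior of the with statement in isolation, here is a minimal sketch using threading.Lock from the standard library. The names lock and lock_state_around_with are hypothetical and are not part of the engine.

```python
import threading

# Hypothetical standalone lock, standing in for the engine's mutex_lock
lock = threading.Lock()

def lock_state_around_with():
    """Return (held_inside, held_after) for a `with lock:` block."""
    with lock:
        # The lock was acquired on entering the block
        held_inside = lock.locked()
    # The lock was released on leaving the block
    held_after = lock.locked()
    return held_inside, held_after
```

Any other thread that reaches its own with lock: statement while the lock is held simply blocks until the lock is released.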
However, so far, there was no test to enforce that this critical behavior will not be overridden by future changes.
So today, I tackled this issue and thought about the way I could actually test that.
One way to do this is to:
- Start two threads executing _try_freeze() and _launch_flush() concurrently;
- Record the start and end times of the inner _freeze() and _flush() operations;
- Assert that they don’t overlap.
To add timings around the inner operations, I can replace the actual _freeze() and _flush() methods with mocks that wrap the original methods and record their start and end times.
Here is how the mocks can be implemented in practice (time.time() gives the current time in seconds since the epoch):
import time

storage = LsmStorage()
times = {}

# Recall original _flush and _freeze methods
original_flush = storage._flush
original_freeze = storage._freeze

# Create mocks calling original methods wrapped with timings
def _flush_with_timing():
    times['flush_start'] = time.time()
    original_flush()
    times['flush_end'] = time.time()

def _freeze_with_timing():
    times['freeze_start'] = time.time()
    original_freeze()
    times['freeze_end'] = time.time()

# Replace original methods with timed mock methods
storage._flush = _flush_with_timing
storage._freeze = _freeze_with_timing
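Since the two mocks differ only in the key names they record, the pattern can be factored into one helper. This is only a sketch: with_timing is a hypothetical name, not something that exists in the engine, and the try/finally is an addition so that the end time is recorded even if the wrapped call raises.

```python
import time

def with_timing(name, func, times):
    """Wrap `func` so each call records `<name>_start` and `<name>_end` in `times`."""
    def wrapper(*args, **kwargs):
        times[f"{name}_start"] = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            # Recorded even when func raises
            times[f"{name}_end"] = time.time()
    return wrapper
```

With this helper, the replacement step would read storage._flush = with_timing('flush', storage._flush, times), and likewise for _freeze.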
Using these mocks, it then gets easy to create two threads and check that the inner operations are executed serially:
import threading

def test_flush_waits_for_freeze():
    # Define mocks and time
    storage = LsmStorage()
    times = ...

    # Define threads. The methods are passed uncalled: writing
    # target=storage._try_freeze() would run them right here, in the main thread.
    freeze_thread = threading.Thread(target=storage._try_freeze)
    flush_thread = threading.Thread(target=storage._launch_flush)

    # Execute threads concurrently
    freeze_thread.start(); flush_thread.start()
    freeze_thread.join(); flush_thread.join()

    # Assert that the two operations did not overlap, whichever took the lock first
    assert (times['freeze_end'] <= times['flush_start']
            or times['flush_end'] <= times['freeze_start'])
Calling .start() on the two threads one after the other lets them run concurrently.
To make sure that the main thread (the one executing the test) blocks until its child threads terminate, it calls the .join() method on both of them (cf. Python’s documentation on threading).
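To try the whole approach without the real engine, here is a self-contained sketch. ToyEngine, run_concurrently and intervals_overlap are hypothetical names; the toy operations just sleep to simulate work, and time.monotonic() is swapped in for time.time() because a monotonic clock cannot jump backwards between two readings.

```python
import threading
import time

class ToyEngine:
    """Hypothetical stand-in for the engine: both operations share one lock."""

    def __init__(self):
        self.mutex_lock = threading.Lock()

    def _try_freeze(self):
        with self.mutex_lock:
            self._freeze()

    def _launch_flush(self):
        with self.mutex_lock:
            self._flush()

    def _freeze(self):
        time.sleep(0.05)  # simulate work that changes the state

    def _flush(self):
        time.sleep(0.05)  # simulate work that changes the state

def run_concurrently(engine):
    """Run freeze and flush in two threads and return the recorded times."""
    times = {}

    def timed(name, func):
        def wrapper():
            times[f"{name}_start"] = time.monotonic()
            func()
            times[f"{name}_end"] = time.monotonic()
        return wrapper

    # Replace the inner operations with timed wrappers around the originals
    engine._freeze = timed("freeze", engine._freeze)
    engine._flush = timed("flush", engine._flush)

    threads = [
        threading.Thread(target=engine._try_freeze),
        threading.Thread(target=engine._launch_flush),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return times

def intervals_overlap(times):
    """True if the freeze and flush intervals overlap in time."""
    freeze_first = times["freeze_end"] <= times["flush_start"]
    flush_first = times["flush_end"] <= times["freeze_start"]
    return not (freeze_first or flush_first)
```

A test would then boil down to assert not intervals_overlap(run_concurrently(ToyEngine())). The check is deliberately symmetric, since nothing guarantees which of the two threads takes the lock first.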
With this altogether, I now have a way to enforce that the critical operations on my engine are executed serially! 🎉