RC: W8 D5 — Testing for serial execution

April 5, 2024

I have already implemented a few critical operations (compaction, flushing, freezing) that change the state of the engine: after each of them, the list of memtables and/or SSTables that the engine refers to is different. If two such operations modify the same component at the same time, the engine could end up in a hybrid state with partial updates from both. It is therefore critical that they do not run concurrently. To that end, I have wrapped each of these operations in a mutex lock.
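
To make the risk concrete, here is a toy sketch of one possible failure mode (a lost update). It is not the engine's code: the `state` dictionary, the table names, and the `unsafe_update` helper are all hypothetical, and the barrier is only there to force the unlucky interleaving every time.

```python
import threading

# Hypothetical stand-in for the engine state: a list of SSTable names.
state = {"sstables": ["sst-0"]}

# The barrier forces the bad interleaving: both threads read the old
# list before either of them writes its updated copy back.
both_have_read = threading.Barrier(2)


def unsafe_update(new_table):
    snapshot = list(state["sstables"])          # read the current list
    both_have_read.wait()                       # the other thread reads too
    state["sstables"] = snapshot + [new_table]  # write back a stale copy


threads = [
    threading.Thread(target=unsafe_update, args=(name,))
    for name in ("sst-from-flush", "sst-from-compaction")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Only one of the two new tables is present: the other update was lost.
print(state["sstables"])
```

Holding a lock around the whole read-modify-write sequence would rule out this interleaving, because the second thread could not read the list until the first one had written its result back.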

Here is the pseudocode showing how the mutex lock is used to handle the flush and freeze operations:

class LsmEngine:

    # ~~~~~~~~~~~~~~~~~ #
    # Freeze pseudocode #
    # ~~~~~~~~~~~~~~~~~ #
    
    def _try_freeze(self):
        # Check if the freeze operation should occur
        should_freeze = ...

        # Do nothing if freeze is not needed
        if not should_freeze:
            return

        # Otherwise, take the lock before performing the freeze
        with self.mutex_lock:
            self._freeze()
            

    def _freeze(self):
        # Actually performs the freeze operation and makes changes in the state
        ...

    
    
    # ~~~~~~~~~~~~~~~~ #
    # Flush pseudocode #
    # ~~~~~~~~~~~~~~~~ #
    
    def _launch_flush(self):
        with self.mutex_lock:
            self._flush()

        # Other things that don't require the lock
        ...
    

    def _flush(self):
        # Actually performs the flush operation and makes changes in the state
        ...

The mutex lock is acquired when entering each with self.mutex_lock block, and is released when leaving it. As written, this should work as intended: the _freeze and _flush methods cannot be executed concurrently, since each of them only runs while its caller holds the mutex lock. However, until now, no test enforced this critical behavior, so a future change could silently break it. So today, I tackled this issue and thought about how I could actually test it.

One way to do this is to:

  1. Start two threads executing _try_freeze() and _launch_flush() concurrently;
  2. Record the start and end times of the inner _freeze() and _flush() operations;
  3. Assert that they don’t overlap.

To add timings around the inner operations, I can replace the actual _freeze() and _flush() methods with mocks that call the original methods and record the start and end times around the call.

Here is how the mocks can be implemented in practice (time.time() gives the current time in seconds since the epoch):

import time

storage = LsmStorage()

times = {}

# Keep references to the original _flush and _freeze methods
original_flush = storage._flush
original_freeze = storage._freeze


# Create mocks calling original methods wrapped with timings
def _flush_with_timing():
    times['flush_start'] = time.time()
    original_flush()
    times['flush_end'] = time.time()


def _freeze_with_timing():
    times['freeze_start'] = time.time()
    original_freeze()
    times['freeze_end'] = time.time()


# Replace original methods with timed mock methods
storage._flush = _flush_with_timing
storage._freeze = _freeze_with_timing
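
Since the two mocks differ only by their label, they could be produced by one small helper. The sketch below is a suggestion rather than the code used in the project: `with_timing` and `fake_flush` are hypothetical names, and the helper uses time.monotonic() instead of time.time(), because a monotonic clock cannot go backwards if the system clock is adjusted during the test. The try/finally also records the end time when the wrapped method raises.

```python
import functools
import time


def with_timing(func, label, times):
    """Wrap `func` so that its start and end times are stored in `times`."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        times[f"{label}_start"] = time.monotonic()
        try:
            return func(*args, **kwargs)
        finally:
            # Recorded even if `func` raises an exception.
            times[f"{label}_end"] = time.monotonic()
    return wrapper


# Usage on a stand-in function (not the real engine method)
times = {}


def fake_flush():
    time.sleep(0.01)
    return "flushed"


timed_flush = with_timing(fake_flush, "flush", times)
result = timed_flush()
```

With such a helper, the replacement step would read storage._flush = with_timing(storage._flush, 'flush', times), and similarly for _freeze.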

Using these mocks, it becomes easy to create two threads and check that the inner operations are executed serially:

import threading


def test_flush_waits_for_freeze():
    # Define mocks and time
    storage = LsmStorage()
    times = ...

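    # Define threads; pass the bound methods themselves (no parentheses),
    # otherwise they would be called right here, in the main thread
    freeze_thread = threading.Thread(target=storage._try_freeze)
    flush_thread = threading.Thread(target=storage._launch_flush)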

    # Execute threads concurrently
    freeze_thread.start(); flush_thread.start()
    freeze_thread.join(); flush_thread.join()

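    # Assert that the two operations did not overlap, whichever one
    # acquired the lock first (the thread scheduling is not deterministic)
    assert (times['freeze_end'] <= times['flush_start']
            or times['flush_end'] <= times['freeze_start'])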

Calling .start() on the two threads one after the other launches both of them, so that they run concurrently. To make sure that the main thread (the one executing the test) blocks until both child threads terminate, it then calls .join() on each of them (cf. Python’s documentation on threading).
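
One caveat: .start() alone does not guarantee that the two threads reach the lock at the same moment. If the first thread finishes before the second one is scheduled, the test passes trivially. The self-contained sketch below shows one way to tighten this, under the assumption that a toy class is an acceptable stand-in for the engine: `ToyEngine`, `overlaps`, and `run_both` are hypothetical names. A threading.Barrier releases both threads together, and a control run without the lock checks that the overlap test really detects concurrent execution.

```python
import threading
import time


class ToyEngine:
    """Hypothetical stand-in for the engine; only the locking matters."""

    def __init__(self, use_lock=True):
        self.mutex_lock = threading.Lock()
        self.use_lock = use_lock
        self.intervals = {}

    def _run(self, name):
        start = time.monotonic()
        time.sleep(0.05)  # pretend to do some work on the state
        self.intervals[name] = (start, time.monotonic())

    def operation(self, name, barrier):
        barrier.wait()  # release both threads at the same moment
        if self.use_lock:
            with self.mutex_lock:
                self._run(name)
        else:
            self._run(name)


def overlaps(a, b):
    """Return True if the (start, end) intervals a and b intersect."""
    return a[0] < b[1] and b[0] < a[1]


def run_both(use_lock):
    engine = ToyEngine(use_lock)
    barrier = threading.Barrier(2)
    threads = [
        threading.Thread(target=engine.operation, args=(name, barrier))
        for name in ("freeze", "flush")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return overlaps(engine.intervals["freeze"], engine.intervals["flush"])


assert not run_both(use_lock=True)  # serialized by the lock
assert run_both(use_lock=False)     # control: the overlap is detected
```

The control run is what gives the first assertion its value: it shows that the check can fail when the lock is missing.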

With all of this together, I now have a way to enforce that the critical operations on my engine are executed serially! 🎉