RC: W11 D4 — Adding a transactional layer on my LSM engine
April 25, 2024

With the changes made this week, memtables and SSTables can all store multiple versions of a given key, and any historical version can be retrieved. But which of these versions should be returned when the key is queried? The idea is the following: the engine should only return versions that are complete, and thus never return a version that is being written concurrently with the read query. Therefore, only committed versions should be visible to a read request. The engine should also return the most up-to-date data: if there are multiple committed versions, the one to return is the most recently written among them.
To do this, the engine must remember the sequence number of the last committed record. This can be done simply by storing that sequence number in memory once each insertion has finished. Read queries can then use this last committed sequence number when looking in memtables and SSTables, reading only versions whose sequence number is less than or equal to it. Thus, if a write query is concurrently inserting a new record, the read request will not see it.
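The visibility rule described above can be illustrated with a minimal sketch. It is not the engine's actual code: the function name `visible_version` and the representation of versions as `(sequence_number, value)` tuples sorted by ascending sequence number are assumptions made for illustration.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


def visible_version(versions: List[Tuple[int, str]], snapshot: int) -> Optional[str]:
    """Return the value of the newest version with sequence number <= snapshot.

    Hypothetical helper: `versions` is assumed to be sorted by ascending
    sequence number.
    """
    sequence_numbers = [sequence_number for sequence_number, _ in versions]
    # Index of the first version whose sequence number is greater than the snapshot.
    index = bisect_right(sequence_numbers, snapshot)
    if index == 0:
        # Every stored version is newer than the snapshot: nothing is visible.
        return None
    return versions[index - 1][1]


versions = [(3, "a"), (7, "b"), (12, "c")]
print(visible_version(versions, snapshot=11))
```

With a snapshot of 11, the version written at sequence number 12 is ignored and the one written at 7 is returned, which is the behavior a reader should get while a concurrent write is still in flight.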
In addition, to distinguish the regular LSM engine from the transactional layer, I created a TransactionalLsmStorage class that inherits from the regular LsmStorage class and overrides the put and get methods to handle this new behavior.
Here is how I implemented it:
class TransactionalLsmStorage(LsmStorage):
    def __init__(self,
                 configuration: Configuration,
                 directory: str,
                 state: LsmState,
                 manifest: Manifest
                 ):
        super().__init__(configuration=configuration,
                         directory=directory,
                         state=state,
                         manifest=manifest)
        self.last_committed_sequence_number: int = -1

    def put(self, key: Record.Key, value: Record.Value):
        record = self.state.memtable.put(key=key, value=value)
        self.last_committed_sequence_number = record.sequence_number
        self._try_freeze()

    def get(self, key: Record.Key) -> Optional[Record.Value]:
        # Define snapshot at the beginning of the read query
        snapshot = self.last_committed_sequence_number

        # Search in memtables (from most recent to oldest)
        memtables = [self.state.memtable, *self.state.immutable_memtables]
        value = self._search(key=key, memtables=memtables, snapshot=snapshot)
        if value is not None:
            return value

        # Search in SSTables by level (from most recent to oldest)
        all_ss_tables = [self.state.sstables_level0, *self.state.sstables_levels]
        for ss_tables in all_ss_tables:
            value = self._search(key=key, ss_tables=ss_tables, snapshot=snapshot)
            if value is not None:
                return value

        return None
There is only one simple change to the put method compared to a non-transactional engine: once the record has been inserted, its sequence number must be recorded as the last_committed_sequence_number, so that it can be used for subsequent read queries.
The get method also remains similar to the original one. There is nonetheless one important change: only records written at or before the selected snapshot should be retrieved (those written after it should be excluded). To do this, the snapshot is taken at the beginning of the get query: the sequence number of the last record committed when the get query begins (last_committed_sequence_number) is the maximum sequence number that can be retrieved. This snapshot is then passed to the get methods of each in-memory and on-disk component, and the memtables and SSTables are still searched from most recent to oldest.
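To make the snapshot-aware lookup concrete, here is a simplified sketch of how a snapshot might be threaded through components searched from newest to oldest. The `Component` type and the functions `get_from_component` and `snapshot_get` are hypothetical stand-ins, not the engine's real memtable or SSTable classes, and deletions (tombstones) are ignored.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for a memtable or an SSTable: each key maps to its
# versions as (sequence_number, value), sorted by descending sequence number.
Component = Dict[str, List[Tuple[int, str]]]


def get_from_component(component: Component, key: str, snapshot: int) -> Optional[str]:
    """Return the newest version of `key` in this component visible at `snapshot`."""
    for sequence_number, value in component.get(key, []):
        if sequence_number <= snapshot:
            return value
    return None


def snapshot_get(components: List[Component], key: str, snapshot: int) -> Optional[str]:
    """Search components from most recent to oldest and stop at the first visible hit."""
    for component in components:
        value = get_from_component(component, key, snapshot)
        if value is not None:
            return value
    return None


memtable: Component = {"k": [(9, "v9"), (6, "v6")]}
sstable: Component = {"k": [(2, "v2")], "other": [(1, "x")]}
print(snapshot_get([memtable, sstable], "k", snapshot=5))
```

With a snapshot of 5, neither version in the memtable is visible, so the lookup falls through to the older component. Stopping at the first hit is only correct because newer components hold higher sequence numbers than older ones.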
Lastly, I have not included the corresponding code here, but the changes to the scan method are similar: as long as more recent SSTables are on upper levels, memtables and SSTables can be scanned from most recent to oldest to get the correct results.
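Since the scan code is not shown, the following is only a guess at the general shape of a snapshot-aware scan, not the engine's implementation. The `Component` type and the `snapshot_scan` function are hypothetical, a real engine would more likely use merge iterators than an intermediate dictionary, and deletions are again ignored.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for a memtable or an SSTable: each key maps to its
# versions as (sequence_number, value), sorted by descending sequence number.
Component = Dict[str, List[Tuple[int, str]]]


def snapshot_scan(components: List[Component], lower: str, upper: str,
                  snapshot: int) -> List[Tuple[str, str]]:
    """Return (key, value) pairs for keys in [lower, upper] as seen at `snapshot`."""
    result: Dict[str, str] = {}
    for component in components:  # from most recent to oldest
        for key, versions in component.items():
            if not (lower <= key <= upper) or key in result:
                # Out of range, or a newer component already supplied a visible version.
                continue
            for sequence_number, value in versions:
                if sequence_number <= snapshot:
                    result[key] = value
                    break
    return sorted(result.items())


memtable: Component = {"a": [(8, "a8")], "b": [(10, "b10"), (4, "b4")]}
sstable: Component = {"a": [(1, "a1")], "c": [(3, "c3")]}
print(snapshot_scan([memtable, sstable], "a", "c", snapshot=5))
```

A key is only marked as resolved once a version visible at the snapshot has been found, so a key whose newest versions are too recent can still be served by an older component.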
I now have a transactional LSM engine! There is still some work to do on it to recover the engine in its transactional state and to remove obsolete versions while keeping all the ones needed. I will tackle those in the coming days.