RC: W5 D2 — Merging iterators (part 1)

March 12, 2024

LSM engines implement iterators on all of their components. I implemented the red-black tree iterator yesterday. Given that LSM engines work with multiple memtables (one active and the olders ones immutable), I have multiple iterators. Thus, the next step is to merge them into a single one.

What we want to achieve is summed up in the example below:

Iterator 0 (current memtable)    : ("key1": 1, "key4": 4)
Iterator 1 (immutable memtable 1): ("key2": 2, "key9": 9)
Iterator 2 (immutable memtable 2): ("key7": 7)

==>
Merged iterator: ("key1": 1, "key2": 2, "key4": 4, "key7": 7, "key9": 9)

The principle is relatively simple: we have n iterators and each of those has its cursor pointing to its first item (since they iterate in ascending order, the “first” item is the one that has the smallest key among its collection of items). Among those n candidate items, the merged iterator should select the one that has the smallest key (so that it iterates on ascending order too). We can do this by storing all n candidates in a list, select the min, take it out of the list and replace it by the next item that the corresponding iterator will point to.

Carrying on the example above, it would give:

next_items -> [ ("key1": 1, iter_0), ("key2": 2, iter_1), ("key7": 7, iter_2) ]
The min is ("key1": 1, iter_0). 
We take it out and replace it by the next item of iter_0 ("key4": 4)

next_items -> [ ("key4": 4, iter_0), ("key2": 2, iter_1), ("key7": 7, iter_2) ]
The min is ("key2": 2, iter_1). 
We take it out and replace it by the next item of iter_1: ("key9": 9)

next_items -> [ ("key4": 4, iter_0), ("key9": 9, iter_1), ("key7": 7, iter_2) ]
The min is ("key4": 4, iter_0). 
We take it out.
Since iter_0 has no more item to iterate on, we don't add any.

next_items -> [ ("key9": 9, iter_1), ("key7": 7, iter_2) ]
The min is ("key7": 7, iter_2). 
We take it out.
Since iter_2 has no more item to iterate on, we don't add any.

next_items -> [ ("key9": 9, iter_1) ]
The min is ("key9": 9, iter_1). 
We take it out.
Since iter_1 has no more item to iterate on, we don't add any.

next_items -> []
Finished !
The merged iterator correctly yields: 
("key1": 1, "key2": 2, "key4": 4, "key7": 7, "key9": 9)

I was pairing with Laurent on this, and he rightly explained that, even though this worked, it would not be very efficient. Indeed, taking the min requires doing n-1 comparisons and thus runs in o(n) time. More efficiently, we can use a priority queue which allows to do inserts and retrievals in o(log n) time because it is usually implemented with a heap.

Here is how we implemented it in the end:

from typing import Iterator, Any
from heapq import heappush, heappop


def merge_iterators(iterators: list[Iterator[Any]]) -> Iterator[Any]:
    no_item = object()
    heap = []

    def get_next(iterator: Iterator[Any]):
        try:
            return next(iterator)
        except StopIteration:
            return no_item

    def try_push_iterator(iterator_index: int):
        next_item = get_next(iterator=iterators[iterator_index])
        if next_item is no_item:
            return
        heappush(heap, (next_item, iterator_index))

    for i in range(len(iterators)):
        try_push_iterator(iterator_index=i)

    while len(heap):
        value, i = heappop(heap)
        yield value
        try_push_iterator(iterator_index=i)

That’s it for today, more about this tomorrow!