RC: W12 D4 — Implementing a producer

May 2, 2024

Yesterday, I implemented a simplistic Kafka producer that can send messages to a broker. Today, I aimed to add the other main capabilities of the real Kafka producer, so I looked into the source code of KafkaProducer to better grasp what happens when a client sends a message (by calling the producer.send() method). Setting aside the details that are not needed for the big picture, three main things happen:

  1. Both the key (if it exists) and the value are serialized (by applying the keySerializer and valueSerializer functions defined when configuring the Kafka producer);
  2. The partition to which the record should be sent gets selected;
  3. The record is appended to a RecordAccumulator, which is basically a buffer of records for each partition. Once a buffer is full, its records are sent as a batch to the broker holding the selected partition.

Below is a minimal implementation of such a producer in roughly 50 lines of Python:

import time
from hashlib import sha256


class ProducerRecord:
    def __init__(self, key, value, timestamp=None):
        self.key = key
        self.value = value
        self.timestamp = timestamp if timestamp else time.time()


class KafkaProducer:
    def __init__(self, brokers_addresses, key_serializer, value_serializer):
        self.brokers_addresses = brokers_addresses
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer

        # Assuming one partition per broker for simplicity
        self.partitions = {partition_id: broker_address
                           for partition_id, broker_address in enumerate(self.brokers_addresses)}
        self.buffers = {partition: [] for partition in self.partitions}

    def send(self, record: ProducerRecord):
        """Sends a message to a Kafka broker."""
        serialized_key, serialized_value = self._serialize(record)
        partition = self._choose_partition(serialized_key)
        message = self._create_message(record, serialized_key, serialized_value)
        self._append_to_buffer(partition, message)

    def _serialize(self, record: ProducerRecord):
        """Serializes the record's key and value."""
        serialized_key = self.key_serializer(record.key)
        serialized_value = self.value_serializer(record.value)
        return serialized_key, serialized_value

    @staticmethod
    def _create_message(record: ProducerRecord, key: bytes, value: bytes):
        """Creates a message complying with the protocol between producers and brokers."""
        return {'timestamp': record.timestamp, 'key': key, 'value': value}

    def _choose_partition(self, serialized_key: bytes):
        """Selects a partition based on the hash of the key (default method)."""
        hash_value = sha256(serialized_key).digest()
        return int.from_bytes(hash_value, byteorder='big') % len(self.partitions)

    def _append_to_buffer(self, partition_id: int, message: dict):
        """Adds message to the correct buffer and sends the batch of messages to the broker when the buffer is full."""
        self.buffers[partition_id].append(message)
        if self._is_buffer_full(partition_id):
            self._send_batch_to_broker(partition_id)

    def _is_buffer_full(self, partition_id: int):
        """Returns True once the buffer for this partition has reached its maximum size."""
        pass

    def _send_batch_to_broker(self, partition_id: int):
        """Sends buffered messages to the broker designated for this partition and resets the buffer."""
        pass
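
To make the flow concrete, here is how a client could use this toy producer (the broker addresses and serializers below are made up for the example):

import json

producer = KafkaProducer(
    brokers_addresses=['localhost:9092', 'localhost:9093'],
    key_serializer=lambda key: key.encode('utf-8'),
    value_serializer=lambda value: json.dumps(value).encode('utf-8'),
)

# Each call serializes the record, picks a partition from the hashed key
# and appends the resulting message to that partition's buffer.
producer.send(ProducerRecord(key='user-42', value={'action': 'click'}))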

Let’s note a few things about this implementation.

First, I greatly simplified the handling of partitions by assuming that each broker holds only one partition. In reality, each broker generally hosts multiple partitions, partitions are replicated across several brokers (so that no data is lost if one broker goes down), and partitions can be reassigned to other brokers (to balance the load or to handle a broker failure). As a consequence, producers need to regularly refresh the partition/broker mapping so that data keeps being sent to the correct broker even when partitions are reassigned. In practice, the current list of partitions and broker addresses is stored in a separate PartitionMetadata object (the whole logic to update it can be found in the source code).
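
As a rough illustration (this is not how the real client is structured), the producer could keep that mapping in a small metadata object and refresh it when it becomes stale; all names below are made up:

import time


class PartitionMetadata:
    """Hypothetical holder of the partition -> broker leader mapping."""

    def __init__(self, leaders_by_partition):
        self.leaders_by_partition = leaders_by_partition  # {partition_id: broker_address}
        self.refreshed_at = time.time()

    def is_stale(self, max_age_seconds=300):
        """True when the mapping has not been refreshed recently and should be fetched again."""
        return time.time() - self.refreshed_at > max_age_seconds

Before sending a batch, the producer would look up leaders_by_partition[partition_id], and re-fetch the metadata from any broker whenever is_stale() returns True or a send fails.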

Second, there are several strategies for selecting a partition. Here, I only implemented the default one, where the record's key determines the partition it is sent to (hash(key) modulo the number of partitions). In the real Kafka, other strategies exist, such as a round-robin approach that distributes the load evenly across partitions regardless of the key, or custom partitioning strategies defined by the user.
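
To give an idea, a round-robin partitioner for this toy producer could look like the following sketch (the class name and interface are mine, not Kafka's):

from itertools import count


class RoundRobinPartitioner:
    """Cycles through the partitions, ignoring the record's key."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._counter = count()

    def choose_partition(self, serialized_key=None):
        # The key is ignored: each call simply moves on to the next partition.
        return next(self._counter) % self.num_partitions

Plugging this in would mean having _choose_partition delegate to a partitioner object picked at configuration time, which is roughly how the real producer lets users supply custom partitioners.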

Last, Kafka producers append messages to a buffer and only send them to the brokers in batches, once the buffer is full or after a pre-configured period of time. Since the code that sends a batch of messages to a broker is the same as in yesterday's post, I did not repeat it here; a possible way to fill in the two stubbed methods is sketched below.
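
The sketch assumes the constructor also stores a batch_size (maximum number of messages per buffer) and a linger_seconds (maximum time a message may wait), and that _send_to_broker is the network helper from yesterday's post; none of these exist in the code above.

    def _is_buffer_full(self, partition_id: int):
        """Returns True once the buffer holds batch_size messages or its oldest message has waited longer than linger_seconds."""
        buffer = self.buffers[partition_id]
        if len(buffer) >= self.batch_size:
            return True
        # Approximate the time-based flush with the oldest record's timestamp;
        # the real producer delegates this to a background sender thread.
        return bool(buffer) and time.time() - buffer[0]['timestamp'] >= self.linger_seconds

    def _send_batch_to_broker(self, partition_id: int):
        """Sends the buffered messages to the broker owning this partition and resets the buffer."""
        broker_address = self.partitions[partition_id]
        batch, self.buffers[partition_id] = self.buffers[partition_id], []
        self._send_to_broker(broker_address, batch)  # assumed network helper from yesterday's post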

I believe this is as far as I will go with my Kafka project during my time at RC, since tomorrow will be a travel day back home. In two days, I managed to implement a producer and have it communicate with a broker. There is still plenty left to do, and I hope to find time to carry this on in the future!