RC: W12 D4 — Implementing a producer
May 2, 2024
Yesterday, I implemented a simplistic Kafka producer that can send messages to a broker.
Today, I aimed to add the other main capabilities of the real Kafka producer. So I looked into the source code for the KafkaProducer to better grasp what happens when a client sends a message to the producer (by calling its producer.send() method).
Setting aside the details that are unnecessary to grasp the big picture, three main things happen:
- Both the key (if it exists) and the value are serialized (by applying the keySerializer and valueSerializer functions defined when configuring the Kafka producer);
- The partition to which the record should be sent gets selected;
- The record is appended to a RecordAccumulator, which is basically a buffer of records for each partition. Once the buffer is full, the records are sent as a batch to the broker holding the selected partition.
Below is a minimal implementation of such a producer in about 50 lines of Python:
import time
from hashlib import sha256


class ProducerRecord:
    def __init__(self, key, value, timestamp=None):
        self.key = key
        self.value = value
        self.timestamp = timestamp if timestamp else time.time()


class KafkaProducer:
    def __init__(self, brokers_addresses, key_serializer, value_serializer):
        self.brokers_addresses = brokers_addresses
        self.key_serializer = key_serializer
        self.value_serializer = value_serializer
        # Assuming one partition per broker for simplicity
        self.partitions = {partition_id: broker_address
                           for partition_id, broker_address in enumerate(self.brokers_addresses)}
        self.buffers = {partition: [] for partition in self.partitions}

    def send(self, record: ProducerRecord):
        """Sends a message to a Kafka broker."""
        serialized_key, serialized_value = self._serialize(record)
        partition = self._choose_partition(serialized_key)
        message = self._create_message(record, serialized_key, serialized_value)
        self._append_to_buffer(partition, message)

    def _serialize(self, record: ProducerRecord):
        """Serializes the record's key and value."""
        serialized_key = self.key_serializer(record.key)
        serialized_value = self.value_serializer(record.value)
        return serialized_key, serialized_value

    @staticmethod
    def _create_message(record: ProducerRecord, key: bytes, value: bytes):
        """Creates a message complying with the protocol between producers and brokers."""
        return {'timestamp': record.timestamp, 'key': key, 'value': value}

    def _choose_partition(self, serialized_key: bytes):
        """Selects a partition based on the hash of the key (default method)."""
        hash_value = sha256(serialized_key).digest()
        return int.from_bytes(hash_value, byteorder='big') % len(self.partitions)

    def _append_to_buffer(self, partition_id: int, message: dict):
        """Adds the message to the correct buffer and sends the batch of messages to the broker when the buffer is full."""
        self.buffers[partition_id].append(message)
        if self._is_buffer_full(partition_id):
            self._send_batch_to_broker(partition_id)

    def _is_buffer_full(self, partition_id: int):
        """Checks whether the buffer for this partition has reached its maximum size."""
        pass

    def _send_batch_to_broker(self, partition_id: int):
        """Sends buffered messages to the broker designated for this partition and resets the buffer."""
        pass
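For illustration, the producer can be exercised along these lines (the broker addresses are just placeholders and the serializers simply encode strings as UTF-8 bytes; this is only a usage sketch):

# Hypothetical usage: placeholder broker addresses and trivial UTF-8 serializers.
producer = KafkaProducer(
    brokers_addresses=['localhost:9092', 'localhost:9093'],
    key_serializer=lambda key: key.encode('utf-8'),
    value_serializer=lambda value: value.encode('utf-8'),
)

producer.send(ProducerRecord(key='user-42', value='clicked_checkout_button'))
# The record is serialized, assigned to a partition, and buffered until the batch is flushed.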
Let me note a few things about this implementation.
First, I greatly simplified the handling of partitions by assuming that each broker holds only one partition. In reality, each broker generally hosts multiple partitions, partitions are replicated across several brokers (so that no data is lost if one broker goes down), and partitions can be reassigned to new brokers (to balance the load or handle a broker failure). As a consequence, producers need to regularly refresh the partition/broker mapping so that data keeps getting sent to the correct broker, even when partitions are reassigned.
In practice, the current list of partitions and broker addresses is stored in a separate PartitionMetadata object (the whole logic to update this list can be found in the source code).
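To mimic that in my toy producer, the partition-to-broker mapping could live in its own small object that gets refreshed periodically. Here is a rough sketch of what I have in mind; the refresh interval and the fetch_metadata_from_cluster callable are made up, not taken from the real client:

class PartitionMetadata:
    """Toy stand-in for the producer's metadata cache, mapping partition ids to broker addresses."""

    def __init__(self, fetch_metadata_from_cluster, refresh_interval_s=300):
        # fetch_metadata_from_cluster is a hypothetical callable that asks the cluster
        # for the current {partition_id: broker_address} mapping.
        self._fetch = fetch_metadata_from_cluster
        self._refresh_interval_s = refresh_interval_s
        self._partitions = {}
        self._last_refresh = 0.0

    def leader_for(self, partition_id: int) -> str:
        """Returns the broker address currently holding this partition, refreshing the mapping when it is stale."""
        stale = time.time() - self._last_refresh > self._refresh_interval_s
        if stale or partition_id not in self._partitions:
            self._partitions = self._fetch()
            self._last_refresh = time.time()
        return self._partitions[partition_id]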
Second, there are several existing strategies to select a partition. Here, I only implemented the default one, where the key of a record determines which partition it gets sent to (hash(key) modulo nb_partitions). In the real Kafka, other strategies exist, such as a round-robin approach that distributes records evenly across partitions (regardless of their key), or letting the user plug in a custom partitioning strategy.
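To give an idea of what an alternative strategy looks like, here is a quick sketch of a round-robin partitioner; the class name and interface are my own invention, not taken from the real client:

from itertools import count


class RoundRobinPartitioner:
    """Assigns records to partitions in a cyclic fashion, ignoring the key entirely."""

    def __init__(self, nb_partitions: int):
        self._nb_partitions = nb_partitions
        self._counter = count()

    def choose_partition(self, serialized_key: bytes = None) -> int:
        # The key is accepted but ignored: each call simply moves on to the next partition.
        return next(self._counter) % self._nb_partitions

Plugging it into the producer would only mean delegating _choose_partition to such an object instead of hashing the key.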
Last, Kafka producers append messages to a buffer and only send them to the brokers as a batch once the buffer is full or after a pre-configured period of time. Since the implementation that sends batched messages to the broker is the same as in yesterday's post, I did not reproduce it here.
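Still, to make the batching behaviour a bit more concrete, the buffer-fullness check left as a stub above could look like the following. The subclass name and both thresholds are made up, loosely inspired by Kafka's batch.size and linger.ms settings:

class BatchingKafkaProducer(KafkaProducer):
    """Toy extension of the producer above with a concrete buffer-fullness check."""

    MAX_BATCH_SIZE = 16          # made-up maximum number of messages per batch
    MAX_LINGER_SECONDS = 0.005   # made-up maximum time a message may sit in the buffer

    def _is_buffer_full(self, partition_id: int) -> bool:
        """Returns True when the buffer reaches the maximum batch size or its oldest message has waited too long."""
        buffer = self.buffers[partition_id]
        if len(buffer) >= self.MAX_BATCH_SIZE:
            return True
        # Use the oldest record's timestamp as a proxy for how long it has been buffered.
        return time.time() - buffer[0]['timestamp'] >= self.MAX_LINGER_SECONDS

Note that in this toy version the time-based condition is only evaluated when a new message arrives, whereas the real producer flushes lingering batches from a background sender thread.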
I believe this is as far as I will go with my Kafka project during my time at RC, since tomorrow will be a travel day back home. In two days, I managed to implement a producer and have it communicate with a broker. There is still plenty left to do, and I hope to find time to carry this on in the future!