RC: W12 D3 — Setting up a connexion between a broker and a producer
May 1, 2024
Today I actually got started on my Kafka project. At a high level, there are three major components:
- Producers, which send messages to brokers;
- Brokers, which contain the partitions into which messages are stored;
- Consumers, which read the messages from the brokers’ partitions.
Communication between these three components is therefore essential, so I decided to implement it first. Brokers, producers and consumers usually run on distinct machines. Since I only have my laptop, I modeled them as distinct threads and used sockets to let them communicate with one another.
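To make the threads-plus-sockets idea concrete, here is a minimal sketch, separate from the project code. The names serve_once and round_trip are hypothetical helpers made up for illustration, and binding to port 0 (so that the OS picks a free port) is an assumption chosen to keep the example from clashing with anything already running.

```python
import socket
import threading


def serve_once(server_sock, received):
    """Accept a single connection and store everything it sends (hypothetical helper)."""
    conn, _addr = server_sock.accept()
    with conn:
        chunks = []
        while True:
            chunk = conn.recv(1024)
            if not chunk:  # empty bytes: the peer closed the connection
                break
            chunks.append(chunk)
    received.append(b"".join(chunks))


def round_trip(payload: bytes) -> bytes:
    """Send payload from the calling thread to a listener thread over local TCP."""
    received = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock:
        server_sock.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        server_sock.listen()
        address = server_sock.getsockname()

        # The listener thread plays the role of a broker
        listener = threading.Thread(target=serve_once, args=(server_sock, received))
        listener.start()

        # The calling thread plays the role of a producer
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_sock:
            client_sock.connect(address)
            client_sock.sendall(payload)

        listener.join()
    return received[0]


if __name__ == "__main__":
    print(round_trip(b"hello broker"))
```

Both ends live in one process here, but nothing in the socket code depends on that, which is why the same approach can later be split across machines.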
Brokers need to listen continuously for any messages a producer might send them and handle each one as it arrives. Here is a first implementation:
import pickle
import socket


class Broker:
    Host = str
    Port = int  # ports are integers, e.g. 9092

    def __init__(self, host: Host, port: Port):
        self.host = host
        self.port = port

    def start(self):
        """
        Starts the broker and makes it listen for messages indefinitely.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((self.host, self.port))
            # Listen for incoming connections
            s.listen()
            print(f"Broker listening on {self.host}:{self.port}")
            # Wait for incoming connections and process them
            while True:
                conn, addr = s.accept()
                with conn:
                    # Read data (a single recv call returns at most 1024 bytes)
                    print(f"Connected by {addr}")
                    data = conn.recv(1024)
                    # Process data
                    message = pickle.loads(data)
                    print(f"Received message on {self.host}:{self.port}: {message}")
Here, each broker is identified by its host and port number. At launch (when running the start method), the broker creates a TCP socket and binds it to its host and port. It then waits for incoming connections from a client (a producer in this case) and processes each message by printing it to the terminal (when the time comes, I will update the processing to actually store the data on disk).
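One limitation of reading with a single recv(1024) call is that a pickled message longer than 1024 bytes would be cut off, and TCP does not guarantee that one send arrives as one read. A common way around this is to prefix each message with its length. The sketch below is not part of the project code: send_message, recv_message and recv_exactly are hypothetical helper names, and the 4-byte big-endian header is an assumed format.

```python
import pickle
import socket
import struct

# Assumed wire format: a 4-byte unsigned big-endian length, then the pickled payload
HEADER = struct.Struct("!I")


def recv_exactly(conn, size):
    """Keep calling recv until exactly `size` bytes have been read."""
    buf = bytearray()
    while len(buf) < size:
        chunk = conn.recv(size - len(buf))
        if not chunk:
            raise ConnectionError("connection closed before the full message arrived")
        buf.extend(chunk)
    return bytes(buf)


def send_message(sock, message):
    """Pickle the message and send it with a length prefix."""
    payload = pickle.dumps(message)
    sock.sendall(HEADER.pack(len(payload)) + payload)


def recv_message(conn):
    """Read the length prefix, then read and unpickle exactly that many bytes."""
    (size,) = HEADER.unpack(recv_exactly(conn, HEADER.size))
    return pickle.loads(recv_exactly(conn, size))


if __name__ == "__main__":
    # socketpair gives two connected sockets without needing a port
    left, right = socket.socketpair()
    with left, right:
        send_message(left, "x" * 2000)  # longer than one 1024-byte read
        print(len(recv_message(right)))
```

With helpers like these, the broker could call recv_message(conn) where it currently calls conn.recv(1024) and pickle.loads, and the producer could call send_message in place of sendall. Note that unpickling data from an untrusted sender is unsafe, so this is only suitable for a local experiment like this one.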
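Kafka brokers listen on port 9092 by default, and additional brokers on the same machine are typically given the next ports up: 9093, 9094 and so on (as mentioned in the Kafka documentation). I thus added the following script to launch two brokers on ports 9092 and 9093: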
import threading

from src.broker import Broker

# Set up the broker addresses (the same ones the producer uses)
broker1 = Broker('127.0.0.1', 9092)
broker2 = Broker('127.0.0.1', 9093)
brokers = [broker1, broker2]

# Start each broker on a separate thread
threads = []
for broker in brokers:
    t = threading.Thread(target=broker.start)
    t.start()
    threads.append(t)

# Block until the broker threads finish (they run forever)
for t in threads:
    t.join()
At this stage, I had two broker servers running and waiting for data from clients. The next step was to implement the client (the producer) that sends data to the brokers. Reduced to the bare minimum, a producer needs to:
- Know the addresses of the brokers;
- Select one broker;
- Send some data to it.
Here is how I implemented this:
import pickle
import random
import socket


class KafkaProducer:
    def __init__(self, brokers_addresses):
        self.brokers_addresses = brokers_addresses  # [(host_i, port_i), ...]

    def _send_to_broker(self, broker_address, message):
        # Open a new connection for each message and close it afterwards
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect(broker_address)
            sock.sendall(pickle.dumps(message))
            # Log the destination as host:port
            host, port = broker_address
            print(f"Sent to {host}:{port}: {message}")

    def send(self, data):
        # Pick one of the known brokers at random
        broker_address = random.choice(self.brokers_addresses)
        self._send_to_broker(broker_address, data)
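Random choice is only one way to handle the "select one broker" step. As an illustration of an alternative, the sketch below cycles through the brokers in order. RoundRobinSelector and next_broker are hypothetical names, not part of the project, and this is not a claim about how Kafka itself assigns messages.

```python
import itertools


class RoundRobinSelector:
    """Hypothetical helper: hands out broker addresses in a repeating order."""

    def __init__(self, brokers_addresses):
        if not brokers_addresses:
            raise ValueError("at least one broker address is required")
        # Copy the list so that later changes by the caller do not affect the cycle
        self._cycle = itertools.cycle(list(brokers_addresses))

    def next_broker(self):
        """Return the next (host, port) pair in the rotation."""
        return next(self._cycle)


if __name__ == "__main__":
    selector = RoundRobinSelector([("127.0.0.1", 9092), ("127.0.0.1", 9093)])
    for _ in range(4):
        print(selector.next_broker())
```

A producer could hold one such selector and call next_broker() inside send instead of random.choice, which would spread messages evenly across brokers rather than only on average.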
To actually run a producer, I executed the following script in a new terminal window:
import time

from src.producer import KafkaProducer

brokers_addresses = [('127.0.0.1', 9092), ('127.0.0.1', 9093)]  # (host, port)
producer = KafkaProducer(brokers_addresses)

count = 0
while True:
    count += 1
    data = f'This is a test message: {count}.'
    producer.send(data)
    time.sleep(1)
This script creates a producer that sends a new message every second. In that terminal, I got the following logs listing the messages sent:
Sent to 127.0.0.1:9093: This is a test message: 1.
Sent to 127.0.0.1:9093: This is a test message: 2.
Sent to 127.0.0.1:9092: This is a test message: 3.
Sent to 127.0.0.1:9093: This is a test message: 4.
Note that some messages are sent to the first broker, on port 9092, and others to the second broker, on port 9093, since the producer picks a broker at random for each message.
And on the server (broker) side, here are the logs I got:
Broker listening on 127.0.0.1:9092
Broker listening on 127.0.0.1:9093
Connected by ('127.0.0.1', 63169)
Received message on 127.0.0.1:9093: This is a test message: 1.
Connected by ('127.0.0.1', 63170)
Received message on 127.0.0.1:9093: This is a test message: 2.
Connected by ('127.0.0.1', 63171)
Received message on 127.0.0.1:9092: This is a test message: 3.
Connected by ('127.0.0.1', 63172)
Received message on 127.0.0.1:9093: This is a test message: 4.
Messages are indeed received by the brokers! That’s a good start. Tomorrow, I will build upon this to improve the behavior of my producers.