RC: W12 D3 — Setting up a connexion between a broker and a producer

May 1, 2024

Today I actually got started on my Kafka project. At a high-level, there are three major components:

Communication between these three components is therefore paramount and I decided to begin with implementing this. Brokers, producers and consumers are usually run on distinct machines. Since I have only my laptop, I modeled them as distinct threads and I used sockets to have them communicate with one another.

Brokers need to be continually listening for any messages that a producer could send them and deal with them when needed. This can be achieved with this implementation:

import pickle
import socket


class Broker:
    Host = str
    Port = str

    def __init__(self, host: Host, port: Port):
        self.host = host
        self.port = port

    def start(self):
        """
        Starts the broker and makes it listen for messages indefinitely.
        """

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((self.host, self.port))

            # Listen for incoming connections
            s.listen()
            print(f"Broker listening on {self.host}:{self.port}")

            # Wait for incoming connections and process them
            while True:
                conn, addr = s.accept()
                with conn:
                    # Read data
                    print(f"Connected by {addr}")
                    data = conn.recv(1024)

                    # Process data
                    message = pickle.loads(data)
                    print(f"Received message on {self.host}:{self.port}: {message}")

Here, each broker is identified by its host and port number. At launch (when running the start method), the broker server creates a TCP socket and binds it to its host and port. It then waits for incoming connections from a client (a producer in this case) and processes it by printing it to the terminal (when the time comes, I will update the processing to actually store the data on disk).

Kafka brokers usually run on ports 9092 and more (as mentioned in their documentation). I thus added the following script to launch two brokers on 9092 and 9093:

import threading

from src.broker import Broker

# Setup broker addresses (same as used in your producer)
broker1 = Broker('127.0.0.1', 9092)
broker2 = Broker('127.0.0.1', 9093)
brokers = [broker1, broker2]

# Start each broker on a separate thread
threads = []
for broker in brokers:
    t = threading.Thread(target=broker.start, args=())
    t.start()
    threads.append(t)

for t in threads:
    t.join()

At this stage, I had two broker servers running and waiting for data from clients. The next step consisted of implementing the client (the producer) to send data to the brokers. Simplified to the extreme, in order to send data to brokers, the producer needs to:

Here is how I implemented this:

import pickle
import random
import socket


class KafkaProducer:
    def __init__(self, brokers_addresses):
        self.brokers_addresses = brokers_addresses  # [(host_i, port_i), ...] 

    def _send_to_broker(self, broker_address, message):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect(broker_address)
            sock.sendall(pickle.dumps(message))
            print(f"Sent to {broker_address}: {message}")

    def send(self, data):
        broker_address = random.choice(self.brokers_addresses)
        self._send_to_broker(broker_address, data)

To actually run a producer, I executed the following script in a new terminal window:

import time

from src.producer import KafkaProducer

brokers_addresses = [('127.0.0.1', 9092), ('127.0.0.1', 9093)]  # (host, port)
producer = KafkaProducer(brokers_addresses)

count = 0
while True:
    count += 1
    data = f'This is a test message: {count}.'
    producer.send(data)
    time.sleep(1)

This script creates a producer that sends a new message every second. In this terminal, I got the following logs stating the messages sent:

Sent to 127.0.0.1:9093: This is a test message: 1.
Sent to 127.0.0.1:9093: This is a test message: 2.
Sent to 127.0.0.1:9092: This is a test message: 3.
Sent to 127.0.0.1:9093: This is a test message: 4.

Let’s note that some messages get sent to the first broker on port 9092 and others on the second broker on port 9093.

And on the server (broker) side, here are the logs I get:

Broker listening on 127.0.0.1:9092
Broker listening on 127.0.0.1:9093
Connected by ('127.0.0.1', 63169)
Received message on 127.0.0.1:9093: This is a test message: 1.
Connected by ('127.0.0.1', 63170)
Received message on 127.0.0.1:9093: This is a test message: 2.
Connected by ('127.0.0.1', 63171)
Received message on 127.0.0.1:9092: This is a test message: 3.
Connected by ('127.0.0.1', 63172)
Received message on 127.0.0.1:9093: This is a test message: 4.

Messages are indeed received by the brokers! That’s a good start. Tomorrow, I will build upon this to improve the behavior of my producers.