Skip to content

Instantly share code, notes, and snippets.

@piyushgarg-dev
Last active November 6, 2025 18:56
Show Gist options
  • Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Save piyushgarg-dev/32cadf6420c452b66a9a6d977ade0b01 to your computer and use it in GitHub Desktop.
Kafka Crash Course

Kafka

Video Link: Apache Kafka Crash Course | What is Kafka?

Prerequisite

Commands

  • Start Zookeper Container and expose PORT 2181.
docker run -p 2181:2181 zookeeper
  • Start Kafka Container, expose PORT 9092 and setup ENV variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka

Code

client.js

const { Kafka } = require("kafkajs");

exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});

admin.js

const { kafka } = require("./client");

async function init() {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  admin.connect();
  console.log("Adming Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2,
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}

init();

producer.js

const { kafka } = require("./client");
const readline = require("readline");

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
    const [riderName, location] = line.split(" ");
    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          partition: location.toLowerCase() === "north" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
  }).on("close", async () => {
    await producer.disconnect();
  });
}

init();

consumer.js

const { kafka } = require("./client");
const group = process.argv[2];

async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();

  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}

init();

Running Locally

  • Run Multiple Consumers
node consumer.js <GROUP_NAME>
  • Create Producer
node producer.js
> tony south
> tony north
@ANKIT3412
Copy link

[cause]: KafkaJSConnectionError: Connection timeout
at Timeout.onTimeout [as _onTimeout] (C:\Users\Dell\Desktop\kafka-app\node_modules\kafkajs\src\network\connection.js:223:23)
at listOnTimeout (node:internal/timers:594:17)
at process.processTimers (node:internal/timers:529:7) {
retriable: true,
helpUrl: undefined,
broker: '192.168.0.204:9092',
code: undefined,
[cause]: undefined

while running any js file this same error is showing. kindly help

@0602aman
Copy link

0602aman commented Aug 7, 2025

[cause]: KafkaJSConnectionError: Connection timeout at Timeout.onTimeout [as _onTimeout] (C:\Users\Dell\Desktop\kafka-app\node_modules\kafkajs\src\network\connection.js:223:23) at listOnTimeout (node:internal/timers:594:17) at process.processTimers (node:internal/timers:529:7) { retriable: true, helpUrl: undefined, broker: '192.168.0.204:9092', code: undefined, [cause]: undefined

while running any js file this same error is showing. kindly help

@ANKIT3412 Seems like you don't have the correct image for running the kafka in docker , i also faced the same issue and with the new image from bitnami it was resolved

@SaswatBarai
Copy link

If anyone need docker compose file

version: "3"

services:
    zookeeper:
        image: zookeeper
        container_name: zookeeper
        ports:
            - "2181:2181"

    kafka:
        image: confluentinc/cp-kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
        expose:
            - "29092"
        environment:
            KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
            KAFKA_MIN_INSYNC_REPLICAS: "1"

    kafka-ui:
        container_name: kafka-ui
        image: provectuslabs/kafka-ui
        ports:
            - 8080:8080
        environment:
            DYNAMIC_CONFIG_ENABLED: true

Also if you are getting error that port received NaN then type URL like this localhost:9092 instead of http://localhost:9092

Edit: add kafka UI

thanks buddy

@dheeraj-rawat-vas
Copy link

Here is the updated docker-compose.yaml file which works for me

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    restart: unless-stopped
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    restart: unless-stopped
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_MIN_INSYNC_REPLICAS: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment