Skip to content

Instantly share code, notes, and snippets.

@tuxfight3r
Last active November 17, 2023 15:37
Show Gist options
  • Save tuxfight3r/70b27cd3d8c4d06ad5ad026d7c038564 to your computer and use it in GitHub Desktop.
Save tuxfight3r/70b27cd3d8c4d06ad5ad026d7c038564 to your computer and use it in GitHub Desktop.

Revisions

  1. tuxfight3r revised this gist Nov 17, 2023. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion consume_message.md
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    # consume kafka messages from an offset
    # Consume kafka messages from a specific offset

    There are 2 ways of consuming kafka messages from a given topic at a specific offset.

  2. tuxfight3r revised this gist Nov 17, 2023. 1 changed file with 3 additions and 1 deletion.
    4 changes: 3 additions & 1 deletion consume_message.md
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,6 @@
    # 2 ways of consuming kafka messages from a given topic at a specific offset.
    # consume kafka messages from an offset

    There are 2 ways of consuming kafka messages from a given topic at a specific offset.

    ## Consume the messages from any offset by creating consumer group
    ```bash
  3. tuxfight3r revised this gist Nov 17, 2023. 1 changed file with 9 additions and 9 deletions.
    18 changes: 9 additions & 9 deletions consume_message.md
    Original file line number Diff line number Diff line change
    @@ -14,7 +14,7 @@ get_group_offset(){
    --reset-offsets \
    --to-latest \
    --execute \
    --group devops_siptest_group \
    --group devops_test_group \
    --topic test-input_topic

    # reset offset for a single partition on that group to the offset you wanted to consume
    @@ -28,7 +28,7 @@ get_group_offset(){


    #display the group offsets
    get_group_offset devops_siptest_group
    get_group_offset devops_test_group

    # consume 500 messages from that group from partition 0
    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    @@ -40,7 +40,7 @@ get_group_offset devops_siptest_group
    --reset-offsets \
    --to-latest \
    --execute \
    --group devops_siptest_group \
    --group devops_test_group \
    --topic test-input_topic

    ## reset offset for a single partition on that group to the offset you wanted to consume
    @@ -49,29 +49,29 @@ get_group_offset devops_siptest_group
    --reset-offsets \
    --to-offset 42775 \
    --execute \
    --group devops_siptest_group \
    --group devops_test_group \
    --topic test-input_topic:1


    #display the group offsets
    get_group_offset devops_siptest_group
    get_group_offset devops_test_group


    # consume 500 messages from that group for partition 1
    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --group devops_siptest_group --timeout-ms 15000 --max-messages 500 --topic -sip-input_sip
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --group devops_test_group --timeout-ms 15000 --max-messages 500 --topic test-input_topic

    #display the group offsets
    get_group_offset devops_siptest_group
    get_group_offset devops_test_group

    ```

    ## consume the messages directly from topic without having to create consumer group.

    ```bash
    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500 --topic -sip-input_sip --partition 0 --offset 74625
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500 --topic test-input_topic --partition 0 --offset 74625

    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500 --topic -sip-input_sip --partition 1 --offset 8000
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500 --topic test-input_topic --partition 1 --offset 8000
    ```
  4. tuxfight3r revised this gist Nov 17, 2023. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion consume_message.md
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    # 2 ways of consuming messages from a given topic from any given offset.
    # 2 ways of consuming kafka messages from a given topic at a specific offset.

    ## Consume the messages from any offset by creating consumer group
    ```bash
  5. tuxfight3r revised this gist Nov 17, 2023. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion consume_message.md
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    # There are 2 ways to consume messages from a given topic from any offset.
    # 2 ways of consuming messages from a given topic from any given offset.

    ## Consume the messages from any offset by creating consumer group
    ```bash
  6. tuxfight3r created this gist Nov 17, 2023.
    77 changes: 77 additions & 0 deletions consume_message.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,77 @@
    # There are 2 ways to consume messages from a given topic from any offset.

    ## Consume the messages from any offset by creating consumer group
    ```bash
    get_group_offset(){
    # describe a given group and sort by partition
    /home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} --command-config ${CONFIG} --timeout 15000 --group $1 --describe | sort -k2,3
    }


    ## create a test group associated to the input topic and set the offset to latest
    /home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
    --command-config ${CONFIG} \
    --reset-offsets \
    --to-latest \
    --execute \
    --group devops_siptest_group \
    --topic test-input_topic

    # reset offset for a single partition on that group to the offset you wanted to consume
    /home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
    --command-config ${CONFIG} \
    --reset-offsets \
    --to-offset 74625 \
    --execute \
    --group devops_test_group \
    --topic test-input_topic:0


    #display the group offsets
    get_group_offset devops_siptest_group

    # consume 500 messages from that group from partition 0
    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --group devops_test_group --timeout-ms 15000 --max-messages 500 --topic test-input_topic

    # reset all the offset in the group to latest
    /home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
    --command-config ${CONFIG} \
    --reset-offsets \
    --to-latest \
    --execute \
    --group devops_siptest_group \
    --topic test-input_topic

    ## reset offset for a single partition on that group to the offset you wanted to consume
    /home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
    --command-config ${CONFIG} \
    --reset-offsets \
    --to-offset 42775 \
    --execute \
    --group devops_siptest_group \
    --topic test-input_topic:1


    #display the group offsets
    get_group_offset devops_siptest_group


    # consume 500 messages from that group for partition 1
    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --group devops_siptest_group --timeout-ms 15000 --max-messages 500 --topic -sip-input_sip

    #display the group offsets
    get_group_offset devops_siptest_group

    ```

    ## consume the messages directly from topic without having to create consumer group.

    ```bash
    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500 --topic -sip-input_sip --partition 0 --offset 74625

    /home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
    --property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500 --topic -sip-input_sip --partition 1 --offset 8000
    ```