You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
Two ways of consuming Kafka messages from a given topic, starting at a specific offset.
Consume the messages from any offset by creating a consumer group.
# Describe a consumer group and print the result sorted with `sort -k2,3`
# (the original note says this orders the rows by partition).
#
# Globals:   KAFKA_BROKER (read) - value passed to --bootstrap-server
#            CONFIG (read)       - value passed to --command-config
# Arguments: $1 - name of the consumer group to describe (required)
# Outputs:   sorted `--describe` output on stdout; usage error on stderr
# Returns:   2 when no group name is given; otherwise the pipeline's status
get_group_offset(){
  # Without a group name the tool would receive "--group --describe" and
  # misparse its flags, so fail early with a clear message instead.
  if [ -z "${1:-}" ]; then
    printf 'usage: get_group_offset <group>\n' >&2
    return 2
  fi

  # Every expansion is quoted so that values containing whitespace or glob
  # characters reach the tool as exactly one argument.
  /home/tools/confluent/bin/kafka-consumer-groups \
    --bootstrap-server "${KAFKA_BROKER}" \
    --command-config "${CONFIG}" \
    --timeout 15000 \
    --group "$1" \
    --describe \
    | sort -k2,3
}
## Create a test group associated with the input topic and set its offsets
## to latest. --execute applies the reset.
## Expansions are quoted so broker lists / config paths are passed as a
## single argument each.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-latest \
  --execute \
  --group devops_test_group \
  --topic test-input_topic
# Reset the offset of a single partition (topic:partition syntax, here
# partition 0) of that group to the offset you want to start consuming from.
# Expansions are quoted so each value is passed as one argument.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-offset 74625 \
  --execute \
  --group devops_test_group \
  --topic test-input_topic:0
# Display the group offsets to check the reset.
get_group_offset devops_test_group
# Consume up to 500 messages with that group (stops after 15000 ms without
# a message), intended to read from partition 0.
# NOTE(review): the consumer is given the whole topic, not a partition; this
# presumably relies on the other partitions having been reset to latest so
# that records come from partition 0 - confirm.
# Expansions are quoted so each value is passed as one argument.
/home/tools/confluent/bin/kafka-avro-console-consumer \
  --bootstrap-server "${KAFKA_BROKER}" \
  --consumer.config "${CONFIG}" \
  --property "schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL}" \
  --group devops_test_group \
  --timeout-ms 15000 \
  --max-messages 500 \
  --topic test-input_topic
# Reset all the offsets of the group on this topic back to latest before
# targeting another partition.
# Expansions are quoted so each value is passed as one argument.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-latest \
  --execute \
  --group devops_test_group \
  --topic test-input_topic
## Reset the offset of a single partition (topic:partition syntax, here
## partition 1) of that group to the offset you want to start consuming from.
## Expansions are quoted so each value is passed as one argument.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-offset 42775 \
  --execute \
  --group devops_test_group \
  --topic test-input_topic:1
# Display the group offsets to check the reset.
get_group_offset devops_test_group
# Consume up to 500 messages with that group (stops after 15000 ms without
# a message), intended to read from partition 1.
# NOTE(review): the consumer is given the whole topic, not a partition; this
# presumably relies on the other partitions having been reset to latest so
# that records come from partition 1 - confirm.
# Expansions are quoted so each value is passed as one argument.
/home/tools/confluent/bin/kafka-avro-console-consumer \
  --bootstrap-server "${KAFKA_BROKER}" \
  --consumer.config "${CONFIG}" \
  --property "schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL}" \
  --group devops_test_group \
  --timeout-ms 15000 \
  --max-messages 500 \
  --topic test-input_topic
# Display the group offsets after consuming.
get_group_offset devops_test_group
Consume the messages directly from the topic without having to create a consumer group.