You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
# Two ways of consuming messages from a given topic, starting at any given offset.
# Consume messages from any offset by creating a consumer group.
#######################################
# Describe a consumer group and print the result sorted on fields 2-3
# (the original author's intent: ordered by partition).
# Globals:
#   KAFKA_BROKER (read) - value passed to --bootstrap-server
#   CONFIG       (read) - value passed to --command-config
# Arguments:
#   $1 - name of the consumer group to describe
# Outputs:
#   The sorted --describe output on stdout; a usage message on stderr
#   when no group name is given.
# Returns:
#   2 when the group name is missing or empty; otherwise the status of
#   the pipeline (i.e. of sort, as in the original).
#######################################
get_group_offset(){
  # Without a group name the CLI would receive "--group --describe",
  # so fail early with a clear message instead.
  if [ "$#" -lt 1 ] || [ -z "$1" ]; then
    printf 'usage: get_group_offset GROUP\n' >&2
    return 2
  fi

  # Every expansion is quoted so that a value containing whitespace or
  # glob characters reaches the tool as exactly one argument.
  /home/tools/confluent/bin/kafka-consumer-groups \
    --bootstrap-server "${KAFKA_BROKER}" \
    --command-config "${CONFIG}" \
    --timeout 15000 \
    --group "$1" \
    --describe \
    | sort -k2,3
}
## Create a test group associated with the input topic and set every
## partition's offset to latest.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-latest \
  --execute \
  --group devops_siptest_group \
  --topic test-input_topic

# Reset the offset of a single partition (partition 0) in that same group
# to the offset you want to start consuming from.
# The group name must match the one created above; the original used
# devops_test_group here, so the reset landed on a different group than
# the one created and displayed.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-offset 74625 \
  --execute \
  --group devops_siptest_group \
  --topic test-input_topic:0

# Display the group offsets.
get_group_offset devops_siptest_group

# Consume up to 500 messages with that group. No partition is selected
# on the consumer itself: the other partitions were just reset to latest,
# so the messages read are expected to come from partition 0.
/home/tools/confluent/bin/kafka-avro-console-consumer \
  --bootstrap-server "${KAFKA_BROKER}" \
  --consumer.config "${CONFIG}" \
  --property schema.registry.url="${KAFKA_SCHEMA_REGISTRY_URL}" \
  --group devops_siptest_group \
  --timeout-ms 15000 \
  --max-messages 500 \
  --topic test-input_topic
# Reset all the offsets in the group to latest.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-latest \
  --execute \
  --group devops_siptest_group \
  --topic test-input_topic

## Reset the offset of a single partition (partition 1) in that group to
## the offset you want to start consuming from.
/home/tools/confluent/bin/kafka-consumer-groups \
  --bootstrap-server "${KAFKA_BROKER}" \
  --command-config "${CONFIG}" \
  --reset-offsets \
  --to-offset 42775 \
  --execute \
  --group devops_siptest_group \
  --topic test-input_topic:1

# Display the group offsets.
get_group_offset devops_siptest_group

# Consume up to 500 messages with that group. No partition is selected
# on the consumer itself: the other partitions were just reset to latest,
# so the messages read are expected to come from partition 1.
# The topic must be the one whose offsets were reset above; the original
# passed "-sip-input_sip", which names a different topic and starts with
# a dash.
/home/tools/confluent/bin/kafka-avro-console-consumer \
  --bootstrap-server "${KAFKA_BROKER}" \
  --consumer.config "${CONFIG}" \
  --property schema.registry.url="${KAFKA_SCHEMA_REGISTRY_URL}" \
  --group devops_siptest_group \
  --timeout-ms 15000 \
  --max-messages 500 \
  --topic test-input_topic

# Display the group offsets again to see how far the consumer advanced.
get_group_offset devops_siptest_group
# Consume the messages directly from the topic without having to create a consumer group.