#!/usr/bin/env bash
#######################################
# Backup Schema Registry
#
# To Restore:
# 0. Download if remotely stored.
# 1. Extract                : `tar -xjvf schema-registry-<timestamp>.tar.bz2`
# 2. Inspect Logs & Errors  : `cat schemas-err.txt`
# 3. Inspect Schemas        : `tail -n50 schemas.txt`
# 4. Check how many schemas : `wc -l schemas.txt`
# 5. Load                   : `kafka-console-producer --broker-list $KAFKA --topic _schemas --property parse.key=true < schemas.txt`
#######################################
set -eu -o pipefail

if [ $# -lt 2 ]; then
  echo "Usage: backup-schemas <bootstrap-server> <timeout-ms>"
  exit 1
fi

# Confluent version compatible with our brokers
CP_VERSION=3.3.2
SCHEMAS_TOPIC=_schemas
STDOUT=schemas.txt
STDERR=schemas-err.txt
TIMEOUT_MS=$2
NOW=$(date +"%Y%m%d_%H%M")

echo "==> [${NOW}] Consuming ${SCHEMAS_TOPIC} with 'timeout-ms'=${TIMEOUT_MS}"

# This is a naïve approach: it always re-reads the topic from the beginning, which will take
# longer as the topic grows. Realistically, it should use a consumer group that tracks offsets
# and makes progress continuously.
# TODO: Find a way to know if timeout-ms is too small (e.g. compute `wc -l` for each day and
# compare; see the sketch at the end of this script)
docker run --rm -ti \
  -v "$PWD":/workdir \
  confluentinc/cp-kafka:${CP_VERSION} \
  bash -c "kafka-console-consumer --from-beginning --property print.key=true \
    --bootstrap-server $1 --topic ${SCHEMAS_TOPIC} \
    --timeout-ms ${TIMEOUT_MS} \
    1>/workdir/${STDOUT} \
    2>/workdir/${STDERR}"

# For debugging. Should say "Consumed x records", but will also show the expected TimeoutException.
cat ${STDERR}

echo "==> ${STDOUT} contains $(wc -l ${STDOUT} | awk '{print $1}') messages"

echo "==> Compressing schemas"
# The errors file is not removed so we can download and inspect it later
tar -cjvf schema-registry-${NOW}.tar.bz2 ${STDOUT} ${STDERR} && rm ${STDOUT} # ${STDERR}

# TODO: Upload to S3, for example:
# aws s3 cp schema-registry-${NOW}.tar.bz2 ${S3_BUCKET}/backup-schema-registry/
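
# A minimal, untested sketch for the timeout-ms TODO above: compare today's schema count against
# the previous archive and warn if it shrank. The "compare with the previous archive" heuristic,
# the variable names below, and the warning message are assumptions, not part of the original
# script. Note that ${SCHEMAS_TOPIC} is compacted, so a smaller count is only a hint that
# timeout-ms may have been too small, not proof.
CURR_ARCHIVE=$(ls -t schema-registry-*.tar.bz2 | sed -n '1p')
PREV_ARCHIVE=$(ls -t schema-registry-*.tar.bz2 | sed -n '2p')
if [ -n "${PREV_ARCHIVE}" ]; then
  # Extract schemas.txt from each archive to stdout and count the messages
  CURR_COUNT=$(tar -xjOf "${CURR_ARCHIVE}" "${STDOUT}" | wc -l)
  PREV_COUNT=$(tar -xjOf "${PREV_ARCHIVE}" "${STDOUT}" | wc -l)
  if [ "${CURR_COUNT}" -lt "${PREV_COUNT}" ]; then
    echo "==> WARNING: ${CURR_COUNT} schemas in ${CURR_ARCHIVE} vs ${PREV_COUNT} in ${PREV_ARCHIVE}; timeout-ms may be too small"
  fi
fi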