#!/usr/bin/env python3
r"""
Helper script to create a logical replica from a standby server using a
combination of a manually created logical replication slot and pg_dump.

Compared to a "normal" logical replication setup, this script has the
following advantages:

1) The initial snapshot is loaded with pg_dump + pg_restore, which should be
   much faster than the initial copy of the entire database performed by
   native logical replication.
2) pg_dump runs against the hot standby, so it does not interfere with normal
   operation of the primary database.

Requirements:
- This script can be run on any host with python3, psycopg2 and the
  postgresql-client package (for pg_dump and pg_restore) installed.
- The replication user must be created before running this script.

Example:

setup_logical_replication.py \
    -s logical_repl_sub \
    -p logical_repl_pub \
    --primary-conn-str "host=10.0.0.1 port=5432 dbname=mydb user=repuser password=Secret" \
    --secondary-conn-str "host=10.0.0.2 port=5432 dbname=mydb user=repuser password=Secret" \
    --target-conn-str "host=10.1.0.100 port=5432 dbname=mytargetdb user=adminuser password=1234" \
    --dump-dir /data/dump

Inspired by
https://www.postgresql-archive.org/logical-replication-initiate-via-manual-pg-dump-td6085971.html#a6086220
"""

import optparse
import logging
import sys
import os
import subprocess
import shlex
import shutil
import time
from contextlib import contextmanager

import psycopg2
from psycopg2 import sql


class LogicalReplicationInitializer:
    """Set up logical replication on a target DB seeded from a pg_dump snapshot.

    Workflow (see ``start``): create a publication and a logical replication
    slot on the primary, wait for the secondary to replay past the slot's
    position, export a snapshot on the secondary and dump it with pg_dump,
    restore the dump on the target with pg_restore, then create a disabled
    subscription that reuses the slot, advance its replication origin to the
    snapshot LSN and enable it.
    """

    # Seconds between checks while waiting for the secondary to replay WAL.
    REPLAY_POLL_INTERVAL = 1.0

    def __init__(self, args):
        """Parse command line *args* (without the program name) and set up logging.

        Exits through ``parser.error`` when a mandatory option is missing
        or invalid.
        """
        parser = self.init_optparse()
        options, self.args = parser.parse_args(args)

        if not options.primary_conn_str:
            parser.error("--primary-conn-str is missing")
        self.primary_conn_str = options.primary_conn_str

        # The secondary defaults to the primary (dump taken from the primary).
        self.secondary_conn_str = options.secondary_conn_str or self.primary_conn_str

        if not options.target_conn_str:
            parser.error("--target-conn-str is missing")
        self.target_conn_str = options.target_conn_str

        # Connection string the *target* uses to reach the publisher.
        self.subscription_conn_str = options.subscription_conn_str or self.primary_conn_str

        if not options.dump_dir:
            parser.error("--dump-dir is missing")
        self.dump_dir = options.dump_dir

        if options.jobs < 1:
            parser.error("--jobs must be a positive integer")
        self.jobs = options.jobs
        self.replay_timeout = options.replay_timeout

        self.slot = options.slot_name
        self.publication_name = options.publication_name

        logging.basicConfig(level=logging.INFO,
                            format='[%(asctime)s] %(levelname)s %(message)s')
        self.log = logging.getLogger()

    def init_optparse(self):
        """Build the command line parser for this script."""
        parser = optparse.OptionParser()
        parser.add_option("-s", "--slot-name", dest="slot_name", default="migration_sub",
                          help="Replication slot name")
        parser.add_option("-p", "--publication-name", dest="publication_name",
                          default="migration_pub", help="Publication name")
        parser.add_option("-P", "--primary-conn-str", dest="primary_conn_str",
                          help="Primary DB connection string")
        parser.add_option("-S", "--secondary-conn-str", dest="secondary_conn_str",
                          help="Secondary DB connection string")
        parser.add_option("-T", "--target-conn-str", dest="target_conn_str",
                          help="Target DB connection string")
        parser.add_option("-C", "--subscription-conn-str", dest="subscription_conn_str",
                          help="Primary DB connection string to be used on the target DB")
        parser.add_option("-d", "--dump-dir", dest="dump_dir",
                          help="pg_dump target directory")
        parser.add_option("-j", "--jobs", dest="jobs", type="int", default=4,
                          help="Number of parallel pg_dump/pg_restore jobs (default: 4)")
        parser.add_option("--replay-timeout", dest="replay_timeout", type="float",
                          default=3600.0,
                          help="Seconds to wait for the secondary to replay WAL up to "
                               "the slot position; <= 0 waits forever (default: 3600)")
        return parser

    def start(self):
        """Run the whole setup; log the error and exit with status 1 on failure."""
        try:
            self.connect()
            # Drop existing subscription if a previous attempt failed
            self.drop_subscription()
            self.create_publication()
            slot_lsn = self.create_logical_replication_slot()
            # The exported snapshot must not be older than the slot's
            # consistent point; otherwise changes in between would be neither
            # in the dump nor streamed through the slot.
            self.wait_for_secondary_replay(slot_lsn)
            # Export the snapshot in a separate transaction and dump the
            # database while that transaction is still open. Only the dump
            # needs the snapshot, so the transaction is ended right after it.
            with self.with_snapshot() as (lsn, snapshot):
                self.dump(snapshot)
            self.restore_dump()
            self.create_subscription()
            self.advance_subscription_origin(lsn)
            self.enable_subscription()
        except Exception as err:
            self.log.exception("%s", err)
            sys.exit(1)
        finally:
            self.close()

    def connect(self):
        """Open connections to the primary, secondary and target databases."""
        self.primary = psycopg2.connect(self.primary_conn_str)
        self.secondary = psycopg2.connect(self.secondary_conn_str)
        self.target = psycopg2.connect(self.target_conn_str)

    def close(self):
        """Close every connection opened by ``connect`` that is still open."""
        for name in ("primary", "secondary", "target"):
            conn = getattr(self, name, None)
            if conn is not None and not conn.closed:
                conn.close()

    def create_publication(self):
        """Create ``FOR ALL TABLES`` publication on the primary unless it exists."""
        with self.primary.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM pg_publication WHERE pubname = %s",
                        (self.publication_name,))
            (cnt,) = cur.fetchone()
            if cnt == 0:
                pub_name = sql.Identifier(self.publication_name)
                query = sql.SQL("CREATE PUBLICATION {} FOR ALL TABLES").format(pub_name)
                cur.execute(query)
                self.log.info("Created publication %s", self.publication_name)
            else:
                self.log.info("Publication %s already exists", self.publication_name)
        self.primary.commit()

    def create_logical_replication_slot(self):
        """(Re)create the ``pgoutput`` logical replication slot on the primary.

        Any existing slot with the same name is dropped first.

        Returns:
            The LSN reported by ``pg_create_logical_replication_slot`` for the
            new slot (as the text form of a ``pg_lsn``).
        """
        with self.primary.cursor() as cur:
            cur.execute("""
                SELECT pg_drop_replication_slot(slot_name)
                FROM pg_replication_slots
                WHERE slot_name = %s
            """, (self.slot,))
            # Select the record's columns so the LSN can be used later,
            # instead of receiving the whole record as one text value.
            cur.execute(
                "SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%s, %s)",
                (self.slot, "pgoutput"),
            )
            slot_name, lsn = cur.fetchone()
        self.primary.commit()
        self.log.info("Created logical replication slot %s at %s", slot_name, lsn)
        return lsn

    def wait_for_secondary_replay(self, lsn):
        """Block until the secondary has replayed WAL at least up to *lsn*.

        Returns immediately when the secondary is not in recovery (e.g. it
        is the primary itself).

        Raises:
            RuntimeError: the secondary did not catch up within
                ``self.replay_timeout`` seconds (no limit when <= 0).
        """
        deadline = None
        if self.replay_timeout > 0:
            deadline = time.monotonic() + self.replay_timeout
        waiting_logged = False
        while True:
            with self.secondary.cursor() as cur:
                cur.execute("""
                    SELECT NOT pg_is_in_recovery()
                           OR pg_last_wal_replay_lsn() >= %s::pg_lsn,
                           pg_last_wal_replay_lsn()
                """, (lsn,))
                caught_up, replayed = cur.fetchone()
            # End the implicit transaction: no snapshot is held while polling
            # and with_snapshot() can still change the session settings.
            self.secondary.rollback()
            if caught_up:
                return
            if deadline is not None and time.monotonic() >= deadline:
                raise RuntimeError(
                    "Secondary did not replay WAL up to %s within %s seconds "
                    "(replayed up to %s)" % (lsn, self.replay_timeout, replayed))
            if not waiting_logged:
                self.log.info("Waiting for secondary to replay WAL up to %s (currently at %s)",
                              lsn, replayed)
                waiting_logged = True
            time.sleep(self.REPLAY_POLL_INTERVAL)

    @contextmanager
    def with_snapshot(self):
        """Open a REPEATABLE READ transaction on the secondary and export its snapshot.

        Yields:
            ``(lsn, snapshot_name)`` -- the WAL position used as the starting
            point for replication (replay LSN on a standby, current WAL LSN
            otherwise) and the name returned by ``pg_export_snapshot()``.

        The exported snapshot is only importable (e.g. by
        ``pg_dump --snapshot``) while the transaction is open, i.e. inside
        the ``with`` body. The transaction is rolled back on exit.
        """
        self.secondary.set_session(isolation_level='REPEATABLE READ')
        try:
            with self.secondary.cursor() as cur:
                # NOTE(review): on a standby the LSN and the snapshot are read
                # in one statement but not atomically with respect to WAL
                # replay; a very small window remains. Pausing replay around
                # this query (pg_wal_replay_pause) would close it.
                cur.execute("""
                    SELECT
                        CASE WHEN pg_is_in_recovery()
                             THEN pg_last_wal_replay_lsn()
                             ELSE pg_current_wal_lsn()
                        END,
                        pg_export_snapshot()
                """)
                lsn, snapshot = cur.fetchone()
            if lsn is None:
                raise RuntimeError("Could not determine the WAL position of the secondary")
            yield lsn, snapshot
        finally:
            if not self.secondary.closed:
                self.secondary.rollback()

    def dump(self, snapshot):
        """Dump the secondary DB as of *snapshot* into ``self.dump_dir`` (directory format).

        Raises:
            subprocess.CalledProcessError: pg_dump exited with non-zero status.
        """
        self.log.info("Dumping source DB")
        # NOTE: the connection string (possibly with a password) is passed on
        # the command line; prefer ~/.pgpass or PGPASSWORD in shared hosts.
        subprocess.run([
            "pg_dump",
            "--no-publications",
            "--no-subscriptions",
            "--snapshot=" + snapshot,
            "--format=d",
            "--jobs=%d" % self.jobs,
            "-f", self.dump_dir,
            self.secondary_conn_str,
        ], check=True)

    def restore_dump(self):
        """Restore ``self.dump_dir`` into the target DB, replacing existing objects."""
        self.log.info("Restoring the dump on the target DB")
        result = subprocess.run([
            "pg_restore",
            "--clean",
            "--if-exists",
            "-Fd",
            "--jobs=%d" % self.jobs,
            "-d", self.target_conn_str,
            self.dump_dir,
        ])
        if result.returncode != 0:
            # pg_restore continues past errors by default and then exits with
            # a non-zero status, so this stays a warning instead of aborting.
            self.log.warning("pg_restore exited with status %d; check its output above",
                             result.returncode)

    def create_subscription(self):
        """Create a disabled subscription on the target that reuses the existing slot.

        ``create_slot=false`` and ``copy_data=false`` because the slot already
        exists and the data comes from the restored dump.
        """
        self.log.info("Creating subscription %s", self.slot)
        with self.target.cursor() as cur:
            pub_name = sql.Identifier(self.publication_name)
            sub_name = sql.Identifier(self.slot)
            query = sql.SQL("""
                CREATE SUBSCRIPTION {}
                CONNECTION %s
                PUBLICATION {}
                WITH (create_slot=false, slot_name={}, enabled=false, copy_data=false)
            """).format(sub_name, pub_name, sub_name)
            cur.execute(query, (self.subscription_conn_str,))
        self.target.commit()

    def advance_subscription_origin(self, lsn):
        """Move the subscription's replication origin to *lsn*.

        Changes up to *lsn* are already contained in the dump, so they are
        skipped when the subscription starts streaming.

        Raises:
            RuntimeError: no (or more than one) subscription with this name
                exists in the target's current database.
        """
        self.log.info("Setting replication origin position to %s", lsn)
        with self.target.cursor() as cur:
            # pg_subscription is shared across databases, hence the subdbid
            # filter; the origin of a subscription is named 'pg_<oid>'.
            cur.execute("""
                SELECT pg_replication_origin_advance('pg_' || s.oid::text, %s::pg_lsn)
                FROM pg_subscription s
                WHERE s.subname = %s
                  AND s.subdbid = (SELECT oid FROM pg_database
                                   WHERE datname = current_database())
            """, (lsn, self.slot))
            if cur.rowcount != 1:
                raise RuntimeError(
                    "Expected exactly one subscription named %s, found %d"
                    % (self.slot, cur.rowcount))
        self.target.commit()

    def enable_subscription(self):
        """Enable the subscription so it starts streaming changes."""
        self.log.info("Enabling subscription %s", self.slot)
        with self.target.cursor() as cur:
            sub_name = sql.Identifier(self.slot)
            query = sql.SQL("ALTER SUBSCRIPTION {} ENABLE").format(sub_name)
            cur.execute(query)
        self.target.commit()

    def drop_subscription(self):
        """Drop the subscription named after the slot on the target, if it exists.

        Runs in autocommit mode because DROP SUBSCRIPTION cannot be executed
        inside a transaction block when it has an associated replication slot.
        """
        self.target.autocommit = True
        try:
            with self.target.cursor() as cur:
                sub_name = sql.Identifier(self.slot)
                query = sql.SQL("DROP SUBSCRIPTION IF EXISTS {}").format(sub_name)
                cur.execute(query)
        finally:
            if not self.target.closed:
                self.target.autocommit = False


if __name__ == "__main__":
    script = LogicalReplicationInitializer(sys.argv[1:])
    script.start()