#!/usr/bin/env python
# Author: Sean Fitzgerald
# Date: 2018/2/5
"""Backfill metrics from a Bigtable table into a Carbon listener.

Every row of the source table is scanned and each of its cells is relayed
over TCP as a plaintext line of the form ``<metric> <value> <timestamp>``.
Progress is checkpointed to ``CHECKPOINT_FILE`` so an interrupted run can
resume from the last recorded row key.

NOTE(review): row keys and column names are handled as native ``str``
(``rsplit(":")``, ``%s`` formatting), which looks like it assumes
Python 2 -- confirm before running under Python 3.
"""
import socket
import google.cloud.happybase as hb
from google.cloud import bigtable as bt
import sys
import os.path
import logging
import time

logging.basicConfig(level=logging.DEBUG)

CHECKPOINT_FILE = "/tmp/bigtable_backfill.checkpoint"

# Port the metrics are relayed to unless the caller overrides it.
DEFAULT_CARBON_PORT = 2003

# Number of rows relayed between checkpoint writes (and scan restarts).
CHECKPOINT_INTERVAL = 100

# Pause before retrying after a failed pass, so an unreachable source or
# destination does not turn the retry loop into a busy loop.
RETRY_DELAY_SECONDS = 1


def create_socket(host, port):
    """Return a TCP socket connected to ``(host, port)``.

    The caller owns the returned socket and is responsible for closing it.
    If the connection attempt fails the socket is closed before the error
    propagates, so no descriptor is leaked.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
    except Exception:
        sock.close()
        raise
    return sock


def relay_batch(metrics, sock):
    """Send every message in ``metrics`` over ``sock``.

    Text messages are UTF-8 encoded; messages that are already bytes are
    sent unchanged. ``sendall`` is used because ``send`` may transmit only
    part of the buffer, which would silently truncate metrics.
    """
    payload = b"".join(
        metric if isinstance(metric, bytes) else metric.encode("utf-8")
        for metric in metrics
    )
    if payload:
        sock.sendall(payload)


def _load_checkpoint():
    """Return the row key stored in CHECKPOINT_FILE, or None if absent."""
    if not os.path.exists(CHECKPOINT_FILE):
        return None
    with open(CHECKPOINT_FILE, 'r') as checkpoint:
        return checkpoint.read()


def _save_checkpoint(row_key):
    """Overwrite CHECKPOINT_FILE with ``row_key``."""
    with open(CHECKPOINT_FILE, 'w') as checkpoint:
        checkpoint.write(row_key)


def _row_to_messages(key, row):
    """Build one ``<metric> <value> <timestamp>`` line per cell of ``row``.

    The metric name is the row key with its last ``:``-separated field
    removed; the timestamp is the second ``:``-separated field of the
    column name.
    """
    metric_name = key.rsplit(":", 1)[0]
    messages = []
    for col, val in row.items():
        ts = col.split(":")[1]
        messages.append("%s %s %s\n" % (metric_name, str(val), str(ts)))
    return messages


def relay_messages(source_project, source_db, source_table_name,
                   dest_carbon_ip, dest_carbon_port=DEFAULT_CARBON_PORT):
    """Relay every row of a Bigtable table to a Carbon listener.

    Args:
        source_project: project passed to the Bigtable client.
        source_db: Bigtable instance id.
        source_table_name: name of the table to scan.
        dest_carbon_ip: host the metrics are sent to.
        dest_carbon_port: TCP port on that host (defaults to 2003).

    The scan resumes from the key stored in ``CHECKPOINT_FILE`` when that
    file exists. After every ``CHECKPOINT_INTERVAL`` rows the checkpoint is
    rewritten and the scan is restarted from the last relayed key; the
    restart is inclusive, so that one row is relayed again. Any failure
    during a pass is logged and the pass is retried from the last relayed
    key after a short pause. The function returns once a scan runs to
    completion.
    """
    client = bt.Client(project=source_project, admin=True)
    instance = client.instance(source_db)
    connection = hb.Connection(instance=instance)
    table = connection.table(source_table_name)

    last_key = _load_checkpoint()

    while True:
        cursor = table.scan(row_start=last_key) if last_key else table.scan()
        try:
            key_count = 0
            for key, row in cursor:
                sock = create_socket(dest_carbon_ip, dest_carbon_port)
                try:
                    relay_batch(_row_to_messages(key, row), sock)
                finally:
                    # One connection per row, always released.
                    sock.close()

                last_key = key
                key_count += 1
                if key_count % CHECKPOINT_INTERVAL == 0:
                    print("Refreshing checkpoint...")
                    _save_checkpoint(last_key)
                    # Leave this pass; the outer loop opens a fresh scan
                    # starting at the key that was just checkpointed.
                    break
            else:
                # The scan was exhausted without hitting a checkpoint
                # boundary: every remaining row has been relayed.
                return
        except Exception:
            logging.exception("Failed to write batch")
            time.sleep(RETRY_DELAY_SECONDS)