Skip to content

Instantly share code, notes, and snippets.

@aeldaly
Created May 17, 2011 21:54
Show Gist options
  • Save aeldaly/977488 to your computer and use it in GitHub Desktop.
Save aeldaly/977488 to your computer and use it in GitHub Desktop.
# lib/tap.py
"""
tap protocol client.
Copyright (c) 2010 Dustin Sallings <[email protected]>
"""
import socket
import string
import random
import struct
import mc_bin_server
import mc_bin_client
import memcacheConstants as mcc
class TapConnection(mc_bin_server.MemcachedBinaryChannel):
    """Channel that opens a TAP stream to a single server.

    The TAP connect request is built in the constructor and passed to the
    ``MemcachedBinaryChannel`` initializer; each command later delivered to
    :meth:`processCommand` is forwarded to the user-supplied callback.
    """

    def __init__(self, server, port, callback, options, clientId=None):
        """Create the channel and start connecting to ``(server, port)``.

        server, port -- address of the upstream server.
        callback     -- called from processCommand as
                        callback(identifier, cmd, extra, key, vb, val, cas).
        options      -- object providing ``timestamp`` (backfill start),
                        ``bucket`` and ``password`` attributes.
        clientId     -- optional TAP client name; see _createTapCall for
                        the fallback used when it is falsy.
        """
        membase_options = {
            mcc.TAP_FLAG_BACKFILL: options.timestamp,
        }
        mc_bin_server.MemcachedBinaryChannel.__init__(
            self, None, None, self._createTapCall(clientId, membase_options)
        )
        self.bucket = options.bucket
        self.bucket_password = options.password or ""
        self.callback = callback
        self.identifier = (server, port)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # The client object is only used for SASL auth in handle_connect; it
        # is pointed at this channel's socket instead of one of its own.
        # NOTE(review): confirm MemcachedClient() does not open a connection
        # of its own in its constructor that would be leaked here.
        self.mc = mc_bin_client.MemcachedClient()
        self.mc.s = self.socket
        self.connect((server, port))

    def _createTapCall(self, key=None, opts=None):
        """Build the raw TAP connect request.

        key  -- client identifier sent as the packet key; a falsy value
                selects the fixed name "chango_tap_client".
        opts -- optional mapping of TAP flag -> value, merged over the
                default flags (caller values win).

        Returns the packed request header followed by the extras, the key
        and the option values, as one string.
        """
        if not key:
            key = "chango_tap_client"
        dtype = 0
        opaque = 0
        cas = 0
        defaults = {
            mcc.TAP_FLAG_CHECKPOINT: '',
            mcc.TAP_FLAG_SUPPORT_ACK: '',
            # "value > 0" means "closed checkpoints only"
            mcc.TAP_FLAG_REGISTERED_CLIENT: 0x01,
        }
        if opts:
            defaults.update(opts)
        extraHeader, val = self._encodeOpts(defaults)
        msg = struct.pack(mcc.REQ_PKT_FMT, mcc.REQ_MAGIC_BYTE,
                          mcc.CMD_TAP_CONNECT,
                          len(key), len(extraHeader), dtype,
                          0,  # presumably the vbucket/reserved header field
                          len(key) + len(extraHeader) + len(val),
                          opaque, cas)
        return msg + extraHeader + key + val

    def _encodeOpts(self, opts):
        """Encode a flag -> value mapping for the TAP connect request.

        Returns ``(extras, value)``: ``extras`` is the bitwise OR of all
        flags packed as a big-endian 32-bit integer, ``value`` is the
        concatenation of the encoded option values in ascending flag order.
        """
        header = 0
        val = []
        for op in sorted(opts.keys()):
            header |= op
            if op in mcc.TAP_FLAG_TYPES:
                val.append(struct.pack(mcc.TAP_FLAG_TYPES[op], opts[op]))
            elif op == mcc.TAP_FLAG_LIST_VBUCKETS:
                val.append(self._encodeVBucketList(opts[op]))
            else:
                val.append(opts[op])
        # Join the encoded pieces themselves; joining str(val) would emit the
        # repr of the list (brackets, quotes, escapes) into the packet body.
        return struct.pack(">I", header), ''.join(val)

    def _encodeVBucketList(self, vbl):
        """Encode vbucket ids as a 16-bit count followed by 16-bit ids.

        ``vbl`` may be any iterable, including a one-shot generator.
        """
        vbuckets = list(vbl)  # materialize once: vbl may be a generator
        vals = [struct.pack("!H", len(vbuckets))]
        # Iterate the copy, not vbl: a generator is already exhausted here.
        for v in vbuckets:
            vals.append(struct.pack("!H", v))
        return ''.join(vals)

    def processCommand(self, cmd, klen, vb, extralen, cas, data):
        """Split ``data`` into extras, key and value and invoke the callback.

        ``data`` is laid out as ``extralen`` bytes of extras, then ``klen``
        bytes of key, then the value.  Returns whatever the callback returns.
        """
        extra = data[0:extralen]
        key = data[extralen:(extralen + klen)]
        val = data[(extralen + klen):]
        return self.callback(self.identifier, cmd, extra, key, vb, val, cas)

    def handle_connect(self):
        """Authenticate against the bucket once the socket is connected."""
        # The SASL exchange runs with the socket switched to blocking mode;
        # it is switched back to non-blocking afterwards.
        self.socket.setblocking(1)
        self.mc.sasl_auth_plain(self.bucket, self.bucket_password)
        self.socket.setblocking(0)

    def handle_close(self):
        """Close the channel when the connection is closed."""
        self.close()
# joe.py
#!/usr/bin/env python
import sys
import asyncore
import signal
import re
import lib.memcacheConstants as mcc
import lib.mc_bin_client as client
import lib.tools as tools
def signal_handler(signal, frame):
    """SIGINT handler: announce the shutdown and exit with status 0.

    The two parameters follow the handler signature used by
    ``signal.signal``; neither is read.  The first one shadows the
    ``signal`` module inside this function.
    """
    # Parenthesised single-argument form: prints the same text on Python 2
    # and also parses on Python 3, where the print statement does not.
    print('Tap stream terminated by user')
    sys.exit(0)
def mainLoop(options, callback):
    """Feed every tap message from the upstream servers to ``callback``.

    Installs the SIGINT handler, wraps each entry of
    ``options.server_list`` in a ``tools.TapDescriptor``, hands them to
    ``tools.TapClient`` and then blocks in ``asyncore.loop()``, i.e. until
    all connections drop.
    """
    def describe(addresses):
        # Generator: descriptors are built only as the consumer iterates.
        for address in addresses:
            yield tools.TapDescriptor(address)

    signal.signal(signal.SIGINT, signal_handler)
    tools.TapClient(describe(options.server_list), callback, options)
    asyncore.loop()
if __name__ == '__main__':
    # Command-line entry point: parse options, pick the callback behaviour
    # (debug / replicate / dump) and run the tap loop.
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option(
        "-s", "--server_list",
        help="source server(s). To connect to multiple servers supply a comma separated list of server:port values. Defaults to localhost:11210",
        dest="server_list", default="localhost:11210")
    parser.add_option(
        "-b", "--bucket",
        help="bucket to tap. Defaults to default",
        dest="bucket", default="default")
    parser.add_option(
        "-P", "--password",
        help="password for specified bucket. Default is none",
        dest="password", default=None)
    parser.add_option(
        "-t", "--timestamp",
        help="timestamp in hex for when to start tapping. Defaults to 0xffffffff (future-only)",
        dest="timestamp", default=0xffffffff, type="long")
    parser.add_option(
        "-d", "--dump",
        help="dump keys that match a given regexp",
        dest="regexp", default=".*")
    parser.add_option(
        "-f", "--file",
        help="send output to file, doesn't write to stdout",
        dest="file", default=None)
    parser.add_option(
        "--replicate",
        help="Replicate writes and deletes from one server to another, overrides -d, must supply destination server",
        dest="replicate", action="store_true")
    parser.add_option(
        "--destination",
        help="destination server for replication. Format is server:port. Default is None",
        dest="destination", default=None)
    parser.add_option(
        "-D", "--debug",
        help="Output all tap commands to stdout. Overrides --replicate and --dump",
        dest="debug", action="store_true")
    (options, args) = parser.parse_args()
    options.server_list = [s.strip() for s in options.server_list.split(",")]

    if options.replicate:
        if not options.destination:
            print("\n\nError!\n=======\n--replicate must be used with --destination to specify destination server. Aborting!\n\n")
            sys.exit(1)
        # A destination without exactly one ':' or with a non-numeric port
        # raises ValueError; route that to the same friendly error below
        # instead of a traceback.
        try:
            host, port = options.destination.split(":")
            port = int(port)
        except ValueError:
            host, port = None, None
        if not host or not port:
            print("\n\nError!\n======\n.Please ensure that the destination is in the form of host:port. Aborting!\n\n")
            sys.exit(1)
        mc = client.MemcachedClient(host=host, port=port)
    elif options.regexp:
        options.regexp = re.compile(options.regexp)

    # Keys waiting to be appended to options.file (only used with --file).
    output = set()

    def flush_output():
        """Append the buffered keys to options.file and empty the buffer."""
        if output and options.file:
            with open(options.file, 'a') as f:
                # Trailing newline keeps the last key of this batch from
                # fusing with the first key of the next one.
                f.write("\n".join(output) + "\n")
        output.clear()

    def cb(identifier, cmd, extra, key, vb, val, cas):
        """Handle one tap message according to the selected mode.

        identifier -- (server, port) the message came from
        cmd        -- numeric command code, looked up in mcc.COMMAND_NAMES
        extra      -- extras section of the packet
        key        -- the key that changed
        vb         -- vbucket field of the packet
        val        -- the new value
        cas        -- cas field of the packet
        """
        # Fall back to a placeholder name so an opcode missing from
        # COMMAND_NAMES does not raise inside the network loop.
        command = mcc.COMMAND_NAMES.get(cmd, "UNKNOWN(%r)" % (cmd,))
        if options.debug:
            print(" %s: ``%s'' (vb:%d) -> ``%s'' (%d bytes from %s)" % (
                command, key, vb, val, len(val), identifier
            ))
        elif options.replicate:
            print("cmd: %s - key: %s - val: %s - extra: %s - cas: %s" % (command, key, val, extra, cas))
            if command == "CMD_TAP_DELETE":
                mc.delete(key)
            elif command == "CMD_TAP_MUTATION":
                # exp and flags are integer protocol fields; pass ints.
                mc.set(key=key, val=val, exp=0, flags=0)
        elif options.regexp:
            if options.regexp.search(key):
                if not options.file:
                    print(key)
                else:
                    output.add(key)
                    # Write to disk in batches of 10000 distinct keys.
                    if len(output) >= 10000:
                        flush_output()

    try:
        mainLoop(options, cb)
    finally:
        # Also runs on the SystemExit raised by the SIGINT handler, so keys
        # buffered since the last full batch are not lost.
        flush_output()
Called from the command line with: ./joe.py --replicate --destination membase02-test:11210 --timestamp 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment