Created
May 17, 2011 21:54
-
-
Save aeldaly/977488 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # lib/tap.py | |
| """ | |
| tap protocol client. | |
| Copyright (c) 2010 Dustin Sallings <[email protected]> | |
| """ | |
| import socket | |
| import string | |
| import random | |
| import struct | |
| import mc_bin_server | |
| import mc_bin_client | |
| import memcacheConstants as mcc | |
class TapConnection(mc_bin_server.MemcachedBinaryChannel):
    """Binary-protocol channel that opens a TAP stream to a single server.

    On construction the TAP_CONNECT request is encoded and handed to the
    base channel, a TCP socket is created and a connection to
    ``(server, port)`` is started.  Every command later delivered to
    ``processCommand`` is split into extras/key/value and forwarded to
    ``callback``.
    """

    def __init__(self, server, port, callback, options, clientId=None):
        """Build the TAP_CONNECT request and start connecting.

        server, port -- address of the upstream server; also stored as
                        ``self.identifier`` and passed to the callback.
        callback     -- called as ``callback(identifier, cmd, extra, key,
                        vb, val, cas)`` for every processed command.
        options      -- object providing ``timestamp`` (sent as the
                        TAP_FLAG_BACKFILL value), ``bucket`` and
                        ``password`` (used for SASL auth on connect).
        clientId     -- TAP client name; a fixed default is used when empty.
        """
        membase_options = {
            mcc.TAP_FLAG_BACKFILL: options.timestamp,
        }
        # NOTE(review): the third constructor argument is presumably the
        # initial data the channel writes once connected -- confirm against
        # mc_bin_server.MemcachedBinaryChannel.
        mc_bin_server.MemcachedBinaryChannel.__init__(
            self, None, None, self._createTapCall(clientId, membase_options)
        )
        self.bucket = options.bucket
        self.bucket_password = options.password or ""
        self.callback = callback
        self.identifier = (server, port)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        # The helper client is pointed at this channel's own socket so that
        # its SASL call in handle_connect() talks over the same connection.
        self.mc = mc_bin_client.MemcachedClient()
        self.mc.s = self.socket
        self.connect((server, port))

    def _createTapCall(self, key=None, opts=None):
        """Return the encoded TAP_CONNECT request.

        key  -- client identifier; "chango_tap_client" is used when empty
                (a fixed name rather than a randomly generated one).
        opts -- optional mapping of TAP flag -> value that is merged over
                the default flags below.  The mapping is not modified.
        """
        if not key:
            key = "chango_tap_client"
        dtype = 0
        opaque = 0
        cas = 0
        defaults = {
            mcc.TAP_FLAG_CHECKPOINT: '',
            mcc.TAP_FLAG_SUPPORT_ACK: '',
            # "value > 0" means "closed checkpoints only"
            mcc.TAP_FLAG_REGISTERED_CLIENT: 0x01,
        }
        if opts:
            defaults.update(opts)
        extraHeader, val = self._encodeOpts(defaults)
        # Request header: key length, extras length, data type, a zero
        # field, total body length (key + extras + value), opaque, cas.
        msg = struct.pack(mcc.REQ_PKT_FMT, mcc.REQ_MAGIC_BYTE,
                          mcc.CMD_TAP_CONNECT,
                          len(key), len(extraHeader), dtype, 0,
                          len(key) + len(extraHeader) + len(val),
                          opaque, cas)
        return msg + extraHeader + key + val

    def _encodeOpts(self, opts):
        """Encode a flag -> value mapping into ``(extras, body)``.

        ``extras`` is the bitwise OR of all flags packed as a big-endian
        32-bit unsigned integer.  ``body`` is the concatenation of the
        per-flag payloads in ascending flag order:

        * flags listed in ``mcc.TAP_FLAG_TYPES`` are packed with the struct
          format registered there;
        * ``mcc.TAP_FLAG_LIST_VBUCKETS`` is encoded as a vbucket list;
        * any other value is appended unchanged and therefore must already
          be a byte string -- ``str.join`` raises ``TypeError`` otherwise.
        """
        header = 0
        val = []
        for op in sorted(opts.keys()):
            header |= op
            if op in mcc.TAP_FLAG_TYPES:
                val.append(struct.pack(mcc.TAP_FLAG_TYPES[op],
                                       opts[op]))
            elif op == mcc.TAP_FLAG_LIST_VBUCKETS:
                val.append(self._encodeVBucketList(opts[op]))
            else:
                val.append(opts[op])
        # Join the encoded pieces themselves.  The previous
        # ''.join(str(val)) produced the repr of the Python list
        # ("['...', '...']") and put that text on the wire.
        return struct.pack(">I", header), ''.join(val)

    def _encodeVBucketList(self, vbl):
        """Encode an iterable of vbucket ids.

        The result is the number of ids followed by each id, all packed as
        network-order unsigned 16-bit integers.
        """
        vbuckets = list(vbl)  # in case it's a generator
        vals = [struct.pack("!H", len(vbuckets))]
        # Iterate the materialised list: a generator argument has already
        # been consumed by list() above and would yield nothing here.
        for v in vbuckets:
            vals.append(struct.pack("!H", v))
        return ''.join(vals)

    def processCommand(self, cmd, klen, vb, extralen, cas, data):
        """Split a packet body and hand the parts to the callback.

        ``data`` is sliced as ``extralen`` bytes of extras, then ``klen``
        bytes of key, then the value.  Returns whatever the callback
        returns.
        """
        extra = data[0:extralen]
        key = data[extralen:(extralen + klen)]
        val = data[(extralen + klen):]
        return self.callback(self.identifier, cmd, extra, key, vb, val, cas)

    def handle_connect(self):
        """Authenticate against the bucket once the socket is connected.

        The socket is switched to blocking mode for the SASL PLAIN exchange
        and back to non-blocking mode afterwards.
        """
        self.socket.setblocking(1)
        self.mc.sasl_auth_plain(self.bucket, self.bucket_password)
        self.socket.setblocking(0)

    def handle_close(self):
        """Close the channel when the connection is closed."""
        self.close()
| # joe.py | |
| #!/usr/bin/env python | |
| import sys | |
| import asyncore | |
| import signal | |
| import re | |
| import lib.memcacheConstants as mcc | |
| import lib.mc_bin_client as client | |
| import lib.tools as tools | |
def signal_handler(signal, frame):
    """SIGINT handler: announce the shutdown and exit with status 0.

    signal -- number of the received signal (unused).
    frame  -- stack frame that was interrupted (unused).

    Raises SystemExit(0) via sys.exit().
    """
    # Single-argument parenthesised print emits the same text on Python 2
    # and also parses on Python 3.
    print('Tap stream terminated by user')
    sys.exit(0)
def mainLoop(options, callback):
    """Feed every tap message from the upstream servers to ``callback``.

    Installs ``signal_handler`` for SIGINT, creates one tap descriptor per
    entry of ``options.server_list``, hands them to ``tools.TapClient``
    together with the callback and options, and then runs the asyncore
    loop, which keeps going until all connections have dropped.
    """
    signal.signal(signal.SIGINT, signal_handler)
    # Kept lazy: the descriptors are only built when TapClient iterates them.
    descriptors = (
        tools.TapDescriptor(address) for address in options.server_list
    )
    tools.TapClient(descriptors, callback, options)
    asyncore.loop()
if __name__ == '__main__':
    from optparse import OptionParser

    # Number of distinct matching keys buffered before they are appended to
    # the --file output.
    FLUSH_THRESHOLD = 10000

    parser = OptionParser()
    parser.add_option("-s", "--server_list", help="source server(s). To connect to multiple servers supply a comma separated list of server:port values. Defaults to localhost:11210", dest="server_list", default="localhost:11210")
    parser.add_option("-b", "--bucket", help="bucket to tap. Defaults to default", dest="bucket", default="default")
    parser.add_option("-P", "--password", help="password for specified bucket. Default is none", dest="password", default=None)
    parser.add_option("-t", "--timestamp", help="timestamp in hex for when to start tapping. Defaults to 0xffffffff (future-only)", dest="timestamp", default=0xffffffff, type="long")
    parser.add_option("-d", "--dump", help="dump keys that match a given regexp", dest="regexp", default=".*")
    parser.add_option("-f", "--file", help="send output to file, doesn't write to stdout", dest="file", default=None)
    parser.add_option("--replicate", help="Replicate writes and deletes from one server to another, overrides -d, must supply destination server", dest="replicate", action="store_true")
    parser.add_option("--destination", help="destination server for replication. Format is server:port. Default is None", dest="destination", default=None)
    parser.add_option("-D", "--debug", help="Output all tap commands to stdout. Overrides --replicate and --dump", dest="debug", action="store_true")
    (options, args) = parser.parse_args()
    options.server_list = [s.strip() for s in options.server_list.split(",")]

    if options.replicate:
        if not options.destination:
            print("\n\nError!\n=======\n--replicate must be used with --destination to specify destination server. Aborting!\n\n")
            sys.exit(1)
        # A destination without exactly one ':' or with a non-numeric port
        # raises ValueError; treat that like an empty host/port so the user
        # gets the explanatory message below instead of a traceback.
        try:
            host, port = options.destination.split(":")
            port = int(port)
        except ValueError:
            host, port = None, None
        if not host or not port:
            print("\n\nError!\n======\n.Please ensure that the destination is in the form of host:port. Aborting!\n\n")
            sys.exit(1)
        mc = client.MemcachedClient(host=host, port=port)
    elif options.regexp:
        options.regexp = re.compile(options.regexp)

    # Keys matched in --file mode that have not been written out yet.
    output = set()

    def flush_output():
        """Append the buffered keys to options.file, one per line."""
        if not output:
            return
        with open(options.file, 'a') as f:
            f.write("\n".join(output))
            # Terminate the batch so the next append starts on a new line
            # instead of being glued to the last key of this batch.
            f.write("\n")
        output.clear()

    def cb(identifier, cmd, extra, key, vb, val, cas):
        """Handle one tap message according to the selected mode.

        identifier -- (server, port) the message came from
        cmd        -- numeric command code, looked up in mcc.COMMAND_NAMES
        extra      -- extras section of the packet (meaning not known here)
        key        -- the key that changed
        vb         -- meaning not known here; printed as "vb" in debug mode
        val        -- the new value
        cas        -- meaning not known here; printed in replicate mode

        --debug prints every message; otherwise --replicate mirrors
        mutations and deletes to the destination server; otherwise keys
        matching the --dump regexp are printed or buffered for --file.
        """
        command = mcc.COMMAND_NAMES[cmd]
        if options.debug:
            print(" %s: ``%s'' (vb:%d) -> ``%s'' (%d bytes from %s)" % (
                command, key, vb, val, len(val), identifier
            ))
        elif options.replicate:
            print("cmd: %s - key: %s - val: %s - extra: %s - cas: %s" % (command, key, val, extra, cas))
            if command == "CMD_TAP_DELETE":
                mc.delete(key)
            elif command == "CMD_TAP_MUTATION":
                # `extra` is not consulted, so every replicated item is
                # stored with exp 0 and flags 0.
                # NOTE(review): passed as ints rather than the former 0.0
                # floats; presumably mc_bin_client packs them as unsigned
                # integers -- confirm.
                mc.set(key=key, val=val, exp=0, flags=0)
        elif options.regexp:
            if options.regexp.search(key):
                if not options.file:
                    print(key)
                else:
                    output.add(key)
                    if len(output) >= FLUSH_THRESHOLD:
                        flush_output()

    try:
        mainLoop(options, cb)
    finally:
        # Write whatever is still buffered (fewer than FLUSH_THRESHOLD keys)
        # when the loop ends or the SIGINT handler exits the process.
        if options.file:
            flush_output()
Run from the command line with: ./joe.py --replicate --destination membase02-test:11210 --timestamp 0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment