-
-
Save pipoket/193ee5bc8a11a9eeadc7 to your computer and use it in GitHub Desktop.
Revisions
-
dustin revised this gist
Jul 27, 2010 . 1 changed file with 16 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,10 +1,13 @@ #!/usr/bin/env python # # tap -> flume # # requires: python thrift bindings + compiled flume thrift binding. # import sys import time import struct from thrift import Thrift from thrift.transport import TSocket @@ -28,23 +31,31 @@ def __init__(self, host, port): self.client = ThriftFlumeEventServer.Client(self.protocol) self.transport.open() self.n = 0 def __call__(self, identifier, cmd, extra, key, val, cas): if cmd == memcacheConstants.CMD_TAP_MUTATION: assert key is not None assert val is not None el, flags, ttl, iflags, exp = struct.unpack(memcacheConstants.TAP_MUTATION_PKT_FMT, extra) pri = ThriftFlumeEventServer.Priority.INFO evt = ThriftFlumeEventServer.ThriftFlumeEvent(timestamp=int(time.time()), priority=pri, body=val, nanos=0, host='localhost', fields={'key': key, 'flags': str(flags), 'iflags': str(iflags), 'exp': str(exp)}) self.client.append(evt) self.n += 1 if (self.n % 1000) == 0: print self.n if __name__ == '__main__': dest = FlumeDest('localhost', 1234) tap.mainLoop(sys.argv[1:], dest) -
dustin created this gist
Jul 27, 2010 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,50 @@ #!/usr/bin/env python # # tap stream -> flume # import sys import time from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol sys.path.extend(['ep_man', 'thriftflume']) import tap import memcacheConstants from flume import ThriftFlumeEventServer class FlumeDest(object): def __init__(self, host, port): self.transport = TSocket.TSocket(host, port) self.transport = TTransport.TBufferedTransport(self.transport) self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport) self.client = ThriftFlumeEventServer.Client(self.protocol) self.transport.open() def __call__(self, identifier, cmd, extra, key, val, cas): if cmd == memcacheConstants.CMD_TAP_MUTATION: assert key is not None assert val is not None pri = ThriftFlumeEventServer.Priority.INFO evt = ThriftFlumeEventServer.ThriftFlumeEvent(timestamp=int(time.time()), priority=pri, body=val, nanos=0, host='localhost', fields={'key': key}) self.client.append(evt) sys.stdout.write(".") sys.stdout.flush() if __name__ == '__main__': dest = FlumeDest('localhost', 1234) tap.mainLoop(sys.argv[1:], dest, opts)