Skip to content

Instantly share code, notes, and snippets.

@pipoket
Forked from dustin/flume.py
Last active August 29, 2015 14:14
Show Gist options
  • Select an option

  • Save pipoket/193ee5bc8a11a9eeadc7 to your computer and use it in GitHub Desktop.

Select an option

Save pipoket/193ee5bc8a11a9eeadc7 to your computer and use it in GitHub Desktop.

Revisions

  1. @dustin dustin revised this gist Jul 27, 2010. 1 changed file with 16 additions and 5 deletions.
    21 changes: 16 additions & 5 deletions flume.py
    Original file line number Diff line number Diff line change
    @@ -1,10 +1,13 @@
    #!/usr/bin/env python
    #
    # tap stream -> flume
    # tap -> flume
    #
    # requires: python thrift bindings + compiled flume thrift binding.
    #

    import sys
    import time
    import struct

    from thrift import Thrift
    from thrift.transport import TSocket
    @@ -28,23 +31,31 @@ def __init__(self, host, port):
    self.client = ThriftFlumeEventServer.Client(self.protocol)
    self.transport.open()

    self.n = 0

    def __call__(self, identifier, cmd, extra, key, val, cas):
    if cmd == memcacheConstants.CMD_TAP_MUTATION:
    assert key is not None
    assert val is not None
    el, flags, ttl, iflags, exp = struct.unpack(memcacheConstants.TAP_MUTATION_PKT_FMT,
    extra)
    pri = ThriftFlumeEventServer.Priority.INFO
    evt = ThriftFlumeEventServer.ThriftFlumeEvent(timestamp=int(time.time()),
    priority=pri,
    body=val,
    nanos=0,
    host='localhost',
    fields={'key': key})
    fields={'key': key,
    'flags': str(flags),
    'iflags': str(iflags),
    'exp': str(exp)})
    self.client.append(evt)
    sys.stdout.write(".")
    sys.stdout.flush()
    self.n += 1
    if (self.n % 1000) == 0:
    print self.n

    if __name__ == '__main__':

    dest = FlumeDest('localhost', 1234)

    tap.mainLoop(sys.argv[1:], dest, opts)
    tap.mainLoop(sys.argv[1:], dest)
  2. @dustin dustin created this gist Jul 27, 2010.
    50 changes: 50 additions & 0 deletions flume.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    #!/usr/bin/env python
    #
    # tap stream -> flume
    #

    import sys
    import time

    from thrift import Thrift
    from thrift.transport import TSocket
    from thrift.transport import TTransport
    from thrift.protocol import TBinaryProtocol

    sys.path.extend(['ep_man', 'thriftflume'])

    import tap
    import memcacheConstants

    from flume import ThriftFlumeEventServer

    class FlumeDest(object):

    def __init__(self, host, port):
    self.transport = TSocket.TSocket(host, port)
    self.transport = TTransport.TBufferedTransport(self.transport)
    self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

    self.client = ThriftFlumeEventServer.Client(self.protocol)
    self.transport.open()

    def __call__(self, identifier, cmd, extra, key, val, cas):
    if cmd == memcacheConstants.CMD_TAP_MUTATION:
    assert key is not None
    assert val is not None
    pri = ThriftFlumeEventServer.Priority.INFO
    evt = ThriftFlumeEventServer.ThriftFlumeEvent(timestamp=int(time.time()),
    priority=pri,
    body=val,
    nanos=0,
    host='localhost',
    fields={'key': key})
    self.client.append(evt)
    sys.stdout.write(".")
    sys.stdout.flush()

    if __name__ == '__main__':

    dest = FlumeDest('localhost', 1234)

    tap.mainLoop(sys.argv[1:], dest, opts)