Skip to content

Instantly share code, notes, and snippets.

@wang502
Last active January 14, 2020 05:00
Show Gist options
  • Select an option

  • Save wang502/1887df780d773aafdeca0939cf49e757 to your computer and use it in GitHub Desktop.

Select an option

Save wang502/1887df780d773aafdeca0939cf49e757 to your computer and use it in GitHub Desktop.
Transform script for memsql pipeline quickstart handling name-value mapping in Kafka payload
#!/usr/bin/python
import json
import os
import struct
import sys
# Dependencies are stored in a folder called python_deps,
# relative to this script. This is set up by the Dockerfile.
# Directory holding this script; the bundled "python_deps" folder inside it
# is appended to the import search path.
SCRIPT_DIR = os.path.join(os.path.dirname(__file__))
sys.path.append(os.path.join(SCRIPT_DIR, "python_deps"))

# Select the standard streams used for binary I/O: the sys streams
# themselves on Python 2, their underlying .buffer objects on Python 3.
if sys.version_info < (3, 0):
    binary_stdin = sys.stdin
    binary_stderr = sys.stderr
    binary_stdout = sys.stdout
else:
    binary_stdin = sys.stdin.buffer
    binary_stderr = sys.stderr.buffer
    binary_stdout = sys.stdout.buffer
# This is a bit of boilerplate that handles the way that incoming records are
# encoded. Data is streamed to stdin, and each record is prefixed with 8 bytes
# indicating how long the record is. This Python generator reads individual
# records and yields them one by one.
def transform_records(stream=None):
    """Yield each length-prefixed record read from a binary stream.

    Every record is preceded by an 8-byte unsigned length in native byte
    order, followed by that many bytes of payload.

    Args:
        stream: Binary file-like object to read from. Defaults to the
            module-level ``binary_stdin``.

    Yields:
        The raw payload bytes of each record, in the order read.

    Raises:
        ValueError: If the stream ends in the middle of a length prefix
            or in the middle of a record payload.
    """
    if stream is None:
        stream = binary_stdin
    while True:
        prefix = stream.read(8)
        if not prefix:
            # Clean end of stream: no more records.
            return
        if len(prefix) != 8:
            # Raised explicitly instead of asserted, so the check is not
            # stripped when Python runs with -O.
            raise ValueError(
                "truncated record length prefix: %r" % (prefix,))
        # "=Q" is always exactly 8 bytes. Plain "L" is the platform's native
        # unsigned long, which is only 4 bytes on some platforms and then
        # cannot unpack the 8-byte prefix.
        (record_len,) = struct.unpack("=Q", prefix)
        record = stream.read(record_len)
        if len(record) != record_len:
            raise ValueError(
                "truncated record: expected %d bytes, got %d"
                % (record_len, len(record)))
        yield record
# Iterate over the records that we receive from Kafka.
for record in transform_records():
    # Decode the raw payload to text. The payload is currently passed
    # through unchanged; any per-record transformation belongs here.
    payload_str = record.decode("utf-8")
    out_str = payload_str
    # Encode explicitly before writing: bytes %-formatting rejects a str
    # argument on Python 3, and on Python 2 it would produce a unicode
    # object instead of bytes.
    out = out_str.encode("utf-8") + b"\n"
    binary_stdout.write(out)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment