Last active
January 14, 2020 05:00
-
-
Save wang502/1887df780d773aafdeca0939cf49e757 to your computer and use it in GitHub Desktop.
Transform script for the MemSQL Pipelines quickstart that handles name-value mappings in the Kafka payload
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python | |
| import json | |
| import os | |
| import struct | |
| import sys | |
# Dependencies are stored in a folder called python_deps, located next to
# this script. This is set up by the Dockerfile.
# abspath() keeps the sys.path entry valid even when __file__ is relative
# and the working directory differs from the script's directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(SCRIPT_DIR, "python_deps"))

# On Python 3 the standard streams are text wrappers; their ``.buffer``
# attribute is the underlying binary stream. On Python 2 the standard
# streams are used directly.
_PY2 = sys.version_info < (3, 0)
binary_stdin = sys.stdin if _PY2 else sys.stdin.buffer
binary_stderr = sys.stderr if _PY2 else sys.stderr.buffer
binary_stdout = sys.stdout if _PY2 else sys.stdout.buffer
| # This is a bit of boilerplate that handles the way that incoming records are | |
| # encoded. Data is streamed to stdin, and each record is prefixed with 8 bytes | |
| # indicating how long the record is. This Python generator reads individual | |
| # records and yields them one by one. | |
def transform_records(stream=None):
    """Yield each length-prefixed record read from a binary stream.

    Every record is preceded by an 8-byte unsigned integer, in native byte
    order, giving the length in bytes of the payload that follows.

    Args:
        stream: Binary file-like object to read from. Defaults to the
            module-level ``binary_stdin``.

    Yields:
        The raw payload bytes of each record, in the order they are read.

    Raises:
        EOFError: If the stream ends in the middle of a length header or
            in the middle of a payload.
    """
    if stream is None:
        stream = binary_stdin
    while True:
        header = stream.read(8)
        if not header:
            # Clean end of input: no further records.
            return
        if len(header) != 8:
            raise EOFError(
                "truncated record header: expected 8 bytes, got %d (%r)"
                % (len(header), header)
            )
        # "=Q" is always exactly 8 bytes in native byte order, whereas the
        # size of native "L" depends on the platform.
        (record_len,) = struct.unpack("=Q", header)
        record = stream.read(record_len)
        if len(record) != record_len:
            # Do not hand a partial payload on as if it were complete.
            raise EOFError(
                "truncated record: expected %d bytes, got %d"
                % (record_len, len(record))
            )
        yield record
| # Iterate over the records that we receive from Kafka. | |
for record in transform_records():
    # Decode the raw payload as UTF-8 text; invalid UTF-8 raises here.
    payload_str = record.decode("utf-8")
    # Pass-through: the record is emitted unchanged.
    out_str = payload_str
    # Encode explicitly before writing. Interpolating a text string into
    # b"%s\n" raises TypeError on Python 3 and relies on an implicit ASCII
    # conversion on Python 2.
    out = out_str.encode("utf-8") + b"\n"
    binary_stdout.write(out)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment