Skip to content

Instantly share code, notes, and snippets.

@xcuzalex
Created February 17, 2022 03:58
Show Gist options
  • Save xcuzalex/500fc5634bf103b94f2bc78de42d80f5 to your computer and use it in GitHub Desktop.
Save xcuzalex/500fc5634bf103b94f2bc78de42d80f5 to your computer and use it in GitHub Desktop.
python-kudu
import kudu
from kudu.client import Partitioning
from datetime import datetime
# End-to-end example of the Kudu Python client: create a table, apply a few
# write operations through a session, then scan the table with a predicate.

# Connect to Kudu master server
client = kudu.connect(host='localhost', port=7051)

# Define a schema for a new table: an int64 primary key plus a non-nullable
# timestamp column stored with LZ4 compression.
builder = kudu.schema_builder()
builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4')
schema = builder.build()

# Define partitioning schema: hash-partition on 'key' into 3 buckets
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)

# Create new table
client.create_table('python-example', schema, partitioning)

# Open a table
table = client.table('python-example')

# Create a new session so that we can apply write operations
session = client.new_session()

# Insert a row (timestamp given as a datetime object)
op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
session.apply(op)

# Upsert a row (timestamp given as a string)
op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
session.apply(op)

# Update a row (timestamp given as a (value, format) tuple)
op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
session.apply(op)

# Delete a row
op = table.new_delete({'key': 2})
session.apply(op)

# Flush the write operations; if any of them fail, print the pending errors.
try:
    session.flush()
except kudu.KuduBadStatus:
    print(session.get_pending_errors())

# Create a scanner and add a predicate.
# Only columns declared in the schema above ('key', 'ts_val') can be used in
# a predicate; indexing the table with any other column name fails.
scanner = table.scanner()
scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))

# Open the scanner and read all tuples
# Note: This doesn't scale for large scans
result = scanner.open().read_all_tuples()

# Print only after the scan has produced a result.
print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment