Skip to content

Instantly share code, notes, and snippets.

@xcuzalex
Created February 17, 2022 03:58
Show Gist options
  • Select an option

  • Save xcuzalex/500fc5634bf103b94f2bc78de42d80f5 to your computer and use it in GitHub Desktop.

Select an option

Save xcuzalex/500fc5634bf103b94f2bc78de42d80f5 to your computer and use it in GitHub Desktop.

Revisions

  1. xcuzalex renamed this gist Feb 17, 2022. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. xcuzalex revised this gist Feb 17, 2022. 1 changed file with 57 additions and 1 deletion.
    58 changes: 57 additions & 1 deletion kudu.py
    Original file line number Diff line number Diff line change
    @@ -1 +1,57 @@
    ‎‎​
    import kudu
    from kudu.client import Partitioning
    from datetime import datetime

    # Connect to the Kudu master server.
    client = kudu.connect(host='localhost', port=7051)

    # Define a schema for a new table: an int64 primary key column plus a
    # non-nullable, LZ4-compressed timestamp column.
    builder = kudu.schema_builder()
    builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
    builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4')
    schema = builder.build()

    # Define the partitioning schema: hash on 'key' into 3 buckets.
    partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)

    # Create the new table.
    client.create_table('python-example', schema, partitioning)

    # Open the table.
    table = client.table('python-example')

    # Create a new session so that we can apply write operations.
    session = client.new_session()

    # Insert a row (timestamp passed as a datetime object).
    op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
    session.apply(op)

    # Upsert a row (timestamp passed as a string).
    op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
    session.apply(op)

    # Update a row (timestamp passed as a (value, format) tuple).
    op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
    session.apply(op)

    # Delete a row by its primary key.
    op = table.new_delete({'key': 2})
    session.apply(op)

    # Flush the write operations; if any of them fail, print the pending errors.
    try:
        session.flush()
    except kudu.KuduBadStatus:
        print(session.get_pending_errors())

    # Create a scanner and add a predicate.
    # Predicates may only reference columns that exist in the schema defined
    # above ('key' and 'ts_val').
    scanner = table.scanner()
    scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))

    # Open the scanner and read all tuples, then print them.
    # Note: This doesn't scale for large scans
    result = scanner.open().read_all_tuples()
    print(result)
  3. xcuzalex created this gist Feb 17, 2022.
    1 change: 1 addition & 0 deletions kudu.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1 @@
    ‎‎​