Skip to content

Instantly share code, notes, and snippets.

@developerworks
Forked from lucperkins/migration.py
Created April 27, 2016 08:42
Show Gist options
  • Save developerworks/75d096a42f218628dc788dc3aa5265f8 to your computer and use it in GitHub Desktop.
Save developerworks/75d096a42f218628dc788dc3aa5265f8 to your computer and use it in GitHub Desktop.

Revisions

  1. @lucperkins lucperkins revised this gist Jul 9, 2014. 1 changed file with 19 additions and 48 deletions.
    67 changes: 19 additions & 48 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,7 @@
    import psycopg2
    from riak import RiakClient, RiakObject
    from riak.datatypes import Set
    import datetime

    # Riak connection and set
    client = RiakClient(pb_port=8087)
    @@ -16,11 +17,9 @@ def get_table_columns(cursor, table):
    # Convert each row to a dict, excluding the primary key (which will be the Riak object's key)
    def convert_row_to_dict(row, columns_list):
    obj = {}
    for n in range(1, len(columns_list[1:])):
    if row[n] == 't':
    obj[columns_list[n]] = True
    elif row[n] == 'f':
    obj[columns_list[n]] = True
    for n in range(1, len(columns_list)):
    if type(row[n]) is datetime.date:
    obj[columns_list[n]] = row[n].strftime('%m-%d-%Y')
    else:
    obj[columns_list[n]] = row[n]
    return obj
    @@ -34,13 +33,13 @@ def store_table_in_riak(database, table):
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(cursor, table))
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    cursor.close()

    @@ -52,13 +51,13 @@ def store_and_drop_table(database, table):
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(table))
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    cursor.execute("DROP TABLE {};".format(table))
    connection.commit()
    @@ -98,32 +97,4 @@ def migrate_multiple_tables(database, tables):
    # Run the main function and close our Postgres cursor
    # store_table_in_riak('users')

    store_table_in_riak('luc', 'users')

    # Let's try it out:

    # print list(select_star_from_table('users'))

    # [{u'surname': u'Perkins', u'name': u'Luc'}, {u'surname': u'Redmond', u'name': u'Eric'}, {u'surname': u
    # 'Cribbs', u'name': u'Sean'}, {u'surname': u'Bakken', u'name': u'Luke'}]

    try:
    print select_by_id('users', '4')
    except Exception as e:
    print e

    # {u'surname': u'Bakken', u'name': u'Luke'}

    try:
    print get_row_column_value('users', '999', 'surname')
    except Exception as e:
    print e

    # Bakken

    try:
    print get_row_column_value('users', '4', 'username')
    except Exception as e:
    print e

    # Field does not exist for this object
    store_table_in_riak('luc', 'posts')
  2. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 7 additions and 7 deletions.
    14 changes: 7 additions & 7 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -52,13 +52,13 @@ def store_and_drop_table(database, table):
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(table))
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    cursor.execute("DROP TABLE {};".format(table))
    connection.commit()
  3. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 7 additions and 7 deletions.
    14 changes: 7 additions & 7 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -34,13 +34,13 @@ def store_table_in_riak(database, table):
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(cursor, table))
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    cursor.close()

  4. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -20,9 +20,9 @@ def convert_row_to_dict(row, columns_list):
    if row[n] == 't':
    obj[columns_list[n]] = True
    elif row[n] == 'f':
    obj[columns_list[n]] = True
    obj[columns_list[n]] = True
    else:
    obj[columns_list[n]] = row[n]
    obj[columns_list[n]] = row[n]
    return obj

    # Store Postgres table in Riak by row
  5. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion migration.py
    Original file line number Diff line number Diff line change
    @@ -18,7 +18,7 @@ def convert_row_to_dict(row, columns_list):
    obj = {}
    for n in range(1, len(columns_list[1:])):
    if row[n] == 't':
    obj[columns_list[n]] = True
    obj[columns_list[n]] = True
    elif row[n] == 'f':
    obj[columns_list[n]] = True
    else:
  6. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion migration.py
    Original file line number Diff line number Diff line change
    @@ -18,7 +18,7 @@ def convert_row_to_dict(row, columns_list):
    obj = {}
    for n in range(1, len(columns_list[1:])):
    if row[n] == 't':
    obj[columns_list[n]] = True
    obj[columns_list[n]] = True
    elif row[n] == 'f':
    obj[columns_list[n]] = True
    else:
  7. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion table.sql
    Original file line number Diff line number Diff line change
    @@ -7,4 +7,5 @@ CREATE TABLE posts (
    );

    INSERT INTO posts (author, title, body, created) VALUES ('Luc Perkins', 'about stuff', 'stuff and stuff', now());
    etc.

    etc.
  8. @lucperkins lucperkins revised this gist Jul 5, 2014. 1 changed file with 8 additions and 9 deletions.
    17 changes: 8 additions & 9 deletions table.sql
    Original file line number Diff line number Diff line change
    @@ -1,11 +1,10 @@
    CREATE TABLE users (
    id INT PRIMARY KEY NOT NULL,
    name TEXT NOT NULL,
    surname TEXT NOT NULL,
    password VARCHAR(12) NOT NULL
    CREATE TABLE posts (
    id SERIAL PRIMARY KEY,
    author VARCHAR(30) NOT NULL,
    title VARCHAR(50) NOT NULL,
    body TEXT NOT NULL,
    created DATE NOT NULL
    );

    INSERT INTO users VALUES (1, 'Luc', 'Perkins', 'blazersrule');
    INSERT INTO users VALUES (2, 'Sean', 'Cribbs', 'holding');
    INSERT INTO users VALUES (3, 'Eric', 'Redmond', 'glasshole');
    INSERT INTO users VALUES (4, 'Luke', 'Bakken', 'vidalbasson');
    INSERT INTO posts (author, title, body, created) VALUES ('Luc Perkins', 'about stuff', 'stuff and stuff', now());
    etc.
  9. @lucperkins lucperkins revised this gist Jul 2, 2014. 1 changed file with 53 additions and 26 deletions.
    79 changes: 53 additions & 26 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -6,12 +6,8 @@
    client = RiakClient(pb_port=8087)
    SETS_BUCKET = client.bucket_type('sets').bucket('key_sets')

    # Postgres connection and cursor
    connection = psycopg2.connect('dbname=luc')
    cursor = connection.cursor()

    # Get columns for table
    def get_table_columns(table):
    def get_table_columns(cursor, table):
    cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name='{}'".format(table))
    columns = cursor.fetchall()
    for col in columns:
    @@ -21,26 +17,51 @@ def get_table_columns(table):
    def convert_row_to_dict(row, columns_list):
    obj = {}
    for n in range(1, len(columns_list[1:])):
    obj[columns_list[n]] = row[n]
    if row[n] == 't':
    obj[columns_list[n]] = True
    elif row[n] == 'f':
    obj[columns_list[n]] = True
    else:
    obj[columns_list[n]] = row[n]
    return obj

    # Store Postgres table in Riak by row
    def store_table_in_riak(table):
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    cursor.execute("SELECT * FROM {}".format(table))
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(table))
    print "Columns list: {}".format(columns_list)
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    def store_table_in_riak(database, table):
    cursor = psycopg2.connect("dbname={}".format(database)).cursor()
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    cursor.execute("SELECT * FROM {}".format(table))
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(cursor, table))
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    cursor.close()

    def store_and_drop_table(database, table):
    cursor = psycopg2.connect("dbname={}".format(database)).cursor()
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    cursor.execute("SELECT * FROM {}".format(table))
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(table))
    for row in rows:
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()
    cursor.execute("DROP TABLE {};".format(table))
    connection.commit()

    def select_star_from_table(table):
    bucket = client.bucket(table)
    @@ -68,14 +89,20 @@ def get_row_column_value(table, key, field):
    else:
    raise Exception('Field does not exist for this object')

    def migrate_multiple_tables(database, tables):
    cursor = psycopg2.connect("dbname={}".format(database)).cursor()
    for table in tables:
    store_and_drop_table(table)
    cursor.close()

    # Run the main function and close our Postgres cursor
    store_table_in_riak('users')
    cursor.close()
    # store_table_in_riak('users')

    store_table_in_riak('luc', 'users')

    # Let's try it out:

    print list(select_star_from_table('users'))
    # print list(select_star_from_table('users'))

    # [{u'surname': u'Perkins', u'name': u'Luc'}, {u'surname': u'Redmond', u'name': u'Eric'}, {u'surname': u
    # 'Cribbs', u'name': u'Sean'}, {u'surname': u'Bakken', u'name': u'Luke'}]
    @@ -88,7 +115,7 @@ def get_row_column_value(table, key, field):
    # {u'surname': u'Bakken', u'name': u'Luke'}

    try:
    print get_row_column_value('users', '4', 'surname')
    print get_row_column_value('users', '999', 'surname')
    except Exception as e:
    print e

  10. @lucperkins lucperkins revised this gist Jun 30, 2014. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion table.sql
    Original file line number Diff line number Diff line change
    @@ -5,7 +5,7 @@ CREATE TABLE users (
    password VARCHAR(12) NOT NULL
    );

    INSERT INTO users VALUES (1, 'Luc', 'Perkins', 'latexroolz');
    INSERT INTO users VALUES (1, 'Luc', 'Perkins', 'blazersrule');
    INSERT INTO users VALUES (2, 'Sean', 'Cribbs', 'holding');
    INSERT INTO users VALUES (3, 'Eric', 'Redmond', 'glasshole');
    INSERT INTO users VALUES (4, 'Luke', 'Bakken', 'vidalbasson');
  11. @lucperkins lucperkins revised this gist Jun 30, 2014. 2 changed files with 63 additions and 3 deletions.
    57 changes: 56 additions & 1 deletion migration.py
    Original file line number Diff line number Diff line change
    @@ -42,6 +42,61 @@ def store_table_in_riak(table):
    key_set.add(key)
    key_set.store()

    def select_star_from_table(table):
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    for key in key_set.reload().value:
    yield bucket.get(key).data

    def select_by_id(table, key):
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    if not key in key_set.reload().value:
    raise Exception('No object for this primary key')
    else:
    return bucket.get(key).data

    def get_row_column_value(table, key, field):
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    if not key in key_set.reload().value:
    raise Exception
    else:
    obj = bucket.get(key).data
    if field in obj:
    return obj[field]
    else:
    raise Exception('Field does not exist for this object')

    # Run the main function and close our Postgres cursor
    store_table_in_riak('users')
    cursor.close()


    # Let's try it out:

    print list(select_star_from_table('users'))

    # [{u'surname': u'Perkins', u'name': u'Luc'}, {u'surname': u'Redmond', u'name': u'Eric'}, {u'surname': u
    # 'Cribbs', u'name': u'Sean'}, {u'surname': u'Bakken', u'name': u'Luke'}]

    try:
    print select_by_id('users', '4')
    except Exception as e:
    print e

    # {u'surname': u'Bakken', u'name': u'Luke'}

    try:
    print get_row_column_value('users', '4', 'surname')
    except Exception as e:
    print e

    # Bakken

    try:
    print get_row_column_value('users', '4', 'username')
    except Exception as e:
    print e

    cursor.close()
    # Field does not exist for this object
    9 changes: 7 additions & 2 deletions table.sql
    Original file line number Diff line number Diff line change
    @@ -2,5 +2,10 @@ CREATE TABLE users (
    id INT PRIMARY KEY NOT NULL,
    name TEXT NOT NULL,
    surname TEXT NOT NULL,
    password VARCHAR(10) NOT NULL
    );
    password VARCHAR(12) NOT NULL
    );

    INSERT INTO users VALUES (1, 'Luc', 'Perkins', 'latexroolz');
    INSERT INTO users VALUES (2, 'Sean', 'Cribbs', 'holding');
    INSERT INTO users VALUES (3, 'Eric', 'Redmond', 'glasshole');
    INSERT INTO users VALUES (4, 'Luke', 'Bakken', 'vidalbasson');
  12. @lucperkins lucperkins revised this gist Jun 30, 2014. 1 changed file with 24 additions and 21 deletions.
    45 changes: 24 additions & 21 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -3,42 +3,45 @@
    from riak.datatypes import Set

    # Riak connection and set
    client = RiakClient(...)
    client = RiakClient(pb_port=8087)
    SETS_BUCKET = client.bucket_type('sets').bucket('key_sets')

    # Postgres connection and cursor
    connection = psycopg2.connect('dbname=userdb')
    connection = psycopg2.connect('dbname=luc')
    cursor = connection.cursor()

    # Convert rows into JSON
    def user_tuple_to_dict(tup):
    return {
    'name': tup[1],
    'surname': tup[2],
    'password': tup[3]
    }
    # Get columns for table
    def get_table_columns(table):
    cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name='{}'".format(table))
    columns = cursor.fetchall()
    for col in columns:
    yield col[0]

    # Convert each row to a dict, excluding the primary key (which will be the Riak object's key)
    def convert_row_to_dict(row, columns_list):
    obj = {}
    for n in range(1, len(columns_list[1:])):
    obj[columns_list[n]] = row[n]
    return obj

    # Store Postgres table in Riak by row
    def store_table_in_riak(table):
    bucket = client.bucket(table)
    key_set = Set(..., table)
    key_set = Set(SETS_BUCKET, table)
    cursor.execute("SELECT * FROM {}".format(table))
    rows = cursor.fetchall()
    columns_list = list(get_table_columns(table))
    print "Columns list: {}".format(columns_list)
    for row in rows:
    key = row[0]
    user_dict = user_tuple_to_dict(row)
    key = str(row[0])
    user_dict = convert_row_to_dict(row, columns_list)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)
    key_set.store()

    def fetch_all_users(table):
    key_set = Set(..., table)
    bucket = client.bucket(table)
    for key in key_set.reload():
    obj = bucket.get(key)
    yield obj.data
    store_table_in_riak('users')

    def fetch_user_by_table_and_id(table, id)
    def fetch_specific_field(table, id, field)

    cursor.close()
  13. @lucperkins lucperkins revised this gist Jun 30, 2014. 1 changed file with 33 additions and 0 deletions.
    33 changes: 33 additions & 0 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -1,11 +1,44 @@
    import psycopg2
    from riak import RiakClient, RiakObject
    from riak.datatypes import Set

    # Riak connection and set
    client = RiakClient(...)

    # Postgres connection and cursor
    connection = psycopg2.connect('dbname=userdb')
    cursor = connection.cursor()

    # Convert rows into JSON
    def user_tuple_to_dict(tup):
    return {
    'name': tup[1],
    'surname': tup[2],
    'password': tup[3]
    }

    # Store Postgres table in Riak by row
    def store_table_in_riak(table):
    bucket = client.bucket(table)
    key_set = Set(..., table)
    cursor.execute("SELECT * FROM {}".format(table))
    rows = cursor.fetchall()
    for row in rows:
    key = row[0]
    user_dict = user_tuple_to_dict(row)
    obj = RiakObject(client, bucket, key)
    obj.data = user_dict
    obj.content_type = 'application/json'
    obj.store()
    key_set.add(key)

    def fetch_all_users(table):
    key_set = Set(..., table)
    bucket = client.bucket(table)
    for key in key_set.reload():
    obj = bucket.get(key)
    yield obj.data

    def fetch_user_by_table_and_id(table, id)
    def fetch_specific_field(table, id, field)

  14. @lucperkins lucperkins revised this gist Jun 30, 2014. 2 changed files with 15 additions and 0 deletions.
    9 changes: 9 additions & 0 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -1,2 +1,11 @@
    import psycopg2

    connection = psycopg2.connect('dbname=userdb')
    cursor = connection.cursor()

    def store_table_in_riak(table):
    cursor.execute("SELECT * FROM {}".format(table))
    rows = cursor.fetchall()
    for row in rows:
    key = row[0]

    6 changes: 6 additions & 0 deletions table.sql
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,6 @@
    CREATE TABLE users (
    id INT PRIMARY KEY NOT NULL,
    name TEXT NOT NULL,
    surname TEXT NOT NULL,
    password VARCHAR(10) NOT NULL
    );
  15. @lucperkins lucperkins created this gist Jun 30, 2014.
    2 changes: 2 additions & 0 deletions migration.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,2 @@
    import psycopg2