Skip to content

Instantly share code, notes, and snippets.

@developerworks
Forked from lucperkins/migration.py
Created April 27, 2016 08:42
Show Gist options
  • Select an option

  • Save developerworks/75d096a42f218628dc788dc3aa5265f8 to your computer and use it in GitHub Desktop.

Select an option

Save developerworks/75d096a42f218628dc788dc3aa5265f8 to your computer and use it in GitHub Desktop.
Migrate a PostgreSQL table to Riak
import psycopg2
from riak import RiakClient, RiakObject
from riak.datatypes import Set
# --- Riak -------------------------------------------------------------------
# Client on the protocol-buffers port, plus the bucket (of bucket type 'sets')
# in which one Set per migrated table records that table's row keys.
client = RiakClient(pb_port=8087)
SETS_BUCKET = client.bucket_type('sets').bucket('key_sets')

# --- Postgres ---------------------------------------------------------------
# One connection and one shared cursor, used by the helper functions below.
connection = psycopg2.connect(dbname='luc')
cursor = connection.cursor()
# Get columns for table
def get_table_columns(table):
    """Yield the column names of ``table`` in declaration (ordinal) order.

    Runs a query against ``information_schema.columns`` on the module-level
    ``cursor``. This is a generator, so the query runs on first iteration,
    not when the function is called.

    The table name is passed as a bound parameter rather than formatted into
    the SQL text. A name containing a quote therefore cannot break (or
    inject into) the query.

    NOTE(review): results are not filtered by ``table_schema``. A same-named
    table in another schema would contribute extra columns. Confirm that is
    acceptable for the target database.
    """
    cursor.execute(
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_name = %s "
        # Without ORDER BY the row order from information_schema is unspecified.
        "ORDER BY ordinal_position",
        (table,),
    )
    for (column_name,) in cursor.fetchall():
        yield column_name
# Convert each row to a dict, excluding the primary key (which will be the Riak object's key)
def convert_row_to_dict(row, columns_list):
    """Return a dict mapping every column after the first to its value in ``row``.

    Args:
        row: A sequence of column values, in the same order as ``columns_list``.
        columns_list: A sequence of column names. The first entry is treated
            as the primary key and is left out of the result.

    Returns:
        A dict ``{column_name: value}`` for all remaining columns. Every
        non-key column is copied, including sensitive ones such as a
        ``password`` column.
    """
    # Skip index 0 on both sides; zip pairs each remaining name with its value.
    return dict(zip(columns_list[1:], row[1:]))
# Store Postgres table in Riak by row
def store_table_in_riak(table):
    """Copy every row of Postgres ``table`` into the Riak bucket of the same name.

    Each row is stored as a JSON object whose key is ``str(row[0])``. This
    assumes the first column is the primary key (TODO confirm for tables
    other than ``users``). The other columns become the object's fields via
    ``convert_row_to_dict``. Each key is also added to the table's Set in
    ``SETS_BUCKET``, which the query helpers use to enumerate rows.

    ``table`` is formatted directly into the SQL text, so it must be a
    trusted identifier.
    """
    bucket = client.bucket(table)
    key_set = Set(SETS_BUCKET, table)
    cursor.execute("SELECT * FROM {}".format(table))
    # Take the column names from this query's own result metadata
    # (PEP 249 cursor.description). They are guaranteed to line up with the
    # values in each fetched row.
    columns_list = [column[0] for column in cursor.description]
    rows = cursor.fetchall()
    print("Columns list: {}".format(columns_list))
    for row in rows:
        key = str(row[0])
        obj = RiakObject(client, bucket, key)
        obj.data = convert_row_to_dict(row, columns_list)
        obj.content_type = 'application/json'
        obj.store()
        # The Set is stored after every add, so each key is persisted right
        # after the object it refers to.
        key_set.add(key)
        key_set.store()
def select_star_from_table(table):
    """Lazily yield the stored data of every row migrated from ``table``.

    Row keys are read from the table's Set in ``SETS_BUCKET`` (reloaded when
    iteration starts). Each key is then fetched from the Riak bucket named
    after the table, and its ``.data`` is yielded.
    """
    riak_bucket = client.bucket(table)
    known_keys = Set(SETS_BUCKET, table).reload().value
    for row_key in known_keys:
        fetched = riak_bucket.get(row_key)
        yield fetched.data
def select_by_id(table, key):
    """Return the stored data for the row whose primary key is ``key``.

    Membership is checked against the table's Set in ``SETS_BUCKET`` (freshly
    reloaded) before the object is fetched from the table's Riak bucket.

    Raises:
        Exception: 'No object for this primary key' when ``key`` is not in
            the table's key set.
    """
    riak_bucket = client.bucket(table)
    known_keys = Set(SETS_BUCKET, table).reload().value
    if key in known_keys:
        return riak_bucket.get(key).data
    raise Exception('No object for this primary key')
def get_row_column_value(table, key, field):
    """Return the value of column ``field`` in the row stored under ``key``.

    The row is looked up with ``select_by_id``. That checks the table's key
    set, then fetches the object from the table's Riak bucket.

    Raises:
        Exception: 'No object for this primary key' when ``key`` is not in
            the table's key set (raised by ``select_by_id``).
        Exception: 'Field does not exist for this object' when the row has
            no ``field`` entry.
    """
    # Delegate to select_by_id rather than duplicating the key-set check.
    # The missing-key case then carries a message instead of a bare Exception.
    obj = select_by_id(table, key)
    if field not in obj:
        raise Exception('Field does not exist for this object')
    return obj[field]
# Run the migration, then release the Postgres cursor and connection.
# Nothing below talks to Postgres; the finally clause makes sure both are
# closed even if the migration raises.
try:
    store_table_in_riak('users')
finally:
    cursor.close()
    connection.close()

# Let's try it out.
# Sample output is shown after each call. The exact fields depend on which
# columns convert_row_to_dict copies, and row order follows the Riak key set.
print(list(select_star_from_table('users')))
# [{u'surname': u'Perkins', u'name': u'Luc'}, {u'surname': u'Redmond', u'name': u'Eric'},
#  {u'surname': u'Cribbs', u'name': u'Sean'}, {u'surname': u'Bakken', u'name': u'Luke'}]

try:
    print(select_by_id('users', '4'))
except Exception as e:
    print(e)
# {u'surname': u'Bakken', u'name': u'Luke'}

try:
    print(get_row_column_value('users', '4', 'surname'))
except Exception as e:
    print(e)
# Bakken

try:
    print(get_row_column_value('users', '4', 'username'))
except Exception as e:
    print(e)
# Field does not exist for this object
-- Sample source table for the migration script. The first column (id) is the
-- value the script uses as each Riak object's key.
-- NOTE(review): passwords are stored in plain text; acceptable for a demo only.
CREATE TABLE users (
    id       INT         NOT NULL PRIMARY KEY,
    name     TEXT        NOT NULL,
    surname  TEXT        NOT NULL,
    password VARCHAR(12) NOT NULL
);

-- Seed rows. The explicit column lists keep the inserts correct even if the
-- table's column order changes.
INSERT INTO users (id, name, surname, password) VALUES (1, 'Luc', 'Perkins', 'blazersrule');
INSERT INTO users (id, name, surname, password) VALUES (2, 'Sean', 'Cribbs', 'holding');
INSERT INTO users (id, name, surname, password) VALUES (3, 'Eric', 'Redmond', 'glasshole');
INSERT INTO users (id, name, surname, password) VALUES (4, 'Luke', 'Bakken', 'vidalbasson');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment