Last active
May 4, 2016 14:26
-
-
Save konstantin-chukharev/56cc697ed0634fab55bbacb0ef3a5c8c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -------------------------------- | |
| # NOTICE: all tasks except "Final Exam" should execute in Sliderepl presentation | |
| # -------------------------------- | |
| +-----------------------------------------------------------------------------------+ | |
| | Assuming this table: | | |
| | | | |
| | CREATE TABLE employee ( | | |
| | emp_id INTEGER PRIMARY KEY, | | |
| | emp_name VARCHAR(30) | | |
| | ) | | |
| | | | |
| | And using the "engine.execute()" method to invoke a statement: | | |
| | | | |
| | 1. Execute an INSERT statement that will insert the row with emp_name='dilbert'. | | |
| | The primary key column can be omitted so that it is generated automatically. | | |
| | | | |
| | 2. SELECT all rows from the employee table. | | |
| +-----------------------------------------------------------------------------------+ | |
# Exercise solution: insert one employee row, then read the table back.
# `engine` is expected to come from the surrounding Sliderepl session.
# Context managers release the connection even when a statement raises.
with engine.connect() as connection:
    # Insert query: the transaction block commits on success and rolls
    # back if the INSERT fails.
    with connection.begin() as transaction:
        connection.execute(
            "insert into employee (emp_name) values (:emp_name)",
            emp_name="dilbert",
        )

    # Select query
    result = connection.execute("select * from employee")
    print(result.fetchall())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| +------------------------------------------------------------------+ | |
| | 1. Write a Table construct corresponding to this CREATE TABLE | | |
| | statement. | | |
| | | | |
| | CREATE TABLE network ( | | |
| | network_id INTEGER PRIMARY KEY, | | |
| | name VARCHAR(100) NOT NULL, | | |
| | created_at DATETIME NOT NULL, | | |
| | owner_id INTEGER, | | |
| | FOREIGN KEY owner_id REFERENCES user(id) | | |
| | ) | | |
| | | | |
| | 2. Then emit metadata.create_all(), which will | | |
| | emit CREATE TABLE for this table (it will skip | | |
| | those that already exist). | | |
| +------------------------------------------------------------------+ | |
# Create Table query
import sqlalchemy
# `import sqlalchemy` alone does not bind the unqualified names used below,
# so they are imported explicitly.
from sqlalchemy import (
    Column,
    DateTime,
    ForeignKey,
    Integer,
    String,
    Table,
    create_engine,
)

engine = create_engine("sqlite://")

# `metadata` is expected to come from the surrounding Sliderepl session.
network_table = Table(
    'network', metadata,
    Column('network_id', Integer, primary_key=True),
    Column('name', String(100), nullable=False),
    Column('created_at', DateTime, nullable=False),
    Column('owner_id', Integer, ForeignKey('user.id')),
)

# Emits CREATE TABLE for every table registered on `metadata`.
metadata.create_all(engine)
| +---------------------------------------------------------------------+ | |
| | 1. Using 'metadata2', reflect the "network" table in the same way | | |
| | we just did 'user', then display the columns (or bonus, display | | |
| | just the column names) | | |
| | | | |
| | 2. Using "inspector", print a list of all table names that | | |
| | include a column called "story_id" | | |
| +---------------------------------------------------------------------+ | |
# -----------------------------------------------
# 1. Reflect "network" with metadata2 and display its columns
# -----------------------------------------------
# Using Inspector
metadata2 = MetaData()
network_reflected = Table('network', metadata2, autoload=True, autoload_with=engine)
inspector = inspect(engine)

# Displaying information about columns
inspector.get_columns('network')


def get_column_names(table_name):
    """Return the list of column names of ``table_name`` as reported by ``inspector``."""
    return [column['name'] for column in inspector.get_columns(table_name)]


# Displaying column names
print(get_column_names('network'))

# -----------------------------------------------
# 2. Table names that include a column called "story_id"
# -----------------------------------------------
all_table_names = inspector.get_table_names()
# keep only the tables that contain a column named 'story_id'
filtered_table_names = [
    table_name
    for table_name in all_table_names
    if 'story_id' in get_column_names(table_name)
]
print(filtered_table_names)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| +------------------------------------------------------------------------+ | |
| | Produce these expressions using "user_table.c.fullname", | | |
| | "user_table.c.id", and "user_table.c.username": | | |
| | | | |
| | 1. user.fullname = 'ed' | | |
| | | | |
| | 2. user.fullname = 'ed' AND user.id > 5 | | |
| | | | |
| | 3. user.username = 'edward' OR (user.fullname = 'ed' AND user.id > 5) | | |
| +------------------------------------------------------------------------+ | |
# 1. user.fullname = 'ed'
first_expression = (user_table.c.fullname == 'ed')
print(first_expression)

# 2. user.fullname = 'ed' AND user.id > 5
id_above_five = user_table.c.id > 5
second_expression = first_expression & id_above_five
print(second_expression)

# 3. user.username = 'edward' OR (user.fullname = 'ed' AND user.id > 5)
username_is_edward = user_table.c.username == 'edward'
third_expression = username_is_edward | second_expression
print(third_expression)
| +----------------------------------------------------------------------------+ | |
| | 1. use user_table.insert() and "r = conn.execute()" to emit this | | |
| | statement: | | |
| | | | |
| | INSERT INTO user (username, fullname) VALUES ('dilbert', 'Dilbert Jones') | | |
| | | | |
| | 2. What is the value of 'user.id' for the above INSERT statement? | | |
| | | | |
| | 3. Using "select([user_table])", execute this SELECT: | | |
| | | | |
| | SELECT id, username, fullname FROM user WHERE username = 'wendy' OR | | |
| | username = 'dilbert' ORDER BY fullname | | |
| +----------------------------------------------------------------------------+ | |
# 1. INSERT INTO user (username, fullname) VALUES ('dilbert', 'Dilbert Jones')
insert_statement = user_table.insert().values(
    username='dilbert',
    fullname='Dilbert Jones',
)

# 3. SELECT id, username, fullname ... WHERE username = 'wendy' OR
#    username = 'dilbert' ORDER BY fullname
select_statement = (
    select([
        user_table.c.id,
        user_table.c.username,
        user_table.c.fullname,
    ])
    .where(
        (user_table.c.username == 'wendy') |
        (user_table.c.username == 'dilbert')
    )
    .order_by(user_table.c.fullname)
)

# One connection serves both statements and is closed when the block exits.
with engine.connect() as connection:
    result = connection.execute(insert_statement)
    # 2. value of 'user.id' for the above INSERT statement is:
    result.inserted_primary_key

    result = connection.execute(select_statement)
    for user in result:
        print(user)
| +------------------------------------------------------------------+ | |
| | Produce this SELECT: | | |
| | | | |
| | SELECT fullname, email_address FROM user JOIN address | | |
| | ON user.id = address.user_id WHERE username='ed' | | |
| | ORDER BY email_address | | |
| +------------------------------------------------------------------+ | |
# SELECT fullname, email_address FROM user JOIN address
# ON user.id = address.user_id WHERE username='ed' ORDER BY email_address
join_obj = user_table.join(address_table)
join_statement = (
    select([user_table.c.fullname, address_table.c.email_address])
    .select_from(join_obj)
    .where(user_table.c.username == 'ed')
    .order_by(address_table.c.email_address)
)

# The connection is closed when the block exits instead of being leaked.
with engine.connect() as connection:
    result = connection.execute(join_statement)
    for user in result:
        print(user)
| +---------------------------------------------------------------------+ | |
| | 1. Execute this UPDATE - keep the "result" that's returned | | |
| | | | |
| | UPDATE user SET fullname='Ed Jones' where username='ed' | | |
| | | | |
| | 2. how many rows did the above statement update? | | |
| | | | |
| | 3. Tricky bonus! Combine update() along with select().as_scalar() | | |
| | to execute this UPDATE: | | |
| | | | |
| | UPDATE user SET fullname=fullname || | | |
| | (select email_address FROM address WHERE user_id=user.id) | | |
| | WHERE username IN ('jack', 'wendy') | | |
| +---------------------------------------------------------------------+ | |
# 1. UPDATE user SET fullname='Ed Jones' where username='ed'
update_statement = (
    user_table.update()
    .values(fullname='Ed Jones')
    .where(user_table.c.username == 'ed')
)

# Subquery: select email_address FROM address WHERE user_id=user.id
address_table_sel = (
    select([address_table.c.email_address])
    .where(address_table.c.user_id == user_table.c.id)
)

# 3. UPDATE user SET fullname=fullname || (<subquery>)
#    WHERE username IN ('jack', 'wendy')
# The required statement concatenates the two values directly, so no
# separator string is inserted between them.
update_statement_with_select = (
    user_table.update()
    .values(fullname=user_table.c.fullname + address_table_sel.as_scalar())
    .where(user_table.c.username.in_(['jack', 'wendy']))
)

# One connection serves both statements and is closed when the block exits.
with engine.connect() as connection:
    result = connection.execute(update_statement)
    # 2. rows updated
    result.rowcount

    result = connection.execute(update_statement_with_select)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| +-------------------------------------------------------------------+ | |
| | 1. Create a class/mapping for this table, call the class Network | | |
| | | | |
| | CREATE TABLE network ( | | |
| | network_id INTEGER PRIMARY KEY, | | |
| | name VARCHAR(100) NOT NULL, | | |
| | ) | | |
| | | | |
| | 2. emit Base.metadata.create_all(engine) to create the table | | |
| | | | |
| | 3. commit a few Network objects to the database: | | |
| | | | |
| | Network(name='net1'), Network(name='net2') | | |
| +-------------------------------------------------------------------+ | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import Session | |
# Declarative base class that the mapped classes below inherit from.
Base = declarative_base()
# In-memory SQLite database for this exercise.
engine = create_engine('sqlite://')
class Network(Base):
    """ORM mapping for the ``network`` table."""

    __tablename__ = 'network'

    network_id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)

    def __repr__(self):
        # One-element tuple so a tuple-valued name cannot be unpacked as
        # multiple format arguments.
        return "Network<(%r)>" % (self.name,)
# Create the tables registered on Base, then persist two Network rows.
Base.metadata.create_all(engine)
session = Session(bind=engine)

net1 = Network(name='net1')
net2 = Network(name='net2')
session.add_all([net1, net2])
session.commit()
| +---------------------------------------------------------------------------+ | |
| | 1. Produce a Query object representing the list of "fullname" values for | | |
| | all User objects in alphabetical order. | | |
| | | | |
| | 2. call .all() on the query to make sure it works! | | |
| | | | |
| | 3. build a second Query object from the first that also selects | | |
| | only User rows with the name "mary" or "ed". | | |
| | | | |
| | 4. return only the second row of the Query from #3. | | |
| +---------------------------------------------------------------------------+ | |
# 1-2. All fullname values in alphabetical order.
query = session.query(User.fullname)
query = query.order_by(User.fullname)
print(query.all())

# 3. Narrow the first query to users named "ed" or "mary".
name_is_ed_or_mary = or_(User.name == 'ed', User.name == 'mary')
second_query = query.filter(name_is_ed_or_mary)

# 4. Only the second row of that query.
second_row_of_second_query = second_query[1]
print(second_row_of_second_query)
| +------------------------------------------------------------------+ | |
| | 1. Run this SQL JOIN: | | |
| | | | |
| | SELECT user.name, address.email_address FROM user | | |
| | JOIN address ON user.id=address.user_id WHERE | | |
| | address.email_address='j25@yahoo.com' | | |
| | | | |
| | 2. Tricky Bonus! Select all pairs of distinct user names. | | |
| | Hint: "... ON user_alias1.name < user_alias2.name" | | |
| +------------------------------------------------------------------+ | |
# 1. SELECT user.name, address.email_address FROM user
#    JOIN address ON user.id=address.user_id WHERE address.email_address=...
# NOTE(review): the page scrape replaced the address literal with an
# "[email protected]" placeholder; 'j25@yahoo.com' is the presumed original.
# Confirm against the exercise data.
query = (
    session.query(User.name, Address.email_address)
    .join(Address)
    .filter(Address.email_address == 'j25@yahoo.com')
)
query.all()

# 2. All pairs of distinct user names: join User to itself through two
#    aliases, ordered so each pair is produced once.
alias1, alias2 = aliased(User), aliased(User)
query = (
    session.query(alias1.name, alias2.name)
    .join(alias2, alias1.name < alias2.name)
)
query.all()
| +----------------------------------------------------------------------------+ | |
| | NOTICE: this task should execute like ordinary python program | | |
| +----------------------------------------------------------------------------+ | |
| | 1. Create a class called 'Account', with table "account": | | |
| | id = Column(Integer, primary_key=True) | | |
| | owner = Column(String(50), nullable=False) | | |
| | balance = Column(Numeric, default=0) | | |
| | | | |
| | 2. Create a class "Transaction", with table "transaction": | | |
| | * Integer primary key | | |
| | * numeric "amount" column | | |
| | * Integer "account_id" column with ForeignKey('account.id') | | |
| | | | |
| | 3. Add a relationship() on Transaction named "account", which refers | | |
| | to "Account", and has a backref called "transactions". | | |
| | | | |
| | 4. Create a database, create tables, then insert these objects: | | |
| | a1 = Account(owner='Jack Jones', balance=5000) | | |
| | a2 = Account(owner='Ed Rendell', balance=10000) | | |
| | Transaction(amount=500, account=a1) | | |
| | Transaction(amount=4500, account=a1) | | |
| | Transaction(amount=6000, account=a2) | | |
| | Transaction(amount=4000, account=a2) | | |
| | | | |
| | 5. Produce a report that shows: | | |
| | * account owner | | |
| | * account balance | | |
| | * summation of transaction amounts per account (should match balance) | | |
| | A column can be summed using func.sum(Transaction.amount) | | |
| +----------------------------------------------------------------------------+ | |
| from sqlalchemy import Column, ForeignKey, Integer, String, Numeric, create_engine, select | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import Session, relationship | |
# Declarative base class for the Account / Transaction mappings.
Base = declarative_base()
# In-memory SQLite database for the final exam.
engine = create_engine('sqlite://')
class Account(Base):
    """ORM mapping for the ``account`` table: an owner and a balance."""

    __tablename__ = 'account'

    id = Column(Integer, primary_key=True)
    owner = Column(String(50), nullable=False)
    balance = Column(Numeric, default=0)

    def __repr__(self):
        details = (self.owner, self.balance)
        return "Account<(%r, %r)>" % details
class Transaction(Base):
    """ORM mapping for the ``transaction`` table: one amount on one account."""

    # The exercise requires the table to be named "transaction".
    __tablename__ = 'transaction'

    id = Column(Integer, primary_key=True)
    amount = Column(Numeric, nullable=False)
    account_id = Column(Integer, ForeignKey('account.id'))

    # Many-to-one link to Account; the backref adds Account.transactions.
    account = relationship("Account", backref="transactions")

    def __repr__(self):
        return "Transaction<(%r, %r)>" % (self.amount, self.account)
# Create the tables, then insert the two accounts and their transactions.
Base.metadata.create_all(engine)
session = Session(bind=engine)

a1 = Account(owner='Jack Jones', balance=5000)
a2 = Account(owner='Ed Rendell', balance=10000)

transactions = [
    Transaction(amount=500, account=a1),
    Transaction(amount=4500, account=a1),
    Transaction(amount=6000, account=a2),
    Transaction(amount=4000, account=a2),
]

session.add_all([a1, a2] + transactions)
session.commit()
class Report(object):
    """Summary of one account: owner, balance and the sum of its transactions.

    ``account`` must expose ``owner``, ``balance`` and an iterable
    ``transactions`` whose items have an ``amount`` attribute.
    """

    def __init__(self, account):
        self.account_owner = account.owner
        self.account_balance = account.balance
        # sum() over no transactions yields 0.
        self.transactions_summ = sum(
            transaction.amount for transaction in account.transactions
        )

    def __repr__(self):
        return "Report<(Owner: %r, Balance: %r, Summ: %r)>" % (
            self.account_owner,
            self.account_balance,
            self.transactions_summ,
        )
# Print one Report line per stored account.
accounts = session.query(Account).all()
for account in accounts:
    report = Report(account)
    print(report)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment