Last active
July 9, 2025 13:11
-
-
Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
Revisions
-
hakib revised this gist
Feb 25, 2022. 1 changed file with 2 additions and 2 deletions. There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -451,15 +451,15 @@ def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int ) connection.set_session(autocommit=True) import psycopg2.extras def test(connection, n: int): # Make sure the data was loaded with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor: # Test number of rows. cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers') record = cursor.fetchone() assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!' # Test that the data was loaded, and that transformations were applied correctly. cursor.execute(""" -
hakib created this gist
Jul 9, 2019. There are no files selected for viewing
# https://hakibenita.com/fast-load-data-python-postgresql
"""Benchmark several ways of loading rows into PostgreSQL with psycopg2.

Every ``insert_*`` / ``copy_*`` function recreates the ``staging_beers``
table and loads the given beers into it using one loading strategy.  The
functions are decorated with ``profile``, so calling them prints the
elapsed time and the memory used.
"""
import datetime
import io
import json
import time
from functools import wraps
from typing import Any, Dict, Iterator, Optional
from urllib.parse import urlencode

import psycopg2.extras
import requests
from memory_profiler import memory_usage
from psycopg2.extras import NamedTupleCursor


#------------------------ Profile

def profile(fn):
    """Decorate ``fn`` so that each call prints its run time and memory usage.

    The wrapped function is executed twice per call: once to measure
    wall-clock time and once more under ``memory_usage`` to measure memory.
    Arguments must therefore be reusable; a one-shot iterator would already
    be exhausted when the second run starts.

    Only keyword arguments are included in the printed call header.
    The value returned is the one produced by the second (memory) run.
    """
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time (the result of this run is discarded).
        started = time.perf_counter()
        fn(*args, **kwargs)
        elapsed = time.perf_counter() - started
        print(f'Time {elapsed:0.4}')

        # Measure memory; this run also provides the return value.
        mem, retval = memory_usage(
            (fn, args, kwargs),
            retval=True,
            timeout=200,
            interval=1e-7,
        )
        print(f'Memory {max(mem) - min(mem)}')
        return retval

    return inner


#------------------------ Data

def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    """Yield beers from the Punk API, fetching one page at a time.

    Pages are requested starting at 1 until the API returns an empty page.
    Raises ``requests.HTTPError`` if a page request fails.
    """
    # The context manager closes the session once the generator finishes.
    with requests.Session() as session:
        page = 1
        while True:
            response = session.get(
                'https://api.punkapi.com/v2/beers?'
                + urlencode({'page': page, 'per_page': page_size})
            )
            response.raise_for_status()

            data = response.json()
            if not data:
                break

            yield from data
            page += 1


def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    """Yield beers from a JSON file that contains a list of beer objects."""
    # JSON text is UTF-8; do not depend on the platform default encoding.
    with open(path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    yield from data


#------------------------ Load

def create_staging_table(cursor):
    """Drop ``staging_beers`` if it exists and create it again, empty."""
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """)


def parse_first_brewed(text: str) -> datetime.date:
    """Parse a ``first_brewed`` value of the form ``MM/YYYY`` or ``YYYY``.

    A missing month defaults to January; the day is always the 1st.
    Raises ``ValueError`` when the text has more than one ``/`` or when a
    part is not a valid number or date component.
    """
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    if len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    # Raise instead of ``assert False``: asserts are stripped under ``-O``.
    raise ValueError(f'Unknown date format: {text!r}')


# Parameterized single-row INSERT shared by the execute/executemany/
# execute_batch variants; placeholders are filled from a beer mapping.
_INSERT_BEER_SQL = """
    INSERT INTO staging_beers VALUES (
        %(id)s,
        %(name)s,
        %(tagline)s,
        %(first_brewed)s,
        %(description)s,
        %(image_url)s,
        %(abv)s,
        %(ibu)s,
        %(target_fg)s,
        %(target_og)s,
        %(ebc)s,
        %(srm)s,
        %(ph)s,
        %(attenuation_level)s,
        %(brewers_tips)s,
        %(contributed_by)s,
        %(volume)s
    );
"""


def _beer_params(beer: Dict[str, Any]) -> Dict[str, Any]:
    """Return the named parameters for ``_INSERT_BEER_SQL`` for one beer."""
    return {
        **beer,
        'first_brewed': parse_first_brewed(beer['first_brewed']),
        'volume': beer['volume']['value'],
    }


def _beer_row(beer: Dict[str, Any]) -> tuple:
    """Return one beer as a tuple in ``staging_beers`` column order."""
    return (
        beer['id'],
        beer['name'],
        beer['tagline'],
        parse_first_brewed(beer['first_brewed']),
        beer['description'],
        beer['image_url'],
        beer['abv'],
        beer['ibu'],
        beer['target_fg'],
        beer['target_og'],
        beer['ebc'],
        beer['srm'],
        beer['ph'],
        beer['attenuation_level'],
        beer['brewers_tips'],
        beer['contributed_by'],
        beer['volume']['value'],
    )


@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers with one ``cursor.execute`` call per row."""
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute(_INSERT_BEER_SQL, _beer_params(beer))


# http://initd.org/psycopg/docs/cursor.html#cursor.executemany

@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers with ``cursor.executemany`` from a fully built list."""
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        all_beers = [_beer_params(beer) for beer in beers]
        cursor.executemany(_INSERT_BEER_SQL, all_beers)


@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers with ``cursor.executemany`` fed by a lazy generator."""
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany(
            _INSERT_BEER_SQL,
            (_beer_params(beer) for beer in beers),
        )


# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch

@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load beers with ``execute_batch`` from a fully built list."""
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        all_beers = [_beer_params(beer) for beer in beers]
        psycopg2.extras.execute_batch(
            cursor, _INSERT_BEER_SQL, all_beers, page_size=page_size,
        )


@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load beers with ``execute_batch`` fed by a lazy generator."""
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        iter_beers = (_beer_params(beer) for beer in beers)
        psycopg2.extras.execute_batch(
            cursor, _INSERT_BEER_SQL, iter_beers, page_size=page_size,
        )


# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values

@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers with ``execute_values`` from a fully built list.

    No ``page_size`` is passed, so the library default is used.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(
            cursor,
            """
            INSERT INTO staging_beers VALUES %s;
            """,
            [_beer_row(beer) for beer in beers],
        )


@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Load beers with ``execute_values`` fed by a lazy generator."""
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(
            cursor,
            """
            INSERT INTO staging_beers VALUES %s;
            """,
            (_beer_row(beer) for beer in beers),
            page_size=page_size,
        )


# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO

def clean_csv_value(value: Optional[Any], sep: str = '|') -> str:
    r"""Render one value as a field for PostgreSQL ``COPY`` text format.

    ``None`` becomes ``\N`` (the NULL marker).  Any other value is converted
    with ``str`` and has backslashes, newlines, carriage returns and the
    column delimiter ``sep`` backslash-escaped so the field cannot be
    mistaken for an escape sequence or a row/column boundary.
    """
    if value is None:
        return r'\N'
    # Backslashes first, otherwise the escapes added below get doubled.
    text = str(value).replace('\\', '\\\\')
    text = text.replace('\n', '\\n').replace('\r', '\\r')
    return text.replace(sep, '\\' + sep)


@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers with ``copy_from`` out of an in-memory ``StringIO`` file.

    The whole data set is rendered as ``|``-delimited text before the copy
    starts.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # _beer_row keeps fields in table column order
            # (brewers_tips comes before contributed_by).
            csv_file_like_object.write(
                '|'.join(map(clean_csv_value, _beer_row(beer))) + '\n'
            )
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')


class StringIteratorIO(io.TextIOBase):
    """Read-only file-like object that reads from an iterator of strings."""

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        # Text already pulled from the iterator but not yet returned.
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to ``n`` characters taken from a single iterator item.

        Returns an empty string once the iterator is exhausted.
        """
        # Skip over empty items until there is text or the iterator ends.
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        """Read up to ``n`` characters, or everything if ``n`` is None/negative."""
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)


@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    """Load beers with ``copy_from``, streaming rows via ``StringIteratorIO``.

    Rows are rendered lazily as ``|``-delimited text; ``size`` is passed to
    ``copy_from`` as its read buffer size.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, _beer_row(beer))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)


#------------------------ Benchmark

connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)


def test(connection, n: int):
    """Assert that ``staging_beers`` holds ``n`` correctly transformed rows.

    Checks the row count, then spot-checks the name, parsed ``first_brewed``
    date and ``volume`` of beers 1 and 235.
    """
    # Make sure the data was loaded
    with connection.cursor(cursor_factory=NamedTupleCursor) as cursor:
        # Test number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # Test that the data was loaded, and that transformations were
        # applied correctly.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)

        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        assert beer_1.volume == 20

        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20


# A list (not a generator) so every benchmark, and both profiling runs
# inside each benchmark, see the full data set.
beers = list(iter_beers_from_api()) * 100

insert_one_by_one(connection, beers)
test(connection, len(beers))

insert_executemany(connection, beers)
test(connection, len(beers))

insert_executemany_iterator(connection, beers)
test(connection, len(beers))

insert_execute_batch(connection, beers)
test(connection, len(beers))

for page_size in (1, 100, 1000, 10000):
    insert_execute_batch_iterator(connection, beers, page_size=page_size)
    test(connection, len(beers))

insert_execute_values(connection, beers)
test(connection, len(beers))

for page_size in (1, 100, 1000, 10000):
    insert_execute_values_iterator(connection, beers, page_size=page_size)
    test(connection, len(beers))
# Benchmark the COPY-based loaders, verifying the table after every run.
copy_stringio(connection, beers)
test(connection, len(beers))

# Same streaming loader, exercised with increasing read-buffer sizes.
for size in (1024, 1024 * 8, 1024 * 16, 1024 * 64):
    copy_string_iterator(connection, beers, size=size)
    test(connection, len(beers))