# https://hakibenita.com/fast-load-data-python-postgresql

from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime


#------------------------ Profile

import time
from functools import wraps
from memory_profiler import memory_usage


def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time {elapsed:0.4}')

        # Measure memory
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        print(f'Memory {max(mem) - min(mem)}')

        return retval
    return inner


#------------------------ Data

import requests


def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size
        }))
        response.raise_for_status()
        data = response.json()
        if not data:
            break
        for beer in data:
            yield beer
        page += 1


def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    import json
    with open(path, 'r') as f:
        data = json.load(f)
    for beer in data:
        yield beer


#------------------------ Load

def create_staging_table(cursor):
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """)


def parse_first_brewed(text: str) -> datetime.date:
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'


@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })


# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)


@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))


# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras


@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]
        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers, page_size=page_size)


@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)
        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)


# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
import psycopg2.extras


@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])


@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io


def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')


@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # Values must be written in the same order as the columns of
            # staging_beers: brewers_tips comes before contributed_by.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)


@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark

connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)

import psycopg2.extras


def test(connection, n: int):
    # Make sure the data was loaded
    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        # Test number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # Test that the data was loaded, and that transformations were applied correctly.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)

        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        assert beer_1.volume == 20

        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20


beers = list(iter_beers_from_api()) * 100

insert_one_by_one(connection, beers)
test(connection, len(beers))

insert_executemany(connection, beers)
test(connection, len(beers))

insert_executemany_iterator(connection, beers)
test(connection, len(beers))

insert_execute_batch(connection, beers)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))

insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))

insert_execute_values(connection, beers)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))

insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))

copy_stringio(connection, beers)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))

copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
Thank you for the quick reply. I actually missed that.
Now with your post I'm learning how to optimize inserts and some Python orthography at the same time :)
This is wonderful. I am eternally grateful for the efforts you have put in.
I am, however, facing a problem.
I am copying a 25 MB-ish CSV file from an S3 bucket to a Postgres database. Everything in the code is the same, except for the part where I convert the byte data (which comes from the S3 response [StreamingResponseBody]) to UTF-8 string data inside the _read1() function. And the copy is extremely slow: the 25 MB-ish file takes more than 2 secs to copy.
def _read1(self, n=None):
    while not self._buff:
        try:
            self._buff = next(self._iter)
            self._buff = self._buff.decode("utf-8")  # <-------- this line here
        except StopIteration:
            break
    ret = self._buff[:n]
    self._buff = self._buff[len(ret):]
    return ret
Do you think this is causing the issue? Also, is there an alternative to what I am using?
Try using TextIOWrapper.
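A minimal sketch of that suggestion, assuming the S3 response body behaves like a binary file object (recent versions of botocore's StreamingBody do); the names body and connection here are illustrative, not from the gist:

import io

# Let TextIOWrapper decode the binary stream to text lazily,
# instead of calling .decode('utf-8') on every chunk in _read1().
text_stream = io.TextIOWrapper(body, encoding='utf-8')

with connection.cursor() as cursor:
    cursor.copy_from(text_stream, 'staging_beers', sep='|')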
I used StringIteratorIO to load data from a CSV into PostgreSQL using the copy_from method available in psycopg2. It is quite fast: with my infrastructure, I was able to load 32 million rows in less than 8 minutes, which was quite impressive. The only observation I had was that the buffer/cache went up and remained high even after the program completed execution. Any suggestion to avoid it?
This is amazing, thank you. The table parameter in copy_from does not accept a 'schema.table' reference, so I had to use copy_expert instead. The code below works; I did not test it for memory or speed. The dataframe columns/data/dtypes match the table, so I do not specify columns explicitly.
def insert_with_string_io(df: pd.DataFrame):
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    with conn.cursor() as cursor:
        try:
            cursor.copy_expert("COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)", buffer)
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
It's a subtle difference indeed. The function insert_execute_batch first evaluates the entire list. Notice how all_beers is a list, not a generator. The function insert_execute_batch_iterator uses a generator instead. Notice that iter_beers is defined with round brackets, so the iterator is not evaluated immediately, which should reduce the memory footprint. The two relevant definitions from the listing above are shown after this reply.
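For reference, square brackets build the full list in memory before anything is sent to the database, while round brackets create a generator that transforms one row at a time:

# insert_execute_batch: a list comprehension, fully evaluated up front
all_beers = [{
    **beer,
    'first_brewed': parse_first_brewed(beer['first_brewed']),
    'volume': beer['volume']['value'],
} for beer in beers]

# insert_execute_batch_iterator: a generator expression, evaluated lazily
iter_beers = ({
    **beer,
    'first_brewed': parse_first_brewed(beer['first_brewed']),
    'volume': beer['volume']['value'],
} for beer in beers)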