Last active
December 22, 2023 03:22
-
-
Save ajoshi31/6d9f827fac4198f6102dea6090fc833c to your computer and use it in GitHub Desktop.
Python script that reads a CSV file and asynchronously posts each row to a URL.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import csv | |
| import aiohttp | |
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| import time | |
| from datetime import datetime | |
async def process_row(session, sem, executor, row):
    """POST one CSV row to the endpoint, bounded by the shared semaphore.

    Args:
        session: shared aiohttp.ClientSession used for the request.
        sem: asyncio.Semaphore capping concurrent connections.
        executor: ThreadPoolExecutor passed down by the caller (currently
            unused here; kept to preserve the call interface).
        row: dict produced by csv.DictReader; must contain 'name' and 'email'.

    Returns:
        The parsed JSON response body on success, or None after an error
        (errors are logged to stdout, never raised to the caller).
    """
    url = 'https://your_domain_url'
    payload = {
        'name': row['name'],
        'email': row['email']
    }
    async with sem:
        try:
            # BUG FIX: the original issued a bare GET and silently dropped
            # `payload`. The script's purpose is to POST each CSV row, so
            # send the payload as the JSON request body.
            async with session.post(url, json=payload) as response:
                response.raise_for_status()
                result = await response.json()
                return result
        except aiohttp.ClientResponseError as e:
            print(f"HTTP error {e.status} occurred for row {row}: {e.message}")
        except aiohttp.ClientConnectionError as e:
            print(f"Connection error occurred for row {row}: {e}")
        except Exception as e:
            print(f"An unexpected error occurred for row {row}: {e}")
async def process_batch_async(sem, session, executor, rows):
    """Fan out one batch of CSV rows concurrently and return their results in order."""
    pending = (process_row(session, sem, executor, single_row) for single_row in rows)
    return await asyncio.gather(*pending)
async def main_async():
    """Read the CSV file, split it into batches, and post every row concurrently.

    Loads the whole file into memory (fine for ~300k small rows), groups rows
    into fixed-size batches, and awaits one task per batch under a shared
    connection semaphore.
    """
    csv_file_path = 'test.csv'  # with 300k rows
    batch_size = 75
    num_workers = 12
    max_concurrent_connections = 1000  # Adjust this based on your system's capacity

    print(f"Reading CSV file: {csv_file_path}")
    # FIX: newline='' is the documented requirement when handing a file
    # object to the csv module (prevents newline translation issues).
    with open(csv_file_path, 'r', newline='') as file:
        rows = list(csv.DictReader(file))
    print(f"Total rows in CSV: {len(rows)}")

    batches = [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]

    results = []
    # NOTE(review): the executor is threaded through but never used by the
    # workers — kept so process_row's interface is unchanged.
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        async with aiohttp.ClientSession() as session:
            sem = asyncio.Semaphore(max_concurrent_connections)
            # FIX: gather the coroutines directly instead of the deprecated
            # asyncio.get_event_loop() + loop.create_task() pattern inside a
            # running coroutine.
            results = await asyncio.gather(
                *(process_batch_async(sem, session, executor, batch)
                  for batch in batches)
            )

    # Placeholder: the original iterated every result and did nothing;
    # hook per-row handling in here.
    for batch_result in results:
        for result in batch_result:
            pass
if __name__ == '__main__':
    # Human-readable start banner.
    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Start Time: {started_at}")
    t0 = time.time()

    # Drive the asynchronous pipeline to completion.
    asyncio.run(main_async())

    # Report total wall-clock duration.
    elapsed = time.time() - t0
    print(f"Total processing time: {elapsed} seconds")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment