Skip to content

Instantly share code, notes, and snippets.

@ajoshi31
Last active December 22, 2023 03:22
Show Gist options
  • Select an option

  • Save ajoshi31/6d9f827fac4198f6102dea6090fc833c to your computer and use it in GitHub Desktop.

Select an option

Save ajoshi31/6d9f827fac4198f6102dea6090fc833c to your computer and use it in GitHub Desktop.
Python read csv and post url
import csv
import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from datetime import datetime
async def process_row(session, sem, executor, row):
    """POST one CSV row's payload to the endpoint and return the parsed JSON.

    Errors are printed and swallowed, so the coroutine returns None on any
    failure — callers iterating gathered results must tolerate None entries.

    NOTE(review): `executor` is accepted but never used in this coroutine —
    confirm it can be dropped from the call chain.
    """
    url = 'https://your_domain_url'
    payload = {
        'name': row['name'],
        'email': row['email']
    }
    async with sem:  # cap the number of in-flight requests across all tasks
        try:
            # Bug fix: the original built `payload` but issued a bare GET and
            # never sent it. The script's purpose is to POST each CSV row, so
            # send the payload as the JSON request body.
            async with session.post(url, json=payload) as response:
                response.raise_for_status()
                result = await response.json()
                return result
        except aiohttp.ClientResponseError as e:
            print(f"HTTP error {e.status} occurred for row {row}: {e.message}")
        except aiohttp.ClientConnectionError as e:
            print(f"Connection error occurred for row {row}: {e}")
        except Exception as e:
            print(f"An unexpected error occurred for row {row}: {e}")
async def process_batch_async(sem, session, executor, rows):
    """Fan out one request per row in *rows* and await them all together.

    Returns the list of per-row results in input order (None entries for
    rows whose request failed, since process_row swallows its errors).
    """
    pending = []
    for record in rows:
        pending.append(process_row(session, sem, executor, record))
    return await asyncio.gather(*pending)
async def main_async():
    """Read the CSV, split it into fixed-size batches, and send every row
    concurrently through a shared aiohttp session.

    Concurrency is bounded by a semaphore shared across all batches, so the
    effective parallelism is min(total rows, max_concurrent_connections).
    """
    csv_file_path = 'test.csv'  # with 300k rows
    batch_size = 75
    num_workers = 12
    max_concurrent_connections = 1000  # Adjust this based on your system's capacity
    print(f"Reading CSV file: {csv_file_path}")
    # Fix: the csv module documents that files should be opened with
    # newline='' so embedded newlines inside quoted fields round-trip.
    with open(csv_file_path, 'r', newline='') as file:
        csv_reader = csv.DictReader(file)
        rows = list(csv_reader)  # materialize once; idiomatic copy of the reader
    print(f"Total rows in CSV: {len(rows)}")
    batches = [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]
    results = []
    # NOTE(review): the executor is threaded through to process_row but never
    # used there — confirm whether the thread pool can be removed entirely.
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        async with aiohttp.ClientSession() as session:
            sem = asyncio.Semaphore(max_concurrent_connections)
            # Fix: asyncio.get_event_loop() is deprecated when called from a
            # running coroutine; asyncio.create_task schedules directly on
            # the current loop.
            tasks = [
                asyncio.create_task(process_batch_async(sem, session, executor, batch))
                for batch in batches
            ]
            results = await asyncio.gather(*tasks)
    for batch_result in results:
        for result in batch_result:
            pass  # placeholder: per-result handling goes here
if __name__ == '__main__':
    # Record a human-readable wall-clock start for the log, plus a monotonic
    # reference point for the elapsed-time summary.
    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"Start Time: {started_at}")
    t0 = time.time()
    # Drive the whole async pipeline to completion.
    asyncio.run(main_async())
    # Report total processing duration.
    elapsed_time = time.time() - t0
    print(f"Total processing time: {elapsed_time} seconds")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment