@phucnguyenvn
Created January 14, 2020 07:32
Python multiprocessing with pool
# Read CSV files, convert rows to Parquet and upload them to S3.
# Chunks of rows are handed to a multiprocessing Pool so the
# conversion/upload work runs in parallel worker processes.
import csv
import os
from multiprocessing import Pool

import pyarrow


# Method of a class that also provides get_schema, calculate_stat,
# upload, limit_process and chunk_size.
def csv_manipulation(self, merchant: str, directory: str):
    temp = []
    schema = pyarrow.schema(self.get_schema())
    p = Pool(processes=self.limit_process)
    for f in os.listdir(directory):
        filepath = os.path.join(directory, f)
        try:
            with open(filepath, 'r', encoding='ISO-8859-1') as csvfile:
                reader = csv.reader(csvfile)
                for row in reader:
                    if len(temp) < self.chunk_size:
                        temp.append(self.calculate_stat(row))
                    else:
                        # Chunk is full: upload it in a worker process and
                        # start a new chunk with the current row.
                        p.apply_async(self.upload, args=(temp, merchant, schema))
                        temp = [self.calculate_stat(row)]
        except csv.Error:
            # Skip files the csv module cannot parse (e.g. empty files).
            pass
    if temp:
        # Upload the final, partially filled chunk.
        p.apply_async(self.upload, args=(temp, merchant, schema))
    p.close()
    p.join()
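For illustration, below is a self-contained sketch of the same chunk-and-apply_async pattern. The names upload_chunk and process_in_chunks, the chunk size and the worker count are assumptions made for the example, not part of the gist. It also shows one caveat of the pattern: exceptions raised inside a worker are only surfaced when the AsyncResult returned by apply_async is consumed with .get(); fire-and-forget calls swallow them silently.

from multiprocessing import Pool


def upload_chunk(rows):
    # Stand-in for self.upload: pretend to convert and upload one chunk.
    return len(rows)


def process_in_chunks(rows, chunk_size=3, processes=2):
    # Buffer rows into fixed-size chunks and hand each full chunk to a
    # Pool worker, mirroring the structure of csv_manipulation above.
    pool = Pool(processes=processes)
    results, chunk = [], []
    for row in rows:
        chunk.append(row)
        if len(chunk) == chunk_size:
            results.append(pool.apply_async(upload_chunk, (chunk,)))
            chunk = []
    if chunk:
        # Submit the final, partially filled chunk.
        results.append(pool.apply_async(upload_chunk, (chunk,)))
    pool.close()
    pool.join()
    # .get() re-raises any exception that occurred in a worker, which a
    # bare apply_async call would otherwise hide.
    return [r.get() for r in results]


if __name__ == '__main__':
    print(process_in_chunks(range(10)))  # -> [3, 3, 3, 1]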