@phucnguyenvn
Created January 14, 2020 07:32
Revisions

  1. phucnguyenvn created this gist Jan 14, 2020.
    30 changes: 30 additions & 0 deletions pool.py
    @@ -0,0 +1,30 @@
    # Read CSV files, convert rows to parquet, and upload to S3 in chunks
    import csv
    import os
    from multiprocessing import Pool
    import pyarrow

    def csv_manipulation(self, merchant: str, directory: str):
        temp = []  # rows buffered for the next upload
        schema = pyarrow.schema(self.get_schema())
        p = Pool(processes=self.limit_process)

        for f in os.listdir(directory):
            filepath = os.path.join(directory, f)
            try:
                with open(filepath, 'r', encoding='ISO-8859-1', newline='') as csvfile:
                    reader = csv.reader(csvfile)
                    for row in reader:
                        if len(temp) >= self.chunk_size:  # flush a full chunk first
                            p.apply_async(self.upload, args=(temp, merchant, schema))
                            temp = []
                        temp.append(self.calculate_stat(row))
            except csv.Error:
                # skip files the csv module cannot parse
                pass

        if temp:  # upload the final partial chunk
            p.apply_async(self.upload, args=(temp, merchant, schema))

        p.close()  # no more tasks will be submitted
        p.join()   # wait for all uploads to finish
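
The buffering in `csv_manipulation` (fill `temp` up to `chunk_size`, hand the chunk to the pool, then flush whatever remains) can be isolated into a small generator. The following is a minimal sketch, assuming a hypothetical helper named `chunked` that is not part of the original gist; it only shows the chunking step and does not touch the pool or S3.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(rows: Iterable[T], chunk_size: int) -> Iterator[List[T]]:
    """Yield lists of up to chunk_size items; the last list may be shorter.

    Hypothetical helper for illustration, not part of the original gist.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    it = iter(rows)
    while True:
        # islice pulls at most chunk_size items from the shared iterator
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    for chunk in chunked(range(5), 2):
        print(chunk)
```

With a helper like this, the loop body could reduce to one `p.apply_async(...)` call per yielded chunk. Because the gist lets a chunk span several files, the rows of all files would have to be chained into one iterable before chunking to keep that behavior.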