Skip to content

Instantly share code, notes, and snippets.

@willnode
Created December 29, 2024 09:32
Show Gist options
  • Save willnode/75a96840ff33ada3f2aa3db3d28cde07 to your computer and use it in GitHub Desktop.
Save willnode/75a96840ff33ada3f2aa3db3d28cde07 to your computer and use it in GitHub Desktop.

Revisions

  1. willnode created this gist Dec 29, 2024.
    144 changes: 144 additions & 0 deletions readcsvinzip.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,144 @@
    package main

    import (
    "archive/zip"
    "bytes"
    "context"
    "errors"
    "fmt"
    "io"
    "path/filepath"
    "sync"

    "cloud.google.com/go/storage"
    "github.com/gocarina/gocsv"
    "github.com/google/uuid"
    "gorm.io/gorm"
    )

    // GcpStorage groups the Google Cloud Storage helpers used in this file
    // around a single storage client.
    type GcpStorage struct {
    // client is the Cloud Storage client that Read uses to open bucket objects.
    client *storage.Client
    }

    // Read opens the given object in the given bucket and returns a streaming
    // reader over its contents. The caller owns the returned reader and must
    // close it. The request is issued with context.Background(), so it carries
    // no deadline or cancellation.
    func (s *GcpStorage) Read(bucket string, object string) (io.ReadCloser, error) {
    	handle := s.client.Bucket(bucket).Object(object)
    	objReader, err := handle.NewReader(context.Background())
    	if err != nil {
    		// Return an untyped nil so callers never see a non-nil interface
    		// wrapping a nil reader pointer.
    		return nil, err
    	}
    	return objReader, nil
    }

    // ReadFileCsvInZipStream buffers the whole zip archive from reader in memory
    // and returns a handle to the first entry whose name has the ".csv"
    // extension.
    //
    // reader is always closed before this method returns. The returned *zip.File
    // is only a handle: the caller is responsible for calling Open on it and for
    // closing the reader that Open returns. The handle stays usable after reader
    // is closed because the archive bytes are kept in memory.
    //
    // An error is returned when reading fails, when the data is not a valid zip
    // archive, or when the archive contains no ".csv" entry.
    func (s *GcpStorage) ReadFileCsvInZipStream(reader io.ReadCloser) (*zip.File, error) {
    	defer reader.Close()

    	// zip.NewReader needs an io.ReaderAt plus the total size, so the whole
    	// archive is read up front (this can be a large allocation, ~100 MB).
    	fileBytes, err := io.ReadAll(reader)
    	if err != nil {
    		return nil, err
    	}

    	// These readers don't need Close() since the data lives in memory.
    	zipReader, err := zip.NewReader(bytes.NewReader(fileBytes), int64(len(fileBytes)))
    	if err != nil {
    		return nil, err
    	}

    	for _, f := range zipReader.File {
    		if filepath.Ext(f.Name) == ".csv" {
    			// Do not open the entry here: the caller opens it, and an
    			// extra Open would create a reader that nobody closes.
    			return f, nil
    		}
    	}

    	return nil, errors.New("CSV File not found")
    }

    // SimulationData is one CSV row as it is stored through gorm. Name,
    // Metadata and Properties carry csv tags naming their CSV columns; ID and
    // NthIndex have no csv tag and are assigned by ReadCsvStreamed.
    type SimulationData struct {
    // ID is the primary key; ReadCsvStreamed sets it to a fresh UUID per row.
    ID uuid.UUID `gorm:"type:uuid;column:id;primaryKey" json:"id"`
    // Name is read from the "Name" CSV column.
    Name string `gorm:"type:text;column:name" json:"name" csv:"Name"`
    // Metadata is read from the "Metadata" CSV column.
    Metadata string `gorm:"type:text;column:metadata" json:"metadata" csv:"Metadata"`
    // Properties is read from the "Properties" CSV column.
    Properties string `gorm:"type:text;column:properties" json:"properties" csv:"Properties"`
    // NthIndex is the zero-based position of the row in the order the rows
    // were received by ReadCsvStreamed.
    NthIndex int `gorm:"type:int;column:nth_index" json:"nth_index"`
    }

    // ReadCsvStreamed parses CSV rows from reader and forwards them on
    // payloadChan in batches of up to 100 rows.
    //
    // Each row is given a fresh UUID as its ID and a zero-based NthIndex that
    // counts rows in arrival order. A batch is sent as soon as it holds 100
    // rows; any remaining rows are sent as a final, smaller batch.
    //
    // reader is closed when this function returns. payloadChan is closed by the
    // batching goroutine after the last batch is sent, so the caller must not
    // close it and must keep receiving until it is closed. The function itself
    // returns as soon as parsing ends, which can be before the final batch has
    // been delivered.
    //
    // The returned error is whatever gocsv.UnmarshalToChan reports.
    func ReadCsvStreamed(reader io.ReadCloser, payloadChan chan []SimulationData) error {
    	// batchSize is the number of rows that triggers a flush to payloadChan.
    	const batchSize = 100

    	defer reader.Close()
    	parsed := make(chan SimulationData)

    	// Batching runs concurrently with parsing.
    	go func() {
    		// This goroutine is the only sender on payloadChan, so it closes it.
    		defer close(payloadChan)

    		batch := make([]SimulationData, 0)
    		position := 0
    		for record := range parsed {
    			record.ID = uuid.New()
    			record.NthIndex = position
    			position++
    			batch = append(batch, record)

    			if len(batch) < batchSize {
    				continue
    			}
    			// Batch is full: hand it off and start a fresh slice so the
    			// receiver's data is never overwritten.
    			payloadChan <- batch
    			batch = make([]SimulationData, 0)
    		}

    		// Flush whatever is left over after the last full batch.
    		if len(batch) != 0 {
    			payloadChan <- batch
    		}
    	}()

    	// The parsed channel is expected to be closed by gocsv.UnmarshalToChan
    	// when it finishes, which is what ends the batching loop above.
    	return gocsv.UnmarshalToChan(reader, parsed)
    }

    // Handler downloads the zip archive named filename from the "mybucket"
    // bucket, locates the CSV entry inside it, and streams its rows into db in
    // batches.
    //
    // Database writes happen on a separate goroutine that is registered on wg;
    // Handler returns once CSV parsing has finished, so callers must call
    // wg.Wait() to know that all rows were written. Errors from db.Create are
    // only printed, not returned. The returned error covers downloading,
    // locating/opening the CSV entry, and CSV parsing.
    func Handler(storage *GcpStorage, db *gorm.DB, wg *sync.WaitGroup, filename string) (err error) {
    	archive, err := storage.Read("mybucket", filename)
    	if err != nil {
    		return err
    	}

    	// ReadFileCsvInZipStream takes ownership of archive and closes it.
    	csvEntry, err := storage.ReadFileCsvInZipStream(archive)
    	if err != nil {
    		return err
    	}

    	csvReader, err := csvEntry.Open()
    	if err != nil {
    		return err
    	}

    	batches := make(chan []SimulationData)
    	wg.Add(1)
    	go func() {
    		// Runs until ReadCsvStreamed's batching goroutine closes batches.
    		for batch := range batches {
    			if dbErr := db.Create(batch).Error; dbErr != nil {
    				fmt.Printf("%+v", dbErr)
    			}
    		}
    		wg.Done()
    	}()

    	// ReadCsvStreamed closes csvReader, and batches gets closed by the
    	// goroutine it starts.
    	return ReadCsvStreamed(csvReader, batches)
    }

    // main demonstrates the pipeline by calling Handler once for a fixed
    // archive name and then waiting for the database writer to finish.
    func main() {
    	var (
    		store   GcpStorage
    		gormDB  gorm.DB
    		pending sync.WaitGroup
    	)
    	// init storage and db from envar
    	// ....

    	// Normally Handler would be invoked from an HTTP handler; it is called
    	// directly here for brevity.
    	err := Handler(&store, &gormDB, &pending, "myfile.csv.zip")
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("All CSV data is parsed")

    	// Handler returns when parsing ends; the writes finish asynchronously.
    	pending.Wait()
    	fmt.Println("All CSV data is saved")
    }