Created
December 29, 2024 09:32
-
-
Save willnode/75a96840ff33ada3f2aa3db3d28cde07 to your computer and use it in GitHub Desktop.
Revisions
-
willnode created this gist
Dec 29, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,144 @@ package main import ( "archive/zip" "bytes" "context" "errors" "fmt" "io" "path/filepath" "sync" "cloud.google.com/go/storage" "github.com/gocarina/gocsv" "github.com/google/uuid" "gorm.io/gorm" ) type GcpStorage struct { client *storage.Client } func (s *GcpStorage) Read(bucket string, object string) (io.ReadCloser, error) { rc, err := s.client.Bucket(bucket).Object(object).NewReader(context.Background()) if err != nil { return nil, err } return io.ReadCloser(rc), nil } func (s *GcpStorage) ReadFileCsvInZipStream(reader io.ReadCloser) (*zip.File, error) { defer reader.Close() // this reads the whole zip file! (~100 MB allocation) fileBytes, err := io.ReadAll(reader) if err != nil { return nil, err } // these readers don't need Close() since the data lives on memory byteReader := bytes.NewReader(fileBytes) zipReader, err := zip.NewReader(byteReader, int64(len(fileBytes))) if err != nil { return nil, err } for _, f := range zipReader.File { if filepath.Ext(f.Name) == ".csv" { f.Open() return f, nil } } return nil, errors.New("CSV File not found") } type SimulationData struct { ID uuid.UUID `gorm:"type:uuid;column:id;primaryKey" json:"id"` Name string `gorm:"type:text;column:name" json:"name" csv:"Name"` Metadata string `gorm:"type:text;column:metadata" json:"metadata" csv:"Metadata"` Properties string `gorm:"type:text;column:properties" json:"properties" csv:"Properties"` NthIndex int `gorm:"type:int;column:nth_index" json:"nth_index"` } func ReadCsvStreamed(reader io.ReadCloser, payloadChan chan []SimulationData) error { defer reader.Close() var csvRowChan = make(chan SimulationData) // this function runs on separate thread go func() { // we close it here since we send the channel data from here defer close(payloadChan) rows := make([]SimulationData, 0) index := 0 for row := range csvRowChan { input := row input.ID = uuid.New() input.NthIndex = index index += 1 rows = append(rows, input) // when current batch is over 100 if len(rows) >= 100 { payloadChan <- rows rows = make([]SimulationData, 0) } } // send remaining data if any if len(rows) > 0 { payloadChan <- rows } }() // csvRowChan is closed inside this function if err := gocsv.UnmarshalToChan(reader, csvRowChan); err != nil { return err } return nil } func Handler(storage *GcpStorage, db *gorm.DB, wg *sync.WaitGroup, filename string) (err error) { zipReader, err := storage.Read("mybucket", filename) if err != nil { return } // zipReader is closed here zipHandle, err := storage.ReadFileCsvInZipStream(zipReader) if err != nil { return } fileReader, err := zipHandle.Open() if err != nil { return } pchan := make(chan []SimulationData) wg.Add(1) go func() { for p := range pchan { if err := db.Create(p).Error; err != nil { fmt.Printf("%+v", err) } } wg.Done() }() // pChan and fileReader is closed here err = ReadCsvStreamed(fileReader, pchan) return } func main() { storage := GcpStorage{} db := gorm.DB{} wg := sync.WaitGroup{} // init storage and db from envar // .... // normally you place the Handler inside HTTP function // but for brevity we call this directly if err := Handler(&storage, &db, &wg, "myfile.csv.zip"); err != nil { panic(err) } fmt.Println("All CSV data is parsed") wg.Wait() fmt.Println("All CSV data is saved") }