Last active
March 23, 2024 15:48
-
-
Save renthraysk/80d5ed7135671e1af5e6af3e2bd3ddce to your computer and use it in GitHub Desktop.
Revisions
-
renthraysk revised this gist
Mar 23, 2024 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -158,7 +158,7 @@ func (s stats) processLines(lines []byte) { continue } if len(rows) == 0 { rows = make([]row, 64) } r := &rows[0] rows = rows[1:] -
renthraysk revised this gist
Mar 23, 2024 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -70,7 +70,7 @@ func processFile(name string) (stats, error) { } defer f.Close() return processReader(f, 4*1024*1024, 4) } func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) { @@ -88,7 +88,7 @@ func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) { freeCh <- make([]byte, chunkSize) s := make(stats, 1<<10) for lines := range workCh { s.processLines(lines) freeCh <- lines -
renthraysk created this gist
Mar 18, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,244 @@ package main import ( "bytes" "errors" "flag" "io" "log" "os" "sort" "sync" ) type row struct { name string sum int64 count uint32 min int16 max int16 } func (r *row) appendLen() int { return len(r.name) + len("=-00.0/-00.0/-00.0, ") } func (r *row) append(b []byte) []byte { b = append(b, r.name...) b = append(b, '=') b = appendFixedFloat(b, r.min) b = append(b, '/') b = appendFixedFloat(b, r.sum/int64(r.count)) b = append(b, '/') b = appendFixedFloat(b, r.max) return append(b, ',', ' ') } func appendFixedFloat[T int16 | int64](p []byte, i T) []byte { if i < 0 { p = append(p, '-') i = -i } if i <= 9 { return append(p, '0', '.', byte(i)+'0') } j := i / 10 if j <= 9 { return append(p, byte(j)+'0', '.', byte(i-j*10)+'0') } k := j / 10 return append(p, byte(k)+'0', byte(j-k*10)+'0', '.', byte(i-j*10+'0')) } func main() { var name string fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) fs.StringVar(&name, "input", "/data/measurements.txt", "file to load") if err := fs.Parse(os.Args[1:]); err != nil { panic("unreachable") } s, err := processFile(name) if err != nil { log.Fatalf("failed to process %q: %v", name, err) } s.writeResults(os.Stdout) } func processFile(name string) (stats, error) { f, err := os.Open(name) if err != nil { return nil, err } defer f.Close() return processReader(f, 4*1024*1024, 3) } func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) { var wg sync.WaitGroup workCh := make(chan []byte, workerCount) freeCh := make(chan []byte, 1+workerCount) statsCh := make(chan stats, workerCount) for range workerCount { wg.Add(1) go func() { defer wg.Done() freeCh <- make([]byte, chunkSize) s := make(stats, 32<<10) for lines := range workCh { s.processLines(lines) freeCh <- lines } statsCh <- s }() } buf := make([]byte, chunkSize) n, err := r.Read(buf) for n > 0 { lines, broken := cutAfterLast(buf[:n], '\n') buf = <-freeCh buf = append(buf[:0], broken...) workCh <- lines n, err = r.Read(buf[len(buf):cap(buf)]) n += len(buf) } close(workCh) wg.Wait() close(freeCh) close(statsCh) if err != nil { if !errors.Is(err, io.EOF) { return nil, err } err = nil } s := <-statsCh for m := range statsCh { for name, row := range m { if r, ok := s[name]; ok { r.sum += row.sum r.count += row.count r.min = min(r.min, row.min) r.max = max(r.max, row.max) continue } s[name] = row } } return s, err } type stats map[string]*row func (s stats) processLines(lines []byte) { var city, value []byte var rows []row for len(lines) > 0 { city, lines = cut(lines, ';') value, lines = cut(lines, '\n') t := parseTemp(value) if r, ok := s[string(city)]; ok { r.sum += int64(t) r.count++ r.min = min(r.min, t) r.max = max(r.max, t) continue } if len(rows) == 0 { rows = make([]row, 4<<10) } r := &rows[0] rows = rows[1:] r.name = string(city) r.sum = int64(t) r.count = 1 r.min = t r.max = t s[r.name] = r } } func (s stats) writeResults(w io.Writer) error { ordered := make([]*row, len(s)) i := 0 for _, r := range s { ordered[i] = r i++ } sort.Slice(ordered, func(i, j int) bool { return ordered[i].name < ordered[j].name }) b := make([]byte, 0, 8<<10) b = append(b, '{') for _, r := range ordered { if r.appendLen() > cap(b)-len(b) { // Flush n, err := w.Write(b) if err != nil { return err } b = append(b[:0], b[n:]...) } b = r.append(b) } b = append(b, '}', '\n') _, err := w.Write(b) return err } func parseTemp(b []byte) int16 { if len(b) == 0 { return 0 } if b[0] == '-' { return -parseNonNegative(b[1:]) } return parseNonNegative(b) } func parseNonNegative(b []byte) int16 { var i, x int for ; i < len(b) && b[i]-'0' <= 9; i++ { x += int(b[i] - '0') x *= 10 } if i < len(b) { b = b[i:] if len(b) >= 2 && b[0] == '.' { if d := b[1] - '0'; d <= 9 { x += int(d) } } } return int16(x) } func cut(s []byte, c byte) (prefix []byte, suffix []byte) { if i := bytes.IndexByte(s, c); i >= 0 { return s[:i], s[i+1:] } return s, nil } // cutAfterLast returns s split into two, after the last location of byte c func cutAfterLast(b []byte, c byte) ([]byte, []byte) { if i := bytes.LastIndexByte(b, c); i >= 0 { i++ return b[:i], b[i:] } return nil, b }