Skip to content

Instantly share code, notes, and snippets.

@renthraysk
Last active March 23, 2024 15:48
Show Gist options
  • Select an option

  • Save renthraysk/80d5ed7135671e1af5e6af3e2bd3ddce to your computer and use it in GitHub Desktop.

Select an option

Save renthraysk/80d5ed7135671e1af5e6af3e2bd3ddce to your computer and use it in GitHub Desktop.

Revisions

  1. renthraysk revised this gist Mar 23, 2024. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion 1brc.go
    Original file line number Diff line number Diff line change
    @@ -158,7 +158,7 @@ func (s stats) processLines(lines []byte) {
    continue
    }
    if len(rows) == 0 {
    rows = make([]row, 4<<10)
    rows = make([]row, 64)
    }
    r := &rows[0]
    rows = rows[1:]
  2. renthraysk revised this gist Mar 23, 2024. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions 1brc.go
    Original file line number Diff line number Diff line change
    @@ -70,7 +70,7 @@ func processFile(name string) (stats, error) {
    }
    defer f.Close()

    return processReader(f, 4*1024*1024, 3)
    return processReader(f, 4*1024*1024, 4)
    }

    func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) {
    @@ -88,7 +88,7 @@ func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) {

    freeCh <- make([]byte, chunkSize)

    s := make(stats, 32<<10)
    s := make(stats, 1<<10)
    for lines := range workCh {
    s.processLines(lines)
    freeCh <- lines
  3. renthraysk created this gist Mar 18, 2024.
    244 changes: 244 additions & 0 deletions 1brc.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,244 @@
    package main

    import (
    "bytes"
    "errors"
    "flag"
    "io"
    "log"
    "os"
    "sort"
    "sync"
    )

    type row struct {
    name string
    sum int64
    count uint32
    min int16
    max int16
    }

    func (r *row) appendLen() int { return len(r.name) + len("=-00.0/-00.0/-00.0, ") }

    func (r *row) append(b []byte) []byte {
    b = append(b, r.name...)
    b = append(b, '=')
    b = appendFixedFloat(b, r.min)
    b = append(b, '/')
    b = appendFixedFloat(b, r.sum/int64(r.count))
    b = append(b, '/')
    b = appendFixedFloat(b, r.max)
    return append(b, ',', ' ')
    }

    func appendFixedFloat[T int16 | int64](p []byte, i T) []byte {
    if i < 0 {
    p = append(p, '-')
    i = -i
    }
    if i <= 9 {
    return append(p, '0', '.', byte(i)+'0')
    }
    j := i / 10
    if j <= 9 {
    return append(p, byte(j)+'0', '.', byte(i-j*10)+'0')
    }
    k := j / 10
    return append(p, byte(k)+'0', byte(j-k*10)+'0', '.', byte(i-j*10+'0'))
    }

    func main() {
    var name string

    fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
    fs.StringVar(&name, "input", "/data/measurements.txt", "file to load")
    if err := fs.Parse(os.Args[1:]); err != nil {
    panic("unreachable")
    }
    s, err := processFile(name)
    if err != nil {
    log.Fatalf("failed to process %q: %v", name, err)
    }
    s.writeResults(os.Stdout)
    }

    func processFile(name string) (stats, error) {
    f, err := os.Open(name)
    if err != nil {
    return nil, err
    }
    defer f.Close()

    return processReader(f, 4*1024*1024, 3)
    }

    func processReader(r io.Reader, chunkSize, workerCount int) (stats, error) {

    var wg sync.WaitGroup

    workCh := make(chan []byte, workerCount)
    freeCh := make(chan []byte, 1+workerCount)
    statsCh := make(chan stats, workerCount)

    for range workerCount {
    wg.Add(1)
    go func() {
    defer wg.Done()

    freeCh <- make([]byte, chunkSize)

    s := make(stats, 32<<10)
    for lines := range workCh {
    s.processLines(lines)
    freeCh <- lines
    }
    statsCh <- s
    }()
    }

    buf := make([]byte, chunkSize)
    n, err := r.Read(buf)
    for n > 0 {
    lines, broken := cutAfterLast(buf[:n], '\n')

    buf = <-freeCh
    buf = append(buf[:0], broken...)
    workCh <- lines

    n, err = r.Read(buf[len(buf):cap(buf)])
    n += len(buf)
    }

    close(workCh)
    wg.Wait()
    close(freeCh)
    close(statsCh)

    if err != nil {
    if !errors.Is(err, io.EOF) {
    return nil, err
    }
    err = nil
    }

    s := <-statsCh
    for m := range statsCh {
    for name, row := range m {
    if r, ok := s[name]; ok {
    r.sum += row.sum
    r.count += row.count
    r.min = min(r.min, row.min)
    r.max = max(r.max, row.max)
    continue
    }
    s[name] = row
    }
    }

    return s, err
    }

    type stats map[string]*row

    func (s stats) processLines(lines []byte) {
    var city, value []byte
    var rows []row

    for len(lines) > 0 {
    city, lines = cut(lines, ';')
    value, lines = cut(lines, '\n')

    t := parseTemp(value)
    if r, ok := s[string(city)]; ok {
    r.sum += int64(t)
    r.count++
    r.min = min(r.min, t)
    r.max = max(r.max, t)
    continue
    }
    if len(rows) == 0 {
    rows = make([]row, 4<<10)
    }
    r := &rows[0]
    rows = rows[1:]
    r.name = string(city)
    r.sum = int64(t)
    r.count = 1
    r.min = t
    r.max = t
    s[r.name] = r
    }
    }

    func (s stats) writeResults(w io.Writer) error {
    ordered := make([]*row, len(s))
    i := 0
    for _, r := range s {
    ordered[i] = r
    i++
    }

    sort.Slice(ordered, func(i, j int) bool { return ordered[i].name < ordered[j].name })

    b := make([]byte, 0, 8<<10)
    b = append(b, '{')
    for _, r := range ordered {
    if r.appendLen() > cap(b)-len(b) {
    // Flush
    n, err := w.Write(b)
    if err != nil {
    return err
    }
    b = append(b[:0], b[n:]...)
    }
    b = r.append(b)
    }
    b = append(b, '}', '\n')
    _, err := w.Write(b)
    return err
    }

    func parseTemp(b []byte) int16 {
    if len(b) == 0 {
    return 0
    }
    if b[0] == '-' {
    return -parseNonNegative(b[1:])
    }
    return parseNonNegative(b)
    }

    func parseNonNegative(b []byte) int16 {
    var i, x int

    for ; i < len(b) && b[i]-'0' <= 9; i++ {
    x += int(b[i] - '0')
    x *= 10
    }
    if i < len(b) {
    b = b[i:]
    if len(b) >= 2 && b[0] == '.' {
    if d := b[1] - '0'; d <= 9 {
    x += int(d)
    }
    }
    }
    return int16(x)
    }

    func cut(s []byte, c byte) (prefix []byte, suffix []byte) {
    if i := bytes.IndexByte(s, c); i >= 0 {
    return s[:i], s[i+1:]
    }
    return s, nil
    }

    // cutAfterLast returns s split into two, after the last location of byte c
    func cutAfterLast(b []byte, c byte) ([]byte, []byte) {
    if i := bytes.LastIndexByte(b, c); i >= 0 {
    i++
    return b[:i], b[i:]
    }
    return nil, b
    }