Skip to content

Instantly share code, notes, and snippets.

@mmirolim
Forked from mcastilho/gist:e051898d129b44e2f502
Last active August 29, 2015 14:26
Show Gist options
  • Save mmirolim/509fa3cc0ef8aa6b05f5 to your computer and use it in GitHub Desktop.
Save mmirolim/509fa3cc0ef8aa6b05f5 to your computer and use it in GitHub Desktop.

Revisions

  1. @mcastilho mcastilho revised this gist Jul 13, 2015. 1 changed file with 34 additions and 46 deletions.
    80 changes: 34 additions & 46 deletions gistfile1.go
    Original file line number Diff line number Diff line change
    @@ -17,47 +17,33 @@ const (
    MaxWorkers = 10
    )

    type Request struct {
    Sender string `json:"Sender,omitempty"`
    Trigger string `json:"Trigger,omitempty"`
    }

    type Program struct {
    Base string `json:"Base,omitempty"`
    }

    type Versions struct {
    Program Program `json:"Program,omitempty"`
    }

    type App struct {
    Program string `json:"Program,omitempty"`
    Build string `json:"Build,omitempty"`
    License string `json:"License,omitempty"`
    Versions Versions `json:"Versions,omitempty"`
    }

    type Connection struct {
    Type string `json:"Type,omitempty"`
    }

    type Region struct {
    Continent string `json:"Continent,omitempty"`
    Country string `json:"Country,omitempty"`
    }

    type Client struct {
    OsVersion string `json:"OsVersion,omitempty"`
    Language string `json:"Language,omitempty"`
    Architecture string `json:"Architecture,omitempty"`
    }

    type Telemetry struct {
    Request Request `json:"Request,omitempty"`
    App App `json:"App,omitempty"`
    Connection Connection `json:"Connection,omitempty"`
    Region Region `json:"Region,omitempty"`
    Client Client `json:"Client,omitempty"`
    Request struct {
    Sender string `json:"Sender,omitempty"`
    Trigger string `json:"Trigger,omitempty"`
    } `json:"Request,omitempty"`

    App struct {
    Program string `json:"Program,omitempty"`
    Build string `json:"Build,omitempty"`
    License string `json:"License,omitempty"`
    Version string `json:"Version,omitempty"`
    } `json:"App,omitempty"`

    Connection struct {
    Type string `json:"Type,omitempty"`
    } `json:"Connection,omitempty"`

    Region struct {
    Continent string `json:"Continent,omitempty"`
    Country string `json:"Country,omitempty"`
    } `json:"Region,omitempty"`

    Client struct {
    OsVersion string `json:"OsVersion,omitempty"`
    Language string `json:"Language,omitempty"`
    Architecture string `json:"Architecture,omitempty"`
    } `json:"Client,omitempty"`
    }

    func enumerateFiles(dirname string) chan interface{} {
    @@ -74,7 +60,7 @@ func enumerateFiles(dirname string) chan interface{} {
    return output
    }

    func eachLine(filename string) chan string {
    func enumerateJSON(filename string) chan string {
    output := make(chan string)
    go func() {
    file, err := os.Open(filename)
    @@ -130,7 +116,8 @@ func reducerDispatcher(collector MapperCollector, reducerInput chan interface{})
    func mapper(filename interface{}, output chan interface{}) {
    results := map[Telemetry]int{}

    for line := range eachLine(filename.(string)) {
    // start the enumeration of each JSON line in the file
    for line := range enumerateJSON(filename.(string)) {

    // decode the telemetry JSON line
    dec := json.NewDecoder(strings.NewReader(line))
    @@ -184,13 +171,13 @@ func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) i
    }

    func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())

    fmt.Println("Processing. Please wait....")

    // start the enumeration of files to be processed into a channel
    input := enumerateFiles(".")

    // this will start the map reduce work
    results := mapReduce(mapper, reducer, input)

    // open output file
    @@ -212,13 +199,15 @@ func main() {
    record = append(record, telemetry.App.Program)
    record = append(record, telemetry.App.Build)
    record = append(record, telemetry.App.License)
    record = append(record, telemetry.App.Versions.Program.Base)
    record = append(record, telemetry.App.Version)
    record = append(record, telemetry.Connection.Type)
    record = append(record, telemetry.Region.Continent)
    record = append(record, telemetry.Region.Country)
    record = append(record, telemetry.Client.OsVersion)
    record = append(record, telemetry.Client.Language)
    record = append(record, telemetry.Client.Architecture)

    // The last field of the CSV line is the aggregate count for each occurrence
    record = append(record, strconv.Itoa(value))

    writer.Write(record)
    @@ -227,5 +216,4 @@ func main() {
    writer.Flush()

    fmt.Println("Done!")

    }
  2. @mcastilho mcastilho created this gist Jul 13, 2015.
    231 changes: 231 additions & 0 deletions gistfile1.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,231 @@
    package main

    import (
    "bufio"
    "encoding/csv"
    "encoding/json"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "runtime"
    "strconv"
    "strings"
    )

    // MaxWorkers is the buffer capacity mapReduce gives to the MapperCollector
    // channel, i.e. how many mapper result channels may be queued before the
    // mapper dispatcher blocks.
    const MaxWorkers = 10

    // Request is the "Request" section of a telemetry record. Both fields are
    // plain strings decoded from the JSON keys "Sender" and "Trigger".
    type Request struct {
    Sender string `json:"Sender,omitempty"`
    Trigger string `json:"Trigger,omitempty"`
    }

    // Program carries the "Base" string of a program version. It is nested
    // inside Versions; main writes App.Versions.Program.Base to the CSV output.
    type Program struct {
    Base string `json:"Base,omitempty"`
    }

    // Versions is the "Versions" object of an App and wraps a single Program
    // value decoded from the JSON key "Program".
    type Versions struct {
    Program Program `json:"Program,omitempty"`
    }

    // App is the "App" section of a telemetry record: three string fields
    // (Program, Build, License) plus the nested Versions object.
    type App struct {
    Program string `json:"Program,omitempty"`
    Build string `json:"Build,omitempty"`
    License string `json:"License,omitempty"`
    Versions Versions `json:"Versions,omitempty"`
    }

    // Connection is the "Connection" section of a telemetry record and holds
    // the string decoded from the JSON key "Type".
    type Connection struct {
    Type string `json:"Type,omitempty"`
    }

    // Region is the "Region" section of a telemetry record, holding the
    // strings decoded from the JSON keys "Continent" and "Country".
    type Region struct {
    Continent string `json:"Continent,omitempty"`
    Country string `json:"Country,omitempty"`
    }

    // Client is the "Client" section of a telemetry record, holding the
    // strings decoded from the JSON keys "OsVersion", "Language" and
    // "Architecture".
    type Client struct {
    OsVersion string `json:"OsVersion,omitempty"`
    Language string `json:"Language,omitempty"`
    Architecture string `json:"Architecture,omitempty"`
    }

    // Telemetry is one decoded JSON line. It is built solely from string fields
    // and structs of string fields, so it is comparable; mapper and reducer rely
    // on that by using Telemetry values directly as map keys to count identical
    // records.
    //
    // NOTE(review): encoding/json's omitempty option never omits struct values,
    // so it has no effect on the struct-typed fields below.
    type Telemetry struct {
    Request Request `json:"Request,omitempty"`
    App App `json:"App,omitempty"`
    Connection Connection `json:"Connection,omitempty"`
    Region Region `json:"Region,omitempty"`
    Client Client `json:"Client,omitempty"`
    }

    // enumerateFiles walks the directory tree rooted at dirname and sends the
    // path of every non-directory entry on the returned channel. The channel is
    // unbuffered and is closed once the walk has finished.
    //
    // Entries that filepath.Walk reports with an error (in which case the
    // FileInfo may be nil) are skipped rather than dereferenced, so a missing
    // or unreadable dirname yields an empty, closed channel instead of a
    // nil-pointer panic.
    func enumerateFiles(dirname string) chan interface{} {
        output := make(chan interface{})
        go func() {
            // close on every exit path so consumers ranging over the channel
            // always terminate
            defer close(output)

            // the callback never returns an error, so there is nothing useful
            // to propagate from Walk itself
            _ = filepath.Walk(dirname, func(path string, f os.FileInfo, err error) error {
                if err != nil || f == nil {
                    // best effort: skip what cannot be inspected and keep walking
                    return nil
                }
                if !f.IsDir() {
                    output <- path
                }
                return nil
            })
        }()
        return output
    }

    // eachLine streams the lines of filename on the returned channel, skipping
    // lines that start with "#". Each line is delivered exactly as read,
    // including its trailing '\n' when present. The channel is unbuffered and
    // is always closed when the goroutine finishes, including when the file
    // cannot be opened, so a consumer ranging over it never blocks forever.
    //
    // Reading stops at the first read error. A final line without a trailing
    // newline is still delivered.
    func eachLine(filename string) chan string {
        output := make(chan string)
        go func() {
            // registered first so it also runs when os.Open fails
            defer close(output)

            file, err := os.Open(filename)
            if err != nil {
                return
            }
            defer file.Close()

            reader := bufio.NewReader(file)
            for {
                line, err := reader.ReadString('\n')

                // ReadString can return data together with an error (typically
                // the last line of a file that lacks a trailing newline), so
                // the data is handled before the error is inspected.
                // Lines beginning with "#" are meta comments, not JSON.
                if line != "" && !strings.HasPrefix(line, "#") {
                    output <- line
                }

                // end of file or any other read error ends the enumeration;
                // retrying a failed read would only loop forever
                if err != nil {
                    return
                }
            }
        }()
        return output
    }

    // MapperCollector is a channel of channels: each element is the output
    // channel of one mapper task. mapperDispatcher sends on it and closes it;
    // reducerDispatcher receives from it.
    type MapperCollector chan chan interface{}

    // MapperFunc is a function that performs the mapping part of the MapReduce
    // job. It receives one input item and the channel on which it sends its
    // result.
    type MapperFunc func(interface{}, chan interface{})

    // ReducerFunc is a function that performs the reduce part of the MapReduce
    // job. The first channel delivers the mapper results, the second receives
    // the reduced result.
    type ReducerFunc func(chan interface{}, chan interface{})

    // mapperDispatcher starts one mapper goroutine per item received from input.
    // Every task gets its own unbuffered result channel, which is handed to
    // collector in the same order the items arrived. collector is closed once
    // input has been drained.
    func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) {
        defer close(collector)

        for {
            item, ok := <-input
            if !ok {
                // input closed: no more tasks to start
                return
            }

            results := make(chan interface{})
            go mapper(item, results)
            collector <- results
        }
    }

    // reducerDispatcher takes each mapper output channel from collector in turn,
    // waits for that task's single result and forwards it to reducerInput.
    // reducerInput is closed after collector has been closed and drained.
    func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) {
        defer close(reducerInput)

        for taskOutput := range collector {
            result := <-taskOutput
            reducerInput <- result
        }
    }

    // mapper is the map step for a single file. filename must hold a string
    // (the type assertion panics otherwise). Every line delivered by eachLine is
    // decoded as a Telemetry value and identical values are counted. The
    // resulting map[Telemetry]int is sent on output as a single message.
    func mapper(filename interface{}, output chan interface{}) {
        counts := make(map[Telemetry]int)

        for jsonLine := range eachLine(filename.(string)) {
            var record Telemetry
            decoder := json.NewDecoder(strings.NewReader(jsonLine))

            // a line that holds no JSON value (io.EOF) or that fails to decode
            // is ignored
            switch err := decoder.Decode(&record); {
            case err == io.EOF:
                continue
            case err != nil:
                continue
            }

            // a missing key reads as zero, so this both inserts and increments
            counts[record]++
        }

        output <- counts
    }

    // reducer is the reduce step. Every message on input must be a
    // map[Telemetry]int (the type assertion panics otherwise). The counts of
    // all messages are summed per key and, once input is closed, the combined
    // map is sent on output as a single message.
    func reducer(input chan interface{}, output chan interface{}) {
        totals := make(map[Telemetry]int)

        for partial := range input {
            counts := partial.(map[Telemetry]int)
            for telemetry, n := range counts {
                // a missing key reads as zero, so += covers the first sighting too
                totals[telemetry] += n
            }
        }

        output <- totals
    }

    // mapReduce wires the pipeline together and blocks until the reducer has
    // produced its result, which is returned as-is.
    //
    // Three goroutines are started: mapperDispatcher (runs mapper on every item
    // of input), reducerDispatcher (forwards each mapper result) and reducer
    // (combines them). The collector channel between the two dispatchers is
    // buffered with MaxWorkers slots; the other channels are unbuffered.
    func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} {
        var (
            collector = make(MapperCollector, MaxWorkers)
            toReducer = make(chan interface{})
            final     = make(chan interface{})
        )

        go mapperDispatcher(mapper, input, collector)
        go reducerDispatcher(collector, toReducer)
        go reducer(toReducer, final)

        result := <-final
        return result
    }

    // main runs the map/reduce job over every file below the current directory
    // and writes the aggregated result to telemetry.csv: one row per distinct
    // Telemetry value, with the number of occurrences as the last column.
    //
    // Any failure to create or write the output file aborts with a panic
    // instead of leaving a silently truncated CSV behind.
    func main() {
        // NOTE(review): Go 1.5+ already defaults GOMAXPROCS to the CPU count;
        // the call is kept so older toolchains behave the same.
        runtime.GOMAXPROCS(runtime.NumCPU())

        fmt.Println("Processing. Please wait....")

        // start the enumeration of files to be processed into a channel
        input := enumerateFiles(".")

        // this blocks until every file has been mapped and reduced
        results := mapReduce(mapper, reducer, input)

        // open output file
        f, err := os.Create("telemetry.csv")
        if err != nil {
            panic(err)
        }
        defer f.Close()

        // csv.Writer buffers internally; rows reach the file on Flush
        writer := csv.NewWriter(f)

        // map iteration order is unspecified, so row order differs between runs
        for telemetry, value := range results.(map[Telemetry]int) {
            record := []string{
                telemetry.Request.Sender,
                telemetry.Request.Trigger,
                telemetry.App.Program,
                telemetry.App.Build,
                telemetry.App.License,
                telemetry.App.Versions.Program.Base,
                telemetry.Connection.Type,
                telemetry.Region.Continent,
                telemetry.Region.Country,
                telemetry.Client.OsVersion,
                telemetry.Client.Language,
                telemetry.Client.Architecture,
                // the last field is the aggregate count for this record
                strconv.Itoa(value),
            }

            if err := writer.Write(record); err != nil {
                panic(err)
            }
        }

        // Flush does not return an error itself; it has to be fetched with
        // Error afterwards
        writer.Flush()
        if err := writer.Error(); err != nil {
            panic(err)
        }

        fmt.Println("Done!")
    }