|
|
@@ -0,0 +1,231 @@ |
|
|
package main |
|
|
|
|
|
import ( |
|
|
"bufio" |
|
|
"encoding/csv" |
|
|
"encoding/json" |
|
|
"fmt" |
|
|
"io" |
|
|
"os" |
|
|
"path/filepath" |
|
|
"runtime" |
|
|
"strconv" |
|
|
"strings" |
|
|
) |
|
|
|
|
|
const ( |
|
|
MaxWorkers = 10 |
|
|
) |
|
|
|
|
|
type Request struct { |
|
|
Sender string `json:"Sender,omitempty"` |
|
|
Trigger string `json:"Trigger,omitempty"` |
|
|
} |
|
|
|
|
|
type Program struct { |
|
|
Base string `json:"Base,omitempty"` |
|
|
} |
|
|
|
|
|
type Versions struct { |
|
|
Program Program `json:"Program,omitempty"` |
|
|
} |
|
|
|
|
|
type App struct { |
|
|
Program string `json:"Program,omitempty"` |
|
|
Build string `json:"Build,omitempty"` |
|
|
License string `json:"License,omitempty"` |
|
|
Versions Versions `json:"Versions,omitempty"` |
|
|
} |
|
|
|
|
|
type Connection struct { |
|
|
Type string `json:"Type,omitempty"` |
|
|
} |
|
|
|
|
|
type Region struct { |
|
|
Continent string `json:"Continent,omitempty"` |
|
|
Country string `json:"Country,omitempty"` |
|
|
} |
|
|
|
|
|
type Client struct { |
|
|
OsVersion string `json:"OsVersion,omitempty"` |
|
|
Language string `json:"Language,omitempty"` |
|
|
Architecture string `json:"Architecture,omitempty"` |
|
|
} |
|
|
|
|
|
type Telemetry struct { |
|
|
Request Request `json:"Request,omitempty"` |
|
|
App App `json:"App,omitempty"` |
|
|
Connection Connection `json:"Connection,omitempty"` |
|
|
Region Region `json:"Region,omitempty"` |
|
|
Client Client `json:"Client,omitempty"` |
|
|
} |
|
|
|
|
|
func enumerateFiles(dirname string) chan interface{} { |
|
|
output := make(chan interface{}) |
|
|
go func() { |
|
|
filepath.Walk(dirname, func(path string, f os.FileInfo, err error) error { |
|
|
if !f.IsDir() { |
|
|
output <- path |
|
|
} |
|
|
return nil |
|
|
}) |
|
|
close(output) |
|
|
}() |
|
|
return output |
|
|
} |
|
|
|
|
|
func eachLine(filename string) chan string { |
|
|
output := make(chan string) |
|
|
go func() { |
|
|
file, err := os.Open(filename) |
|
|
if err != nil { |
|
|
return |
|
|
} |
|
|
defer file.Close() |
|
|
reader := bufio.NewReader(file) |
|
|
for { |
|
|
line, err := reader.ReadString('\n') |
|
|
if err == io.EOF { |
|
|
break |
|
|
} |
|
|
|
|
|
// ignore any meta comments on top of JSON file |
|
|
if strings.HasPrefix(line, "#") == true { |
|
|
continue |
|
|
} |
|
|
|
|
|
// add each json line to our enumeration channel |
|
|
output <- line |
|
|
} |
|
|
close(output) |
|
|
}() |
|
|
return output |
|
|
} |
|
|
|
|
|
// MapperCollector is a channel that collects the output from mapper tasks |
|
|
type MapperCollector chan chan interface{} |
|
|
|
|
|
// MapperFunc is a function that performs the mapping part of the MapReduce job |
|
|
type MapperFunc func(interface{}, chan interface{}) |
|
|
|
|
|
// ReducerFunc is a function that performs the reduce part of the MapReduce job |
|
|
type ReducerFunc func(chan interface{}, chan interface{}) |
|
|
|
|
|
func mapperDispatcher(mapper MapperFunc, input chan interface{}, collector MapperCollector) { |
|
|
for item := range input { |
|
|
taskOutput := make(chan interface{}) |
|
|
go mapper(item, taskOutput) |
|
|
collector <- taskOutput |
|
|
} |
|
|
close(collector) |
|
|
} |
|
|
|
|
|
func reducerDispatcher(collector MapperCollector, reducerInput chan interface{}) { |
|
|
for output := range collector { |
|
|
reducerInput <- <-output |
|
|
} |
|
|
close(reducerInput) |
|
|
} |
|
|
|
|
|
func mapper(filename interface{}, output chan interface{}) { |
|
|
results := map[Telemetry]int{} |
|
|
|
|
|
for line := range eachLine(filename.(string)) { |
|
|
|
|
|
// decode the telemetry JSON line |
|
|
dec := json.NewDecoder(strings.NewReader(line)) |
|
|
var telemetry Telemetry |
|
|
|
|
|
// if line cannot be JSON decoded then skip to next one |
|
|
if err := dec.Decode(&telemetry); err == io.EOF { |
|
|
continue |
|
|
} else if err != nil { |
|
|
continue |
|
|
} |
|
|
|
|
|
// stores Telemetry structure in the mapper results dictionary |
|
|
previousCount, exists := results[telemetry] |
|
|
if !exists { |
|
|
results[telemetry] = 1 |
|
|
} else { |
|
|
results[telemetry] = previousCount + 1 |
|
|
} |
|
|
} |
|
|
|
|
|
output <- results |
|
|
} |
|
|
|
|
|
func reducer(input chan interface{}, output chan interface{}) { |
|
|
results := map[Telemetry]int{} |
|
|
for matches := range input { |
|
|
for key, value := range matches.(map[Telemetry]int) { |
|
|
_, exists := results[key] |
|
|
if !exists { |
|
|
results[key] = value |
|
|
} else { |
|
|
results[key] = results[key] + value |
|
|
} |
|
|
} |
|
|
} |
|
|
output <- results |
|
|
} |
|
|
|
|
|
func mapReduce(mapper MapperFunc, reducer ReducerFunc, input chan interface{}) interface{} { |
|
|
|
|
|
reducerInput := make(chan interface{}) |
|
|
reducerOutput := make(chan interface{}) |
|
|
mapperCollector := make(MapperCollector, MaxWorkers) |
|
|
|
|
|
go reducer(reducerInput, reducerOutput) |
|
|
go reducerDispatcher(mapperCollector, reducerInput) |
|
|
go mapperDispatcher(mapper, input, mapperCollector) |
|
|
|
|
|
return <-reducerOutput |
|
|
} |
|
|
|
|
|
func main() { |
|
|
|
|
|
runtime.GOMAXPROCS(runtime.NumCPU()) |
|
|
|
|
|
fmt.Println("Processing. Please wait....") |
|
|
|
|
|
input := enumerateFiles(".") |
|
|
|
|
|
results := mapReduce(mapper, reducer, input) |
|
|
|
|
|
// open output file |
|
|
f, err := os.Create("telemetry.csv") |
|
|
if err != nil { |
|
|
panic(err) |
|
|
} |
|
|
defer f.Close() |
|
|
|
|
|
// make a write buffer |
|
|
writer := csv.NewWriter(f) |
|
|
|
|
|
for telemetry, value := range results.(map[Telemetry]int) { |
|
|
|
|
|
var record []string |
|
|
|
|
|
record = append(record, telemetry.Request.Sender) |
|
|
record = append(record, telemetry.Request.Trigger) |
|
|
record = append(record, telemetry.App.Program) |
|
|
record = append(record, telemetry.App.Build) |
|
|
record = append(record, telemetry.App.License) |
|
|
record = append(record, telemetry.App.Versions.Program.Base) |
|
|
record = append(record, telemetry.Connection.Type) |
|
|
record = append(record, telemetry.Region.Continent) |
|
|
record = append(record, telemetry.Region.Country) |
|
|
record = append(record, telemetry.Client.OsVersion) |
|
|
record = append(record, telemetry.Client.Language) |
|
|
record = append(record, telemetry.Client.Architecture) |
|
|
record = append(record, strconv.Itoa(value)) |
|
|
|
|
|
writer.Write(record) |
|
|
} |
|
|
|
|
|
writer.Flush() |
|
|
|
|
|
fmt.Println("Done!") |
|
|
|
|
|
} |