/* twitter@hector_gool https://github.com/olivere/elastic/wiki/BulkIndex */ package main import ( "fmt" elastic "gopkg.in/olivere/elastic.v5" "encoding/csv" "github.com/satori/go.uuid" "context" "os" "log" "strconv" ) type ( Document struct { Ciudad string `json:"ciudad"` Colonia string `json:"colonia"` Cp string `json:"cp"` Delegacion string `json:"delegacion"` Location `json:"location"` } Location struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` } ) const ( FILE ="./MX.txt" TOTAL_ROWS = 1000000 ) var ( client *elastic.Client ) func init() { var err error client, err = elastic.NewClient( elastic.SetURL(os.Getenv("ELASTICSEARCH_ENTRYPOINT")), elastic.SetBasicAuth(os.Getenv("ELASTICSEARCH_USERNAME"), os.Getenv("ELASTICSEARCH_PASSWORD")), elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)), ) printError(err) } func main() { ctx := context.Background() file, err := os.Open(FILE) printError(err) defer file.Close() reader := csv.NewReader(file) reader.Comma = '\t' rows, err := reader.ReadAll() printError(err) bulkRequest := client.Bulk() for n, col := range rows { n++ id := uuid.NewV4().String() if n <= TOTAL_ROWS { lat, err := strconv.ParseFloat(col[9], 64) printError(err) lon, err := strconv.ParseFloat(col[10], 64) printError(err) document := Document{ Ciudad: col[3], Colonia: col[2], Cp: col[1], Delegacion: col[5], Location: Location{ Lat: lat, Lon: lon, }, } req := elastic.NewBulkIndexRequest().Index(os.Getenv("ELASTICSEARCH_INDEX")).Type(os.Getenv("ELASTICSEARCH_TYPE")).Id(id).Doc(document) bulkRequest = bulkRequest.Add(req) fmt.Printf("%v: %v\n", n, document) } } bulkResponse, err := bulkRequest.Do(ctx) printError(err) indexed := bulkResponse.Indexed() if len(indexed) != 1 { fmt.Printf("\n Indexed documents: %v \n", len(indexed)) } } func printError(err error) { if err != nil { fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) os.Exit(1) } }