package main import ( "context" "errors" "fmt" "io/ioutil" "log" "net/http" "reflect" "strings" "time" "github.com/gorilla/websocket" "github.com/rs/xlog" ) // Component represents a runnable component type Component interface { Run(ctx context.Context) } // Graph is a graph network // has a map ot registered elements type Graph struct { ctx context.Context cancel context.CancelFunc components map[string]Component channels []reflect.Value bufferCapacity int } // AddNode is used to give a name to the Component, so it can be used in the AddEdge function // ex: AddNodeComponent("read", c) func (n *Graph) AddNode(name string, c Component) error { if n.components == nil { n.components = make(map[string]Component) } n.components[name] = c return nil } func (n *Graph) AddInitial(in interface{}, definition string) error { left := strings.Split(definition, ".") c := strings.TrimSpace(left[0]) out := strings.TrimSpace(left[1]) inType := reflect.TypeOf(in) outType, _ := reflect.TypeOf(n.components[c]).Elem().FieldByName(out) if inType.Kind() != reflect.Chan { return errors.New("in must be a channel") } if outType.Type.Kind() != reflect.Chan { return errors.New("out must be a channel") } if inType != outType.Type { return errors.New("in and out must be the same channel types") } reflect.ValueOf(n.components[c]).Elem().FieldByName(out).Set(reflect.ValueOf(in)) return nil } // AddEdge connect the input and output of the given components // in -> out // Panic if element is not registered??? func (n *Graph) AddEdge(definition string) error { pieces := strings.Split(definition, "->") left := strings.Split(pieces[0], ".") a := strings.TrimSpace(left[0]) in := strings.TrimSpace(left[1]) right := strings.Split(pieces[1], ".") b := strings.TrimSpace(right[0]) out := strings.TrimSpace(right[1]) inType, _ := reflect.TypeOf(n.components[a]).Elem().FieldByName(in) outType, _ := reflect.TypeOf(n.components[b]).Elem().FieldByName(out) if inType.Type.Kind() != reflect.Chan { return errors.New("in must be a channel") } if outType.Type.Kind() != reflect.Chan { return errors.New("out must be a channel") } if inType.Type != outType.Type { return errors.New("in and out must be the same channel types") } edge := reflect.MakeChan(inType.Type, n.bufferCapacity) n.channels = append(n.channels, edge) reflect.ValueOf(n.components[a]).Elem().FieldByName(in).Set(edge) reflect.ValueOf(n.components[b]).Elem().FieldByName(out).Set(edge) return nil } // Run start all components func (n *Graph) Run(ctx context.Context) { n.ctx, n.cancel = context.WithCancel(ctx) for _, c := range n.components { go c.Run(n.ctx) } } // Shutdown sends a cancel to all the components of the graph // also should closes all the created channels. func (n *Graph) Shutdown() { n.cancel() for _, c := range n.channels { c.Close() } } type readComponent struct { in chan StringWithContext `port:"in"` out chan string `port:"out"` err chan string `port:"error"` } func (r *readComponent) Run(ctx context.Context) { for { select { case message := <-r.in: log := xlog.FromContext(message.ctx) log.Info("Reading", xlog.F{ "message": message.str, }) content, err := ioutil.ReadFile(message.str) if err != nil { log.Error("Error", xlog.F{ "error": err, }) r.err <- err.Error() continue } r.out <- string(content) case <-ctx.Done(): return } } } type sendRequestComponent struct { URL chan StringWithContext `port:"url"` Message chan StringWithContext `port:"message"` Err chan string `port:"err"` } func (s *sendRequestComponent) Run(ctx context.Context) { for { select { case url := <-s.URL: response, err := http.Get(url.str) if err != nil { s.Err <- err.Error() continue } data, err := ioutil.ReadAll(response.Body) if err != nil { s.Err <- err.Error() continue } s.Message <- NewStringWithContext(url.ctx, string(data)) response.Body.Close() case <-ctx.Done(): return } } } type Notifier struct { In chan StringWithContext } func (n *Notifier) Run(ctx context.Context) { for { select { case content := <-n.In: ws := content.ctx.Value("ws").(*websocket.Conn) ws.WriteJSON(content.str) case <-ctx.Done(): return } } } type Writer struct { In chan string } func (w *Writer) Run(ctx context.Context) { for { select { case content := <-w.In: fmt.Println(content) case <-ctx.Done(): return } } } type LowerCaseString struct { In chan StringWithContext Out chan StringWithContext } func (l *LowerCaseString) Run(ctx context.Context) { for { select { case data := <-l.In: l.Out <- NewStringWithContext(data.ctx, strings.ToLower(data.str)) case <-ctx.Done(): return } } } type StringWithContext struct { ctx context.Context str string } func NewStringWithContext(ctx context.Context, str string) StringWithContext { return StringWithContext{ ctx: ctx, str: str, } } func main() { initial := make(chan StringWithContext) graph := &Graph{ bufferCapacity: 10, } graph.AddNode("LowerCaseString", new(LowerCaseString)) graph.AddNode("Notifier", new(Notifier)) // graph.AddNode("SendRequest", new(sendRequestComponent)) //graph.AddEdge("LowerCaseString.Out -> SendRequest.URL", make(chan string)) graph.AddEdge("LowerCaseString.Out -> Notifier.In") // graph.AddEdge("SendRequest.Message -> Notifier.In") graph.AddInitial(initial, "LowerCaseString.In") graph.Run(context.Background()) defer graph.Shutdown() var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println("upgrade:", err) return } defer ws.Close() ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute) defer cancel() go func() { for { var message string err := ws.ReadJSON(&message) if err != nil { log.Printf("readjson: %s", err) return } another := context.WithValue(ctx, "ws", ws) initial <- NewStringWithContext(another, message) } }() <-ctx.Done() }) http.ListenAndServe(":8080", nil) }