Skip to content

Instantly share code, notes, and snippets.

@claudemirogb
Last active September 22, 2016 20:58
Show Gist options
  • Select an option

  • Save claudemirogb/7a1f084ae0da3d6c800bab6e8b6c724b to your computer and use it in GitHub Desktop.

Select an option

Save claudemirogb/7a1f084ae0da3d6c800bab6e8b6c724b to your computer and use it in GitHub Desktop.
package main
import (
"context"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"reflect"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/rs/xlog"
)
// Component is a unit of work that can be registered in a Graph.
//
// Graph.Run calls Run in a goroutine of its own and passes it a context that
// Graph.Shutdown cancels.
type Component interface {
Run(ctx context.Context)
}
// Graph is a network of named components.
// It keeps a map of the registered components and the channels created to
// connect their channel-typed struct fields.
type Graph struct {
// ctx and cancel are assigned by Run; Shutdown calls cancel.
ctx context.Context
cancel context.CancelFunc
// components maps the name given to AddNode to the registered component.
components map[string]Component
// channels holds every channel created by AddEdge so Shutdown can close it.
channels []reflect.Value
// bufferCapacity is the buffer size passed to reflect.MakeChan for each edge.
bufferCapacity int
}
// AddNode registers c under name so that AddEdge and AddInitial can refer to
// its fields as "name.Field", e.g. AddNode("read", c).
//
// Registering a second component under an existing name replaces the first.
// AddNode returns an error, and registers nothing, when c is nil: a nil
// component can be neither wired nor run.
func (n *Graph) AddNode(name string, c Component) error {
	if c == nil {
		return errors.New("component must not be nil")
	}
	if n.components == nil {
		n.components = make(map[string]Component)
	}
	n.components[name] = c
	return nil
}
// AddInitial assigns the caller-supplied channel in to a field of a
// registered component, so values can be fed into the graph from outside.
//
// definition has the form "Component.Field"; surrounding whitespace is
// ignored. Field must name an exported channel field of the component and in
// must have exactly that channel type. The channel is not recorded in the
// graph, so Shutdown does not close it.
//
// Instead of panicking, AddInitial returns an error and changes nothing when
// the definition is malformed, the component or field is unknown, the
// component is not a non-nil pointer to a struct, either side is not a
// channel, the channel types differ, or the field is unexported.
func (n *Graph) AddInitial(in interface{}, definition string) error {
	parts := strings.Split(definition, ".")
	if len(parts) != 2 {
		return fmt.Errorf("invalid port %q: want \"Component.Field\"", definition)
	}
	name, field := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])

	// reflect.ValueOf(nil) is the invalid Value; calling Kind on its type
	// would dereference nil, so check validity first.
	inValue := reflect.ValueOf(in)
	if !inValue.IsValid() || inValue.Kind() != reflect.Chan {
		return errors.New("in must be a channel")
	}

	component, ok := n.components[name]
	if !ok {
		return fmt.Errorf("unknown component %q", name)
	}
	target := reflect.ValueOf(component)
	if target.Kind() != reflect.Ptr || target.IsNil() || target.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("component %q must be a non-nil pointer to a struct", name)
	}

	port := target.Elem().FieldByName(field)
	if !port.IsValid() {
		return fmt.Errorf("component %q has no field %q", name, field)
	}
	if port.Kind() != reflect.Chan {
		return errors.New("out must be a channel")
	}
	if inValue.Type() != port.Type() {
		return errors.New("in and out must be the same channel types")
	}
	// Set panics on fields obtained through unexported names.
	if !port.CanSet() {
		return fmt.Errorf("field %q of component %q is not exported", field, name)
	}

	port.Set(inValue)
	return nil
}
// AddEdge creates one channel and assigns it to a field on each of two
// registered components, connecting them.
//
// definition has the form "A.Field -> B.Field", for example
// "LowerCaseString.Out -> Notifier.In"; surrounding whitespace is ignored.
// Both fields must be exported, bidirectional channel fields of exactly the
// same type. The channel is buffered with the graph's bufferCapacity and is
// remembered so that Shutdown can close it.
//
// A field that already holds a channel gets it replaced by the new one.
//
// Instead of panicking, AddEdge returns an error — leaving the graph and both
// components untouched — when the definition is malformed, a component or
// field is unknown or unexported, a component is not a non-nil pointer to a
// struct, a field is not a channel, the channel types differ or are
// directional, or bufferCapacity is negative. In the error messages "in"
// refers to the left-hand field and "out" to the right-hand field.
func (n *Graph) AddEdge(definition string) error {
	sides := strings.Split(definition, "->")
	if len(sides) != 2 {
		return fmt.Errorf("invalid edge %q: want \"A.Field -> B.Field\"", definition)
	}

	// port resolves "Component.Field" to the settable struct field it names.
	port := func(spec string) (reflect.Value, error) {
		var none reflect.Value
		parts := strings.Split(spec, ".")
		if len(parts) != 2 {
			return none, fmt.Errorf("invalid port %q: want \"Component.Field\"", strings.TrimSpace(spec))
		}
		name, field := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
		component, ok := n.components[name]
		if !ok {
			return none, fmt.Errorf("unknown component %q", name)
		}
		target := reflect.ValueOf(component)
		if target.Kind() != reflect.Ptr || target.IsNil() || target.Elem().Kind() != reflect.Struct {
			return none, fmt.Errorf("component %q must be a non-nil pointer to a struct", name)
		}
		f := target.Elem().FieldByName(field)
		if !f.IsValid() {
			return none, fmt.Errorf("component %q has no field %q", name, field)
		}
		// Set panics on fields obtained through unexported names.
		if !f.CanSet() {
			return none, fmt.Errorf("field %q of component %q is not exported", field, name)
		}
		return f, nil
	}

	from, err := port(sides[0])
	if err != nil {
		return err
	}
	to, err := port(sides[1])
	if err != nil {
		return err
	}

	if from.Kind() != reflect.Chan {
		return errors.New("in must be a channel")
	}
	if to.Kind() != reflect.Chan {
		return errors.New("out must be a channel")
	}
	if from.Type() != to.Type() {
		return errors.New("in and out must be the same channel types")
	}
	// reflect.MakeChan panics for directional channel types and for a
	// negative buffer size, so reject both up front.
	if from.Type().ChanDir() != reflect.BothDir {
		return errors.New("edge ports must be bidirectional channels")
	}
	if n.bufferCapacity < 0 {
		return errors.New("buffer capacity must not be negative")
	}

	// Everything is validated; from here on nothing can fail, so the graph is
	// never left half-wired.
	edge := reflect.MakeChan(from.Type(), n.bufferCapacity)
	n.channels = append(n.channels, edge)
	from.Set(edge)
	to.Set(edge)
	return nil
}
// Run derives a cancellable context from ctx, stores it on the graph together
// with its cancel function, and launches every registered component in a
// goroutine of its own. It returns immediately without waiting for the
// components to finish.
func (n *Graph) Run(ctx context.Context) {
	runCtx, stop := context.WithCancel(ctx)
	n.ctx = runCtx
	n.cancel = stop
	for name := range n.components {
		component := n.components[name]
		go component.Run(runCtx)
	}
}
// Shutdown cancels the context handed to the components by Run and then
// closes every channel created by AddEdge.
//
// It is safe to call Shutdown before Run and to call it more than once: the
// cancel function is invoked only when it has been set, and the list of
// channels is cleared after closing so no channel is closed twice.
//
// Shutdown does not wait for component goroutines to exit, so a component may
// still observe a closed channel before it notices the cancelled context.
func (n *Graph) Shutdown() {
	if n.cancel != nil {
		n.cancel()
	}
	for _, ch := range n.channels {
		ch.Close()
	}
	// Closing a closed channel panics; forget the channels once closed.
	n.channels = nil
}
// readComponent reads the file named by each message received on in and sends
// the file contents on out; read failures are sent as strings on err.
//
// NOTE(review): the fields are unexported, and Graph wires fields with
// reflect's Set, which panics on unexported fields — so this component
// presumably cannot be connected through AddEdge/AddInitial as written;
// confirm. The `port` tags are not read by any code in this file.
type readComponent struct {
in chan StringWithContext `port:"in"`
out chan string `port:"out"`
err chan string `port:"error"`
}
// Run reads file names from r.in until ctx is done or r.in is closed.
//
// For every message it logs the name with the logger taken from the message's
// context, reads the whole file, and sends its contents on r.out. When the
// read fails, the error is logged and its text is sent on r.err instead.
//
// Every send also watches ctx, so Run returns on cancellation rather than
// blocking forever on a channel nobody reads any more.
func (r *readComponent) Run(ctx context.Context) {
	for {
		select {
		case message, ok := <-r.in:
			if !ok {
				// The input was closed; no further messages can arrive.
				return
			}
			logger := xlog.FromContext(message.ctx)
			logger.Info("Reading", xlog.F{
				"message": message.str,
			})
			content, err := ioutil.ReadFile(message.str)
			if err != nil {
				logger.Error("Error", xlog.F{
					"error": err,
				})
				select {
				case r.err <- err.Error():
				case <-ctx.Done():
					return
				}
				continue
			}
			select {
			case r.out <- string(content):
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
// sendRequestComponent issues an HTTP GET for each URL received on URL and
// sends the response body on Message, paired with the context that came with
// the URL; failures are sent as strings on Err.
//
// The `port` tags are not read by any code in this file.
type sendRequestComponent struct {
URL chan StringWithContext `port:"url"`
Message chan StringWithContext `port:"message"`
Err chan string `port:"err"`
}
// Run fetches every URL received on s.URL until ctx is done or s.URL is
// closed.
//
// The response body is sent on s.Message together with the context that
// arrived with the URL; when the request or the read fails, the error text is
// sent on s.Err instead. Requests are bound to ctx and every send also watches
// ctx, so cancellation interrupts both a slow request and a blocked send.
func (s *sendRequestComponent) Run(ctx context.Context) {
	for {
		select {
		case url, ok := <-s.URL:
			if !ok {
				// The input was closed; no further URLs can arrive.
				return
			}
			data, err := fetchBody(ctx, url.str)
			if err != nil {
				select {
				case s.Err <- err.Error():
				case <-ctx.Done():
					return
				}
				continue
			}
			select {
			case s.Message <- NewStringWithContext(url.ctx, string(data)):
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

// fetchBody performs a GET request for rawURL bound to ctx and returns the
// complete response body. The body is closed on every path, including a
// failed read.
func fetchBody(ctx context.Context, rawURL string) ([]byte, error) {
	req, err := http.NewRequest("GET", rawURL, nil)
	if err != nil {
		return nil, err
	}
	response, err := http.DefaultClient.Do(req.WithContext(ctx))
	if err != nil {
		return nil, err
	}
	defer response.Body.Close()
	return ioutil.ReadAll(response.Body)
}
// Notifier writes the string of each message received on In as JSON to the
// websocket connection stored in that message's context under the key "ws".
type Notifier struct {
In chan StringWithContext
}
// Run delivers messages from n.In until ctx is done or n.In is closed.
//
// Each message's string is written as JSON to the *websocket.Conn found in the
// message's context under the key "ws". Messages without a context or without
// a connection are logged and skipped instead of panicking, and write errors
// are logged rather than dropped silently.
func (n *Notifier) Run(ctx context.Context) {
	for {
		select {
		case content, ok := <-n.In:
			if !ok {
				// The input was closed; no further messages can arrive.
				return
			}
			if content.ctx == nil {
				log.Println("notifier: message has no context")
				continue
			}
			conn, found := content.ctx.Value("ws").(*websocket.Conn)
			if !found || conn == nil {
				log.Println("notifier: no websocket connection in message context")
				continue
			}
			if err := conn.WriteJSON(content.str); err != nil {
				log.Printf("notifier: writejson: %s", err)
			}
		case <-ctx.Done():
			return
		}
	}
}
// Writer prints every string received on In to standard output, one per line.
type Writer struct {
In chan string
}
// Run prints each string received on w.In with fmt.Println until ctx is done
// or w.In is closed.
//
// Returning on a closed input matters: receiving from a closed channel never
// blocks, so the loop would otherwise spin printing empty lines.
func (w *Writer) Run(ctx context.Context) {
	for {
		select {
		case content, ok := <-w.In:
			if !ok {
				return
			}
			fmt.Println(content)
		case <-ctx.Done():
			return
		}
	}
}
// LowerCaseString lower-cases the string of every message received on In and
// sends the result on Out, keeping the message's context.
type LowerCaseString struct {
In chan StringWithContext
Out chan StringWithContext
}
// Run lower-cases messages from l.In and forwards them on l.Out until ctx is
// done or l.In is closed. The context carried by each message is passed on
// unchanged.
//
// The send also watches ctx, so Run returns on cancellation rather than
// blocking forever when nothing reads l.Out any more.
func (l *LowerCaseString) Run(ctx context.Context) {
	for {
		select {
		case data, ok := <-l.In:
			if !ok {
				// The input was closed; no further messages can arrive.
				return
			}
			select {
			case l.Out <- NewStringWithContext(data.ctx, strings.ToLower(data.str)):
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
// StringWithContext is the message type passed between components: a string
// together with the context it travels with.
type StringWithContext struct {
// ctx accompanies the payload; readComponent takes its logger from it and
// Notifier looks up the websocket connection in it.
ctx context.Context
// str is the payload.
str string
}
// NewStringWithContext returns a StringWithContext that carries str together
// with ctx.
func NewStringWithContext(ctx context.Context, str string) StringWithContext {
	var message StringWithContext
	message.ctx, message.str = ctx, str
	return message
}
// main wires a small graph (LowerCaseString -> Notifier), runs it, and serves
// a websocket endpoint on :8080. Every JSON string read from a connection is
// fed into the graph together with a context that carries the connection
// under the key "ws".
func main() {
	initial := make(chan StringWithContext)
	graph := &Graph{
		bufferCapacity: 10,
	}

	// A wiring error is a programming mistake: stop before anything runs
	// instead of serving requests on a half-connected graph.
	if err := graph.AddNode("LowerCaseString", new(LowerCaseString)); err != nil {
		log.Fatalf("add node LowerCaseString: %s", err)
	}
	if err := graph.AddNode("Notifier", new(Notifier)); err != nil {
		log.Fatalf("add node Notifier: %s", err)
	}
	// graph.AddNode("SendRequest", new(sendRequestComponent))
	//graph.AddEdge("LowerCaseString.Out -> SendRequest.URL", make(chan string))
	if err := graph.AddEdge("LowerCaseString.Out -> Notifier.In"); err != nil {
		log.Fatalf("add edge: %s", err)
	}
	// graph.AddEdge("SendRequest.Message -> Notifier.In")
	if err := graph.AddInitial(initial, "LowerCaseString.In"); err != nil {
		log.Fatalf("add initial: %s", err)
	}

	graph.Run(context.Background())
	defer graph.Shutdown()

	upgrader := websocket.Upgrader{
		// Every origin is accepted.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		ws, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Println("upgrade:", err)
			return
		}
		defer ws.Close()

		ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
		defer cancel()

		// The key has to stay the plain string "ws": Notifier looks the
		// connection up with exactly that key.
		withConn := context.WithValue(ctx, "ws", ws)

		go func() {
			// Once reading fails the connection is unusable; cancelling lets
			// the handler return right away instead of waiting for ctx to
			// end for some other reason.
			defer cancel()
			for {
				var message string
				if err := ws.ReadJSON(&message); err != nil {
					log.Printf("readjson: %s", err)
					return
				}
				// Give up on the send when the handler has ended, so this
				// goroutine cannot block forever on the graph's input.
				select {
				case initial <- NewStringWithContext(withConn, message):
				case <-ctx.Done():
					return
				}
			}
		}()

		<-ctx.Done()
	})

	// Log instead of log.Fatal so the deferred Shutdown still runs.
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Println("listen:", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment