package irc import ( "errors" "fmt" "sync/atomic" "time" "github.com/mozilla-services/heka/pipeline" "github.com/thoj/go-ircevent" ) // Privmsg wraps the irc.Privmsg by accepting an ircMsg struct, and checking if // we've joined a channel before trying to send a message to it. Returns whether // or not the message was successfully sent. func (output *IrcOutput) Privmsg(ircMsg *IrcMsg) bool { idx := ircMsg.Idx if atomic.LoadInt32(&output.JoinedChannels[idx]) == JOINED { output.Conn.Privmsg(ircMsg.IrcChannel, string(ircMsg.Output)) } else { return false } if output.JoinAndPart { // Leave the channel if we're configured to part after sending messages. output.Conn.Part(ircMsg.IrcChannel) } return true } // updateJoinList atomically updates our global slice of joined channels for a // particular irc channel. It sets the irc channel's joined status to 'status'. // Returns whether or not it found the irc channel in our slice. func updateJoinList(output *IrcOutput, ircChan string, status int32) bool { for i, channel := range output.Channels { if ircChan == channel { // Update if we have or haven't joined the channel atomic.StoreInt32(&output.JoinedChannels[i], status) return true } } return false } // updateJoinListAll sets the status of all irc channels in our config to // 'status' func updateJoinListAll(output *IrcOutput, status int32) { for channel := range output.Channels { atomic.StoreInt32(&output.JoinedChannels[channel], status) } } // sendFromOutQueue attempts to send a message to the irc channel specified in // the ircMsg struct. If sending fails due to not being in the irc channel, it // will put the message into that irc channel's backlog queue. If the queue is // full it will drop the message and log an error. // It returns whether or not a message was successfully delivered to an // irc channel. func sendFromOutQueue(output *IrcOutput, runner pipeline.OutputRunner, ircMsg *IrcMsg) bool { if output.Privmsg(ircMsg) { return true } else { // We haven't joined this channel yet, so we need to send // the message to the backlog queue of messages // Get the proper Channel for the backlog idx := ircMsg.Idx backlogQueue := output.BacklogQueues[idx] select { // try to put the message into the backlog queue case backlogQueue <- *ircMsg: // Just putting default: // Failed to put, which means the backlog for this Irc // channel is full. So drop it and log a message. runner.LogError(fmt.Errorf("backlog queue for "+ "Irc Channel %s, full. Dropping message.", ircMsg.IrcChannel)) } return false } return false } // sendFromBacklogQueue attempts to send a message from the first backlog queue // which has a message in it. It returns whether or not a message was // successfully delivered to an irc channel. func sendFromBacklogQueue(output *IrcOutput, runner pipeline.OutputRunner) bool { var ircMsg IrcMsg // No messages in the out queue, so lets try the backlog queue for i, queue := range output.BacklogQueues { if atomic.LoadInt32(&output.JoinedChannels[i]) != JOINED { continue } select { case ircMsg = <-queue: if output.Privmsg(&ircMsg) { return true } default: // No backed up messages for this irc channel } } return false } // processOutQueue attempts to send an Irc message from the OutQueue, or the // BacklogQueue if nothing is in the OutQueue. It is throttled by a ticker to // prevent flooding the Irc server. func processOutQueue(output *IrcOutput, runner pipeline.OutputRunner) { var delivered bool var ircMsg IrcMsg ok := true ticker := runner.Ticker() for ok { delivered = false <-ticker select { case ircMsg, ok = <-output.OutQueue: if !ok { // We havent actually delivered but we want to escape that // loop. delivered = true // Time to cleanup, and close our chans break } delivered = sendFromOutQueue(output, runner, &ircMsg) default: // Just here to prevent blocking } if !delivered { sendFromBacklogQueue(output, runner) } } // Cleanup heka for _, queue := range output.BacklogQueues { close(queue) } // Try to send the rest of our msgs in the backlog before quitting. for _, queue := range output.BacklogQueues { for msg := range queue { output.Privmsg(&msg) } } // Once we have no messages left, we can quit output.Conn.Quit() output.Conn.Disconnect() } // registerCallbacks sets up all the event handler callbacks for recieving // particular irc events. func registerCallbacks(output *IrcOutput, runner pipeline.OutputRunner) { // add a callback to check if we've gotten successfully connected output.Conn.AddCallback(CONNECTED, func(event *irc.Event) { for _, ircChan := range output.Channels { // Only join on connect if we aren't going to join whenever we send // a message if !output.JoinAndPart { output.Conn.Join(ircChan) } } }) // Once we've recieved the names list, we've successfully joined the channel // And should begin processing Heka messages output.Conn.AddCallback(IRC_RPL_ENDOFNAMES, func(event *irc.Event) { // This is the actual irc channel name (ie: #heka) ircChan := event.Arguments[1] updateJoinList(output, ircChan, JOINED) }) // We want to handle errors (disconnects) ourself. output.Conn.ClearCallback(ERROR) output.Conn.AddCallback(ERROR, func(event *irc.Event) { updateJoinListAll(output, NOTJOINED) runner.LogMessage("Disconnected from Irc. Retrying to connect in 3 seconds..") time.Sleep(3 * time.Second) err := output.Conn.Reconnect() if err != nil { runner.LogError(fmt.Errorf("Error reconnecting:", err)) output.Conn.Quit() } runner.LogMessage("Reconnected to Irc!") }) output.Conn.AddCallback(KICK, func(event *irc.Event) { ircChan := event.Arguments[0] updateJoinList(output, ircChan, NOTJOINED) if output.RejoinOnKick { output.Conn.Join(ircChan) } }) // These next 2 events shouldn't really matter much, but we should update // the JoinList anyways. output.Conn.AddCallback(QUIT, func(event *irc.Event) { updateJoinListAll(output, NOTJOINED) }) output.Conn.AddCallback(PART, func(event *irc.Event) { ircChan := event.Arguments[1] updateJoinList(output, ircChan, NOTJOINED) }) } func (output *IrcOutput) Init(config interface{}) error { conf := config.(*IrcOutputConfig) output.IrcOutputConfig = conf conn, err := output.InitIrcCon(conf) if err != nil { return fmt.Errorf("Error setting up Irc Connection: %s", err) } output.Conn = conn // Create our chans for passing messages from the main runner InChan to // the irc channels numChannels := len(output.Channels) output.JoinedChannels = make([]int32, numChannels) output.OutQueue = make(IrcMsgQueue, output.QueueSize) output.BacklogQueues = make([]IrcMsgQueue, numChannels) for queue := range output.BacklogQueues { output.BacklogQueues[queue] = make(IrcMsgQueue, output.QueueSize) } return nil } func (output *IrcOutput) Run(runner pipeline.OutputRunner, helper pipeline.PluginHelper) error { if runner.Encoder() == nil { return errors.New("Encoder required.") } // Register callbacks to handle events registerCallbacks(output, runner) var err error // Connect to the Irc Server err = output.Conn.Connect(output.Server) if err != nil { return fmt.Errorf("Unable to connect to irc server %s: %s", output.Server, err) } // Start a goroutine for recieving messages, and throttling before sending // to the Irc Server go processOutQueue(output, runner) var outgoing []byte for pack := range runner.InChan() { outgoing, err = runner.Encode(pack) if err != nil { runner.LogError(err) } // Send the message to each irc channel. // If the out queue is full, then we need to drop the message and log // an error. for i, ircChannel := range output.Channels { ircMsg := IrcMsg{outgoing, ircChannel, i} select { case output.OutQueue <- ircMsg: if output.JoinAndPart { // We wont have joined on connect in this case. output.Conn.Join(ircChannel) } default: runner.LogError(errors.New("Dropped message. " + "irc_output OutQueue is full.")) } } pack.Recycle() } close(output.OutQueue) output.Conn.ClearCallback(ERROR) return nil } func init() { pipeline.RegisterPlugin("IrcOutput", func() interface{} { output := new(IrcOutput) output.InitIrcCon = NewIrcConn return output }) }