package irc import ( "crypto/tls" "errors" "fmt" "github.com/mozilla-services/heka/pipeline" "github.com/mozilla-services/heka/plugins/tcp" "github.com/thoj/go-ircevent" "sync/atomic" "time" ) type IRCOutputConfig struct { Server string `toml:"server"` Nick string `toml:"nick"` Ident string `toml:"ident"` Password string `toml:"password"` Channels []string `toml:"channels"` UseTLS bool `toml:"use_tls"` // Subsection for TLS configuration. Tls tcp.TlsConfig // Should we join and part an irc channel between sending messages? JoinAndPart bool `toml:"join_and_part"` // This controls the size of the OutQueue and Backlog queue for messages. QueueSize int `toml:"queue_size"` RejoinOnKick bool `toml:"rejoin_on_kick"` // Default interval at which IRC messages will be sent is minimum of 2 // seconds between messages. TickerInterval uint `toml:"ticker_interval"` } type IrcMsgQueue chan IrcMsg type IRCOutput struct { *IRCOutputConfig Conn *irc.Connection OutQueue IrcMsgQueue BacklogQueues []IrcMsgQueue JoinedChannels []int32 } type IrcMsg struct { Output []byte IrcChannel string Idx int } const ( // These are replies from the IRC Server CONNECTED = "001" ERROR = "ERROR" // This is what we get on a disconnect QUIT = "QUIT" PART = "PART" KICK = "KICK" IRC_RPL_ENDOFNAMES = "366" // These are to track our JoinedChannels slice of joined/not joined NOTJOINED = 0 JOINED = 1 ) func (output *IRCOutput) ConfigStruct() interface{} { return &IRCOutputConfig{ Server: "irc.freenode.net", Nick: "heka_bot", Ident: "heka", Channels: []string{"#heka_bot"}, QueueSize: 100, TickerInterval: uint(2), } } // NewIRCConn creates an *irc.Connection. It handles using Heka's tcp plugin to // create a cryto/tls config func NewIRCConn(config *IRCOutputConfig) (*irc.Connection, error) { conn := irc.IRC(config.Nick, config.Ident) if conn == nil { return nil, errors.New("Nick or Ident cannot be blank") } if config.Server == "" { return nil, errors.New("IRC server cannot be blank.") } if len(config.Channels) < 1 { return nil, errors.New("Need at least 1 channel to join.") } var tlsConf *tls.Config = nil var err error = nil if tlsConf, err = tcp.CreateGoTlsConfig(&config.Tls); err != nil { return nil, fmt.Errorf("TLS init error: %s", err) } conn.UseTLS = config.UseTLS conn.TLSConfig = tlsConf return conn, nil } // Privmsg wraps the irc.Privmsg by accepting an ircMsg struct, and checking if // we've joined a channel before trying to send a message to it. Returns whether // or not the message was successfully sent. func (output *IRCOutput) Privmsg(ircMsg IrcMsg) bool { idx := ircMsg.Idx if atomic.LoadInt32(&output.JoinedChannels[idx]) == JOINED { output.Conn.Privmsg(ircMsg.IrcChannel, string(ircMsg.Output)) } else { return false } if output.JoinAndPart { // Leave the channel if we're configured to part after sending messages. output.Conn.Part(ircMsg.IrcChannel) } return true } // UpdateJoinList atomically updates our global slice of joined channels for a // particular irc channel. It sets the irc channel's joined status to 'status'. // Returns whether or not it found the IRC Channel in our slice. func UpdateJoinList(output *IRCOutput, ircChan string, status int32) bool { for i, channel := range output.Channels { if ircChan == channel { // Update if we have or haven't joined the channel atomic.StoreInt32(&output.JoinedChannels[i], status) return true } } return false } // UpdateJoinListAll sets the status of all IRC Channels in our config to // 'status' func UpdateJoinListAll(output *IRCOutput, status int32) { for channel := range output.Channels { atomic.StoreInt32(&output.JoinedChannels[channel], status) } } // SendFromOutQueue attempts to send a message to the IRC Channel specified in // the ircMsg struct. If sending fails due to not being in the IRC channel, it // will put the message into that IRC Channel's backlog queue. If the queue is // full it will drop the message and log an error. // It returns whether or not a message was successfully delivered to an // IRC channel. func SendFromOutQueue(output *IRCOutput, runner pipeline.OutputRunner, ircMsg IrcMsg) bool { if output.Privmsg(ircMsg) { return true } else { // We haven't joined this channel yet, so we need to send // the message to the backlog queue of messages // Get the proper Channel for the backlog idx := ircMsg.Idx backlogQueue := output.BacklogQueues[idx] select { // try to put the message into the backlog queue case backlogQueue <- ircMsg: // Just putting default: // Failed to put, which means the backlog for this IRC // channel is full. So drop it and log a message. runner.LogError(fmt.Errorf("backlog queue for "+ "IRC Channel %s, full. Dropping message.", ircMsg.IrcChannel)) } return false } return false } // SendFromBacklogQueue attempts to send a message from the first backlog queue // which has a message in it. It returns whether or not a message was // successfully delivered to an IRC channel. func SendFromBacklogQueue(output *IRCOutput, runner pipeline.OutputRunner, ircMsg IrcMsg) bool { // No messages in the out queue, so lets try the backlog queue for i, queue := range output.BacklogQueues { if atomic.LoadInt32(&output.JoinedChannels[i]) != JOINED { continue } select { case ircMsg = <-queue: if output.Privmsg(ircMsg) { return true } default: // No backed up messages for this IRC Channel } } return false } // ProcessOutQueue attempts to send an IRC message from the OutQueue, or the // BacklogQueue if nothing is in the OutQueue. It is throttled by a ticker to // prevent flooding the IRC server. func ProcessOutQueue(output *IRCOutput, runner pipeline.OutputRunner) { var delivered bool var ircMsg ircMsg ok := true // ticker := runner.Ticker() for ok { delivered = false // <-ticker select { case ircMsg, ok = <-output.OutQueue: if !ok { // We havent actually delivered but we want to escape that // loop delivered = true break } delivered = SendFromOutQueue(output, runner, ircMsg) default: // Nothing } if !delivered { SendFromBacklogQueue(...) } } for _, queue := range output.BacklogQueues { close(queue) } for _, queue := range output.BacklogQueues { // drain } } // RegisterCallbacks sets up all the event handler callbacks for recieving // particular irc events. func RegisterCallbacks(output *IRCOutput, runner pipeline.OutputRunner) { // add a callback to check if we've gotten successfully connected output.Conn.AddCallback(CONNECTED, func(event *irc.Event) { for _, ircChan := range output.Channels { // Only join on connect if we aren't going to join whenever we send // a message if !output.JoinAndPart { output.Conn.Join(ircChan) } } }) // Once we've recieved the names list, we've successfully joined the channel // And should begin processing Heka messages output.Conn.AddCallback(IRC_RPL_ENDOFNAMES, func(event *irc.Event) { // This is the actual IRC Channel name (ie: #heka) ircChan := event.Arguments[1] UpdateJoinList(output, ircChan, JOINED) }) // We want to handle errors (disconnects) ourself. output.Conn.ClearCallback(ERROR) output.Conn.AddCallback(ERROR, func(event *irc.Event) { UpdateJoinListAll(output, NOTJOINED) runner.LogMessage("Disconnected from IRC. Retrying to connect in 3 seconds..") time.Sleep(3 * time.Second) err := output.Conn.Reconnect() if err != nil { runner.LogError(fmt.Errorf("Error reconnecting:", err)) output.Conn.Quit() } runner.LogMessage("Reconnected to IRC!") }) output.Conn.AddCallback(KICK, func(event *irc.Event) { ircChan := event.Arguments[0] UpdateJoinList(output, ircChan, NOTJOINED) if output.RejoinOnKick { output.Conn.Join(ircChan) } }) // These next 2 events shouldn't really matter much, but we should update // the JoinList anyways. output.Conn.AddCallback(QUIT, func(event *irc.Event) { UpdateJoinListAll(output, NOTJOINED) }) output.Conn.AddCallback(PART, func(event *irc.Event) { ircChan := event.Arguments[1] UpdateJoinList(output, ircChan, NOTJOINED) }) } func (output *IRCOutput) Init(config interface{}) error { conf := config.(*IRCOutputConfig) output.IRCOutputConfig = conf conn, err := NewIRCConn(conf) if err != nil { return fmt.Errorf("Error setting up IRC Connection: %s", err) } output.Conn = conn // Create our chans for passing messages from the main runner InChan to // the irc channels numChannels := len(output.Channels) output.JoinedChannels = make([]int32, numChannels) output.OutQueue = make(IrcMsgQueue, output.QueueSize) output.BacklogQueues = make([]IrcMsgQueue, numChannels) for queue := range output.BacklogQueues { output.BacklogQueues[queue] = make(IrcMsgQueue, output.QueueSize) } return nil } func (output *IRCOutput) Run(runner pipeline.OutputRunner, helper pipeline.PluginHelper) error { if runner.Encoder() == nil { return errors.New("Encoder required.") } // Register callbacks to handle events RegisterCallbacks(output, runner) var err error // Connect to the IRC Server err = output.Conn.Connect(output.Server) if err != nil { return fmt.Errorf("Unable to connect to irc server %s: %s", output.Server, err) } // Start a goroutine for recieving messages, and throttling before sending // to the IRC Server go ProcessOutQueue(output, runner) var outgoing []byte for pack := range runner.InChan() { outgoing, err = runner.Encode(pack) if err != nil { runner.LogError(err) } // Send the message to each IRC Channel. // If the out queue is full, then we need to drop the message and log // an error. for i, ircChannel := range output.Channels { ircMsg := IrcMsg{outgoing, ircChannel, i} select { case output.OutQueue <- ircMsg: if output.JoinAndPart { // We wont have joined on connect in this case. output.Conn.Join(ircChannel) } default: runner.LogError(errors.New("Dropped message. " + "irc_output OutQueue is full.")) } } pack.Recycle() } close(output.OutQueue) output.Conn.ClearCallback(ERROR) output.Conn.Quit() output.Conn.Disconnect() return nil } func init() { pipeline.RegisterPlugin("IRCOutput", func() interface{} { return new(IRCOutput) }) }