diff --git a/backoff.go b/backoff.go index c7dc696..e555a1a 100644 --- a/backoff.go +++ b/backoff.go @@ -13,8 +13,9 @@ // Max. It returns to Min on every call to Reset(). Used in // conjunction with the time package. type backoff struct { + attempts int //Factor is the multiplying factor for each increment step - attempts, Factor float64 + Factor float64 //Jitter eases contention by randomizing backoff steps Jitter bool //Min and Max are the minimum and maximum values of the counter @@ -36,7 +37,7 @@ b.Factor = 2 } //calculate this duration - dur := float64(b.Min) * math.Pow(b.Factor, b.attempts) + dur := float64(b.Min) * math.Pow(b.Factor, float64(b.attempts)) if b.Jitter == true { dur = rand.Float64()*(dur-float64(b.Min)) + float64(b.Min) } diff --git a/websocket.go b/websocket.go index 9480794..2c2cad2 100644 --- a/websocket.go +++ b/websocket.go @@ -23,7 +23,8 @@ IncomingEvents chan SlackEvent outgoingMessages chan OutgoingMessage keepRunning chan bool - isRunning bool + wasIntentional bool + isConnected bool // Client is the main API, embedded Client @@ -38,17 +39,19 @@ func newRTM(api *Client) *RTM { return &RTM{ Client: *api, - pings: make(map[int]time.Time), IncomingEvents: make(chan SlackEvent, 50), outgoingMessages: make(chan OutgoingMessage, 20), - keepRunning: make(chan bool), - isRunning: true, + pings: make(map[int]time.Time), + isConnected: false, + wasIntentional: true, } } // Disconnect and wait, blocking until a successful disconnection. func (rtm *RTM) Disconnect() error { - if !rtm.isRunning { + rtm.mutex.Lock() + defer rtm.mutex.Unlock() + if !rtm.isConnected { return errors.New("Invalid call to Disconnect - Slack API is already disconnected") } return rtm.killConnection(true) diff --git a/websocket_internals.go b/websocket_internals.go index 9edea0a..83cf383 100644 --- a/websocket_internals.go +++ b/websocket_internals.go @@ -8,6 +8,15 @@ type ConnectedEvent struct { ConnectionCount int // 1 = first time, 2 = second time Info *Info +} + +type ConnectionErrorEvent struct { + Attempt int + ErrorObj error +} + +func (c *ConnectionErrorEvent) Error() string { + return c.ErrorObj.Error() } type ConnectingEvent struct { @@ -32,3 +41,36 @@ func (u UnmarshallingErrorEvent) Error() string { return u.ErrorObj.Error() } + +type OutgoingErrorEvent struct { + Message OutgoingMessage + ErrorObj error +} + +func (o OutgoingErrorEvent) Error() string { + return o.ErrorObj.Error() +} + +type IncomingEventError struct { + ErrorObj error +} + +func (i *IncomingEventError) Error() string { + return i.ErrorObj.Error() +} + +type AckErrorEvent struct { + ErrorObj error +} + +func (a *AckErrorEvent) Error() string { + return a.ErrorObj.Error() +} + +type SlackErrorEvent struct { + ErrorObj error +} + +func (s SlackErrorEvent) Error() string { + return s.ErrorObj.Error() +} diff --git a/websocket_managed_conn.go b/websocket_managed_conn.go index 50e8472..fa16ae7 100644 --- a/websocket_managed_conn.go +++ b/websocket_managed_conn.go @@ -2,98 +2,124 @@ import ( "encoding/json" + "errors" "fmt" "io" - "log" "reflect" "time" "golang.org/x/net/websocket" ) -// ManageConnection is a long-running goroutine that handles -// reconnections and piping messages back and to `rtm.IncomingEvents` -// and `rtm.OutgoingMessages`. -// -// Usage would look like: -// -// bot := slack.New("my-token") -// rtm := bot.NewRTM() // check err -// setupYourHandlers(rtm.IncomingEvents, rtm.OutgoingMessages) -// rtm.ManageConnection() -// +// ManageConnection can be called on a Slack RTM instance returned by the +// NewRTM method. It will connect to the slack RTM API and handle all incoming +// and outgoing events. If a connection fails then it will attempt to reconnect +// and will notify any listeners through an error event on the IncomingEvents +// channel. +// +// This detects failed and closed connections through the RTM's keepRunning +// channel. Once this channel is closed or has something sent to it, this will +// open a lock on the RTM's mutex and check if the disconnect was intentional +// or not. If it was not then it attempts to reconnect. +// +// The defined error events are located in websocket_internals.go. func (rtm *RTM) ManageConnection() { + var connectionCount int + for { + // open a lock - we want to close this before returning from the + // function so we won't defer the mutex's close therefore we MUST + // release the lock before returning on an error! + rtm.mutex.Lock() + connectionCount++ + // start trying to connect + // the returned err is already passed onto the IncomingEvents channel + info, conn, err := rtm.connect(connectionCount) + // if err != nil then the connection is sucessful + // otherwise we need to send a Disconnected event + if err != nil { + rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{ + Intentional: false, + }} + rtm.mutex.Unlock() + return + } + rtm.info = info + rtm.IncomingEvents <- SlackEvent{"connected", &ConnectedEvent{ + ConnectionCount: connectionCount, + Info: info, + }} + // set the connection object and unlock the mutex + rtm.conn = conn + rtm.isConnected = true + rtm.keepRunning = make(chan bool) + rtm.mutex.Unlock() + + // we're now connected (or have failed fatally) so we can set up + // listeners and monitor for stopping + go rtm.sendKeepAlive(30 * time.Second) + go rtm.handleIncomingEvents() + go rtm.handleOutgoingMessages() + + // should return only once we are disconnected + <-rtm.keepRunning + + // after being disconnected we need to check if it was intentional + // if not then we should try to reconnect + rtm.mutex.Lock() + intentional := rtm.wasIntentional + rtm.mutex.Unlock() + if intentional { + return + } + // else continue and run the loop again to connect + } +} + +// connect attempts to connect to the slack websocket API. It handles any +// errors that occur while connecting and will return once a connection +// has been successfully opened. +func (rtm *RTM) connect(connectionCount int) (*Info, *websocket.Conn, error) { + // used to provide exponential backoff wait time with jitter before trying + // to connect to slack again boff := &backoff{ Min: 100 * time.Millisecond, Max: 5 * time.Minute, Factor: 2, Jitter: true, } - connectionCount := 0 for { - var conn *websocket.Conn // use as first - var err error - var info *Info - - connectionCount++ - - attempts := 1 - boff.Reset() - for { - rtm.IncomingEvents <- SlackEvent{"connecting", &ConnectingEvent{ - Attempt: attempts, - ConnectionCount: connectionCount, - }} - - info, conn, err = rtm.startRTMAndDial() - if err == nil { - break // connected - } else if sErr, ok := err.(*SlackWebError); ok { - if sErr.Error() == "invalid_auth" { - rtm.IncomingEvents <- SlackEvent{"invalid_auth", &InvalidAuthEvent{}} - return - } - } else { - log.Println(err.Error()) - } - - dur := boff.Duration() - rtm.Debugf("reconnection %d failed: %s", attempts, err) - rtm.Debugln(" -> reconnecting in", dur) - attempts++ - time.Sleep(dur) - } - - rtm.IncomingEvents <- SlackEvent{"connected", &ConnectedEvent{ + // send connecting event + rtm.IncomingEvents <- SlackEvent{"connecting", &ConnectingEvent{ + Attempt: boff.attempts + 1, ConnectionCount: connectionCount, - Info: info, - }} - - connErrors := make(chan error, 10) // in case we get many such errors - - go rtm.keepalive(30*time.Second, conn, connErrors) - go rtm.handleIncomingEvents(conn, connErrors) - go rtm.handleOutgoingMessages(conn, connErrors) - - // Here, block and await for disconnection, if it ever happens. - err = <-connErrors - - rtm.Debugln("RTM connection error:", err) - rtm.killConnection(false) - } -} - -func (rtm *RTM) killConnection(intentional bool) error { - rtm.mutex.Lock() - rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}} - close(rtm.keepRunning) - err := rtm.conn.Close() - rtm.isRunning = false - rtm.mutex.Unlock() - return err -} - + }} + // attempt to start the connection + info, conn, err := rtm.startRTMAndDial() + if err == nil { + return info, conn, nil + } + // check for fatal errors - currently only invalid_auth + if sErr, ok := err.(*SlackWebError); ok && sErr.Error() == "invalid_auth" { + rtm.IncomingEvents <- SlackEvent{"invalid_auth", &InvalidAuthEvent{}} + return nil, nil, sErr + } + // any other errors are treated as recoverable and we try again after + // sending the event along the IncomingEvents channel + rtm.IncomingEvents <- SlackEvent{"connection_error", &ConnectionErrorEvent{ + Attempt: boff.attempts, + ErrorObj: err, + }} + // get time we should wait before attempting to connect again + dur := boff.Duration() + rtm.Debugf("reconnection %d failed: %s", boff.attempts+1, err) + rtm.Debugln(" -> reconnecting in", dur) + time.Sleep(dur) + } +} + +// startRTMAndDial attemps to connect to the slack websocket. It returns the +// full information returned by the "rtm.start" method on the slack API. func (rtm *RTM) startRTMAndDial() (*Info, *websocket.Conn, error) { info, url, err := rtm.StartRTM() if err != nil { @@ -107,152 +133,266 @@ return info, conn, err } -func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, errors chan error) { +// killConnection stops the websocket connection and signals to all goroutines +// that they should cease listening to the connection for events. +// +// This requires that a lock on the RTM's mutex is held before being called. +func (rtm *RTM) killConnection(intentional bool) error { + rtm.Debugln("killing connection") + if rtm.isConnected { + close(rtm.keepRunning) + } + rtm.isConnected = false + rtm.wasIntentional = intentional + err := rtm.conn.Close() + rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{intentional}} + return err +} + +// handleOutgoingMessages listens on the outgoingMessages channel for any +// queued messages that have not been sent. +// +// This will stop executing once the RTM's keepRunning channel has been closed +// or has anything sent to it. +func (rtm *RTM) handleOutgoingMessages() { + for { + select { + // catch "stop" signal on channel close + case <-rtm.keepRunning: + return + // listen for messages that need to be sent + case msg := <-rtm.outgoingMessages: + rtm.sendOutgoingMessage(msg) + } + } +} + +// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket +// after acquiring a lock on the RTM's mutex. +// +// It does not currently detect if a outgoing message fails due to a disconnect +// and instead lets a future failed 'PING' detect the failed connection. +func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) { + rtm.mutex.Lock() + defer rtm.mutex.Unlock() + rtm.Debugln("Sending message:", msg) + if !rtm.isConnected { + // check for race condition of connection closed after lock + // obtained + rtm.IncomingEvents <- SlackEvent{"outgoing_error", &OutgoingErrorEvent{ + Message: msg, + ErrorObj: errors.New("Cannot send message - API is not connected"), + }} + return + } + err := websocket.JSON.Send(rtm.conn, msg) + if err != nil { + rtm.IncomingEvents <- SlackEvent{"outgoing_error", &OutgoingErrorEvent{ + Message: msg, + ErrorObj: err, + }} + } +} + +// sendKeepAlive is a blocking call that sends a 'PING' message once for every +// duration elapsed. +// +// This will stop executing once the RTM's keepRunning channel has been closed +// or has anything sent to it. +func (rtm *RTM) sendKeepAlive(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { + // catch "stop" signal on channel close case <-rtm.keepRunning: return + // send pings on ticker interval case <-ticker.C: - rtm.ping(conn, errors) - } - } -} - -func (rtm *RTM) ping(conn *websocket.Conn, errors chan error) { + go rtm.ping() + } + } +} + +// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message +// fails to send then this calls killConnection to signal an unintentional +// websocket disconnect. +// +// This does not handle incoming 'PONG' responses but does store the time of +// each successful 'PING' send so latency can be detected upon a 'PONG' +// response. +func (rtm *RTM) ping() { rtm.mutex.Lock() defer rtm.mutex.Unlock() - + rtm.Debugln("Sending PING") + if !rtm.isConnected { + // it's possible that the API has disconnected while we were waiting + // for a lock on the mutex + rtm.Debugln("Cannot send ping - API is not connected") + // no need to send an error event since it really isn't an error + return + } rtm.messageID++ rtm.pings[rtm.messageID] = time.Now() - msg := &Ping{ID: rtm.messageID, Type: "ping"} - rtm.Debugln("Sending PING") - if err := websocket.JSON.Send(conn, msg); err != nil { - errors <- fmt.Errorf("error sending 'ping': %s", err) - } -} - -func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, errors chan error) { - // we pass "conn" in case we do a reconnection, in that case we'll - // have a new `conn` even though we're dealing with the same - // incoming and outgoing channels for messages/events. + msg := &Ping{Id: rtm.messageID, Type: "ping"} + err := websocket.JSON.Send(rtm.conn, msg) + if err != nil { + rtm.Debugf("RTM Error sending 'PING': %s", err.Error()) + rtm.killConnection(false) + } +} + +// handleIncomingEvents monitors the RTM's opened websocket for any incoming +// events. +// +// This will stop executing once the RTM's keepRunning channel has been closed +// or has anything sent to it. +func (rtm *RTM) handleIncomingEvents() { for { + // non-blocking listen to see if channel is closed select { - case <-rtm.keepRunning: - return - - case msg := <-rtm.outgoingMessages: - rtm.Debugln("Sending message:", msg) - - rtm.mutex.Lock() - err := websocket.JSON.Send(conn, msg) - rtm.mutex.Unlock() - if err != nil { - errors <- fmt.Errorf("error sending 'message': %s", err) - return - } - } - } -} - -func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, errors chan error) { - for { - - select { + // catch "stop" signal on channel close case <-rtm.keepRunning: return default: - } - - event := json.RawMessage{} - err := websocket.JSON.Receive(conn, &event) - if err == io.EOF { - rtm.Debugln("GOT EOF, are we killing ??") - } else if err != nil { - errors <- err - return - } - if len(event) == 0 { - rtm.Debugln("Event Empty. WTF?") - continue - } - - rtm.Debugln("Incoming event:", string(event[:])) - - rtm.handleEvent(event) - - // FIXME: please I hope we don't need to sleep!!! - //time.Sleep(time.Millisecond * 500) - } -} - -func (rtm *RTM) handleEvent(event json.RawMessage) { - em := Event{} - err := json.Unmarshal(event, &em) - if err != nil { - rtm.Debugln("RTM Error unmarshalling event:", err) - rtm.Debugln(" -> Erroneous event:", string(event)) - return - } - - switch em.Type { + rtm.receiveIncomingEvent() + } + } +} + +// receiveIncomingEvent attempts to receive an event from the RTM's websocket. +// This will block until a frame is available from the websocket. +func (rtm *RTM) receiveIncomingEvent() { + event := json.RawMessage{} + err := websocket.JSON.Receive(rtm.conn, &event) + if err == io.EOF { + // EOF's don't seem to signify a failed connection so instead we ignore + // them here and detect a failed connection upon attempting to send a + // 'PING' message + + // trigger a 'PING' to detect pontential websocket disconnect + go rtm.ping() + return + } else if err != nil { + // TODO detect if this is a fatal error + rtm.IncomingEvents <- SlackEvent{"incoming_error", &IncomingEventError{ + ErrorObj: err, + }} + return + } else if len(event) == 0 { + rtm.Debugln("Received empty event") + return + } + rtm.Debugln("Incoming Event:", string(event[:])) + rtm.handleRawEvent(event) +} + +// handleEOF should be called after receiving an EOF on the RTM's websocket. +// It calls the internal killConnection method if the RTM was still considered +// to be connected. If it is not considered connected then it is because +// the killConnection method has already been called elsewhere. +func (rtm *RTM) handleEOF() { + rtm.Debugln("Received EOF on websocket") + // we need a lock in order to access isConnected and to call killConnection + rtm.mutex.Lock() + defer rtm.mutex.Unlock() + // if isConnected is true then we didn't expect the EOF event + // so for it to be intentional we need to have it be false + if rtm.isConnected { + // try to kill the connection - this should fail silently if the + // API has already disconnected + _ = rtm.killConnection(false) + } +} + +// handleRawEvent takes a raw JSON message received from the slack websocket +// and handles the encoded event. +func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) { + event := &Event{} + err := json.Unmarshal(rawEvent, event) + if err != nil { + rtm.IncomingEvents <- SlackEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} + return + } + switch event.Type { case "": - // try ok - ack := AckMessage{} - if err = json.Unmarshal(event, &ack); err != nil { - rtm.Debugln("RTM Error unmarshalling 'ack' event:", err) - rtm.Debugln(" -> Erroneous 'ack' event:", string(event)) - return - } - - if ack.Ok { - rtm.IncomingEvents <- SlackEvent{"ack", ack} - } else { - rtm.IncomingEvents <- SlackEvent{"error", ack.Error} - } - + rtm.handleAck(rawEvent) case "hello": rtm.IncomingEvents <- SlackEvent{"hello", &HelloEvent{}} - case "pong": - pong := Pong{} - if err = json.Unmarshal(event, &pong); err != nil { - rtm.Debugln("RTM Error unmarshalling 'pong' event:", err) - rtm.Debugln(" -> Erroneous 'ping' event:", string(event)) - return - } - - rtm.mutex.Lock() - latency := time.Since(rtm.pings[pong.ReplyTo]) - rtm.mutex.Unlock() - - rtm.IncomingEvents <- SlackEvent{"latency-report", &LatencyReport{Value: latency}} - + rtm.handlePong(rawEvent) default: - if v, ok := eventMapping[em.Type]; ok { - t := reflect.TypeOf(v) - recvEvent := reflect.New(t).Interface() - - err := json.Unmarshal(event, recvEvent) - if err != nil { - rtm.Debugf("RTM Error unmarshalling %q event: %s", em.Type, err) - rtm.Debugf(" -> Erroneous %q event: %s", em.Type, string(event)) - rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}} - return - } - - rtm.IncomingEvents <- SlackEvent{em.Type, recvEvent} - } else { - rtm.Debugf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event)) - err := fmt.Errorf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event)) - rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}} - } - } -} - + rtm.handleEvent(event.Type, rawEvent) + } +} + +// handleAck handles an incoming 'ACK' message. +func (rtm *RTM) handleAck(event json.RawMessage) { + ack := &AckMessage{} + if err := json.Unmarshal(event, ack); err != nil { + rtm.Debugln("RTM Error unmarshalling 'ack' event:", err) + rtm.Debugln(" -> Erroneous 'ack' event:", string(event)) + return + } + if ack.Ok { + rtm.IncomingEvents <- SlackEvent{"ack", ack} + } else { + rtm.IncomingEvents <- SlackEvent{"ack_error", &AckErrorEvent{ack.Error}} + } +} + +// handlePong handles an incoming 'PONG' message which should be in response to +// a previously sent 'PING' message. This is then used to compute the +// connection's latency. +func (rtm *RTM) handlePong(event json.RawMessage) { + pong := &Pong{} + if err := json.Unmarshal(event, pong); err != nil { + rtm.Debugln("RTM Error unmarshalling 'pong' event:", err) + rtm.Debugln(" -> Erroneous 'ping' event:", string(event)) + return + } + rtm.mutex.Lock() + defer rtm.mutex.Unlock() + if pingTime, exists := rtm.pings[pong.ReplyTo]; exists { + latency := time.Since(pingTime) + rtm.IncomingEvents <- SlackEvent{"latency_report", &LatencyReport{Value: latency}} + delete(rtm.pings, pong.ReplyTo) + } else { + rtm.Debugln("RTM Error - unmatched 'pong' event:", string(event)) + } +} + +// handleEvent is the "default" response to an event that does not have a +// special case. It matches the command's name to a mapping of defined events +// and then sends the corresponding event struct to the IncomingEvents channel. +// If the event type is not found or the event cannot be unmarshalled into the +// correct struct then this sends an UnmarshallingErrorEvent to the +// IncomingEvents channel. +func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) { + v, exists := eventMapping[typeStr] + if !exists { + rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event)) + err := fmt.Errorf("RTM Error: Received unmapped event %q: %s\n", typeStr, string(event)) + rtm.IncomingEvents <- SlackEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} + return + } + t := reflect.TypeOf(v) + recvEvent := reflect.New(t).Interface() + err := json.Unmarshal(event, recvEvent) + if err != nil { + rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event)) + err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s\n", typeStr, string(event)) + rtm.IncomingEvents <- SlackEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}} + return + } + rtm.IncomingEvents <- SlackEvent{typeStr, recvEvent} +} + +// eventMapping holds a mapping of event names to their corresponding struct +// implementations. The structs should be instances of the unmarshalling +// target for the matching event type. var eventMapping = map[string]interface{}{ "message": MessageEvent{}, "presence_change": PresenceChangeEvent{},