diff --git a/websocket.go b/websocket.go index c35404b..9480794 100644 --- a/websocket.go +++ b/websocket.go @@ -1,6 +1,7 @@ package slack import ( + "errors" "log" "sync" "time" @@ -21,6 +22,8 @@ conn *websocket.Conn IncomingEvents chan SlackEvent outgoingMessages chan OutgoingMessage + keepRunning chan bool + isRunning bool // Client is the main API, embedded Client @@ -38,13 +41,17 @@ pings: make(map[int]time.Time), IncomingEvents: make(chan SlackEvent, 50), outgoingMessages: make(chan OutgoingMessage, 20), + keepRunning: make(chan bool), + isRunning: true, } } // Disconnect and wait, blocking until a successful disconnection. func (rtm *RTM) Disconnect() error { - log.Println("RTM::Disconnect not implemented!") - return nil + if !rtm.isRunning { + return errors.New("Invalid call to Disconnect - Slack API is already disconnected") + } + return rtm.killConnection(true) } // Reconnect only makes sense if you've successfully disconnectd with Disconnect(). diff --git a/websocket_internals.go b/websocket_internals.go index b73c8e2..9edea0a 100644 --- a/websocket_internals.go +++ b/websocket_internals.go @@ -15,10 +15,20 @@ ConnectionCount int } -type DisconnectedEvent struct{} +type DisconnectedEvent struct { + Intentional bool +} type LatencyReport struct { Value time.Duration } type InvalidAuthEvent struct{} + +type UnmarshallingErrorEvent struct { + ErrorObj error +} + +func (u UnmarshallingErrorEvent) Error() string { + return u.ErrorObj.Error() +} diff --git a/websocket_managed_conn.go b/websocket_managed_conn.go index 319ad1a..50e8472 100644 --- a/websocket_managed_conn.go +++ b/websocket_managed_conn.go @@ -70,22 +70,28 @@ Info: info, }} - killCh := make(chan bool, 3) connErrors := make(chan error, 10) // in case we get many such errors - go rtm.keepalive(30*time.Second, conn, killCh, connErrors) - go rtm.handleIncomingEvents(conn, killCh, connErrors) - go rtm.handleOutgoingMessages(conn, killCh, connErrors) + go rtm.keepalive(30*time.Second, conn, connErrors) + go rtm.handleIncomingEvents(conn, connErrors) + go rtm.handleOutgoingMessages(conn, connErrors) // Here, block and await for disconnection, if it ever happens. err = <-connErrors rtm.Debugln("RTM connection error:", err) - rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}} - killCh <- true // 3 child go-routines - killCh <- true - killCh <- true - } + rtm.killConnection(false) + } +} + +func (rtm *RTM) killConnection(intentional bool) error { + rtm.mutex.Lock() + rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}} + close(rtm.keepRunning) + err := rtm.conn.Close() + rtm.isRunning = false + rtm.mutex.Unlock() + return err } func (rtm *RTM) startRTMAndDial() (*Info, *websocket.Conn, error) { @@ -101,13 +107,13 @@ return info, conn, err } -func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, killCh chan bool, errors chan error) { +func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, errors chan error) { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { - case <-killCh: + case <-rtm.keepRunning: return case <-ticker.C: rtm.ping(conn, errors) @@ -129,13 +135,13 @@ } } -func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, killCh chan bool, errors chan error) { +func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, errors chan error) { // we pass "conn" in case we do a reconnection, in that case we'll // have a new `conn` even though we're dealing with the same // incoming and outgoing channels for messages/events. for { select { - case <-killCh: + case <-rtm.keepRunning: return case msg := <-rtm.outgoingMessages: @@ -152,11 +158,11 @@ } } -func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, killCh chan bool, errors chan error) { +func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, errors chan error) { for { select { - case <-killCh: + case <-rtm.keepRunning: return default: } @@ -234,12 +240,15 @@ if err != nil { rtm.Debugf("RTM Error unmarshalling %q event: %s", em.Type, err) rtm.Debugf(" -> Erroneous %q event: %s", em.Type, string(event)) + rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}} return } rtm.IncomingEvents <- SlackEvent{em.Type, recvEvent} } else { rtm.Debugf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event)) + err := fmt.Errorf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event)) + rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}} } } }