1 | 1 |
|
2 | 2 |
import (
|
3 | 3 |
"encoding/json"
|
|
4 |
"errors"
|
4 | 5 |
"fmt"
|
5 | 6 |
"io"
|
6 | |
"log"
|
7 | 7 |
"reflect"
|
8 | 8 |
"time"
|
9 | 9 |
|
10 | 10 |
"golang.org/x/net/websocket"
|
11 | 11 |
)
|
12 | 12 |
|
13 | |
// ManageConnection is a long-running goroutine that handles
|
14 | |
// reconnections and piping messages back and to `rtm.IncomingEvents`
|
15 | |
// and `rtm.OutgoingMessages`.
|
16 | |
//
|
17 | |
// Usage would look like:
|
18 | |
//
|
19 | |
// bot := slack.New("my-token")
|
20 | |
// rtm := bot.NewRTM() // check err
|
21 | |
// setupYourHandlers(rtm.IncomingEvents, rtm.OutgoingMessages)
|
22 | |
// rtm.ManageConnection()
|
23 | |
//
|
|
13 |
// ManageConnection can be called on a Slack RTM instance returned by the
|
|
14 |
// NewRTM method. It will connect to the slack RTM API and handle all incoming
|
|
15 |
// and outgoing events. If a connection fails then it will attempt to reconnect
|
|
16 |
// and will notify any listeners through an error event on the IncomingEvents
|
|
17 |
// channel.
|
|
18 |
//
|
|
19 |
// This detects failed and closed connections through the RTM's keepRunning
|
|
20 |
// channel. Once this channel is closed or has something sent to it, this will
|
|
21 |
// open a lock on the RTM's mutex and check if the disconnect was intentional
|
|
22 |
// or not. If it was not then it attempts to reconnect.
|
|
23 |
//
|
|
24 |
// The defined error events are located in websocket_internals.go.
|
24 | 25 |
func (rtm *RTM) ManageConnection() {
|
|
26 |
var connectionCount int
|
|
27 |
for {
|
|
28 |
// open a lock - we want to close this before returning from the
|
|
29 |
// function so we won't defer the mutex's close therefore we MUST
|
|
30 |
// release the lock before returning on an error!
|
|
31 |
rtm.mutex.Lock()
|
|
32 |
connectionCount++
|
|
33 |
// start trying to connect
|
|
34 |
// the returned err is already passed onto the IncomingEvents channel
|
|
35 |
info, conn, err := rtm.connect(connectionCount)
|
|
36 |
// if err != nil then the connection is sucessful
|
|
37 |
// otherwise we need to send a Disconnected event
|
|
38 |
if err != nil {
|
|
39 |
rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{
|
|
40 |
Intentional: false,
|
|
41 |
}}
|
|
42 |
rtm.mutex.Unlock()
|
|
43 |
return
|
|
44 |
}
|
|
45 |
rtm.info = info
|
|
46 |
rtm.IncomingEvents <- SlackEvent{"connected", &ConnectedEvent{
|
|
47 |
ConnectionCount: connectionCount,
|
|
48 |
Info: info,
|
|
49 |
}}
|
|
50 |
// set the connection object and unlock the mutex
|
|
51 |
rtm.conn = conn
|
|
52 |
rtm.isConnected = true
|
|
53 |
rtm.keepRunning = make(chan bool)
|
|
54 |
rtm.mutex.Unlock()
|
|
55 |
|
|
56 |
// we're now connected (or have failed fatally) so we can set up
|
|
57 |
// listeners and monitor for stopping
|
|
58 |
go rtm.sendKeepAlive(30 * time.Second)
|
|
59 |
go rtm.handleIncomingEvents()
|
|
60 |
go rtm.handleOutgoingMessages()
|
|
61 |
|
|
62 |
// should return only once we are disconnected
|
|
63 |
<-rtm.keepRunning
|
|
64 |
|
|
65 |
// after being disconnected we need to check if it was intentional
|
|
66 |
// if not then we should try to reconnect
|
|
67 |
rtm.mutex.Lock()
|
|
68 |
intentional := rtm.wasIntentional
|
|
69 |
rtm.mutex.Unlock()
|
|
70 |
if intentional {
|
|
71 |
return
|
|
72 |
}
|
|
73 |
// else continue and run the loop again to connect
|
|
74 |
}
|
|
75 |
}
|
|
76 |
|
|
77 |
// connect attempts to connect to the slack websocket API. It handles any
|
|
78 |
// errors that occur while connecting and will return once a connection
|
|
79 |
// has been successfully opened.
|
|
80 |
func (rtm *RTM) connect(connectionCount int) (*Info, *websocket.Conn, error) {
|
|
81 |
// used to provide exponential backoff wait time with jitter before trying
|
|
82 |
// to connect to slack again
|
25 | 83 |
boff := &backoff{
|
26 | 84 |
Min: 100 * time.Millisecond,
|
27 | 85 |
Max: 5 * time.Minute,
|
28 | 86 |
Factor: 2,
|
29 | 87 |
Jitter: true,
|
30 | 88 |
}
|
31 | |
connectionCount := 0
|
32 | 89 |
|
33 | 90 |
for {
|
34 | |
var conn *websocket.Conn // use as first
|
35 | |
var err error
|
36 | |
var info *Info
|
37 | |
|
38 | |
connectionCount++
|
39 | |
|
40 | |
attempts := 1
|
41 | |
boff.Reset()
|
42 | |
for {
|
43 | |
rtm.IncomingEvents <- SlackEvent{"connecting", &ConnectingEvent{
|
44 | |
Attempt: attempts,
|
45 | |
ConnectionCount: connectionCount,
|
46 | |
}}
|
47 | |
|
48 | |
info, conn, err = rtm.startRTMAndDial()
|
49 | |
if err == nil {
|
50 | |
break // connected
|
51 | |
} else if sErr, ok := err.(*SlackWebError); ok {
|
52 | |
if sErr.Error() == "invalid_auth" {
|
53 | |
rtm.IncomingEvents <- SlackEvent{"invalid_auth", &InvalidAuthEvent{}}
|
54 | |
return
|
55 | |
}
|
56 | |
} else {
|
57 | |
log.Println(err.Error())
|
58 | |
}
|
59 | |
|
60 | |
dur := boff.Duration()
|
61 | |
rtm.Debugf("reconnection %d failed: %s", attempts, err)
|
62 | |
rtm.Debugln(" -> reconnecting in", dur)
|
63 | |
attempts++
|
64 | |
time.Sleep(dur)
|
65 | |
}
|
66 | |
|
67 | |
rtm.IncomingEvents <- SlackEvent{"connected", &ConnectedEvent{
|
|
91 |
// send connecting event
|
|
92 |
rtm.IncomingEvents <- SlackEvent{"connecting", &ConnectingEvent{
|
|
93 |
Attempt: boff.attempts + 1,
|
68 | 94 |
ConnectionCount: connectionCount,
|
69 | |
Info: info,
|
70 | |
}}
|
71 | |
|
72 | |
connErrors := make(chan error, 10) // in case we get many such errors
|
73 | |
|
74 | |
go rtm.keepalive(30*time.Second, conn, connErrors)
|
75 | |
go rtm.handleIncomingEvents(conn, connErrors)
|
76 | |
go rtm.handleOutgoingMessages(conn, connErrors)
|
77 | |
|
78 | |
// Here, block and await for disconnection, if it ever happens.
|
79 | |
err = <-connErrors
|
80 | |
|
81 | |
rtm.Debugln("RTM connection error:", err)
|
82 | |
rtm.killConnection(false)
|
83 | |
}
|
84 | |
}
|
85 | |
|
86 | |
func (rtm *RTM) killConnection(intentional bool) error {
|
87 | |
rtm.mutex.Lock()
|
88 | |
rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}}
|
89 | |
close(rtm.keepRunning)
|
90 | |
err := rtm.conn.Close()
|
91 | |
rtm.isRunning = false
|
92 | |
rtm.mutex.Unlock()
|
93 | |
return err
|
94 | |
}
|
95 | |
|
|
95 |
}}
|
|
96 |
// attempt to start the connection
|
|
97 |
info, conn, err := rtm.startRTMAndDial()
|
|
98 |
if err == nil {
|
|
99 |
return info, conn, nil
|
|
100 |
}
|
|
101 |
// check for fatal errors - currently only invalid_auth
|
|
102 |
if sErr, ok := err.(*SlackWebError); ok && sErr.Error() == "invalid_auth" {
|
|
103 |
rtm.IncomingEvents <- SlackEvent{"invalid_auth", &InvalidAuthEvent{}}
|
|
104 |
return nil, nil, sErr
|
|
105 |
}
|
|
106 |
// any other errors are treated as recoverable and we try again after
|
|
107 |
// sending the event along the IncomingEvents channel
|
|
108 |
rtm.IncomingEvents <- SlackEvent{"connection_error", &ConnectionErrorEvent{
|
|
109 |
Attempt: boff.attempts,
|
|
110 |
ErrorObj: err,
|
|
111 |
}}
|
|
112 |
// get time we should wait before attempting to connect again
|
|
113 |
dur := boff.Duration()
|
|
114 |
rtm.Debugf("reconnection %d failed: %s", boff.attempts+1, err)
|
|
115 |
rtm.Debugln(" -> reconnecting in", dur)
|
|
116 |
time.Sleep(dur)
|
|
117 |
}
|
|
118 |
}
|
|
119 |
|
|
120 |
// startRTMAndDial attemps to connect to the slack websocket. It returns the
|
|
121 |
// full information returned by the "rtm.start" method on the slack API.
|
96 | 122 |
func (rtm *RTM) startRTMAndDial() (*Info, *websocket.Conn, error) {
|
97 | 123 |
info, url, err := rtm.StartRTM()
|
98 | 124 |
if err != nil {
|
|
106 | 132 |
return info, conn, err
|
107 | 133 |
}
|
108 | 134 |
|
109 | |
func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, errors chan error) {
|
|
135 |
// killConnection stops the websocket connection and signals to all goroutines
|
|
136 |
// that they should cease listening to the connection for events.
|
|
137 |
//
|
|
138 |
// This requires that a lock on the RTM's mutex is held before being called.
|
|
139 |
func (rtm *RTM) killConnection(intentional bool) error {
|
|
140 |
rtm.Debugln("killing connection")
|
|
141 |
if rtm.isConnected {
|
|
142 |
close(rtm.keepRunning)
|
|
143 |
}
|
|
144 |
rtm.isConnected = false
|
|
145 |
rtm.wasIntentional = intentional
|
|
146 |
err := rtm.conn.Close()
|
|
147 |
rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{intentional}}
|
|
148 |
return err
|
|
149 |
}
|
|
150 |
|
|
151 |
// handleOutgoingMessages listens on the outgoingMessages channel for any
|
|
152 |
// queued messages that have not been sent.
|
|
153 |
//
|
|
154 |
// This will stop executing once the RTM's keepRunning channel has been closed
|
|
155 |
// or has anything sent to it.
|
|
156 |
func (rtm *RTM) handleOutgoingMessages() {
|
|
157 |
for {
|
|
158 |
select {
|
|
159 |
// catch "stop" signal on channel close
|
|
160 |
case <-rtm.keepRunning:
|
|
161 |
return
|
|
162 |
// listen for messages that need to be sent
|
|
163 |
case msg := <-rtm.outgoingMessages:
|
|
164 |
rtm.sendOutgoingMessage(msg)
|
|
165 |
}
|
|
166 |
}
|
|
167 |
}
|
|
168 |
|
|
169 |
// sendOutgoingMessage sends the given OutgoingMessage to the slack websocket
|
|
170 |
// after acquiring a lock on the RTM's mutex.
|
|
171 |
//
|
|
172 |
// It does not currently detect if a outgoing message fails due to a disconnect
|
|
173 |
// and instead lets a future failed 'PING' detect the failed connection.
|
|
174 |
func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
|
|
175 |
rtm.mutex.Lock()
|
|
176 |
defer rtm.mutex.Unlock()
|
|
177 |
rtm.Debugln("Sending message:", msg)
|
|
178 |
if !rtm.isConnected {
|
|
179 |
// check for race condition of connection closed after lock
|
|
180 |
// obtained
|
|
181 |
rtm.IncomingEvents <- SlackEvent{"outgoing_error", &OutgoingErrorEvent{
|
|
182 |
Message: msg,
|
|
183 |
ErrorObj: errors.New("Cannot send message - API is not connected"),
|
|
184 |
}}
|
|
185 |
return
|
|
186 |
}
|
|
187 |
err := websocket.JSON.Send(rtm.conn, msg)
|
|
188 |
if err != nil {
|
|
189 |
rtm.IncomingEvents <- SlackEvent{"outgoing_error", &OutgoingErrorEvent{
|
|
190 |
Message: msg,
|
|
191 |
ErrorObj: err,
|
|
192 |
}}
|
|
193 |
}
|
|
194 |
}
|
|
195 |
|
|
196 |
// sendKeepAlive is a blocking call that sends a 'PING' message once for every
|
|
197 |
// duration elapsed.
|
|
198 |
//
|
|
199 |
// This will stop executing once the RTM's keepRunning channel has been closed
|
|
200 |
// or has anything sent to it.
|
|
201 |
func (rtm *RTM) sendKeepAlive(interval time.Duration) {
|
110 | 202 |
ticker := time.NewTicker(interval)
|
111 | 203 |
defer ticker.Stop()
|
112 | 204 |
|
113 | 205 |
for {
|
114 | 206 |
select {
|
|
207 |
// catch "stop" signal on channel close
|
115 | 208 |
case <-rtm.keepRunning:
|
116 | 209 |
return
|
|
210 |
// send pings on ticker interval
|
117 | 211 |
case <-ticker.C:
|
118 | |
rtm.ping(conn, errors)
|
119 | |
}
|
120 | |
}
|
121 | |
}
|
122 | |
|
123 | |
func (rtm *RTM) ping(conn *websocket.Conn, errors chan error) {
|
|
212 |
go rtm.ping()
|
|
213 |
}
|
|
214 |
}
|
|
215 |
}
|
|
216 |
|
|
217 |
// ping sends a 'PING' message to the RTM's websocket. If the 'PING' message
|
|
218 |
// fails to send then this calls killConnection to signal an unintentional
|
|
219 |
// websocket disconnect.
|
|
220 |
//
|
|
221 |
// This does not handle incoming 'PONG' responses but does store the time of
|
|
222 |
// each successful 'PING' send so latency can be detected upon a 'PONG'
|
|
223 |
// response.
|
|
224 |
func (rtm *RTM) ping() {
|
124 | 225 |
rtm.mutex.Lock()
|
125 | 226 |
defer rtm.mutex.Unlock()
|
126 | |
|
|
227 |
rtm.Debugln("Sending PING")
|
|
228 |
if !rtm.isConnected {
|
|
229 |
// it's possible that the API has disconnected while we were waiting
|
|
230 |
// for a lock on the mutex
|
|
231 |
rtm.Debugln("Cannot send ping - API is not connected")
|
|
232 |
// no need to send an error event since it really isn't an error
|
|
233 |
return
|
|
234 |
}
|
127 | 235 |
rtm.messageID++
|
128 | 236 |
rtm.pings[rtm.messageID] = time.Now()
|
129 | 237 |
|
130 | |
msg := &Ping{ID: rtm.messageID, Type: "ping"}
|
131 | |
rtm.Debugln("Sending PING")
|
132 | |
if err := websocket.JSON.Send(conn, msg); err != nil {
|
133 | |
errors <- fmt.Errorf("error sending 'ping': %s", err)
|
134 | |
}
|
135 | |
}
|
136 | |
|
137 | |
func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, errors chan error) {
|
138 | |
// we pass "conn" in case we do a reconnection, in that case we'll
|
139 | |
// have a new `conn` even though we're dealing with the same
|
140 | |
// incoming and outgoing channels for messages/events.
|
|
238 |
msg := &Ping{Id: rtm.messageID, Type: "ping"}
|
|
239 |
err := websocket.JSON.Send(rtm.conn, msg)
|
|
240 |
if err != nil {
|
|
241 |
rtm.Debugf("RTM Error sending 'PING': %s", err.Error())
|
|
242 |
rtm.killConnection(false)
|
|
243 |
}
|
|
244 |
}
|
|
245 |
|
|
246 |
// handleIncomingEvents monitors the RTM's opened websocket for any incoming
|
|
247 |
// events.
|
|
248 |
//
|
|
249 |
// This will stop executing once the RTM's keepRunning channel has been closed
|
|
250 |
// or has anything sent to it.
|
|
251 |
func (rtm *RTM) handleIncomingEvents() {
|
141 | 252 |
for {
|
|
253 |
// non-blocking listen to see if channel is closed
|
142 | 254 |
select {
|
143 | |
case <-rtm.keepRunning:
|
144 | |
return
|
145 | |
|
146 | |
case msg := <-rtm.outgoingMessages:
|
147 | |
rtm.Debugln("Sending message:", msg)
|
148 | |
|
149 | |
rtm.mutex.Lock()
|
150 | |
err := websocket.JSON.Send(conn, msg)
|
151 | |
rtm.mutex.Unlock()
|
152 | |
if err != nil {
|
153 | |
errors <- fmt.Errorf("error sending 'message': %s", err)
|
154 | |
return
|
155 | |
}
|
156 | |
}
|
157 | |
}
|
158 | |
}
|
159 | |
|
160 | |
func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, errors chan error) {
|
161 | |
for {
|
162 | |
|
163 | |
select {
|
|
255 |
// catch "stop" signal on channel close
|
164 | 256 |
case <-rtm.keepRunning:
|
165 | 257 |
return
|
166 | 258 |
default:
|
167 | |
}
|
168 | |
|
169 | |
event := json.RawMessage{}
|
170 | |
err := websocket.JSON.Receive(conn, &event)
|
171 | |
if err == io.EOF {
|
172 | |
rtm.Debugln("GOT EOF, are we killing ??")
|
173 | |
} else if err != nil {
|
174 | |
errors <- err
|
175 | |
return
|
176 | |
}
|
177 | |
if len(event) == 0 {
|
178 | |
rtm.Debugln("Event Empty. WTF?")
|
179 | |
continue
|
180 | |
}
|
181 | |
|
182 | |
rtm.Debugln("Incoming event:", string(event[:]))
|
183 | |
|
184 | |
rtm.handleEvent(event)
|
185 | |
|
186 | |
// FIXME: please I hope we don't need to sleep!!!
|
187 | |
//time.Sleep(time.Millisecond * 500)
|
188 | |
}
|
189 | |
}
|
190 | |
|
191 | |
func (rtm *RTM) handleEvent(event json.RawMessage) {
|
192 | |
em := Event{}
|
193 | |
err := json.Unmarshal(event, &em)
|
194 | |
if err != nil {
|
195 | |
rtm.Debugln("RTM Error unmarshalling event:", err)
|
196 | |
rtm.Debugln(" -> Erroneous event:", string(event))
|
197 | |
return
|
198 | |
}
|
199 | |
|
200 | |
switch em.Type {
|
|
259 |
rtm.receiveIncomingEvent()
|
|
260 |
}
|
|
261 |
}
|
|
262 |
}
|
|
263 |
|
|
264 |
// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
|
|
265 |
// This will block until a frame is available from the websocket.
|
|
266 |
func (rtm *RTM) receiveIncomingEvent() {
|
|
267 |
event := json.RawMessage{}
|
|
268 |
err := websocket.JSON.Receive(rtm.conn, &event)
|
|
269 |
if err == io.EOF {
|
|
270 |
// EOF's don't seem to signify a failed connection so instead we ignore
|
|
271 |
// them here and detect a failed connection upon attempting to send a
|
|
272 |
// 'PING' message
|
|
273 |
|
|
274 |
// trigger a 'PING' to detect pontential websocket disconnect
|
|
275 |
go rtm.ping()
|
|
276 |
return
|
|
277 |
} else if err != nil {
|
|
278 |
// TODO detect if this is a fatal error
|
|
279 |
rtm.IncomingEvents <- SlackEvent{"incoming_error", &IncomingEventError{
|
|
280 |
ErrorObj: err,
|
|
281 |
}}
|
|
282 |
return
|
|
283 |
} else if len(event) == 0 {
|
|
284 |
rtm.Debugln("Received empty event")
|
|
285 |
return
|
|
286 |
}
|
|
287 |
rtm.Debugln("Incoming Event:", string(event[:]))
|
|
288 |
rtm.handleRawEvent(event)
|
|
289 |
}
|
|
290 |
|
|
291 |
// handleEOF should be called after receiving an EOF on the RTM's websocket.
|
|
292 |
// It calls the internal killConnection method if the RTM was still considered
|
|
293 |
// to be connected. If it is not considered connected then it is because
|
|
294 |
// the killConnection method has already been called elsewhere.
|
|
295 |
func (rtm *RTM) handleEOF() {
|
|
296 |
rtm.Debugln("Received EOF on websocket")
|
|
297 |
// we need a lock in order to access isConnected and to call killConnection
|
|
298 |
rtm.mutex.Lock()
|
|
299 |
defer rtm.mutex.Unlock()
|
|
300 |
// if isConnected is true then we didn't expect the EOF event
|
|
301 |
// so for it to be intentional we need to have it be false
|
|
302 |
if rtm.isConnected {
|
|
303 |
// try to kill the connection - this should fail silently if the
|
|
304 |
// API has already disconnected
|
|
305 |
_ = rtm.killConnection(false)
|
|
306 |
}
|
|
307 |
}
|
|
308 |
|
|
309 |
// handleRawEvent takes a raw JSON message received from the slack websocket
|
|
310 |
// and handles the encoded event.
|
|
311 |
func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) {
|
|
312 |
event := &Event{}
|
|
313 |
err := json.Unmarshal(rawEvent, event)
|
|
314 |
if err != nil {
|
|
315 |
rtm.IncomingEvents <- SlackEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
|
316 |
return
|
|
317 |
}
|
|
318 |
switch event.Type {
|
201 | 319 |
case "":
|
202 | |
// try ok
|
203 | |
ack := AckMessage{}
|
204 | |
if err = json.Unmarshal(event, &ack); err != nil {
|
205 | |
rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
|
206 | |
rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
|
207 | |
return
|
208 | |
}
|
209 | |
|
210 | |
if ack.Ok {
|
211 | |
rtm.IncomingEvents <- SlackEvent{"ack", ack}
|
212 | |
} else {
|
213 | |
rtm.IncomingEvents <- SlackEvent{"error", ack.Error}
|
214 | |
}
|
215 | |
|
|
320 |
rtm.handleAck(rawEvent)
|
216 | 321 |
case "hello":
|
217 | 322 |
rtm.IncomingEvents <- SlackEvent{"hello", &HelloEvent{}}
|
218 | |
|
219 | 323 |
case "pong":
|
220 | |
pong := Pong{}
|
221 | |
if err = json.Unmarshal(event, &pong); err != nil {
|
222 | |
rtm.Debugln("RTM Error unmarshalling 'pong' event:", err)
|
223 | |
rtm.Debugln(" -> Erroneous 'ping' event:", string(event))
|
224 | |
return
|
225 | |
}
|
226 | |
|
227 | |
rtm.mutex.Lock()
|
228 | |
latency := time.Since(rtm.pings[pong.ReplyTo])
|
229 | |
rtm.mutex.Unlock()
|
230 | |
|
231 | |
rtm.IncomingEvents <- SlackEvent{"latency-report", &LatencyReport{Value: latency}}
|
232 | |
|
|
324 |
rtm.handlePong(rawEvent)
|
233 | 325 |
default:
|
234 | |
if v, ok := eventMapping[em.Type]; ok {
|
235 | |
t := reflect.TypeOf(v)
|
236 | |
recvEvent := reflect.New(t).Interface()
|
237 | |
|
238 | |
err := json.Unmarshal(event, recvEvent)
|
239 | |
if err != nil {
|
240 | |
rtm.Debugf("RTM Error unmarshalling %q event: %s", em.Type, err)
|
241 | |
rtm.Debugf(" -> Erroneous %q event: %s", em.Type, string(event))
|
242 | |
rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}}
|
243 | |
return
|
244 | |
}
|
245 | |
|
246 | |
rtm.IncomingEvents <- SlackEvent{em.Type, recvEvent}
|
247 | |
} else {
|
248 | |
rtm.Debugf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event))
|
249 | |
err := fmt.Errorf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event))
|
250 | |
rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}}
|
251 | |
}
|
252 | |
}
|
253 | |
}
|
254 | |
|
|
326 |
rtm.handleEvent(event.Type, rawEvent)
|
|
327 |
}
|
|
328 |
}
|
|
329 |
|
|
330 |
// handleAck handles an incoming 'ACK' message.
|
|
331 |
func (rtm *RTM) handleAck(event json.RawMessage) {
|
|
332 |
ack := &AckMessage{}
|
|
333 |
if err := json.Unmarshal(event, ack); err != nil {
|
|
334 |
rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
|
|
335 |
rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
|
|
336 |
return
|
|
337 |
}
|
|
338 |
if ack.Ok {
|
|
339 |
rtm.IncomingEvents <- SlackEvent{"ack", ack}
|
|
340 |
} else {
|
|
341 |
rtm.IncomingEvents <- SlackEvent{"ack_error", &AckErrorEvent{ack.Error}}
|
|
342 |
}
|
|
343 |
}
|
|
344 |
|
|
345 |
// handlePong handles an incoming 'PONG' message which should be in response to
|
|
346 |
// a previously sent 'PING' message. This is then used to compute the
|
|
347 |
// connection's latency.
|
|
348 |
func (rtm *RTM) handlePong(event json.RawMessage) {
|
|
349 |
pong := &Pong{}
|
|
350 |
if err := json.Unmarshal(event, pong); err != nil {
|
|
351 |
rtm.Debugln("RTM Error unmarshalling 'pong' event:", err)
|
|
352 |
rtm.Debugln(" -> Erroneous 'ping' event:", string(event))
|
|
353 |
return
|
|
354 |
}
|
|
355 |
rtm.mutex.Lock()
|
|
356 |
defer rtm.mutex.Unlock()
|
|
357 |
if pingTime, exists := rtm.pings[pong.ReplyTo]; exists {
|
|
358 |
latency := time.Since(pingTime)
|
|
359 |
rtm.IncomingEvents <- SlackEvent{"latency_report", &LatencyReport{Value: latency}}
|
|
360 |
delete(rtm.pings, pong.ReplyTo)
|
|
361 |
} else {
|
|
362 |
rtm.Debugln("RTM Error - unmatched 'pong' event:", string(event))
|
|
363 |
}
|
|
364 |
}
|
|
365 |
|
|
366 |
// handleEvent is the "default" response to an event that does not have a
|
|
367 |
// special case. It matches the command's name to a mapping of defined events
|
|
368 |
// and then sends the corresponding event struct to the IncomingEvents channel.
|
|
369 |
// If the event type is not found or the event cannot be unmarshalled into the
|
|
370 |
// correct struct then this sends an UnmarshallingErrorEvent to the
|
|
371 |
// IncomingEvents channel.
|
|
372 |
func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
|
|
373 |
v, exists := eventMapping[typeStr]
|
|
374 |
if !exists {
|
|
375 |
rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event))
|
|
376 |
err := fmt.Errorf("RTM Error: Received unmapped event %q: %s\n", typeStr, string(event))
|
|
377 |
rtm.IncomingEvents <- SlackEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
|
378 |
return
|
|
379 |
}
|
|
380 |
t := reflect.TypeOf(v)
|
|
381 |
recvEvent := reflect.New(t).Interface()
|
|
382 |
err := json.Unmarshal(event, recvEvent)
|
|
383 |
if err != nil {
|
|
384 |
rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event))
|
|
385 |
err := fmt.Errorf("RTM Error: Could not unmarshall event %q: %s\n", typeStr, string(event))
|
|
386 |
rtm.IncomingEvents <- SlackEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
|
387 |
return
|
|
388 |
}
|
|
389 |
rtm.IncomingEvents <- SlackEvent{typeStr, recvEvent}
|
|
390 |
}
|
|
391 |
|
|
392 |
// eventMapping holds a mapping of event names to their corresponding struct
|
|
393 |
// implementations. The structs should be instances of the unmarshalling
|
|
394 |
// target for the matching event type.
|
255 | 395 |
var eventMapping = map[string]interface{}{
|
256 | 396 |
"message": MessageEvent{},
|
257 | 397 |
"presence_change": PresenceChangeEvent{},
|