69 | 69 |
Info: info,
|
70 | 70 |
}}
|
71 | 71 |
|
72 | |
killCh := make(chan bool, 3)
|
73 | 72 |
connErrors := make(chan error, 10) // in case we get many such errors
|
74 | 73 |
|
75 | |
go rtm.keepalive(30*time.Second, conn, killCh, connErrors)
|
76 | |
go rtm.handleIncomingEvents(conn, killCh, connErrors)
|
77 | |
go rtm.handleOutgoingMessages(conn, killCh, connErrors)
|
|
74 |
go rtm.keepalive(30*time.Second, conn, connErrors)
|
|
75 |
go rtm.handleIncomingEvents(conn, connErrors)
|
|
76 |
go rtm.handleOutgoingMessages(conn, connErrors)
|
78 | 77 |
|
79 | 78 |
// Here, block and await for disconnection, if it ever happens.
|
80 | 79 |
err = <-connErrors
|
81 | 80 |
|
82 | 81 |
rtm.Debugln("RTM connection error:", err)
|
83 | |
rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}}
|
84 | |
killCh <- true // 3 child go-routines
|
85 | |
killCh <- true
|
86 | |
killCh <- true
|
87 | |
}
|
|
82 |
rtm.killConnection(false)
|
|
83 |
}
|
|
84 |
}
|
|
85 |
|
|
86 |
func (rtm *RTM) killConnection(intentional bool) error {
|
|
87 |
rtm.mutex.Lock()
|
|
88 |
rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}}
|
|
89 |
close(rtm.keepRunning)
|
|
90 |
err := rtm.conn.Close()
|
|
91 |
rtm.isRunning = false
|
|
92 |
rtm.mutex.Unlock()
|
|
93 |
return err
|
88 | 94 |
}
|
89 | 95 |
|
90 | 96 |
func (rtm *RTM) startRTMAndDial() (*Info, *websocket.Conn, error) {
|
|
100 | 106 |
return info, conn, err
|
101 | 107 |
}
|
102 | 108 |
|
103 | |
func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, killCh chan bool, errors chan error) {
|
|
109 |
func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, errors chan error) {
|
104 | 110 |
ticker := time.NewTicker(interval)
|
105 | 111 |
defer ticker.Stop()
|
106 | 112 |
|
107 | 113 |
for {
|
108 | 114 |
select {
|
109 | |
case <-killCh:
|
|
115 |
case <-rtm.keepRunning:
|
110 | 116 |
return
|
111 | 117 |
case <-ticker.C:
|
112 | 118 |
rtm.ping(conn, errors)
|
|
128 | 134 |
}
|
129 | 135 |
}
|
130 | 136 |
|
131 | |
func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, killCh chan bool, errors chan error) {
|
|
137 |
func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, errors chan error) {
|
132 | 138 |
// we pass "conn" in case we do a reconnection, in that case we'll
|
133 | 139 |
// have a new `conn` even though we're dealing with the same
|
134 | 140 |
// incoming and outgoing channels for messages/events.
|
135 | 141 |
for {
|
136 | 142 |
select {
|
137 | |
case <-killCh:
|
|
143 |
case <-rtm.keepRunning:
|
138 | 144 |
return
|
139 | 145 |
|
140 | 146 |
case msg := <-rtm.outgoingMessages:
|
|
151 | 157 |
}
|
152 | 158 |
}
|
153 | 159 |
|
154 | |
func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, killCh chan bool, errors chan error) {
|
|
160 |
func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, errors chan error) {
|
155 | 161 |
for {
|
156 | 162 |
|
157 | 163 |
select {
|
158 | |
case <-killCh:
|
|
164 |
case <-rtm.keepRunning:
|
159 | 165 |
return
|
160 | 166 |
default:
|
161 | 167 |
}
|
|
233 | 239 |
if err != nil {
|
234 | 240 |
rtm.Debugf("RTM Error unmarshalling %q event: %s", em.Type, err)
|
235 | 241 |
rtm.Debugf(" -> Erroneous %q event: %s", em.Type, string(event))
|
|
242 |
rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}}
|
236 | 243 |
return
|
237 | 244 |
}
|
238 | 245 |
|
239 | 246 |
rtm.IncomingEvents <- SlackEvent{em.Type, recvEvent}
|
240 | 247 |
} else {
|
241 | 248 |
rtm.Debugf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event))
|
|
249 |
err := fmt.Errorf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event))
|
|
250 |
rtm.IncomingEvents <- SlackEvent{"unmarshalling-error", &UnmarshallingErrorEvent{err}}
|
242 | 251 |
}
|
243 | 252 |
}
|
244 | 253 |
}
|