Codebase list golang-github-nlopes-slack / d8daca0 websocket_managed_conn.go
d8daca0

Tree @d8daca0 (Download .tar.gz)

websocket_managed_conn.go @d8daca0raw · history · blame

package slack

import (
	"encoding/json"
	"fmt"
	"io"
	"reflect"
	"time"

	"golang.org/x/net/websocket"
)

// ManageConnection is a long-running goroutine that handles
// reconnections and piping messages back and to `rtm.IncomingEvents`
// and `rtm.OutgoingMessages`.
//
// Usage would look like:
//
//     bot := slack.New("my-token")
//     rtm := bot.NewRTM()  // check err
//     setupYourHandlers(rtm.IncomingEvents, rtm.OutgoingMessages)
//     rtm.ManageConnection()
//
func (rtm *RTM) ManageConnection() {
	boff := &backoff{
		Min:    100 * time.Millisecond,
		Max:    5 * time.Minute,
		Factor: 2,
		Jitter: true,
	}
	connectionCount := 0

	for {
		var conn *websocket.Conn // use as first
		var err error
		var info *Info

		connectionCount += 1

		attempts := 1
		boff.Reset()
		for {
			rtm.IncomingEvents <- SlackEvent{"connecting", &ConnectingEvent{
				Attempt:         attempts,
				ConnectionCount: connectionCount,
			}}

			info, conn, err = rtm.startRTMAndDial()
			if err == nil {
				break // connected
			}

			dur := boff.Duration()
			rtm.Debugf("reconnection %d failed: %s", attempts, err)
			rtm.Debugln(" -> reconnecting in", dur)
			attempts += 1
			time.Sleep(dur)
		}

		rtm.IncomingEvents <- SlackEvent{"connected", &ConnectedEvent{
			ConnectionCount: connectionCount,
			Info:            info,
		}}

		killCh := make(chan bool, 3)
		connErrors := make(chan error, 10) // in case we get many such errors

		go rtm.keepalive(30*time.Second, conn, killCh, connErrors)
		go rtm.handleIncomingEvents(conn, killCh, connErrors)
		go rtm.handleOutgoingMessages(conn, killCh, connErrors)

		// Here, block and await for disconnection, if it ever happens.
		err = <-connErrors

		rtm.Debugln("RTM connection error:", err)
		rtm.IncomingEvents <- SlackEvent{"disconnected", &DisconnectedEvent{}}
		killCh <- true // 3 child go-routines
		killCh <- true
		killCh <- true
	}
}

func (rtm *RTM) startRTMAndDial() (*Info, *websocket.Conn, error) {
	info, url, err := rtm.StartRTM()
	if err != nil {
		return nil, nil, err
	}

	conn, err := websocket.Dial(url, "", "http://api.slack.com")
	if err != nil {
		return nil, nil, err
	}

	return info, conn, err
}

func (rtm *RTM) keepalive(interval time.Duration, conn *websocket.Conn, killCh chan bool, errors chan error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-killCh:
			return
		case <-ticker.C:
			rtm.ping(conn, errors)
		}
	}
}

func (rtm *RTM) ping(conn *websocket.Conn, errors chan error) {
	rtm.mutex.Lock()
	defer rtm.mutex.Unlock()

	rtm.messageID++
	rtm.pings[rtm.messageID] = time.Now()

	msg := &Ping{Id: rtm.messageID, Type: "ping"}
	rtm.Debugln("Sending PING")
	if err := websocket.JSON.Send(conn, msg); err != nil {
		errors <- fmt.Errorf("error sending 'ping': %s", err)
	}
}

func (rtm *RTM) handleOutgoingMessages(conn *websocket.Conn, killCh chan bool, errors chan error) {
	// we pass "conn" in case we do a reconnection, in that case we'll
	// have a new `conn` even though we're dealing with the same
	// incoming and outgoing channels for messages/events.
	for {
		select {
		case <-killCh:
			return

		case msg := <-rtm.outgoingMessages:
			rtm.Debugln("Sending message:", msg)

			rtm.mutex.Lock()
			err := websocket.JSON.Send(conn, msg)
			rtm.mutex.Unlock()
			if err != nil {
				errors <- fmt.Errorf("error sending 'message': %s", err)
				return
			}
		}
	}
}

func (rtm *RTM) handleIncomingEvents(conn *websocket.Conn, killCh chan bool, errors chan error) {
	for {

		select {
		case <-killCh:
			return
		default:
		}

		event := json.RawMessage{}
		err := websocket.JSON.Receive(conn, &event)
		if err == io.EOF {
			rtm.Debugln("GOT EOF, are we killing ??")
		} else if err != nil {
			errors <- err
			return
		}
		if len(event) == 0 {
			rtm.Debugln("Event Empty. WTF?")
			continue
		}

		rtm.Debugln("Incoming event:", string(event[:]))

		rtm.handleEvent(event)

		// FIXME: please I hope we don't need to sleep!!!
		//time.Sleep(time.Millisecond * 500)
	}
}

func (rtm *RTM) handleEvent(event json.RawMessage) {
	em := Event{}
	err := json.Unmarshal(event, &em)
	if err != nil {
		rtm.Debugln("RTM Error unmarshalling event:", err)
		rtm.Debugln(" -> Erroneous event:", string(event))
		return
	}

	switch em.Type {
	case "":
		// try ok
		ack := AckMessage{}
		if err = json.Unmarshal(event, &ack); err != nil {
			rtm.Debugln("RTM Error unmarshalling 'ack' event:", err)
			rtm.Debugln(" -> Erroneous 'ack' event:", string(event))
			return
		}

		if ack.Ok {
			rtm.IncomingEvents <- SlackEvent{"ack", ack}
		} else {
			rtm.IncomingEvents <- SlackEvent{"error", ack.Error}
		}

	case "hello":
		rtm.IncomingEvents <- SlackEvent{"hello", &HelloEvent{}}

	case "pong":
		pong := Pong{}
		if err = json.Unmarshal(event, &pong); err != nil {
			rtm.Debugln("RTM Error unmarshalling 'pong' event:", err)
			rtm.Debugln(" -> Erroneous 'ping' event:", string(event))
			return
		}

		rtm.mutex.Lock()
		latency := time.Since(rtm.pings[pong.ReplyTo])
		rtm.mutex.Unlock()

		rtm.IncomingEvents <- SlackEvent{"latency-report", &LatencyReport{Value: latency}}

	default:
		for k, v := range eventMapping {
			if em.Type == k {
				t := reflect.TypeOf(v)
				recvEvent := reflect.New(t).Interface()

				err := json.Unmarshal(event, recvEvent)
				if err != nil {
					rtm.Debugf("RTM Error unmarshalling %q event: %s", em.Type, err)
					rtm.Debugf(" -> Erroneous %q event: %s", em.Type, string(event))
					return
				}

				rtm.IncomingEvents <- SlackEvent{em.Type, recvEvent}
				return
			}
		}

		rtm.Debugf("RTM Error, received unmapped event %q: %s\n", em.Type, string(event))
	}
}

var eventMapping = map[string]interface{}{
	"message":         MessageEvent{},
	"presence_change": PresenceChangeEvent{},
	"user_typing":     UserTypingEvent{},

	"channel_marked":          ChannelMarkedEvent{},
	"channel_created":         ChannelCreatedEvent{},
	"channel_joined":          ChannelJoinedEvent{},
	"channel_left":            ChannelLeftEvent{},
	"channel_deleted":         ChannelDeletedEvent{},
	"channel_rename":          ChannelRenameEvent{},
	"channel_archive":         ChannelArchiveEvent{},
	"channel_unarchive":       ChannelUnarchiveEvent{},
	"channel_history_changed": ChannelHistoryChangedEvent{},

	"im_created":         IMCreatedEvent{},
	"im_open":            IMOpenEvent{},
	"im_close":           IMCloseEvent{},
	"im_marked":          IMMarkedEvent{},
	"im_history_changed": IMHistoryChangedEvent{},

	"group_marked":          GroupMarkedEvent{},
	"group_open":            GroupOpenEvent{},
	"group_joined":          GroupJoinedEvent{},
	"group_left":            GroupLeftEvent{},
	"group_close":           GroupCloseEvent{},
	"group_rename":          GroupRenameEvent{},
	"group_archive":         GroupArchiveEvent{},
	"group_unarchive":       GroupUnarchiveEvent{},
	"group_history_changed": GroupHistoryChangedEvent{},

	"file_created":         FileCreatedEvent{},
	"file_shared":          FileSharedEvent{},
	"file_unshared":        FileUnsharedEvent{},
	"file_public":          FilePublicEvent{},
	"file_private":         FilePrivateEvent{},
	"file_change":          FileChangeEvent{},
	"file_deleted":         FileDeletedEvent{},
	"file_comment_added":   FileCommentAddedEvent{},
	"file_comment_edited":  FileCommentEditedEvent{},
	"file_comment_deleted": FileCommentDeletedEvent{},

	"star_added":   StarAddedEvent{},
	"star_removed": StarRemovedEvent{},

	"pref_change": PrefChangeEvent{},

	"team_join":              TeamJoinEvent{},
	"team_rename":            TeamRenameEvent{},
	"team_pref_change":       TeamPrefChangeEvent{},
	"team_domain_change":     TeamDomainChangeEvent{},
	"team_migration_started": TeamMigrationStartedEvent{},

	"manual_presence_change": ManualPresenceChangeEvent{},

	"user_change": UserChangeEvent{},

	"emoji_changed": EmojiChangedEvent{},

	"commands_changed": CommandsChangedEvent{},

	"email_domain_changed": EmailDomainChangedEvent{},

	"bot_added":   BotAddedEvent{},
	"bot_changed": BotChangedEvent{},

	"accounts_changed": AccountsChangedEvent{},
}