Codebase list golang-dbus / run/e3949ede-1abe-4a8c-9d05-a5f1e772a578/upstream sequential_handler.go
run/e3949ede-1abe-4a8c-9d05-a5f1e772a578/upstream

Tree @run/e3949ede-1abe-4a8c-9d05-a5f1e772a578/upstream (Download .tar.gz)

sequential_handler.go @run/e3949ede-1abe-4a8c-9d05-a5f1e772a578/upstreamraw · history · blame

package dbus

import (
	"sync"
)

// NewSequentialSignalHandler returns an instance of a new
// signal handler that guarantees sequential processing of signals. It is a
// guarantee of this signal handler that signals will be written to
// channels in the order they are received on the DBus connection.
func NewSequentialSignalHandler() SignalHandler {
	return &sequentialSignalHandler{}
}

type sequentialSignalHandler struct {
	mu      sync.RWMutex
	closed  bool
	signals []*sequentialSignalChannelData
}

func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) {
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	if sh.closed {
		return
	}
	for _, scd := range sh.signals {
		scd.deliver(signal)
	}
}

func (sh *sequentialSignalHandler) Terminate() {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}

	for _, scd := range sh.signals {
		scd.close()
		close(scd.ch)
	}
	sh.closed = true
	sh.signals = nil
}

func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}
	sh.signals = append(sh.signals, newSequentialSignalChannelData(ch))
}

func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}
	for i := len(sh.signals) - 1; i >= 0; i-- {
		if ch == sh.signals[i].ch {
			sh.signals[i].close()
			copy(sh.signals[i:], sh.signals[i+1:])
			sh.signals[len(sh.signals)-1] = nil
			sh.signals = sh.signals[:len(sh.signals)-1]
		}
	}
}

type sequentialSignalChannelData struct {
	ch   chan<- *Signal
	in   chan *Signal
	done chan struct{}
}

func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData {
	scd := &sequentialSignalChannelData{
		ch:   ch,
		in:   make(chan *Signal),
		done: make(chan struct{}),
	}
	go scd.bufferSignals()
	return scd
}

func (scd *sequentialSignalChannelData) bufferSignals() {
	defer close(scd.done)

	// Ensure that signals are delivered to scd.ch in the same
	// order they are received from scd.in.
	var queue []*Signal
	for {
		if len(queue) == 0 {
			signal, ok := <-scd.in
			if !ok {
				return
			}
			queue = append(queue, signal)
		}
		select {
		case scd.ch <- queue[0]:
			copy(queue, queue[1:])
			queue[len(queue)-1] = nil
			queue = queue[:len(queue)-1]
		case signal, ok := <-scd.in:
			if !ok {
				return
			}
			queue = append(queue, signal)
		}
	}
}

func (scd *sequentialSignalChannelData) deliver(signal *Signal) {
	scd.in <- signal
}

func (scd *sequentialSignalChannelData) close() {
	close(scd.in)
	// Ensure that bufferSignals() has exited and won't attempt
	// any future sends on scd.ch
	<-scd.done
}