Codebase list golang-github-thejerf-suture / d7307f9
New upstream version 4.0.0 Alexandre Viau 3 years ago
16 changed file(s) with 2600 addition(s) and 12 deletion(s).
00 language: go
1 arch:
2 - amd64
3 - ppc64le
14 go:
2 - 1.1
3 - 1.2
4 - 1.3
5 - 1.4
6 - 1.5
7 - 1.6
5 - 1.9
6 - 1.11
7 - 1.13
8 - 1.15
89 - tip
10 script: go test -timeout 20s ${gobuild_args} ./...
22
33 [![Build Status](https://travis-ci.org/thejerf/suture.png?branch=master)](https://travis-ci.org/thejerf/suture)
44
5 import "gopkg.in/thejerf/suture.v4"
6
57 Suture provides Erlang-ish supervisor trees for Go. "Supervisor trees" ->
68 "sutree" -> "suture" -> holds your code together when it's trying to die.
7
8 This library has hit maturity, and isn't expected to be changed
9 radically. This can also be imported via gopkg.in/thejerf/suture.v2 .
109
1110 It is intended to deal gracefully with the real failure cases that can
1211 occur with supervision trees (such as burning all your CPU time endlessly
2120 with [godoc](http://godoc.org/github.com/thejerf/suture),
2221 including an example, usage, and everything else you might expect from a
2322 README.md on GitHub. (DRY.)
23
24 Special Thanks
25 --------------
26
27 Special thanks to the [Syncthing team](https://syncthing.net/), who have
28 been fantastic about working with me to push fixes upstream of them.
29
30 Major Versions
31 --------------
32
33 v4 is a rewrite to make Suture function
34 with [contexts](https://golang.org/pkg/context/). If you are using suture
35 for the first time, I recommend starting with v4. It also changes how logging
36 works: rather than requiring a number of closures from the consumer, the user
37 supplies a single function that is presented with a defined set of event structs.
38
39 [suture v3](https://godoc.org/gopkg.in/thejerf/suture.v3) is the latest
40 version that does not feature contexts. It is still supported and getting
41 backported fixes as of now.
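For orientation, here is a minimal sketch of what a v4, context-based program
looks like (the `Worker` type is illustrative and not part of suture):

    package main

    import (
        "context"

        "github.com/thejerf/suture/v4"
    )

    // Worker is a stand-in for your own service.
    type Worker struct{}

    // Serve runs until the supervisor cancels the context.
    func (w *Worker) Serve(ctx context.Context) error {
        <-ctx.Done()
        return ctx.Err()
    }

    func main() {
        sup := suture.NewSimple("main")
        sup.Add(&Worker{})

        // Serve blocks until the passed-in context is cancelled.
        _ = sup.Serve(context.Background())
    }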
2442
2543 Code Signing
2644 ------------
4664 Changelog
4765 ---------
4866
49 suture uses semantic versioning.
67 suture uses semantic versioning and go modules.
5068
69 * 4.0:
70 * Switched the entire API to be context based.
71 * Switched how logging works to take a single closure that will be
72 presented with a defined set of structs, rather than a set of closures
73 for each event.
74 * Consequently, "Stop" removed from the Service interface. A wrapper for
75 old-style code is provided.
76 * Services can now return errors. Errors will be included in the log
77 message. Two special errors control restarting behavior:
78 * ErrDoNotRestart indicates the service should not be restarted,
79 but other services should be unaffected.
80 * ErrTerminateTree indicates the parent service tree should be
81 terminated. Supervisor trees can be configured to either continue
82 terminating upwards, or terminate themselves but not continue
83 propagating the termination upwards.
84 * UnstoppedServiceReport calling semantics modified to allow correctly
85 retrieving reports from entire trees. (Prior to 4.0, a report was
86 only on the supervisor it was called on.)
87 * 3.0.4:
88 * Fix a problem with adding services to a stopped supervisor.
5189 * 3.0.3:
5290 * Implemented request in Issue #37, creating a new method StopWithReport
5391 on supervisors that reports what services failed to stop. While a bit
369369
370370 If the supervisor is currently running, the service will be started
371371 immediately. If the supervisor is not currently running, the service
372 will be started when the supervisor is.
372 will be started when the supervisor is. If the supervisor was already stopped,
373 this is a no-op that returns an empty ServiceToken.
373374
374375 The returned ServiceToken may be passed to the Remove method of the Supervisor
375376 to terminate the service.
406407 s.Unlock()
407408
408409 response := make(chan serviceID)
409 s.control <- addService{service, serviceName(service), response}
410 if !s.sendControl(addService{service, serviceName(service), response}) {
411 return ServiceToken{}
412 }
410413 return ServiceToken{uint64(s.id)<<32 | uint64(<-response)}
411414 }
412415
712712 func TestEverMultistarted(t *testing.T) {
713713 if everMultistarted {
714714 t.Fatal("Seem to have multistarted a service at some point, bummer.")
715 }
716 }
717
718 func TestAddAfterStopping(t *testing.T) {
719 // t.Parallel()
720
721 s := NewSimple("main")
722
723 service := NewService("A1")
724 addDone := make(chan struct{})
725
726 s.ServeBackground()
727 s.Stop()
728
729 go func() {
730 s.Add(service)
731 close(addDone)
732 }()
733
734 select {
735 case <-time.After(5 * time.Second):
736 t.Fatal("Timed out waiting for Add to return")
737 case <-addDone:
715738 }
716739 }
717740
0 package suture
1
2 import (
3 "context"
4 "fmt"
5 "testing"
6 )
7
8 const (
9 JobLimit = 2
10 )
11
12 type IncrementorJob struct {
13 current int
14 next chan int
15 }
16
17 func (i *IncrementorJob) Serve(ctx context.Context) error {
18 for {
19 select {
20 case i.next <- i.current + 1:
21 i.current++
22 if i.current >= JobLimit {
23 fmt.Println("Stopping the service")
24 return ErrDoNotRestart
25 }
26 }
27 }
28 }
29
30 func TestCompleteJob(t *testing.T) {
31 supervisor := NewSimple("Supervisor")
32 service := &IncrementorJob{0, make(chan int)}
33 supervisor.Add(service)
34
35 ctx, myCancel := context.WithCancel(context.Background())
36 supervisor.ServeBackground(ctx)
37
38 fmt.Println("Got:", <-service.next)
39 fmt.Println("Got:", <-service.next)
40
41 myCancel()
42
43 // Output:
44 // Got: 1
45 // Got: 2
46 // Stopping the service
47 }
0 /*
1
2 Package suture provides Erlang-like supervisor trees.
3
4 This implements Erlang-esque supervisor trees, as adapted for Go. This is
5 intended to be an industrial-strength implementation, but it has not yet
6 been deployed in a hostile environment. (It's headed there, though.)
7
8 Supervisor Tree -> SuTree -> suture -> holds your code together when it's
9 trying to fall apart.
10
11 Why use Suture?
12
13 * You want to write bullet-resistant services that will remain available
14 despite unforeseen failure.
15 * You need the code to be smart enough not to consume 100% of the CPU
16 restarting things.
17 * You want to easily compose multiple such services in one program.
18 * You want the Erlang programmers to stop lording their supervision
19 trees over you.
20
21 Suture has 100% test coverage, and is golint clean. This doesn't prove it
22 free of bugs, but it shows I care.
23
24 A blog post describing the design decisions is available at
25 http://www.jerf.org/iri/post/2930 .
26
27 Using Suture
28
29 To idiomatically use Suture, create a Supervisor which is your top level
30 "application" supervisor. This will often occur in your program's "main"
31 function.
32
33 Create "Service"s, which implement the Service interface. .Add() them
34 to your Supervisor. Supervisors are also services, so you can create a
35 tree structure here, depending on the exact combination of restarts
36 you want to create.
37
38 As a special case, when adding Supervisors to Supervisors, the "sub"
39 supervisor will have the "super" supervisor's EventHook copied.
40 This allows you to set one event hook on the "top" supervisor, and
41 have it propagate down to all the sub-supervisors. This also allows
42 libraries or modules to provide Supervisors without having to commit
43 their users to a particular logging method.
44
45 Finally, as what is probably the last line of your main() function, call
46 .Serve() on your top level supervisor. This will start all the services
47 you've defined.
48
49 See the Example for an example, using a simple service that serves out
50 incrementing integers.
51
52 */
53 package suture
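To make the tree structure described above concrete, here is a minimal sketch
of a two-level tree as seen from a consuming program (the MyService type and
all names are illustrative):

package main

import (
	"context"

	"github.com/thejerf/suture/v4"
)

// MyService is a stand-in for a real worker service.
type MyService struct{}

func (m *MyService) Serve(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

func main() {
	// Supervisors are themselves Services, so they nest into a tree.
	workers := suture.NewSimple("workers")
	workers.Add(&MyService{})

	root := suture.NewSimple("root")
	root.Add(workers)

	// Typically one of the last lines of main.
	_ = root.Serve(context.Background())
}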
0 // +build go1.13
1
2 package suture
3
4 import "errors"
5
6 func isErr(err error, target error) bool {
7 return errors.Is(err, target)
8 }
0 // +build !go1.13
1
2 package suture
3
4 func isErr(err error, target error) bool {
5 return err == target
6 }
0 package suture
1
2 import (
3 "fmt"
4 )
5
6 // Event defines the interface implemented by all events Suture will
7 // generate.
8 //
9 // Map will return a map with the details of the event serialized into a
10 // map[string]interface{}, with only the values suitable for serialization.
11 type Event interface {
12 fmt.Stringer
13 Type() EventType
14 Map() map[string]interface{}
15 }
16
17 type (
18 EventType int
19
20 EventHook func(Event)
21 )
22
23 const (
24 EventTypeStopTimeout EventType = iota
25 EventTypeServicePanic
26 EventTypeServiceTerminate
27 EventTypeBackoff
28 EventTypeResume
29 )
30
31 type EventStopTimeout struct {
32 Supervisor *Supervisor `json:"-"`
33 SupervisorName string `json:"supervisor_name"`
34 Service Service `json:"-"`
35 ServiceName string `json:"service"`
36 }
37
38 func (e EventStopTimeout) Type() EventType {
39 return EventTypeStopTimeout
40 }
41
42 func (e EventStopTimeout) String() string {
43 return fmt.Sprintf(
44 "%s: Service %s failed to terminate in a timely manner",
45 e.Supervisor,
46 e.Service,
47 )
48 }
49
50 func (e EventStopTimeout) Map() map[string]interface{} {
51 return map[string]interface{}{
52 "supervisor_name": e.SupervisorName,
53 "service_name": e.ServiceName,
54 }
55 }
56
57 type EventServicePanic struct {
58 Supervisor *Supervisor `json:"-"`
59 SupervisorName string `json:"supervisor_name"`
60 Service Service `json:"-"`
61 ServiceName string `json:"service_name"`
62 CurrentFailures float64 `json:"current_failures"`
63 FailureThreshold float64 `json:"failure_threshold"`
64 Restarting bool `json:"restarting"`
65 PanicMsg string `json:"panic_msg"`
66 Stacktrace string `json:"stacktrace"`
67 }
68
69 func (e EventServicePanic) Type() EventType {
70 return EventTypeServicePanic
71 }
72
73 func (e EventServicePanic) String() string {
74 return fmt.Sprintf(
75 "%s, panic: %s, stacktrace: %s",
76 serviceFailureString(
77 e.SupervisorName,
78 e.ServiceName,
79 e.CurrentFailures,
80 e.FailureThreshold,
81 e.Restarting,
82 ),
83 e.PanicMsg,
84 string(e.Stacktrace),
85 )
86 }
87
88 func (e EventServicePanic) Map() map[string]interface{} {
89 return map[string]interface{}{
90 "supervisor_name": e.SupervisorName,
91 "service_name": e.ServiceName,
92 "current_failures": e.CurrentFailures,
93 "failure_threshold": e.FailureThreshold,
94 "restarting": e.Restarting,
95 "panic_msg": e.PanicMsg,
96 "stacktrace": e.Stacktrace,
97 }
98 }
99
100 type EventServiceTerminate struct {
101 Supervisor *Supervisor `json:"-"`
102 SupervisorName string `json:"supervisor_name"`
103 Service Service `json:"-"`
104 ServiceName string `json:"service_name"`
105 CurrentFailures float64 `json:"current_failures"`
106 FailureThreshold float64 `json:"failure_threshold"`
107 Restarting bool `json:"restarting"`
108 Err interface{} `json:"error_msg"`
109 }
110
111 func (e EventServiceTerminate) Type() EventType {
112 return EventTypeServiceTerminate
113 }
114
115 func (e EventServiceTerminate) String() string {
116 return fmt.Sprintf(
117 "%s, error: %s",
118 serviceFailureString(e.SupervisorName, e.ServiceName, e.CurrentFailures, e.FailureThreshold, e.Restarting),
119 e.Err)
120 }
121
122 func (e EventServiceTerminate) Map() map[string]interface{} {
123 return map[string]interface{}{
124 "supervisor_name": e.SupervisorName,
125 "service_name": e.ServiceName,
126 "current_failures": e.CurrentFailures,
127 "failure_threshold": e.FailureThreshold,
128 "restarting": e.Restarting,
129 "error": e.Err,
130 }
131 }
132
133 func serviceFailureString(supervisor, service string, currentFailures, failureThreshold float64, restarting bool) string {
134 return fmt.Sprintf(
135 "%s: Failed service '%s' (%f failures of %f), restarting: %#v",
136 supervisor,
137 service,
138 currentFailures,
139 failureThreshold,
140 restarting,
141 )
142 }
143
144 type EventBackoff struct {
145 Supervisor *Supervisor `json:"-"`
146 SupervisorName string `json:"supervisor_name"`
147 }
148
149 func (e EventBackoff) Type() EventType {
150 return EventTypeBackoff
151 }
152
153 func (e EventBackoff) String() string {
154 return fmt.Sprintf("%s: Entering the backoff state.", e.Supervisor)
155 }
156
157 func (e EventBackoff) Map() map[string]interface{} {
158 return map[string]interface{}{
159 "supervisor_name": e.SupervisorName,
160 }
161 }
162
163 type EventResume struct {
164 Supervisor *Supervisor `json:"-"`
165 SupervisorName string `json:"supervisor_name"`
166 }
167
168 func (e EventResume) Type() EventType {
169 return EventTypeResume
170 }
171
172 func (e EventResume) String() string {
173 return fmt.Sprintf("%s: Exiting backoff state.", e.Supervisor)
174 }
175
176 func (e EventResume) Map() map[string]interface{} {
177 return map[string]interface{}{
178 "supervisor_name": e.SupervisorName,
179 }
180 }
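As a sketch of how the Event/Map machinery above can be consumed, this
illustrative hook serializes every event's Map() form as JSON (the JSON
encoding and log format are assumptions, not part of the library):

package main

import (
	"encoding/json"
	"log"

	"github.com/thejerf/suture/v4"
)

func main() {
	// Every Suture event arrives at this single hook as a defined struct;
	// Map() exposes only the serialization-friendly fields.
	hook := func(e suture.Event) {
		details, _ := json.Marshal(e.Map())
		log.Printf("suture event (type %d): %s", e.Type(), details)
	}

	sup := suture.New("main", suture.Spec{EventHook: hook})
	_ = sup // Add services and call sup.Serve(ctx) as usual.
}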
0 module github.com/thejerf/suture/v4
1
2 go 1.9
0 package suture
1
2 // sum type pattern for type-safe message passing; see
3 // http://www.jerf.org/iri/post/2917
4
5 type supervisorMessage interface {
6 isSupervisorMessage()
7 }
8
9 type listServices struct {
10 c chan []Service
11 }
12
13 func (ls listServices) isSupervisorMessage() {}
14
15 type removeService struct {
16 id serviceID
17 notification chan struct{}
18 }
19
20 func (rs removeService) isSupervisorMessage() {}
21
22 func (s *Supervisor) sync() {
23 s.control <- syncSupervisor{}
24 }
25
26 type syncSupervisor struct {
27 }
28
29 func (ss syncSupervisor) isSupervisorMessage() {}
30
31 func (s *Supervisor) fail(id serviceID, panicMsg string, stacktrace []byte) {
32 s.control <- serviceFailed{id, panicMsg, stacktrace}
33 }
34
35 type serviceFailed struct {
36 id serviceID
37 panicMsg string
38 stacktrace []byte
39 }
40
41 func (sf serviceFailed) isSupervisorMessage() {}
42
43 func (s *Supervisor) serviceEnded(id serviceID, err error) {
44 s.sendControl(serviceEnded{id, err})
45 }
46
47 type serviceEnded struct {
48 id serviceID
49 err error
50 }
51
52 func (s serviceEnded) isSupervisorMessage() {}
53
54 // added by the Add() method
55 type addService struct {
56 service Service
57 name string
58 response chan serviceID
59 }
60
61 func (as addService) isSupervisorMessage() {}
62
63 type stopSupervisor struct {
64 done chan UnstoppedServiceReport
65 }
66
67 func (ss stopSupervisor) isSupervisorMessage() {}
68
69 func (s *Supervisor) panic() {
70 s.control <- panicSupervisor{}
71 }
72
73 type panicSupervisor struct {
74 }
75
76 func (ps panicSupervisor) isSupervisorMessage() {}
0 package suture
1
2 import (
3 "context"
4 "errors"
5 )
6
7 /*
8 Service is the interface that describes a service to a Supervisor.
9
10 Serve Method
11
12 The Serve method is called by a Supervisor to start the service.
13 The service should execute within the goroutine that this is
14 called in, that is, it should not spawn a "worker" goroutine.
15 If this function either returns error or panics, the Supervisor
16 will call it again.
17
18 A Serve method SHOULD do as much cleanup of the state as possible,
19 to prevent any corruption in the previous state from crashing the
20 service again. The beginning of a service with persistent state should
21 generally be a few lines to initialize and clean up that state.
22
23 The error returned by the service, if any, will be part of the log
24 message generated for it. There are two distinguished errors a
25 Service can return:
26
27 ErrDoNotRestart indicates that the service should not be
28 restarted, and should be removed from the supervisor entirely.
29
30 ErrTerminateSupervisorTree indicates that the Supervisor the service is running
31 in should be terminated. If that Supervisor recursively returns that,
32 its parent supervisor will also be terminated. (This can be controlled
33 with configuration in the Supervisor.)
34
35 In Go 1.13 and greater, this is checked via errors.Is, so the error
36 can be further wrapped with whatever additional info you like. Prior
37 to Go 1.13, it will be checked via a direct equality check, so the
38 distinguished errors cannot be wrapped.
39
40 Once the service has been instructed to stop, the Service SHOULD NOT be
41 reused in any other supervisor! Because of the impossibility of
42 guaranteeing that the service has fully stopped in Go, you can't
43 prove that you won't be starting two goroutines using the exact
44 same memory to store state, causing completely unpredictable behavior.
45
46 Serve should not return until the service has actually stopped.
47 "Stopped" here is defined as "the service will stop servicing any
48 further requests in the future". Any mandatory cleanup related to
49 the Service should also have been performed.
50
51 If a service does not stop within the supervisor's timeout duration, the
52 supervisor will log an entry to that effect. This does
53 not guarantee that the service is hung; it may still get around to being
54 properly stopped in the future. Until the service is fully stopped,
55 both the service and the spawned goroutine trying to stop it will be
56 "leaked".
57
58 Stringer Interface
59
60 When a Service is added to a Supervisor, the Supervisor will create a
61 string representation of that service used for logging.
62
63 If you implement the fmt.Stringer interface, that will be used.
64
65 If you do not implement the fmt.Stringer interface, a default
66 fmt.Sprintf("%#v") will be used.
67
68 */
69 type Service interface {
70 Serve(ctx context.Context) error
71 }
72
73 // ErrDoNotRestart can be returned by a service to voluntarily not
74 // be restarted.
75 var ErrDoNotRestart = errors.New("service should not be restarted")
76
77 // ErrTerminateSupervisorTree can be returned by a service to terminate the
78 // entire supervision tree above it as well.
79 var ErrTerminateSupervisorTree = errors.New("tree should be terminated")
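A sketch of the distinguished-error behavior documented above (the OneShot
type, the wrapping message, and the one-second timeout are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/thejerf/suture/v4"
)

// OneShot does its work once and then asks not to be restarted.
type OneShot struct{}

func (o *OneShot) Serve(ctx context.Context) error {
	// ... one-time work goes here ...

	// On Go 1.13+ the sentinel may be wrapped; the supervisor checks it
	// with errors.Is. On older Go, return suture.ErrDoNotRestart directly.
	return fmt.Errorf("initial sync complete: %w", suture.ErrDoNotRestart)
}

func main() {
	sup := suture.NewSimple("main")
	sup.Add(&OneShot{})

	// A bounded context only so that this sketch terminates.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_ = sup.Serve(ctx)
}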
0 package suture
1
2 import (
3 "context"
4 )
5
6 type DeprecatedService interface {
7 Serve()
8 Stop()
9 }
10
11 // AsService converts an old-style suture service to a new-style suture service.
12 func AsService(service DeprecatedService) Service {
13 return &serviceShim{service: service}
14 }
15
16 type serviceShim struct {
17 service DeprecatedService
18 }
19
20 func (s *serviceShim) Serve(ctx context.Context) error {
21 done := make(chan struct{})
22 go func() {
23 s.service.Serve()
24 close(done)
25 }()
26
27 select {
28 case <-done:
29 // If the service stops by itself (done closes), return straight away, there is no error, and we don't need
30 // to wait for the context.
31 return nil
32 case <-ctx.Done():
33 // If the context is closed, stop the service, then wait for its termination and return the error from the
34 // context.
35 s.service.Stop()
36 <-done
37 return ctx.Err()
38 }
39 }
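A sketch of wrapping a pre-v4 service with AsService (the LegacyWorker type is
illustrative):

package main

import (
	"context"

	"github.com/thejerf/suture/v4"
)

// LegacyWorker has the old Serve()/Stop() shape.
type LegacyWorker struct {
	stop chan struct{}
}

func (l *LegacyWorker) Serve() { <-l.stop }
func (l *LegacyWorker) Stop()  { close(l.stop) }

func main() {
	sup := suture.NewSimple("main")

	// AsService adapts the old interface to the context-based one: when the
	// context is cancelled, the shim calls Stop() and waits for Serve() to
	// return.
	sup.Add(suture.AsService(&LegacyWorker{stop: make(chan struct{})}))

	ctx, cancel := context.WithCancel(context.Background())
	sup.ServeBackground(ctx)
	cancel()
}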
0 package suture
1
2 // FIXMES in progress:
3 // 1. Ensure the supervisor actually gets to the terminated state for the
4 // unstopped service report.
5 // 2. Save the unstopped service report in the supervisor.
6
7 import (
8 "context"
9 "errors"
10 "fmt"
11 "log"
12 "math"
13 "math/rand"
14 "runtime"
15 "sync"
16 "time"
17 )
18
19 const (
20 notRunning = iota
21 normal
22 paused
23 terminated
24 )
25
26 type supervisorID uint32
27 type serviceID uint32
28
29 // ErrSupervisorNotRunning is returned by some methods if the supervisor is
30 // not running, either because it has not been started or because it has
31 // been terminated.
32 var ErrSupervisorNotRunning = errors.New("supervisor not running")
33
34 /*
35 Supervisor is the core type of the module that represents a Supervisor.
36
37 Supervisors should be constructed either by New or NewSimple.
38
39 Once constructed, a Supervisor should be started in one of three ways:
40
41 1. Calling .Serve(ctx).
42 2. Calling .ServeBackground(ctx).
43 3. Adding it to an existing Supervisor.
44
45 Calling Serve will cause the supervisor to run until the passed-in
46 context is cancelled. Often one of the last lines of the "main" func for a
47 program will be to call one of the Serve methods.
48
49 Calling ServeBackground will CORRECTLY start the supervisor running in a
50 new goroutine. It is risky to directly run
51
52 go supervisor.Serve()
53
54 because that will briefly create a race condition as it starts up, if you
55 try to .Add() services immediately afterward.
56
57 */
58 type Supervisor struct {
59 Name string
60
61 spec Spec
62
63 services map[serviceID]serviceWithName
64 cancellations map[serviceID]context.CancelFunc
65 servicesShuttingDown map[serviceID]serviceWithName
66 lastFail time.Time
67 failures float64
68 restartQueue []serviceID
69 serviceCounter serviceID
70 control chan supervisorMessage
71 notifyServiceDone chan serviceID
72 resumeTimer <-chan time.Time
73 liveness chan struct{}
74
75 // despite the recommendation in the context package to avoid
76 // holding a context in a struct, I think that due to the way suture
77 // works it's OK in this case. This is the
78 // exceptional case, basically.
79 ctxMutex sync.Mutex
80 ctx context.Context
81 // This function cancels this supervisor specifically.
82 ctxCancel func()
83
84 getNow func() time.Time
85 getAfterChan func(time.Duration) <-chan time.Time
86
87 m sync.Mutex
88
89 // The unstopped service report is generated when we finish
90 // stopping.
91 unstoppedServiceReport UnstoppedServiceReport
92
93 // malign leftovers
94 id supervisorID
95 state uint8
96 }
97
98 /*
99
100 New is the full constructor function for a supervisor.
101
102 The name is a friendly human name for the supervisor, used in logging. Suture
103 does not care if this is unique, but it is good for your sanity if it is.
104
105 If not set, the following values are used:
106
107 * EventHook: A function is created that uses log.Print.
108 * FailureDecay: 30 seconds
109 * FailureThreshold: 5 failures
110 * FailureBackoff: 15 seconds
111 * Timeout: 10 seconds
112 * BackoffJitter: DefaultJitter
113
114 The EventHook function will be called when notable events occur. Suture will
115 generate an event for the following:
116
117 * When a service has failed, with a descriptive message about the
118 current backoff status, and whether it was immediately restarted
119 * When the supervisor has gone into its backoff mode, and when it
120 exits it
121 * When a service fails to stop
122
123 FailureDecay, FailureThreshold, and FailureBackoff control how failures
124 are handled, in order to avoid the supervisor failure case where the
125 program does nothing but restart failed services. If you do not
126 care how failures behave, the default values should be fine for the
127 vast majority of services, but if you want the details:
128
129 The supervisor tracks the number of failures that have occurred, with an
130 exponential decay on the count. Every FailureDecay seconds, the number of
131 failures that have occurred is cut in half. (This is done smoothly with an
132 exponential function.) When a failure occurs, the number of failures
133 is incremented by one. When the number of failures passes the
134 FailureThreshold, the entire supervisor waits for FailureBackoff seconds
135 before attempting any further restarts, at which point it resets its
136 failure count to zero.
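
For example (illustrative numbers): with FailureDecay = 30 and a current
failure count of 4, a failure arriving 60 seconds after the previous one
yields

    failures = 4 * 0.5^(60/30) + 1 = 2

which is still below a FailureThreshold of 5, so the failed service is
restarted immediately.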
137
138 Timeout is how long Suture will wait for a service to properly terminate.
139
140 The PassThroughPanics options can be set to let panics in services propagate
141 and crash the program, should this be desirable.
142
143 DontPropagateTermination indicates whether this supervisor tree will
144 propagate an ErrTerminateSupervisorTree if a child service returns it. If false,
145 this supervisor will itself return an error that will terminate its
146 parent. If true, it will merely return ErrDoNotRestart. false by default.
147
148 */
149 func New(name string, spec Spec) *Supervisor {
150 spec.configureDefaults(name)
151
152 return &Supervisor{
153 name,
154
155 spec,
156
157 // services
158 make(map[serviceID]serviceWithName),
159 // cancellations
160 make(map[serviceID]context.CancelFunc),
161 // servicesShuttingDown
162 make(map[serviceID]serviceWithName),
163 // lastFail, deliberately the zero time
164 time.Time{},
165 // failures
166 0,
167 // restartQueue
168 make([]serviceID, 0, 1),
169 // serviceCounter
170 0,
171 // control
172 make(chan supervisorMessage),
173 // notifyServiceDone
174 make(chan serviceID),
175 // resumeTimer
176 make(chan time.Time),
177
178 // liveness
179 make(chan struct{}),
180
181 sync.Mutex{},
182 // ctx
183 nil,
184 // myCancel
185 nil,
186
187 // the tests can override these for testing threshold
188 // behavior
189 // getNow
190 time.Now,
191 // getAfterChan
192 time.After,
193
194 // m
195 sync.Mutex{},
196
197 // unstoppedServiceReport
198 nil,
199
200 // id
201 nextSupervisorID(),
202 // state
203 notRunning,
204 }
205 }
206
207 func serviceName(service Service) (serviceName string) {
208 stringer, canStringer := service.(fmt.Stringer)
209 if canStringer {
210 serviceName = stringer.String()
211 } else {
212 serviceName = fmt.Sprintf("%#v", service)
213 }
214 return
215 }
216
217 // NewSimple is a convenience function to create a service with just a name
218 // and the sensible defaults.
219 func NewSimple(name string) *Supervisor {
220 return New(name, Spec{})
221 }
222
223 // HasSupervisor is an interface that indicates the given struct contains a
224 // supervisor. If the struct is either already a *Supervisor, or it embeds
225 // a *Supervisor, this will already be implemented for you. Otherwise, a
226 // struct containing a supervisor will need to implement this in order to
227 // participate in the event hook propagation and the recursive
228 // UnstoppedServiceReport.
229 //
230 // It is legal for GetSupervisor to return nil, in which case
231 // the supervisor-specific behaviors will simply be ignored.
232 type HasSupervisor interface {
233 GetSupervisor() *Supervisor
234 }
235
236 func (s *Supervisor) GetSupervisor() *Supervisor {
237 return s
238 }
239
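// Illustrative sketch (not part of the package API): embedding a *Supervisor
// is the easiest way to satisfy HasSupervisor, since both GetSupervisor and
// Serve are promoted from the embedded field:
//
//	type Pipeline struct {
//		*Supervisor
//		// ... other fields ...
//	}
//
//	func NewPipeline() *Pipeline {
//		p := &Pipeline{Supervisor: NewSimple("pipeline")}
//		// p.Add(...) internal services here.
//		return p
//	}
//
// Adding a *Pipeline to a parent supervisor then participates in event hook
// propagation just like adding a plain *Supervisor.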
240 /*
241 Add adds a service to this supervisor.
242
243 If the supervisor is currently running, the service will be started
244 immediately. If the supervisor has not been started yet, the service
245 will be started when the supervisor is. If the supervisor was already stopped,
246 this is a no-op that returns an empty ServiceToken.
247
248 The returned ServiceToken may be passed to the Remove method of the Supervisor
249 to terminate the service.
250
251 As a special behavior, if the service added is itself a supervisor, the
252 supervisor being added will copy the EventHook from the Supervisor it
253 is being added to. This allows factoring out providing a Supervisor
254 from its logging. This unconditionally overwrites the child Supervisor's
255 event hook.
256
257 */
258 func (s *Supervisor) Add(service Service) ServiceToken {
259 if s == nil {
260 panic("can't add service to nil *suture.Supervisor")
261 }
262
263 if hasSupervisor, isHaveSupervisor := service.(HasSupervisor); isHaveSupervisor {
264 supervisor := hasSupervisor.GetSupervisor()
265 if supervisor != nil {
266 supervisor.spec.EventHook = s.spec.EventHook
267 }
268 }
269
270 s.m.Lock()
271 if s.state == notRunning {
272 id := s.serviceCounter
273 s.serviceCounter++
274
275 s.services[id] = serviceWithName{service, serviceName(service)}
276 s.restartQueue = append(s.restartQueue, id)
277
278 s.m.Unlock()
279 return ServiceToken{uint64(s.id)<<32 | uint64(id)}
280 }
281 s.m.Unlock()
282
283 response := make(chan serviceID)
284 if s.sendControl(addService{service, serviceName(service), response}) != nil {
285 return ServiceToken{}
286 }
287 return ServiceToken{uint64(s.id)<<32 | uint64(<-response)}
288 }
289
290 // ServeBackground starts running a supervisor in its own goroutine. When
291 // this method returns, the supervisor is guaranteed to be in a running state.
292 func (s *Supervisor) ServeBackground(ctx context.Context) {
293 go s.Serve(ctx)
294 s.sync()
295 }
296
297 /*
298 Serve starts the supervisor. You should call this on the top-level supervisor,
299 but nothing else.
300 */
301 func (s *Supervisor) Serve(ctx context.Context) error {
302 // The context documentation permits functions to accept a nil context,
303 // but it is the user's responsibility never to pass one in; be defensive anyway.
304 if ctx == nil {
305 ctx = context.Background()
306 }
307
308 if s == nil {
309 panic("Can't serve with a nil *suture.Supervisor")
310 }
311 // Take a separate cancellation function so this tree can be
312 // independently cancelled.
313 ctx, myCancel := context.WithCancel(ctx)
314 s.ctxMutex.Lock()
315 s.ctx = ctx
316 s.ctxMutex.Unlock()
317 s.ctxCancel = myCancel
318
319 if s.id == 0 {
320 panic("Can't call Serve on an incorrectly-constructed *suture.Supervisor")
321 }
322
323 s.m.Lock()
324 if s.state == normal || s.state == paused {
325 s.m.Unlock()
326 panic("Called .Serve() on a supervisor that is already Serve()ing")
327 }
328
329 s.state = normal
330 s.m.Unlock()
331
332 defer func() {
333 s.m.Lock()
334 s.state = terminated
335 s.m.Unlock()
336 }()
337
338 // for all the services I currently know about, start them
339 for _, id := range s.restartQueue {
340 namedService, present := s.services[id]
341 if present {
342 s.runService(ctx, namedService.Service, id)
343 }
344 }
345 s.restartQueue = make([]serviceID, 0, 1)
346
347 for {
348 select {
349 case <-ctx.Done():
350 s.stopSupervisor()
351 return ctx.Err()
352 case m := <-s.control:
353 switch msg := m.(type) {
354 case serviceFailed:
355 s.handleFailedService(ctx, msg.id, msg.panicMsg, msg.stacktrace, true)
356 case serviceEnded:
357 _, monitored := s.services[msg.id]
358 if monitored {
359 cancel := s.cancellations[msg.id]
360 if isErr(msg.err, ErrDoNotRestart) || isErr(msg.err, context.Canceled) || isErr(msg.err, context.DeadlineExceeded) {
361 delete(s.services, msg.id)
362 delete(s.cancellations, msg.id)
363 go cancel()
364 } else if isErr(msg.err, ErrTerminateSupervisorTree) {
365 s.stopSupervisor()
366 if s.spec.DontPropagateTermination {
367 return ErrDoNotRestart
368 } else {
369 return msg.err
370 }
371 } else {
372 s.handleFailedService(ctx, msg.id, msg.err, nil, false)
373 }
374 }
375 case addService:
376 id := s.serviceCounter
377 s.serviceCounter++
378
379 s.services[id] = serviceWithName{msg.service, msg.name}
380 s.runService(ctx, msg.service, id)
381
382 msg.response <- id
383 case removeService:
384 s.removeService(msg.id, msg.notification)
385 case stopSupervisor:
386 msg.done <- s.stopSupervisor()
387 return nil
388 case listServices:
389 services := []Service{}
390 for _, service := range s.services {
391 services = append(services, service.Service)
392 }
393 msg.c <- services
394 case syncSupervisor:
395 // this deliberately does nothing; its sole purpose is to
396 // introduce a sync point via the channel receive
397 case panicSupervisor:
398 // used only by tests
399 panic("Panicking as requested!")
400 }
401 case serviceEnded := <-s.notifyServiceDone:
402 delete(s.servicesShuttingDown, serviceEnded)
403 case <-s.resumeTimer:
404 // We're resuming normal operation after a pause due to
405 // excessive thrashing
406 // FIXME: Ought to permit some spacing of these functions, rather
407 // than simply hammering through them
408 s.m.Lock()
409 s.state = normal
410 s.m.Unlock()
411 s.failures = 0
412 s.spec.EventHook(EventResume{s, s.Name})
413 for _, id := range s.restartQueue {
414 namedService, present := s.services[id]
415 if present {
416 s.runService(ctx, namedService.Service, id)
417 }
418 }
419 s.restartQueue = make([]serviceID, 0, 1)
420 }
421 }
422 }
423
424 // UnstoppedServiceReport will return a report of what services failed to
425 // stop when the supervisor was stopped. This call will return when the
426 // supervisor is done shutting down. It will hang on a supervisor that has
427 // not been stopped, because it will not be "done shutting down".
428 //
429 // Calling this on a supervisor will return a report for the whole
430 // supervisor tree under it.
431 //
432 // WARNING: Technically, any use of the returned data structure is a
433 // TOCTOU violation:
434 // https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
435 // Since the data structure was generated and returned to you, any of these
436 // services may have stopped since then.
437 //
438 // However, this can still be useful information at program teardown
439 // time. For instance, logging that a service failed to stop as expected is
440 // still useful, as even if it shuts down later, it was still later than
441 // you expected.
442 //
443 // But if you cast the Service objects back to their underlying objects and
444 // start trying to manipulate them ("shut down harder!"), be sure to
445 // account for the possibility they are in fact shut down before you get
446 // them.
447 //
448 // If there are no services to report, the UnstoppedServiceReport will be
449 // nil. A zero-length constructed slice is never returned.
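//
// A typical teardown sketch (names are illustrative):
//
//	cancel() // cancel the context that was passed to Serve
//	report, _ := supervisor.UnstoppedServiceReport()
//	for _, unstopped := range report {
//		log.Printf("%s did not stop in time", unstopped.Name)
//	}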
450 func (s *Supervisor) UnstoppedServiceReport() (UnstoppedServiceReport, error) {
451 // the only thing that ever happens to this channel is getting
452 // closed when the supervisor terminates.
453 _, _ = <-s.liveness
454
455 // FIXME: Recurse on the supervisors
456 return s.unstoppedServiceReport, nil
457 }
458
459 func (s *Supervisor) handleFailedService(ctx context.Context, id serviceID, err interface{}, stacktrace []byte, panic bool) {
460 now := s.getNow()
461
462 if s.lastFail.IsZero() {
463 s.lastFail = now
464 s.failures = 1.0
465 } else {
466 sinceLastFail := now.Sub(s.lastFail).Seconds()
467 intervals := sinceLastFail / s.spec.FailureDecay
468 s.failures = s.failures*math.Pow(.5, intervals) + 1
469 }
470
471 if s.failures > s.spec.FailureThreshold {
472 s.m.Lock()
473 s.state = paused
474 s.m.Unlock()
475 s.spec.EventHook(EventBackoff{s, s.Name})
476 s.resumeTimer = s.getAfterChan(
477 s.spec.BackoffJitter.Jitter(s.spec.FailureBackoff))
478 }
479
480 s.lastFail = now
481
482 failedService, monitored := s.services[id]
483
484 // It is possible for a service to be no longer monitored
485 // by the time we get here. In that case, just ignore it.
486 if monitored {
487 s.m.Lock()
488 curState := s.state
489 s.m.Unlock()
490 if curState == normal {
491 s.runService(ctx, failedService.Service, id)
492 } else {
493 s.restartQueue = append(s.restartQueue, id)
494 }
495 if panic {
496 s.spec.EventHook(EventServicePanic{
497 Supervisor: s,
498 SupervisorName: s.Name,
499 Service: failedService.Service,
500 ServiceName: failedService.name,
501 CurrentFailures: s.failures,
502 FailureThreshold: s.spec.FailureThreshold,
503 Restarting: curState == normal,
504 PanicMsg: err.(string),
505 Stacktrace: string(stacktrace),
506 })
507 } else {
508 e := EventServiceTerminate{
509 Supervisor: s,
510 SupervisorName: s.Name,
511 Service: failedService.Service,
512 ServiceName: failedService.name,
513 CurrentFailures: s.failures,
514 FailureThreshold: s.spec.FailureThreshold,
515 Restarting: curState == normal,
516 }
517 if err != nil {
518 e.Err = err
519 }
520 s.spec.EventHook(e)
521 }
522 }
523 }
524
525 func (s *Supervisor) runService(ctx context.Context, service Service, id serviceID) {
526 childCtx, cancel := context.WithCancel(ctx)
527 done := make(chan struct{})
528 blockingCancellation := func() {
529 cancel()
530 <-done
531 }
532 s.cancellations[id] = blockingCancellation
533 go func() {
534 if !s.spec.PassThroughPanics {
535 defer func() {
536 if r := recover(); r != nil {
537 buf := make([]byte, 65535)
538 written := runtime.Stack(buf, false)
539 buf = buf[:written]
540 s.fail(id, r.(string), buf)
541 }
542 }()
543 }
544
545 err := service.Serve(childCtx)
546 cancel()
547 close(done)
548
549 s.serviceEnded(id, err)
550 }()
551 }
552
553 func (s *Supervisor) removeService(id serviceID, notificationChan chan struct{}) {
554 namedService, present := s.services[id]
555 if present {
556 cancel := s.cancellations[id]
557 delete(s.services, id)
558 delete(s.cancellations, id)
559
560 s.servicesShuttingDown[id] = namedService
561 go func() {
562 successChan := make(chan struct{})
563 go func() {
564 cancel()
565 close(successChan)
566 if notificationChan != nil {
567 notificationChan <- struct{}{}
568 }
569 }()
570
571 select {
572 case <-successChan:
573 // Life is good!
574 case <-s.getAfterChan(s.spec.Timeout):
575 s.spec.EventHook(EventStopTimeout{
576 s, s.Name,
577 namedService.Service, namedService.name})
578 }
579 s.notifyServiceDone <- id
580 }()
581 } else {
582 if notificationChan != nil {
583 notificationChan <- struct{}{}
584 }
585 }
586 }
587
588 func (s *Supervisor) stopSupervisor() UnstoppedServiceReport {
589 notifyDone := make(chan serviceID, len(s.services))
590
591 for id, namedService := range s.services {
592 cancel := s.cancellations[id]
593 delete(s.services, id)
594 delete(s.cancellations, id)
595 s.servicesShuttingDown[id] = namedService
596 go func(sID serviceID) {
597 cancel()
598 notifyDone <- sID
599 }(id)
600 }
601
602 timeout := s.getAfterChan(s.spec.Timeout)
603
604 SHUTTING_DOWN_SERVICES:
605 for len(s.servicesShuttingDown) > 0 {
606 select {
607 case id := <-notifyDone:
608 delete(s.servicesShuttingDown, id)
609 case serviceID := <-s.notifyServiceDone:
610 delete(s.servicesShuttingDown, serviceID)
611 case <-timeout:
612 for _, namedService := range s.servicesShuttingDown {
613 s.spec.EventHook(EventStopTimeout{
614 s, s.Name,
615 namedService.Service, namedService.name,
616 })
617 }
618
619 // failed remove statements will log the errors.
620 break SHUTTING_DOWN_SERVICES
621 }
622 }
623
624 // If nothing else has cancelled our context, we should now.
625 s.ctxCancel()
626
627 // Indicate that we're done shutting down
628 defer close(s.liveness)
629
630 if len(s.servicesShuttingDown) == 0 {
631 return nil
632 } else {
633 report := UnstoppedServiceReport{}
634 for serviceID, serviceWithName := range s.servicesShuttingDown {
635 report = append(report, UnstoppedService{
636 SupervisorPath: []*Supervisor{s},
637 Service: serviceWithName.Service,
638 Name: serviceWithName.name,
639 ServiceToken: ServiceToken{uint64(s.id)<<32 | uint64(serviceID)},
640 })
641 }
642 s.m.Lock()
643 s.unstoppedServiceReport = report
644 s.m.Unlock()
645 return report
646 }
647 }
648
649 // String implements the fmt.Stringer interface.
650 func (s *Supervisor) String() string {
651 return s.Name
652 }
653
654 // sendControl abstracts checking for the supervisor to still be running
655 // when we send a message. This prevents blocking when sending to a
656 // cancelled supervisor.
657 func (s *Supervisor) sendControl(sm supervisorMessage) error {
658 var doneChan <-chan struct{}
659 s.ctxMutex.Lock()
660 if s.ctx == nil {
661 s.ctxMutex.Unlock()
662 return ErrSupervisorNotStarted
663 }
664 doneChan = s.ctx.Done()
665 s.ctxMutex.Unlock()
666
667 select {
668 case s.control <- sm:
669 return nil
670 case <-doneChan:
671 return ErrSupervisorNotRunning
672 }
673 }
674
675 /*
676 Remove will remove the given service from the Supervisor, and attempt to Stop() it.
677 The ServiceToken comes from the Add() call. This returns without waiting
678 for the service to stop.
679 */
680 func (s *Supervisor) Remove(id ServiceToken) error {
681 sID := supervisorID(id.id >> 32)
682 if sID != s.id {
683 return ErrWrongSupervisor
684 }
685 err := s.sendControl(removeService{serviceID(id.id & 0xffffffff), nil})
686 if err == ErrSupervisorNotRunning {
687 // No meaningful error handling if the supervisor is stopped.
688 return nil
689 }
690 return err
691 }
692
693 /*
694 RemoveAndWait will remove the given service from the Supervisor and attempt
695 to Stop() it. It will wait up to the given timeout value for the service to
696 terminate. A timeout value of 0 means to wait forever.
697
698 If a nil error is returned from this function, then the service was
699 terminated normally. If either the supervisor terminates or the timeout
700 passes, ErrTimeout is returned. (If this isn't even the right supervisor
701 ErrWrongSupervisor is returned.)
702 */
703 func (s *Supervisor) RemoveAndWait(id ServiceToken, timeout time.Duration) error {
704 sID := supervisorID(id.id >> 32)
705 if sID != s.id {
706 return ErrWrongSupervisor
707 }
708
709 var timeoutC <-chan time.Time
710
711 if timeout > 0 {
712 timer := time.NewTimer(timeout)
713 defer timer.Stop()
714 timeoutC = timer.C
715 }
716
717 notificationC := make(chan struct{})
718
719 sentControlErr := s.sendControl(removeService{serviceID(id.id & 0xffffffff), notificationC})
720
721 if sentControlErr != nil {
722 return sentControlErr
723 }
724
725 select {
726 case <-notificationC:
727 // normal case; the service is terminated.
728 return nil
729
730 // This occurs if the entire supervisor ends without the service
731 // having terminated, and includes the timeout the supervisor
732 // itself waited before closing the liveness channel.
733 case <-s.ctx.Done():
734 return ErrTimeout
735
736 // The local timeout.
737 case <-timeoutC:
738 return ErrTimeout
739 }
740 }
741
742 /*
743
744 Services returns a []Service containing a snapshot of the services this
745 Supervisor is managing.
746
747 */
748 func (s *Supervisor) Services() []Service {
749 ls := listServices{make(chan []Service)}
750
751 if s.sendControl(ls) == nil {
752 return <-ls.c
753 }
754 return nil
755 }
756
757 var currentSupervisorIDL sync.Mutex
758 var currentSupervisorID uint32
759
760 func nextSupervisorID() supervisorID {
761 currentSupervisorIDL.Lock()
762 defer currentSupervisorIDL.Unlock()
763 currentSupervisorID++
764 return supervisorID(currentSupervisorID)
765 }
766
767 // ServiceToken is an opaque identifier that can be used to terminate a service that
768 // has been Add()ed to a Supervisor.
769 type ServiceToken struct {
770 id uint64
771 }
772
773 // An UnstoppedService is the component member of an
774 // UnstoppedServiceReport.
775 //
776 // The SupervisorPath is the path down the supervisor tree to the given
777 // service.
778 type UnstoppedService struct {
779 SupervisorPath []*Supervisor
780 Service Service
781 Name string
782 ServiceToken ServiceToken
783 }
784
785 // An UnstoppedServiceReport will be returned by StopWithReport, reporting
786 // which services failed to stop.
787 type UnstoppedServiceReport []UnstoppedService
788
789 type serviceWithName struct {
790 Service Service
791 name string
792 }
793
794 // Jitter returns the sum of the input duration and a random jitter. It is
795 // compatible with the jitter functions in github.com/lthibault/jitterbug.
796 type Jitter interface {
797 Jitter(time.Duration) time.Duration
798 }
799
800 // NoJitter does not apply any jitter to the input duration
801 type NoJitter struct{}
802
803 // Jitter leaves the input duration d unchanged.
804 func (NoJitter) Jitter(d time.Duration) time.Duration { return d }
805
806 // DefaultJitter is the jitter function that is applied when spec.BackoffJitter
807 // is set to nil.
808 type DefaultJitter struct {
809 rand *rand.Rand
810 }
811
812 // Jitter will jitter the backoff time by uniformly distributing it into
813 // the range [FailureBackoff, 1.5 * FailureBackoff).
814 func (dj *DefaultJitter) Jitter(d time.Duration) time.Duration {
815 // this is only called by the core supervisor loop, so it is
816 // single-thread safe.
817 if dj.rand == nil {
818 dj.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
819 }
820 jitter := dj.rand.Float64() / 2
821 return d + time.Duration(float64(d)*jitter)
822 }
823
824 // ErrWrongSupervisor is returned by the (*Supervisor).Remove method
825 // if you pass a ServiceToken from the wrong Supervisor.
826 var ErrWrongSupervisor = errors.New("wrong supervisor for this service token, no service removed")
827
828 // ErrTimeout is returned when an attempt to RemoveAndWait for a service to
829 // stop has timed out.
830 var ErrTimeout = errors.New("waiting for service to stop has timed out")
831
832 // ErrSupervisorNotTerminated is returned when asking for a stopped service
833 // report before the supervisor has been terminated.
834 var ErrSupervisorNotTerminated = errors.New("supervisor not terminated")
835
836 // ErrSupervisorNotStarted is returned if you try to send control messages
837 // to a supervisor that has not started yet. See note on Supervisor struct
838 // about the legal ways to start a supervisor.
839 var ErrSupervisorNotStarted = errors.New("supervisor not started yet")
840
841 // Spec is used to pass arguments to the New function to create a
842 // supervisor. See the New function for full documentation.
843 type Spec struct {
844 EventHook EventHook
845 FailureDecay float64
846 FailureThreshold float64
847 FailureBackoff time.Duration
848 BackoffJitter Jitter
849 Timeout time.Duration
850 PassThroughPanics bool
851 DontPropagateTermination bool
852 }
853
854 func (s *Spec) configureDefaults(supervisorName string) {
855 if s.FailureDecay == 0 {
856 s.FailureDecay = 30
857 }
858 if s.FailureThreshold == 0 {
859 s.FailureThreshold = 5
860 }
861 if s.FailureBackoff == 0 {
862 s.FailureBackoff = time.Second * 15
863 }
864 if s.BackoffJitter == nil {
865 s.BackoffJitter = &DefaultJitter{}
866 }
867 if s.Timeout == 0 {
868 s.Timeout = time.Second * 10
869 }
870
871 // set up the default logging handlers
872 if s.EventHook == nil {
873 s.EventHook = func(e Event) {
874 log.Print(e)
875 }
876 }
877 }
0 package suture
1
2 import (
3 "context"
4 "fmt"
5 )
6
7 type Incrementor struct {
8 current int
9 next chan int
10 stop chan bool
11 }
12
13 func (i *Incrementor) Stop() {
14 fmt.Println("Stopping the service")
15 i.stop <- true
16 }
17
18 func (i *Incrementor) Serve(ctx context.Context) error {
19 for {
20 select {
21 case i.next <- i.current:
22 i.current++
23 case <-ctx.Done():
24 // This message on i.stop is just to synchronize
25 // this test with the example code so the output is
26 // consistent for the test code; most services
27 // would just "return nil" here.
28 fmt.Println("Stopping the service")
29 i.stop <- true
30 return nil
31 }
32 }
33 }
34
35 func ExampleNew_simple() {
36 supervisor := NewSimple("Supervisor")
37 service := &Incrementor{0, make(chan int), make(chan bool)}
38 supervisor.Add(service)
39
40 ctx, cancel := context.WithCancel(context.Background())
41 supervisor.ServeBackground(ctx)
42
43 fmt.Println("Got:", <-service.next)
44 fmt.Println("Got:", <-service.next)
45 cancel()
46
47 // We sync here just to guarantee the output of "Stopping the service"
48 <-service.stop
49
50 // Output:
51 // Got: 0
52 // Got: 1
53 // Stopping the service
54 }
0 package suture
1
2 import (
3 "context"
4 "fmt"
5 "reflect"
6 "strings"
7 "sync"
8 "testing"
9 "time"
10 )
11
12 const (
13 Happy = iota
14 Fail
15 Panic
16 Hang
17 UseStopChan
18 TerminateTree
19 DoNotRestart
20 )
21
22 var everMultistarted = false
23
24 // Test that supervisors work perfectly when everything is hunky dory.
25 func TestTheHappyCase(t *testing.T) {
26 // t.Parallel()
27
28 s := NewSimple("A")
29 if s.String() != "A" {
30 t.Fatal("Can't get name from a supervisor")
31 }
32 service := NewService("B")
33
34 s.Add(service)
35
36 ctx, cancel := context.WithCancel(context.Background())
37 go s.Serve(ctx)
38
39 <-service.started
40
41 // If we stop the service, it just gets restarted
42 service.take <- Fail
43 <-service.started
44
45 // And it is shut down when we stop the supervisor
46 service.take <- UseStopChan
47 cancel()
48 <-service.stop
49 }
50
51 // Test that adding to a running supervisor does indeed start the service.
52 func TestAddingToRunningSupervisor(t *testing.T) {
53 // t.Parallel()
54
55 s := NewSimple("A1")
56
57 ctx, cancel := context.WithCancel(context.Background())
58 s.ServeBackground(ctx)
59 defer cancel()
60
61 service := NewService("B1")
62 s.Add(service)
63
64 <-service.started
65
66 services := s.Services()
67 if !reflect.DeepEqual([]Service{service}, services) {
68 t.Fatal("Can't get list of services as expected.")
69 }
70 }
71
72 // Test what happens when services fail.
73 func TestFailures(t *testing.T) {
74 // t.Parallel()
75
76 s := NewSimple("A2")
77 s.spec.FailureThreshold = 3.5
78
79 ctx, cancel := context.WithCancel(context.Background())
80 go s.Serve(ctx)
81 defer func() {
82 // to avoid deadlocks during shutdown, we have to not try to send
83 // things out on channels while we're shutting down (this undoes the
84 // EventHook override about 25 lines down)
85 s.spec.EventHook = func(Event) {}
86 cancel()
87 }()
88 s.sync()
89
90 service1 := NewService("B2")
91 service2 := NewService("C2")
92
93 s.Add(service1)
94 <-service1.started
95 s.Add(service2)
96 <-service2.started
97
98 nowFeeder := NewNowFeeder()
99 pastVal := time.Unix(1000000, 0)
100 nowFeeder.appendTimes(pastVal)
101 s.getNow = nowFeeder.getter
102
103 resumeChan := make(chan time.Time)
104 s.getAfterChan = func(d time.Duration) <-chan time.Time {
105 return resumeChan
106 }
107
108 failNotify := make(chan bool)
109 // use this to synchronize on here
110 s.spec.EventHook = func(e Event) {
111 switch e.Type() {
112 case EventTypeServiceTerminate:
113 failNotify <- e.(EventServiceTerminate).Restarting
114 case EventTypeServicePanic:
115 failNotify <- e.(EventServicePanic).Restarting
116 }
117 }
118
119 // All that setup was for this: Service1, please return now.
120 service1.take <- Fail
121 restarted := <-failNotify
122 <-service1.started
123
124 if !restarted || s.failures != 1 || s.lastFail != pastVal {
125 t.Fatal("Did not fail in the expected manner")
126 }
127 // Getting past this means the service was restarted.
128 service1.take <- Happy
129
130 // Service2, your turn.
131 service2.take <- Fail
132 nowFeeder.appendTimes(pastVal)
133 restarted = <-failNotify
134 <-service2.started
135 if !restarted || s.failures != 2 || s.lastFail != pastVal {
136 t.Fatal("Did not fail in the expected manner")
137 }
138 // And you're back. (That is, the correct service was restarted.)
139 service2.take <- Happy
140
141 // Now, one failureDecay later, is everything working correctly?
142 oneDecayLater := time.Unix(1000030, 0)
143 nowFeeder.appendTimes(oneDecayLater)
144 service2.take <- Fail
145 restarted = <-failNotify
146 <-service2.started
147 // playing a bit fast and loose here with floating point, but...
148 // we get 2 by taking the current failure value of 2, decaying it
149 // by one interval, which cuts it in half to 1, then adding 1 again,
150 // all of which "should" be precise
151 if !restarted || s.failures != 2 || s.lastFail != oneDecayLater {
152 t.Fatal("Did not decay properly", s.lastFail, oneDecayLater)
153 }
154
155 // For a change of pace, service1 would you be so kind as to panic?
156 nowFeeder.appendTimes(oneDecayLater)
157 service1.take <- Panic
158 restarted = <-failNotify
159 <-service1.started
160 if !restarted || s.failures != 3 || s.lastFail != oneDecayLater {
161 t.Fatal("Did not correctly recover from a panic")
162 }
163
164 nowFeeder.appendTimes(oneDecayLater)
165 backingoff := make(chan bool)
166 s.spec.EventHook = func(e Event) {
167 switch e.Type() {
168 case EventTypeServiceTerminate:
169 failNotify <- e.(EventServiceTerminate).Restarting
170 case EventTypeBackoff:
171 backingoff <- true
172 case EventTypeResume:
173 backingoff <- false
174 }
175 }
176
177 // And with this failure, we trigger the backoff code.
178 service1.take <- Fail
179 backoff := <-backingoff
180 restarted = <-failNotify
181
182 if !backoff || restarted || s.failures != 4 {
183 t.Fatal("Broke past the threshold but did not log correctly", s.failures, backoff, restarted)
184 }
185 if service1.existing != 0 {
186 t.Fatal("service1 still exists according to itself?")
187 }
188
189 // service2 is still running, because we don't shut anything down in a
190 // backoff, we just stop restarting.
191 service2.take <- Happy
192
193 var correct bool
194 timer := time.NewTimer(time.Millisecond * 10)
195 // verify the service has not been restarted
196 // hard to get around race conditions here without simply using a timer...
197 select {
198 case service1.take <- Happy:
199 correct = false
200 case <-timer.C:
201 correct = true
202 }
203 if !correct {
204 t.Fatal("Restarted the service during the backoff interval")
205 }
206
207 // tell the supervisor the restart interval has passed
208 resumeChan <- time.Time{}
209 backoff = <-backingoff
210 <-service1.started
211 s.sync()
212 if s.failures != 0 {
213 t.Fatal("Did not reset failure count after coming back from timeout.")
214 }
215
216 nowFeeder.appendTimes(oneDecayLater)
217 service1.take <- Fail
218 restarted = <-failNotify
219 <-service1.started
220 if !restarted || backoff {
221 t.Fatal("For some reason, got that we were backing off again.", restarted, backoff)
222 }
223 }
224
225 func TestRunningAlreadyRunning(t *testing.T) {
226 // t.Parallel()
227
228 s := NewSimple("A3")
229 ctx, cancel := context.WithCancel(context.Background())
230 go s.Serve(ctx)
231 defer cancel()
232
233 // ensure the supervisor has made it to its main loop
234 s.sync()
235 if !panics(s.Serve) {
236 t.Fatal("Supervisor failed to prevent itself from double-running.")
237 }
238 }
239
240 func TestFullConstruction(t *testing.T) {
241 // t.Parallel()
242
243 s := New("Moo", Spec{
244 EventHook: func(Event) {},
245 FailureDecay: 1,
246 FailureThreshold: 2,
247 FailureBackoff: 3,
248 Timeout: time.Second * 29,
249 })
250 if s.String() != "Moo" || s.spec.FailureDecay != 1 || s.spec.FailureThreshold != 2 || s.spec.FailureBackoff != 3 || s.spec.Timeout != time.Second*29 {
251 t.Fatal("Full construction failed somehow")
252 }
253 }
254
255 // This is mostly for coverage testing.
256 func TestDefaultLogging(t *testing.T) {
257 // t.Parallel()
258
259 s := NewSimple("A4")
260
261 service := NewService("B4")
262 s.Add(service)
263
264 s.spec.FailureThreshold = .5
265 s.spec.FailureBackoff = time.Millisecond * 25
266 ctx, cancel := context.WithCancel(context.Background())
267 go s.Serve(ctx)
268 s.sync()
269
270 <-service.started
271
272 resumeChan := make(chan time.Time)
273 s.getAfterChan = func(d time.Duration) <-chan time.Time {
274 return resumeChan
275 }
276
277 service.take <- UseStopChan
278 service.take <- Fail
279 <-service.stop
280 resumeChan <- time.Time{}
281
282 <-service.started
283
284 service.take <- Happy
285
286 s.spec.EventHook(EventStopTimeout{s, s.Name, service, service.name})
287 s.spec.EventHook(EventServicePanic{
288 SupervisorName: s.Name,
289 ServiceName: service.name,
290 CurrentFailures: 1,
291 FailureThreshold: 1,
292 Restarting: true,
293 PanicMsg: "test error",
294 Stacktrace: "",
295 })
296
297 cancel()
298 }
299
300 func TestNestedSupervisors(t *testing.T) {
301 // t.Parallel()
302
303 super1 := NewSimple("Top5")
304 super2 := NewSimple("Nested5")
305 service := NewService("Service5")
306
307 super2.spec.EventHook = func(e Event) {
308 if e.Type() == EventTypeStopTimeout {
309 panic("Failed to copy LogBadStop")
310 }
311 }
312
313 super1.Add(super2)
314 super2.Add(service)
315
316 // test the functions got copied from super1; if this panics, it didn't
317 // get copied
318 super2.spec.EventHook(EventStopTimeout{
319 super2, super2.Name,
320 service, service.name,
321 })
322
323 ctx, cancel := context.WithCancel(context.Background())
324 go super1.Serve(ctx)
325 super1.sync()
326
327 <-service.started
328 service.take <- Happy
329
330 cancel()
331 }
332
333 func TestStoppingSupervisorStopsServices(t *testing.T) {
334 // t.Parallel()
335
336 s := NewSimple("Top6")
337 service := NewService("Service 6")
338
339 s.Add(service)
340
341 ctx, cancel := context.WithCancel(context.Background())
342 go s.Serve(ctx)
343 s.sync()
344
345 <-service.started
346
347 service.take <- UseStopChan
348
349 cancel()
350 <-service.stop
351
352 if s.sendControl(syncSupervisor{}) != ErrSupervisorNotRunning {
353 t.Fatal("supervisor is shut down, should be returning ErrSupervisorNotRunning for sendControl")
354 }
355 if s.Services() != nil {
356 t.Fatal("Non-running supervisor is returning services list")
357 }
358 }
359
360 // This tests that even if a service is hung, the supervisor will stop.
361 func TestStoppingStillWorksWithHungServices(t *testing.T) {
362 // t.Parallel()
363
364 s := NewSimple("Top7")
365 service := NewService("Service WillHang7")
366
367 s.Add(service)
368
369 ctx, cancel := context.WithCancel(context.Background())
370 go s.Serve(ctx)
371
372 <-service.started
373
374 service.take <- UseStopChan
375 service.take <- Hang
376
377 resumeChan := make(chan time.Time)
378 s.getAfterChan = func(d time.Duration) <-chan time.Time {
379 return resumeChan
380 }
381 failNotify := make(chan struct{})
382 s.spec.EventHook = func(e Event) {
383 if e.Type() == EventTypeStopTimeout {
384 failNotify <- struct{}{}
385 }
386 }
387
388 // stop the supervisor, then immediately call time on it
389 go cancel()
390
391 resumeChan <- time.Time{}
392 <-failNotify
393 service.release <- true
394 <-service.stop
395 }
396
397 // This tests that even if a service is hung, the supervisor can still
398 // remove it.
399 func TestRemovingHungService(t *testing.T) {
400 // t.Parallel()
401
402 s := NewSimple("TopHungService")
403 failNotify := make(chan struct{})
404 resumeChan := make(chan time.Time)
405 s.getAfterChan = func(d time.Duration) <-chan time.Time {
406 return resumeChan
407 }
408 s.spec.EventHook = func(e Event) {
409 if e.Type() == EventTypeStopTimeout {
410 failNotify <- struct{}{}
411 }
412 }
413 service := NewService("Service WillHang")
414
415 sToken := s.Add(service)
416
417 go s.Serve(context.Background())
418
419 <-service.started
420 service.take <- Hang
421
422 _ = s.Remove(sToken)
423 resumeChan <- time.Time{}
424
425 <-failNotify
426 service.release <- true
427 }
428
429 func TestRemoveService(t *testing.T) {
430 // t.Parallel()
431
432 s := NewSimple("Top")
433 service := NewService("ServiceToRemove8")
434
435 id := s.Add(service)
436
437 go s.Serve(context.Background())
438
439 <-service.started
440 service.take <- UseStopChan
441
442 err := s.Remove(id)
443 if err != nil {
444 t.Fatal("Removing service somehow failed")
445 }
446 <-service.stop
447
448 err = s.Remove(ServiceToken{id.id + (1 << 32)})
449 if err != ErrWrongSupervisor {
450 t.Fatal("Did not detect that the ServiceToken was wrong")
451 }
452 err = s.RemoveAndWait(ServiceToken{id.id + (1 << 32)}, time.Second)
453 if err != ErrWrongSupervisor {
454 t.Fatal("Did not detect that the ServiceToken was wrong")
455 }
456 }
457
458 func TestServiceReport(t *testing.T) {
459 // t.Parallel()
460
461 s := NewSimple("Top")
462 s.spec.Timeout = time.Millisecond
463 service := NewService("ServiceName")
464
465 id := s.Add(service)
466 ctx, cancel := context.WithCancel(context.Background())
467 go s.Serve(ctx)
468
469 <-service.started
470 service.take <- Hang
471
472 expected := UnstoppedServiceReport{
473 {[]*Supervisor{s}, service, "ServiceName", id},
474 }
475
476 cancel()
477
478 report, err := s.UnstoppedServiceReport()
479 if err != nil {
480 t.Fatalf("error getting unstopped service report: %v", err)
481 }
482 if !reflect.DeepEqual(report, expected) {
483 t.Fatalf("did not get expected stop service report %#v != %#v", report, expected)
484 }
485 }
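// Editor's sketch, not part of the upstream test file: retrieving and dumping
// an UnstoppedServiceReport after shutdown, as the test above does. It ranges
// over the report rather than assuming field names beyond what the positional
// literal above shows; the function name is hypothetical.
func dumpUnstoppedServices(s *Supervisor) {
	report, err := s.UnstoppedServiceReport()
	if err != nil {
		fmt.Println("could not get report:", err)
		return
	}
	for _, entry := range report {
		fmt.Printf("still running: %#v\n", entry)
	}
}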
486
487 func TestFailureToConstruct(t *testing.T) {
488 // t.Parallel()
489
490 var s *Supervisor
491
492 panics(s.Serve)
493
494 s = new(Supervisor)
495 panics(s.Serve)
496 }
497
498 func TestFailingSupervisors(t *testing.T) {
499 // t.Parallel()
500
501 // This is a bit of a complicated test, so let me explain what
502 // all this is doing:
503 // 1. Set up a top-level supervisor with a hair-trigger backoff.
504 // 2. Add a supervisor to that.
505 // 3. To that supervisor, add a service.
506 // 4. Panic the supervisor in the middle, sending the top-level into
507 // backoff.
508 // 5. Kill the lower level service too.
509 // 6. Verify that when the top-level service comes out of backoff,
510 // the service ends up restarted as expected.
511
512 // Ultimately, we can't have more than a best-effort recovery here.
513 // A panic'ed supervisor can't really be trusted to have consistent state,
514 // and without *that*, we can't trust it to do anything sensible with
515 // the children it may have been running. So unlike Erlang, we can't
516 // really expect to be able to safely restart them or anything.
517 // Really, the "correct" answer is that the Supervisor must never panic,
518 // but in the event that it does, this verifies that it at least tries
519 // to get on with life.
520
521 // This also tests that if a Supervisor itself panics, and one of its
522 // monitored services goes down in the meantime, that the monitored
523 // service also gets correctly restarted when the supervisor does.
524
525 s1 := NewSimple("Top9")
526 s2 := NewSimple("Nested9")
527 service := NewService("Service9")
528
529 s1.Add(s2)
530 s2.Add(service)
531
532 // start the top-level supervisor...
533 ctx, cancel := context.WithCancel(context.Background())
534 go s1.Serve(ctx)
535 defer cancel()
536 // and sync on the service being started.
537 <-service.started
538
539 // Set the failure threshold such that even one failure triggers
540 // backoff on the top-level supervisor.
541 s1.spec.FailureThreshold = .5
542
543 // This lets us control exactly when the top-level supervisor comes
544 // back from its backoff, by forcing it to block on this channel
545 // being sent something in order to come back.
546 resumeChan := make(chan time.Time)
547 s1.getAfterChan = func(d time.Duration) <-chan time.Time {
548 return resumeChan
549 }
550 failNotify := make(chan string)
551 // synchronize on the expected failure of the middle supervisor
552 s1.spec.EventHook = func(e Event) {
553 if e.Type() == EventTypeServicePanic {
554 failNotify <- fmt.Sprintf("%s", e.(EventServicePanic).Service)
555 }
556 }
557
558 // Now, the middle supervisor panics and dies.
559 s2.panic()
560
561 // Receive the notification from the event hook on the top-level
562 // supervisor that the middle has failed.
563 failing := <-failNotify
564 // that's enough sync to guarantee this:
565 if failing != "Nested9" || s1.state != paused {
566 t.Fatal("Top-level supervisor did not go into backoff as expected")
567 }
568
569 // Tell the service to fail. Note the top-level supervisor has
570 // still not restarted the middle supervisor.
571 service.take <- Fail
572
573 // We now permit the top-level supervisor to resume. It should
574 // restart the middle supervisor, which should then restart the
575 // child service...
576 resumeChan <- time.Time{}
577
578 // which we can pick up from here. If this successfully restarts,
579 // then the whole chain must have worked.
580 <-service.started
581 }
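// Editor's sketch, not part of the upstream test file: building the
// "hair-trigger backoff" the test above configures by reaching into s.spec,
// but through a public Spec literal instead. The values are illustrative;
// only FailureThreshold, FailureBackoff and EventHook are assumed, all of
// which appear elsewhere in this file.
func newTouchySupervisor(name string) *Supervisor {
	return New(name, Spec{
		FailureThreshold: 0.5,                   // a single failure triggers backoff
		FailureBackoff:   25 * time.Millisecond, // short pause before resuming
		EventHook:        func(Event) {},
	})
}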
582
583 func TestNilSupervisorAdd(t *testing.T) {
584 // t.Parallel()
585
586 var s *Supervisor
587
588 defer func() {
589 if r := recover(); r == nil {
590 t.Fatal("did not panic as expected on nil add")
591 }
592 }()
593
594 s.Add(s)
595 }
596
597 func TestPassNoContextToSupervisor(t *testing.T) {
598 s := NewSimple("main")
599 service := NewService("B")
600 s.Add(service)
601
602 go s.Serve(nil)
603 <-service.started
604
605 s.ctxCancel()
606 }
607
608 func TestNilSupervisorPanicsAsExpected(t *testing.T) {
609 s := (*Supervisor)(nil)
610 if !panicsWith(s.Serve, "with a nil *suture.Supervisor") {
611 t.Fatal("nil supervisor doesn't panic as expected")
612 }
613 }
614
615 // https://github.com/thejerf/suture/issues/11
616 //
617 // The purpose of this test is to verify that it does not cause data races,
618 // so there are no obvious assertions.
619 func TestIssue11(t *testing.T) {
620 // t.Parallel()
621
622 s := NewSimple("main")
623 s.ServeBackground(context.Background())
624
625 subsuper := NewSimple("sub")
626 s.Add(subsuper)
627
628 subsuper.Add(NewService("may cause data race"))
629 }
630
631 func TestRemoveAndWait(t *testing.T) {
632 // t.Parallel()
633
634 s := NewSimple("main")
635 s.spec.Timeout = time.Second
636 ctx, cancel := context.WithCancel(context.Background())
637 s.ServeBackground(ctx)
638
639 service := NewService("A1")
640 token := s.Add(service)
641 <-service.started
642
643 // Normal termination case; without the useStopChan flag on the
644 // NewService, this will just terminate. So we can freely use a long
645 // timeout, because it should not trigger.
646 err := s.RemoveAndWait(token, time.Second)
647 if err != nil {
648 t.Fatal("Happy case for RemoveAndWait failed: " + err.Error())
649 }
650 // Removing an already-removed service returns promptly instead of blocking
651 err = s.RemoveAndWait(token, time.Second)
652 if err != nil {
653 t.Fatal("Removing already-removed service failed: " + err.Error())
654 }
655
656 service = NewService("A2")
657 token = s.Add(service)
658 <-service.started
659 service.take <- Hang
660
661 // Abnormal case; the service is hung until we release it
662 err = s.RemoveAndWait(token, time.Millisecond)
663 if err == nil {
664 t.Fatal("RemoveAndWait unexpectedly returning that everything is fine")
665 }
666 if err != ErrTimeout {
667 // laziness; one of the unhappy results is err == nil, which will
668 // panic here, but, hey, that's a failing test, right?
669 t.Fatal("Unexpected result for RemoveAndWait on frozen service: " +
670 err.Error())
671 }
672
673 // Abnormal case: The service is hung and we get the supervisor
674 // stopping instead.
675 service = NewService("A3")
676 token = s.Add(service)
677 <-service.started
678 cancel()
679 err = s.RemoveAndWait(token, 10*time.Millisecond)
680
681 if err != ErrSupervisorNotRunning {
682 t.Fatal("Unexpected result for RemoveAndWait on a stopped service: " + err.Error())
683 }
684
685 // Abnormal case: the service takes longer to terminate than the spec's timeout, but
686 // as long as it eventually terminates, RemoveAndWait does not hang.
687 s = NewSimple("main")
688 s.spec.Timeout = time.Millisecond
689 ctx, cancel = context.WithCancel(context.Background())
690 s.ServeBackground(ctx)
691 defer cancel()
692 service = NewService("A1")
693 token = s.Add(service)
694 <-service.started
695 service.take <- Hang
696
697 go func() {
698 time.Sleep(10 * time.Millisecond)
699 service.release <- true
700 }()
701
702 err = s.RemoveAndWait(token, 0)
703 if err != nil {
704 t.Fatal("Unexpected result of RemoveAndWait: " + err.Error())
705 }
706 }
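// Editor's sketch, not part of the upstream test file: the RemoveAndWait
// outcomes the test above walks through, handled in one place. The helper
// name and timeout are hypothetical; the error values are the ones this
// file already checks against.
func removeAndReport(s *Supervisor, tok ServiceToken) {
	switch err := s.RemoveAndWait(tok, 100*time.Millisecond); err {
	case nil:
		fmt.Println("service removed and stopped")
	case ErrTimeout:
		fmt.Println("service did not stop within the timeout")
	case ErrWrongSupervisor:
		fmt.Println("token does not belong to this supervisor")
	case ErrSupervisorNotRunning:
		fmt.Println("supervisor already stopped")
	default:
		fmt.Println("unexpected error:", err)
	}
}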
707
708 func TestSupervisorManagementIssue35(t *testing.T) {
709 s := NewSimple("issue 35")
710
711 for i := 1; i < 100; i++ {
712 s2 := NewSimple("test")
713 s.Add(s2)
714 }
715
716 ctx, cancel := context.WithCancel(context.Background())
717 s.ServeBackground(ctx)
718 // should not have any panics
719 cancel()
720 }
721
722 func TestCoverage(t *testing.T) {
723 New("testing coverage", Spec{
724 EventHook: func(Event) {},
725 })
726 NoJitter{}.Jitter(time.Millisecond)
727 }
728
729 func TestStopAfterRemoveAndWait(t *testing.T) {
730 // t.Parallel()
731
732 var badStopError error
733
734 s := NewSimple("main")
735 s.spec.Timeout = time.Second
736 s.spec.EventHook = func(e Event) {
737 if e.Type() == EventTypeStopTimeout {
738 ev := e.(EventStopTimeout)
739 badStopError = fmt.Errorf("%s: Service %s failed to terminate in a timely manner", ev.Supervisor, ev.Service)
740 }
741 }
742
743 ctx, cancel := context.WithCancel(context.Background())
744 s.ServeBackground(ctx)
745
746 service := NewService("A1")
747 token := s.Add(service)
748
749 <-service.started
750 service.take <- UseStopChan
751
752 err := s.RemoveAndWait(token, time.Second)
753 if err != nil {
754 t.Fatal("Happy case for RemoveAndWait failed: " + err.Error())
755 }
756 <-service.stop
757
758 cancel()
759
760 if badStopError != nil {
761 t.Fatal("Unexpected timeout while stopping supervisor: " + badStopError.Error())
762 }
763 }
764
765 // This tests that the entire supervisor tree is terminated when a service
766 // returns ErrTerminateSupervisorTree directly. (A minimal sketch of such a service follows this test.)
767 func TestServiceAndTreeTermination(t *testing.T) {
768 // t.Parallel()
769 s1 := NewSimple("TestTreeTermination1")
770 s2 := NewSimple("TestTreeTermination2")
771 s1.Add(s2)
772
773 service1 := NewService("TestTreeTerminationService1")
774 service2 := NewService("TestTreeTerminationService2")
775 service3 := NewService("TestTreeTerminationService3")
776 s2.Add(service1)
777 s2.Add(service2)
778 s2.Add(service3)
779
780 terminated := make(chan struct{})
781 go func() {
782 // we don't need the context because the service is going
783 // to terminate the supervisor.
784 s1.Serve(nil)
785 terminated <- struct{}{}
786 }()
787
788 <-service1.started
789 <-service2.started
790 <-service3.started
791
792 // OK, everything is up and running. Start by telling one service
793 // to terminate itself, and verify it isn't restarted.
794 service3.take <- DoNotRestart
795
796 // I've got nothing other than just waiting for a suitable period
797 // of time and hoping for the best here; it's hard to synchronize
798 // on an event not happening...!
799 time.Sleep(250 * time.Microsecond)
800 service3.m.Lock()
801 service3Running := service3.running
802 service3.m.Unlock()
803
804 if service3Running {
805 t.Fatal("service3 was restarted")
806 }
807
808 service1.take <- TerminateTree
809 <-terminated
810
811 if service1.running || service2.running || service3.running {
812 t.Fatal("Didn't shut services & tree down properly.")
813 }
814 }
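// Editor's sketch, not part of the upstream test file (referenced from the
// comment above this test): a minimal service that controls its own restart
// behaviour purely through its return value, using the two sentinel errors
// this file already exercises. The type name and quit channel are hypothetical.
type selfTerminating struct {
	quit     chan struct{} // close to stop just this service
	takeTree bool          // if true, terminate the whole supervisor tree
}

func (st selfTerminating) Serve(ctx context.Context) error {
	select {
	case <-st.quit:
		if st.takeTree {
			return ErrTerminateSupervisorTree // tear down the parent tree
		}
		return ErrDoNotRestart // stop quietly; siblings keep running
	case <-ctx.Done():
		return ctx.Err()
	}
}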
815
816 // Test that a supervisor configured not to propagate termination upwards
817 // terminates itself without killing the whole tree.
818 func TestDoNotPropagate(t *testing.T) {
819 s1 := NewSimple("TestDoNotPropagate")
820 s2 := New("TestDoNotPropgate Subtree", Spec{DontPropagateTermination: true})
821
822 s1.Add(s2)
823
824 service1 := NewService("should keep running")
825 service2 := NewService("should end up terminating")
826 s1.Add(service1)
827 s2.Add(service2)
828
829 ctx, cancel := context.WithCancel(context.Background())
830 go s1.Serve(ctx)
831 defer cancel()
832
833 <-service1.started
834 <-service2.started
835
836 fmt.Println("Service about to take")
837 service2.take <- TerminateTree
838 fmt.Println("Service took")
839 time.Sleep(time.Millisecond)
840
841 if service2.running {
842 t.Fatal("service 2 should have terminated")
843 }
844 if s2.state != terminated {
845 t.Fatal("child supervisor should be terminated")
846 }
847 if s1.state != normal {
848 t.Fatal("parent supervisor should be running")
849 }
850 }
851
852 func TestShim(t *testing.T) {
853 s := NewSimple("TEST: TestShim")
854 ctx, cancel := context.WithCancel(context.Background())
855 s.ServeBackground(ctx)
856
857 os := &OldService{
858 make(chan struct{}),
859 make(chan struct{}),
860 make(chan struct{}),
861 make(chan struct{}),
862 }
863 s.Add(AsService(os))
864
865 // Old service can return as normal and gets restarted; only the
866 // first one of these works if it doesn't get restarted.
867 os.doReturn <- struct{}{}
868 os.doReturn <- struct{}{}
869 // without this, the cancel command below can end up trying to stop
870 // this service at a bad time
871 os.sync <- struct{}{}
872
873 go func() {
874 cancel()
875 }()
876
877 // old-style service stops as expected.
878 <-os.stopping
879 }
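// Editor's sketch, not part of the upstream test file: the pre-v4 shape that
// AsService adapts, mirroring the OldService type defined later in this file.
// The type name is hypothetical.
type legacyWorker struct {
	quit chan struct{}
}

func (l *legacyWorker) Serve() { <-l.quit }      // old-style blocking Serve
func (l *legacyWorker) Stop()  { close(l.quit) } // old-style Stop

// Under the same assumption, it would be added to a supervisor as:
//
//	s.Add(AsService(&legacyWorker{quit: make(chan struct{})}))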
880
881 // http://golangtutorials.blogspot.com/2011/10/gotest-unit-testing-and-benchmarking-go.html
882 // claims test functions are run in the same order as they appear in the source file...
883 // I'm not sure if this is part of the contract, though. Especially in the
884 // face of "t.Parallel()"...
885 //
886 // This is also why all the tests must go in this file; this test needs to
887 // run last, and the only way I know to even hopefully guarantee that is to
888 // have them all in one file.
889 func TestEverMultistarted(t *testing.T) {
890 if everMultistarted {
891 t.Fatal("Seem to have multistarted a service at some point, bummer.")
892 }
893 }
894
895 func TestAddAfterStopping(t *testing.T) {
896 // t.Parallel()
897
898 s := NewSimple("main")
899 ctx, cancel := context.WithCancel(context.Background())
900
901 service := NewService("A1")
902 supDone := make(chan struct{})
903 addDone := make(chan struct{})
904
905 go func() {
906 s.Serve(ctx)
907 close(supDone)
908 }()
909
910 cancel()
911 <-supDone
912
913 go func() {
914 s.Add(service)
915 close(addDone)
916 }()
917
918 select {
919 case <-time.After(5 * time.Second):
920 t.Fatal("Timed out waiting for Add to return")
921 case <-addDone:
922 }
923 }
924
925 // A test service that can be induced to fail, panic, or hang on demand.
926 func NewService(name string) *FailableService {
927 return &FailableService{name, make(chan bool), make(chan int),
928 make(chan bool), make(chan bool, 1), 0, sync.Mutex{}, false}
929 }
930
931 type FailableService struct {
932 name string
933 started chan bool
934 take chan int
935 release chan bool
936 stop chan bool
937 existing int
938
939 m sync.Mutex
940 running bool
941 }
942
943 func (s *FailableService) Serve(ctx context.Context) error {
944 if s.existing != 0 {
945 everMultistarted = true
946 panic("Multi-started the same service! " + s.name)
947 }
948 s.existing++
949
950 s.m.Lock()
951 s.running = true
952 s.m.Unlock()
953
954 defer func() {
955 s.m.Lock()
956 s.running = false
957 s.m.Unlock()
958 }()
959
960 s.started <- true
961
962 useStopChan := false
963
964 for {
965 select {
966 case val := <-s.take:
967 switch val {
968 case Happy:
969 // Do nothing on purpose. Life is good!
970 case Fail:
971 s.existing--
972 if useStopChan {
973 s.stop <- true
974 }
975 return nil
976 case Panic:
977 s.existing--
978 panic("Panic!")
979 case Hang:
980 // or more specifically, "hang until I release you"
981 <-s.release
982 case UseStopChan:
983 useStopChan = true
984 case TerminateTree:
985 return ErrTerminateSupervisorTree
986 case DoNotRestart:
987 return ErrDoNotRestart
988 }
989 case <-ctx.Done():
990 s.existing--
991 if useStopChan {
992 s.stop <- true
993 }
994 return ctx.Err()
995 }
996 }
997 }
998
999 func (s *FailableService) String() string {
1000 return s.name
1001 }
1002
1003 type OldService struct {
1004 done chan struct{}
1005 doReturn chan struct{}
1006 stopping chan struct{}
1007 sync chan struct{}
1008 }
1009
1010 func (os *OldService) Serve() {
1011 for {
1012 select {
1013 case <-os.done:
1014 return
1015 case <-os.doReturn:
1016 return
1017 case <-os.sync:
1018 // deliberately do nothing
1019 }
1020 }
1021 }
1022
1023 func (os *OldService) Stop() {
1024 close(os.done)
1025 os.stopping <- struct{}{}
1026 }
1027
1028 type NowFeeder struct {
1029 values []time.Time
1030 getter func() time.Time
1031 m sync.Mutex
1032 }
1033
1034 // This is used to test serviceName; it's a service without a Stringer.
1035 type BarelyService struct{}
1036
1037 func (bs *BarelyService) Serve(context context.Context) error {
1038 return nil
1039 }
1040
1041 func NewNowFeeder() (nf *NowFeeder) {
1042 nf = new(NowFeeder)
1043 nf.getter = func() time.Time {
1044 nf.m.Lock()
1045 defer nf.m.Unlock()
1046 if len(nf.values) > 0 {
1047 ret := nf.values[0]
1048 nf.values = nf.values[1:]
1049 return ret
1050 }
1051 panic("Ran out of values for NowFeeder")
1052 }
1053 return
1054 }
1055
1056 func (nf *NowFeeder) appendTimes(t ...time.Time) {
1057 nf.m.Lock()
1058 defer nf.m.Unlock()
1059 nf.values = append(nf.values, t...)
1060 }
1061
1062 func panics(doesItPanic func(ctx context.Context) error) (panics bool) {
1063 defer func() {
1064 if r := recover(); r != nil {
1065 panics = true
1066 }
1067 }()
1068
1069 doesItPanic(context.Background())
1070
1071 return
1072 }
1073
1074 func panicsWith(doesItPanic func(context.Context) error, s string) (panics bool) {
1075 defer func() {
1076 if r := recover(); r != nil {
1077 rStr := fmt.Sprintf("%v", r)
1078 if !strings.Contains(rStr, s) {
1079 fmt.Println("unexpected:", rStr)
1080 } else {
1081 panics = true
1082 }
1083 }
1084 }()
1085
1086 doesItPanic(context.Background())
1087
1088 return
1089 }