Codebase list golang-github-go-kit-kit / dc489b7
Consider SRV records with port 0 as an error (#900) * sd/dnssrv: fix Instancer method receivers * sd/dnssrv: SRV record with port 0 is an error * sd/dnssrv: test for SRV port zero issue * .build.yml: update and fix Peter Bourgon authored 3 years ago GitHub committed 3 years ago
2 changed file(s) with 50 addition(s) and 14 deletion(s). Raw diff Collapse all Expand all
00 package dnssrv
11
22 import (
3 "errors"
34 "fmt"
45 "net"
56 "time"
89 "github.com/go-kit/kit/sd"
910 "github.com/go-kit/kit/sd/internal/instance"
1011 )
12
13 // ErrPortZero is returned by the resolve machinery
14 // when a DNS resolver returns an SRV record with its
15 // port set to zero.
16 var ErrPortZero = errors.New("resolver returned SRV record with port 0")
1117
1218 // Instancer yields instances from the named DNS SRV record. The name is
1319 // resolved on a fixed schedule. Priorities and weights are ignored.
5662 }
5763
5864 // Stop terminates the Instancer.
59 func (p *Instancer) Stop() {
60 close(p.quit)
65 func (in *Instancer) Stop() {
66 close(in.quit)
6167 }
6268
63 func (p *Instancer) loop(t *time.Ticker, lookup Lookup) {
69 func (in *Instancer) loop(t *time.Ticker, lookup Lookup) {
6470 defer t.Stop()
6571 for {
6672 select {
6773 case <-t.C:
68 instances, err := p.resolve(lookup)
74 instances, err := in.resolve(lookup)
6975 if err != nil {
70 p.logger.Log("name", p.name, "err", err)
71 p.cache.Update(sd.Event{Err: err})
76 in.logger.Log("name", in.name, "err", err)
77 in.cache.Update(sd.Event{Err: err})
7278 continue // don't replace potentially-good with bad
7379 }
74 p.cache.Update(sd.Event{Instances: instances})
80 in.cache.Update(sd.Event{Instances: instances})
7581
76 case <-p.quit:
82 case <-in.quit:
7783 return
7884 }
7985 }
8086 }
8187
82 func (p *Instancer) resolve(lookup Lookup) ([]string, error) {
83 _, addrs, err := lookup("", "", p.name)
88 func (in *Instancer) resolve(lookup Lookup) ([]string, error) {
89 _, addrs, err := lookup("", "", in.name)
8490 if err != nil {
8591 return nil, err
8692 }
8793 instances := make([]string, len(addrs))
8894 for i, addr := range addrs {
95 if addr.Port == 0 {
96 return nil, ErrPortZero
97 }
8998 instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
9099 }
91100 return instances, nil
92101 }
93102
94103 // Register implements Instancer.
95 func (s *Instancer) Register(ch chan<- sd.Event) {
96 s.cache.Register(ch)
104 func (in *Instancer) Register(ch chan<- sd.Event) {
105 in.cache.Register(ch)
97106 }
98107
99108 // Deregister implements Instancer.
100 func (s *Instancer) Deregister(ch chan<- sd.Event) {
101 s.cache.Deregister(ch)
109 func (in *Instancer) Deregister(ch chan<- sd.Event) {
110 in.cache.Deregister(ch)
102111 }
6767 }
6868 }
6969
70 func TestIssue892(t *testing.T) {
71 ticker := time.NewTicker(time.Second)
72 ticker.Stop()
73 tickc := make(chan time.Time)
74 ticker.C = tickc
75
76 records := []*net.SRV{
77 {Target: "1.0.0.1", Port: 80},
78 {Target: "1.0.0.2", Port: 0},
79 {Target: "1.0.0.3", Port: 80},
80 }
81
82 lookup := func(service, proto, name string) (string, []*net.SRV, error) {
83 return "cname", records, nil
84 }
85
86 instancer := NewInstancerDetailed("name", ticker, lookup, log.NewNopLogger())
87 defer instancer.Stop()
88
89 tickc <- time.Now()
90 time.Sleep(100 * time.Millisecond)
91
92 if want, have := ErrPortZero, instancer.cache.State().Err; want != have {
93 t.Fatalf("want %v, have %v", want, have)
94 }
95 }
96
7097 type nopCloser struct{}
7198
7299 func (nopCloser) Close() error { return nil }