Codebase list golang-github-bsm-redeo / e662d48d-c78d-4bb3-adc8-21e8e0bf068e/upstream
Import upstream version 2.2.1+git20190812.1.3def97a Debian Janitor 2 years ago
37 changed file(s) with 2156 addition(s) and 570 deletion(s). Raw diff Collapse all Expand all
0 package redeofuzz_test
1
2 import (
3 "math/rand"
4 "net"
5 "sync"
6 "testing"
7
8 "github.com/bsm/redeo"
9 "github.com/bsm/redeo/resp"
10 "github.com/go-redis/redis"
11 )
12
13 func TestFuzz(t *testing.T) {
14 lis, err := net.Listen("tcp", "127.0.0.1:")
15 if err != nil {
16 t.Fatalf("could not open a listener: %v", err)
17 return
18 }
19 defer lis.Close()
20
21 srv := initServer()
22 go srv.Serve(lis)
23
24 cln := redis.NewClient(&redis.Options{
25 Addr: lis.Addr().String(),
26 PoolSize: 20,
27 })
28 defer cln.Close()
29
30 if err := cln.Ping().Err(); err != nil {
31 t.Fatalf("could not ping server: %v", err)
32 return
33 }
34
35 n := 10000
36 if testing.Short() {
37 n = 1000
38 }
39
40 var wg sync.WaitGroup
41 for k := 0; k < 10; k++ {
42 wg.Add(1)
43 go func() {
44 defer wg.Done()
45
46 for i := 0; i < n; i++ {
47 if !fuzzIteration(t, cln, i) {
48 return
49 }
50 }
51 }()
52 }
53 wg.Wait()
54 }
55
56 func fuzzIteration(t *testing.T, c *redis.Client, i int) bool {
57 if cmd, act, err, xargs, xbytes := fuzzCallMB(c); err != nil {
58 t.Fatalf("fuzzmb failed with: %v, command: %q", err, cmd.String())
59 return false
60 } else if act["na"] != xargs {
61 t.Fatalf("fuzzmb expected the number of processed arguments to be %d but was %d, command: %q", xargs, act["na"], cmd.String())
62 return false
63 } else if act["nb"] != xbytes {
64 t.Fatalf("fuzzmb expected the number ofprocessed bytes to be %d but was %d, command: %q", xbytes, act["nb"], cmd.String())
65 return false
66 }
67
68 if i%3 == 0 {
69 if err := fuzzCallErr(c); err == nil {
70 t.Fatal("fuzzerr expected error but received none")
71 return false
72 } else if err.Error() != "ERR wrong number of arguments for 'fuzzerr' command" {
73 t.Fatalf("fuzzerr returned unexpected error %v", err)
74 return false
75 }
76 }
77
78 if i%4 == 0 {
79 if err := fuzzCallUnknown(c); err == nil {
80 t.Fatal("fuzzunknown expected error but received none")
81 return false
82 } else if err.Error() != "ERR unknown command 'fuzzunknown'" {
83 t.Fatalf("fuzzunknown returned unexpected error %v", err)
84 return false
85 }
86 }
87
88 if cmd, act, err, exp := fuzzCallStream(c); err != nil {
89 t.Fatalf("fuzzstream failed with: %v, command: %q", err, cmd.String())
90 return false
91 } else if act != exp {
92 t.Fatalf("fuzzstream expected the number of processed arguments to be %d but was %d, command: %q", exp, act, cmd.String())
93 return false
94 }
95
96 return true
97 }
98
99 // --------------------------------------------------------------------
100
101 func fuzzCallMB(c *redis.Client) (cmd *redis.StringIntMapCmd, act map[string]int64, err error, xargs int64, xbytes int64) {
102 xargs = rand.Int63n(20)
103 args := append(make([]interface{}, 0, int(xargs+1)), "fuzzmb")
104 for i := int64(0); i < xargs; i++ {
105 b := make([]byte, rand.Intn(1024))
106 n, _ := rand.Read(b)
107 args = append(args, b[:n])
108 xbytes += int64(n)
109 }
110 cmd = redis.NewStringIntMapCmd(args...)
111 c.Process(cmd)
112 act, err = cmd.Result()
113 return
114 }
115
116 func fuzzCallErr(c *redis.Client) error {
117 cmd := redis.NewStatusCmd("fuzzerr")
118 c.Process(cmd)
119 return cmd.Err()
120 }
121
122 func fuzzCallStream(c *redis.Client) (cmd *redis.IntCmd, act int64, err error, exp int64) {
123 exp = rand.Int63n(3)
124 args := append(make([]interface{}, 0, int(exp+1)), "fuzzstream")
125 for i := int64(0); i < exp; i++ {
126 b := make([]byte, rand.Intn(32*1024))
127 n, _ := rand.Read(b)
128 args = append(args, b[:n])
129 }
130 cmd = redis.NewIntCmd(args...)
131 c.Process(cmd)
132 act, err = cmd.Result()
133 return
134 }
135
136 func fuzzCallUnknown(c *redis.Client) error {
137 cmd := redis.NewStatusCmd("fuzzunknown")
138 c.Process(cmd)
139 return cmd.Err()
140 }
141
142 // --------------------------------------------------------------------
143
144 func initServer() *redeo.Server {
145 s := redeo.NewServer(nil)
146 s.Handle("ping", redeo.Ping())
147
148 s.HandleFunc("fuzzmb", func(w resp.ResponseWriter, c *resp.Command) {
149 sz := 0
150 for _, a := range c.Args {
151 sz += len(a)
152 }
153
154 w.AppendArrayLen(4)
155 w.AppendBulkString("na")
156 w.AppendInt(int64(c.ArgN()))
157 w.AppendBulkString("nb")
158 w.AppendInt(int64(sz))
159 })
160
161 s.HandleFunc("fuzzerr", func(w resp.ResponseWriter, c *resp.Command) {
162 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
163 })
164
165 s.HandleStreamFunc("fuzzstream", func(w resp.ResponseWriter, c *resp.CommandStream) {
166 if c.ArgN() != 0 {
167 for i := 0; i < rand.Intn(c.ArgN()); i++ {
168 rd, err := c.Next()
169 if err != nil {
170 w.AppendErrorf("ERR %v", err)
171 return
172 }
173 if _, err := rd.Read(make([]byte, 16*1024)); err != nil {
174 w.AppendErrorf("ERR %v", err)
175 return
176 }
177 }
178 }
179 w.AppendInt(int64(c.ArgN()))
180 })
181
182 return s
183 }
11 services:
22 - redis
33 install:
4 - go get -u github.com/golang/dep/cmd/dep
5 - dep ensure -v -vendor-only
4 - go get -u github.com/go-redis/redis
5 script:
6 - make default fuzz fuzzrace
67 go:
7 - 1.9.x
8 - 1.8.x
8 - 1.11.x
9 - 1.12.x
10 env:
11 - GO111MODULE=on
+0
-51
Gopkg.lock less more
0 # This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'.
1
2
3 [[projects]]
4 name = "github.com/bsm/pool"
5 packages = ["."]
6 revision = "aef6431197b549db4cc9cbbdc06d9c3dde2996bd"
7 version = "v0.8.0"
8
9 [[projects]]
10 name = "github.com/onsi/ginkgo"
11 packages = [".","config","extensions/table","internal/codelocation","internal/containernode","internal/failer","internal/leafnodes","internal/remote","internal/spec","internal/spec_iterator","internal/specrunner","internal/suite","internal/testingtproxy","internal/writer","reporters","reporters/stenographer","reporters/stenographer/support/go-colorable","reporters/stenographer/support/go-isatty","types"]
12 revision = "9eda700730cba42af70d53180f9dcce9266bc2bc"
13 version = "v1.4.0"
14
15 [[projects]]
16 name = "github.com/onsi/gomega"
17 packages = [".","format","internal/assertion","internal/asyncassertion","internal/oraclematcher","internal/testingtsupport","matchers","matchers/support/goraph/bipartitegraph","matchers/support/goraph/edge","matchers/support/goraph/node","matchers/support/goraph/util","types"]
18 revision = "c893efa28eb45626cdaa76c9f653b62488858837"
19 version = "v1.2.0"
20
21 [[projects]]
22 branch = "master"
23 name = "golang.org/x/net"
24 packages = ["html","html/atom","html/charset"]
25 revision = "8351a756f30f1297fe94bbf4b767ec589c6ea6d0"
26
27 [[projects]]
28 branch = "master"
29 name = "golang.org/x/sys"
30 packages = ["unix"]
31 revision = "b6e1ae21643682ce023deb8d152024597b0e9bb4"
32
33 [[projects]]
34 branch = "master"
35 name = "golang.org/x/text"
36 packages = ["encoding","encoding/charmap","encoding/htmlindex","encoding/internal","encoding/internal/identifier","encoding/japanese","encoding/korean","encoding/simplifiedchinese","encoding/traditionalchinese","encoding/unicode","internal/gen","internal/tag","internal/utf8internal","language","runes","transform","unicode/cldr"]
37 revision = "1cbadb444a806fd9430d14ad08967ed91da4fa0a"
38
39 [[projects]]
40 branch = "v2"
41 name = "gopkg.in/yaml.v2"
42 packages = ["."]
43 revision = "eb3733d160e74a9c7e442f435eb3bea458e1d19f"
44
45 [solve-meta]
46 analyzer-name = "dep"
47 analyzer-version = 1
48 inputs-digest = "202e7cae2f728ff790117ef6fa65af3a598f1075ff39444c50964547d817aefc"
49 solver-name = "gps-cdcl"
50 solver-version = 1
+0
-32
Gopkg.toml less more
0
1 # Gopkg.toml example
2 #
3 # Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
4 # for detailed Gopkg.toml documentation.
5 #
6 # required = ["github.com/user/thing/cmd/thing"]
7 # ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
8 #
9 # [[constraint]]
10 # name = "github.com/user/project"
11 # version = "1.0.0"
12 #
13 # [[constraint]]
14 # name = "github.com/user/project2"
15 # branch = "dev"
16 # source = "github.com/myfork/project2"
17 #
18 # [[override]]
19 # name = "github.com/x/y"
20 # version = "2.4.0"
21
22
23 [[constraint]]
24 branch = "master"
25 name = "github.com/bsm/pool"
26
27 [[constraint]]
28 name = "github.com/onsi/ginkgo"
29
30 [[constraint]]
31 name = "github.com/onsi/gomega"
0 PKG=$(shell go list ./... | grep -v vendor)
1
20 default: vet test
31
42 test:
5 go test $(PKG)
3 go test ./...
64
75 vet:
8 go vet $(PKG)
6 go vet ./...
97
108 bench:
11 go test $(PKG) -run=NONE -bench=. -benchmem -benchtime=5s
9 go test ./... -run=NONE -bench=. -benchmem -benchtime=5s
10
11 fuzz:
12 go test ./.fuzz
13
14 fuzzrace:
15 go test -race ./.fuzz
1216
1317 # go get -u github.com/davelondon/rebecca/cmd/becca
1418
3535 )
3636
3737 func main() {
38 // Init server and define handlers
3938 srv := redeo.NewServer(nil)
39
40 // Define handlers
4041 srv.HandleFunc("ping", func(w resp.ResponseWriter, _ *resp.Command) {
4142 w.AppendInlineString("PONG")
4243 })
4344 srv.HandleFunc("info", func(w resp.ResponseWriter, _ *resp.Command) {
4445 w.AppendBulkString(srv.Info().String())
4546 })
47
48 // More handlers; demo usage of redeo.WrapperFunc
49 srv.Handle("echo", redeo.WrapperFunc(func(c *resp.Command) interface{} {
50 if c.ArgN() != 1 {
51 return redeo.ErrWrongNumberOfArgs(c.Name)
52 }
53 return c.Arg(0)
54 }))
4655
4756 // Open a new listener
4857 lis, err := net.Listen("tcp", ":9736")
6170 ```go
6271 func main() {
6372 mu := sync.RWMutex{}
64 myData := make(map[string]map[string]string)
73 data := make(map[string]string)
6574 srv := redeo.NewServer(nil)
6675
67 // handle HSET
68 srv.HandleFunc("hset", func(w resp.ResponseWriter, c *resp.Command) {
69 // validate arguments
70 if c.ArgN() != 3 {
71 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
72 return
73 }
74
75 // lock for write
76 mu.Lock()
77 defer mu.Unlock()
78
79 // fetch (find-or-create) key
80 hash, ok := myData[c.Arg(0).String()]
81 if !ok {
82 hash = make(map[string]string)
83 myData[c.Arg(0).String()] = hash
84 }
85
86 // check if field already exists
87 _, ok = hash[c.Arg(1).String()]
88
89 // set field
90 hash[c.Arg(1).String()] = c.Arg(2).String()
91
92 // respond
93 if ok {
94 w.AppendInt(0)
95 } else {
96 w.AppendInt(1)
97 }
98 })
99
100 // handle HGET
101 srv.HandleFunc("hget", func(w resp.ResponseWriter, c *resp.Command) {
76 srv.HandleFunc("set", func(w resp.ResponseWriter, c *resp.Command) {
10277 if c.ArgN() != 2 {
10378 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
10479 return
10580 }
10681
107 mu.RLock()
108 defer mu.RUnlock()
82 key := c.Arg(0).String()
83 val := c.Arg(1).String()
10984
110 hash, ok := myData[c.Arg(0).String()]
111 if !ok {
112 w.AppendNil()
85 mu.Lock()
86 data[key] = val
87 mu.Unlock()
88
89 w.AppendInt(1)
90 })
91
92 srv.HandleFunc("get", func(w resp.ResponseWriter, c *resp.Command) {
93 if c.ArgN() != 1 {
94 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
11395 return
11496 }
11597
116 val, ok := hash[c.Arg(1).String()]
117 if !ok {
118 w.AppendNil()
98 key := c.Arg(0).String()
99 mu.RLock()
100 val, ok := data[key]
101 mu.RUnlock()
102
103 if ok {
104 w.AppendBulkString(val)
119105 return
120106 }
121
122 w.AppendBulkString(val)
107 w.AppendNil()
123108 })
124109 }
125110 ```
111
112 Redeo also supports command wrappers:
113
114 ```go
115 func main() {
116 mu := sync.RWMutex{}
117 data := make(map[string]string)
118 srv := redeo.NewServer(nil)
119
120 srv.Handle("set", redeo.WrapperFunc(func(c *resp.Command) interface{} {
121 if c.ArgN() != 2 {
122 return redeo.ErrWrongNumberOfArgs(c.Name)
123 }
124
125 key := c.Arg(0).String()
126 val := c.Arg(1).String()
127
128 mu.Lock()
129 data[key] = val
130 mu.Unlock()
131
132 return 1
133 }))
134
135 srv.Handle("get", redeo.WrapperFunc(func(c *resp.Command) interface{} {
136 if c.ArgN() != 1 {
137 return redeo.ErrWrongNumberOfArgs(c.Name)
138 }
139
140 key := c.Arg(0).String()
141 mu.RLock()
142 val, ok := data[key]
143 mu.RUnlock()
144
145 if ok {
146 return val
147 }
148 return nil
149 }))
150 }
151 ```
4242 ```go
4343 func main() {{ "ExampleHandlerFunc" | code }}
4444 ```
45
46 Redeo also supports command wrappers:
47
48 ```go
49 func main() {{ "ExampleWrapperFunc" | code }}
50 ```
1010 // Conn wraps a single network connection and exposes
1111 // common read/write methods.
1212 type Conn interface {
13 resp.ResponseParser
14
1315 // MarkFailed marks the connection as failed which
1416 // will force it to be closed instead of being returned to the pool
1517 MarkFailed()
16
17 // PeekType returns the type of the next response block
18 PeekType() (resp.ResponseType, error)
19 // ReadNil reads a nil value
20 ReadNil() error
21 // ReadBulk reads a bulk value (optionally appending it to a passed p buffer)
22 ReadBulk(p []byte) ([]byte, error)
23 // ReadBulkString reads a bulk value as string
24 ReadBulkString() (string, error)
25 // ReadInt reads an int value
26 ReadInt() (int64, error)
27 // ReadArrayLen reads the array length
28 ReadArrayLen() (int, error)
29 // ReadError reads an error string
30 ReadError() (string, error)
31 // ReadInline reads an inline status string
32 ReadInlineString() (string, error)
33 // StreamBulk returns a bulk-reader.
34 // Readers must be closed after use.
35 StreamBulk() (io.ReadCloser, error)
3618
3719 // WriteCmd writes a full command as part of a pipeline. To execute the pipeline,
3820 // you must call Flush.
8163 madeByRedeo()
8264 }
8365
66 // Wrap wraps a single network connection.
67 func Wrap(cn net.Conn) Conn {
68 return &conn{
69 Conn: cn,
70
71 RequestWriter: resp.NewRequestWriter(cn),
72 ResponseReader: resp.NewResponseReader(cn),
73 }
74 }
75
8476 type conn struct {
8577 net.Conn
8678
55 )
66
77 var _ = Describe("Client", func() {
8 var subject *Client
9
10 BeforeEach(func() {
11 subject = newClient(&mockConn{Port: 10001})
12 })
138
149 It("should generate IDs", func() {
1510 a, b := newClient(&mockConn{}), newClient(&mockConn{})
2727 broker := redeo.NewPubSubBroker()
2828 srv := redeo.NewServer(nil)
2929 srv.Handle("ping", redeo.Ping())
30 srv.Handle("echo", redeo.Echo())
3031 srv.Handle("info", redeo.Info(srv))
3132 srv.Handle("publish", broker.Publish())
3233 srv.Handle("subscribe", broker.Subscribe())
33
44 A simple server example with two commands:
55
6 // Init server and define handlers
76 srv := redeo.NewServer(nil)
7
8 // Define handlers
89 srv.HandleFunc("ping", func(w resp.ResponseWriter, _ *resp.Command) {
910 w.AppendInlineString("PONG")
1011 })
1112 srv.HandleFunc("info", func(w resp.ResponseWriter, _ *resp.Command) {
1213 w.AppendBulkString(srv.Info().String())
1314 })
15
16 // More handlers; demo usage of redeo.WrapperFunc
17 srv.Handle("echo", redeo.WrapperFunc(func(c *resp.Command) interface{} {
18 if c.ArgN() != 1 {
19 return redeo.ErrWrongNumberOfArgs(c.Name)
20 }
21 return c.Arg(0)
22 }))
1423
1524 // Open a new listener
1625 lis, err := net.Listen("tcp", ":9736")
88 )
99
1010 func ExampleServer() {
11 // Init server and define handlers
1211 srv := redeo.NewServer(nil)
12
13 // Define handlers
1314 srv.HandleFunc("ping", func(w resp.ResponseWriter, _ *resp.Command) {
1415 w.AppendInlineString("PONG")
1516 })
1617 srv.HandleFunc("info", func(w resp.ResponseWriter, _ *resp.Command) {
1718 w.AppendBulkString(srv.Info().String())
1819 })
20
21 // More handlers; demo usage of redeo.WrapperFunc
22 srv.Handle("echo", redeo.WrapperFunc(func(c *resp.Command) interface{} {
23 if c.ArgN() != 1 {
24 return redeo.ErrWrongNumberOfArgs(c.Name)
25 }
26 return c.Arg(0)
27 }))
1928
2029 // Open a new listener
2130 lis, err := net.Listen("tcp", ":9736")
5059 srv.Handle("info", redeo.Info(srv))
5160 }
5261
53 func ExamplePubSub() {
62 func ExampleCommandDescriptions() {
63 srv := redeo.NewServer(nil)
64 srv.Handle("command", redeo.CommandDescriptions{
65 {Name: "get", Arity: 2, Flags: []string{"readonly", "fast"}, FirstKey: 1, LastKey: 1, KeyStepCount: 1},
66 {Name: "randomkey", Arity: 1, Flags: []string{"readonly", "random"}},
67 {Name: "mset", Arity: -3, Flags: []string{"write", "denyoom"}, FirstKey: 1, LastKey: -1, KeyStepCount: 2},
68 {Name: "quit", Arity: 1},
69 })
70 }
71
72 func ExampleSubCommands() {
73 srv := redeo.NewServer(nil)
74 srv.Handle("custom", redeo.SubCommands{
75 "ping": redeo.Ping(),
76 "echo": redeo.Echo(),
77 })
78 }
79
80 func ExamplePubSubBroker() {
5481 broker := redeo.NewPubSubBroker()
5582
5683 srv := redeo.NewServer(nil)
6087
6188 func ExampleHandlerFunc() {
6289 mu := sync.RWMutex{}
63 myData := make(map[string]map[string]string)
90 data := make(map[string]string)
6491 srv := redeo.NewServer(nil)
6592
66 // handle HSET
67 srv.HandleFunc("hset", func(w resp.ResponseWriter, c *resp.Command) {
68 // validate arguments
69 if c.ArgN() != 3 {
70 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
71 return
72 }
73
74 // lock for write
75 mu.Lock()
76 defer mu.Unlock()
77
78 // fetch (find-or-create) key
79 hash, ok := myData[c.Arg(0).String()]
80 if !ok {
81 hash = make(map[string]string)
82 myData[c.Arg(0).String()] = hash
83 }
84
85 // check if field already exists
86 _, ok = hash[c.Arg(1).String()]
87
88 // set field
89 hash[c.Arg(1).String()] = c.Arg(2).String()
90
91 // respond
92 if ok {
93 w.AppendInt(0)
94 } else {
95 w.AppendInt(1)
96 }
97 })
98
99 // handle HGET
100 srv.HandleFunc("hget", func(w resp.ResponseWriter, c *resp.Command) {
93 srv.HandleFunc("set", func(w resp.ResponseWriter, c *resp.Command) {
10194 if c.ArgN() != 2 {
10295 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
10396 return
10497 }
10598
106 mu.RLock()
107 defer mu.RUnlock()
99 key := c.Arg(0).String()
100 val := c.Arg(1).String()
108101
109 hash, ok := myData[c.Arg(0).String()]
110 if !ok {
111 w.AppendNil()
102 mu.Lock()
103 data[key] = val
104 mu.Unlock()
105
106 w.AppendInt(1)
107 })
108
109 srv.HandleFunc("get", func(w resp.ResponseWriter, c *resp.Command) {
110 if c.ArgN() != 1 {
111 w.AppendError(redeo.WrongNumberOfArgs(c.Name))
112112 return
113113 }
114114
115 val, ok := hash[c.Arg(1).String()]
116 if !ok {
117 w.AppendNil()
115 key := c.Arg(0).String()
116 mu.RLock()
117 val, ok := data[key]
118 mu.RUnlock()
119
120 if ok {
121 w.AppendBulkString(val)
118122 return
119123 }
120
121 w.AppendBulkString(val)
124 w.AppendNil()
122125 })
123126 }
127
128 func ExampleWrapperFunc() {
129 mu := sync.RWMutex{}
130 data := make(map[string]string)
131 srv := redeo.NewServer(nil)
132
133 srv.Handle("set", redeo.WrapperFunc(func(c *resp.Command) interface{} {
134 if c.ArgN() != 2 {
135 return redeo.ErrWrongNumberOfArgs(c.Name)
136 }
137
138 key := c.Arg(0).String()
139 val := c.Arg(1).String()
140
141 mu.Lock()
142 data[key] = val
143 mu.Unlock()
144
145 return 1
146 }))
147
148 srv.Handle("get", redeo.WrapperFunc(func(c *resp.Command) interface{} {
149 if c.ArgN() != 1 {
150 return redeo.ErrWrongNumberOfArgs(c.Name)
151 }
152
153 key := c.Arg(0).String()
154 mu.RLock()
155 val, ok := data[key]
156 mu.RUnlock()
157
158 if ok {
159 return val
160 }
161 return nil
162 }))
163 }
0 module github.com/bsm/redeo
1
2 require (
3 github.com/bsm/pool v0.8.1
4 github.com/golang/protobuf v1.3.2 // indirect
5 github.com/kr/pretty v0.1.0 // indirect
6 github.com/onsi/ginkgo v1.8.0
7 github.com/onsi/gomega v1.5.0
8 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
9 golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e // indirect
10 golang.org/x/text v0.3.2 // indirect
11 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
12 gopkg.in/yaml.v2 v2.2.2 // indirect
13 )
0 github.com/bsm/pool v0.8.1 h1:WS5zo7o629vWBnTWOKOJfRCvWxwS3Yh7NDOk1TiPBeo=
1 github.com/bsm/pool v0.8.1/go.mod h1:20l4nKLbEi8m4F5GWdtu7CZh4DPWwalpLeuqwDYZ1vY=
2 github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
3 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
4 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
5 github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
6 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
7 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
8 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
9 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
10 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
11 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
12 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
13 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
14 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
15 github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
16 github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
17 github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
18 github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
19 github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
20 github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
21 github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
22 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
23 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
24 golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
25 golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
26 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
27 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
28 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
29 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
30 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
31 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
32 golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e h1:TsjK5I7fXk8f2FQrgu6NS7i5Qih3knl2FL1htyguLRE=
33 golang.org/x/sys v0.0.0-20190812073006-9eafafc0a87e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
34 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
35 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
36 golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
37 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
38 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
39 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
40 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
41 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
42 gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
43 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
44 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
45 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
46 gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
47 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
48 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
49 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
00 package info
11
2 import "bytes"
2 import (
3 "bytes"
4 "strings"
5 "sync"
6 )
37
48 // Registry : main info registry.
59 // Please note: in order to minimise performance impact info registries
610 // are not using locks are therefore not thread-safe. Please make sure
711 // you register all metrics and values before you start the server.
8 type Registry struct{ sections []*Section }
12 type Registry struct {
13 sections []*Section
14 mu sync.RWMutex
15 }
916
1017 // New creates a new Registry
1118 func New() *Registry {
1219 return new(Registry)
1320 }
1421
15 // Section returns a section, or appends a new one
22 // FindSection returns a section by name or nil when not found.
23 func (r *Registry) FindSection(name string) *Section {
24 r.mu.RLock()
25 defer r.mu.RUnlock()
26
27 return r.findSection(name)
28 }
29
30 // FetchSection returns a section, or appends a new one
1631 // when the given name cannot be found
17 func (r *Registry) Section(name string) *Section {
18 for _, s := range r.sections {
19 if s.name == name {
20 return s
21 }
32 func (r *Registry) FetchSection(name string) *Section {
33 if s := r.FindSection(name); s != nil {
34 return s
2235 }
23 section := &Section{name: name}
24 r.sections = append(r.sections, section)
25 return section
36
37 r.mu.Lock()
38 defer r.mu.Unlock()
39
40 s := r.findSection(name)
41 if s == nil {
42 s = &Section{name: name}
43 r.sections = append(r.sections, s)
44 }
45 return s
2646 }
2747
2848 // Clear removes all sections from the registry
2949 func (r *Registry) Clear() {
50 r.mu.Lock()
3051 r.sections = nil
52 r.mu.Unlock()
3153 }
3254
3355 // String generates an info string output
3456 func (r *Registry) String() string {
57 r.mu.RLock()
58 defer r.mu.RUnlock()
59
3560 buf := new(bytes.Buffer)
36 for i, section := range r.sections {
37 if len(section.kvs) == 0 {
61 for i, s := range r.sections {
62 if len(s.kvs) == 0 {
3863 continue
3964 }
4065
4166 if i != 0 {
4267 buf.WriteByte('\n')
4368 }
44 buf.WriteString("# " + section.name + "\n")
45 section.writeTo(buf)
69 s.writeTo(buf)
4670 }
4771 return buf.String()
72 }
73
74 func (r *Registry) findSection(name string) *Section {
75 for _, s := range r.sections {
76 if strings.ToLower(s.name) == strings.ToLower(name) {
77 return s
78 }
79 }
80 return nil
4881 }
4982
5083 // Section : an info section contains multiple values
5184 type Section struct {
5285 name string
5386 kvs []kv
87 mu sync.RWMutex
5488 }
5589
5690 // Register registers a value under a name
5791 func (s *Section) Register(name string, value Value) {
92 s.mu.Lock()
5893 s.kvs = append(s.kvs, kv{name, value})
94 s.mu.Unlock()
5995 }
6096
6197 // Clear removes all values from a section
6298 func (s *Section) Clear() {
99 s.mu.Lock()
63100 s.kvs = nil
101 s.mu.Unlock()
102 }
103
104 // Replace replaces the section enties
105 func (s *Section) Replace(fn func(*Section)) {
106 t := &Section{name: s.name}
107 fn(t)
108
109 s.mu.Lock()
110 s.kvs = t.kvs
111 s.mu.Unlock()
64112 }
65113
66114 func (s *Section) writeTo(buf *bytes.Buffer) {
115 buf.WriteString("# " + s.name + "\n")
67116 for _, kv := range s.kvs {
68117 buf.WriteString(kv.name + ":" + kv.value.String() + "\n")
69118 }
119 }
120
121 // String generates an info string output
122 func (s *Section) String() string {
123 if s == nil {
124 return ""
125 }
126
127 s.mu.RLock()
128 defer s.mu.RUnlock()
129
130 if len(s.kvs) == 0 {
131 return ""
132 }
133
134 buf := new(bytes.Buffer)
135 s.writeTo(buf)
136 return buf.String()
70137 }
71138
72139 type kv struct {
1111
1212 BeforeEach(func() {
1313 subject = New()
14 subject.Section("Server").Register("version", StringValue("1.0.1"))
15 subject.Section("Server").Register("date", StringValue("2014-11-11"))
16 subject.Section("Clients").Register("count", StringValue("17"))
17 subject.Section("Clients").Register("total", StringValue("123456"))
18 subject.Section("Empty")
14 subject.FetchSection("Server").Register("version", StaticString("1.0.1"))
15 subject.FetchSection("Server").Register("date", StaticString("2014-11-11"))
16 subject.FetchSection("Clients").Register("count", StaticString("17"))
17 subject.FetchSection("Clients").Register("total", StaticString("123456"))
18 subject.FetchSection("Empty")
1919 })
2020
2121 It("should generate info strings", func() {
2424 })
2525
2626 It("should clear", func() {
27 subject.Section("Clients").Clear()
27 subject.FetchSection("Clients").Clear()
2828 Expect(subject.sections[1].kvs).To(BeEmpty())
2929 subject.Clear()
3030 Expect(subject.sections).To(BeEmpty())
31 })
32
33 It("should replace", func() {
34 subject.FetchSection("Server").Replace(func(s *Section) {
35 s.Register("test", StaticString("string"))
36 })
37 s := subject.FindSection("server").String()
38 Expect(s).To(Equal("# Server\ntest:string\n"))
39 })
40
41 It("should generate section strings", func() {
42 s := subject.FindSection("clients").String()
43 Expect(s).To(Equal("# Clients\ncount:17\ntotal:123456\n"))
44
45 s = subject.FindSection("unknown").String()
46 Expect(s).To(Equal(""))
3147 })
3248
3349 })
99 String() string
1010 }
1111
12 // StringValue is the simplest value type
13 type StringValue string
12 // StaticString is the simplest value type
13 type StaticString string
1414
15 func (v StringValue) String() string { return string(v) }
15 func (v StaticString) String() string { return string(v) }
1616
17 // IntValue converts a static integer into a value
18 func IntValue(n int) Value { return StringValue(strconv.Itoa(n)) }
17 // StaticInt converts a static integer into a value
18 func StaticInt(n int64) Value { return StaticString(strconv.FormatInt(n, 10)) }
19
20 // --------------------------------------------------------------------
1921
2022 // Callback function
2123 type Callback func() string
2224
2325 func (c Callback) String() string { return c() }
2426
25 // Counter is a numeric counter value
26 type Counter struct{ v int64 }
27 // --------------------------------------------------------------------
2728
28 // NewCounter return a Counter
29 func NewCounter() *Counter { return &Counter{} }
29 // IntValue is a int64 value with thread-safe atomic modifiers.
30 type IntValue struct{ n int64 }
3031
31 // Inc atomically increase the Counter by delta
32 func (c *Counter) Inc(delta int64) int64 { return atomic.AddInt64(&c.v, delta) }
32 // NewIntValue return a IntValue
33 func NewIntValue(n int64) *IntValue { return &IntValue{n: n} }
3334
34 // Set atomically set the value of Counter to v
35 func (c *Counter) Set(v int64) { atomic.StoreInt64(&c.v, v) }
35 // Inc atomically increase the IntValue by delta
36 func (v *IntValue) Inc(delta int64) int64 { return atomic.AddInt64(&v.n, delta) }
3637
37 // Value atomically get the value stored in Counter
38 func (c *Counter) Value() int64 { return atomic.LoadInt64(&c.v) }
38 // Set atomically set the value of IntValue to v
39 func (v *IntValue) Set(n int64) { atomic.StoreInt64(&v.n, n) }
3940
40 // String return the value of Counter as string
41 func (c *Counter) String() string { return strconv.FormatInt(c.Value(), 10) }
41 // Value atomically get the value stored in IntValue
42 func (v *IntValue) Value() int64 { return atomic.LoadInt64(&v.n) }
43
44 // String return the value of IntValue as string
45 func (v *IntValue) String() string { return strconv.FormatInt(v.Value(), 10) }
46
47 // --------------------------------------------------------------------
48
49 // StringValue is a string value with thread-safe atomic modifiers.
50 type StringValue struct{ s atomic.Value }
51
52 // NewStringValue returns a StringValue
53 func NewStringValue(s string) *StringValue {
54 v := &StringValue{}
55 v.s.Store(s)
56 return v
57 }
58
59 // Set atomically sets the value to s
60 func (v *StringValue) Set(s string) { v.s.Store(s) }
61
62 // String return the value.
63 func (v *StringValue) String() string { return v.s.Load().(string) }
44 . "github.com/onsi/gomega"
55 )
66
7 var _ = Describe("StringValue", func() {
7 var _ = Describe("StaticString", func() {
8 var _ Value = StaticString("")
9
810 It("should generate strings", func() {
9 var v Value = StringValue("x")
10 Expect(v.String()).To(Equal("x"))
11 Expect(StaticString("x").String()).To(Equal("x"))
12 })
13 })
14
15 var _ = Describe("StaticInt", func() {
16 var _ Value = StaticInt(0)
17
18 It("should generate strings", func() {
19 Expect(StaticInt(12).String()).To(Equal("12"))
20 })
21 })
22
23 var _ = Describe("Callback", func() {
24 var _ Value = Callback(nil)
25
26 It("should generate strings", func() {
27 cb := Callback(func() string { return "x" })
28 Expect(cb.String()).To(Equal("x"))
1129 })
1230 })
1331
1432 var _ = Describe("IntValue", func() {
15 It("should generate strings", func() {
16 var v Value = IntValue(12)
17 Expect(v.String()).To(Equal("12"))
18 })
19 })
20
21 var _ = Describe("Callback", func() {
22 It("should generate strings", func() {
23 var v Value = Callback(func() string { return "x" })
24 Expect(v.String()).To(Equal("x"))
25 })
26 })
27
28 var _ = Describe("Counter", func() {
29 var subject *Counter
33 var subject *IntValue
34 var _ Value = subject
3035
3136 BeforeEach(func() {
32 subject = NewCounter()
37 subject = NewIntValue(0)
3338 })
3439
3540 It("should have accessors", func() {
4348 })
4449
4550 It("should generate strings", func() {
46 var v Value = subject
47 Expect(v.String()).To(Equal("0"))
51 Expect(subject.String()).To(Equal("0"))
4852 })
4953 })
54
55 var _ = Describe("StringValue", func() {
56 var subject *StringValue
57 var _ Value = subject
58
59 BeforeEach(func() {
60 subject = NewStringValue("x")
61 })
62
63 It("should have accessors", func() {
64 subject.Set("z")
65 Expect(subject.String()).To(Equal("z"))
66 })
67
68 It("should generate strings", func() {
69 Expect(subject.String()).To(Equal("x"))
70 })
71 })
00 package redeo
11
22 import (
3 cryptorand "crypto/rand"
4 "encoding/hex"
35 "fmt"
6 mathrand "math/rand"
47 "os"
58 "sort"
69 "strconv"
912
1013 "github.com/bsm/redeo/info"
1114 )
15
16 // CommandDescription describes supported commands
17 type CommandDescription struct {
18 // Name is the command name, returned as a lowercase string.
19 Name string
20
21 // Arity is the command arity specification.
22 // https://redis.io/commands/command#command-arity.
23 // It follows a simple pattern:
24 // positive if command has fixed number of required arguments.
25 // negative if command has minimum number of required arguments, but may have more.
26 Arity int64
27
28 // Flags is an enumeration of command flags.
29 // https://redis.io/commands/command#flags.
30 Flags []string
31
32 // FirstKey is the position of first key in argument list.
33 // https://redis.io/commands/command#first-key-in-argument-list
34 FirstKey int64
35
36 // LastKey is the position of last key in argument list.
37 // https://redis.io/commands/command#last-key-in-argument-list
38 LastKey int64
39
40 // KeyStepCount is the step count for locating repeating keys.
41 // https://redis.io/commands/command#step-count
42 KeyStepCount int64
43 }
44
45 // --------------------------------------------------------------------
1246
1347 // ClientInfo contains client stats
1448 type ClientInfo struct {
6296 pid int
6397
6498 clients clientStats
65 connections *info.Counter
66 commands *info.Counter
99 connections *info.IntValue
100 commands *info.IntValue
67101 }
68102
69103 // newServerInfo creates a new server info container
71105 info := &ServerInfo{
72106 registry: info.New(),
73107 startTime: time.Now(),
74 connections: info.NewCounter(),
75 commands: info.NewCounter(),
108 connections: info.NewIntValue(0),
109 commands: info.NewIntValue(0),
76110 clients: clientStats{stats: make(map[uint64]*ClientInfo)},
77111 }
78 return info.withDefaults()
112 info.initDefaults()
113 return info
79114 }
80115
81116 // ------------------------------------------------------------------------
82117
83 // Section finds-or-creates an info section
84 func (i *ServerInfo) Section(name string) *info.Section { return i.registry.Section(name) }
118 // Fetch finds or creates an info section. This method is not thread-safe.
119 func (i *ServerInfo) Fetch(name string) *info.Section { return i.registry.FetchSection(name) }
120
121 // Find finds an info section by name.
122 func (i *ServerInfo) Find(name string) *info.Section { return i.registry.FindSection(name) }
85123
86124 // String generates an info string
87125 func (i *ServerInfo) String() string { return i.registry.String() }
101139 func (i *ServerInfo) TotalCommands() int64 { return i.commands.Value() }
102140
103141 // Apply default info
104 func (i *ServerInfo) withDefaults() *ServerInfo {
105 server := i.Section("Server")
106 server.Register("process_id", info.IntValue(os.Getpid()))
142 func (i *ServerInfo) initDefaults() {
143 runID := make([]byte, 20)
144 if _, err := cryptorand.Read(runID); err != nil {
145 _, _ = mathrand.Read(runID)
146 }
147
148 server := i.Fetch("Server")
149 server.Register("process_id", info.StaticInt(int64(os.Getpid())))
150 server.Register("run_id", info.StaticString(hex.EncodeToString(runID)))
107151 server.Register("uptime_in_seconds", info.Callback(func() string {
108152 d := time.Since(i.startTime) / time.Second
109153 return strconv.FormatInt(int64(d), 10)
113157 return strconv.FormatInt(int64(d), 10)
114158 }))
115159
116 clients := i.Section("Clients")
160 clients := i.Fetch("Clients")
117161 clients.Register("connected_clients", info.Callback(func() string {
118162 return strconv.Itoa(i.NumClients())
119163 }))
120164
121 stats := i.Section("Stats")
165 stats := i.Fetch("Stats")
122166 stats.Register("total_connections_received", i.connections)
123167 stats.Register("total_commands_processed", i.commands)
124
125 return i
126168 }
127169
128170 func (i *ServerInfo) register(c *Client) {
00 package redeo
11
22 import (
3 "errors"
4 "strings"
5
36 "github.com/bsm/redeo/resp"
47 )
58
811 return "ERR unknown command '" + cmd + "'"
912 }
1013
14 // ErrUnknownCommand returns an unknown command error
15 func ErrUnknownCommand(cmd string) error {
16 return errors.New(UnknownCommand(cmd))
17 }
18
1119 // WrongNumberOfArgs returns an unknown command error string
1220 func WrongNumberOfArgs(cmd string) string {
1321 return "ERR wrong number of arguments for '" + cmd + "' command"
1422 }
1523
16 // Ping returns a ping handler
24 // ErrWrongNumberOfArgs returns an unknown command error
25 func ErrWrongNumberOfArgs(cmd string) error {
26 return errors.New(WrongNumberOfArgs(cmd))
27 }
28
29 // Ping returns a ping handler.
30 // https://redis.io/commands/ping
1731 func Ping() Handler {
1832 return HandlerFunc(func(w resp.ResponseWriter, c *resp.Command) {
1933 switch c.ArgN() {
2034 case 0:
21 w.AppendBulkString("PONG")
35 w.AppendInlineString("PONG")
2236 case 1:
2337 w.AppendBulk(c.Arg(0))
2438 default:
2741 })
2842 }
2943
30 // Info returns an info handler
44 // Echo returns an echo handler.
45 // https://redis.io/commands/echo
46 func Echo() Handler {
47 return HandlerFunc(func(w resp.ResponseWriter, c *resp.Command) {
48 switch c.ArgN() {
49 case 1:
50 w.AppendBulk(c.Arg(0))
51 default:
52 w.AppendError(WrongNumberOfArgs(c.Name))
53 }
54 })
55 }
56
57 // Info returns an info handler.
58 // https://redis.io/commands/info
3159 func Info(s *Server) Handler {
3260 return HandlerFunc(func(w resp.ResponseWriter, c *resp.Command) {
33 w.AppendBulkString(s.Info().String())
61 info := s.Info()
62 resp := ""
63 if c.ArgN() == 1 {
64 resp = info.Find(c.Args[0].String()).String()
65 } else {
66 resp = info.String()
67 }
68 w.AppendBulkString(resp)
3469 })
70 }
71
72 // CommandDescriptions returns a command handler.
73 // https://redis.io/commands/command
74 type CommandDescriptions []CommandDescription
75
76 func (s CommandDescriptions) ServeRedeo(w resp.ResponseWriter, c *resp.Command) {
77 w.AppendArrayLen(len(s))
78
79 for _, cmd := range s {
80 w.AppendArrayLen(6)
81 w.AppendBulkString(strings.ToLower(cmd.Name))
82 w.AppendInt(cmd.Arity)
83 w.AppendArrayLen(len(cmd.Flags))
84 for _, flag := range cmd.Flags {
85 w.AppendBulkString(flag)
86 }
87 w.AppendInt(cmd.FirstKey)
88 w.AppendInt(cmd.LastKey)
89 w.AppendInt(cmd.KeyStepCount)
90 }
91 }
92
93 // SubCommands returns a handler that is parsing sub-commands
94 type SubCommands map[string]Handler
95
96 func (s SubCommands) ServeRedeo(w resp.ResponseWriter, c *resp.Command) {
97
98 // First, check if we have a subcommand
99 if c.ArgN() == 0 {
100 w.AppendError(WrongNumberOfArgs(c.Name))
101 return
102 }
103
104 firstArg := c.Arg(0).String()
105 if h, ok := s[strings.ToLower(firstArg)]; ok {
106 cmd := resp.NewCommand(c.Name+" "+firstArg, c.Args[1:]...)
107 cmd.SetContext(c.Context())
108 h.ServeRedeo(w, cmd)
109 return
110 }
111
112 w.AppendError("ERR Unknown " + strings.ToLower(c.Name) + " subcommand '" + firstArg + "'")
113
35114 }
36115
37116 // --------------------------------------------------------------------
38117
39 // Handler is an abstract handler interface for handling commands
118 // Handler is an abstract handler interface for responding to commands
40119 type Handler interface {
41120 // ServeRedeo serves a request.
42121 ServeRedeo(w resp.ResponseWriter, c *resp.Command)
48127 // ServeRedeo calls f(w, c).
49128 func (f HandlerFunc) ServeRedeo(w resp.ResponseWriter, c *resp.Command) { f(w, c) }
50129
130 // WrapperFunc implements Handler, accepts a command and must return one of
131 // the following types:
132 // nil
133 // error
134 // string
135 // []byte
136 // bool
137 // float32, float64
138 // int, int8, int16, int32, int64
139 // uint, uint8, uint16, uint32, uint64
140 // resp.CustomResponse instances
141 // slices of any of the above typs
142 // maps containing keys and values of any of the above types
143 type WrapperFunc func(c *resp.Command) interface{}
144
145 // ServeRedeo implements Handler
146 func (f WrapperFunc) ServeRedeo(w resp.ResponseWriter, c *resp.Command) {
147 if err := w.Append(f(c)); err != nil {
148 w.AppendError("ERR " + err.Error())
149 }
150 }
151
51152 // --------------------------------------------------------------------
52153
53 // StreamHandler is an interface for handling streaming commands
154 // StreamHandler is an interface for responding to streaming commands
54155 type StreamHandler interface {
55156 // ServeRedeoStream serves a streaming request.
56157 ServeRedeoStream(w resp.ResponseWriter, c *resp.CommandStream)
2525
2626 w = redeotest.NewRecorder()
2727 subject.ServeRedeo(w, resp.NewCommand("PING", resp.CommandArgument("bad"), resp.CommandArgument("args")))
28 Expect(w.Response()).To(Equal("ERR wrong number of arguments for 'PING' command"))
28 Expect(w.Response()).To(MatchError("ERR wrong number of arguments for 'PING' command"))
29 })
30
31 })
32
33 var _ = Describe("CommandDescriptions", func() {
34 subject := CommandDescriptions{
35 {Name: "GeT", Arity: 2, Flags: []string{"readonly", "fast"}, FirstKey: 1, LastKey: 1, KeyStepCount: 1},
36 {Name: "randomkey", Arity: 1, Flags: []string{"readonly", "random"}},
37 {Name: "mset", Arity: -3, Flags: []string{"write", "denyoom"}, FirstKey: 1, LastKey: -1, KeyStepCount: 2},
38 {Name: "quit", Arity: 1},
39 }
40
41 It("should enumerate", func() {
42 w := redeotest.NewRecorder()
43 subject.ServeRedeo(w, resp.NewCommand("COMMAND"))
44 Expect(w.Response()).To(Equal([]interface{}{
45 []interface{}{"get", int64(2), []interface{}{"readonly", "fast"}, int64(1), int64(1), int64(1)},
46 []interface{}{"randomkey", int64(1), []interface{}{"readonly", "random"}, int64(0), int64(0), int64(0)},
47 []interface{}{"mset", int64(-3), []interface{}{"write", "denyoom"}, int64(1), int64(-1), int64(2)},
48 []interface{}{"quit", int64(1), []interface{}{}, int64(0), int64(0), int64(0)},
49 }))
50 })
51
52 })
53
54 var _ = Describe("SubCommands", func() {
55 subject := SubCommands{
56 "echo": Echo(),
57 "ping": Ping(),
58 }
59
60 It("should fail on calls without a sub", func() {
61 w := redeotest.NewRecorder()
62 subject.ServeRedeo(w, resp.NewCommand("CUSTOM"))
63 Expect(w.Response()).To(MatchError("ERR wrong number of arguments for 'CUSTOM' command"))
64 })
65
66 It("should fail on calls with an unknown sub", func() {
67 w := redeotest.NewRecorder()
68 subject.ServeRedeo(w, resp.NewCommand("CUSTOM", resp.CommandArgument("missing")))
69 Expect(w.Response()).To(MatchError("ERR Unknown custom subcommand 'missing'"))
70 })
71
72 It("should fail on calls with invalid args", func() {
73 w := redeotest.NewRecorder()
74 subject.ServeRedeo(w, resp.NewCommand("CUSTOM", resp.CommandArgument("echo")))
75 Expect(w.Response()).To(MatchError("ERR wrong number of arguments for 'CUSTOM echo' command"))
76 })
77
78 It("should succeed", func() {
79 w := redeotest.NewRecorder()
80 subject.ServeRedeo(w, resp.NewCommand("CUSTOM", resp.CommandArgument("echo"), resp.CommandArgument("HeLLo")))
81 Expect(w.Response()).To(Equal("HeLLo"))
82
83 w = redeotest.NewRecorder()
84 subject.ServeRedeo(w, resp.NewCommand("CUSTOM", resp.CommandArgument("ping")))
85 Expect(w.Response()).To(Equal("PONG"))
2986 })
3087
3188 })
77
88 "github.com/bsm/redeo/resp"
99 )
10
11 // ErrorResponse is used to wrap error strings
12 type ErrorResponse string
13
14 // Error implements error interface
15 func (e ErrorResponse) Error() string { return string(e) }
1016
1117 // ResponseRecorder is an implementation of resp.ResponseWriter that
1218 // is helpful in tests.
4349
4450 // Response returns the first response
4551 func (r *ResponseRecorder) Response() (interface{}, error) {
46 _ = r.ResponseWriter.Flush()
47
48 rr := resp.NewResponseReader(bytes.NewReader(r.b.Bytes()))
49 return parseResult(rr)
52 vv, err := r.Responses()
53 if err != nil || len(vv) == 0 {
54 return nil, err
55 }
56 return vv[len(vv)-1], nil
5057 }
5158
5259 // Responses returns all responses
8087 case resp.TypeInt:
8188 return rr.ReadInt()
8289 case resp.TypeError:
83 return rr.ReadError()
90 s, err := rr.ReadError()
91 if err != nil {
92 return nil, err
93 }
94 return ErrorResponse(s), nil
8495 case resp.TypeNil:
8596 return nil, rr.ReadNil()
8697 case resp.TypeArray:
202202 // try to find the end of the line
203203 start := b.r + offset
204204 if start < b.w {
205 index = bytes.IndexByte(b.buf[start:b.w], '\n')
205 index = bytes.IndexByte(b.buf[start:b.w], '\r')
206206 }
207207
208208 // try to read more data into the buffer if not in the buffer
212212 }
213213 start = b.r + offset
214214 if start < b.w {
215 index = bytes.IndexByte(b.buf[start:b.w], '\n')
215 index = bytes.IndexByte(b.buf[start:b.w], '\r')
216216 }
217217 }
218218
220220 if index < 0 {
221221 return nil, errInlineRequestTooLong
222222 }
223 return bufioLn(b.buf[start : start+index+1]), nil
223 // Although rarely, make sure '\n' is buffered.
224 if (start + index + 1) >= b.w {
225 b.require(offset + index + 2)
226 start = b.r + offset
227 }
228 return bufioLn(b.buf[start : start+index+2]), nil
224229 }
225230
226231 // ReadLine returns the next line until CRLF
338343 func (ln bufioLn) Trim() bufioLn {
339344 n := len(ln)
340345 for ; n > 0; n-- {
341 if c := ln[n-1]; c != '\r' && c != '\n' {
346 if c := ln[n-1]; !asciiSpace[c] {
342347 break
343348 }
344349 }
352357 data := ln.Trim()
353358
354359 for i, c := range data {
355 switch c {
356 case ' ', '\t':
360 if asciiSpace[c] {
357361 if inWord {
358362 return string(data[offset:i])
359363 }
360364 inWord = false
361 default:
365 } else {
362366 if !inWord {
363367 offset = i
364368 }
384388 } else if c == '-' && i == 0 {
385389 m = -1
386390 } else {
387 return 0, errNotAnInteger
391 return 0, errNotANumber
388392 }
389393 }
390394 return n * m, nil
431435 // --------------------------------------------------------------------
432436
433437 type bufioW struct {
434 wr io.Writer
438 io.Writer
435439 buf []byte
436440 mu sync.Mutex
437441 }
552556 return err
553557 }
554558 b.buf = b.buf[:cap(b.buf)]
555 _, err := io.CopyBuffer(b.wr, io.LimitReader(src, int64(n)), b.buf)
559 _, err := io.CopyBuffer(b, io.LimitReader(src, int64(n)), b.buf)
556560 b.buf = b.buf[:0]
557561 if err != nil {
558562 return err
580584 return nil
581585 }
582586
583 if _, err := b.wr.Write(b.buf); err != nil {
587 if _, err := b.Write(b.buf); err != nil {
584588 return err
585589 }
586590
595599 }
596600
597601 func (b *bufioW) reset(buf []byte, wr io.Writer) {
598 *b = bufioW{buf: buf[:0], wr: wr}
599 }
602 *b = bufioW{buf: buf[:0], Writer: wr}
603 }
3333 // Name refers to the command name
3434 Name string
3535
36 baseCmd
36 // Args returns arguments
37 Args []CommandArgument
38
39 ctx context.Context
3740 }
3841
3942 // NewCommand returns a new command instance;
4043 // useful for tests
41 func NewCommand(name string, argv ...CommandArgument) *Command {
42 return &Command{
43 Name: name,
44 baseCmd: baseCmd{
45 argc: len(argv),
46 argv: argv,
47 },
48 }
49 }
50
51 // Arg returns the command argument at position i
52 func (c *Command) Arg(i int) CommandArgument {
53 if i > -1 && i < c.argc {
54 return c.Args()[i]
55 }
56 return nil
57 }
58
59 // Args returns all command argument values
60 func (c *Command) Args() []CommandArgument { return c.argv }
61
62 func (c *Command) updateName() {
63 c.Name = string(c.name)
64 }
65
66 func (c *Command) reset() {
67 c.baseCmd.reset()
68 *c = Command{baseCmd: c.baseCmd}
69 }
70
71 func (c *Command) parseMultiBulk(r *bufioR) (bool, error) {
72 ok, err := c.baseCmd.parseMultiBulk(r)
73 if err != nil || !ok {
74 return ok, err
75 }
76
77 c.grow(c.argc)
78 for i := 0; i < c.argc; i++ {
79 c.argv[i], err = r.ReadBulk(c.argv[i])
44 func NewCommand(name string, args ...CommandArgument) *Command {
45 return &Command{Name: name, Args: args}
46 }
47
48 // Arg returns the Nth argument
49 func (c *Command) Arg(n int) CommandArgument {
50 if n > -1 && n < len(c.Args) {
51 return c.Args[n]
52 }
53 return nil
54 }
55
56 // ArgN returns the number of arguments
57 func (c *Command) ArgN() int {
58 return len(c.Args)
59 }
60
61 // Reset discards all data and resets all state
62 func (c *Command) Reset() {
63 args := c.Args
64 for i, v := range args {
65 args[i] = v[:0]
66 }
67 *c = Command{Args: args[:0]}
68 }
69
70 // Context returns the context
71 func (c *Command) Context() context.Context {
72 if c.ctx != nil {
73 return c.ctx
74 }
75 return context.Background()
76 }
77
78 // SetContext sets the request context.
79 func (c *Command) SetContext(ctx context.Context) {
80 if ctx != nil {
81 c.ctx = ctx
82 }
83 }
84
85 func (c *Command) grow(n int) {
86 if d := n - cap(c.Args); d > 0 {
87 c.Args = c.Args[:cap(c.Args)]
88 c.Args = append(c.Args, make([]CommandArgument, d)...)
89 } else {
90 c.Args = c.Args[:n]
91 }
92 }
93
94 func (c *Command) readMultiBulk(r *bufioR, name string, nargs int) error {
95 c.Name = name
96 c.grow(nargs)
97
98 var err error
99 for i := 0; i < nargs; i++ {
100 c.Args[i], err = r.ReadBulk(c.Args[i])
80101 if err != nil {
81 return false, err
82 }
83 }
102 return err
103 }
104 }
105 return nil
106 }
107
108 func (c *Command) readInline(r *bufioR) (bool, error) {
109 line, err := r.ReadLine()
110 if err != nil {
111 return false, err
112 }
113
114 data := line.Trim()
115
116 var name []byte
117 var n int
118
119 name, n = appendArgument(name, data)
120 data = data[n:]
121 if len(name) == 0 {
122 return false, nil
123 }
124
125 for pos := 0; len(data) != 0; pos++ {
126 c.grow(pos + 1)
127 c.Args[pos], n = appendArgument(c.Args[pos], data)
128 data = data[n:]
129 }
130
131 c.Name = string(name)
84132 return true, nil
133 }
134
135 // --------------------------------------------------------------------
136
137 func readCommand(c interface {
138 readInline(*bufioR) (bool, error)
139 readMultiBulk(*bufioR, string, int) error
140 }, r *bufioR) error {
141 x, err := r.PeekByte()
142 if err != nil {
143 return err
144 }
145
146 if x == '*' {
147 sz, err := r.ReadArrayLen()
148 if err != nil {
149 return err
150 } else if sz < 1 {
151 return readCommand(c, r)
152 }
153
154 name, err := r.ReadBulkString()
155 if err != nil {
156 return err
157 }
158
159 return c.readMultiBulk(r, name, sz-1)
160 }
161
162 if ok, err := c.readInline(r); err != nil {
163 return err
164 } else if !ok {
165 return readCommand(c, r)
166 }
167
168 return nil
85169 }
86170
87171 // --------------------------------------------------------------------
93177 // Name refers to the command name
94178 Name string
95179
96 baseCmd
97
98 p int
99 r *bufioR
100
101 cur io.ReadCloser
180 ctx context.Context
181
182 inline Command
183 isInline bool
184
185 nargs int
186 pos int
187 arg io.ReadCloser
188
189 rd *bufioR
190 }
191
192 // Reset discards all data and resets all state
193 func (c *CommandStream) Reset() {
194 c.inline.Reset()
195 *c = CommandStream{inline: c.inline}
102196 }
103197
104198 // Discard discards the (remaining) arguments
105199 func (c *CommandStream) Discard() error {
106 if c.p < len(c.argv) {
107 c.p = len(c.argv)
108 return nil
200 if c.isInline {
201 if c.pos < len(c.inline.Args) {
202 c.pos = len(c.inline.Args)
203 return nil
204 }
109205 }
110206
111207 var err error
112 if c.cur != nil {
113 if e := c.cur.Close(); e != nil {
208 if c.arg != nil {
209 if e := c.arg.Close(); e != nil {
114210 err = e
115211 }
116 }
117
118 if c.r != nil {
119 for ; c.p < c.argc; c.p++ {
120 if e := c.r.SkipBulk(); e != nil {
212 c.arg = nil
213 }
214
215 if c.rd != nil {
216 for ; c.pos < c.nargs; c.pos++ {
217 if e := c.rd.SkipBulk(); e != nil {
121218 err = e
122219 }
123220 }
125222 return err
126223 }
127224
128 // NextArg returns the next argument as an io.Reader
129 func (c *CommandStream) NextArg() (io.Reader, error) {
130 if c.p < len(c.argv) {
131 rd := bytes.NewReader(c.argv[c.p])
132 c.p++
133 return rd, nil
134 } else if c.r != nil && c.p < c.argc {
135 var err error
136 c.cur, err = c.r.StreamBulk()
137 c.p++
138 return c.cur, err
139 }
140 return nil, errNoMoreArgs
141 }
142
143 func (c *CommandStream) updateName() {
144 c.Name = string(c.name)
145 }
146
147 func (c *CommandStream) reset() {
148 c.baseCmd.reset()
149 *c = CommandStream{baseCmd: c.baseCmd}
150 }
151
152 func (c *CommandStream) parseMultiBulk(r *bufioR) (bool, error) {
153 ok, err := c.baseCmd.parseMultiBulk(r)
154 if err != nil || !ok {
155 return ok, err
156 }
157
158 if c.argc > 0 {
159 c.r = r
160 }
161 return true, nil
162 }
163
164 // --------------------------------------------------------------------
165
166 type anyCmd interface {
167 parseMultiBulk(*bufioR) (bool, error)
168 parseInline(*bufioR) (bool, error)
169 updateName()
170 }
171
172 func parseCommand(c anyCmd, r *bufioR) error {
173 x, err := r.PeekByte()
174 if err != nil {
175 return err
176 }
177
178 if x == '*' {
179 if ok, err := c.parseMultiBulk(r); err != nil {
180 return err
181 } else if !ok {
182 return parseCommand(c, r)
183 }
184 c.updateName()
185 return nil
186 }
187
188 if ok, err := c.parseInline(r); err != nil {
189 return err
190 } else if !ok {
191 return parseCommand(c, r)
192 }
193 c.updateName()
194 return nil
195 }
196
197 // --------------------------------------------------------------------
198
199 type baseCmd struct {
200 argc int
201 argv []CommandArgument
202 name []byte
203
204 ctx context.Context
205 }
206
207 // ArgN returns the number of command arguments
208 func (c *baseCmd) ArgN() int {
209 return c.argc
225 // ArgN returns the number of arguments
226 func (c *CommandStream) ArgN() int {
227 if c.isInline {
228 return c.inline.ArgN()
229 }
230 return c.nargs
231 }
232
233 // More returns true if there are unread arguments
234 func (c *CommandStream) More() bool {
235 return c.pos < c.ArgN()
236 }
237
238 // Next returns the next argument as an io.Reader
239 func (c *CommandStream) Next() (io.Reader, error) {
240 if c.ctx != nil {
241 if err := c.ctx.Err(); err != nil {
242 return nil, err
243 }
244 }
245 if !c.More() {
246 return nil, errNoMoreArgs
247 }
248
249 if c.isInline {
250 arg := bytes.NewReader(c.inline.Args[c.pos])
251 c.pos++
252 return arg, nil
253 }
254
255 var err error
256 c.arg, err = c.rd.StreamBulk()
257 c.pos++
258 return c.arg, err
210259 }
211260
212261 // Context returns the context
213 func (c *baseCmd) Context() context.Context {
262 func (c *CommandStream) Context() context.Context {
214263 if c.ctx != nil {
215264 return c.ctx
216265 }
218267 }
219268
220269 // SetContext sets the request context.
221 func (c *baseCmd) SetContext(ctx context.Context) {
270 func (c *CommandStream) SetContext(ctx context.Context) {
222271 if ctx != nil {
223272 c.ctx = ctx
224273 }
225274 }
226275
227 func (c *baseCmd) parseMultiBulk(r *bufioR) (bool, error) {
228 n, err := r.ReadArrayLen()
229 if err != nil || n < 1 {
230 return false, err
231 }
232
233 c.argc = n - 1
234 c.name, err = r.ReadBulk(c.name)
235 if err != nil {
236 return false, err
237 }
238
276 func (c *CommandStream) readMultiBulk(r *bufioR, name string, nargs int) error {
277 c.Name = name
278 c.nargs = nargs
279 c.rd = r
280 return nil
281 }
282
283 func (c *CommandStream) readInline(r *bufioR) (bool, error) {
284 c.isInline = true
285
286 if ok, err := c.inline.readInline(r); err != nil || !ok {
287 return ok, err
288 }
289 c.Name = c.inline.Name
239290 return true, nil
240291 }
241
242 func (c *baseCmd) parseInline(r *bufioR) (bool, error) {
243 line, err := r.ReadLine()
244 if err != nil {
245 return false, err
246 }
247
248 hasName := false
249 inWord := false
250 for _, x := range line.Trim() {
251 switch x {
252 case ' ', '\t':
253 inWord = false
254 default:
255 if !inWord && hasName {
256 c.argc++
257 c.grow(c.argc)
258 }
259 if pos := c.argc - 1; pos > -1 {
260 c.argv[pos] = append(c.argv[pos], x)
261 } else {
262 hasName = true
263 c.name = append(c.name, x)
264 }
265 inWord = true
266 }
267 }
268 return hasName, nil
269 }
270
271 func (c *baseCmd) grow(n int) {
272 if d := n - cap(c.argv); d > 0 {
273 c.argv = c.argv[:cap(c.argv)]
274 c.argv = append(c.argv, make([]CommandArgument, d)...)
275 } else {
276 c.argv = c.argv[:n]
277 }
278 }
279
280 func (c *baseCmd) reset() {
281 argv := c.argv
282 for i, v := range argv {
283 argv[i] = v[:0]
284 }
285 *c = baseCmd{
286 argv: argv[:0],
287 name: c.name[:0],
288 }
289 }
6767 // read command
6868 cmd, _ := r.ReadCmd(nil)
6969 fmt.Println(cmd.Name)
70 for i := 0; i < cmd.ArgN(); i++ {
71 fmt.Println(i, cmd.Arg(i))
70 for i := 0; i < len(cmd.Args); i++ {
71 fmt.Println(i, cmd.Args[i])
7272 }
7373
7474 // read command, recycle previous instance
7575 cmd, _ = r.ReadCmd(cmd)
7676 fmt.Println(cmd.Name)
77 for i := 0; i < cmd.ArgN(); i++ {
78 fmt.Println(i, cmd.Arg(i))
77 for i := 0; i < len(cmd.Args); i++ {
78 fmt.Println(i, cmd.Args[i])
7979 }
8080
8181 // Output:
205205 // Output:
206206 // "*2\r\n$6\r\nitem 1\r\n$6\r\nitem 2\r\n"
207207 }
208
209 func ExampleResponseReader_Scan() {
210 b := new(bytes.Buffer)
211 w := resp.NewResponseWriter(b)
212
213 w.AppendBulkString("foo")
214 w.AppendInt(33)
215 w.AppendInlineString("7.54")
216 w.AppendArrayLen(2)
217 w.AppendInlineString("bar")
218 w.AppendInt(14)
219 w.Flush()
220
221 var v struct {
222 String string
223 Number int
224 Decimal float64
225 Slice []string
226 }
227
228 r := resp.NewResponseReader(b)
229 if err := r.Scan(&v.String, &v.Number, &v.Decimal, &v.Slice); err != nil {
230 fmt.Printf("ERROR: %v\n", err)
231 return
232 }
233 fmt.Printf("%+v\n", v)
234
235 // Output:
236 // {String:foo Number:33 Decimal:7.54 Slice:[bar 14]}
237 }
3636 if cmd == nil {
3737 cmd = new(Command)
3838 } else {
39 cmd.reset()
39 cmd.Reset()
4040 }
4141
42 err := parseCommand(cmd, r.r)
43 return cmd, err
42 return cmd, readCommand(cmd, r.r)
4443 }
4544
4645 // StreamCmd reads the next command as a stream.
4847 if cmd == nil {
4948 cmd = new(CommandStream)
5049 } else {
51 cmd.reset()
50 cmd.Reset()
5251 }
5352
54 err := parseCommand(cmd, r.r)
55 return cmd, err
53 return cmd, readCommand(cmd, r.r)
5654 }
5755
5856 // SkipCmd skips the next command.
2222 cmd, err := r.ReadCmd(nil)
2323 Expect(err).NotTo(HaveOccurred())
2424 Expect(cmd).To(MatchCommand("PING"))
25 Expect(cmd.ArgN()).To(Equal(0))
25 Expect(cmd.Args).To(HaveLen(0))
2626 Expect(cmd.Arg(0)).To(BeNil())
2727
2828 cmd, err = r.ReadCmd(cmd)
2929 Expect(err).NotTo(HaveOccurred())
3030 Expect(cmd).To(MatchCommand("EcHO", "HeLLO"))
31 Expect(cmd.ArgN()).To(Equal(1))
31 Expect(cmd.Args).To(HaveLen(1))
3232 Expect(cmd.Arg(0)).To(Equal(resp.CommandArgument("HeLLO")))
3333 Expect(cmd.Arg(1)).To(BeNil())
3434
3535 cmd, err = r.ReadCmd(cmd)
3636 Expect(err).To(MatchError("EOF"))
3737 Expect(cmd).To(MatchCommand(""))
38 })
39
40 It("should read quoted inline requests", func() {
41 r := setup("try \"double \\x71uotes\" \t 'single quotes' \r\n")
42
43 cmd, err := r.ReadCmd(nil)
44 Expect(err).NotTo(HaveOccurred())
45 Expect(cmd).To(MatchCommand("try", "double quotes", "single quotes"))
46 Expect(cmd.Args).To(HaveLen(2))
3847 })
3948
4049 It("should reject inline commands that are larger than the buffer", func() {
4958 cmd, err := r.ReadCmd(nil)
5059 Expect(err).NotTo(HaveOccurred())
5160 Expect(cmd).To(MatchCommand("PING"))
52 Expect(cmd.ArgN()).To(Equal(0))
61 Expect(cmd.Args).To(HaveLen(0))
5362 Expect(cmd.Arg(0)).To(BeNil())
5463
5564 cmd, err = r.ReadCmd(cmd)
5665 Expect(err).NotTo(HaveOccurred())
5766 Expect(cmd).To(MatchCommand("EcHO", "HeLLO"))
58 Expect(cmd.ArgN()).To(Equal(1))
67 Expect(cmd.Args).To(HaveLen(1))
5968 Expect(cmd.Arg(0)).To(Equal(resp.CommandArgument("HeLLO")))
6069 Expect(cmd.Arg(1)).To(BeNil())
6170
7079 cmd, err := r.ReadCmd(nil)
7180 Expect(err).NotTo(HaveOccurred())
7281 Expect(cmd.Name).To(Equal("ECHO"))
73 Expect(cmd.ArgN()).To(Equal(1))
82 Expect(cmd.Args).To(HaveLen(1))
7483 Expect(len(cmd.Arg(0))).To(Equal(100000))
7584
7685 cmd, err = r.ReadCmd(cmd)
8594 Expect(err).NotTo(HaveOccurred())
8695 Expect(cmd).To(MatchStream("PING"))
8796 Expect(cmd.ArgN()).To(Equal(0))
88 _, err = cmd.NextArg()
97 Expect(cmd.More()).To(BeFalse())
98 _, err = cmd.Next()
8999 Expect(err).To(MatchError("resp: no more arguments"))
90100
91101 cmd, err = r.StreamCmd(cmd)
92102 Expect(err).NotTo(HaveOccurred())
103 Expect(cmd.ArgN()).To(Equal(1))
104 Expect(cmd.More()).To(BeTrue())
93105 Expect(cmd).To(MatchStream("EcHO", "HeLLO"))
94106 Expect(cmd.ArgN()).To(Equal(1))
95 _, err = cmd.NextArg()
107 Expect(cmd.More()).To(BeFalse())
108 _, err = cmd.Next()
96109 Expect(err).To(MatchError("resp: no more arguments"))
97110
98111 cmd, err = r.StreamCmd(cmd)
107120 Expect(err).NotTo(HaveOccurred())
108121 Expect(cmd).To(MatchStream("PING"))
109122 Expect(cmd.ArgN()).To(Equal(0))
110 _, err = cmd.NextArg()
123 Expect(cmd.More()).To(BeFalse())
124 _, err = cmd.Next()
111125 Expect(err).To(MatchError("resp: no more arguments"))
112126
113127 cmd, err = r.StreamCmd(cmd)
114128 Expect(err).NotTo(HaveOccurred())
129 Expect(cmd.ArgN()).To(Equal(1))
130 Expect(cmd.More()).To(BeTrue())
115131 Expect(cmd).To(MatchStream("EcHO", "HeLLO"))
116132 Expect(cmd.ArgN()).To(Equal(1))
117 _, err = cmd.NextArg()
133 Expect(cmd.More()).To(BeFalse())
134 _, err = cmd.Next()
118135 Expect(err).To(MatchError("resp: no more arguments"))
119136
120137 cmd, err = r.StreamCmd(cmd)
130147 Expect(err).NotTo(HaveOccurred())
131148 Expect(cmd.Name).To(Equal("ECHO"))
132149 Expect(cmd.ArgN()).To(Equal(1))
133
134 arg, err := cmd.NextArg()
150 Expect(cmd.More()).To(BeTrue())
151
152 arg, err := cmd.Next()
135153 Expect(err).NotTo(HaveOccurred())
136154
137155 n, err := buf.ReadFrom(arg)
147165 Expect(err).NotTo(HaveOccurred())
148166 Expect(cmd.Name).To(Equal("ECHO"))
149167 Expect(cmd.ArgN()).To(Equal(1))
168 Expect(cmd.More()).To(BeTrue())
150169 Expect(cmd.Discard()).To(Succeed())
170 Expect(cmd.More()).To(BeFalse())
151171
152172 cmd, err = r.StreamCmd(cmd)
153173 Expect(err).NotTo(HaveOccurred())
161181 Expect(err).NotTo(HaveOccurred())
162182 Expect(cmd.Name).To(Equal("ECHO"))
163183 Expect(cmd.ArgN()).To(Equal(1))
164
165 _, err = cmd.NextArg()
166 Expect(err).NotTo(HaveOccurred())
184 Expect(cmd.More()).To(BeTrue())
185
186 _, err = cmd.Next()
187 Expect(err).NotTo(HaveOccurred())
188 Expect(cmd.More()).To(BeFalse())
189
167190 Expect(cmd.Discard()).To(Succeed())
191 Expect(cmd.More()).To(BeFalse())
168192
169193 cmd, err = r.StreamCmd(cmd)
170194 Expect(err).NotTo(HaveOccurred())
22 // server side readers and writers.
33 package resp
44
5 import "fmt"
5 import (
6 "fmt"
7 )
8
9 // ResponseType represents the reply type
10 type ResponseType uint8
11
12 // String returns the response type description
13 func (t ResponseType) String() string {
14 switch t {
15 case TypeArray:
16 return "Array"
17 case TypeBulk:
18 return "Bulk"
19 case TypeInline:
20 return "Inline"
21 case TypeError:
22 return "Error"
23 case TypeInt:
24 return "Int"
25 case TypeNil:
26 return "Nil"
27 }
28 return "Unknown"
29 }
30
31 // response type iota
32 const (
33 TypeUnknown ResponseType = iota
34 TypeArray
35 TypeBulk
36 TypeInline
37 TypeError
38 TypeInt
39 TypeNil
40 )
41
42 // --------------------------------------------------------------------
43
44 // Scannable interfaces may implement custom Scan behaviour
45 type Scannable interface {
46 // ScanResponse scans theresponse from the reader
47 ScanResponse(t ResponseType, r ResponseReader) error
48 }
49
50 // NullString is a scannable that can deal with nil values
51 type NullString struct {
52 Value string
53 Valid bool
54 }
55
56 // ScanResponse implements Scannable
57 func (s *NullString) ScanResponse(t ResponseType, r ResponseReader) error {
58 if t == TypeNil {
59 return r.ReadNil()
60 }
61
62 if err := r.Scan(&s.Value); err != nil {
63 return err
64 }
65 s.Valid = true
66 return nil
67 }
68
69 // --------------------------------------------------------------------
670
771 type protoError string
872
2387 errInvalidBulkLength = protoError("Protocol error: invalid bulk length")
2488 errBlankBulkLength = protoError("Protocol error: expected '$', got ' '")
2589 errInlineRequestTooLong = protoError("Protocol error: too big inline request")
26 errNotAnInteger = protoError("Protocol error: expected an integer")
90 errNotANumber = protoError("Protocol error: expected a number")
2791 errNotANilMessage = protoError("Protocol error: expected a nil")
92 errBadResponseType = protoError("Protocol error: bad response type")
2893 )
2994
3095 var (
1010 . "github.com/onsi/gomega"
1111 "github.com/onsi/gomega/types"
1212 )
13
14 var _ = Describe("ResponseType", func() {
15
16 It("should implement stringer", func() {
17 Expect(resp.TypeArray.String()).To(Equal("Array"))
18 Expect(resp.TypeNil.String()).To(Equal("Nil"))
19 Expect(resp.TypeUnknown.String()).To(Equal("Unknown"))
20 })
21
22 })
1323
1424 // --------------------------------------------------------------------
1525
6373 buf := new(bytes.Buffer)
6474 m.actual = []string{cmd.Name}
6575
66 for i := 0; i < cmd.ArgN(); i++ {
67 ar, err := cmd.NextArg()
76 for cmd.More() {
77 arg, err := cmd.Next()
6878 if err != nil {
69 return false, fmt.Errorf("MatchStream failed to parse argument #%d: %v", i+1, err)
79 return false, fmt.Errorf("MatchStream failed to parse argument: %v", err)
7080 }
7181
7282 buf.Reset()
73 if _, err = buf.ReadFrom(ar); err != nil {
83 if _, err = buf.ReadFrom(arg); err != nil {
7484 return false, fmt.Errorf("MatchStream failed to read argument into buffer: %v", err)
7585 }
7686 m.actual = append(m.actual, buf.String())
8797
8898 func cmdToSlice(cmd *resp.Command) []string {
8999 res := []string{cmd.Name}
90 for _, arg := range cmd.Args() {
91 res = append(res, string(arg))
100 for _, arg := range cmd.Args {
101 res = append(res, arg.String())
92102 }
93103 return res
94104 }
105
106 type ScannableStruct struct {
107 Name string
108 Arity int
109 Flags []string
110 FirstKey int
111 LastKey int
112 KeyStep int
113 }
114
115 func (s *ScannableStruct) ScanResponse(t resp.ResponseType, r resp.ResponseReader) error {
116 if t != resp.TypeArray {
117 return fmt.Errorf("resp_test: unable to scan response, bad type: %s", t.String())
118 }
119
120 sz, err := r.ReadArrayLen()
121 if err != nil {
122 return err
123 } else if sz != 6 {
124 return fmt.Errorf("resp_test: expected 6 attributes, but received %d", sz)
125 }
126
127 return r.Scan(&s.Name, &s.Arity, &s.Flags, &s.FirstKey, &s.LastKey, &s.KeyStep)
128 }
00 package resp
11
2 import "io"
2 import (
3 "io"
4 )
5
6 // CustomResponse values implement custom serialization and can be passed
7 // to ResponseWriter.Append.
8 type CustomResponse interface {
9 // AppendTo must be implemented by custom response types
10 AppendTo(w ResponseWriter)
11 }
312
413 // ResponseWriter is used by servers to wrap a client connection and send
514 // protocol-compatible responses in buffered pipelines.
615 type ResponseWriter interface {
16 io.Writer
17
718 // AppendArrayLen appends an array header to the output buffer.
819 AppendArrayLen(n int)
920 // AppendBulk appends bulk bytes to the output buffer.
2435 AppendNil()
2536 // AppendOK appends "OK" to the output buffer.
2637 AppendOK()
38 // Append automatically serialized given values and appends them to the output buffer.
39 // Supported values include:
40 // * nil
41 // * error
42 // * string
43 // * []byte
44 // * bool
45 // * float32, float64
46 // * int, int8, int16, int32, int64
47 // * uint, uint8, uint16, uint32, uint64
48 // * CustomResponse instances
49 // * slices and maps of any of the above
50 Append(v interface{}) error
2751 // CopyBulk copies n bytes from a reader.
2852 // This call may flush pending buffer to prevent overflows.
2953 CopyBulk(src io.Reader, n int64) error
4569
4670 // --------------------------------------------------------------------
4771
48 // ResponseType represents the reply type
49 type ResponseType uint8
50
51 // response type iota
52 const (
53 TypeUnknown ResponseType = iota
54 TypeArray
55 TypeBulk
56 TypeInline
57 TypeError
58 TypeInt
59 TypeNil
60 )
61
62 // ResponseReader is used by clients to wrap a server connection and
63 // parse responses.
64 type ResponseReader interface {
72 // ResponseParser is a basic response parser
73 type ResponseParser interface {
6574 // PeekType returns the type of the next response block
6675 PeekType() (ResponseType, error)
6776 // ReadNil reads a nil value
8190 ReadError() (string, error)
8291 // ReadInlineString reads a status string
8392 ReadInlineString() (string, error)
93 // Scan scans results into the given values.
94 Scan(vv ...interface{}) error
95 }
96
97 // ResponseReader is used by clients to wrap a server connection and
98 // parse responses.
99 type ResponseReader interface {
100 ResponseParser
101
84102 // Buffered returns the number of buffered (unread) bytes.
85103 Buffered() int
86104 // Reset resets the reader to a new reader and recycles internal buffers.
11
22 import (
33 "bytes"
4 "errors"
5 "fmt"
6 "reflect"
7 "strconv"
48 "strings"
9 "time"
510
611 "github.com/bsm/redeo/resp"
712 . "github.com/onsi/ginkgo"
13 . "github.com/onsi/ginkgo/extensions/table"
814 . "github.com/onsi/gomega"
915 )
1016
8894 Expect(subject.CopyBulk(src, 16)).To(Succeed())
8995 Expect(subject.Flush()).To(Succeed())
9096 Expect(buf.String()).To(Equal("*1\r\n$16\r\nthis is a stream\r\n"))
97 })
98
99 DescribeTable("Append",
100 func(v interface{}, exp string) {
101 subject.Append(v)
102 Expect(subject.Flush()).To(Succeed())
103 Expect(strconv.Quote(buf.String())).To(Equal(strconv.Quote(exp)))
104 },
105
106 Entry("nil", nil, "$-1\r\n"),
107 Entry("error", errors.New("failed"), "-ERR failed\r\n"),
108 Entry("standard error", errors.New("ERR failed"), "-ERR failed\r\n"),
109 Entry("int", 33, ":33\r\n"),
110 Entry("int64", int64(33), ":33\r\n"),
111 Entry("uint", uint(33), ":33\r\n"),
112 Entry("bool (true)", true, ":1\r\n"),
113 Entry("bool (false)", false, ":0\r\n"),
114 Entry("float32", float32(0.1231), "+0.1231\r\n"),
115 Entry("float64", 0.7357, "+0.7357\r\n"),
116 Entry("negative float64", -0.4214, "+-0.4214\r\n"),
117 Entry("string", "many words", "$10\r\nmany words\r\n"),
118 Entry("[]byte", []byte("many words"), "$10\r\nmany words\r\n"),
119 Entry("[]string", []string{"a", "b", "c"}, "*3\r\n$1\r\na\r\n$1\r\nb\r\n$1\r\nc\r\n"),
120 Entry("[][]byte", [][]byte{{'a'}, {'b'}, {'c'}}, "*3\r\n$1\r\na\r\n$1\r\nb\r\n$1\r\nc\r\n"),
121 Entry("[]int", []int{3, 5, 2}, "*3\r\n:3\r\n:5\r\n:2\r\n"),
122 Entry("[]int64", []int64{7, 8, 3}, "*3\r\n:7\r\n:8\r\n:3\r\n"),
123 Entry("[][]int64", [][]int64{{1, 2}, {3, 4}}, "*2\r\n*2\r\n:1\r\n:2\r\n*2\r\n:3\r\n:4\r\n"),
124 Entry("map[string]string", map[string]string{"a": "b"}, "*2\r\n$1\r\na\r\n$1\r\nb\r\n"),
125 Entry("map[int64]float64", map[int64]float64{1: 1.1}, "*2\r\n:1\r\n+1.1\r\n"),
126 Entry("custom response", &customResponse{Host: "foo", Port: 8888}, "$17\r\ncustom 'foo:8888'\r\n"),
127 Entry("custom error", customErrorResponse("bar"), "-WRONG bar\r\n"),
128 )
129
130 It("should reject bad custom types", func() {
131 Expect(subject.Append(time.Time{})).To(MatchError(`resp: unsupported type time.Time`))
91132 })
92133
93134 })
270311 Expect(err).To(MatchError("EOF"))
271312 })
272313
314 Describe("Scan", func() {
315
316 DescribeTable("success",
317 func(s string, v interface{}, exp interface{}) {
318 _, err := buf.WriteString(s)
319 Expect(err).NotTo(HaveOccurred())
320 Expect(subject.Scan(v)).To(Succeed())
321 Expect(reflect.ValueOf(v).Elem().Interface()).To(Equal(exp))
322 },
323
324 Entry("bool (numeric true)", ":1\r\n", new(bool), true),
325 Entry("bool (numeric false)", ":0\r\n", new(bool), false),
326 Entry("bool (OK)", "+OK\r\n", new(bool), true),
327 Entry("bool (true from inline)", "+1\r\n", new(bool), true),
328 Entry("bool (true from bulk)", "$1\r\n1\r\n", new(bool), true),
329 Entry("bool (false from inline)", "+0\r\n", new(bool), false),
330 Entry("bool (false from bulk)", "$1\r\n0\r\n", new(bool), false),
331
332 Entry("int64", ":123\r\n", new(int64), int64(123)),
333 Entry("int32", ":123\r\n", new(int32), int32(123)),
334 Entry("int16", ":123\r\n", new(int16), int16(123)),
335 Entry("int8", ":123\r\n", new(int8), int8(123)),
336 Entry("int", ":123\r\n", new(int), int(123)),
337 Entry("int (from inline)", "+123\r\n", new(int), int(123)),
338 Entry("int (from bulk)", "$3\r\n123\r\n", new(int), int(123)),
339
340 Entry("uint64", ":123\r\n", new(uint64), uint64(123)),
341 Entry("uint32", ":123\r\n", new(uint32), uint32(123)),
342 Entry("uint16", ":123\r\n", new(uint16), uint16(123)),
343 Entry("uint8", ":123\r\n", new(uint8), uint8(123)),
344 Entry("uint", ":123\r\n", new(uint), uint(123)),
345 Entry("uint (from inline)", "+123\r\n", new(uint), uint(123)),
346 Entry("uint (from bulk)", "$3\r\n123\r\n", new(uint), uint(123)),
347
348 Entry("float64 (from string)", "+2.312\r\n", new(float64), 2.312),
349 Entry("float64 (from int)", ":123\r\n", new(float64), 123.0),
350 Entry("float32 (from string)", "$5\r\n2.312\r\n", new(float32), float32(2.312)),
351 Entry("float32 (from int)", ":123\r\n", new(float32), float32(123.0)),
352
353 Entry("string (inline)", "+hello\r\n", new(string), "hello"),
354 Entry("string (bulk)", "$5\r\nhello\r\n", new(string), "hello"),
355 Entry("string (from int)", ":123\r\n", new(string), "123"),
356
357 Entry("bytes (inline)", "+hello\r\n", new([]byte), []byte("hello")),
358 Entry("bytes (bulk)", "$5\r\nhello\r\n", new([]byte), []byte("hello")),
359 Entry("bytes (from int)", ":123\r\n", new([]byte), []byte("123")),
360 Entry("bytes (from nil)", "$-1\r\n", new([]byte), ([]byte)(nil)),
361
362 Entry("string slices", "*2\r\n+hello\r\n$5\r\nworld\r\n", new([]string), []string{"hello", "world"}),
363 Entry("string slices (with ints)", "*2\r\n+hello\r\n:123\r\n", new([]string), []string{"hello", "123"}),
364 Entry("number slices", "*2\r\n:1\r\n:2\r\n", new([]int64), []int64{1, 2}),
365 Entry("number slices (from strings)", "*2\r\n:1\r\n+2\r\n", new([]int64), []int64{1, 2}),
366 Entry("nested slices", "*2\r\n*2\r\n:1\r\n:2\r\n*2\r\n:3\r\n:4\r\n", new([][]int64), [][]int64{
367 {1, 2},
368 {3, 4},
369 }),
370
371 Entry("maps", "*2\r\n+hello\r\n$5\r\nworld\r\n", new(map[string]string), map[string]string{
372 "hello": "world",
373 }),
374 Entry("maps (mixed)", "*4\r\n+foo\r\n+bar\r\n+baz\r\n:3\r\n", new(map[string]string), map[string]string{
375 "foo": "bar",
376 "baz": "3",
377 }),
378 Entry("maps (nested)", "*4\r\n+foo\r\n*2\r\n+bar\r\n:1\r\n+baz\r\n*2\r\n+boo\r\n:2\r\n", new(map[string]map[string]int), map[string]map[string]int{
379 "foo": {"bar": 1},
380 "baz": {"boo": 2},
381 }),
382 Entry("slice of maps", "*2\r\n*2\r\n+bar\r\n:1\r\n*2\r\n+boo\r\n:2\r\n", new([]map[string]int), []map[string]int{
383 {"bar": 1},
384 {"boo": 2},
385 }),
386
387 Entry("nullable (from nil)", "$-1\r\n", new(resp.NullString), resp.NullString{}),
388 Entry("nullable (inline)", "+foo\r\n", new(resp.NullString), resp.NullString{Value: "foo", Valid: true}),
389
390 Entry("scannable", "*2\r\n"+
391 "*6\r\n+llen\r\n:2\r\n*2\r\n+readonly\r\n+fast\r\n:1\r\n:1\r\n:1\r\n"+
392 "*6\r\n+mset\r\n:-3\r\n*1\r\n+write\r\n:1\r\n:-1\r\n:2\r\n",
393 new([]ScannableStruct), []ScannableStruct{
394 {Name: "llen", Arity: 2, Flags: []string{"readonly", "fast"}, FirstKey: 1, LastKey: 1, KeyStep: 1},
395 {Name: "mset", Arity: -3, Flags: []string{"write"}, FirstKey: 1, LastKey: -1, KeyStep: 2},
396 }),
397 )
398
399 DescribeTable("failure",
400 func(s string, v interface{}, exp string) {
401 _, err := buf.WriteString(s)
402 Expect(err).NotTo(HaveOccurred())
403 Expect(subject.Scan(v)).To(MatchError(exp))
404 },
405 Entry("errors", "-ERR something bad\r\n", new(string), `resp: server error "ERR something bad"`),
406 Entry("bad type", "+hello\r\n", new(time.Time), `resp: error on Scan into *time.Time: unsupported conversion from "hello"`),
407 Entry("not a pointer", "+hello\r\n", "value", `resp: error on Scan into string: destination not a pointer`),
408
409 Entry("bool (bad type)", "*3\r\n", new(bool), `resp: error on Scan into *bool: unsupported conversion from array[3]`),
410 Entry("bool (string)", "+hello\r\n", new(bool), `resp: error on Scan into *bool: unsupported conversion from "hello"`),
411 Entry("bool (bad numeric)", ":2\r\n", new(bool), `resp: error on Scan into *bool: unsupported conversion from 2`),
412 Entry("bool (from nil)", "$-1\r\n", new(bool), `resp: error on Scan into *bool: unsupported conversion from <nil>`),
413
414 Entry("int64 (bad type)", "*3\r\n", new(int64), `resp: error on Scan into *int64: unsupported conversion from array[3]`),
415 Entry("int64 (string)", "+hello\r\n", new(int64), `resp: error on Scan into *int64: unsupported conversion from "hello"`),
416 Entry("int64 (from nil)", "$-1\r\n", new(int64), `resp: error on Scan into *int64: unsupported conversion from <nil>`),
417
418 Entry("string (bad type)", "*3\r\n", new(string), `resp: error on Scan into *string: unsupported conversion from array[3]`),
419 Entry("string (nil)", "$-1\r\n", new(string), `resp: error on Scan into *string: unsupported conversion from <nil>`),
420
421 Entry("float64 (bad type)", "*3\r\n", new(float64), `resp: error on Scan into *float64: unsupported conversion from array[3]`),
422 Entry("float64 (bad string)", "+hello\r\n", new(float64), `resp: error on Scan into *float64: unsupported conversion from "hello"`),
423
424 Entry("slices (bad type)", "+hello\r\n", new([]string), `resp: error on Scan into *[]string: unsupported conversion from "hello"`),
425 Entry("maps (odd number)", "*3\r\n+foo\r\n+bar\r\n+ba\r\n", new(map[string]string), `resp: error on Scan into *map[string]string: unsupported conversion from array[3]`),
426 )
427
428 DescribeTable("nil",
429 func(s string, v interface{}) {
430 _, err := buf.WriteString(s)
431 Expect(err).NotTo(HaveOccurred())
432 Expect(subject.Scan(v)).To(Succeed())
433 },
434 Entry("nil (from nil)", "$-1\r\n", nil),
435 Entry("nil (from int)", ":123\r\n", nil),
436 Entry("nil (from inline)", "+foo\r\n", nil),
437 Entry("nil (from array)", "*1\r\n+foo\r\n", nil),
438 )
439
440 })
441
273442 })
443
444 type customResponse struct {
445 Host string
446 Port int
447 }
448
449 func (r *customResponse) AppendTo(w resp.ResponseWriter) {
450 w.AppendBulkString(fmt.Sprintf("custom '%s:%d'", r.Host, r.Port))
451 }
452
453 type customErrorResponse string
454
455 func (r customErrorResponse) AppendTo(w resp.ResponseWriter) {
456 w.AppendError(r.Error())
457 }
458
459 func (r customErrorResponse) Error() string {
460 return "WRONG " + string(r)
461 }
0 package resp
1
2 import (
3 "fmt"
4 "reflect"
5 "strconv"
6 "strings"
7 )
8
9 const (
10 errMsgNilPtr = "destination pointer is nil"
11 errMsgNotPtr = "destination not a pointer"
12 )
13
14 // Scan attempts to scan responses into given values
15 func (b *bufioR) Scan(vv ...interface{}) error {
16 for _, v := range vv {
17 if err := b.scan(v); err != nil {
18 return err
19 }
20 }
21 return nil
22 }
23
24 func (b *bufioR) scan(dst interface{}) error {
25 pt, err := b.PeekType()
26 if err != nil {
27 return err
28 }
29
30 if pt == TypeError {
31 src, err := b.ReadError()
32 if err != nil {
33 return err
34 }
35 return fmt.Errorf(`resp: server error %q`, src)
36 }
37
38 if scn, ok := dst.(Scannable); ok {
39 return scn.ScanResponse(pt, b)
40 }
41
42 switch pt {
43 case TypeArray:
44 sz, err := b.ReadArrayLen()
45 if err != nil {
46 return err
47 }
48 return b.scanArray(dst, sz)
49 case TypeNil:
50 if err := b.ReadNil(); err != nil {
51 return err
52 }
53 return scanNil(dst)
54 case TypeInt:
55 src, err := b.ReadInt()
56 if err != nil {
57 return err
58 }
59 return scanInt(dst, src)
60 case TypeInline:
61 src, err := b.ReadInlineString()
62 if err != nil {
63 return err
64 }
65 return scanString(dst, src)
66 case TypeBulk:
67 if v, ok := dst.(*[]byte); ok {
68 src, err := b.ReadBulk(nil)
69 if err != nil {
70 return err
71 }
72 return assignBytes(v, src)
73 }
74
75 src, err := b.ReadBulkString()
76 if err != nil {
77 return err
78 }
79 return scanString(dst, src)
80 default:
81 return errBadResponseType
82 }
83 }
84
85 func (b *bufioR) scanArray(dst interface{}, sz int) error {
86 dpv := reflect.ValueOf(dst)
87
88 // skip array if nil is passed
89 if dpv.Kind() == reflect.Invalid && dst == nil {
90 for i := 0; i < sz; i++ {
91 if err := b.scan(nil); err != nil {
92 return err
93 }
94 }
95 return nil
96 }
97 if dpv.Kind() != reflect.Ptr {
98 return scanErrf(dst, errMsgNotPtr)
99 }
100 if dpv.IsNil() {
101 return scanErrf(dst, errMsgNilPtr)
102 }
103
104 dv := reflect.Indirect(dpv)
105 switch dv.Kind() {
106 case reflect.Slice:
107 if dv.Len() < sz {
108 nv := reflect.MakeSlice(dv.Type(), sz, sz)
109 reflect.Copy(nv, dv)
110 dv.Set(nv)
111 }
112
113 var err error
114 for i := 0; i < sz; i++ {
115 val := dv.Index(i)
116 if val.Kind() != reflect.Ptr && val.CanAddr() {
117 val = val.Addr()
118 }
119 if e := b.scan(val.Interface()); e != nil && err == nil {
120 err = e
121 }
122 }
123 return err
124 case reflect.Map:
125 if sz%2 != 0 {
126 break
127 }
128
129 dt := dv.Type()
130 if dv.IsNil() {
131 dv.Set(reflect.MakeMap(dt))
132 }
133
134 kt, vt := dt.Key(), dt.Elem()
135
136 var err error
137 for i := 0; i < sz; i += 2 {
138 key, val := reflect.New(kt), reflect.New(vt)
139 if e := b.Scan(key.Interface(), val.Interface()); e != nil && err == nil {
140 err = e
141 } else {
142 dv.SetMapIndex(key.Elem(), val.Elem())
143 }
144 }
145 return err
146 }
147
148 return scanErrf(dst, "unsupported conversion from array[%d]", sz)
149 }
150
151 func scanErrf(dst interface{}, format string, vv ...interface{}) error {
152 vv = append([]interface{}{dst}, vv...)
153 return fmt.Errorf("resp: error on Scan into %T: "+format, vv...)
154 }
155
156 func scanNil(dst interface{}) error {
157 switch w := dst.(type) {
158 case *interface{}:
159 if w == nil {
160 return scanErrf(dst, errMsgNilPtr)
161 }
162 *w = nil
163 return nil
164 case *[]byte:
165 if w == nil {
166 return scanErrf(dst, errMsgNilPtr)
167 }
168 *w = nil
169 return nil
170 case nil:
171 return nil
172 }
173
174 return scanValue(dst, nil)
175 }
176
177 func scanString(dst interface{}, src string) error {
178 switch v := dst.(type) {
179 case *string:
180 if v == nil {
181 return scanErrf(dst, errMsgNilPtr)
182 }
183 *v = src
184 return nil
185 case *[]byte:
186 if v == nil {
187 return scanErrf(dst, errMsgNilPtr)
188 }
189 *v = []byte(src)
190 return nil
191 case *bool:
192 if v == nil {
193 return scanErrf(dst, errMsgNilPtr)
194 }
195 if src == "1" || src == "0" || strings.ToUpper(src) == "OK" {
196 *v = src != "0"
197 return nil
198 }
199 case *int:
200 if v == nil {
201 return scanErrf(dst, errMsgNilPtr)
202 }
203 if n, err := strconv.ParseInt(src, 10, 64); err == nil {
204 *v = int(n)
205 return nil
206 }
207 case *int8:
208 if v == nil {
209 return scanErrf(dst, errMsgNilPtr)
210 }
211 if n, err := strconv.ParseInt(src, 10, 8); err == nil {
212 *v = int8(n)
213 return nil
214 }
215 case *int16:
216 if v == nil {
217 return scanErrf(dst, errMsgNilPtr)
218 }
219 if n, err := strconv.ParseInt(src, 10, 16); err == nil {
220 *v = int16(n)
221 return nil
222 }
223 case *int32:
224 if v == nil {
225 return scanErrf(dst, errMsgNilPtr)
226 }
227 if n, err := strconv.ParseInt(src, 10, 32); err == nil {
228 *v = int32(n)
229 return nil
230 }
231 case *int64:
232 if v == nil {
233 return scanErrf(dst, errMsgNilPtr)
234 }
235 if n, err := strconv.ParseInt(src, 10, 64); err == nil {
236 *v = int64(n)
237 return nil
238 }
239 case *uint:
240 if v == nil {
241 return scanErrf(dst, errMsgNilPtr)
242 }
243 if n, err := strconv.ParseUint(src, 10, 64); err == nil {
244 *v = uint(n)
245 return nil
246 }
247 case *uint8:
248 if v == nil {
249 return scanErrf(dst, errMsgNilPtr)
250 }
251 if n, err := strconv.ParseUint(src, 10, 8); err == nil {
252 *v = uint8(n)
253 return nil
254 }
255 case *uint16:
256 if v == nil {
257 return scanErrf(dst, errMsgNilPtr)
258 }
259 if n, err := strconv.ParseUint(src, 10, 16); err == nil {
260 *v = uint16(n)
261 return nil
262 }
263 case *uint32:
264 if v == nil {
265 return scanErrf(dst, errMsgNilPtr)
266 }
267 if n, err := strconv.ParseUint(src, 10, 32); err == nil {
268 *v = uint32(n)
269 return nil
270 }
271 case *uint64:
272 if v == nil {
273 return scanErrf(dst, errMsgNilPtr)
274 }
275 if n, err := strconv.ParseUint(src, 10, 64); err == nil {
276 *v = uint64(n)
277 return nil
278 }
279 case *float32:
280 if v == nil {
281 return scanErrf(dst, errMsgNilPtr)
282 }
283 if n, err := strconv.ParseFloat(src, 32); err == nil {
284 *v = float32(n)
285 return nil
286 }
287 case *float64:
288 if v == nil {
289 return scanErrf(dst, errMsgNilPtr)
290 }
291 if n, err := strconv.ParseFloat(src, 64); err == nil {
292 *v = float64(n)
293 return nil
294 }
295 case nil:
296 return nil
297 }
298 return scanValue(dst, src)
299 }
300
301 func scanInt(dst interface{}, src int64) error {
302 switch v := dst.(type) {
303 case *string:
304 if v == nil {
305 return scanErrf(dst, errMsgNilPtr)
306 }
307 *v = strconv.FormatInt(src, 10)
308 return nil
309 case *[]byte:
310 if v == nil {
311 return scanErrf(dst, errMsgNilPtr)
312 }
313 *v = []byte(strconv.FormatInt(src, 10))
314 return nil
315 case *bool:
316 if v == nil {
317 return scanErrf(dst, errMsgNilPtr)
318 }
319 if src == 0 || src == 1 {
320 *v = src == 1
321 return nil
322 }
323 case *int:
324 if v == nil {
325 return scanErrf(dst, errMsgNilPtr)
326 }
327 *v = int(src)
328 return nil
329 case *int8:
330 if v == nil {
331 return scanErrf(dst, errMsgNilPtr)
332 }
333 *v = int8(src)
334 return nil
335 case *int16:
336 if v == nil {
337 return scanErrf(dst, errMsgNilPtr)
338 }
339 *v = int16(src)
340 return nil
341 case *int32:
342 if v == nil {
343 return scanErrf(dst, errMsgNilPtr)
344 }
345 *v = int32(src)
346 return nil
347 case *int64:
348 if v == nil {
349 return scanErrf(dst, errMsgNilPtr)
350 }
351 *v = int64(src)
352 return nil
353 case *uint:
354 if v == nil {
355 return scanErrf(dst, errMsgNilPtr)
356 }
357 *v = uint(src)
358 return nil
359 case *uint8:
360 if v == nil {
361 return scanErrf(dst, errMsgNilPtr)
362 }
363 *v = uint8(src)
364 return nil
365 case *uint16:
366 if v == nil {
367 return scanErrf(dst, errMsgNilPtr)
368 }
369 *v = uint16(src)
370 return nil
371 case *uint32:
372 if v == nil {
373 return scanErrf(dst, errMsgNilPtr)
374 }
375 *v = uint32(src)
376 return nil
377 case *uint64:
378 if v == nil {
379 return scanErrf(dst, errMsgNilPtr)
380 }
381 *v = uint64(src)
382 return nil
383 case *float32:
384 if v == nil {
385 return scanErrf(dst, errMsgNilPtr)
386 }
387 *v = float32(src)
388 return nil
389 case *float64:
390 if v == nil {
391 return scanErrf(dst, errMsgNilPtr)
392 }
393 *v = float64(src)
394 return nil
395 case nil:
396 return nil
397 }
398 return scanValue(dst, src)
399 }
400
401 func scanValue(dst, src interface{}) error {
402 dpv := reflect.ValueOf(dst)
403 if dpv.Kind() != reflect.Ptr {
404 return scanErrf(dst, errMsgNotPtr)
405 }
406 if dpv.IsNil() {
407 return scanErrf(dst, errMsgNilPtr)
408 }
409
410 dv := reflect.Indirect(dpv)
411 sv := reflect.ValueOf(src)
412
413 // check if directly assignable
414 if sv.IsValid() && sv.Type().AssignableTo(dv.Type()) {
415 dv.Set(sv)
416 return nil
417 }
418
419 // check if same kind and convertable
420 if dv.Kind() == sv.Kind() && sv.Type().ConvertibleTo(dv.Type()) {
421 dv.Set(sv.Convert(dv.Type()))
422 return nil
423 }
424
425 return scanErrf(dst, "unsupported conversion from %#v", src)
426 }
427
428 func assignBytes(v *[]byte, src []byte) error {
429 if v == nil {
430 return scanErrf(v, errMsgNotPtr)
431 }
432
433 *v = src
434 return nil
435 }
0 package resp
1
2 var asciiSpace = [256]bool{'\t': true, '\n': true, '\v': true, '\f': true, '\r': true, ' ': true}
3
4 // 'Inspired' by sdssplitargs from https://github.com/antirez/sds
5 func appendArgument(dst, src []byte) ([]byte, int) {
6 // skip initial blanks
7 pos := 0
8 for ; pos < len(src) && asciiSpace[src[pos]]; pos++ {
9 }
10
11 var inQ, inSQ bool
12 MainLoop:
13 for pos < len(src) {
14 p := src[pos]
15
16 if inQ {
17 if p == '"' {
18 pos++
19 break MainLoop
20 } else if p == '\\' && pos+3 < len(src) && src[pos+1] == 'x' && isHexChar(src[pos+2]) && isHexChar(src[pos+3]) {
21 p = fromHexChar(src[pos+2])<<4 | fromHexChar(src[pos+3])
22 pos += 3
23 } else if p == '\\' && pos+1 < len(src) {
24 pos++
25 p = src[pos]
26 switch p {
27 case 'n':
28 p = '\n'
29 case 'r':
30 p = '\r'
31 case 't':
32 p = '\t'
33 case 'b':
34 p = '\b'
35 case 'a':
36 p = '\a'
37 }
38 }
39 dst = append(dst, p)
40
41 } else if inSQ {
42 if p == '\'' {
43 pos++
44 break MainLoop
45 }
46 dst = append(dst, p)
47
48 } else {
49 switch p {
50 case ' ', '\n', '\r', '\t':
51 break MainLoop
52 case '"':
53 if len(dst) != 0 {
54 break MainLoop
55 }
56 inQ = true
57 case '\'':
58 if len(dst) != 0 {
59 break MainLoop
60 }
61 inSQ = true
62 default:
63 dst = append(dst, p)
64 }
65 }
66 pos++
67 }
68 return dst, pos
69 }
70
71 // --------------------------------------------------------------------
72
73 func isHexChar(c byte) bool {
74 return (c >= '0' && c <= '9') ||
75 (c >= 'a' && c <= 'f') ||
76 (c >= 'A' && c <= 'F')
77 }
78
79 func fromHexChar(c byte) byte {
80 switch {
81 case '0' <= c && c <= '9':
82 return c - '0'
83 case 'a' <= c && c <= 'f':
84 return c - 'a' + 10
85 case 'A' <= c && c <= 'F':
86 return c - 'A' + 10
87 }
88 return 0
89 }
0 package resp
1
2 import (
3 . "github.com/onsi/ginkgo/extensions/table"
4 . "github.com/onsi/gomega"
5 )
6
7 var _ = DescribeTable("appendArgument",
8 func(src string, exp string, expN int) {
9 dst, n := appendArgument(nil, []byte(src))
10 Expect(string(dst)).To(Equal(exp))
11 Expect(n).To(Equal(expN))
12 },
13
14 Entry("empty",
15 "", "", 0),
16 Entry("blank",
17 " \t ", "", 3),
18 Entry("words",
19 " hello world", "hello", 7),
20 Entry("words with tabs",
21 "hello\tworld", "hello", 5),
22 Entry("words with nl",
23 "hello\nworld", "hello", 5),
24 Entry("words interrupted by quotes",
25 `he"llo" world`, "he", 2),
26 Entry("words interrupted by single quotes",
27 `he'llo' world`, "he", 2),
28 Entry("quoted",
29 ` "hello my" world`, "hello my", 11),
30 Entry("quoted with quotes",
31 `"hello \"my\" " world`, `hello "my" `, 15),
32 Entry("quoted with escaped hex chars",
33 `"hello \x6dy" world`, `hello my`, 13),
34 Entry("single quoted",
35 ` 'hello my' world`, "hello my", 11),
36 )
0 package resp
1
2 import (
3 "fmt"
4 "reflect"
5 "strconv"
6 "strings"
7 )
8
9 // Append implements ResponseWriter
10 func (w *bufioW) Append(v interface{}) error {
11 switch v := v.(type) {
12 case nil:
13 w.AppendNil()
14 case CustomResponse:
15 v.AppendTo(w)
16 case error:
17 msg := v.Error()
18 if !strings.HasPrefix(msg, "ERR ") {
19 msg = "ERR " + msg
20 }
21 w.AppendError(msg)
22 case bool:
23 if v {
24 w.AppendInt(1)
25 } else {
26 w.AppendInt(0)
27 }
28 case int:
29 w.AppendInt(int64(v))
30 case int8:
31 w.AppendInt(int64(v))
32 case int16:
33 w.AppendInt(int64(v))
34 case int32:
35 w.AppendInt(int64(v))
36 case int64:
37 w.AppendInt(v)
38 case uint:
39 w.AppendInt(int64(v))
40 case uint8:
41 w.AppendInt(int64(v))
42 case uint16:
43 w.AppendInt(int64(v))
44 case uint32:
45 w.AppendInt(int64(v))
46 case uint64:
47 w.AppendInt(int64(v))
48 case string:
49 w.AppendBulkString(v)
50 case []byte:
51 w.AppendBulk(v)
52 case CommandArgument:
53 w.AppendBulk(v)
54 case float32:
55 w.AppendInlineString(strconv.FormatFloat(float64(v), 'f', -1, 32))
56 case float64:
57 w.AppendInlineString(strconv.FormatFloat(v, 'f', -1, 64))
58 default:
59 switch reflect.TypeOf(v).Kind() {
60 case reflect.Slice:
61 s := reflect.ValueOf(v)
62
63 w.AppendArrayLen(s.Len())
64 for i := 0; i < s.Len(); i++ {
65 w.Append(s.Index(i).Interface())
66 }
67 case reflect.Map:
68 s := reflect.ValueOf(v)
69
70 w.AppendArrayLen(s.Len() * 2)
71 for _, key := range s.MapKeys() {
72 w.Append(key.Interface())
73 w.Append(s.MapIndex(key).Interface())
74 }
75 default:
76 return fmt.Errorf("resp: unsupported type %T", v)
77 }
78 }
79 return nil
80 }
4040 srv.mu.Unlock()
4141 }
4242
43 // HandleFunc registers a handler func for a command
43 // HandleFunc registers a handler func for a command.
4444 func (srv *Server) HandleFunc(name string, fn HandlerFunc) {
45 srv.Handle(name, Handler(fn))
45 srv.Handle(name, fn)
4646 }
4747
4848 // HandleStream registers a handler for a streaming command.
5454
5555 // HandleStreamFunc registers a handler func for a command
5656 func (srv *Server) HandleStreamFunc(name string, fn StreamHandlerFunc) {
57 srv.HandleStream(name, StreamHandler(fn))
57 srv.HandleStream(name, fn)
5858 }
5959
6060 // Serve accepts incoming connections on a listener, creating a
4646 return
4747 }
4848
49 rd, err := cmd.NextArg()
49 rd, err := cmd.Next()
5050 if err != nil {
5151 w.AppendErrorf("ERR unable to parse argument: %s", err.Error())
5252 return
331331
332332 srv := NewServer(nil)
333333 srv.HandleFunc("echo", func(w resp.ResponseWriter, cmd *resp.Command) {
334 if cmd.ArgN() != 1 {
334 if len(cmd.Args) != 1 {
335335 w.AppendError(WrongNumberOfArgs(cmd.Name))
336336 }
337337 w.AppendInline(cmd.Arg(0))