Fix 'Client.Endpoint' to not 'cancel' when bufferedStream (#776)
* transport/http/client_test: Modify the test to make it fail
With this modifications we can trigger the error that we are searching, 'context canceled'
* transport/http/client: Add the 'bodyWithCancel' to wrap the Response.Body
It adds the context.CancelFunc to the io.ReadCloser.Close function so bouth are called together
* transport/http/client: Add more documentation to clarify the changes
Also abstracted some logic on the test to make it more clear and also added more docuemntation. Added more documentation
on the definition of 'BufferedStream' to clarify that the Body has to be closed manually to properly close the response.
* transport/http/client: Add period at the end of the doc of 'BufferedStream'
Francesc Gil authored 5 years ago
Peter Bourgon committed 5 years ago
4 | 4 |
"context"
|
5 | 5 |
"encoding/json"
|
6 | 6 |
"encoding/xml"
|
|
7 |
"io"
|
7 | 8 |
"io/ioutil"
|
8 | 9 |
"net/http"
|
9 | 10 |
"net/url"
|
|
83 | 84 |
|
84 | 85 |
// BufferedStream sets whether the Response.Body is left open, allowing it
|
85 | 86 |
// to be read from later. Useful for transporting a file as a buffered stream.
|
|
87 |
// That body has to be Closed to propery end the request.
|
86 | 88 |
func BufferedStream(buffered bool) ClientOption {
|
87 | 89 |
return func(c *Client) { c.bufferedStream = buffered }
|
88 | 90 |
}
|
|
91 | 93 |
func (c Client) Endpoint() endpoint.Endpoint {
|
92 | 94 |
return func(ctx context.Context, request interface{}) (interface{}, error) {
|
93 | 95 |
ctx, cancel := context.WithCancel(ctx)
|
94 | |
defer cancel()
|
95 | 96 |
|
96 | 97 |
var (
|
97 | 98 |
resp *http.Response
|
|
111 | 112 |
|
112 | 113 |
req, err := http.NewRequest(c.method, c.tgt.String(), nil)
|
113 | 114 |
if err != nil {
|
|
115 |
cancel()
|
114 | 116 |
return nil, err
|
115 | 117 |
}
|
116 | 118 |
|
117 | 119 |
if err = c.enc(ctx, req, request); err != nil {
|
|
120 |
cancel()
|
118 | 121 |
return nil, err
|
119 | 122 |
}
|
120 | 123 |
|
|
125 | 128 |
resp, err = c.client.Do(req.WithContext(ctx))
|
126 | 129 |
|
127 | 130 |
if err != nil {
|
128 | |
return nil, err
|
129 | |
}
|
130 | |
|
131 | |
if !c.bufferedStream {
|
|
131 |
cancel()
|
|
132 |
return nil, err
|
|
133 |
}
|
|
134 |
|
|
135 |
// If we expect a buffered stream, we don't cancel the context when the endpoint returns.
|
|
136 |
// Instead, we should call the cancel func when closing the response body.
|
|
137 |
if c.bufferedStream {
|
|
138 |
resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel}
|
|
139 |
} else {
|
132 | 140 |
defer resp.Body.Close()
|
|
141 |
defer cancel()
|
133 | 142 |
}
|
134 | 143 |
|
135 | 144 |
for _, f := range c.after {
|
|
143 | 152 |
|
144 | 153 |
return response, nil
|
145 | 154 |
}
|
|
155 |
}
|
|
156 |
|
|
157 |
// bodyWithCancel is a wrapper for an io.ReadCloser with also a
|
|
158 |
// cancel function which is called when the Close is used
|
|
159 |
type bodyWithCancel struct {
|
|
160 |
io.ReadCloser
|
|
161 |
|
|
162 |
cancel context.CancelFunc
|
|
163 |
}
|
|
164 |
|
|
165 |
func (bwc bodyWithCancel) Close() error {
|
|
166 |
bwc.ReadCloser.Close()
|
|
167 |
bwc.cancel()
|
|
168 |
return nil
|
146 | 169 |
}
|
147 | 170 |
|
148 | 171 |
// ClientFinalizerFunc can be used to perform work at the end of a client HTTP
|
97 | 97 |
}
|
98 | 98 |
|
99 | 99 |
func TestHTTPClientBufferedStream(t *testing.T) {
|
|
100 |
// bodysize has a size big enought to make the resopnse.Body not an instant read
|
|
101 |
// so if the response is cancelled it wount be all readed and the test would fail
|
|
102 |
// The 6000 has not a particular meaning, it big enough to fulfill the usecase.
|
|
103 |
const bodysize = 6000
|
100 | 104 |
var (
|
101 | |
testbody = "testbody"
|
|
105 |
testbody = string(make([]byte, bodysize))
|
102 | 106 |
encode = func(context.Context, *http.Request, interface{}) error { return nil }
|
103 | 107 |
decode = func(_ context.Context, r *http.Response) (interface{}, error) {
|
104 | 108 |
return TestResponse{r.Body, ""}, nil
|
|
128 | 132 |
if !ok {
|
129 | 133 |
t.Fatal("response should be TestResponse")
|
130 | 134 |
}
|
|
135 |
defer response.Body.Close()
|
|
136 |
// Faking work
|
|
137 |
time.Sleep(time.Second * 1)
|
131 | 138 |
|
132 | 139 |
// Check that response body was NOT closed
|
133 | 140 |
b := make([]byte, len(testbody))
|