|
0 |
// Copyright 2014 Sven Taute. All rights reserved.
|
|
1 |
// Use of this source code is governed by a BSD-style
|
|
2 |
// license that can be found in the LICENSE file.
|
|
3 |
|
|
4 |
// Package nbreader implements a non-blocking io.Reader.
|
|
5 |
//
|
|
6 |
// The blockSize defines the buffer size that is used to read from the underlying io.Reader.
|
|
7 |
//
|
|
8 |
// NBReader allows to specify two timeouts:
|
|
9 |
//
|
|
10 |
// Timeout: Read() returns after the specified timeout, even if no data has been read.
|
|
11 |
//
|
|
12 |
// ChunkTimeout: Read() returns if no data has been read for the specified time, even if the overall timeout has not been hit yet.
|
|
13 |
//
|
|
14 |
// ChunkTimeout must be smaller than Timeout.
|
|
15 |
//
|
|
16 |
// Example Usage:
|
|
17 |
// // Create a NBReader that immediately returns on Read(), whether any data has been read or not
|
|
18 |
// nbr := nbreader.NewNBReader(reader, 1 << 16)
|
|
19 |
//
|
|
20 |
// // Create a NBReader that tries to return on Read() after no data has been read for 200ms
|
|
21 |
// // or when the maximum timeout of 2 seconds is hit.
|
|
22 |
// nbr := nbreader.NewNBReader(reader, 1 << 16, nbreader.Timeout(2000 * time.Millisecond), nbreader.ChunkTimeout(200 * time.Millisecond))
|
|
23 |
package nbreader
|
|
24 |
|
|
25 |
import (
|
|
26 |
"bytes"
|
|
27 |
"errors"
|
|
28 |
"io"
|
|
29 |
"time"
|
|
30 |
)
|
|
31 |
|
|
32 |
var errTimeout = errors.New("timeout")
|
|
33 |
|
|
34 |
// NBReader implements a non-blocking io.Reader.
|
|
35 |
type NBReader struct {
|
|
36 |
blockSize int
|
|
37 |
reader io.Reader
|
|
38 |
dataChan chan []byte
|
|
39 |
buffer bytes.Buffer
|
|
40 |
chunkTimeout time.Duration
|
|
41 |
forceTimeout time.Duration
|
|
42 |
isEOF bool
|
|
43 |
}
|
|
44 |
|
|
45 |
// Option implements options that can be passed to NewNBReader.
|
|
46 |
type Option func(r *NBReader)
|
|
47 |
|
|
48 |
// ChunkTimeout allows to set the timeout after which the end of a chunk of data is assumed and the read data is returned by Read().
|
|
49 |
func ChunkTimeout(duration time.Duration) Option {
|
|
50 |
return func(r *NBReader) {
|
|
51 |
r.chunkTimeout = duration
|
|
52 |
}
|
|
53 |
}
|
|
54 |
|
|
55 |
// Timeout allows to set the timeout after which Read() returns, even if no data has been read.
|
|
56 |
func Timeout(duration time.Duration) Option {
|
|
57 |
return func(r *NBReader) {
|
|
58 |
r.forceTimeout = duration
|
|
59 |
}
|
|
60 |
}
|
|
61 |
|
|
62 |
// NewNBReader returns a new NBReader with the given block size.
|
|
63 |
func NewNBReader(reader io.Reader, blockSize int, options ...Option) *NBReader {
|
|
64 |
dataChan := make(chan []byte)
|
|
65 |
r := NBReader{reader: reader, dataChan: dataChan, blockSize: blockSize}
|
|
66 |
for _, option := range options {
|
|
67 |
option(&r)
|
|
68 |
}
|
|
69 |
go r.readInput()
|
|
70 |
return &r
|
|
71 |
}
|
|
72 |
|
|
73 |
// Read reads data into buffer. It returns the number of bytes read into buffer.
|
|
74 |
// At EOF, err will be io.EOF. Read() might still have read data when EOF is returned for the first time.
|
|
75 |
//
|
|
76 |
// Note: Read() is not safe for concurrent use.
|
|
77 |
func (r *NBReader) Read(buffer []byte) (int, error) {
|
|
78 |
var (
|
|
79 |
remaining time.Duration
|
|
80 |
nextTimeout time.Duration
|
|
81 |
start = time.Now()
|
|
82 |
lastStart = time.Now()
|
|
83 |
)
|
|
84 |
|
|
85 |
if len(buffer) <= r.buffer.Len() {
|
|
86 |
ret, _ := r.buffer.Read(buffer)
|
|
87 |
return ret, nil
|
|
88 |
}
|
|
89 |
|
|
90 |
if r.isEOF {
|
|
91 |
return r.buffer.Read(buffer)
|
|
92 |
}
|
|
93 |
|
|
94 |
for r.buffer.Len() < len(buffer) {
|
|
95 |
lastStart = time.Now()
|
|
96 |
remaining = r.forceTimeout - time.Now().Sub(start)
|
|
97 |
if r.chunkTimeout == 0 || r.chunkTimeout > remaining {
|
|
98 |
nextTimeout = remaining
|
|
99 |
} else {
|
|
100 |
nextTimeout = r.chunkTimeout
|
|
101 |
}
|
|
102 |
_, err := r.readWithTimeout(r.buffer, nextTimeout)
|
|
103 |
duration := time.Now().Sub(lastStart)
|
|
104 |
if err == errTimeout {
|
|
105 |
if duration >= r.chunkTimeout {
|
|
106 |
break
|
|
107 |
}
|
|
108 |
}
|
|
109 |
if err == io.EOF {
|
|
110 |
r.isEOF = true
|
|
111 |
break
|
|
112 |
}
|
|
113 |
if time.Now().Sub(start) >= r.forceTimeout {
|
|
114 |
break
|
|
115 |
}
|
|
116 |
}
|
|
117 |
ret, _ := r.buffer.Read(buffer)
|
|
118 |
return ret, nil
|
|
119 |
}
|
|
120 |
|
|
121 |
// readInput is used by a goroutine to read data from the underlying io.Reader
|
|
122 |
func (r *NBReader) readInput() {
|
|
123 |
for {
|
|
124 |
tmp := make([]byte, r.blockSize)
|
|
125 |
length, err := r.reader.Read(tmp)
|
|
126 |
if err != nil {
|
|
127 |
break
|
|
128 |
}
|
|
129 |
r.dataChan <- tmp[0:length]
|
|
130 |
}
|
|
131 |
close(r.dataChan)
|
|
132 |
}
|
|
133 |
|
|
134 |
// readWithTimeout consumes the data channel filled by readInput() and respects the set timeouts
|
|
135 |
func (r *NBReader) readWithTimeout(buffer bytes.Buffer, timeout time.Duration) (int, error) {
|
|
136 |
select {
|
|
137 |
case data, ok := <-r.dataChan:
|
|
138 |
r.buffer.Write(data)
|
|
139 |
if !ok {
|
|
140 |
return len(data), io.EOF
|
|
141 |
}
|
|
142 |
return len(data), nil
|
|
143 |
case <-time.After(timeout):
|
|
144 |
return 0, errTimeout
|
|
145 |
}
|
|
146 |
}
|