diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..4d51276 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2014, Sven Taute +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of go-nbreader nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..dfae717 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +go-nbreader: a non-blocking io.Reader for go +============================================ + +go-nbreader provides a non-blocking io.Reader for go (golang). + +NBReader allows to specify two timeouts: + +Timeout: Read() returns after the specified timeout, even if no data has been read. + +ChunkTimeout: Read() returns if no data has been read for the specified time, even if the overall timeout has not been hit yet. + +ChunkTimeout must be smaller than Timeout. + +When the internal buffer contains at least blockSize bytes, Read() returns regardless of the specified timeouts. + +Example Usage: + + // Create a NBReader that immediately returns on Read(), whether any data has been read or not + nbr := nbreader.NewNBReader(reader, 1 << 16) + + // Create a NBReader that tries to return on Read() after no data has been read for 200ms + // or when at least 64k bytes have been read or the maximum timeout of 2 seconds is hit. + nbr := nbreader.NewNBReader(reader, 1 << 16, nbreader.Timeout(2000 * time.Millisecond), nbreader.ChunkTimeout(200 * time.Millisecond)) + +The full documentation can be found here: diff --git a/nbreader.go b/nbreader.go new file mode 100644 index 0000000..ccd066e --- /dev/null +++ b/nbreader.go @@ -0,0 +1,147 @@ +// Copyright 2014 Sven Taute. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package nbreader implements a non-blocking io.Reader. +// +// The blockSize defines the buffer size that is used to read from the underlying io.Reader. +// +// NBReader allows to specify two timeouts: +// +// Timeout: Read() returns after the specified timeout, even if no data has been read. +// +// ChunkTimeout: Read() returns if no data has been read for the specified time, even if the overall timeout has not been hit yet. +// +// ChunkTimeout must be smaller than Timeout. +// +// Example Usage: +// // Create a NBReader that immediately returns on Read(), whether any data has been read or not +// nbr := nbreader.NewNBReader(reader, 1 << 16) +// +// // Create a NBReader that tries to return on Read() after no data has been read for 200ms +// // or when the maximum timeout of 2 seconds is hit. +// nbr := nbreader.NewNBReader(reader, 1 << 16, nbreader.Timeout(2000 * time.Millisecond), nbreader.ChunkTimeout(200 * time.Millisecond)) +package nbreader + +import ( + "bytes" + "errors" + "io" + "time" +) + +var errTimeout = errors.New("timeout") + +// NBReader implements a non-blocking io.Reader. +type NBReader struct { + blockSize int + reader io.Reader + dataChan chan []byte + buffer bytes.Buffer + chunkTimeout time.Duration + forceTimeout time.Duration + isEOF bool +} + +// Option implements options that can be passed to NewNBReader. +type Option func(r *NBReader) + +// ChunkTimeout allows to set the timeout after which the end of a chunk of data is assumed and the read data is returned by Read(). +func ChunkTimeout(duration time.Duration) Option { + return func(r *NBReader) { + r.chunkTimeout = duration + } +} + +// Timeout allows to set the timeout after which Read() returns, even if no data has been read. +func Timeout(duration time.Duration) Option { + return func(r *NBReader) { + r.forceTimeout = duration + } +} + +// NewNBReader returns a new NBReader with the given block size. +func NewNBReader(reader io.Reader, blockSize int, options ...Option) *NBReader { + dataChan := make(chan []byte) + r := NBReader{reader: reader, dataChan: dataChan, blockSize: blockSize} + for _, option := range options { + option(&r) + } + go r.readInput() + return &r +} + +// Read reads data into buffer. It returns the number of bytes read into buffer. +// At EOF, err will be io.EOF. Read() might still have read data when EOF is returned for the first time. +// +// Note: Read() is not safe for concurrent use. +func (r *NBReader) Read(buffer []byte) (int, error) { + var ( + remaining time.Duration + nextTimeout time.Duration + start = time.Now() + lastStart = time.Now() + ) + + if len(buffer) <= r.buffer.Len() { + ret, _ := r.buffer.Read(buffer) + return ret, nil + } + + if r.isEOF { + return r.buffer.Read(buffer) + } + + for r.buffer.Len() < len(buffer) { + lastStart = time.Now() + remaining = r.forceTimeout - time.Now().Sub(start) + if r.chunkTimeout == 0 || r.chunkTimeout > remaining { + nextTimeout = remaining + } else { + nextTimeout = r.chunkTimeout + } + _, err := r.readWithTimeout(r.buffer, nextTimeout) + duration := time.Now().Sub(lastStart) + if err == errTimeout { + if duration >= r.chunkTimeout { + break + } + } + if err == io.EOF { + r.isEOF = true + break + } + if time.Now().Sub(start) >= r.forceTimeout { + break + } + } + ret, _ := r.buffer.Read(buffer) + return ret, nil +} + +// readInput is used by a goroutine to read data from the underlying io.Reader +func (r *NBReader) readInput() { + for { + tmp := make([]byte, r.blockSize) + length, err := r.reader.Read(tmp) + if err != nil { + break + } + r.dataChan <- tmp[0:length] + } + close(r.dataChan) +} + +// readWithTimeout consumes the data channel filled by readInput() and respects the set timeouts +func (r *NBReader) readWithTimeout(buffer bytes.Buffer, timeout time.Duration) (int, error) { + select { + case data, ok := <-r.dataChan: + r.buffer.Write(data) + if !ok { + return len(data), io.EOF + } + return len(data), nil + case <-time.After(timeout): + return 0, errTimeout + } +}