Codebase list golang-github-svent-go-nbreader / de3cf45 nbreader.go
de3cf45

Tree @de3cf45 (Download .tar.gz)

nbreader.go @de3cf45raw · history · blame

// Copyright 2014 Sven Taute. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package nbreader implements a non-blocking io.Reader.
//
// The blockSize defines the buffer size that is used to read from the underlying io.Reader.
//
// NBReader allows to specify two timeouts:
//
// Timeout: Read() returns after the specified timeout, even if no data has been read.
//
// ChunkTimeout: Read() returns if no data has been read for the specified time, even if the overall timeout has not been hit yet.
//
// ChunkTimeout must be smaller than Timeout.
//
// Example Usage:
//     // Create a NBReader that immediately returns on Read(), whether any data has been read or not
//     nbr := nbreader.NewNBReader(reader, 1 << 16)
//
//     // Create a NBReader that tries to return on Read() after no data has been read for 200ms
//     // or when the maximum timeout of 2 seconds is hit.
//     nbr := nbreader.NewNBReader(reader, 1 << 16, nbreader.Timeout(2000 * time.Millisecond), nbreader.ChunkTimeout(200 * time.Millisecond))
package nbreader

import (
	"bytes"
	"errors"
	"io"
	"time"
)

var errTimeout = errors.New("timeout")

// NBReader implements a non-blocking io.Reader.
type NBReader struct {
	blockSize    int
	reader       io.Reader
	dataChan     chan []byte
	buffer       bytes.Buffer
	chunkTimeout time.Duration
	forceTimeout time.Duration
	isEOF        bool
}

// Option implements options that can be passed to NewNBReader.
type Option func(r *NBReader)

// ChunkTimeout allows to set the timeout after which the end of a chunk of data is assumed and the read data is returned by Read().
func ChunkTimeout(duration time.Duration) Option {
	return func(r *NBReader) {
		r.chunkTimeout = duration
	}
}

// Timeout allows to set the timeout after which Read() returns, even if no data has been read.
func Timeout(duration time.Duration) Option {
	return func(r *NBReader) {
		r.forceTimeout = duration
	}
}

// NewNBReader returns a new NBReader with the given block size.
func NewNBReader(reader io.Reader, blockSize int, options ...Option) *NBReader {
	dataChan := make(chan []byte)
	r := NBReader{reader: reader, dataChan: dataChan, blockSize: blockSize}
	for _, option := range options {
		option(&r)
	}
	go r.readInput()
	return &r
}

// Read reads data into buffer. It returns the number of bytes read into buffer.
// At EOF, err will be io.EOF. Read() might still have read data when EOF is returned for the first time.
//
// Note: Read() is not safe for concurrent use.
func (r *NBReader) Read(buffer []byte) (int, error) {
	var (
		remaining   time.Duration
		nextTimeout time.Duration
		start       = time.Now()
		lastStart   = time.Now()
	)

	if len(buffer) <= r.buffer.Len() {
		ret, _ := r.buffer.Read(buffer)
		return ret, nil
	}

	if r.isEOF {
		return r.buffer.Read(buffer)
	}

	for r.buffer.Len() < len(buffer) {
		lastStart = time.Now()
		remaining = r.forceTimeout - time.Now().Sub(start)
		if r.chunkTimeout == 0 || r.chunkTimeout > remaining {
			nextTimeout = remaining
		} else {
			nextTimeout = r.chunkTimeout
		}
		_, err := r.readWithTimeout(r.buffer, nextTimeout)
		duration := time.Now().Sub(lastStart)
		if err == errTimeout {
			if duration >= r.chunkTimeout {
				break
			}
		}
		if err == io.EOF {
			r.isEOF = true
			break
		}
		if time.Now().Sub(start) >= r.forceTimeout {
			break
		}
	}
	ret, _ := r.buffer.Read(buffer)
	return ret, nil
}

// readInput is used by a goroutine to read data from the underlying io.Reader
func (r *NBReader) readInput() {
	for {
		tmp := make([]byte, r.blockSize)
		length, err := r.reader.Read(tmp)
		if err != nil {
			break
		}
		r.dataChan <- tmp[0:length]
	}
	close(r.dataChan)
}

// readWithTimeout consumes the data channel filled by readInput() and respects the set timeouts
func (r *NBReader) readWithTimeout(buffer bytes.Buffer, timeout time.Duration) (int, error) {
	select {
	case data, ok := <-r.dataChan:
		r.buffer.Write(data)
		if !ok {
			return len(data), io.EOF
		}
		return len(data), nil
	case <-time.After(timeout):
		return 0, errTimeout
	}
}