Codebase list golang-github-htcat-htcat / f466e835-e61f-4240-901a-70e8fcbeb530/main http.go
f466e835-e61f-4240-901a-70e8fcbeb530/main

Tree @f466e835-e61f-4240-901a-70e8fcbeb530/main (Download .tar.gz)

http.go @f466e835-e61f-4240-901a-70e8fcbeb530/mainraw · history · blame

package htcat

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"sync"
)

const (
	_        = iota
	kB int64 = 1 << (10 * iota)
	mB
	gB
	tB
	pB
	eB
)

type HtCat struct {
	io.WriterTo
	d  defrag
	u  *url.URL
	cl *http.Client

	// Protect httpFragGen with a Mutex.
	httpFragGenMu sync.Mutex
	hfg           httpFragGen
}

type HttpStatusError struct {
	error
	Status string
}

func (cat *HtCat) startup(parallelism int) {
	req := http.Request{
		Method:     "GET",
		URL:        cat.u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Body:       nil,
		Host:       cat.u.Host,
	}

	resp, err := cat.cl.Do(&req)
	if err != nil {
		go cat.d.cancel(err)
		return
	}

	// Check for non-200 OK response codes from the startup-GET.
	if resp.StatusCode != 200 {
		err = HttpStatusError{
			error: fmt.Errorf(
				"Expected HTTP Status 200, received: %q",
				resp.Status),
			Status: resp.Status}
		go cat.d.cancel(err)
		return
	}

	l := resp.Header.Get("Content-Length")

	// Some kinds of small or indeterminate-length files will
	// receive no parallelism.  This procedure helps prepare the
	// HtCat value for a one-HTTP-Request GET.
	noParallel := func(wtc writerToCloser) {
		f := cat.d.nextFragment()
		cat.d.setLast(cat.d.lastAllocated())
		f.contents = wtc
		cat.d.register(f)
	}

	if l == "" {
		// No Content-Length, stream without parallelism nor
		// assumptions about the length of the stream.
		go noParallel(struct {
			io.WriterTo
			io.Closer
		}{
			WriterTo: bufio.NewReader(resp.Body),
			Closer:   resp.Body,
		})
		return
	}

	length, err := strconv.ParseInt(l, 10, 64)
	if err != nil {
		// Invalid integer for Content-Length, defer reporting
		// the error until a WriteTo call is made.
		go cat.d.cancel(err)
		return
	}

	// Set up httpFrag generator state.
	cat.hfg.totalSize = length
	cat.hfg.targetFragSize = 1 + ((length - 1) / int64(parallelism))
	if cat.hfg.targetFragSize > 20*mB {
		cat.hfg.targetFragSize = 20 * mB
	}

	// Very small fragments are probably not worthwhile to start
	// up new requests for, but it in this case it was possible to
	// ascertain the size, so take advantage of that to start
	// reading in the background as eagerly as possible.
	if cat.hfg.targetFragSize < 1*mB {
		cat.hfg.curPos = cat.hfg.totalSize
		er := newEagerReader(resp.Body, cat.hfg.totalSize)
		go noParallel(er)
		go er.WaitClosed()
		return
	}

	// None of the other special short-circuit cases have been
	// triggered, so begin preparation for full-blown parallel
	// GET.  One GET worker is started here to take advantage of
	// the already pending response (which has no determinate
	// length, so it must be limited).
	hf := cat.nextFragment()
	go func() {
		er := newEagerReader(
			struct {
				io.Reader
				io.Closer
			}{
				Reader: io.LimitReader(resp.Body, hf.size),
				Closer: resp.Body,
			},
			hf.size)

		hf.fragment.contents = er
		cat.d.register(hf.fragment)
		er.WaitClosed()

		// Chain into being a regular worker, having finished
		// the special start-up segment.
		cat.get()
	}()
}

func New(client *http.Client, u *url.URL, parallelism int) *HtCat {
	cat := HtCat{
		u:  u,
		cl: client,
	}

	cat.d.initDefrag()
	cat.WriterTo = &cat.d
	cat.startup(parallelism)

	if cat.hfg.curPos == cat.hfg.totalSize {
		return &cat
	}

	// Start background workers.
	//
	// "startup" starts one worker that is specially constructed
	// to deal with the first request, so back off by one to
	// prevent performing with too much parallelism.
	for i := 1; i < parallelism; i += 1 {
		go cat.get()
	}

	return &cat
}

func (cat *HtCat) nextFragment() *httpFrag {
	cat.httpFragGenMu.Lock()
	defer cat.httpFragGenMu.Unlock()

	var hf *httpFrag

	if cat.hfg.hasNext() {
		f := cat.d.nextFragment()
		hf = cat.hfg.nextFragment(f)
	} else {
		cat.d.setLast(cat.d.lastAllocated())
	}

	return hf
}

func (cat *HtCat) get() {
	for {
		hf := cat.nextFragment()
		if hf == nil {
			return
		}

		req := http.Request{
			Method:     "GET",
			URL:        cat.u,
			Proto:      "HTTP/1.1",
			ProtoMajor: 1,
			ProtoMinor: 1,
			Header:     hf.header,
			Body:       nil,
			Host:       cat.u.Host,
		}

		resp, err := cat.cl.Do(&req)
		if err != nil {
			cat.d.cancel(err)
			return
		}

		// Check for an acceptable HTTP status code.
		if !(resp.StatusCode == 206 || resp.StatusCode == 200) {
			err = HttpStatusError{
				error: fmt.Errorf("Expected HTTP Status "+
					"206 or 200, received: %q",
					resp.Status),
				Status: resp.Status}
			go cat.d.cancel(err)
			return
		}

		er := newEagerReader(resp.Body, hf.size)
		hf.fragment.contents = er
		cat.d.register(hf.fragment)
		er.WaitClosed()
	}
}