Update upstream source from tag 'upstream/1.2.5'
Update to upstream version '1.2.5'
with Debian dir 69279437bf92bb75f797d8ae4c1d48bd3bb704f0
Reinhard Tartler
3 years ago
0 | 0 | language: go |
1 | ||
2 | sudo: false | |
3 | 1 | |
4 | 2 | os: |
5 | 3 | - linux |
6 | 4 | - osx |
7 | 5 | |
8 | 6 | go: |
9 | - 1.9.x | |
10 | - 1.10.x | |
7 | - 1.13.x | |
8 | - 1.14.x | |
9 | - 1.15.x | |
11 | 10 | - master |
12 | 11 | |
13 | script: | |
14 | - go test -v -cpu=1,2,4 . | |
15 | - go test -v -cpu=2 -race -short . | |
12 | env: | |
13 | - GO111MODULE=off | |
14 | ||
15 | script: | |
16 | - diff <(gofmt -d .) <(printf "") | |
17 | - go test -v -cpu=1,2,4 . | |
18 | - go test -v -cpu=2 -race -short . | |
16 | 19 | |
17 | 20 | matrix: |
18 | 21 | allow_failures: |
0 | The MIT License (MIT) | |
0 | MIT License | |
1 | 1 | |
2 | 2 | Copyright (c) 2014 Klaus Post |
3 | 3 | |
18 | 18 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
19 | 19 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
20 | 20 | SOFTWARE. |
21 |
38 | 38 | |
39 | 39 | ``` |
40 | 40 | go get -u github.com/klauspost/compress |
41 | go get -u github.com/klauspost/crc32 | |
42 | 41 | ``` |
43 | 42 | |
44 | 43 | Usage |
64 | 63 | ## Compression |
65 | 64 | The simplest way to use this is to simply do the same as you would when using [compress/gzip](http://golang.org/pkg/compress/gzip). |
66 | 65 | |
67 | To change the block size, use the added (*pgzip.Writer).SetConcurrency(blockSize, blocks int) function. With this you can control the approximate size of your blocks, as well as how many you want to be processing in parallel. Default values for this is SetConcurrency(250000, 16), meaning blocks are split at 250000 bytes and up to 16 blocks can be processing at once before the writer blocks. | |
66 | To change the block size, use the added (*pgzip.Writer).SetConcurrency(blockSize, blocks int) function. With this you can control the approximate size of your blocks, as well as how many you want to be processing in parallel. Default values for this is SetConcurrency(1MB, runtime.GOMAXPROCS(0)), meaning blocks are split at 1 MB and up to the number of CPU threads blocks can be processing at once before the writer blocks. | |
68 | 67 | |
69 | 68 | |
70 | 69 | Example: |
98 | 97 | |
99 | 98 | Compression cost is usually about 0.2% with default settings with a block size of 250k. |
100 | 99 | |
101 | Example with GOMAXPROC set to 8 (quad core with 8 hyperthreads) | |
100 | Example with GOMAXPROC set to 32 (16 core CPU) | |
102 | 101 | |
103 | 102 | Content is [Matt Mahoneys 10GB corpus](http://mattmahoney.net/dc/10gb.html). Compression level 6. |
104 | 103 | |
105 | 104 | Compressor | MB/sec | speedup | size | size overhead (lower=better) |
106 | 105 | ------------|----------|---------|------|--------- |
107 | [gzip](http://golang.org/pkg/compress/gzip) (golang) | 7.21MB/s | 1.0x | 4786608902 | 0% | |
108 | [gzip](http://github.com/klauspost/compress/gzip) (klauspost) | 10.98MB/s | 1.52x | 4781331645 | -0.11% | |
109 | [pgzip](https://github.com/klauspost/pgzip) (klauspost) | 50.76MB/s|7.04x | 4784121440 | -0.052% | |
110 | [bgzf](https://godoc.org/github.com/biogo/hts/bgzf) (biogo) | 38.65MB/s | 5.36x | 4924899484 | 2.889% | |
111 | [pargzip](https://godoc.org/github.com/golang/build/pargzip) (builder) | 32.00MB/s | 4.44x | 4791226567 | 0.096% | |
106 | [gzip](http://golang.org/pkg/compress/gzip) (golang) | 15.44MB/s (1 thread) | 1.0x | 4781329307 | 0% | |
107 | [gzip](http://github.com/klauspost/compress/gzip) (klauspost) | 135.04MB/s (1 thread) | 8.74x | 4894858258 | +2.37% | |
108 | [pgzip](https://github.com/klauspost/pgzip) (klauspost) | 1573.23MB/s| 101.9x | 4902285651 | +2.53% | |
109 | [bgzf](https://godoc.org/github.com/biogo/hts/bgzf) (biogo) | 361.40MB/s | 23.4x | 4869686090 | +1.85% | |
110 | [pargzip](https://godoc.org/github.com/golang/build/pargzip) (builder) | 306.01MB/s | 19.8x | 4786890417 | +0.12% | |
112 | 111 | |
113 | pgzip also contains a [linear time compression](https://github.com/klauspost/compress#linear-time-compression) mode, that will allow compression at ~150MB per core per second, independent of the content. | |
112 | pgzip also contains a [linear time compression](https://github.com/klauspost/compress#linear-time-compression-huffman-only) mode, that will allow compression at ~250MB per core per second, independent of the content. | |
114 | 113 | |
115 | 114 | See the [complete sheet](https://docs.google.com/spreadsheets/d/1nuNE2nPfuINCZJRMt6wFWhKpToF95I47XjSsc-1rbPQ/edit?usp=sharing) for different content types and compression settings. |
116 | 115 |
330 | 330 | // Wait for decompressor to be closed and return error, if any. |
331 | 331 | e, ok := <-z.closeErr |
332 | 332 | z.activeRA = false |
333 | ||
334 | for blk := range z.readAhead { | |
335 | if blk.b != nil { | |
336 | z.blockPool <- blk.b | |
337 | } | |
338 | } | |
339 | if cap(z.current) > 0 { | |
340 | z.blockPool <- z.current | |
341 | z.current = nil | |
342 | } | |
333 | 343 | if !ok { |
334 | 344 | // Channel is closed, so if there was any error it has already been returned. |
335 | 345 | return nil |
417 | 427 | case z.readAhead <- read{b: buf, err: err}: |
418 | 428 | case <-closeReader: |
419 | 429 | // Sent on close, we don't care about the next results |
430 | z.blockPool <- buf | |
420 | 431 | return |
421 | 432 | } |
422 | 433 | if err != nil { |
10 | 10 | "io" |
11 | 11 | "io/ioutil" |
12 | 12 | "os" |
13 | "runtime/pprof" | |
13 | 14 | "strings" |
14 | 15 | "testing" |
15 | 16 | "time" |
330 | 331 | } |
331 | 332 | } |
332 | 333 | |
334 | func TestDecompressorReset(t *testing.T) { | |
335 | b := new(bytes.Buffer) | |
336 | var gzip *Reader | |
337 | ||
338 | for _, tt := range gunzipTests { | |
339 | in := bytes.NewReader(tt.gzip) | |
340 | if gzip == nil { | |
341 | var err error | |
342 | gzip, err = NewReader(in) | |
343 | if err != nil { | |
344 | t.Fatalf("NewReader: %s", err) | |
345 | } | |
346 | defer gzip.Close() | |
347 | } else { | |
348 | err := gzip.Reset(in) | |
349 | if err != nil { | |
350 | t.Errorf("%s: Reset: %s", tt.name, err) | |
351 | continue | |
352 | } | |
353 | } | |
354 | if tt.name != gzip.Name { | |
355 | t.Errorf("%s: got name %s", tt.name, gzip.Name) | |
356 | } | |
357 | b.Reset() | |
358 | ||
359 | n, err := io.Copy(b, gzip) | |
360 | if err != tt.err { | |
361 | t.Errorf("%s: io.Copy: %v want %v", tt.name, err, tt.err) | |
362 | } | |
363 | s := b.String() | |
364 | if s != tt.raw { | |
365 | t.Errorf("%s: got %d-byte %q want %d-byte %q", tt.name, n, s, len(tt.raw), tt.raw) | |
366 | } | |
367 | ||
368 | // Test Reader Reset. | |
369 | in = bytes.NewReader(tt.gzip) | |
370 | err = gzip.Reset(in) | |
371 | if err != nil { | |
372 | t.Errorf("%s: Reset: %s", tt.name, err) | |
373 | continue | |
374 | } | |
375 | if tt.name != gzip.Name { | |
376 | t.Errorf("%s: got name %s", tt.name, gzip.Name) | |
377 | } | |
378 | b.Reset() | |
379 | n, err = io.Copy(b, gzip) | |
380 | if err != tt.err { | |
381 | t.Errorf("%s: io.Copy: %v want %v", tt.name, err, tt.err) | |
382 | } | |
383 | s = b.String() | |
384 | if s != tt.raw { | |
385 | t.Errorf("%s: got %d-byte %q want %d-byte %q", tt.name, n, s, len(tt.raw), tt.raw) | |
386 | } | |
387 | } | |
388 | } | |
389 | ||
390 | func TestDecompressorResetNoRead(t *testing.T) { | |
391 | done := make(chan struct{}) | |
392 | defer close(done) | |
393 | go func() { | |
394 | select { | |
395 | // Typical runtime is 2-3s, so we add an order of magnitude. | |
396 | case <-time.After(30 * time.Second): | |
397 | pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) | |
398 | case <-done: | |
399 | } | |
400 | }() | |
401 | in, err := ioutil.ReadFile("testdata/bigempty.gz") | |
402 | if err != nil { | |
403 | t.Fatal(err) | |
404 | } | |
405 | gz, err := NewReader(bytes.NewBuffer(in)) | |
406 | if err != nil { | |
407 | t.Fatal(err) | |
408 | } | |
409 | for i := 0; i < 100; i++ { | |
410 | if testing.Short() && i > 10 { | |
411 | break | |
412 | } | |
413 | err := gz.Reset(bytes.NewBuffer(in)) | |
414 | if err != nil { | |
415 | t.Fatal(i, err) | |
416 | } | |
417 | // Read 100KB, ignore the rest | |
418 | lr := io.LimitedReader{N: 100 << 10, R: gz} | |
419 | _, err = io.Copy(ioutil.Discard, &lr) | |
420 | if err != nil { | |
421 | t.Fatal(i, err) | |
422 | } | |
423 | } | |
424 | } | |
425 | ||
333 | 426 | func TestIssue6550(t *testing.T) { |
334 | 427 | f, err := os.Open("testdata/issue6550.gz") |
335 | 428 | if err != nil { |
636 | 729 | rand.Read(in) |
637 | 730 | var buf bytes.Buffer |
638 | 731 | for i := 0; i < len(in); i += 512 { |
639 | enc,_ := kpgzip.NewWriterLevel(&buf, 0) | |
732 | enc, _ := kpgzip.NewWriterLevel(&buf, 0) | |
640 | 733 | _, err := enc.Write(in[:i]) |
641 | 734 | if err != nil { |
642 | 735 | t.Fatal(err) |
10 | 10 | "hash" |
11 | 11 | "hash/crc32" |
12 | 12 | "io" |
13 | "runtime" | |
13 | 14 | "sync" |
14 | 15 | "time" |
15 | 16 | |
17 | 18 | ) |
18 | 19 | |
19 | 20 | const ( |
20 | defaultBlockSize = 256 << 10 | |
21 | defaultBlockSize = 1 << 20 | |
21 | 22 | tailSize = 16384 |
22 | defaultBlocks = 16 | |
23 | defaultBlocks = 4 | |
23 | 24 | ) |
24 | 25 | |
25 | 26 | // These constants are copied from the flate package, so that code that imports |
67 | 68 | // With this you can control the approximate size of your blocks, |
68 | 69 | // as well as how many you want to be processing in parallel. |
69 | 70 | // |
70 | // Default values for this is SetConcurrency(250000, 16), | |
71 | // meaning blocks are split at 250000 bytes and up to 16 blocks | |
71 | // Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)), | |
72 | // meaning blocks are split at 1 MB and up to the number of CPU threads | |
72 | 73 | // can be processing at once before the writer blocks. |
73 | 74 | func (z *Writer) SetConcurrency(blockSize, blocks int) error { |
74 | 75 | if blockSize <= tailSize { |
83 | 84 | z.blockSize = blockSize |
84 | 85 | z.results = make(chan result, blocks) |
85 | 86 | z.blocks = blocks |
86 | z.dstPool = sync.Pool{New: func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }} | |
87 | z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) } | |
87 | 88 | return nil |
88 | 89 | } |
89 | 90 | |
114 | 115 | return nil, fmt.Errorf("gzip: invalid compression level: %d", level) |
115 | 116 | } |
116 | 117 | z := new(Writer) |
117 | z.SetConcurrency(defaultBlockSize, defaultBlocks) | |
118 | z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)) | |
118 | 119 | z.init(w, level) |
119 | 120 | return z, nil |
120 | 121 | } |
173 | 174 | if z.results != nil && !z.closed { |
174 | 175 | close(z.results) |
175 | 176 | } |
176 | z.SetConcurrency(defaultBlockSize, defaultBlocks) | |
177 | z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)) | |
177 | 178 | z.init(w, z.level) |
178 | 179 | } |
179 | 180 | |
238 | 239 | // compressCurrent will compress the data currently buffered |
239 | 240 | // This should only be called from the main writer/flush/closer |
240 | 241 | func (z *Writer) compressCurrent(flush bool) { |
242 | c := z.currentBuffer | |
243 | if len(c) > z.blockSize { | |
244 | // This can never happen through the public interface. | |
245 | panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)") | |
246 | } | |
247 | ||
241 | 248 | r := result{} |
242 | 249 | r.result = make(chan []byte, 1) |
243 | 250 | r.notifyWritten = make(chan struct{}, 0) |
251 | // Reserve a result slot | |
244 | 252 | select { |
245 | 253 | case z.results <- r: |
246 | 254 | case <-z.pushedErr: |
247 | 255 | return |
248 | 256 | } |
249 | 257 | |
250 | // If block given is more than twice the block size, split it. | |
251 | c := z.currentBuffer | |
252 | if len(c) > z.blockSize*2 { | |
253 | c = c[:z.blockSize] | |
254 | z.wg.Add(1) | |
255 | go z.compressBlock(c, z.prevTail, r, false) | |
256 | z.prevTail = c[len(c)-tailSize:] | |
257 | z.currentBuffer = z.currentBuffer[z.blockSize:] | |
258 | z.compressCurrent(flush) | |
259 | // Last one flushes if needed | |
260 | return | |
261 | } | |
262 | ||
263 | 258 | z.wg.Add(1) |
264 | go z.compressBlock(c, z.prevTail, r, z.closed) | |
259 | tail := z.prevTail | |
265 | 260 | if len(c) > tailSize { |
266 | z.prevTail = c[len(c)-tailSize:] | |
261 | buf := z.dstPool.Get().([]byte) // Put in .compressBlock | |
262 | // Copy tail from current buffer before handing the buffer over to the | |
263 | // compressBlock goroutine. | |
264 | buf = append(buf[:0], c[len(c)-tailSize:]...) | |
265 | z.prevTail = buf | |
267 | 266 | } else { |
268 | 267 | z.prevTail = nil |
269 | 268 | } |
270 | z.currentBuffer = z.dstPool.Get().([]byte) | |
269 | go z.compressBlock(c, tail, r, z.closed) | |
270 | ||
271 | z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock | |
271 | 272 | z.currentBuffer = z.currentBuffer[:0] |
272 | 273 | |
273 | 274 | // Wait if flushing |
357 | 358 | // Start receiving data from compressors |
358 | 359 | go func() { |
359 | 360 | listen := z.results |
361 | var failed bool | |
360 | 362 | for { |
361 | 363 | r, ok := <-listen |
362 | 364 | // If closed, we are finished. |
363 | 365 | if !ok { |
364 | 366 | return |
365 | 367 | } |
368 | if failed { | |
369 | close(r.notifyWritten) | |
370 | continue | |
371 | } | |
366 | 372 | buf := <-r.result |
367 | 373 | n, err := z.w.Write(buf) |
368 | 374 | if err != nil { |
369 | 375 | z.pushError(err) |
370 | 376 | close(r.notifyWritten) |
371 | return | |
377 | failed = true | |
378 | continue | |
372 | 379 | } |
373 | 380 | if n != len(buf) { |
374 | 381 | z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf))) |
382 | failed = true | |
375 | 383 | close(r.notifyWritten) |
376 | return | |
384 | continue | |
377 | 385 | } |
378 | 386 | z.dstPool.Put(buf) |
379 | 387 | close(r.notifyWritten) |
380 | 388 | } |
381 | 389 | }() |
382 | z.currentBuffer = make([]byte, 0, z.blockSize) | |
390 | z.currentBuffer = z.dstPool.Get().([]byte) | |
391 | z.currentBuffer = z.currentBuffer[:0] | |
383 | 392 | } |
384 | 393 | q := p |
385 | 394 | for len(q) > 0 { |
389 | 398 | } |
390 | 399 | z.digest.Write(q[:length]) |
391 | 400 | z.currentBuffer = append(z.currentBuffer, q[:length]...) |
392 | if len(z.currentBuffer) >= z.blockSize { | |
401 | if len(z.currentBuffer) > z.blockSize { | |
402 | panic("z.currentBuffer too large (most likely due to concurrent Write race)") | |
403 | } | |
404 | if len(z.currentBuffer) == z.blockSize { | |
393 | 405 | z.compressCurrent(false) |
394 | 406 | if err := z.checkError(); err != nil { |
395 | return len(p) - len(q) - length, err | |
407 | return len(p) - len(q), err | |
396 | 408 | } |
397 | 409 | } |
398 | 410 | z.size += length |
409 | 421 | close(r.result) |
410 | 422 | z.wg.Done() |
411 | 423 | }() |
412 | buf := z.dstPool.Get().([]byte) | |
424 | buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer | |
413 | 425 | dest := bytes.NewBuffer(buf[:0]) |
414 | 426 | |
415 | compressor := z.dictFlatePool.Get().(*flate.Writer) | |
427 | compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below | |
416 | 428 | compressor.ResetDict(dest, prevTail) |
417 | 429 | compressor.Write(p) |
430 | z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent | |
418 | 431 | |
419 | 432 | err := compressor.Flush() |
420 | 433 | if err != nil { |
428 | 441 | return |
429 | 442 | } |
430 | 443 | } |
431 | z.dictFlatePool.Put(compressor) | |
444 | z.dictFlatePool.Put(compressor) // Get above | |
445 | ||
446 | if prevTail != nil { | |
447 | z.dstPool.Put(prevTail) // Get in .compressCurrent | |
448 | } | |
449 | ||
432 | 450 | // Read back buffer |
433 | 451 | buf = dest.Bytes() |
434 | 452 | r.result <- buf |
0 | // These tests are skipped when the race detector (-race) is on | |
1 | // +build !race | |
2 | ||
3 | package pgzip | |
4 | ||
5 | import ( | |
6 | "bytes" | |
7 | "io/ioutil" | |
8 | "runtime" | |
9 | "runtime/debug" | |
10 | "testing" | |
11 | ) | |
12 | ||
13 | // Test that the sync.Pools are working properly and we are not leaking buffers | |
14 | // Disabled with -race, because the race detector allocates a lot of memory | |
15 | func TestAllocations(t *testing.T) { | |
16 | ||
17 | w := NewWriter(ioutil.Discard) | |
18 | w.SetConcurrency(100000, 10) | |
19 | data := bytes.Repeat([]byte("TEST"), 41234) // varying block splits | |
20 | ||
21 | // Prime the pool to do initial allocs | |
22 | for i := 0; i < 10; i++ { | |
23 | _, _ = w.Write(data) | |
24 | } | |
25 | _ = w.Flush() | |
26 | ||
27 | allocBytes := allocBytesPerRun(1000, func() { | |
28 | _, _ = w.Write(data) | |
29 | }) | |
30 | t.Logf("Allocated %.0f bytes per Write on average", allocBytes) | |
31 | ||
32 | // Locally it still allocates 660 bytes, which can probably be further reduced, | |
33 | // but it's better than the 175846 bytes before the pool release fix this tests. | |
34 | // TODO: Further reduce allocations | |
35 | if allocBytes > 10240 { | |
36 | t.Errorf("Write allocated too much memory per run (%.0f bytes), Pool used incorrectly?", allocBytes) | |
37 | } | |
38 | } | |
39 | ||
40 | // allocBytesPerRun returns the average total size of allocations during calls to f. | |
41 | // The return value is in bytes. | |
42 | // | |
43 | // To compute the number of allocations, the function will first be run once as | |
44 | // a warm-up. The average total size of allocations over the specified number of | |
45 | // runs will then be measured and returned. | |
46 | // | |
47 | // AllocBytesPerRun sets GOMAXPROCS to 1 during its measurement and will restore | |
48 | // it before returning. | |
49 | // | |
50 | // This function is based on testing.AllocsPerRun, which counts the number of | |
51 | // allocations instead of the total size of them in bytes. | |
52 | func allocBytesPerRun(runs int, f func()) (avg float64) { | |
53 | defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1)) | |
54 | // Disable garbage collector, because it could clear our pools during the run | |
55 | oldGCPercent := debug.SetGCPercent(-1) | |
56 | defer debug.SetGCPercent(oldGCPercent) | |
57 | ||
58 | // Warm up the function | |
59 | f() | |
60 | ||
61 | // Measure the starting statistics | |
62 | var memstats runtime.MemStats | |
63 | runtime.ReadMemStats(&memstats) | |
64 | oldTotal := memstats.TotalAlloc | |
65 | ||
66 | // Run the function the specified number of times | |
67 | for i := 0; i < runs; i++ { | |
68 | f() | |
69 | } | |
70 | ||
71 | // Read the final statistics | |
72 | runtime.ReadMemStats(&memstats) | |
73 | allocs := memstats.TotalAlloc - oldTotal | |
74 | ||
75 | // Average the mallocs over the runs (not counting the warm-up). | |
76 | // We are forced to return a float64 because the API is silly, but do | |
77 | // the division as integers so we can ask if AllocsPerRun()==1 | |
78 | // instead of AllocsPerRun()<2. | |
79 | return float64(allocs / uint64(runs)) | |
80 | } |
10 | 10 | "io" |
11 | 11 | "io/ioutil" |
12 | 12 | "math/rand" |
13 | "strconv" | |
13 | 14 | "sync" |
14 | 15 | "testing" |
15 | 16 | "time" |
510 | 511 | |
511 | 512 | } |
512 | 513 | |
513 | // A writer that fails after N writes. | |
514 | // A writer that fails after N bytes. | |
514 | 515 | type errorWriter2 struct { |
515 | 516 | N int |
516 | 517 | } |
517 | 518 | |
518 | 519 | func (e *errorWriter2) Write(b []byte) (int, error) { |
520 | e.N -= len(b) | |
519 | 521 | if e.N <= 0 { |
520 | 522 | return 0, io.ErrClosedPipe |
521 | 523 | } |
522 | e.N-- | |
523 | 524 | return len(b), nil |
524 | 525 | } |
525 | 526 | |
526 | 527 | // Test if errors from the underlying writer is passed upwards. |
527 | 528 | func TestWriteError(t *testing.T) { |
528 | buf := new(bytes.Buffer) | |
529 | n := 65536 | |
529 | n := defaultBlockSize + 1024 | |
530 | 530 | if !testing.Short() { |
531 | 531 | n *= 4 |
532 | 532 | } |
533 | for i := 0; i < n; i++ { | |
534 | fmt.Fprintf(buf, "asdasfasf%d%dfghfgujyut%dyutyu\n", i, i, i) | |
535 | } | |
536 | in := buf.Bytes() | |
533 | // Make it incompressible... | |
534 | in := make([]byte, n+1<<10) | |
535 | io.ReadFull(rand.New(rand.NewSource(0xabad1dea)), in) | |
536 | ||
537 | 537 | // We create our own buffer to control number of writes. |
538 | 538 | copyBuf := make([]byte, 128) |
539 | 539 | for l := 0; l < 10; l++ { |
540 | for fail := 1; fail <= 16; fail *= 2 { | |
541 | // Fail after 'fail' writes | |
542 | ew := &errorWriter2{N: fail} | |
543 | w, err := NewWriterLevel(ew, l) | |
544 | if err != nil { | |
545 | t.Fatalf("NewWriter: level %d: %v", l, err) | |
540 | t.Run("level-"+strconv.Itoa(l), func(t *testing.T) { | |
541 | for fail := 1; fail < n; fail *= 10 { | |
542 | // Fail after 'fail' writes | |
543 | ew := &errorWriter2{N: fail} | |
544 | w, err := NewWriterLevel(ew, l) | |
545 | if err != nil { | |
546 | t.Fatalf("NewWriter: level %d: %v", l, err) | |
547 | } | |
548 | // Set concurrency low enough that errors should propagate. | |
549 | w.SetConcurrency(128<<10, 4) | |
550 | _, err = copyBuffer(w, bytes.NewBuffer(in), copyBuf) | |
551 | if err == nil { | |
552 | t.Errorf("Level %d: Expected an error, writer was %#v", l, ew) | |
553 | } | |
554 | n2, err := w.Write([]byte{1, 2, 2, 3, 4, 5}) | |
555 | if n2 != 0 { | |
556 | t.Error("Level", l, "Expected 0 length write, got", n2) | |
557 | } | |
558 | if err == nil { | |
559 | t.Error("Level", l, "Expected an error") | |
560 | } | |
561 | err = w.Flush() | |
562 | if err == nil { | |
563 | t.Error("Level", l, "Expected an error on flush") | |
564 | } | |
565 | err = w.Close() | |
566 | if err == nil { | |
567 | t.Error("Level", l, "Expected an error on close") | |
568 | } | |
569 | ||
570 | w.Reset(ioutil.Discard) | |
571 | n2, err = w.Write([]byte{1, 2, 3, 4, 5, 6}) | |
572 | if err != nil { | |
573 | t.Error("Level", l, "Got unexpected error after reset:", err) | |
574 | } | |
575 | if n2 == 0 { | |
576 | t.Error("Level", l, "Got 0 length write, expected > 0") | |
577 | } | |
578 | if testing.Short() { | |
579 | return | |
580 | } | |
546 | 581 | } |
547 | n, err := copyBuffer(w, bytes.NewBuffer(in), copyBuf) | |
548 | if err == nil { | |
549 | t.Fatalf("Level %d: Expected an error, writer was %#v", l, ew) | |
550 | } | |
551 | n2, err := w.Write([]byte{1, 2, 2, 3, 4, 5}) | |
552 | if n2 != 0 { | |
553 | t.Fatal("Level", l, "Expected 0 length write, got", n) | |
554 | } | |
555 | if err == nil { | |
556 | t.Fatal("Level", l, "Expected an error") | |
557 | } | |
558 | err = w.Flush() | |
559 | if err == nil { | |
560 | t.Fatal("Level", l, "Expected an error on flush") | |
561 | } | |
562 | err = w.Close() | |
563 | if err == nil { | |
564 | t.Fatal("Level", l, "Expected an error on close") | |
565 | } | |
566 | ||
567 | w.Reset(ioutil.Discard) | |
568 | n2, err = w.Write([]byte{1, 2, 3, 4, 5, 6}) | |
569 | if err != nil { | |
570 | t.Fatal("Level", l, "Got unexpected error after reset:", err) | |
571 | } | |
572 | if n2 == 0 { | |
573 | t.Fatal("Level", l, "Got 0 length write, expected > 0") | |
574 | } | |
575 | if testing.Short() { | |
576 | return | |
577 | } | |
578 | } | |
582 | }) | |
579 | 583 | } |
580 | 584 | } |
581 | 585 |
0 | // These tests are unreliable or only pass under certain conditions. | |
1 | // To run: go test -v -count=1 -cpu=1,2,4,8,16 -tags=unreliable | |
2 | // +build unreliable,!race | |
3 | ||
4 | package pgzip | |
5 | ||
6 | import ( | |
7 | "bytes" | |
8 | "sync" | |
9 | "testing" | |
10 | "time" | |
11 | ) | |
12 | ||
13 | type SlowDiscard time.Duration | |
14 | ||
15 | func (delay SlowDiscard) Write(p []byte) (int, error) { | |
16 | time.Sleep(time.Duration(delay)) | |
17 | return len(p), nil | |
18 | } | |
19 | ||
20 | // Test that the panics catch unsafe concurrent writing (a panic is better than data corruption) | |
21 | // This test is UNRELIABLE and slow. The more concurrency (GOMAXPROCS), the more likely | |
22 | // a race condition will be hit. If GOMAXPROCS=1, the condition is never hit. | |
23 | func TestConcurrentRacePanic(t *testing.T) { | |
24 | w := NewWriter(SlowDiscard(2 * time.Millisecond)) | |
25 | w.SetConcurrency(1000, 1) | |
26 | data := bytes.Repeat([]byte("T"), 100000) // varying block splits | |
27 | ||
28 | const n = 1000 | |
29 | recovered := make(chan string, n) | |
30 | var wg sync.WaitGroup | |
31 | start := make(chan struct{}) | |
32 | for i := 0; i < n; i++ { | |
33 | wg.Add(1) | |
34 | go func() { | |
35 | defer wg.Done() | |
36 | defer func() { | |
37 | s, ok := recover().(string) | |
38 | if ok { | |
39 | recovered <- s | |
40 | t.Logf("Recovered from panic: %s", s) | |
41 | } | |
42 | }() | |
43 | // INCORRECT CONCURRENT USAGE! | |
44 | <-start | |
45 | _, _ = w.Write(data) | |
46 | }() | |
47 | } | |
48 | close(start) // give the start signal | |
49 | ||
50 | timer := time.NewTimer(10 * time.Second) | |
51 | defer timer.Stop() | |
52 | hasPanic := false | |
53 | select { | |
54 | case <-recovered: | |
55 | // OK, expected | |
56 | hasPanic = true | |
57 | case <-timer.C: | |
58 | t.Error("Timout") | |
59 | } | |
60 | wg.Wait() | |
61 | if !hasPanic { | |
62 | t.Error("Expected a panic, but none happened") | |
63 | } | |
64 | } |
Binary diff not shown