Import Upstream version 1.0.2
Ying-Chun Liu (PaulLiu)
6 years ago
0 | node_modules |
0 | The MIT License (MIT) | |
1 | ||
2 | Copyright (c) 2014 Mathias Buus | |
3 | ||
4 | Permission is hereby granted, free of charge, to any person obtaining a copy | |
5 | of this software and associated documentation files (the "Software"), to deal | |
6 | in the Software without restriction, including without limitation the rights | |
7 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
8 | copies of the Software, and to permit persons to whom the Software is | |
9 | furnished to do so, subject to the following conditions: | |
10 | ||
11 | The above copyright notice and this permission notice shall be included in | |
12 | all copies or substantial portions of the Software. | |
13 | ||
14 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
15 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
16 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
17 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
18 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
20 | THE SOFTWARE.⏎ |
0 | # pump | |
1 | ||
2 | pump is a small node module that pipes streams together and destroys all of them if one of them closes. | |
3 | ||
4 | ``` | |
5 | npm install pump | |
6 | ``` | |
7 | ||
8 | [![build status](http://img.shields.io/travis/mafintosh/pump.svg?style=flat)](http://travis-ci.org/mafintosh/pump) | |
9 | ||
10 | ## What problem does it solve? | |
11 | ||
12 | When using standard `source.pipe(dest)` source will _not_ be destroyed if dest emits close or an error. | |
13 | You are also not able to provide a callback to tell when then pipe has finished. | |
14 | ||
15 | pump does these two things for you | |
16 | ||
17 | ## Usage | |
18 | ||
19 | Simply pass the streams you want to pipe together to pump and add an optional callback | |
20 | ||
21 | ``` js | |
22 | var pump = require('pump') | |
23 | var fs = require('fs') | |
24 | ||
25 | var source = fs.createReadStream('/dev/random') | |
26 | var dest = fs.createWriteStream('/dev/null') | |
27 | ||
28 | pump(source, dest, function(err) { | |
29 | console.log('pipe finished', err) | |
30 | }) | |
31 | ||
32 | setTimeout(function() { | |
33 | dest.destroy() // when dest is closed pump will destroy source | |
34 | }, 1000) | |
35 | ``` | |
36 | ||
37 | You can use pump to pipe more than two streams together as well | |
38 | ||
39 | ``` js | |
40 | var transform = someTransformStream() | |
41 | ||
42 | pump(source, transform, anotherTransform, dest, function(err) { | |
43 | console.log('pipe finished', err) | |
44 | }) | |
45 | ``` | |
46 | ||
47 | If `source`, `transform`, `anotherTransform` or `dest` closes all of them will be destroyed. | |
48 | ||
49 | ## License | |
50 | ||
51 | MIT | |
52 | ||
53 | ## Related | |
54 | ||
55 | `pump` is part of the [mississippi stream utility collection](https://github.com/maxogden/mississippi) which includes more useful stream modules similar to this one. |
0 | var once = require('once') | |
1 | var eos = require('end-of-stream') | |
2 | var fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes | |
3 | ||
4 | var noop = function () {} | |
5 | ||
6 | var isFn = function (fn) { | |
7 | return typeof fn === 'function' | |
8 | } | |
9 | ||
10 | var isFS = function (stream) { | |
11 | if (!fs) return false // browser | |
12 | return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close) | |
13 | } | |
14 | ||
15 | var isRequest = function (stream) { | |
16 | return stream.setHeader && isFn(stream.abort) | |
17 | } | |
18 | ||
19 | var destroyer = function (stream, reading, writing, callback) { | |
20 | callback = once(callback) | |
21 | ||
22 | var closed = false | |
23 | stream.on('close', function () { | |
24 | closed = true | |
25 | }) | |
26 | ||
27 | eos(stream, {readable: reading, writable: writing}, function (err) { | |
28 | if (err) return callback(err) | |
29 | closed = true | |
30 | callback() | |
31 | }) | |
32 | ||
33 | var destroyed = false | |
34 | return function (err) { | |
35 | if (closed) return | |
36 | if (destroyed) return | |
37 | destroyed = true | |
38 | ||
39 | if (isFS(stream)) return stream.close() // use close for fs streams to avoid fd leaks | |
40 | if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want | |
41 | ||
42 | if (isFn(stream.destroy)) return stream.destroy() | |
43 | ||
44 | callback(err || new Error('stream was destroyed')) | |
45 | } | |
46 | } | |
47 | ||
48 | var call = function (fn) { | |
49 | fn() | |
50 | } | |
51 | ||
52 | var pipe = function (from, to) { | |
53 | return from.pipe(to) | |
54 | } | |
55 | ||
56 | var pump = function () { | |
57 | var streams = Array.prototype.slice.call(arguments) | |
58 | var callback = isFn(streams[streams.length - 1] || noop) && streams.pop() || noop | |
59 | ||
60 | if (Array.isArray(streams[0])) streams = streams[0] | |
61 | if (streams.length < 2) throw new Error('pump requires two streams per minimum') | |
62 | ||
63 | var error | |
64 | var destroys = streams.map(function (stream, i) { | |
65 | var reading = i < streams.length - 1 | |
66 | var writing = i > 0 | |
67 | return destroyer(stream, reading, writing, function (err) { | |
68 | if (!error) error = err | |
69 | if (err) destroys.forEach(call) | |
70 | if (reading) return | |
71 | destroys.forEach(call) | |
72 | callback(error) | |
73 | }) | |
74 | }) | |
75 | ||
76 | return streams.reduce(pipe) | |
77 | } | |
78 | ||
79 | module.exports = pump |
0 | { | |
1 | "name": "pump", | |
2 | "version": "1.0.2", | |
3 | "repository": "git://github.com/mafintosh/pump.git", | |
4 | "license": "MIT", | |
5 | "description": "pipe streams together and close all of them if one of them closes", | |
6 | "browser": { | |
7 | "fs": false | |
8 | }, | |
9 | "keywords": [ | |
10 | "streams", | |
11 | "pipe", | |
12 | "destroy", | |
13 | "callback" | |
14 | ], | |
15 | "author": "Mathias Buus Madsen <mathiasbuus@gmail.com>", | |
16 | "dependencies": { | |
17 | "end-of-stream": "^1.1.0", | |
18 | "once": "^1.3.1" | |
19 | }, | |
20 | "scripts": { | |
21 | "test": "node test.js" | |
22 | } | |
23 | } |
0 | var stream = require('stream') | |
1 | var pump = require('./index') | |
2 | ||
3 | var rs = new stream.Readable() | |
4 | var ws = new stream.Writable() | |
5 | ||
6 | rs._read = function (size) { | |
7 | this.push(Buffer(size).fill('abc')) | |
8 | } | |
9 | ||
10 | ws._write = function (chunk, encoding, cb) { | |
11 | setTimeout(function () { | |
12 | cb() | |
13 | }, 100) | |
14 | } | |
15 | ||
16 | var toHex = function () { | |
17 | var reverse = new (require('stream').Transform)() | |
18 | ||
19 | reverse._transform = function (chunk, enc, callback) { | |
20 | reverse.push(chunk.toString('hex')) | |
21 | callback() | |
22 | } | |
23 | ||
24 | return reverse | |
25 | } | |
26 | ||
27 | var wsClosed = false | |
28 | var rsClosed = false | |
29 | var callbackCalled = false | |
30 | ||
31 | var check = function () { | |
32 | if (wsClosed && rsClosed && callbackCalled) console.log('done') | |
33 | } | |
34 | ||
35 | ws.on('finish', function () { | |
36 | wsClosed = true | |
37 | check() | |
38 | }) | |
39 | ||
40 | rs.on('end', function () { | |
41 | rsClosed = true | |
42 | check() | |
43 | }) | |
44 | ||
45 | pump(rs, toHex(), toHex(), toHex(), ws, function () { | |
46 | callbackCalled = true | |
47 | check() | |
48 | }) | |
49 | ||
50 | setTimeout(function () { | |
51 | rs.push(null) | |
52 | rs.emit('close') | |
53 | }, 1000) | |
54 | ||
55 | setTimeout(function () { | |
56 | if (!check()) throw new Error('timeout') | |
57 | }, 5000) |
0 | var pump = require('./index') | |
1 | ||
2 | var rs = require('fs').createReadStream('/dev/random') | |
3 | var ws = require('fs').createWriteStream('/dev/null') | |
4 | ||
5 | var toHex = function () { | |
6 | var reverse = new (require('stream').Transform)() | |
7 | ||
8 | reverse._transform = function (chunk, enc, callback) { | |
9 | reverse.push(chunk.toString('hex')) | |
10 | callback() | |
11 | } | |
12 | ||
13 | return reverse | |
14 | } | |
15 | ||
16 | var wsClosed = false | |
17 | var rsClosed = false | |
18 | var callbackCalled = false | |
19 | ||
20 | var check = function () { | |
21 | if (wsClosed && rsClosed && callbackCalled) process.exit(0) | |
22 | } | |
23 | ||
24 | ws.on('close', function () { | |
25 | wsClosed = true | |
26 | check() | |
27 | }) | |
28 | ||
29 | rs.on('close', function () { | |
30 | rsClosed = true | |
31 | check() | |
32 | }) | |
33 | ||
34 | pump(rs, toHex(), toHex(), toHex(), ws, function () { | |
35 | callbackCalled = true | |
36 | check() | |
37 | }) | |
38 | ||
39 | setTimeout(function () { | |
40 | rs.destroy() | |
41 | }, 1000) | |
42 | ||
43 | setTimeout(function () { | |
44 | throw new Error('timeout') | |
45 | }, 5000) |