Codebase list node-pump / upstream/1.0.2
Import Upstream version 1.0.2 Ying-Chun Liu (PaulLiu) 6 years ago
8 changed file(s) with 291 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 node_modules
0 language: node_js
1 node_js:
2 - "0.10"
3
4 script: "npm test"
0 The MIT License (MIT)
1
2 Copyright (c) 2014 Mathias Buus
3
4 Permission is hereby granted, free of charge, to any person obtaining a copy
5 of this software and associated documentation files (the "Software"), to deal
6 in the Software without restriction, including without limitation the rights
7 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 copies of the Software, and to permit persons to whom the Software is
9 furnished to do so, subject to the following conditions:
10
11 The above copyright notice and this permission notice shall be included in
12 all copies or substantial portions of the Software.
13
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 THE SOFTWARE.
0 # pump
1
2 pump is a small node module that pipes streams together and destroys all of them if one of them closes.
3
4 ```
5 npm install pump
6 ```
7
8 [![build status](http://img.shields.io/travis/mafintosh/pump.svg?style=flat)](http://travis-ci.org/mafintosh/pump)
9
10 ## What problem does it solve?
11
12 When using standard `source.pipe(dest)` source will _not_ be destroyed if dest emits close or an error.
13 You are also not able to provide a callback to tell when then pipe has finished.
14
15 pump does these two things for you
16
17 ## Usage
18
19 Simply pass the streams you want to pipe together to pump and add an optional callback
20
21 ``` js
22 var pump = require('pump')
23 var fs = require('fs')
24
25 var source = fs.createReadStream('/dev/random')
26 var dest = fs.createWriteStream('/dev/null')
27
28 pump(source, dest, function(err) {
29 console.log('pipe finished', err)
30 })
31
32 setTimeout(function() {
33 dest.destroy() // when dest is closed pump will destroy source
34 }, 1000)
35 ```
36
37 You can use pump to pipe more than two streams together as well
38
39 ``` js
40 var transform = someTransformStream()
41
42 pump(source, transform, anotherTransform, dest, function(err) {
43 console.log('pipe finished', err)
44 })
45 ```
46
47 If `source`, `transform`, `anotherTransform` or `dest` closes all of them will be destroyed.
48
49 ## License
50
51 MIT
52
53 ## Related
54
55 `pump` is part of the [mississippi stream utility collection](https://github.com/maxogden/mississippi) which includes more useful stream modules similar to this one.
0 var once = require('once')
1 var eos = require('end-of-stream')
2 var fs = require('fs') // we only need fs to get the ReadStream and WriteStream prototypes
3
4 var noop = function () {}
5
6 var isFn = function (fn) {
7 return typeof fn === 'function'
8 }
9
10 var isFS = function (stream) {
11 if (!fs) return false // browser
12 return (stream instanceof (fs.ReadStream || noop) || stream instanceof (fs.WriteStream || noop)) && isFn(stream.close)
13 }
14
15 var isRequest = function (stream) {
16 return stream.setHeader && isFn(stream.abort)
17 }
18
19 var destroyer = function (stream, reading, writing, callback) {
20 callback = once(callback)
21
22 var closed = false
23 stream.on('close', function () {
24 closed = true
25 })
26
27 eos(stream, {readable: reading, writable: writing}, function (err) {
28 if (err) return callback(err)
29 closed = true
30 callback()
31 })
32
33 var destroyed = false
34 return function (err) {
35 if (closed) return
36 if (destroyed) return
37 destroyed = true
38
39 if (isFS(stream)) return stream.close() // use close for fs streams to avoid fd leaks
40 if (isRequest(stream)) return stream.abort() // request.destroy just do .end - .abort is what we want
41
42 if (isFn(stream.destroy)) return stream.destroy()
43
44 callback(err || new Error('stream was destroyed'))
45 }
46 }
47
48 var call = function (fn) {
49 fn()
50 }
51
52 var pipe = function (from, to) {
53 return from.pipe(to)
54 }
55
56 var pump = function () {
57 var streams = Array.prototype.slice.call(arguments)
58 var callback = isFn(streams[streams.length - 1] || noop) && streams.pop() || noop
59
60 if (Array.isArray(streams[0])) streams = streams[0]
61 if (streams.length < 2) throw new Error('pump requires two streams per minimum')
62
63 var error
64 var destroys = streams.map(function (stream, i) {
65 var reading = i < streams.length - 1
66 var writing = i > 0
67 return destroyer(stream, reading, writing, function (err) {
68 if (!error) error = err
69 if (err) destroys.forEach(call)
70 if (reading) return
71 destroys.forEach(call)
72 callback(error)
73 })
74 })
75
76 return streams.reduce(pipe)
77 }
78
79 module.exports = pump
0 {
1 "name": "pump",
2 "version": "1.0.2",
3 "repository": "git://github.com/mafintosh/pump.git",
4 "license": "MIT",
5 "description": "pipe streams together and close all of them if one of them closes",
6 "browser": {
7 "fs": false
8 },
9 "keywords": [
10 "streams",
11 "pipe",
12 "destroy",
13 "callback"
14 ],
15 "author": "Mathias Buus Madsen <mathiasbuus@gmail.com>",
16 "dependencies": {
17 "end-of-stream": "^1.1.0",
18 "once": "^1.3.1"
19 },
20 "scripts": {
21 "test": "node test.js"
22 }
23 }
0 var stream = require('stream')
1 var pump = require('./index')
2
3 var rs = new stream.Readable()
4 var ws = new stream.Writable()
5
6 rs._read = function (size) {
7 this.push(Buffer(size).fill('abc'))
8 }
9
10 ws._write = function (chunk, encoding, cb) {
11 setTimeout(function () {
12 cb()
13 }, 100)
14 }
15
16 var toHex = function () {
17 var reverse = new (require('stream').Transform)()
18
19 reverse._transform = function (chunk, enc, callback) {
20 reverse.push(chunk.toString('hex'))
21 callback()
22 }
23
24 return reverse
25 }
26
27 var wsClosed = false
28 var rsClosed = false
29 var callbackCalled = false
30
31 var check = function () {
32 if (wsClosed && rsClosed && callbackCalled) console.log('done')
33 }
34
35 ws.on('finish', function () {
36 wsClosed = true
37 check()
38 })
39
40 rs.on('end', function () {
41 rsClosed = true
42 check()
43 })
44
45 pump(rs, toHex(), toHex(), toHex(), ws, function () {
46 callbackCalled = true
47 check()
48 })
49
50 setTimeout(function () {
51 rs.push(null)
52 rs.emit('close')
53 }, 1000)
54
55 setTimeout(function () {
56 if (!check()) throw new Error('timeout')
57 }, 5000)
0 var pump = require('./index')
1
2 var rs = require('fs').createReadStream('/dev/random')
3 var ws = require('fs').createWriteStream('/dev/null')
4
5 var toHex = function () {
6 var reverse = new (require('stream').Transform)()
7
8 reverse._transform = function (chunk, enc, callback) {
9 reverse.push(chunk.toString('hex'))
10 callback()
11 }
12
13 return reverse
14 }
15
16 var wsClosed = false
17 var rsClosed = false
18 var callbackCalled = false
19
20 var check = function () {
21 if (wsClosed && rsClosed && callbackCalled) process.exit(0)
22 }
23
24 ws.on('close', function () {
25 wsClosed = true
26 check()
27 })
28
29 rs.on('close', function () {
30 rsClosed = true
31 check()
32 })
33
34 pump(rs, toHex(), toHex(), toHex(), ws, function () {
35 callbackCalled = true
36 check()
37 })
38
39 setTimeout(function () {
40 rs.destroy()
41 }, 1000)
42
43 setTimeout(function () {
44 throw new Error('timeout')
45 }, 5000)