Allow cancellation of promises returned by functions working on promise collections
Jan Sorgalla
8 years ago
13 | 13 | }, |
14 | 14 | "files": ["src/functions_include.php"] |
15 | 15 | }, |
16 | "autoload-dev": { | |
17 | "psr-4": { | |
18 | "React\\Promise\\": "tests/fixtures" | |
19 | } | |
20 | }, | |
16 | 21 | "extra": { |
17 | 22 | "branch-alias": { |
18 | 23 | "dev-master": "2.0-dev" |
0 | <?php | |
1 | ||
2 | namespace React\Promise; | |
3 | ||
4 | class CancellationQueue | |
5 | { | |
6 | private $started = false; | |
7 | private $queue = []; | |
8 | ||
9 | public function __invoke() | |
10 | { | |
11 | if ($this->started) { | |
12 | return; | |
13 | } | |
14 | ||
15 | $this->started = true; | |
16 | $this->drain(); | |
17 | } | |
18 | ||
19 | public function enqueue($promise) | |
20 | { | |
21 | if (!$promise instanceof CancellablePromiseInterface) { | |
22 | return; | |
23 | } | |
24 | ||
25 | $length = array_push($this->queue, $promise); | |
26 | ||
27 | if ($this->started && 1 === $length) { | |
28 | $this->drain(); | |
29 | } | |
30 | } | |
31 | ||
32 | private function drain() | |
33 | { | |
34 | for ($i = key($this->queue); isset($this->queue[$i]); $i++) { | |
35 | /** @var CancellablePromiseInterface $promise */ | |
36 | $promise = $this->queue[$i]; | |
37 | ||
38 | $exception = null; | |
39 | ||
40 | try { | |
41 | $promise->cancel(); | |
42 | } catch (\Exception $exception) { | |
43 | } | |
44 | ||
45 | unset($this->queue[$i]); | |
46 | ||
47 | if ($exception) { | |
48 | throw $exception; | |
49 | } | |
50 | } | |
51 | ||
52 | $this->queue = []; | |
53 | } | |
54 | } |
36 | 36 | |
37 | 37 | function race($promisesOrValues) |
38 | 38 | { |
39 | return resolve($promisesOrValues) | |
40 | ->then(function ($array) { | |
41 | if (!is_array($array) || !$array) { | |
42 | return resolve(); | |
43 | } | |
44 | ||
45 | return new Promise(function ($resolve, $reject, $notify) use ($array) { | |
39 | $cancellationQueue = new CancellationQueue(); | |
40 | $cancellationQueue->enqueue($promisesOrValues); | |
41 | ||
42 | return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $cancellationQueue) { | |
43 | resolve($promisesOrValues) | |
44 | ->done(function ($array) use ($cancellationQueue, $resolve, $reject, $notify) { | |
45 | if (!is_array($array) || !$array) { | |
46 | $resolve(); | |
47 | return; | |
48 | } | |
49 | ||
46 | 50 | foreach ($array as $promiseOrValue) { |
51 | $cancellationQueue->enqueue($promiseOrValue); | |
52 | ||
47 | 53 | resolve($promiseOrValue) |
48 | 54 | ->done($resolve, $reject, $notify); |
49 | 55 | } |
50 | }); | |
51 | }); | |
56 | }, $reject, $notify); | |
57 | }, $cancellationQueue); | |
52 | 58 | } |
53 | 59 | |
54 | 60 | function any($promisesOrValues) |
61 | 67 | |
62 | 68 | function some($promisesOrValues, $howMany) |
63 | 69 | { |
64 | return resolve($promisesOrValues) | |
65 | ->then(function ($array) use ($howMany) { | |
66 | if (!is_array($array) || !$array || $howMany < 1) { | |
67 | return resolve([]); | |
68 | } | |
69 | ||
70 | return new Promise(function ($resolve, $reject, $notify) use ($array, $howMany) { | |
70 | $cancellationQueue = new CancellationQueue(); | |
71 | $cancellationQueue->enqueue($promisesOrValues); | |
72 | ||
73 | return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $howMany, $cancellationQueue) { | |
74 | resolve($promisesOrValues) | |
75 | ->done(function ($array) use ($howMany, $cancellationQueue, $resolve, $reject, $notify) { | |
76 | if (!is_array($array) || !$array || $howMany < 1) { | |
77 | $resolve([]); | |
78 | return; | |
79 | } | |
80 | ||
71 | 81 | $len = count($array); |
72 | 82 | $toResolve = min($howMany, $len); |
73 | 83 | $toReject = ($len - $toResolve) + 1; |
99 | 109 | } |
100 | 110 | }; |
101 | 111 | |
112 | $cancellationQueue->enqueue($promiseOrValue); | |
113 | ||
102 | 114 | resolve($promiseOrValue) |
103 | 115 | ->done($fulfiller, $rejecter, $notify); |
104 | 116 | } |
105 | }); | |
106 | }); | |
117 | }, $reject, $notify); | |
118 | }, $cancellationQueue); | |
107 | 119 | } |
108 | 120 | |
109 | 121 | function map($promisesOrValues, callable $mapFunc) |
110 | 122 | { |
111 | return resolve($promisesOrValues) | |
112 | ->then(function ($array) use ($mapFunc) { | |
113 | if (!is_array($array) || !$array) { | |
114 | return resolve([]); | |
115 | } | |
116 | ||
117 | return new Promise(function ($resolve, $reject, $notify) use ($array, $mapFunc) { | |
123 | $cancellationQueue = new CancellationQueue(); | |
124 | $cancellationQueue->enqueue($promisesOrValues); | |
125 | ||
126 | return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $mapFunc, $cancellationQueue) { | |
127 | resolve($promisesOrValues) | |
128 | ->done(function ($array) use ($mapFunc, $cancellationQueue, $resolve, $reject, $notify) { | |
129 | if (!is_array($array) || !$array) { | |
130 | $resolve([]); | |
131 | return; | |
132 | } | |
133 | ||
118 | 134 | $toResolve = count($array); |
119 | 135 | $values = []; |
120 | 136 | |
121 | 137 | foreach ($array as $i => $promiseOrValue) { |
138 | $cancellationQueue->enqueue($promiseOrValue); | |
139 | ||
122 | 140 | resolve($promiseOrValue) |
123 | 141 | ->then($mapFunc) |
124 | 142 | ->done( |
133 | 151 | $notify |
134 | 152 | ); |
135 | 153 | } |
136 | }); | |
137 | }); | |
154 | }, $reject, $notify); | |
155 | }, $cancellationQueue); | |
138 | 156 | } |
139 | 157 | |
140 | 158 | function reduce($promisesOrValues, callable $reduceFunc, $initialValue = null) |
141 | 159 | { |
142 | return resolve($promisesOrValues) | |
143 | ->then(function ($array) use ($reduceFunc, $initialValue) { | |
144 | if (!is_array($array)) { | |
145 | $array = []; | |
146 | } | |
147 | ||
148 | $total = count($array); | |
149 | $i = 0; | |
150 | ||
151 | // Wrap the supplied $reduceFunc with one that handles promises and then | |
152 | // delegates to the supplied. | |
153 | $wrappedReduceFunc = function ($current, $val) use ($reduceFunc, $total, &$i) { | |
154 | return resolve($current) | |
155 | ->then(function ($c) use ($reduceFunc, $total, &$i, $val) { | |
156 | return resolve($val) | |
157 | ->then(function ($value) use ($reduceFunc, $total, &$i, $c) { | |
158 | return $reduceFunc($c, $value, $i++, $total); | |
159 | }); | |
160 | }); | |
161 | }; | |
162 | ||
163 | return array_reduce($array, $wrappedReduceFunc, $initialValue); | |
164 | }); | |
160 | $cancellationQueue = new CancellationQueue(); | |
161 | $cancellationQueue->enqueue($promisesOrValues); | |
162 | ||
163 | return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $reduceFunc, $initialValue, $cancellationQueue) { | |
164 | resolve($promisesOrValues) | |
165 | ->done(function ($array) use ($reduceFunc, $initialValue, $cancellationQueue, $resolve, $reject, $notify) { | |
166 | if (!is_array($array)) { | |
167 | $array = []; | |
168 | } | |
169 | ||
170 | $total = count($array); | |
171 | $i = 0; | |
172 | ||
173 | // Wrap the supplied $reduceFunc with one that handles promises and then | |
174 | // delegates to the supplied. | |
175 | $wrappedReduceFunc = function ($current, $val) use ($reduceFunc, $cancellationQueue, $total, &$i) { | |
176 | $cancellationQueue->enqueue($val); | |
177 | ||
178 | return $current | |
179 | ->then(function ($c) use ($reduceFunc, $total, &$i, $val) { | |
180 | return resolve($val) | |
181 | ->then(function ($value) use ($reduceFunc, $total, &$i, $c) { | |
182 | return $reduceFunc($c, $value, $i++, $total); | |
183 | }); | |
184 | }); | |
185 | }; | |
186 | ||
187 | $cancellationQueue->enqueue($initialValue); | |
188 | ||
189 | array_reduce($array, $wrappedReduceFunc, resolve($initialValue)) | |
190 | ->done($resolve, $reject, $notify); | |
191 | }, $reject, $notify); | |
192 | }, $cancellationQueue); | |
165 | 193 | } |
166 | 194 | |
167 | 195 | // Internal functions |
0 | <?php | |
1 | ||
2 | namespace React\Promise; | |
3 | ||
4 | class CancellationQueueTest extends TestCase | |
5 | { | |
6 | /** @test */ | |
7 | public function ignoresNonCancellablePromises() | |
8 | { | |
9 | $p = new SimpleFulfilledTestPromise(); | |
10 | ||
11 | $cancellationQueue = new CancellationQueue(); | |
12 | $cancellationQueue->enqueue($p); | |
13 | ||
14 | $cancellationQueue(); | |
15 | ||
16 | $this->assertFalse($p->cancelCalled); | |
17 | } | |
18 | ||
19 | /** @test */ | |
20 | public function callsCancelOnPromisesEnqueuedBeforeStart() | |
21 | { | |
22 | $d1 = $this->getCancellableDeferred(); | |
23 | $d2 = $this->getCancellableDeferred(); | |
24 | ||
25 | $cancellationQueue = new CancellationQueue(); | |
26 | $cancellationQueue->enqueue($d1->promise()); | |
27 | $cancellationQueue->enqueue($d2->promise()); | |
28 | ||
29 | $cancellationQueue(); | |
30 | } | |
31 | ||
32 | /** @test */ | |
33 | public function callsCancelOnPromisesEnqueuedAfterStart() | |
34 | { | |
35 | $d1 = $this->getCancellableDeferred(); | |
36 | $d2 = $this->getCancellableDeferred(); | |
37 | ||
38 | $cancellationQueue = new CancellationQueue(); | |
39 | ||
40 | $cancellationQueue(); | |
41 | ||
42 | $cancellationQueue->enqueue($d2->promise()); | |
43 | $cancellationQueue->enqueue($d1->promise()); | |
44 | } | |
45 | ||
46 | /** @test */ | |
47 | public function doesNotCallCancelTwiceWhenStartedTwice() | |
48 | { | |
49 | $d = $this->getCancellableDeferred(); | |
50 | ||
51 | $cancellationQueue = new CancellationQueue(); | |
52 | $cancellationQueue->enqueue($d->promise()); | |
53 | ||
54 | $cancellationQueue(); | |
55 | $cancellationQueue(); | |
56 | } | |
57 | ||
58 | /** @test */ | |
59 | public function rethrowsExceptionsThrownFromCancel() | |
60 | { | |
61 | $this->setExpectedException('\Exception', 'test'); | |
62 | ||
63 | $mock = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
64 | $mock | |
65 | ->expects($this->once()) | |
66 | ->method('cancel') | |
67 | ->will($this->throwException(new \Exception('test'))); | |
68 | ||
69 | $cancellationQueue = new CancellationQueue(); | |
70 | $cancellationQueue->enqueue($mock); | |
71 | ||
72 | $cancellationQueue(); | |
73 | } | |
74 | ||
75 | private function getCancellableDeferred() | |
76 | { | |
77 | $mock = $this->createCallableMock(); | |
78 | $mock | |
79 | ->expects($this->once()) | |
80 | ->method('__invoke'); | |
81 | ||
82 | return new Deferred($mock); | |
83 | } | |
84 | } |
112 | 112 | $d2->resolve(2); |
113 | 113 | $d1->resolve(1); |
114 | 114 | } |
115 | ||
116 | /** @test */ | |
117 | public function shouldRejectWhenInputPromiseRejects() | |
118 | { | |
119 | $mock = $this->createCallableMock(); | |
120 | $mock | |
121 | ->expects($this->once()) | |
122 | ->method('__invoke') | |
123 | ->with($this->identicalTo(null)); | |
124 | ||
125 | any(reject()) | |
126 | ->then($this->expectCallableNever(), $mock); | |
127 | } | |
128 | ||
129 | /** @test */ | |
130 | public function shouldCancelInputPromise() | |
131 | { | |
132 | $mock = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
133 | $mock | |
134 | ->expects($this->once()) | |
135 | ->method('cancel'); | |
136 | ||
137 | any($mock)->cancel(); | |
138 | } | |
139 | ||
140 | /** @test */ | |
141 | public function shouldCancelInputArrayPromises() | |
142 | { | |
143 | $mock1 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
144 | $mock1 | |
145 | ->expects($this->once()) | |
146 | ->method('cancel'); | |
147 | ||
148 | $mock2 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
149 | $mock2 | |
150 | ->expects($this->once()) | |
151 | ->method('cancel'); | |
152 | ||
153 | any([$mock1, $mock2])->cancel(); | |
154 | } | |
115 | 155 | } |
121 | 121 | $this->mapper() |
122 | 122 | )->then($this->expectCallableNever(), $mock); |
123 | 123 | } |
124 | ||
125 | /** @test */ | |
126 | public function shouldRejectWhenInputPromiseRejects() | |
127 | { | |
128 | $mock = $this->createCallableMock(); | |
129 | $mock | |
130 | ->expects($this->once()) | |
131 | ->method('__invoke') | |
132 | ->with($this->identicalTo(null)); | |
133 | ||
134 | map( | |
135 | reject(), | |
136 | $this->mapper() | |
137 | )->then($this->expectCallableNever(), $mock); | |
138 | } | |
139 | ||
140 | /** @test */ | |
141 | public function shouldCancelInputPromise() | |
142 | { | |
143 | $mock = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
144 | $mock | |
145 | ->expects($this->once()) | |
146 | ->method('cancel'); | |
147 | ||
148 | map( | |
149 | $mock, | |
150 | $this->mapper() | |
151 | )->cancel(); | |
152 | } | |
153 | ||
154 | /** @test */ | |
155 | public function shouldCancelInputArrayPromises() | |
156 | { | |
157 | $mock1 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
158 | $mock1 | |
159 | ->expects($this->once()) | |
160 | ->method('cancel'); | |
161 | ||
162 | $mock2 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
163 | $mock2 | |
164 | ->expects($this->once()) | |
165 | ->method('cancel'); | |
166 | ||
167 | map( | |
168 | [$mock1, $mock2], | |
169 | $this->mapper() | |
170 | )->cancel(); | |
171 | } | |
124 | 172 | } |
118 | 118 | resolve(1) |
119 | 119 | )->then($mock); |
120 | 120 | } |
121 | ||
122 | /** @test */ | |
123 | public function shouldRejectWhenInputPromiseRejects() | |
124 | { | |
125 | $mock = $this->createCallableMock(); | |
126 | $mock | |
127 | ->expects($this->once()) | |
128 | ->method('__invoke') | |
129 | ->with($this->identicalTo(null)); | |
130 | ||
131 | race( | |
132 | reject() | |
133 | )->then($this->expectCallableNever(), $mock); | |
134 | } | |
135 | ||
136 | /** @test */ | |
137 | public function shouldCancelInputPromise() | |
138 | { | |
139 | $mock = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
140 | $mock | |
141 | ->expects($this->once()) | |
142 | ->method('cancel'); | |
143 | ||
144 | race($mock)->cancel(); | |
145 | } | |
146 | ||
147 | /** @test */ | |
148 | public function shouldCancelInputArrayPromises() | |
149 | { | |
150 | $mock1 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
151 | $mock1 | |
152 | ->expects($this->once()) | |
153 | ->method('cancel'); | |
154 | ||
155 | $mock2 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
156 | $mock2 | |
157 | ->expects($this->once()) | |
158 | ->method('cancel'); | |
159 | ||
160 | race([$mock1, $mock2])->cancel(); | |
161 | } | |
121 | 162 | } |
286 | 286 | $d1->resolve(1); |
287 | 287 | $d2->resolve(2); |
288 | 288 | } |
289 | ||
290 | /** @test */ | |
291 | public function shouldRejectWhenInputPromiseRejects() | |
292 | { | |
293 | $mock = $this->createCallableMock(); | |
294 | $mock | |
295 | ->expects($this->once()) | |
296 | ->method('__invoke') | |
297 | ->with($this->identicalTo(null)); | |
298 | ||
299 | reduce( | |
300 | reject(), | |
301 | $this->plus(), | |
302 | 1 | |
303 | )->then($this->expectCallableNever(), $mock); | |
304 | } | |
305 | ||
306 | /** @test */ | |
307 | public function shouldCancelInputPromise() | |
308 | { | |
309 | $mock = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
310 | $mock | |
311 | ->expects($this->once()) | |
312 | ->method('cancel'); | |
313 | ||
314 | reduce( | |
315 | $mock, | |
316 | $this->plus(), | |
317 | 1 | |
318 | )->cancel(); | |
319 | } | |
320 | ||
321 | /** @test */ | |
322 | public function shouldCancelInputArrayPromises() | |
323 | { | |
324 | $mock1 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
325 | $mock1 | |
326 | ->expects($this->once()) | |
327 | ->method('cancel'); | |
328 | ||
329 | $mock2 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
330 | $mock2 | |
331 | ->expects($this->once()) | |
332 | ->method('cancel'); | |
333 | ||
334 | reduce( | |
335 | [$mock1, $mock2], | |
336 | $this->plus(), | |
337 | 1 | |
338 | )->cancel(); | |
339 | } | |
289 | 340 | } |
122 | 122 | 1 |
123 | 123 | )->then($mock); |
124 | 124 | } |
125 | ||
126 | /** @test */ | |
127 | public function shouldRejectWhenInputPromiseRejects() | |
128 | { | |
129 | $mock = $this->createCallableMock(); | |
130 | $mock | |
131 | ->expects($this->once()) | |
132 | ->method('__invoke') | |
133 | ->with($this->identicalTo(null)); | |
134 | ||
135 | some( | |
136 | reject(), | |
137 | 1 | |
138 | )->then($this->expectCallableNever(), $mock); | |
139 | } | |
140 | ||
141 | /** @test */ | |
142 | public function shouldCancelInputPromise() | |
143 | { | |
144 | $mock = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
145 | $mock | |
146 | ->expects($this->once()) | |
147 | ->method('cancel'); | |
148 | ||
149 | some($mock, 1)->cancel(); | |
150 | } | |
151 | ||
152 | /** @test */ | |
153 | public function shouldCancelInputArrayPromises() | |
154 | { | |
155 | $mock1 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
156 | $mock1 | |
157 | ->expects($this->once()) | |
158 | ->method('cancel'); | |
159 | ||
160 | $mock2 = $this->getMock('React\Promise\CancellablePromiseInterface'); | |
161 | $mock2 | |
162 | ->expects($this->once()) | |
163 | ->method('cancel'); | |
164 | ||
165 | some([$mock1, $mock2], 1)->cancel(); | |
166 | } | |
125 | 167 | } |
81 | 81 | $adapter->resolve(new SimpleRejectedTestPromise()); |
82 | 82 | } |
83 | 83 | } |
84 | ||
85 | class SimpleFulfilledTestPromise implements PromiseInterface | |
86 | { | |
87 | public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) | |
88 | { | |
89 | try { | |
90 | if ($onFulfilled) { | |
91 | $onFulfilled('foo'); | |
92 | } | |
93 | ||
94 | return new self('foo'); | |
95 | } catch (\Exception $exception) { | |
96 | return new RejectedPromise($exception); | |
97 | } | |
98 | } | |
99 | } | |
100 | ||
101 | class SimpleRejectedTestPromise implements PromiseInterface | |
102 | { | |
103 | public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) | |
104 | { | |
105 | try { | |
106 | if ($onRejected) { | |
107 | $onRejected('foo'); | |
108 | } | |
109 | ||
110 | return new self('foo'); | |
111 | } catch (\Exception $exception) { | |
112 | return new RejectedPromise($exception); | |
113 | } | |
114 | } | |
115 | } |
0 | <?php | |
1 | ||
2 | namespace React\Promise; | |
3 | ||
4 | class SimpleFulfilledTestPromise implements PromiseInterface | |
5 | { | |
6 | public $cancelCalled = false; | |
7 | ||
8 | public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) | |
9 | { | |
10 | try { | |
11 | if ($onFulfilled) { | |
12 | $onFulfilled('foo'); | |
13 | } | |
14 | ||
15 | return new self('foo'); | |
16 | } catch (\Exception $exception) { | |
17 | return new RejectedPromise($exception); | |
18 | } | |
19 | } | |
20 | ||
21 | public function cancel() | |
22 | { | |
23 | $this->cancelCalled = true; | |
24 | } | |
25 | } |
0 | <?php | |
1 | ||
2 | namespace React\Promise; | |
3 | ||
4 | class SimpleRejectedTestPromise implements PromiseInterface | |
5 | { | |
6 | public function then(callable $onFulfilled = null, callable $onRejected = null, callable $onProgress = null) | |
7 | { | |
8 | try { | |
9 | if ($onRejected) { | |
10 | $onRejected('foo'); | |
11 | } | |
12 | ||
13 | return new self('foo'); | |
14 | } catch (\Exception $exception) { | |
15 | return new RejectedPromise($exception); | |
16 | } | |
17 | } | |
18 | } |