Codebase list core-async-clojure / 7fdd091
New upstream version 0.3.443 Apollon Oikonomopoulos 6 years ago
58 changed file(s) with 8954 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 *.iml
1 *init.clj
2 .idea
3 out-simp
4 out-simp-node
5 out-adv
6 out-adv-node
7 /target
8 /lib
9 /classes
10 /checkouts
11 *.jar
12 *.class
13 .lein-deps-sum
14 .lein-failures
15 .lein-plugins
16 .lein-repl-history
17 tests.js
18 tests.js.map
19 pom.xml.versionsBackup
0 If you'd like to submit a patch, please follow the [contributing guidelines](http://clojure.org/contributing).
0 # core.async
1
2 A Clojure library providing facilities for async programming and communication.
3
4
5 ## Releases and Dependency Information
6
7 Latest release: 0.3.442
8
9 * [All Released Versions](http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.clojure%22%20AND%20a%3A%22core.async%22)
10
11 [Leiningen](https://github.com/technomancy/leiningen) dependency information:
12
13 ```clj
14 [org.clojure/clojure "1.6.0"]
15 [org.clojure/core.async "0.3.442"]
16 ```
17
18 [Maven](http://maven.apache.org/) dependency information:
19
20 ```xml
21 <dependency>
22 <groupId>org.clojure</groupId>
23 <artifactId>core.async</artifactId>
24 <version>0.3.442</version>
25 </dependency>
26 ```
27
28 ## Documentation
29
30 * [Rationale](http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html)
31 * [API docs](http://clojure.github.io/core.async/)
32 * [Code walkthrough](https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj)
33
34 ## Presentations
35
36 * [Rich Hickey on core.async](http://www.infoq.com/presentations/clojure-core-async)
37 * [Tim Baldridge on core.async](http://www.youtube.com/watch?v=enwIIGzhahw) from Clojure/conj 2013 ([code](https://github.com/halgari/clojure-conj-2013-core.async-examples)).
38 * Tim Baldridge on go macro internals - [part 1](https://www.youtube.com/watch?v=R3PZMIwXN_g) [part 2](https://www.youtube.com/watch?v=SI7qtuuahhU)
39 * David Nolen [core.async webinar](http://go.cognitect.com/core_async_webinar_recording)
40
41 ## Contributing
42
43 [Contributing to Clojure projects](http://clojure.org/contributing) requires a signed Contributor Agreement. Pull requests and GitHub issues are not accepted; please use the [core.async JIRA project](http://dev.clojure.org/jira/browse/ASYNC) to report problems or enhancements.
44
45 To run the ClojureScript tests:
46
47 * lein cljsbuild once
48 * open script/runtests.html
49 * View JavaScript console for test results
50
51 ## License
52
53 Copyright © 2017 Rich Hickey and contributors
54
55 Distributed under the Eclipse Public License, the same as Clojure.
56
57 ## Changelog
58
59 * Release 0.3.xxx on 2017.05.26
60 *
61 * Release 0.3.442 on 2017.03.14
62 * Fix bad `:refer-clojure` clause that violates new spec in Clojure 1.9.0-alpha15
63 * Release 0.3.441 on 2017.02.23
64 * [ASYNC-187](http://dev.clojure.org/jira/browse/ASYNC-187) - Tag metadata is lost in local closed over by a loop
65 * Related: [ASYNC-188](http://dev.clojure.org/jira/browse/ASYNC-188)
66 * [ASYNC-185](http://dev.clojure.org/jira/browse/ASYNC-185) - `thread` prevents clearing of body locals
67 * [ASYNC-186](http://dev.clojure.org/jira/browse/ASYNC-186) - NPE when `go` closes over a local variable bound to nil
68 * Release 0.3.426 on 2017.02.22
69 * [ASYNC-169](http://dev.clojure.org/jira/browse/ASYNC-169) - handling of catch and finally inside go blocks was broken, causing a number of issues. Related: [ASYNC-100](http://dev.clojure.org/jira/browse/ASYNC-100), [ASYNC-173](http://dev.clojure.org/jira/browse/ASYNC-173), [ASYNC-180](http://dev.clojure.org/jira/browse/ASYNC-180), [ASYNC-179](http://dev.clojure.org/jira/browse/ASYNC-179), [ASYNC-122](http://dev.clojure.org/jira/browse/ASYNC-122), [ASYNC-78](http://dev.clojure.org/jira/browse/ASYNC-78), [ASYNC-168](http://dev.clojure.org/jira/browse/ASYNC-168)
70 * [ASYNC-138](http://dev.clojure.org/jira/browse/ASYNC-138) - go blocks do not allow closed over locals to be cleared which can lead to a memory leak. Related: [ASYNC-32](http://dev.clojure.org/jira/browse/ASYNC-32)
71 * [ASYNC-155](http://dev.clojure.org/jira/browse/ASYNC-155) - preserve loop binding metadata when inside a go block
72 * [ASYNC-54](http://dev.clojure.org/jira/browse/ASYNC-54) - fix bad type hint on MAX-QUEUE-SIZE
73 * [ASYNC-177](http://dev.clojure.org/jira/browse/ASYNC-177) - fix typo in Buffer protocol full? method
74 * [ASYNC-70](http://dev.clojure.org/jira/browse/ASYNC-70) - docstring change in thread, thread-call
75 * [ASYNC-143](http://dev.clojure.org/jira/browse/ASYNC-143) - assert that fixed buffers must have size > 0
76 * Update tools.analyzer.jvm dependency
77 * Release 0.2.395 on 2016.10.12
78 * Add async version of transduce
79 * Release 0.2.391 on 2016.09.09
80 * Fix redefinition warning for bounded-count (added in Clojure 1.9)
81 * Add :deprecated meta to the deprecated functions
82 * Release 0.2.385 on 2016.06.17
83 * Updated tools.analyzer.jvm version
84 * Release 0.2.382 on 2016.06.13
85 * Important: Change default dispatch thread pool size to 8.
86 * Add Java system property `clojure.core.async.pool-size` to set the dispatch thread pool size
87 * [ASYNC-152](http://dev.clojure.org/jira/browse/ASYNC-152) - disable t.a.jvm's warn-on-reflection pass
88 * Release 0.2.374 on 2015.11.11
89 * [ASYNC-149](http://dev.clojure.org/jira/browse/ASYNC-149) - fix error compiling recur inside case in a go block
90 * Updated tools.analyzer.jvm version (and other upstream deps)
91 * Updated to latest clojurescript and cljsbuild versions
92 * Release 0.2.371 on 2015.10.28
93 * [ASYNC-124](http://dev.clojure.org/jira/browse/ASYNC-124) - dispatch multiple pending takers from expanding transducer
94 * [ASYNC-103](http://dev.clojure.org/jira/browse/ASYNC-103) - NEW promise-chan
95 * [ASYNC-104](http://dev.clojure.org/jira/browse/ASYNC-104) - NEW non-blocking offer!, poll!
96 * [ASYNC-101](http://dev.clojure.org/jira/browse/ASYNC-101) - async/reduce now respects reduced
97 * [ASYNC-112](http://dev.clojure.org/jira/browse/ASYNC-112) - replace "transformer" with "transducer" in deprecation messages
98 * [ASYNC-6](http://dev.clojure.org/jira/browse/ASYNC-6) - alts! docs updated to explicitly state ports is a vector
99 * Support (try (catch :default)) in CLJS exception handling
100 * Use cljs.test
101 * Updated tools.analyzer.jvm version (and other upstream deps)
102 * Release 0.1.346.0-17112a-alpha on 2014.09.22
103 * cljs nextTick relies on goog.async.nextTick
104 * Updated docstring for put! re result on closed channel
105 * Release 0.1.338.0-5c5012-alpha on 2014.08.19
106 * Add cljs transducers support
107 * Release 0.1.319.0-6b1aca-alpha on 2014.08.06
108 * Add transducers support
109 * NEW pipeline
110 * Release 0.1.303.0-886421-alpha on 2014.05.08
111 * Release 0.1.301.0-deb34a-alpha on 2014.04.29
112 * Release 0.1.298.0-2a82a1-alpha on 2014.04.25
113 * Release 0.1.278.0-76b25b-alpha on 2014.02.07
114 * Release 0.1.267.0-0d7780-alpha on 2013.12.11
115 * Release 0.1.262.0-151b23-alpha on 2013.12.10
116 * Release 0.1.256.0-1bf8cf-alpha on 2013.11.07
117 * Release 0.1.242.0-44b1e3-alpha on 2013.09.27
118 * Release 0.1.222.0-83d0c2-alpha on 2013.09.12
0 0.3.GENERATED_VERSION
0 # Introduction to core.async
1
0 <?xml version="1.0" encoding="ISO-8859-1" ?>
1 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
2 <html xmlns="http://www.w3.org/1999/xhtml">
3
4 <head>
5 <meta http-equiv="Content-Type" content="text/html; charset=ISO-8859-1" />
6 <title>Eclipse Public License - Version 1.0</title>
7 <style type="text/css">
8 body {
9 size: 8.5in 11.0in;
10 margin: 0.25in 0.5in 0.25in 0.5in;
11 tab-interval: 0.5in;
12 }
13 p {
14 margin-left: auto;
15 margin-top: 0.5em;
16 margin-bottom: 0.5em;
17 }
18 p.list {
19 margin-left: 0.5in;
20 margin-top: 0.05em;
21 margin-bottom: 0.05em;
22 }
23 </style>
24
25 </head>
26
27 <body lang="EN-US">
28
29 <h2>Eclipse Public License - v 1.0</h2>
30
31 <p>THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
32 PUBLIC LICENSE (&quot;AGREEMENT&quot;). ANY USE, REPRODUCTION OR
33 DISTRIBUTION OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS
34 AGREEMENT.</p>
35
36 <p><b>1. DEFINITIONS</b></p>
37
38 <p>&quot;Contribution&quot; means:</p>
39
40 <p class="list">a) in the case of the initial Contributor, the initial
41 code and documentation distributed under this Agreement, and</p>
42 <p class="list">b) in the case of each subsequent Contributor:</p>
43 <p class="list">i) changes to the Program, and</p>
44 <p class="list">ii) additions to the Program;</p>
45 <p class="list">where such changes and/or additions to the Program
46 originate from and are distributed by that particular Contributor. A
47 Contribution 'originates' from a Contributor if it was added to the
48 Program by such Contributor itself or anyone acting on such
49 Contributor's behalf. Contributions do not include additions to the
50 Program which: (i) are separate modules of software distributed in
51 conjunction with the Program under their own license agreement, and (ii)
52 are not derivative works of the Program.</p>
53
54 <p>&quot;Contributor&quot; means any person or entity that distributes
55 the Program.</p>
56
57 <p>&quot;Licensed Patents&quot; mean patent claims licensable by a
58 Contributor which are necessarily infringed by the use or sale of its
59 Contribution alone or when combined with the Program.</p>
60
61 <p>&quot;Program&quot; means the Contributions distributed in accordance
62 with this Agreement.</p>
63
64 <p>&quot;Recipient&quot; means anyone who receives the Program under
65 this Agreement, including all Contributors.</p>
66
67 <p><b>2. GRANT OF RIGHTS</b></p>
68
69 <p class="list">a) Subject to the terms of this Agreement, each
70 Contributor hereby grants Recipient a non-exclusive, worldwide,
71 royalty-free copyright license to reproduce, prepare derivative works
72 of, publicly display, publicly perform, distribute and sublicense the
73 Contribution of such Contributor, if any, and such derivative works, in
74 source code and object code form.</p>
75
76 <p class="list">b) Subject to the terms of this Agreement, each
77 Contributor hereby grants Recipient a non-exclusive, worldwide,
78 royalty-free patent license under Licensed Patents to make, use, sell,
79 offer to sell, import and otherwise transfer the Contribution of such
80 Contributor, if any, in source code and object code form. This patent
81 license shall apply to the combination of the Contribution and the
82 Program if, at the time the Contribution is added by the Contributor,
83 such addition of the Contribution causes such combination to be covered
84 by the Licensed Patents. The patent license shall not apply to any other
85 combinations which include the Contribution. No hardware per se is
86 licensed hereunder.</p>
87
88 <p class="list">c) Recipient understands that although each Contributor
89 grants the licenses to its Contributions set forth herein, no assurances
90 are provided by any Contributor that the Program does not infringe the
91 patent or other intellectual property rights of any other entity. Each
92 Contributor disclaims any liability to Recipient for claims brought by
93 any other entity based on infringement of intellectual property rights
94 or otherwise. As a condition to exercising the rights and licenses
95 granted hereunder, each Recipient hereby assumes sole responsibility to
96 secure any other intellectual property rights needed, if any. For
97 example, if a third party patent license is required to allow Recipient
98 to distribute the Program, it is Recipient's responsibility to acquire
99 that license before distributing the Program.</p>
100
101 <p class="list">d) Each Contributor represents that to its knowledge it
102 has sufficient copyright rights in its Contribution, if any, to grant
103 the copyright license set forth in this Agreement.</p>
104
105 <p><b>3. REQUIREMENTS</b></p>
106
107 <p>A Contributor may choose to distribute the Program in object code
108 form under its own license agreement, provided that:</p>
109
110 <p class="list">a) it complies with the terms and conditions of this
111 Agreement; and</p>
112
113 <p class="list">b) its license agreement:</p>
114
115 <p class="list">i) effectively disclaims on behalf of all Contributors
116 all warranties and conditions, express and implied, including warranties
117 or conditions of title and non-infringement, and implied warranties or
118 conditions of merchantability and fitness for a particular purpose;</p>
119
120 <p class="list">ii) effectively excludes on behalf of all Contributors
121 all liability for damages, including direct, indirect, special,
122 incidental and consequential damages, such as lost profits;</p>
123
124 <p class="list">iii) states that any provisions which differ from this
125 Agreement are offered by that Contributor alone and not by any other
126 party; and</p>
127
128 <p class="list">iv) states that source code for the Program is available
129 from such Contributor, and informs licensees how to obtain it in a
130 reasonable manner on or through a medium customarily used for software
131 exchange.</p>
132
133 <p>When the Program is made available in source code form:</p>
134
135 <p class="list">a) it must be made available under this Agreement; and</p>
136
137 <p class="list">b) a copy of this Agreement must be included with each
138 copy of the Program.</p>
139
140 <p>Contributors may not remove or alter any copyright notices contained
141 within the Program.</p>
142
143 <p>Each Contributor must identify itself as the originator of its
144 Contribution, if any, in a manner that reasonably allows subsequent
145 Recipients to identify the originator of the Contribution.</p>
146
147 <p><b>4. COMMERCIAL DISTRIBUTION</b></p>
148
149 <p>Commercial distributors of software may accept certain
150 responsibilities with respect to end users, business partners and the
151 like. While this license is intended to facilitate the commercial use of
152 the Program, the Contributor who includes the Program in a commercial
153 product offering should do so in a manner which does not create
154 potential liability for other Contributors. Therefore, if a Contributor
155 includes the Program in a commercial product offering, such Contributor
156 (&quot;Commercial Contributor&quot;) hereby agrees to defend and
157 indemnify every other Contributor (&quot;Indemnified Contributor&quot;)
158 against any losses, damages and costs (collectively &quot;Losses&quot;)
159 arising from claims, lawsuits and other legal actions brought by a third
160 party against the Indemnified Contributor to the extent caused by the
161 acts or omissions of such Commercial Contributor in connection with its
162 distribution of the Program in a commercial product offering. The
163 obligations in this section do not apply to any claims or Losses
164 relating to any actual or alleged intellectual property infringement. In
165 order to qualify, an Indemnified Contributor must: a) promptly notify
166 the Commercial Contributor in writing of such claim, and b) allow the
167 Commercial Contributor to control, and cooperate with the Commercial
168 Contributor in, the defense and any related settlement negotiations. The
169 Indemnified Contributor may participate in any such claim at its own
170 expense.</p>
171
172 <p>For example, a Contributor might include the Program in a commercial
173 product offering, Product X. That Contributor is then a Commercial
174 Contributor. If that Commercial Contributor then makes performance
175 claims, or offers warranties related to Product X, those performance
176 claims and warranties are such Commercial Contributor's responsibility
177 alone. Under this section, the Commercial Contributor would have to
178 defend claims against the other Contributors related to those
179 performance claims and warranties, and if a court requires any other
180 Contributor to pay any damages as a result, the Commercial Contributor
181 must pay those damages.</p>
182
183 <p><b>5. NO WARRANTY</b></p>
184
185 <p>EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS
186 PROVIDED ON AN &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS
187 OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION,
188 ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY
189 OR FITNESS FOR A PARTICULAR PURPOSE. Each Recipient is solely
190 responsible for determining the appropriateness of using and
191 distributing the Program and assumes all risks associated with its
192 exercise of rights under this Agreement , including but not limited to
193 the risks and costs of program errors, compliance with applicable laws,
194 damage to or loss of data, programs or equipment, and unavailability or
195 interruption of operations.</p>
196
197 <p><b>6. DISCLAIMER OF LIABILITY</b></p>
198
199 <p>EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT
200 NOR ANY CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT,
201 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING
202 WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF
203 LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
204 NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OR
205 DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS GRANTED
206 HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.</p>
207
208 <p><b>7. GENERAL</b></p>
209
210 <p>If any provision of this Agreement is invalid or unenforceable under
211 applicable law, it shall not affect the validity or enforceability of
212 the remainder of the terms of this Agreement, and without further action
213 by the parties hereto, such provision shall be reformed to the minimum
214 extent necessary to make such provision valid and enforceable.</p>
215
216 <p>If Recipient institutes patent litigation against any entity
217 (including a cross-claim or counterclaim in a lawsuit) alleging that the
218 Program itself (excluding combinations of the Program with other
219 software or hardware) infringes such Recipient's patent(s), then such
220 Recipient's rights granted under Section 2(b) shall terminate as of the
221 date such litigation is filed.</p>
222
223 <p>All Recipient's rights under this Agreement shall terminate if it
224 fails to comply with any of the material terms or conditions of this
225 Agreement and does not cure such failure in a reasonable period of time
226 after becoming aware of such noncompliance. If all Recipient's rights
227 under this Agreement terminate, Recipient agrees to cease use and
228 distribution of the Program as soon as reasonably practicable. However,
229 Recipient's obligations under this Agreement and any licenses granted by
230 Recipient relating to the Program shall continue and survive.</p>
231
232 <p>Everyone is permitted to copy and distribute copies of this
233 Agreement, but in order to avoid inconsistency the Agreement is
234 copyrighted and may only be modified in the following manner. The
235 Agreement Steward reserves the right to publish new versions (including
236 revisions) of this Agreement from time to time. No one other than the
237 Agreement Steward has the right to modify this Agreement. The Eclipse
238 Foundation is the initial Agreement Steward. The Eclipse Foundation may
239 assign the responsibility to serve as the Agreement Steward to a
240 suitable separate entity. Each new version of the Agreement will be
241 given a distinguishing version number. The Program (including
242 Contributions) may always be distributed subject to the version of the
243 Agreement under which it was received. In addition, after a new version
244 of the Agreement is published, Contributor may elect to distribute the
245 Program (including its Contributions) under the new version. Except as
246 expressly stated in Sections 2(a) and 2(b) above, Recipient receives no
247 rights or licenses to the intellectual property of any Contributor under
248 this Agreement, whether expressly, by implication, estoppel or
249 otherwise. All rights in the Program not expressly granted under this
250 Agreement are reserved.</p>
251
252 <p>This Agreement is governed by the laws of the State of New York and
253 the intellectual property laws of the United States of America. No party
254 to this Agreement will bring a legal action under this Agreement more
255 than one year after the cause of action arose. Each party waives its
256 rights to a jury trial in any resulting litigation.</p>
257
258 </body>
259
260 </html>
0 (require '[clojure.core.async :as async :refer [<! >! <!! >!! timeout chan alt! alts!! go]])
1
2 (defn fan-in [ins]
3 (let [c (chan)]
4 (future (while true
5 (let [[x] (alts!! ins)]
6 (>!! c x))))
7 c))
8
9 (defn fan-out [in cs-or-n]
10 (let [cs (if (number? cs-or-n)
11 (repeatedly cs-or-n chan)
12 cs-or-n)]
13 (future (while true
14 (let [x (<!! in)
15 outs (map #(vector % x) cs)]
16 (alts!! outs))))
17 cs))
18
19 (let [cout (chan)
20 cin (fan-in (fan-out cout (repeatedly 3 chan)))]
21 (dotimes [n 10]
22 (>!! cout n)
23 (prn (<!! cin))))
0 (require '[clojure.core.async :as async :refer [<! >! timeout chan alt! alts! go]])
1
2 (defn fan-in [ins]
3 (let [c (chan)]
4 (go (while true
5 (let [[x] (alts! ins)]
6 (>! c x))))
7 c))
8
9 (defn fan-out [in cs-or-n]
10 (let [cs (if (number? cs-or-n)
11 (repeatedly cs-or-n chan)
12 cs-or-n)]
13 (go (while true
14 (let [x (<! in)
15 outs (map #(vector % x) cs)]
16 (alts! outs))))
17 cs))
18
19 (let [cout (chan)
20 cin (fan-in (fan-out cout (repeatedly 3 chan)))]
21 (go (dotimes [n 10]
22 (>! cout n)
23 (prn (<! cin))))
24 nil)
0 (require '[clojure.core.async :as async :refer [<!! >!! timeout chan alt!!]])
1
2 (defn fake-search [kind]
3 (fn [c query]
4 (future
5 (<!! (timeout (rand-int 100)))
6 (>!! c [kind query]))))
7
8 (def web1 (fake-search :web1))
9 (def web2 (fake-search :web2))
10 (def image1 (fake-search :image1))
11 (def image2 (fake-search :image2))
12 (def video1 (fake-search :video1))
13 (def video2 (fake-search :video2))
14
15 (defn fastest [query & replicas]
16 (let [c (chan)]
17 (doseq [replica replicas]
18 (replica c query))
19 c))
20
21 (defn google [query]
22 (let [c (chan)
23 t (timeout 80)]
24 (future (>!! c (<!! (fastest query web1 web2))))
25 (future (>!! c (<!! (fastest query image1 image2))))
26 (future (>!! c (<!! (fastest query video1 video2))))
27 (loop [i 0 ret []]
28 (if (= i 3)
29 ret
30 (recur (inc i) (conj ret (alt!! [c t] ([v] v))))))))
31
32 (google "clojure")
0 (require '[clojure.core.async :as async :refer [<! >! <!! timeout chan alt! go]])
1
2 (defn fake-search [kind]
3 (fn [c query]
4 (go
5 (<! (timeout (rand-int 100)))
6 (>! c [kind query]))))
7
8 (def web1 (fake-search :web1))
9 (def web2 (fake-search :web2))
10 (def image1 (fake-search :image1))
11 (def image2 (fake-search :image2))
12 (def video1 (fake-search :video1))
13 (def video2 (fake-search :video2))
14
15 (defn fastest [query & replicas]
16 (let [c (chan)]
17 (doseq [replica replicas]
18 (replica c query))
19 c))
20
21 (defn google [query]
22 (let [c (chan)
23 t (timeout 80)]
24 (go (>! c (<! (fastest query web1 web2))))
25 (go (>! c (<! (fastest query image1 image2))))
26 (go (>! c (<! (fastest query video1 video2))))
27 (go (loop [i 0 ret []]
28 (if (= i 3)
29 ret
30 (recur (inc i) (conj ret (alt! [c t] ([v] v)))))))))
31
32 (<!! (google "clojure"))
33
0 ;; This walkthrough introduces the core concepts of core.async.
1
2 ;; The clojure.core.async namespace contains the public API.
3 (require '[clojure.core.async :as async :refer :all])
4
5 ;;;; CHANNELS
6
7 ;; Data is transmitted on queue-like channels. By default channels
8 ;; are unbuffered (0-length) - they require producer and consumer to
9 ;; rendezvous for the transfer of a value through the channel.
10
11 ;; Use `chan` to make an unbuffered channel:
12 (chan)
13
14 ;; Pass a number to create a channel with a fixed buffer:
15 (chan 10)
16
17 ;; `close!` a channel to stop accepting puts. Remaining values are still
18 ;; available to take. Drained channels return nil on take. Nils may
19 ;; not be sent over a channel explicitly!
20
21 (let [c (chan)]
22 (close! c))
23
24 ;;;; ORDINARY THREADS
25
26 ;; In ordinary threads, we use `>!!` (blocking put) and `<!!`
27 ;; (blocking take) to communicate via channels.
28
29 (let [c (chan 10)]
30 (>!! c "hello")
31 (assert (= "hello" (<!! c)))
32 (close! c))
33
34 ;; Because these are blocking calls, if we try to put on an
35 ;; unbuffered channel, we will block the main thread. We can use
36 ;; `thread` (like `future`) to execute a body in a pool thread and
37 ;; return a channel with the result. Here we launch a background task
38 ;; to put "hello" on a channel, then read that value in the current thread.
39
40 (let [c (chan)]
41 (thread (>!! c "hello"))
42 (assert (= "hello" (<!! c)))
43 (close! c))
44
45 ;;;; GO BLOCKS AND IOC THREADS
46
47 ;; The `go` macro asynchronously executes its body in a special pool
48 ;; of threads. Channel operations that would block will pause
49 ;; execution instead, blocking no threads. This mechanism encapsulates
50 ;; the inversion of control that is external in event/callback
51 ;; systems. Inside `go` blocks, we use `>!` (put) and `<!` (take).
52
53 ;; Here we convert our prior channel example to use go blocks:
54 (let [c (chan)]
55 (go (>! c "hello"))
56 (assert (= "hello" (<!! (go (<! c)))))
57 (close! c))
58
59 ;; Instead of the explicit thread and blocking call, we use a go block
60 ;; for the producer. The consumer uses a go block to take, then
61 ;; returns a result channel, from which we do a blocking take.
62
63 ;;;; ALTS
64
65 ;; One killer feature for channels over queues is the ability to wait
66 ;; on many channels at the same time (like a socket select). This is
67 ;; done with `alts!!` (ordinary threads) or `alts!` in go blocks.
68
69 ;; We can create a background thread with alts that combines inputs on
70 ;; either of two channels. `alts!!` takes a set of operations
71 ;; to perform - either a channel to take from or a [channel value] to put
72 ;; and returns the value (nil for put) and channel that succeeded:
73
74 (let [c1 (chan)
75 c2 (chan)]
76 (thread (while true
77 (let [[v ch] (alts!! [c1 c2])]
78 (println "Read" v "from" ch))))
79 (>!! c1 "hi")
80 (>!! c2 "there"))
81
82 ;; Prints (on stdout, possibly not visible at your repl):
83 ;; Read hi from #<ManyToManyChannel ...>
84 ;; Read there from #<ManyToManyChannel ...>
85
86 ;; We can use alts! to do the same thing with go blocks:
87
88 (let [c1 (chan)
89 c2 (chan)]
90 (go (while true
91 (let [[v ch] (alts! [c1 c2])]
92 (println "Read" v "from" ch))))
93 (go (>! c1 "hi"))
94 (go (>! c2 "there")))
95
96 ;; Since go blocks are lightweight processes not bound to threads, we
97 ;; can have LOTS of them! Here we create 1000 go blocks that say hi on
98 ;; 1000 channels. We use alts!! to read them as they're ready.
99
100 (let [n 1000
101 cs (repeatedly n chan)
102 begin (System/currentTimeMillis)]
103 (doseq [c cs] (go (>! c "hi")))
104 (dotimes [i n]
105 (let [[v c] (alts!! cs)]
106 (assert (= "hi" v))))
107 (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
108
109 ;; `timeout` creates a channel that waits for a specified ms, then closes:
110
111 (let [t (timeout 100)
112 begin (System/currentTimeMillis)]
113 (<!! t)
114 (println "Waited" (- (System/currentTimeMillis) begin)))
115
116 ;; We can combine timeout with `alts!` to do timed channel waits.
117 ;; Here we wait for 100 ms for a value to arrive on the channel, then
118 ;; give up:
119
120 (let [c (chan)
121 begin (System/currentTimeMillis)]
122 (alts!! [c (timeout 100)])
123 (println "Gave up after" (- (System/currentTimeMillis) begin)))
124
125 ;; ALT
126
127 ;; todo
128
129 ;;;; OTHER BUFFERS
130
131 ;; Channels can also use custom buffers that have different policies
132 ;; for the "full" case. Two useful examples are provided in the API.
133
134 ;; Use `dropping-buffer` to drop newest values when the buffer is full:
135 (chan (dropping-buffer 10))
136
137 ;; Use `sliding-buffer` to drop oldest values when the buffer is full:
138 (chan (sliding-buffer 10))
0 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1 <modelVersion>4.0.0</modelVersion>
2 <groupId>org.clojure</groupId>
3 <artifactId>core.async</artifactId>
4 <!-- Don't set this manually! Call script/build/update_version -->
5 <version>0.3.443</version>
6 <packaging>jar</packaging>
7 <name>core.async</name>
8 <description>Facilities for async programming and communication in Clojure</description>
9 <url>https://github.com/clojure/core.async</url>
10
11 <developers>
12 <developer>
13 <id>richhickey</id>
14 <name>Rich Hickey</name>
15 <url>http://clojure.org</url>
16 </developer>
17 </developers>
18
19 <parent>
20 <groupId>org.clojure</groupId>
21 <artifactId>pom.contrib</artifactId>
22 <version>0.2.2</version>
23 </parent>
24
25 <scm>
26 <connection>scm:git:git://github.com/clojure/core.async.git</connection>
27 <developerConnection>scm:git:git@github.com:clojure/core.async.git</developerConnection>
28 <url>https://github.com/clojure/core.async</url>
29 <tag>core.async-0.3.443</tag>
30 </scm>
31
32 <properties>
33 <clojure.version>1.7.0</clojure.version>
34 </properties>
35
36 <dependencies>
37 <dependency>
38 <groupId>org.clojure</groupId>
39 <artifactId>clojurescript</artifactId>
40 <version>0.0-2311</version>
41 <scope>provided</scope>
42 </dependency>
43 <dependency>
44 <groupId>org.clojure</groupId>
45 <artifactId>tools.analyzer.jvm</artifactId>
46 <version>0.7.0</version>
47 </dependency>
48 </dependencies>
49
50 <build>
51 <plugins>
52 <plugin>
53 <groupId>org.codehaus.mojo</groupId>
54 <artifactId>versions-maven-plugin</artifactId>
55 <version>2.3</version>
56 </plugin>
57 </plugins>
58 </build>
59 </project>
0 (defproject org.clojure/core.async "0.1.0-SNAPSHOT"
1 :description "Facilities for async programming and communication in Clojure"
2 :url "https://github.com/clojure/core.async"
3 :license {:name "Eclipse Public License"
4 :url "http://www.eclipse.org/legal/epl-v10.html"}
5 :parent [org.clojure/pom.contrib "0.1.2"]
6 :dependencies [[org.clojure/clojure "1.7.0"]
7 [org.clojure/tools.analyzer.jvm "0.7.0"]
8 [org.clojure/clojurescript "1.7.170" :scope "provided"]]
9 :global-vars {*warn-on-reflection* true}
10 :source-paths ["src/main/clojure"]
11 :test-paths ["src/test/clojure"]
12 :jvm-opts ^:replace ["-Xmx1g" "-server"]
13 :java-source-paths ["src/main/java"]
14 :profiles {:dev {:source-paths ["examples"]}}
15
16 :plugins [[lein-cljsbuild "1.1.2"]]
17
18 :clean-targets ["tests.js" "tests.js.map"
19 "out" "out-simp" "out-simp-node"
20 "out-adv" "out-adv-node"]
21
22 :cljsbuild
23 {:builds
24 [{:id "dev"
25 :source-paths ["src/test/cljs" "src/main/clojure/cljs"]
26 :compiler {:main cljs.core.async.test-runner
27 :asset-path "../out"
28 :optimizations :none
29 :output-to "tests.js"
30 :output-dir "out"}}
31 {:id "simple"
32 :source-paths ["src/test/cljs" "src/main/clojure/cljs"]
33 :compiler {:optimizations :simple
34 :pretty-print true
35 :static-fns true
36 :output-to "tests.js"
37 :output-dir "out-simp"}}
38 {:id "simple-node"
39 :source-paths ["src/test/cljs" "src/main/clojure/cljs"]
40 :notify-command ["node" "tests.js"]
41 :compiler {:optimizations :simple
42 :target :nodejs
43 :pretty-print true
44 :static-fns true
45 :output-to "tests.js"
46 :output-dir "out-simp-node"}}
47 {:id "adv"
48 :source-paths ["src/test/cljs" "src/main/clojure/cljs"]
49 :compiler {:optimizations :advanced
50 :pretty-print false
51 :output-dir "out-adv"
52 :output-to "tests.js"
53 :source-map "tests.js.map"}}
54 {:id "adv-node"
55 :source-paths ["src/test/cljs" "src/main/clojure/cljs"]
56 :compiler {:optimizations :advanced
57 :target :nodejs
58 :pretty-print false
59 :output-dir "out-adv-node"
60 :output-to "tests.js"
61 :source-map "tests.js.map"}}]})
0 #!/usr/bin/env bash
1
2 # If on a branch other than master, returns the number of commits made off of master
3 # If on master, returns 0
4
5 set -e
6
7 master_tag=`git rev-parse --abbrev-ref HEAD`
8
9 if [ "$master_tag" == "master" ]; then
10 echo "0"
11 else
12 last_commit=`git rev-parse HEAD`
13 revision=`git rev-list master..$last_commit | wc -l`
14 echo $revision
15 fi
0 #!/usr/bin/env bash
1
2 # Return the portion of the version number generated from git
3 # <trunk-basis>
4
5 set -e
6
7 trunk_basis=`script/build/trunk_revision`
8 sha=`git rev-parse HEAD`
9
10 sha=${sha:0:${#sha}-34} # drop the last 34 characters, keep 6
11
12 echo $trunk_basis
0 #!/usr/bin/env bash
1
2 # Return the complete revision number
3 # <major>.<minor>.<trunk-basis>.<patch-or-0>-<sha>[-qualifier]
4
5 set -e
6
7 version_template=`cat VERSION_TEMPLATE`
8
9 if [[ "$version_template" =~ ^[0-9]+\.[0-9]+\.GENERATED_VERSION(-[a-zA-Z0-9]+)?$ ]]; then
10
11 git_revision=`script/build/git_revision`
12 echo ${version_template/GENERATED_VERSION/$git_revision}
13
14 else
15 echo "Invalid version template string: $version_template" >&2
16 exit -1
17 fi
18
0 #!/usr/bin/env bash
1
2 # Returns the number of commits made since the v0.0 tag
3
4 set -e
5
6 REVISION=`git --no-replace-objects describe --match v0.0`
7
8 # Extract the version number from the string. Do this in two steps so
9 # it is a little easier to understand.
10 REVISION=${REVISION:5} # drop the first 5 characters
11 REVISION=${REVISION:0:${#REVISION}-9} # drop the last 9 characters
12
13 echo $REVISION
0 #!/usr/bin/env bash
1
2 set -e
3
4 mvn versions:set -DnewVersion=`script/build/revision`-SNAPSHOT
0 <html>
1 <head>
2 </head>
3 <body>
4 <script src="../tests.js" language="javascript"></script>
5
6 <h2>Open JavaScript Console to see the test results</h2>
7 </body>
8 </html>
0 ;; Copyright (c) Rich Hickey and contributors. All rights reserved.
1 ;; The use and distribution terms for this software are covered by the
2 ;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
3 ;; which can be found in the file epl-v10.html at the root of this distribution.
4 ;; By using this software in any fashion, you are agreeing to be bound by
5 ;; the terms of this license.
6 ;; You must not remove this notice, or any other, from this software.
7
8 (ns cljs.core.async.impl.buffers
9 (:require [cljs.core.async.impl.protocols :as impl]))
10
11 ;; -----------------------------------------------------------------------------
12 ;; DO NOT USE, this is internal buffer representation
13
14 (defn acopy [src src-start dest dest-start len]
15 (loop [cnt 0]
16 (when (< cnt len)
17 (aset dest
18 (+ dest-start cnt)
19 (aget src (+ src-start cnt)))
20 (recur (inc cnt)))))
21
22 (deftype RingBuffer [^:mutable head ^:mutable tail ^:mutable length ^:mutable arr]
23 Object
24 (pop [_]
25 (when-not (zero? length)
26 (let [x (aget arr tail)]
27 (aset arr tail nil)
28 (set! tail (js-mod (inc tail) (alength arr)))
29 (set! length (dec length))
30 x)))
31
32 (unshift [_ x]
33 (aset arr head x)
34 (set! head (js-mod (inc head) (alength arr)))
35 (set! length (inc length))
36 nil)
37
38 (unbounded-unshift [this x]
39 (if (== (inc length) (alength arr))
40 (.resize this))
41 (.unshift this x))
42
43 ;; Doubles the size of the buffer while retaining all the existing values
44 (resize
45 [_]
46 (let [new-arr-size (* (alength arr) 2)
47 new-arr (make-array new-arr-size)]
48 (cond
49 (< tail head)
50 (do (acopy arr tail new-arr 0 length)
51 (set! tail 0)
52 (set! head length)
53 (set! arr new-arr))
54
55 (> tail head)
56 (do (acopy arr tail new-arr 0 (- (alength arr) tail))
57 (acopy arr 0 new-arr (- (alength arr) tail) head)
58 (set! tail 0)
59 (set! head length)
60 (set! arr new-arr))
61
62 (== tail head)
63 (do (set! tail 0)
64 (set! head 0)
65 (set! arr new-arr)))))
66
67 (cleanup [this keep?]
68 (dotimes [x length]
69 (let [v (.pop this)]
70 (when ^boolean (keep? v)
71 (.unshift this v))))))
72
73 (defn ring-buffer [n]
74 (assert (> n 0) "Can't create a ring buffer of size 0")
75 (RingBuffer. 0 0 0 (make-array n)))
76
77 ;; -----------------------------------------------------------------------------
78
79 (deftype FixedBuffer [buf n]
80 impl/Buffer
81 (full? [this]
82 (== (.-length buf) n))
83 (remove! [this]
84 (.pop buf))
85 (add!* [this itm]
86 (.unbounded-unshift buf itm)
87 this)
88 (close-buf! [this])
89 cljs.core/ICounted
90 (-count [this]
91 (.-length buf)))
92
93 (defn fixed-buffer [n]
94 (FixedBuffer. (ring-buffer n) n))
95
96 (deftype DroppingBuffer [buf n]
97 impl/UnblockingBuffer
98 impl/Buffer
99 (full? [this]
100 false)
101 (remove! [this]
102 (.pop buf))
103 (add!* [this itm]
104 (when-not (== (.-length buf) n)
105 (.unshift buf itm))
106 this)
107 (close-buf! [this])
108 cljs.core/ICounted
109 (-count [this]
110 (.-length buf)))
111
112 (defn dropping-buffer [n]
113 (DroppingBuffer. (ring-buffer n) n))
114
115 (deftype SlidingBuffer [buf n]
116 impl/UnblockingBuffer
117 impl/Buffer
118 (full? [this]
119 false)
120 (remove! [this]
121 (.pop buf))
122 (add!* [this itm]
123 (when (== (.-length buf) n)
124 (impl/remove! this))
125 (.unshift buf itm)
126 this)
127 (close-buf! [this])
128 cljs.core/ICounted
129 (-count [this]
130 (.-length buf)))
131
132 (defn sliding-buffer [n]
133 (SlidingBuffer. (ring-buffer n) n))
134
135 (defonce ^:private NO-VAL (js/Object.))
136 (defn- undelivered? [val]
137 (identical? NO-VAL val))
138
139 (deftype PromiseBuffer [^:mutable val]
140 impl/UnblockingBuffer
141 impl/Buffer
142 (full? [_]
143 false)
144 (remove! [_]
145 val)
146 (add!* [this itm]
147 (when (undelivered? val)
148 (set! val itm))
149 this)
150 (close-buf! [_]
151 (when (undelivered? val)
152 (set! val nil)))
153 cljs.core/ICounted
154 (-count [_]
155 (if (undelivered? val) 0 1)))
156
157 (defn promise-buffer []
158 (PromiseBuffer. NO-VAL))
0 ;; Copyright (c) Rich Hickey and contributors. All rights reserved.
1 ;; The use and distribution terms for this software are covered by the
2 ;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
3 ;; which can be found in the file epl-v10.html at the root of this distribution.
4 ;; By using this software in any fashion, you are agreeing to be bound by
5 ;; the terms of this license.
6 ;; You must not remove this notice, or any other, from this software.
7
8 (ns cljs.core.async.impl.channels
9 (:require [cljs.core.async.impl.protocols :as impl]
10 [cljs.core.async.impl.dispatch :as dispatch]
11 [cljs.core.async.impl.buffers :as buffers]))
12
13
14
15 (defn box [val]
16 (reify cljs.core/IDeref
17 (-deref [_] val)))
18
19 (deftype PutBox [handler val])
20
21 (defn put-active? [box]
22 (impl/active? (.-handler box)))
23
24 (def ^:const MAX_DIRTY 64)
25
26 (defprotocol MMC
27 (abort [this]))
28
29 (deftype ManyToManyChannel [takes ^:mutable dirty-takes puts ^:mutable dirty-puts ^not-native buf ^:mutable closed add!]
30 MMC
31 (abort [this]
32 (loop []
33 (let [putter (.pop puts)]
34 (when-not (nil? putter)
35 (let [^not-native put-handler (.-handler putter)
36 val (.-val putter)]
37 (if ^boolean (impl/active? put-handler)
38 (let [put-cb (impl/commit put-handler)]
39 (dispatch/run #(put-cb true)))
40 (recur))))))
41 (.cleanup puts (constantly false))
42 (impl/close! this))
43 impl/WritePort
44 (put! [this val ^not-native handler]
45 (assert (not (nil? val)) "Can't put nil in on a channel")
46 ;; bug in CLJS compiler boolean inference - David
47 (let [^boolean closed closed]
48 (if (or closed (not ^boolean (impl/active? handler)))
49 (box (not closed))
50 (if (and buf (not (impl/full? buf)))
51 (do
52 (impl/commit handler)
53 (let [done? (reduced? (add! buf val))
54 take-cbs (loop [takers []]
55 (if (and (pos? (.-length takes)) (pos? (count buf)))
56 (let [^not-native taker (.pop takes)]
57 (if ^boolean (impl/active? taker)
58 (let [ret (impl/commit taker)
59 val (impl/remove! buf)]
60 (recur (conj takers (fn [] (ret val)))))
61 (recur takers)))
62 takers))]
63 (when done? (abort this))
64 (when (seq take-cbs)
65 (doseq [f take-cbs]
66 (dispatch/run f)))
67 (box true)))
68 (let [taker (loop []
69 (let [^not-native taker (.pop takes)]
70 (when taker
71 (if (impl/active? taker)
72 taker
73 (recur)))))]
74 (if taker
75 (let [take-cb (impl/commit taker)]
76 (impl/commit handler)
77 (dispatch/run (fn [] (take-cb val)))
78 (box true))
79 (do
80 (if (> dirty-puts MAX_DIRTY)
81 (do (set! dirty-puts 0)
82 (.cleanup puts put-active?))
83 (set! dirty-puts (inc dirty-puts)))
84 (when (impl/blockable? handler)
85 (assert (< (.-length puts) impl/MAX-QUEUE-SIZE)
86 (str "No more than " impl/MAX-QUEUE-SIZE
87 " pending puts are allowed on a single channel."
88 " Consider using a windowed buffer."))
89 (.unbounded-unshift puts (PutBox. handler val)))
90 nil)))))))
91 impl/ReadPort
92 (take! [this ^not-native handler]
93 (if (not ^boolean (impl/active? handler))
94 nil
95 (if (and (not (nil? buf)) (pos? (count buf)))
96 (do
97 (if-let [take-cb (impl/commit handler)]
98 (let [val (impl/remove! buf)
99 [done? cbs] (when (pos? (.-length puts))
100 (loop [cbs []]
101 (let [putter (.pop puts)
102 ^not-native put-handler (.-handler putter)
103 val (.-val putter)
104 cb (and ^boolean (impl/active? put-handler) (impl/commit put-handler))
105 cbs (if cb (conj cbs cb) cbs)
106 done? (when cb (reduced? (add! buf val)))]
107 (if (and (not done?) (not (impl/full? buf)) (pos? (.-length puts)))
108 (recur cbs)
109 [done? cbs]))))]
110 (when done?
111 (abort this))
112 (doseq [cb cbs]
113 (dispatch/run #(cb true)))
114 (box val))))
115 (let [putter (loop []
116 (let [putter (.pop puts)]
117 (when putter
118 (if ^boolean (impl/active? (.-handler putter))
119 putter
120 (recur)))))]
121 (if putter
122 (let [put-cb (impl/commit (.-handler putter))]
123 (impl/commit handler)
124 (dispatch/run #(put-cb true))
125 (box (.-val putter)))
126 (if closed
127 (do
128 (when buf (add! buf))
129 (if (and (impl/active? handler) (impl/commit handler))
130 (let [has-val (and buf (pos? (count buf)))]
131 (let [val (when has-val (impl/remove! buf))]
132 (box val)))
133 nil))
134 (do
135 (if (> dirty-takes MAX_DIRTY)
136 (do (set! dirty-takes 0)
137 (.cleanup takes impl/active?))
138 (set! dirty-takes (inc dirty-takes)))
139 (when (impl/blockable? handler)
140 (assert (< (.-length takes) impl/MAX-QUEUE-SIZE)
141 (str "No more than " impl/MAX-QUEUE-SIZE
142 " pending takes are allowed on a single channel."))
143 (.unbounded-unshift takes handler))
144 nil)))))))
145 impl/Channel
146 (closed? [_] closed)
147 (close! [this]
148 (if ^boolean closed
149 nil
150 (do (set! closed true)
151 (when (and buf (zero? (.-length puts)))
152 (add! buf))
153 (loop []
154 (let [^not-native taker (.pop takes)]
155 (when-not (nil? taker)
156 (when ^boolean (impl/active? taker)
157 (let [take-cb (impl/commit taker)
158 val (when (and buf (pos? (count buf))) (impl/remove! buf))]
159 (dispatch/run (fn [] (take-cb val)))))
160 (recur))))
161 (when buf (impl/close-buf! buf))
162 nil))))
163
164 (defn- ex-handler [ex]
165 (.log js/console ex)
166 nil)
167
168 (defn- handle [buf exh t]
169 (let [else ((or exh ex-handler) t)]
170 (if (nil? else)
171 buf
172 (impl/add! buf else))))
173
174 (defn chan
175 ([buf] (chan buf nil))
176 ([buf xform] (chan buf xform nil))
177 ([buf xform exh]
178 (ManyToManyChannel. (buffers/ring-buffer 32) 0 (buffers/ring-buffer 32)
179 0 buf false
180 (let [add! (if xform (xform impl/add!) impl/add!)]
181 (fn
182 ([buf]
183 (try
184 (add! buf)
185 (catch :default t
186 (handle buf exh t))))
187 ([buf val]
188 (try
189 (add! buf val)
190 (catch :default t
191 (handle buf exh t)))))))))
0 (ns cljs.core.async.impl.dispatch
1 (:require [cljs.core.async.impl.buffers :as buffers]
2 [goog.async.nextTick]))
3
4 (def tasks (buffers/ring-buffer 32))
5 (def running? false)
6 (def queued? false)
7
8 (def TASK_BATCH_SIZE 1024)
9
10 (declare queue-dispatcher)
11
12 (defn process-messages []
13 (set! running? true)
14 (set! queued? false)
15 (loop [count 0]
16 (let [m (.pop tasks)]
17 (when-not (nil? m)
18 (m)
19 (when (< count TASK_BATCH_SIZE)
20 (recur (inc count))))))
21 (set! running? false)
22 (when (> (.-length tasks) 0)
23 (queue-dispatcher)))
24
25 (defn queue-dispatcher []
26 (when-not (and queued? running?)
27 (set! queued? true)
28 (goog.async.nextTick process-messages)))
29
30 (defn run [f]
31 (.unbounded-unshift tasks f)
32 (queue-dispatcher))
33
34 (defn queue-delay [f delay]
35 (js/setTimeout f delay))
36
0 (ns cljs.core.async.impl.ioc-helpers
1 (:require [cljs.core.async.impl.protocols :as impl])
2 (:require-macros [cljs.core.async.impl.ioc-macros :as ioc]))
3
4 (def ^:const FN-IDX 0)
5 (def ^:const STATE-IDX 1)
6 (def ^:const VALUE-IDX 2)
7 (def ^:const BINDINGS-IDX 3)
8 (def ^:const EXCEPTION-FRAMES 4)
9 (def ^:const CURRENT-EXCEPTION 5)
10 (def ^:const USER-START-IDX 6)
11
12 (defn aset-object [arr idx o]
13 (aget arr idx o))
14
15 (defn aget-object [arr idx]
16 (aget arr idx))
17
18
19 (defn finished?
20 "Returns true if the machine is in a finished state"
21 [state-array]
22 (keyword-identical? (aget state-array STATE-IDX) :finished))
23
24 (defn- fn-handler
25 [f]
26 (reify
27 impl/Handler
28 (active? [_] true)
29 (blockable? [_] true)
30 (commit [_] f)))
31
32
33 (defn run-state-machine [state]
34 ((aget-object state FN-IDX) state))
35
36 (defn run-state-machine-wrapped [state]
37 (try
38 (run-state-machine state)
39 (catch js/Object ex
40 (impl/close! ^not-native (aget-object state USER-START-IDX))
41 (throw ex))))
42
43 (defn take! [state blk ^not-native c]
44 (if-let [cb (impl/take! c (fn-handler
45 (fn [x]
46 (ioc/aset-all! state VALUE-IDX x STATE-IDX blk)
47 (run-state-machine-wrapped state))))]
48 (do (ioc/aset-all! state VALUE-IDX @cb STATE-IDX blk)
49 :recur)
50 nil))
51
52 (defn put! [state blk ^not-native c val]
53 (if-let [cb (impl/put! c val (fn-handler (fn [ret-val]
54 (ioc/aset-all! state VALUE-IDX ret-val STATE-IDX blk)
55 (run-state-machine-wrapped state))))]
56 (do (ioc/aset-all! state VALUE-IDX @cb STATE-IDX blk)
57 :recur)
58 nil))
59
60 (defn return-chan [state value]
61 (let [^not-native c (aget state USER-START-IDX)]
62 (when-not (nil? value)
63 (impl/put! c value (fn-handler (fn [] nil))))
64 (impl/close! c)
65 c))
66
67 (defrecord ExceptionFrame [catch-block
68 ^Class catch-exception
69 finally-block
70 continue-block
71 prev])
72
73 (defn add-exception-frame [state catch-block catch-exception finally-block continue-block]
74 (ioc/aset-all! state
75 EXCEPTION-FRAMES
76 (->ExceptionFrame catch-block
77 catch-exception
78 finally-block
79 continue-block
80 (aget-object state EXCEPTION-FRAMES))))
81
82 (defn process-exception [state]
83 (let [exception-frame (aget-object state EXCEPTION-FRAMES)
84 catch-block (:catch-block exception-frame)
85 catch-exception (:catch-exception exception-frame)
86 exception (aget-object state CURRENT-EXCEPTION)]
87 (cond
88 (and exception
89 (not exception-frame))
90 (throw exception)
91
92 (and exception
93 catch-block
94 (or (= :default catch-exception)
95 (instance? catch-exception exception)))
96 (ioc/aset-all! state
97 STATE-IDX
98 catch-block
99 VALUE-IDX
100 exception
101 CURRENT-EXCEPTION
102 nil
103 EXCEPTION-FRAMES
104 (assoc exception-frame
105 :catch-block nil
106 :catch-exception nil))
107
108
109 (and exception
110 (not catch-block)
111 (not (:finally-block exception-frame)))
112
113 (do (ioc/aset-all! state
114 EXCEPTION-FRAMES
115 (:prev exception-frame))
116 (recur state))
117
118 (and exception
119 (not catch-block)
120 (:finally-block exception-frame))
121 (ioc/aset-all! state
122 STATE-IDX
123 (:finally-block exception-frame)
124 EXCEPTION-FRAMES
125 (assoc exception-frame
126 :finally-block nil))
127
128 (and (not exception)
129 (:finally-block exception-frame))
130 (do (ioc/aset-all! state
131 STATE-IDX
132 (:finally-block exception-frame)
133 EXCEPTION-FRAMES
134 (assoc exception-frame
135 :finally-block nil)))
136
137 (and (not exception)
138 (not (:finally-block exception-frame)))
139 (do (ioc/aset-all! state
140 STATE-IDX
141 (:continue-block exception-frame)
142 EXCEPTION-FRAMES
143 (:prev exception-frame)))
144
145 :else (throw (js/Error. "No matching clause")))))
0 ; Copyright (c) Rich Hickey. All rights reserved.
1 ; The use and distribution terms for this software are covered by the
2 ; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
3 ; which can be found in the file epl-v10.html at the root of this distribution.
4 ; By using this software in any fashion, you are agreeing to be bound by
5 ; the terms of this license.
6 ; You must not remove this notice, or any other, from this software.
7
8 ;; by Timothy Baldridge
9 ;; April 13, 2013
10
11 (ns cljs.core.async.impl.ioc-macros
12 (:refer-clojure :exclude [all])
13 (:require [clojure.pprint :refer [pprint]]
14 [clojure.set :refer (intersection)]
15 [clojure.core.async.impl.protocols :as impl]
16 [clojure.core.async.impl.dispatch :as dispatch]
17 [cljs.analyzer :as cljs])
18 (:import [java.util.concurrent.locks Lock]))
19
20 (defn debug [x]
21 (binding [*out* *err*]
22 (pprint x))
23 x)
24
25 (def ^:const FN-IDX 0)
26 (def ^:const STATE-IDX 1)
27 (def ^:const VALUE-IDX 2)
28 (def ^:const BINDINGS-IDX 3)
29 (def ^:const EXCEPTION-FRAMES 4)
30 (def ^:const CURRENT-EXCEPTION 5)
31 (def ^:const USER-START-IDX 6)
32
33 (defmacro aset-all!
34 [arr & more]
35 (assert (even? (count more)) "Must give an even number of args to aset-all!")
36 (let [bindings (partition 2 more)
37 arr-sym (gensym "statearr-")]
38 `(let [~arr-sym ~arr]
39 ~@(map
40 (fn [[idx val]]
41 `(aset ~arr-sym ~idx ~val))
42 bindings)
43 ~arr-sym)))
44
45 ;; State monad stuff, used only in SSA construction
46
47 (defmacro gen-plan
48 "Allows a user to define a state monad binding plan.
49
50 (gen-plan
51 [_ (assoc-in-plan [:foo :bar] 42)
52 val (get-in-plan [:foo :bar])]
53 val)"
54 [binds id-expr]
55 (let [binds (partition 2 binds)
56 psym (gensym "plan_")
57 forms (reduce
58 (fn [acc [id expr]]
59 (concat acc `[[~id ~psym] (~expr ~psym)]))
60 []
61 binds)]
62 `(fn [~psym]
63 (let [~@forms]
64 [~id-expr ~psym]))))
65
66 (defn get-plan
67 "Returns the final [id state] from a plan. "
68 [f]
69 (f {}))
70
71 (defn push-binding
72 "Sets the binding 'key' to value. This operation can be undone via pop-bindings.
73 Bindings are stored in the state hashmap."
74 [key value]
75 (fn [plan]
76 [nil (update-in plan [:bindings key] conj value)]))
77
78 (defn push-alter-binding
79 "Pushes the result of (apply f old-value args) as current value of binding key"
80 [key f & args]
81 (fn [plan]
82 [nil (update-in plan [:bindings key]
83 #(conj % (apply f (first %) args)))]))
84
85 (defn get-binding
86 "Gets the value of the current binding for key"
87 [key]
88 (fn [plan]
89 [(first (get-in plan [:bindings key])) plan]))
90
91 (defn pop-binding
92 "Removes the most recent binding for key"
93 [key]
94 (fn [plan]
95 [(first (get-in plan [:bindings key]))
96 (update-in plan [:bindings key] pop)]))
97
98 (defn no-op
99 "This function can be used inside a gen-plan when no operation is to be performed"
100 []
101 (fn [plan]
102 [nil plan]))
103
104 (defn all
105 "Assumes that itms is a list of state monad function results, threads the state map
106 through all of them. Returns a vector of all the results."
107 [itms]
108 (fn [plan]
109 (reduce
110 (fn [[ids plan] f]
111 (let [[id plan] (f plan)]
112 [(conj ids id) plan]))
113 [[] plan]
114 itms)))
115
116 (defn assoc-in-plan
117 "Same as assoc-in, but for state hash map"
118 [path val]
119 (fn [plan]
120 [val (assoc-in plan path val)]))
121
122 (defn update-in-plan
123 "Same as update-in, but for a state hash map"
124 [path f & args]
125 (fn [plan]
126 [nil (apply update-in plan path f args)]))
127
128 (defn get-in-plan
129 "Same as get-in, but for a state hash map"
130 [path]
131 (fn [plan]
132 [(get-in plan path) plan]))
133
134 (defn print-plan []
135 (fn [plan]
136 (pprint plan)
137 [nil plan]))
138
139 (defn set-block
140 "Sets the current block being written to by the functions. The next add-instruction call will append to this block"
141 [block-id]
142 (fn [plan]
143 [block-id (assoc plan :current-block block-id)]))
144
145 (defn get-block
146 "Gets the current block"
147 []
148 (fn [plan]
149 [(:current-block plan) plan]))
150
151 (defn add-block
152 "Adds a new block, returns its id, but does not change the current block (does not call set-block)."
153 []
154 (gen-plan
155 [_ (update-in-plan [:block-id] (fnil inc 0))
156 blk-id (get-in-plan [:block-id])
157 cur-blk (get-block)
158 _ (assoc-in-plan [:blocks blk-id] [])
159 catches (get-binding :catch)
160 _ (assoc-in-plan [:block-catches blk-id] catches)
161 _ (if-not cur-blk
162 (assoc-in-plan [:start-block] blk-id)
163 (no-op))]
164 blk-id))
165
166
167 (defn instruction? [x]
168 (::instruction (meta x)))
169
170 (defn add-instruction
171 "Appends an instruction to the current block. "
172 [inst]
173 (let [inst-id (with-meta (gensym "inst_")
174 {::instruction true})
175 inst (assoc inst :id inst-id)]
176 (gen-plan
177 [blk-id (get-block)
178 _ (update-in-plan [:blocks blk-id] (fnil conj []) inst)]
179 inst-id)))
180
181 ;;
182
183 ;; We're going to reduce Clojure expressions to a ssa format,
184 ;; and then translate the instructions for this
185 ;; virtual-virtual-machine back into Clojure data.
186
187 ;; Here we define the instructions:
188
189 (defprotocol IInstruction
190 (reads-from [this] "Returns a list of instructions this instruction reads from")
191 (writes-to [this] "Returns a list of instructions this instruction writes to")
192 (block-references [this] "Returns all the blocks this instruction references"))
193
194 (defprotocol IEmittableInstruction
195 (emit-instruction [this state-sym] "Returns the clojure code that this instruction represents"))
196
197 (defprotocol ITerminator
198 (terminator-code [this] "Returns a unique symbol for this instruction")
199 (terminate-block [this state-sym custom-terminators] "Emites the code to terminate a given block"))
200
201 (defrecord Const [value]
202 IInstruction
203 (reads-from [this] [value])
204 (writes-to [this] [(:id this)])
205 (block-references [this] [])
206 IEmittableInstruction
207 (emit-instruction [this state-sym]
208 (if (= value ::value)
209 `[~(:id this) (aget ~state-sym ~VALUE-IDX)]
210 `[~(:id this) ~value])))
211
212 (defrecord CustomTerminator [f blk values]
213 IInstruction
214 (reads-from [this] values)
215 (writes-to [this] [])
216 (block-references [this] [])
217 ITerminator
218 (terminate-block [this state-sym _]
219 `(~f ~state-sym ~blk ~@values)))
220
221 (defn- emit-clashing-binds
222 [recur-nodes ids clashes]
223 (let [temp-binds (reduce
224 (fn [acc i]
225 (assoc acc i (gensym "tmp")))
226 {} clashes)]
227 (concat
228 (mapcat (fn [i]
229 `[~(temp-binds i) ~i])
230 clashes)
231 (mapcat (fn [node id]
232 `[~node ~(get temp-binds id id)])
233 recur-nodes
234 ids))))
235
236 (defrecord Recur [recur-nodes ids]
237 IInstruction
238 (reads-from [this] ids)
239 (writes-to [this] recur-nodes)
240 (block-references [this] [])
241 IEmittableInstruction
242 (emit-instruction [this state-sym]
243 (if-let [overlap (seq (intersection (set recur-nodes) (set ids)))]
244 (emit-clashing-binds recur-nodes ids overlap)
245 (mapcat (fn [r i]
246 `[~r ~i]) recur-nodes ids))))
247
248 (defrecord Call [refs]
249 IInstruction
250 (reads-from [this] refs)
251 (writes-to [this] [(:id this)])
252 (block-references [this] [])
253 IEmittableInstruction
254 (emit-instruction [this state-sym]
255 `[~(:id this) ~(seq refs)]))
256
257 (defrecord Case [val-id test-vals jmp-blocks default-block]
258 IInstruction
259 (reads-from [this] [val-id])
260 (writes-to [this] [])
261 (block-references [this] [])
262 ITerminator
263 (terminate-block [this state-sym _]
264 `(do (case ~val-id
265 ~@(concat (mapcat (fn [test blk]
266 `[~test (aset-all! ~state-sym
267 ~STATE-IDX ~blk)])
268 test-vals jmp-blocks)
269 (when default-block
270 `[(do (aset-all! ~state-sym ~STATE-IDX ~default-block)
271 :recur)])))
272 :recur)))
273
274 (defrecord Fn [fn-expr local-names local-refs]
275 IInstruction
276 (reads-from [this] local-refs)
277 (writes-to [this] [(:id this)])
278 (block-references [this] [])
279 IEmittableInstruction
280 (emit-instruction [this state-sym]
281 `[~(:id this)
282 (let [~@(interleave local-names local-refs)]
283 ~@fn-expr)]))
284
285 (defrecord Dot [target method args]
286 IInstruction
287 (reads-from [this] `[~target ~method ~@args])
288 (writes-to [this] [(:id this)])
289 (block-references [this] [])
290 IEmittableInstruction
291 (emit-instruction [this state-sym]
292 (if (.startsWith (name method) "-")
293 `[~(:id this) (. ~target ~method)]
294 `[~(:id this) (. ~target ~(cons method args))])))
295
296 (defrecord Jmp [value block]
297 IInstruction
298 (reads-from [this] [value])
299 (writes-to [this] [])
300 (block-references [this] [block])
301 ITerminator
302 (terminate-block [this state-sym _]
303 `(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ~block)
304 :recur)))
305
306 (defrecord Return [value]
307 IInstruction
308 (reads-from [this] [value])
309 (writes-to [this] [])
310 (block-references [this] [])
311 ITerminator
312 (terminator-code [this] :Return)
313 (terminate-block [this state-sym custom-terminators]
314 (if-let [f (get custom-terminators (terminator-code this))]
315 `(~f ~state-sym ~value)
316 `(do (aset-all! ~state-sym
317 ~VALUE-IDX ~value
318 ~STATE-IDX :finished)
319 nil))))
320
321 (defrecord Set! [field object val]
322 IInstruction
323 (reads-from [this] [object val])
324 (writes-to [this] [(:id this)])
325 (block-references [this] [])
326 IEmittableInstruction
327 (emit-instruction [this state-sym]
328 (if field
329 `[~(:id this) (set! (~field ~object) ~val)]
330 `[~(:id this) (set! ~object ~val)])))
331
332 (defrecord CondBr [test then-block else-block]
333 IInstruction
334 (reads-from [this] [test])
335 (writes-to [this] [])
336 (block-references [this] [then-block else-block])
337 ITerminator
338 (terminate-block [this state-sym _]
339 `(do (if ~test
340 (aset-all! ~state-sym
341 ~STATE-IDX ~then-block)
342 (aset-all! ~state-sym
343 ~STATE-IDX ~else-block))
344 :recur)))
345
346
347 (defrecord Try [catch-block catch-exception finally-block continue-block]
348 IInstruction
349 (reads-from [this] [])
350 (writes-to [this] [])
351 (block-references [this] [catch-block finally-block continue-block])
352 IEmittableInstruction
353 (emit-instruction [this state-sym]
354 `[~'_ (cljs.core.async.impl.ioc-helpers/add-exception-frame ~state-sym
355 ~catch-block
356 ~catch-exception
357 ~finally-block
358 ~continue-block)]))
359
360 (defrecord ProcessExceptionWithValue [value]
361 IInstruction
362 (reads-from [this] [value])
363 (writes-to [this] [])
364 (block-references [this] [])
365 ITerminator
366 (terminate-block [this state-sym _]
367 `(do (aset-all! ~state-sym
368 ~VALUE-IDX
369 ~value)
370 (cljs.core.async.impl.ioc-helpers/process-exception ~state-sym)
371 :recur)))
372
373 (defrecord EndCatchFinally []
374 IInstruction
375 (reads-from [this] [])
376 (writes-to [this] [])
377 (block-references [this] [])
378 ITerminator
379 (terminate-block [this state-sym _]
380 `(do (cljs.core.async.impl.ioc-helpers/process-exception ~state-sym)
381 :recur)))
382
383
384
385 ;; Dispatch clojure forms based on data type
386 (defmulti -item-to-ssa (fn [x]
387 (cond
388 (symbol? x) :symbol
389 (seq? x) :list
390 (map? x) :map
391 (set? x) :set
392 (vector? x) :vector
393 :else :default)))
394
395 (defn item-to-ssa [x]
396 (-item-to-ssa x))
397
398 ;; given an sexpr, dispatch on the first item
399 (defmulti sexpr-to-ssa (fn [[x & _]]
400 x))
401
402 (defn is-special? [x]
403 (let [^clojure.lang.MultiFn mfn sexpr-to-ssa]
404 (.getMethod mfn x)))
405
406
407
408 (defn default-sexpr [args]
409 (gen-plan
410 [args-ids (all (map item-to-ssa args))
411 inst-id (add-instruction (->Call args-ids))]
412 inst-id))
413
414 (defn let-binding-to-ssa
415 [[sym bind]]
416 (gen-plan
417 [bind-id (item-to-ssa bind)
418 _ (push-alter-binding :locals assoc sym bind-id)]
419 bind-id))
420
421 (defmethod sexpr-to-ssa 'let*
422 [[_ binds & body]]
423 (let [parted (partition 2 binds)]
424 (gen-plan
425 [let-ids (all (map let-binding-to-ssa parted))
426 body-ids (all (map item-to-ssa body))
427 _ (all (map (fn [x]
428 (pop-binding :locals))
429 (range (count parted))))]
430 (last body-ids))))
431
432 (defmethod sexpr-to-ssa 'loop*
433 [[_ locals & body]]
434 (let [parted (partition 2 locals)
435 syms (map first parted)
436 inits (map second parted)]
437 (gen-plan
438 [local-val-ids (all (map ; parallel bind
439 (fn [sym init]
440 (gen-plan
441 [itm-id (item-to-ssa init)
442 _ (push-alter-binding :locals assoc sym itm-id)]
443 itm-id))
444 syms
445 inits))
446 _ (all (for [x syms]
447 (pop-binding :locals)))
448 local-ids (all (map (comp add-instruction ->Const) local-val-ids))
449 body-blk (add-block)
450 final-blk (add-block)
451 _ (add-instruction (->Jmp nil body-blk))
452
453 _ (set-block body-blk)
454 _ (push-alter-binding :locals merge (zipmap syms local-ids))
455 _ (push-binding :recur-point body-blk)
456 _ (push-binding :recur-nodes local-ids)
457
458 body-ids (all (map item-to-ssa body))
459
460 _ (pop-binding :recur-nodes)
461 _ (pop-binding :recur-point)
462 _ (pop-binding :locals)
463 _ (if (not= (last body-ids) ::terminated)
464 (add-instruction (->Jmp (last body-ids) final-blk))
465 (no-op))
466 _ (set-block final-blk)
467 ret-id (add-instruction (->Const ::value))]
468 ret-id)))
469
470 (defmethod sexpr-to-ssa 'set!
471 [[_ assignee val]]
472 (let [target (cond
473 (symbol? assignee)
474 assignee
475 (and (list? assignee)
476 (= (count assignee) 2))
477 (second assignee))
478 field (if (list? assignee)
479 (first assignee))]
480 (gen-plan
481 [locals (get-binding :locals)
482
483 target-id (if (contains? locals target)
484 (fn [p]
485 [(get locals target) p])
486 (item-to-ssa target))
487 val-id (item-to-ssa val)
488
489 ret-id (add-instruction (->Set! field target-id val-id))]
490 ret-id)))
491
492 (defmethod sexpr-to-ssa 'do
493 [[_ & body]]
494 (gen-plan
495 [ids (all (map item-to-ssa body))]
496 (last ids)))
497
498 (defmethod sexpr-to-ssa 'case
499 [[_ val & body]]
500 (let [clauses (partition 2 body)
501 default (when (odd? (count body))
502 (last body))]
503 (gen-plan
504 [end-blk (add-block)
505 start-blk (get-block)
506 clause-blocks (all (map (fn [expr]
507 (gen-plan
508 [blk-id (add-block)
509 _ (set-block blk-id)
510 expr-id (item-to-ssa expr)
511 _ (if (not= expr-id ::terminated)
512 (add-instruction (->Jmp expr-id end-blk))
513 (no-op))]
514 blk-id))
515 (map second clauses)))
516 default-block (if (odd? (count body))
517 (gen-plan
518 [blk-id (add-block)
519 _ (set-block blk-id)
520 expr-id (item-to-ssa default)
521 _ (if (not= expr-id ::terminated)
522 (add-instruction (->Jmp expr-id end-blk))
523 (no-op))]
524 blk-id)
525 (no-op))
526 _ (set-block start-blk)
527 val-id (item-to-ssa val)
528 case-id (add-instruction (->Case val-id (map first clauses) clause-blocks default-block))
529 _ (set-block end-blk)
530 ret-id (add-instruction (->Const ::value))]
531 ret-id)))
532
533 (defmethod sexpr-to-ssa 'quote
534 [expr]
535 (gen-plan
536 [ret-id (add-instruction (->Const expr))]
537 ret-id))
538
539 (defmethod sexpr-to-ssa '.
540 [[_ target method & args]]
541 (let [args (if (seq? method)
542 (next method)
543 args)
544 method (if (seq? method)
545 (first method)
546 method)]
547 (gen-plan
548 [target-id (item-to-ssa target)
549 args-ids (all (map item-to-ssa args))
550 ret-id (add-instruction (->Dot target-id method args-ids))]
551 ret-id)))
552
553 (defmethod sexpr-to-ssa 'try
554 [[_ & body]]
555 (let [finally-fn (every-pred seq? (comp (partial = 'finally) first))
556 catch-fn (every-pred seq? (comp (partial = 'catch) first))
557 finally (next (first (filter finally-fn body)))
558 body (remove finally-fn body)
559 catch (next (first (filter catch-fn body)))
560 [ex ex-bind & catch-body] catch
561 body (remove catch-fn body)]
562 (gen-plan
563 [end-blk (add-block)
564 finally-blk (if finally
565 (gen-plan
566 [cur-blk (get-block)
567 blk (add-block)
568 _ (set-block blk)
569 value-id (add-instruction (->Const ::value))
570 _ (all (map item-to-ssa finally))
571 _ (add-instruction (->EndCatchFinally))
572 _ (set-block cur-blk)]
573 blk)
574 (no-op))
575 catch-blk (if catch
576 (gen-plan
577 [cur-blk (get-block)
578 blk (add-block)
579 _ (set-block blk)
580 ex-id (add-instruction (->Const ::value))
581 _ (push-alter-binding :locals assoc ex-bind ex-id)
582 ids (all (map item-to-ssa catch-body))
583 _ (add-instruction (->ProcessExceptionWithValue (last ids)))
584 _ (pop-binding :locals)
585 _ (set-block cur-blk)
586 _ (push-alter-binding :catch (fnil conj []) [ex blk])]
587 blk)
588 (no-op))
589 body-blk (add-block)
590 _ (add-instruction (->Jmp nil body-blk))
591 _ (set-block body-blk)
592 _ (add-instruction (->Try catch-blk ex finally-blk end-blk))
593 ids (all (map item-to-ssa body))
594 _ (if catch
595 (pop-binding :catch)
596 (no-op))
597 _ (add-instruction (->ProcessExceptionWithValue (last ids)))
598 _ (set-block end-blk)
599 ret (add-instruction (->Const ::value))]
600 ret)))
601
602 (defmethod sexpr-to-ssa 'recur
603 [[_ & vals]]
604 (gen-plan
605 [val-ids (all (map item-to-ssa vals))
606 recurs (get-binding :recur-nodes)
607 _ (do (assert (= (count val-ids)
608 (count recurs))
609 "Wrong number of arguments to recur")
610 (no-op))
611 _ (add-instruction (->Recur recurs val-ids))
612
613 recur-point (get-binding :recur-point)
614 _ (add-instruction (->Jmp nil recur-point))]
615 ::terminated))
616
617 (defmethod sexpr-to-ssa 'if
618 [[_ test then else]]
619 (gen-plan
620 [test-id (item-to-ssa test)
621 then-blk (add-block)
622 else-blk (add-block)
623 final-blk (add-block)
624 _ (add-instruction (->CondBr test-id then-blk else-blk))
625
626 _ (set-block then-blk)
627 then-id (item-to-ssa then)
628 _ (if (not= then-id ::terminated)
629 (gen-plan
630 [_ (add-instruction (->Jmp then-id final-blk))]
631 then-id)
632 (no-op))
633
634 _ (set-block else-blk)
635 else-id (item-to-ssa else)
636 _ (if (not= else-id ::terminated)
637 (gen-plan
638 [_ (add-instruction (->Jmp else-id final-blk))]
639 then-id)
640 (no-op))
641
642 _ (set-block final-blk)
643 val-id (add-instruction (->Const ::value))]
644 val-id))
645
646 (defmethod sexpr-to-ssa 'fn*
647 [& fn-expr]
648 ;; For fn expressions we just want to record the expression as well
649 ;; as a list of all known renamed locals
650 (gen-plan
651 [locals (get-binding :locals)
652 fn-id (add-instruction (->Fn fn-expr (keys locals) (vals locals)))]
653 fn-id))
654
655
656 (def special-override? '#{case clojure.core/case
657 try clojure.core/try})
658
659 (defn expand [locals env form]
660 (loop [form form]
661 (if-not (seq? form)
662 form
663 (let [[s & r] form]
664 (if (symbol? s)
665 (if (or (get locals s)
666 (special-override? s))
667 form
668 (let [new-env (update-in env [:locals] merge locals)
669 expanded (cljs/macroexpand-1 new-env form)]
670 (if (= expanded form)
671 form
672 (recur expanded))))
673 form)))))
674
675 (defn terminate-custom [vals term]
676 (gen-plan
677 [blk (add-block)
678 vals (all (map item-to-ssa vals))
679 val (add-instruction (->CustomTerminator term blk vals))
680 _ (set-block blk)
681 res (add-instruction (->Const ::value))]
682 res))
683
684 (defn fixup-aliases [sym env]
685 (let [aliases (ns-aliases *ns*)]
686 (if-not (namespace sym)
687 sym
688 (if-let [ns (or (get-in env [:ns :requires-macros (symbol (namespace sym))])
689 (get-in env [:ns :requires (symbol (namespace sym))]))]
690 (symbol (name ns) (name sym))
691 sym))))
692
693 (defmethod -item-to-ssa :list
694 [lst]
695 (gen-plan
696 [env (get-binding :env)
697 locals (get-binding :locals)
698 terminators (get-binding :terminators)
699 val (let [exp (expand locals env lst)]
700 (if (seq? exp)
701 (if (symbol? (first exp))
702 (let [f (fixup-aliases (first exp) env)]
703 (cond
704 (is-special? f) (sexpr-to-ssa exp)
705 (get locals f) (default-sexpr exp)
706 (get terminators f) (terminate-custom (next exp) (get terminators f))
707 :else (default-sexpr exp)))
708 (default-sexpr exp))
709 (item-to-ssa exp)))]
710 val))
711
712 (defmethod -item-to-ssa :default
713 [x]
714 (fn [plan]
715 [x plan]))
716
717 (defmethod -item-to-ssa :symbol
718 [x]
719 (gen-plan
720 [locals (get-binding :locals)
721 inst-id (if (contains? locals x)
722 (fn [p]
723 [(locals x) p])
724 (fn [p]
725 [x p])
726 #_(add-instruction (->Const x)))]
727 inst-id))
728
729 (defmethod -item-to-ssa :map
730 [x]
731 (-item-to-ssa `(hash-map ~@(mapcat identity x))))
732
733 (defmethod -item-to-ssa :vector
734 [x]
735 (-item-to-ssa `(vector ~@x)))
736
737 (defmethod -item-to-ssa :set
738 [x]
739 (-item-to-ssa `(hash-set ~@x)))
740
741 (defn parse-to-state-machine
742 "Takes an sexpr and returns a hashmap that describes the execution flow of the sexpr as
743 a series of SSA style blocks."
744 [body env terminators]
745 (-> (gen-plan
746 [_ (push-binding :env env)
747 _ (push-binding :locals (zipmap (:locals (keys env)) (:locals (keys env))))
748 _ (push-binding :terminators terminators)
749 blk (add-block)
750 _ (set-block blk)
751 ids (all (map item-to-ssa body))
752 term-id (add-instruction (->Return (last ids)))
753 _ (pop-binding :terminators)
754 _ (pop-binding :locals)
755 _ (pop-binding :env)]
756 term-id)
757 get-plan))
758
759
760 (defn index-instruction [blk-id idx inst]
761 (let [idx (reduce
762 (fn [acc id]
763 (update-in acc [id :read-in] (fnil conj #{}) blk-id))
764 idx
765 (filter instruction? (reads-from inst)))
766 idx (reduce
767 (fn [acc id]
768 (update-in acc [id :written-in] (fnil conj #{}) blk-id))
769 idx
770 (filter instruction? (writes-to inst)))]
771 idx))
772
773 (defn index-block [idx [blk-id blk]]
774 (reduce (partial index-instruction blk-id) idx blk))
775
776 (defn index-state-machine [machine]
777 (reduce index-block {} (:blocks machine)))
778
779 (defn id-for-inst [m sym] ;; m :: symbols -> integers
780 (if-let [i (get @m sym)]
781 i
782 (let [next-idx (get @m ::next-idx)]
783 (swap! m assoc sym next-idx)
784 (swap! m assoc ::next-idx (inc next-idx))
785 next-idx)))
786
787 (defn persistent-value?
788 "Returns true if this value should be saved in the state hash map"
789 [index value]
790 (or (not= (-> index value :read-in)
791 (-> index value :written-in))
792 (-> index value :read-in count (> 1))))
793
794 (defn count-persistent-values
795 [index]
796 (->> (keys index)
797 (filter instruction?)
798 (filter (partial persistent-value? index))
799 count))
800
801 (defn- build-block-preamble [local-map idx state-sym blk]
802 (let [args (->> (mapcat reads-from blk)
803 (filter instruction?)
804 (filter (partial persistent-value? idx))
805 set
806 vec)]
807 (if (empty? args)
808 []
809 (mapcat (fn [sym]
810 `[~sym (aget ~state-sym ~(id-for-inst local-map sym))])
811 args))))
812
813 (defn- build-block-body [state-sym blk]
814 (mapcat
815 #(emit-instruction % state-sym)
816 (butlast blk)))
817
818 (defn- build-new-state [local-map idx state-sym blk]
819 (let [results (->> blk
820 (mapcat writes-to)
821 (filter instruction?)
822 (filter (partial persistent-value? idx))
823 set
824 vec)
825 results (interleave (map (partial id-for-inst local-map) results) results)]
826 (if-not (empty? results)
827 `(aset-all! ~state-sym ~@results)
828 state-sym)))
829
830 (defn- emit-state-machine [machine num-user-params custom-terminators]
831 (let [index (index-state-machine machine)
832 state-sym (with-meta (gensym "state_")
833 {:tag 'objects})
834 local-start-idx (+ num-user-params USER-START-IDX)
835 state-arr-size (+ local-start-idx (count-persistent-values index))
836 local-map (atom {::next-idx local-start-idx})
837 block-catches (:block-catches machine)
838 state-val-sym (gensym "state_val_")]
839 `(let [switch# (fn [~state-sym]
840 (let [~state-val-sym (aget ~state-sym ~STATE-IDX)]
841 (cond
842 ~@(mapcat
843 (fn [[id blk]]
844 [`(== ~state-val-sym ~id)
845 `(let [~@(concat (build-block-preamble local-map index state-sym blk)
846 (build-block-body state-sym blk))
847 ~state-sym ~(build-new-state local-map index state-sym blk)]
848 ~(terminate-block (last blk) state-sym custom-terminators))])
849 (:blocks machine)))))]
850 (fn state-machine#
851 ([] (aset-all! (make-array ~state-arr-size)
852 ~FN-IDX state-machine#
853 ~STATE-IDX ~(:start-block machine)))
854 ([~state-sym]
855 (let [ret-value# (try (loop []
856 (let [result# (switch# ~state-sym)]
857 (if (cljs.core/keyword-identical? result# :recur)
858 (recur)
859 result#)))
860 (catch js/Object ex#
861 (aset-all! ~state-sym ~CURRENT-EXCEPTION ex#)
862 (cljs.core.async.impl.ioc-helpers/process-exception ~state-sym)
863 :recur))]
864 (if (cljs.core/keyword-identical? ret-value# :recur)
865 (recur ~state-sym)
866 ret-value#)))))))
867
868
869 (def async-custom-terminators
870 {'<! 'cljs.core.async.impl.ioc-helpers/take!
871 'cljs.core.async/<! 'cljs.core.async.impl.ioc-helpers/take!
872 '>! 'cljs.core.async.impl.ioc-helpers/put!
873 'cljs.core.async/>! 'cljs.core.async.impl.ioc-helpers/put!
874 'alts! 'cljs.core.async/ioc-alts!
875 'cljs.core.async/alts! 'cljs.core.async/ioc-alts!
876 :Return 'cljs.core.async.impl.ioc-helpers/return-chan})
877
878
879 (defn state-machine [body num-user-params env user-transitions]
880 (-> (parse-to-state-machine body env user-transitions)
881 second
882 (emit-state-machine num-user-params user-transitions)))
0 ;; Copyright (c) Rich Hickey and contributors. All rights reserved.
1 ;; The use and distribution terms for this software are covered by the
2 ;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
3 ;; which can be found in the file epl-v10.html at the root of this distribution.
4 ;; By using this software in any fashion, you are agreeing to be bound by
5 ;; the terms of this license.
6 ;; You must not remove this notice, or any other, from this software.
7
8 (ns cljs.core.async.impl.protocols)
9
10 (def ^:const MAX-QUEUE-SIZE 1024)
11
12 (defprotocol ReadPort
13 (take! [port fn1-handler] "derefable val if taken, nil if take was enqueued"))
14
15 (defprotocol WritePort
16 (put! [port val fn1-handler] "derefable boolean (false if already closed) if handled, nil if put was enqueued.
17 Must throw on nil val."))
18
19 (defprotocol Channel
20 (close! [chan])
21 (closed? [chan]))
22
23 (defprotocol Handler
24 (active? [h] "returns true if has callback. Must work w/o lock")
25 (blockable? [h] "returns true if this handler may be blocked, otherwise it must not block")
26 #_(lock-id [h] "a unique id for lock acquisition order, 0 if no lock")
27 (commit [h] "commit to fulfilling its end of the transfer, returns cb. Must be called within lock"))
28
29 (defprotocol Buffer
30 (full? [b] "returns true if buffer cannot accept put")
31 (remove! [b] "remove and return next item from buffer, called under chan mutex")
32 (add!* [b itm] "if room, add item to the buffer, returns b, called under chan mutex")
33 (close-buf! [b] "called on chan closed under chan mutex, return ignored"))
34
35 (defn add!
36 ([b] b)
37 ([b itm]
38 (assert (not (nil? itm)))
39 (add!* b itm)))
40
41 ;; Defines a buffer that will never block (return true to full?)
42 (defprotocol UnblockingBuffer)
0 ;; Copyright (c) Rich Hickey and contributors. All rights reserved.
1 ;; The use and distribution terms for this software are covered by the
2 ;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
3 ;; which can be found in the file epl-v10.html at the root of this distribution.
4 ;; By using this software in any fashion, you are agreeing to be bound by
5 ;; the terms of this license.
6 ;; You must not remove this notice, or any other, from this software.
7
8 (ns cljs.core.async.impl.timers
9 (:require [cljs.core.async.impl.protocols :as impl]
10 [cljs.core.async.impl.channels :as channels]
11 [cljs.core.async.impl.dispatch :as dispatch]))
12
13 (def MAX_LEVEL 15) ;; 16 levels
14 (def P (/ 1 2))
15
16 (defn random-level
17 ([] (random-level 0))
18 ([level]
19 (if (and (< (.random js/Math) P)
20 (< level MAX_LEVEL))
21 (recur (inc level))
22 level)))
23
24 (deftype SkipListNode [key ^:mutable val forward]
25 ISeqable
26 (-seq [coll]
27 (list key val))
28
29 IPrintWithWriter
30 (-pr-writer [coll writer opts]
31 (pr-sequential-writer writer pr-writer "[" " " "]" opts coll)))
32
33 (defn skip-list-node
34 ([level] (skip-list-node nil nil level))
35 ([k v level]
36 (let [arr (make-array (inc level))]
37 (loop [i 0]
38 (when (< i (alength arr))
39 (aset arr i nil)
40 (recur (inc i))))
41 (SkipListNode. k v arr))))
42
43 (defn least-greater-node
44 ([x k level] (least-greater-node x k level nil))
45 ([x k level update]
46 (if-not (neg? level)
47 (let [x (loop [x x]
48 (if-let [x' (aget (.-forward x) level)]
49 (if (< (.-key x') k)
50 (recur x')
51 x)
52 x))]
53 (when-not (nil? update)
54 (aset update level x))
55 (recur x k (dec level) update))
56 x)))
57
58 (deftype SkipList [header ^:mutable level]
59 Object
60 (put [coll k v]
61 (let [update (make-array MAX_LEVEL)
62 x (least-greater-node header k level update)
63 x (aget (.-forward x) 0)]
64 (if (and (not (nil? x)) (== (.-key x) k))
65 (set! (.-val x) v)
66 (let [new-level (random-level)]
67 (when (> new-level level)
68 (loop [i (inc level)]
69 (when (<= i (inc new-level))
70 (aset update i header)
71 (recur (inc i))))
72 (set! level new-level))
73 (let [x (skip-list-node k v (make-array new-level))]
74 (loop [i 0]
75 (when (<= i level)
76 (let [links (.-forward (aget update i))]
77 (aset (.-forward x) i (aget links i))
78 (aset links i x)))))))))
79
80 (remove [coll k]
81 (let [update (make-array MAX_LEVEL)
82 x (least-greater-node header k level update)
83 x (aget (.-forward x) 0)]
84 (when (and (not (nil? x)) (== (.-key x) k))
85 (loop [i 0]
86 (when (<= i level)
87 (let [links (.-forward (aget update i))]
88 (if (identical? (aget links i) x)
89 (do
90 (aset links i (aget (.-forward x) i))
91 (recur (inc i)))
92 (recur (inc i))))))
93 (while (and (> level 0)
94 (nil? (aget (.-forward header) level)))
95 (set! level (dec level))))))
96
97 (ceilingEntry [coll k]
98 (loop [x header level level]
99 (if-not (neg? level)
100 (let [nx (loop [x x]
101 (let [x' (aget (.-forward x) level)]
102 (when-not (nil? x')
103 (if (>= (.-key x') k)
104 x'
105 (recur x')))))]
106 (if-not (nil? nx)
107 (recur nx (dec level))
108 (recur x (dec level))))
109 (when-not (identical? x header)
110 x))))
111
112 (floorEntry [coll k]
113 (loop [x header level level]
114 (if-not (neg? level)
115 (let [nx (loop [x x]
116 (let [x' (aget (.-forward x) level)]
117 (if-not (nil? x')
118 (if (> (.-key x') k)
119 x
120 (recur x'))
121 (when (zero? level)
122 x))))]
123 (if nx
124 (recur nx (dec level))
125 (recur x (dec level))))
126 (when-not (identical? x header)
127 x))))
128
129 ISeqable
130 (-seq [coll]
131 (letfn [(iter [node]
132 (lazy-seq
133 (when-not (nil? node)
134 (cons [(.-key node) (.-val node)]
135 (iter (aget (.-forward node) 0))))))]
136 (iter (aget (.-forward header) 0))))
137
138 IPrintWithWriter
139 (-pr-writer [coll writer opts]
140 (let [pr-pair (fn [keyval]
141 (pr-sequential-writer writer pr-writer "" " " "" opts keyval))]
142 (pr-sequential-writer writer pr-pair "{" ", " "}" opts coll))))
143
144 (defn skip-list []
145 (SkipList. (skip-list-node 0) 0))
146
147 (def timeouts-map (skip-list))
148
149 (def TIMEOUT_RESOLUTION_MS 10)
150
151 (defn timeout
152 "returns a channel that will close after msecs"
153 [msecs]
154 (let [timeout (+ (.valueOf (js/Date.)) msecs)
155 me (.ceilingEntry timeouts-map timeout)]
156 (or (when (and me (< (.-key me) (+ timeout TIMEOUT_RESOLUTION_MS)))
157 (.-val me))
158 (let [timeout-channel (channels/chan nil)]
159 (.put timeouts-map timeout timeout-channel)
160 (dispatch/queue-delay
161 (fn []
162 (.remove timeouts-map timeout)
163 (impl/close! timeout-channel))
164 msecs)
165 timeout-channel))))
166
0 (ns cljs.core.async.macros
1 (:require [cljs.core.async.impl.ioc-macros :as ioc]))
2
3 (defmacro go
4 "Asynchronously executes the body, returning immediately to the
5 calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
6 channel operations within the body will block (if necessary) by
7 'parking' the calling thread rather than tying up an OS thread (or
8 the only JS thread when in ClojureScript). Upon completion of the
9 operation, the body will be resumed.
10
11 Returns a channel which will receive the result of the body when
12 completed"
13 [& body]
14 `(let [c# (cljs.core.async/chan 1)]
15 (cljs.core.async.impl.dispatch/run
16 (fn []
17 (let [f# ~(ioc/state-machine body 1 &env ioc/async-custom-terminators)
18 state# (-> (f#)
19 (ioc/aset-all! cljs.core.async.impl.ioc-helpers/USER-START-IDX c#))]
20 (cljs.core.async.impl.ioc-helpers/run-state-machine-wrapped state#))))
21 c#))
22
23
24 (defn do-alt [alts clauses]
25 (assert (even? (count clauses)) "unbalanced clauses")
26 (let [clauses (partition 2 clauses)
27 opt? #(keyword? (first %))
28 opts (filter opt? clauses)
29 clauses (remove opt? clauses)
30 [clauses bindings]
31 (reduce
32 (fn [[clauses bindings] [ports expr]]
33 (let [ports (if (vector? ports) ports [ports])
34 [ports bindings]
35 (reduce
36 (fn [[ports bindings] port]
37 (if (vector? port)
38 (let [[port val] port
39 gp (gensym)
40 gv (gensym)]
41 [(conj ports [gp gv]) (conj bindings [gp port] [gv val])])
42 (let [gp (gensym)]
43 [(conj ports gp) (conj bindings [gp port])])))
44 [[] bindings] ports)]
45 [(conj clauses [ports expr]) bindings]))
46 [[] []] clauses)
47 gch (gensym "ch")
48 gret (gensym "ret")]
49 `(let [~@(mapcat identity bindings)
50 [val# ~gch :as ~gret] (~alts [~@(apply concat (map first clauses))] ~@(apply concat opts))]
51 (cond
52 ~@(mapcat (fn [[ports expr]]
53 [`(or ~@(map (fn [port]
54 `(= ~gch ~(if (vector? port) (first port) port)))
55 ports))
56 (if (and (seq? expr) (vector? (first expr)))
57 `(let [~(first expr) ~gret] ~@(rest expr))
58 expr)])
59 clauses)
60 (= ~gch :default) val#))))
61
62 (defmacro alt!
63 "Makes a single choice between one of several channel operations,
64 as if by alts!, returning the value of the result expr corresponding
65 to the operation completed. Must be called inside a (go ...) block.
66
67 Each clause takes the form of:
68
69 channel-op[s] result-expr
70
71 where channel-ops is one of:
72
73 take-port - a single port to take
74 [take-port | [put-port put-val] ...] - a vector of ports as per alts!
75 :default | :priority - an option for alts!
76
77 and result-expr is either a list beginning with a vector, whereupon that
78 vector will be treated as a binding for the [val port] return of the
79 operation, else any other expression.
80
81 (alt!
82 [c t] ([val ch] (foo ch val))
83 x ([v] v)
84 [[out val]] :wrote
85 :default 42)
86
87 Each option may appear at most once. The choice and parking
88 characteristics are those of alts!."
89
90 [& clauses]
91 (do-alt 'alts! clauses))
92
93
94 (defmacro go-loop
95 "Like (go (loop ...))"
96 [bindings & body]
97 `(go (loop ~bindings ~@body)))
0 (ns cljs.core.async
1 (:refer-clojure :exclude [reduce transduce into merge map take partition partition-by])
2 (:require [cljs.core.async.impl.protocols :as impl]
3 [cljs.core.async.impl.channels :as channels]
4 [cljs.core.async.impl.buffers :as buffers]
5 [cljs.core.async.impl.timers :as timers]
6 [cljs.core.async.impl.dispatch :as dispatch]
7 [cljs.core.async.impl.ioc-helpers :as helpers])
8 (:require-macros [cljs.core.async.impl.ioc-macros :as ioc]
9 [cljs.core.async.macros :refer [go go-loop]]))
10
11 (defn- fn-handler
12 ([f] (fn-handler f true))
13 ([f blockable]
14 (reify
15 impl/Handler
16 (active? [_] true)
17 (blockable? [_] blockable)
18 (commit [_] f))))
19
20 (defn buffer
21 "Returns a fixed buffer of size n. When full, puts will block/park."
22 [n]
23 (buffers/fixed-buffer n))
24
25 (defn dropping-buffer
26 "Returns a buffer of size n. When full, puts will complete but
27 val will be dropped (no transfer)."
28 [n]
29 (buffers/dropping-buffer n))
30
31 (defn sliding-buffer
32 "Returns a buffer of size n. When full, puts will complete, and be
33 buffered, but oldest elements in buffer will be dropped (not
34 transferred)."
35 [n]
36 (buffers/sliding-buffer n))
37
38 (defn unblocking-buffer?
39 "Returns true if a channel created with buff will never block. That is to say,
40 puts into this buffer will never cause the buffer to be full. "
41 [buff]
42 (satisfies? impl/UnblockingBuffer buff))
43
44 (defn chan
45 "Creates a channel with an optional buffer, an optional transducer (like (map f),
46 (filter p) etc or a composition thereof), and an optional exception handler.
47 If buf-or-n is a number, will create and use a fixed buffer of that size. If a
48 transducer is supplied a buffer must be specified. ex-handler must be a
49 fn of one argument - if an exception occurs during transformation it will be called
50 with the thrown value as an argument, and any non-nil return value will be placed
51 in the channel."
52 ([] (chan nil))
53 ([buf-or-n] (chan buf-or-n nil nil))
54 ([buf-or-n xform] (chan buf-or-n xform nil))
55 ([buf-or-n xform ex-handler]
56 (let [buf-or-n (if (= buf-or-n 0)
57 nil
58 buf-or-n)]
59 (when xform (assert buf-or-n "buffer must be supplied when transducer is"))
60 (channels/chan (if (number? buf-or-n)
61 (buffer buf-or-n)
62 buf-or-n)
63 xform
64 ex-handler))))
65
66 (defn promise-chan
67 "Creates a promise channel with an optional transducer, and an optional
68 exception-handler. A promise channel can take exactly one value that consumers
69 will receive. Once full, puts complete but val is dropped (no transfer).
70 Consumers will block until either a value is placed in the channel or the
71 channel is closed. See chan for the semantics of xform and ex-handler."
72 ([] (promise-chan nil))
73 ([xform] (promise-chan xform nil))
74 ([xform ex-handler]
75 (chan (buffers/promise-buffer) xform ex-handler)))
76
77 (defn timeout
78 "Returns a channel that will close after msecs"
79 [msecs]
80 (timers/timeout msecs))
81
82 (defn <!
83 "takes a val from port. Must be called inside a (go ...) block. Will
84 return nil if closed. Will park if nothing is available.
85 Returns true unless port is already closed"
86 [port]
87 (throw (js/Error. "<! used not in (go ...) block")))
88
89 (defn take!
90 "Asynchronously takes a val from port, passing to fn1. Will pass nil
91 if closed. If on-caller? (default true) is true, and value is
92 immediately available, will call fn1 on calling thread.
93 Returns nil."
94 ([port fn1] (take! port fn1 true))
95 ([port fn1 on-caller?]
96 (let [ret (impl/take! port (fn-handler fn1))]
97 (when ret
98 (let [val @ret]
99 (if on-caller?
100 (fn1 val)
101 (dispatch/run #(fn1 val)))))
102 nil)))
103
104 (defn- nop [_])
105 (def ^:private fhnop (fn-handler nop))
106
107 (defn >!
108 "puts a val into port. nil values are not allowed. Must be called
109 inside a (go ...) block. Will park if no buffer space is available.
110 Returns true unless port is already closed."
111 [port val]
112 (throw (js/Error. ">! used not in (go ...) block")))
113
114 (defn put!
115 "Asynchronously puts a val into port, calling fn0 (if supplied) when
116 complete. nil values are not allowed. Will throw if closed. If
117 on-caller? (default true) is true, and the put is immediately
118 accepted, will call fn0 on calling thread. Returns nil."
119 ([port val]
120 (if-let [ret (impl/put! port val fhnop)]
121 @ret
122 true))
123 ([port val fn1] (put! port val fn1 true))
124 ([port val fn1 on-caller?]
125 (if-let [retb (impl/put! port val (fn-handler fn1))]
126 (let [ret @retb]
127 (if on-caller?
128 (fn1 ret)
129 (dispatch/run #(fn1 ret)))
130 ret)
131 true)))
132
133 (defn close!
134 ([port]
135 (impl/close! port)))
136
137
138 (defn- random-array
139 [n]
140 (let [a (make-array n)]
141 (dotimes [x n]
142 (aset a x 0))
143 (loop [i 1]
144 (if (= i n)
145 a
146 (do
147 (let [j (rand-int i)]
148 (aset a i (aget a j))
149 (aset a j i)
150 (recur (inc i))))))))
151
152 (defn- alt-flag []
153 (let [flag (atom true)]
154 (reify
155 impl/Handler
156 (active? [_] @flag)
157 (blockable? [_] true)
158 (commit [_]
159 (reset! flag nil)
160 true))))
161
162 (defn- alt-handler [flag cb]
163 (reify
164 impl/Handler
165 (active? [_] (impl/active? flag))
166 (blockable? [_] true)
167 (commit [_]
168 (impl/commit flag)
169 cb)))
170
171 (defn do-alts
172 "returns derefable [val port] if immediate, nil if enqueued"
173 [fret ports opts]
174 (let [flag (alt-flag)
175 n (count ports)
176 idxs (random-array n)
177 priority (:priority opts)
178 ret
179 (loop [i 0]
180 (when (< i n)
181 (let [idx (if priority i (aget idxs i))
182 port (nth ports idx)
183 wport (when (vector? port) (port 0))
184 vbox (if wport
185 (let [val (port 1)]
186 (impl/put! wport val (alt-handler flag #(fret [% wport]))))
187 (impl/take! port (alt-handler flag #(fret [% port]))))]
188 (if vbox
189 (channels/box [@vbox (or wport port)])
190 (recur (inc i))))))]
191 (or
192 ret
193 (when (contains? opts :default)
194 (when-let [got (and (impl/active? flag) (impl/commit flag))]
195 (channels/box [(:default opts) :default]))))))
196
197 (defn alts!
198 "Completes at most one of several channel operations. Must be called
199 inside a (go ...) block. ports is a vector of channel endpoints,
200 which can be either a channel to take from or a vector of
201 [channel-to-put-to val-to-put], in any combination. Takes will be
202 made as if by <!, and puts will be made as if by >!. Unless
203 the :priority option is true, if more than one port operation is
204 ready a non-deterministic choice will be made. If no operation is
205 ready and a :default value is supplied, [default-val :default] will
206 be returned, otherwise alts! will park until the first operation to
207 become ready completes. Returns [val port] of the completed
208 operation, where val is the value taken for takes, and a
209 boolean (true unless already closed, as per put!) for puts.
210
211 opts are passed as :key val ... Supported options:
212
213 :default val - the value to use if none of the operations are immediately ready
214 :priority true - (default nil) when true, the operations will be tried in order.
215
216 Note: there is no guarantee that the port exps or val exprs will be
217 used, nor in what order should they be, so they should not be
218 depended upon for side effects."
219
220 [ports & {:as opts}]
221 (throw (js/Error. "alts! used not in (go ...) block")))
222
223 (defn offer!
224 "Puts a val into port if it's possible to do so immediately.
225 nil values are not allowed. Never blocks. Returns true if offer succeeds."
226 [port val]
227 (let [ret (impl/put! port val (fn-handler nop false))]
228 (when ret @ret)))
229
230 (defn poll!
231 "Takes a val from port if it's possible to do so immediately.
232 Never blocks. Returns value if successful, nil otherwise."
233 [port]
234 (let [ret (impl/take! port (fn-handler nop false))]
235 (when ret @ret)))
236
237 ;;;;;;; channel ops
238
239 (defn pipe
240 "Takes elements from the from channel and supplies them to the to
241 channel. By default, the to channel will be closed when the from
242 channel closes, but can be determined by the close? parameter. Will
243 stop consuming the from channel if the to channel closes"
244
245 ([from to] (pipe from to true))
246 ([from to close?]
247 (go-loop []
248 (let [v (<! from)]
249 (if (nil? v)
250 (when close? (close! to))
251 (when (>! to v)
252 (recur)))))
253 to))
254
255 (defn- pipeline*
256 ([n to xf from close? ex-handler type]
257 (assert (pos? n))
258 (let [jobs (chan n)
259 results (chan n)
260 process (fn [[v p :as job]]
261 (if (nil? job)
262 (do (close! results) nil)
263 (let [res (chan 1 xf ex-handler)]
264 (go
265 (>! res v)
266 (close! res))
267 (put! p res)
268 true)))
269 async (fn [[v p :as job]]
270 (if (nil? job)
271 (do (close! results) nil)
272 (let [res (chan 1)]
273 (xf v res)
274 (put! p res)
275 true)))]
276 (dotimes [_ n]
277 (case type
278 :compute (go-loop []
279 (let [job (<! jobs)]
280 (when (process job)
281 (recur))))
282 :async (go-loop []
283 (let [job (<! jobs)]
284 (when (async job)
285 (recur))))))
286 (go-loop []
287 (let [v (<! from)]
288 (if (nil? v)
289 (close! jobs)
290 (let [p (chan 1)]
291 (>! jobs [v p])
292 (>! results p)
293 (recur)))))
294 (go-loop []
295 (let [p (<! results)]
296 (if (nil? p)
297 (when close? (close! to))
298 (let [res (<! p)]
299 (loop []
300 (let [v (<! res)]
301 (when (and (not (nil? v)) (>! to v))
302 (recur))))
303 (recur))))))))
304
305 (defn pipeline-async
306 "Takes elements from the from channel and supplies them to the to
307 channel, subject to the async function af, with parallelism n. af
308 must be a function of two arguments, the first an input value and
309 the second a channel on which to place the result(s). af must close!
310 the channel before returning. The presumption is that af will
311 return immediately, having launched some asynchronous operation
312 whose completion/callback will manipulate the result channel. Outputs
313 will be returned in order relative to the inputs. By default, the to
314 channel will be closed when the from channel closes, but can be
315 determined by the close? parameter. Will stop consuming the from
316 channel if the to channel closes."
317 ([n to af from] (pipeline-async n to af from true))
318 ([n to af from close?] (pipeline* n to af from close? nil :async)))
319
320 (defn pipeline
321 "Takes elements from the from channel and supplies them to the to
322 channel, subject to the transducer xf, with parallelism n. Because
323 it is parallel, the transducer will be applied independently to each
324 element, not across elements, and may produce zero or more outputs
325 per input. Outputs will be returned in order relative to the
326 inputs. By default, the to channel will be closed when the from
327 channel closes, but can be determined by the close? parameter. Will
328 stop consuming the from channel if the to channel closes.
329
330 Note this is supplied for API compatibility with the Clojure version.
331 Values of N > 1 will not result in actual concurrency in a
332 single-threaded runtime."
333 ([n to xf from] (pipeline n to xf from true))
334 ([n to xf from close?] (pipeline n to xf from close? nil))
335 ([n to xf from close? ex-handler] (pipeline* n to xf from close? ex-handler :compute)))
336
337 (defn split
338 "Takes a predicate and a source channel and returns a vector of two
339 channels, the first of which will contain the values for which the
340 predicate returned true, the second those for which it returned
341 false.
342
343 The out channels will be unbuffered by default, or two buf-or-ns can
344 be supplied. The channels will close after the source channel has
345 closed."
346 ([p ch] (split p ch nil nil))
347 ([p ch t-buf-or-n f-buf-or-n]
348 (let [tc (chan t-buf-or-n)
349 fc (chan f-buf-or-n)]
350 (go-loop []
351 (let [v (<! ch)]
352 (if (nil? v)
353 (do (close! tc) (close! fc))
354 (when (>! (if (p v) tc fc) v)
355 (recur)))))
356 [tc fc])))
357
358 (defn reduce
359 "f should be a function of 2 arguments. Returns a channel containing
360 the single result of applying f to init and the first item from the
361 channel, then applying f to that result and the 2nd item, etc. If
362 the channel closes without yielding items, returns init and f is not
363 called. ch must close before reduce produces a result."
364 [f init ch]
365 (go-loop [ret init]
366 (let [v (<! ch)]
367 (if (nil? v)
368 ret
369 (let [ret' (f ret v)]
370 (if (reduced? ret')
371 @ret'
372 (recur ret')))))))
373
374 (defn transduce
375 "async/reduces a channel with a transformation (xform f).
376 Returns a channel containing the result. ch must close before
377 transduce produces a result."
378 [xform f init ch]
379 (let [f (xform f)]
380 (go
381 (let [ret (<! (reduce f init ch))]
382 (f ret)))))
383
384 (defn onto-chan
385 "Puts the contents of coll into the supplied channel.
386
387 By default the channel will be closed after the items are copied,
388 but can be determined by the close? parameter.
389
390 Returns a channel which will close after the items are copied."
391 ([ch coll] (onto-chan ch coll true))
392 ([ch coll close?]
393 (go-loop [vs (seq coll)]
394 (if (and vs (>! ch (first vs)))
395 (recur (next vs))
396 (when close?
397 (close! ch))))))
398
399
400 (defn to-chan
401 "Creates and returns a channel which contains the contents of coll,
402 closing when exhausted."
403 [coll]
404 (let [ch (chan (bounded-count 100 coll))]
405 (onto-chan ch coll)
406 ch))
407
408
409 (defprotocol Mux
410 (muxch* [_]))
411
412 (defprotocol Mult
413 (tap* [m ch close?])
414 (untap* [m ch])
415 (untap-all* [m]))
416
417 (defn mult
418 "Creates and returns a mult(iple) of the supplied channel. Channels
419 containing copies of the channel can be created with 'tap', and
420 detached with 'untap'.
421
422 Each item is distributed to all taps in parallel and synchronously,
423 i.e. each tap must accept before the next item is distributed. Use
424 buffering/windowing to prevent slow taps from holding up the mult.
425
426 Items received when there are no taps get dropped.
427
428 If a tap puts to a closed channel, it will be removed from the mult."
429 [ch]
430 (let [cs (atom {}) ;;ch->close?
431 m (reify
432 Mux
433 (muxch* [_] ch)
434
435 Mult
436 (tap* [_ ch close?] (swap! cs assoc ch close?) nil)
437 (untap* [_ ch] (swap! cs dissoc ch) nil)
438 (untap-all* [_] (reset! cs {}) nil))
439 dchan (chan 1)
440 dctr (atom nil)
441 done (fn [_] (when (zero? (swap! dctr dec))
442 (put! dchan true)))]
443 (go-loop []
444 (let [val (<! ch)]
445 (if (nil? val)
446 (doseq [[c close?] @cs]
447 (when close? (close! c)))
448 (let [chs (keys @cs)]
449 (reset! dctr (count chs))
450 (doseq [c chs]
451 (when-not (put! c val done)
452 (done nil)
453 (untap* m c)))
454