New upstream version 1.838
gregor herrmann
5 years ago
0 | 0 | |
1 | 1 | Revision history for Perl module MCE. |
2 | ||
3 | 1.838 Wed Jan 23 08:30:00 EST 2019 | |
4 | ||
5 | * IPC update, raising reliability across multiple platforms. | |
6 | * Improved hack for the Windows platform for nested MCE sessions. | |
7 | * Added _sysread, _sysseek, _syswrite, and _nonblocking to MCE::Util. | |
8 | * Added barrier option to MCE::Queue: allows one to disable. | |
2 | 9 | |
3 | 10 | 1.837 Sat Aug 25 13:00:00 EST 2018 |
4 | 11 |
42 | 42 | }, |
43 | 43 | "requires" : { |
44 | 44 | "Carp" : "0", |
45 | "Errno" : "0", | |
45 | 46 | "Fcntl" : "0", |
46 | 47 | "File::Path" : "0", |
47 | 48 | "Getopt::Long" : "0", |
63 | 64 | "provides" : { |
64 | 65 | "MCE" : { |
65 | 66 | "file" : "lib/MCE.pm", |
66 | "version" : "1.837" | |
67 | "version" : "1.838" | |
67 | 68 | }, |
68 | 69 | "MCE::Candy" : { |
69 | 70 | "file" : "lib/MCE/Candy.pm", |
70 | "version" : "1.837" | |
71 | "version" : "1.838" | |
71 | 72 | }, |
72 | 73 | "MCE::Core::Input::Generator" : { |
73 | 74 | "file" : "lib/MCE/Core/Input/Generator.pm", |
74 | "version" : "1.837" | |
75 | "version" : "1.838" | |
75 | 76 | }, |
76 | 77 | "MCE::Core::Input::Handle" : { |
77 | 78 | "file" : "lib/MCE/Core/Input/Handle.pm", |
78 | "version" : "1.837" | |
79 | "version" : "1.838" | |
79 | 80 | }, |
80 | 81 | "MCE::Core::Input::Iterator" : { |
81 | 82 | "file" : "lib/MCE/Core/Input/Iterator.pm", |
82 | "version" : "1.837" | |
83 | "version" : "1.838" | |
83 | 84 | }, |
84 | 85 | "MCE::Core::Input::Request" : { |
85 | 86 | "file" : "lib/MCE/Core/Input/Request.pm", |
86 | "version" : "1.837" | |
87 | "version" : "1.838" | |
87 | 88 | }, |
88 | 89 | "MCE::Core::Input::Sequence" : { |
89 | 90 | "file" : "lib/MCE/Core/Input/Sequence.pm", |
90 | "version" : "1.837" | |
91 | "version" : "1.838" | |
91 | 92 | }, |
92 | 93 | "MCE::Core::Manager" : { |
93 | 94 | "file" : "lib/MCE/Core/Manager.pm", |
94 | "version" : "1.837" | |
95 | "version" : "1.838" | |
95 | 96 | }, |
96 | 97 | "MCE::Core::Validation" : { |
97 | 98 | "file" : "lib/MCE/Core/Validation.pm", |
98 | "version" : "1.837" | |
99 | "version" : "1.838" | |
99 | 100 | }, |
100 | 101 | "MCE::Core::Worker" : { |
101 | 102 | "file" : "lib/MCE/Core/Worker.pm", |
102 | "version" : "1.837" | |
103 | "version" : "1.838" | |
103 | 104 | }, |
104 | 105 | "MCE::Flow" : { |
105 | 106 | "file" : "lib/MCE/Flow.pm", |
106 | "version" : "1.837" | |
107 | "version" : "1.838" | |
107 | 108 | }, |
108 | 109 | "MCE::Grep" : { |
109 | 110 | "file" : "lib/MCE/Grep.pm", |
110 | "version" : "1.837" | |
111 | "version" : "1.838" | |
111 | 112 | }, |
112 | 113 | "MCE::Loop" : { |
113 | 114 | "file" : "lib/MCE/Loop.pm", |
114 | "version" : "1.837" | |
115 | "version" : "1.838" | |
115 | 116 | }, |
116 | 117 | "MCE::Map" : { |
117 | 118 | "file" : "lib/MCE/Map.pm", |
118 | "version" : "1.837" | |
119 | "version" : "1.838" | |
119 | 120 | }, |
120 | 121 | "MCE::Mutex" : { |
121 | 122 | "file" : "lib/MCE/Mutex.pm", |
122 | "version" : "1.837" | |
123 | "version" : "1.838" | |
123 | 124 | }, |
124 | 125 | "MCE::Mutex::Channel" : { |
125 | 126 | "file" : "lib/MCE/Mutex/Channel.pm", |
126 | "version" : "1.837" | |
127 | "version" : "1.838" | |
127 | 128 | }, |
128 | 129 | "MCE::Mutex::Flock" : { |
129 | 130 | "file" : "lib/MCE/Mutex/Flock.pm", |
130 | "version" : "1.837" | |
131 | "version" : "1.838" | |
131 | 132 | }, |
132 | 133 | "MCE::Queue" : { |
133 | 134 | "file" : "lib/MCE/Queue.pm", |
134 | "version" : "1.837" | |
135 | "version" : "1.838" | |
135 | 136 | }, |
136 | 137 | "MCE::Relay" : { |
137 | 138 | "file" : "lib/MCE/Relay.pm", |
138 | "version" : "1.837" | |
139 | "version" : "1.838" | |
139 | 140 | }, |
140 | 141 | "MCE::Signal" : { |
141 | 142 | "file" : "lib/MCE/Signal.pm", |
142 | "version" : "1.837" | |
143 | "version" : "1.838" | |
143 | 144 | }, |
144 | 145 | "MCE::Step" : { |
145 | 146 | "file" : "lib/MCE/Step.pm", |
146 | "version" : "1.837" | |
147 | "version" : "1.838" | |
147 | 148 | }, |
148 | 149 | "MCE::Stream" : { |
149 | 150 | "file" : "lib/MCE/Stream.pm", |
150 | "version" : "1.837" | |
151 | "version" : "1.838" | |
151 | 152 | }, |
152 | 153 | "MCE::Subs" : { |
153 | 154 | "file" : "lib/MCE/Subs.pm", |
154 | "version" : "1.837" | |
155 | "version" : "1.838" | |
155 | 156 | }, |
156 | 157 | "MCE::Util" : { |
157 | 158 | "file" : "lib/MCE/Util.pm", |
158 | "version" : "1.837" | |
159 | "version" : "1.838" | |
159 | 160 | } |
160 | 161 | }, |
161 | 162 | "release_status" : "stable", |
171 | 172 | "url" : "https://github.com/marioroy/mce-perl.git" |
172 | 173 | } |
173 | 174 | }, |
174 | "version" : "1.837" | |
175 | "version" : "1.838" | |
175 | 176 | } |
24 | 24 | provides: |
25 | 25 | MCE: |
26 | 26 | file: lib/MCE.pm |
27 | version: '1.837' | |
27 | version: '1.838' | |
28 | 28 | MCE::Candy: |
29 | 29 | file: lib/MCE/Candy.pm |
30 | version: '1.837' | |
30 | version: '1.838' | |
31 | 31 | MCE::Core::Input::Generator: |
32 | 32 | file: lib/MCE/Core/Input/Generator.pm |
33 | version: '1.837' | |
33 | version: '1.838' | |
34 | 34 | MCE::Core::Input::Handle: |
35 | 35 | file: lib/MCE/Core/Input/Handle.pm |
36 | version: '1.837' | |
36 | version: '1.838' | |
37 | 37 | MCE::Core::Input::Iterator: |
38 | 38 | file: lib/MCE/Core/Input/Iterator.pm |
39 | version: '1.837' | |
39 | version: '1.838' | |
40 | 40 | MCE::Core::Input::Request: |
41 | 41 | file: lib/MCE/Core/Input/Request.pm |
42 | version: '1.837' | |
42 | version: '1.838' | |
43 | 43 | MCE::Core::Input::Sequence: |
44 | 44 | file: lib/MCE/Core/Input/Sequence.pm |
45 | version: '1.837' | |
45 | version: '1.838' | |
46 | 46 | MCE::Core::Manager: |
47 | 47 | file: lib/MCE/Core/Manager.pm |
48 | version: '1.837' | |
48 | version: '1.838' | |
49 | 49 | MCE::Core::Validation: |
50 | 50 | file: lib/MCE/Core/Validation.pm |
51 | version: '1.837' | |
51 | version: '1.838' | |
52 | 52 | MCE::Core::Worker: |
53 | 53 | file: lib/MCE/Core/Worker.pm |
54 | version: '1.837' | |
54 | version: '1.838' | |
55 | 55 | MCE::Flow: |
56 | 56 | file: lib/MCE/Flow.pm |
57 | version: '1.837' | |
57 | version: '1.838' | |
58 | 58 | MCE::Grep: |
59 | 59 | file: lib/MCE/Grep.pm |
60 | version: '1.837' | |
60 | version: '1.838' | |
61 | 61 | MCE::Loop: |
62 | 62 | file: lib/MCE/Loop.pm |
63 | version: '1.837' | |
63 | version: '1.838' | |
64 | 64 | MCE::Map: |
65 | 65 | file: lib/MCE/Map.pm |
66 | version: '1.837' | |
66 | version: '1.838' | |
67 | 67 | MCE::Mutex: |
68 | 68 | file: lib/MCE/Mutex.pm |
69 | version: '1.837' | |
69 | version: '1.838' | |
70 | 70 | MCE::Mutex::Channel: |
71 | 71 | file: lib/MCE/Mutex/Channel.pm |
72 | version: '1.837' | |
72 | version: '1.838' | |
73 | 73 | MCE::Mutex::Flock: |
74 | 74 | file: lib/MCE/Mutex/Flock.pm |
75 | version: '1.837' | |
75 | version: '1.838' | |
76 | 76 | MCE::Queue: |
77 | 77 | file: lib/MCE/Queue.pm |
78 | version: '1.837' | |
78 | version: '1.838' | |
79 | 79 | MCE::Relay: |
80 | 80 | file: lib/MCE/Relay.pm |
81 | version: '1.837' | |
81 | version: '1.838' | |
82 | 82 | MCE::Signal: |
83 | 83 | file: lib/MCE/Signal.pm |
84 | version: '1.837' | |
84 | version: '1.838' | |
85 | 85 | MCE::Step: |
86 | 86 | file: lib/MCE/Step.pm |
87 | version: '1.837' | |
87 | version: '1.838' | |
88 | 88 | MCE::Stream: |
89 | 89 | file: lib/MCE/Stream.pm |
90 | version: '1.837' | |
90 | version: '1.838' | |
91 | 91 | MCE::Subs: |
92 | 92 | file: lib/MCE/Subs.pm |
93 | version: '1.837' | |
93 | version: '1.838' | |
94 | 94 | MCE::Util: |
95 | 95 | file: lib/MCE/Util.pm |
96 | version: '1.837' | |
96 | version: '1.838' | |
97 | 97 | recommends: |
98 | 98 | Sereal::Decoder: '3.015' |
99 | 99 | Sereal::Encoder: '3.015' |
100 | 100 | requires: |
101 | 101 | Carp: '0' |
102 | Errno: '0' | |
102 | 103 | Fcntl: '0' |
103 | 104 | File::Path: '0' |
104 | 105 | Getopt::Long: '0' |
119 | 120 | homepage: https://github.com/marioroy/mce-perl |
120 | 121 | license: http://dev.perl.org/licenses/ |
121 | 122 | repository: https://github.com/marioroy/mce-perl.git |
122 | version: '1.837' | |
123 | version: '1.838' |
16 | 16 | ABSTRACT => 'Many-Core Engine for Perl providing parallel processing capabilities', |
17 | 17 | AUTHOR => 'Mario E. Roy <marioeroy AT gmail DOT com>', |
18 | 18 | NAME => 'MCE', |
19 | VERSION => '1.837', | |
19 | VERSION => '1.838', | |
20 | 20 | |
21 | 21 | EXE_FILES => [ @exe_files ], |
22 | 22 | |
29 | 29 | 'strict' => 0, |
30 | 30 | 'warnings' => 0, |
31 | 31 | 'Carp' => 0, |
32 | 'Errno' => 0, | |
32 | 33 | 'Fcntl' => 0, |
33 | 34 | 'File::Path' => 0, |
34 | 35 | 'Getopt::Long' => 0, |
66 | 67 | 'provides' => { |
67 | 68 | 'MCE' => { |
68 | 69 | 'file' => 'lib/MCE.pm', |
69 | 'version' => '1.837' | |
70 | 'version' => '1.838' | |
70 | 71 | }, |
71 | 72 | 'MCE::Candy' => { |
72 | 73 | 'file' => 'lib/MCE/Candy.pm', |
73 | 'version' => '1.837' | |
74 | 'version' => '1.838' | |
74 | 75 | }, |
75 | 76 | 'MCE::Core::Input::Generator' => { |
76 | 77 | 'file' => 'lib/MCE/Core/Input/Generator.pm', |
77 | 'version' => '1.837' | |
78 | 'version' => '1.838' | |
78 | 79 | }, |
79 | 80 | 'MCE::Core::Input::Handle' => { |
80 | 81 | 'file' => 'lib/MCE/Core/Input/Handle.pm', |
81 | 'version' => '1.837' | |
82 | 'version' => '1.838' | |
82 | 83 | }, |
83 | 84 | 'MCE::Core::Input::Iterator' => { |
84 | 85 | 'file' => 'lib/MCE/Core/Input/Iterator.pm', |
85 | 'version' => '1.837' | |
86 | 'version' => '1.838' | |
86 | 87 | }, |
87 | 88 | 'MCE::Core::Input::Request' => { |
88 | 89 | 'file' => 'lib/MCE/Core/Input/Request.pm', |
89 | 'version' => '1.837' | |
90 | 'version' => '1.838' | |
90 | 91 | }, |
91 | 92 | 'MCE::Core::Input::Sequence' => { |
92 | 93 | 'file' => 'lib/MCE/Core/Input/Sequence.pm', |
93 | 'version' => '1.837' | |
94 | 'version' => '1.838' | |
94 | 95 | }, |
95 | 96 | 'MCE::Core::Manager' => { |
96 | 97 | 'file' => 'lib/MCE/Core/Manager.pm', |
97 | 'version' => '1.837' | |
98 | 'version' => '1.838' | |
98 | 99 | }, |
99 | 100 | 'MCE::Core::Validation' => { |
100 | 101 | 'file' => 'lib/MCE/Core/Validation.pm', |
101 | 'version' => '1.837' | |
102 | 'version' => '1.838' | |
102 | 103 | }, |
103 | 104 | 'MCE::Core::Worker' => { |
104 | 105 | 'file' => 'lib/MCE/Core/Worker.pm', |
105 | 'version' => '1.837' | |
106 | 'version' => '1.838' | |
106 | 107 | }, |
107 | 108 | 'MCE::Flow' => { |
108 | 109 | 'file' => 'lib/MCE/Flow.pm', |
109 | 'version' => '1.837' | |
110 | 'version' => '1.838' | |
110 | 111 | }, |
111 | 112 | 'MCE::Grep' => { |
112 | 113 | 'file' => 'lib/MCE/Grep.pm', |
113 | 'version' => '1.837' | |
114 | 'version' => '1.838' | |
114 | 115 | }, |
115 | 116 | 'MCE::Loop' => { |
116 | 117 | 'file' => 'lib/MCE/Loop.pm', |
117 | 'version' => '1.837' | |
118 | 'version' => '1.838' | |
118 | 119 | }, |
119 | 120 | 'MCE::Map' => { |
120 | 121 | 'file' => 'lib/MCE/Map.pm', |
121 | 'version' => '1.837' | |
122 | 'version' => '1.838' | |
122 | 123 | }, |
123 | 124 | 'MCE::Mutex' => { |
124 | 125 | 'file' => 'lib/MCE/Mutex.pm', |
125 | 'version' => '1.837' | |
126 | 'version' => '1.838' | |
126 | 127 | }, |
127 | 128 | 'MCE::Mutex::Channel' => { |
128 | 129 | 'file' => 'lib/MCE/Mutex/Channel.pm', |
129 | 'version' => '1.837' | |
130 | 'version' => '1.838' | |
130 | 131 | }, |
131 | 132 | 'MCE::Mutex::Flock' => { |
132 | 133 | 'file' => 'lib/MCE/Mutex/Flock.pm', |
133 | 'version' => '1.837' | |
134 | 'version' => '1.838' | |
134 | 135 | }, |
135 | 136 | 'MCE::Queue' => { |
136 | 137 | 'file' => 'lib/MCE/Queue.pm', |
137 | 'version' => '1.837' | |
138 | 'version' => '1.838' | |
138 | 139 | }, |
139 | 140 | 'MCE::Relay' => { |
140 | 141 | 'file' => 'lib/MCE/Relay.pm', |
141 | 'version' => '1.837' | |
142 | 'version' => '1.838' | |
142 | 143 | }, |
143 | 144 | 'MCE::Signal' => { |
144 | 145 | 'file' => 'lib/MCE/Signal.pm', |
145 | 'version' => '1.837' | |
146 | 'version' => '1.838' | |
146 | 147 | }, |
147 | 148 | 'MCE::Step' => { |
148 | 149 | 'file' => 'lib/MCE/Step.pm', |
149 | 'version' => '1.837' | |
150 | 'version' => '1.838' | |
150 | 151 | }, |
151 | 152 | 'MCE::Stream' => { |
152 | 153 | 'file' => 'lib/MCE/Stream.pm', |
153 | 'version' => '1.837' | |
154 | 'version' => '1.838' | |
154 | 155 | }, |
155 | 156 | 'MCE::Subs' => { |
156 | 157 | 'file' => 'lib/MCE/Subs.pm', |
157 | 'version' => '1.837' | |
158 | 'version' => '1.838' | |
158 | 159 | }, |
159 | 160 | 'MCE::Util' => { |
160 | 161 | 'file' => 'lib/MCE/Util.pm', |
161 | 'version' => '1.837' | |
162 | 'version' => '1.838' | |
162 | 163 | } |
163 | 164 | }, |
164 | 165 | 'prereqs' => { |
0 | 0 | ## Many-Core Engine for Perl |
1 | 1 | |
2 | This document describes MCE version 1.837. | |
2 | This document describes MCE version 1.838. | |
3 | 3 | |
4 | 4 | Many-Core Engine (MCE) for Perl helps enable a new level of performance by |
5 | 5 | maximizing all available cores. |
151 | 151 | bytes |
152 | 152 | constant |
153 | 153 | Carp |
154 | Errno | |
154 | 155 | Fcntl |
155 | 156 | File::Path |
156 | 157 | IO::Handle |
176 | 177 | |
177 | 178 | ### Copyright and Licensing |
178 | 179 | |
179 | Copyright (C) 2012-2018 by Mario E. Roy <marioeroy AT gmail DOT com> | |
180 | Copyright (C) 2012-2019 by Mario E. Roy <marioeroy AT gmail DOT com> | |
180 | 181 | |
181 | 182 | This program is free software; you can redistribute it and/or modify |
182 | 183 | it under the same terms as Perl itself: |
96 | 96 | ); |
97 | 97 | use Fcntl qw( O_RDONLY ); |
98 | 98 | use Scalar::Util qw( looks_like_number ); |
99 | use Errno (); | |
99 | 100 | |
100 | 101 | use MCE::Signal qw( -use_dev_shm ); |
101 | 102 | use MCE 1.5; # or later release |
721 | 722 | WRITE: { |
722 | 723 | $wrote += ( syswrite ( |
723 | 724 | $cmd_fh, $$chunk_ref, length($$chunk_ref) - $wrote, $wrote |
724 | )) || do { | |
725 | redo WRITE if ($! && $!{'EINTR'}); | |
725 | )) or do { | |
726 | redo WRITE if $! == Errno::EINTR(); | |
726 | 727 | }; |
727 | 728 | } |
728 | 729 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | our @CARP_NOT = qw( MCE ); |
16 | 16 | |
218 | 218 | |
219 | 219 | =head1 VERSION |
220 | 220 | |
221 | This document describes MCE::Candy version 1.837 | |
221 | This document describes MCE::Candy version 1.838 | |
222 | 222 | |
223 | 223 | =head1 DESCRIPTION |
224 | 224 |
14 | 14 | use strict; |
15 | 15 | use warnings; |
16 | 16 | |
17 | our $VERSION = '1.837'; | |
17 | our $VERSION = '1.838'; | |
18 | 18 | |
19 | 19 | ## Items below are folded into MCE. |
20 | 20 |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | ## Items below are folded into MCE. |
19 | 19 | |
37 | 37 | # To minimize memory consumption, SEEK_CUR equals 1 on most platforms. |
38 | 38 | # e.g. use Fcntl qw(SEEK_CUR); |
39 | 39 | |
40 | sysseek($_[0], 0, 1); | |
40 | MCE::Util::_sysseek($_[0], 0, 1); | |
41 | 41 | } |
42 | 42 | |
43 | 43 | sub _worker_read_handle { |
70 | 70 | $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; |
71 | 71 | } |
72 | 72 | $_dat_ex = sub { |
73 | sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
73 | MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
74 | 74 | unless $_DAT_LOCK->{ $_pid }; |
75 | 75 | }; |
76 | 76 | $_dat_un = sub { |
77 | syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
77 | MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
78 | 78 | if $_DAT_LOCK->{ $_pid }; |
79 | 79 | }; |
80 | 80 | } |
111 | 111 | |
112 | 112 | ## Obtain the next chunk_id and offset position. |
113 | 113 | $_dat_ex->() if $_lock_chn; |
114 | ||
115 | 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'}); | |
114 | MCE::Util::_sysread($_QUE_R_SOCK, $_next, $_que_read_size); | |
116 | 115 | |
117 | 116 | ($_chunk_id, $_offset_pos) = unpack($_que_template, $_next); |
118 | 117 | |
119 | 118 | if ($_offset_pos >= $_data_size) { |
120 | 1 until syswrite ( $_QUE_W_SOCK, | |
121 | pack($_que_template, 0, $_offset_pos) | |
122 | ) || ($! && !$!{'EINTR'}); | |
123 | ||
119 | MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset_pos)); | |
124 | 120 | $_dat_un->() if $_lock_chn; |
125 | ||
126 | 121 | close $_IN_FILE; undef $_IN_FILE; |
127 | 122 | return; |
128 | 123 | } |
176 | 171 | } |
177 | 172 | } |
178 | 173 | |
179 | 1 until syswrite ( $_QUE_W_SOCK, | |
180 | pack($_que_template, $_chunk_id, tell $_IN_FILE) | |
181 | ) || ($! && !$!{'EINTR'}); | |
182 | ||
174 | MCE::Util::_syswrite( | |
175 | $_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE) | |
176 | ); | |
183 | 177 | $_dat_un->() if $_lock_chn; |
184 | 178 | } |
185 | 179 | else { # Large chunk. |
186 | 180 | local $/ = $_RS if ($/ ne $_RS); |
187 | 181 | |
188 | 182 | if ($_parallel_io && $_RS eq $LF) { |
189 | 1 until syswrite ( $_QUE_W_SOCK, | |
183 | MCE::Util::_syswrite( | |
184 | $_QUE_W_SOCK, | |
190 | 185 | pack($_que_template, $_chunk_id, $_offset_pos + $_chunk_size) |
191 | ) || ($! && !$!{'EINTR'}); | |
192 | ||
186 | ); | |
193 | 187 | $_dat_un->() if $_lock_chn; |
194 | 188 | |
195 | 189 | $_tmp_cs = $_chunk_size; |
200 | 194 | } |
201 | 195 | |
202 | 196 | if ($_proc_type == READ_FILE) { |
203 | sysseek $_IN_FILE, tell( $_IN_FILE ), 0; | |
204 | sysread $_IN_FILE, $_, $_tmp_cs, $_p; | |
205 | seek $_IN_FILE, _systell( $_IN_FILE ), 0; | |
197 | MCE::Util::_sysseek($_IN_FILE, tell( $_IN_FILE ), 0); | |
198 | MCE::Util::_sysread($_IN_FILE, $_, $_tmp_cs, $_p); | |
199 | seek $_IN_FILE, _systell($_IN_FILE), 0; | |
206 | 200 | } |
207 | 201 | else { |
208 | 202 | read $_IN_FILE, $_, $_tmp_cs, $_p; |
212 | 206 | } |
213 | 207 | else { |
214 | 208 | if ($_proc_type == READ_FILE) { |
215 | sysseek $_IN_FILE, $_offset_pos, 0; | |
216 | sysread $_IN_FILE, $_, $_chunk_size, $_p; | |
217 | seek $_IN_FILE, _systell( $_IN_FILE ), 0; | |
209 | MCE::Util::_sysseek($_IN_FILE, $_offset_pos, 0); | |
210 | MCE::Util::_sysread($_IN_FILE, $_, $_chunk_size, $_p); | |
211 | seek $_IN_FILE, _systell($_IN_FILE), 0; | |
218 | 212 | } |
219 | 213 | else { |
220 | 214 | seek $_IN_FILE, $_offset_pos, 0; |
223 | 217 | |
224 | 218 | $_ .= <$_IN_FILE>; |
225 | 219 | |
226 | 1 until syswrite ( $_QUE_W_SOCK, | |
227 | pack($_que_template, $_chunk_id, tell $_IN_FILE) | |
228 | ) || ($! && !$!{'EINTR'}); | |
229 | ||
220 | MCE::Util::_syswrite( | |
221 | $_QUE_W_SOCK, pack($_que_template, $_chunk_id, tell $_IN_FILE) | |
222 | ); | |
230 | 223 | $_dat_un->() if $_lock_chn; |
231 | 224 | } |
232 | 225 | } |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | ## Items below are folded into MCE. |
19 | 19 | |
54 | 54 | |
55 | 55 | # inlined for performance |
56 | 56 | $_dat_ex = sub { |
57 | sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
57 | MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
58 | 58 | unless $_DAT_LOCK->{ $_pid }; |
59 | 59 | }; |
60 | 60 | $_dat_un = sub { |
61 | syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
61 | MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
62 | 62 | if $_DAT_LOCK->{ $_pid }; |
63 | 63 | }; |
64 | 64 | } |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | ## Items below are folded into MCE. |
19 | 19 | |
57 | 57 | |
58 | 58 | # inlined for performance |
59 | 59 | $_dat_ex = sub { |
60 | sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
60 | MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
61 | 61 | unless $_DAT_LOCK->{ $_pid }; |
62 | 62 | }; |
63 | 63 | $_dat_un = sub { |
64 | syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
64 | MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
65 | 65 | if $_DAT_LOCK->{ $_pid }; |
66 | 66 | }; |
67 | 67 | } |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | ## Items below are folded into MCE. |
19 | 19 | |
59 | 59 | $_DAT_LOCK = $self->{'_mutex_'.( $self->{_wid} % 6 + 1 )}; |
60 | 60 | } |
61 | 61 | $_dat_ex = sub { |
62 | sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
62 | MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
63 | 63 | unless $_DAT_LOCK->{ $_pid }; |
64 | 64 | }; |
65 | 65 | $_dat_un = sub { |
66 | syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
66 | MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
67 | 67 | if $_DAT_LOCK->{ $_pid }; |
68 | 68 | }; |
69 | 69 | } |
96 | 96 | |
97 | 97 | ## Obtain the next chunk_id and sequence number. |
98 | 98 | $_dat_ex->() if $_lock_chn; |
99 | ||
100 | 1 until sysread($_QUE_R_SOCK, $_next, $_que_read_size) || ($! && !$!{'EINTR'}); | |
99 | MCE::Util::_sysread($_QUE_R_SOCK, $_next, $_que_read_size); | |
101 | 100 | |
102 | 101 | ($_chunk_id, $_offset) = unpack($_que_template, $_next); |
103 | 102 | |
104 | 103 | if ($_offset >= $_abort) { |
105 | 1 until syswrite ( | |
106 | $_QUE_W_SOCK, pack($_que_template, 0, $_offset) | |
107 | ) || ($! && !$!{'EINTR'}); | |
108 | ||
104 | MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_offset)); | |
109 | 105 | $_dat_un->() if $_lock_chn; |
110 | 106 | return; |
111 | 107 | } |
112 | 108 | |
113 | 1 until syswrite ( | |
109 | MCE::Util::_syswrite( | |
114 | 110 | $_QUE_W_SOCK, pack($_que_template, $_chunk_id + 1, $_offset + 1) |
115 | ) || ($! && !$!{'EINTR'}); | |
111 | ); | |
116 | 112 | |
117 | 113 | $_dat_un->() if $_lock_chn; |
118 | 114 | $_chunk_id++; |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
19 | 19 | ## no critic (TestingAndDebugging::ProhibitNoStrict) |
147 | 147 | if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) { |
148 | 148 | if ($_sync_cnt == $_total_running) { |
149 | 149 | for my $_i (1 .. $_total_running) { |
150 | 1 until syswrite($_BSB_W_SOCK, $LF) || ($! && !$!{'EINTR'}); | |
150 | MCE::Util::_syswrite($_BSB_W_SOCK, $LF); | |
151 | 151 | } |
152 | 152 | undef $_syn_flag; |
153 | 153 | } |
179 | 179 | if ($_task_id == 0 && defined $_syn_flag && $_sync_cnt) { |
180 | 180 | if ($_sync_cnt == $_total_running) { |
181 | 181 | for my $_i (1 .. $_total_running) { |
182 | 1 until syswrite($_BSB_W_SOCK, $LF) || ($! && !$!{'EINTR'}); | |
182 | MCE::Util::_syswrite($_BSB_W_SOCK, $LF); | |
183 | 183 | } |
184 | 184 | undef $_syn_flag; |
185 | 185 | } |
546 | 546 | |
547 | 547 | binmode $_sendto_fhs{$_file}; |
548 | 548 | |
549 | ## Select new FH, turn on autoflush, restore the old FH. | |
550 | 549 | if ($_flush_file) { |
551 | 550 | local $!; |
552 | # IO::Handle->autoflush not available in older Perl. | |
553 | select(( select($_sendto_fhs{$_file}), $| = 1 )[0]); | |
551 | $_sendto_fhs{$_file}->autoflush(1); | |
554 | 552 | } |
555 | 553 | } |
556 | 554 | |
575 | 573 | |
576 | 574 | binmode $_sendto_fhs{$_fd}; |
577 | 575 | |
578 | ## Select new FH, turn on autoflush, restore the old FH. | |
579 | 576 | if ($_flush_file) { |
580 | 577 | local $!; |
581 | # IO::Handle->autoflush not available in older Perl. | |
582 | select(( select($_sendto_fhs{$_fd}), $| = 1 )[0]); | |
578 | $_sendto_fhs{$_fd}->autoflush(1); | |
583 | 579 | } |
584 | 580 | } |
585 | 581 | |
602 | 598 | |
603 | 599 | if (++$_sync_cnt == $_total_running) { |
604 | 600 | for my $_i (1 .. $_total_running) { |
605 | 1 until syswrite($_BSB_W_SOCK, $LF) || ($! && !$!{'EINTR'}); | |
601 | MCE::Util::_syswrite($_BSB_W_SOCK, $LF); | |
606 | 602 | } |
607 | 603 | undef $_syn_flag; |
608 | 604 | } |
617 | 613 | : $self->{_total_running}; |
618 | 614 | |
619 | 615 | for my $_i (1 .. $_total_running) { |
620 | 1 until syswrite($_BSE_W_SOCK, $LF) || ($! && !$!{'EINTR'}); | |
616 | MCE::Util::_syswrite($_BSE_W_SOCK, $LF); | |
621 | 617 | } |
622 | 618 | } |
623 | 619 | |
625 | 621 | }, |
626 | 622 | |
627 | 623 | OUTPUT_S_IPC.$LF => sub { # Change to win32 IPC |
628 | 1 until syswrite($_DAT_R_SOCK, $LF) || ($! && !$!{'EINTR'}); | |
624 | MCE::Util::_syswrite($_DAT_R_SOCK, $LF); | |
629 | 625 | |
630 | 626 | $_win32_ipc = 1, goto _LOOP unless $_win32_ipc; |
631 | 627 | |
793 | 789 | } |
794 | 790 | |
795 | 791 | ## Autoflush STDERR-STDOUT handles if requested. |
796 | ## Make MCE_STDOUT the default handle. | |
797 | ||
798 | my $_old_hndl = select $_MCE_STDOUT; | |
799 | 792 | |
800 | 793 | { |
801 | 794 | local $!; |
802 | # IO::Handle->autoflush not available in older Perl. | |
803 | select($_MCE_STDERR), $| = 1 if ($self->{flush_stderr}); | |
804 | select($_MCE_STDOUT), $| = 1 if ($self->{flush_stdout}); | |
805 | select($_MCE_STDOUT); | |
795 | $_MCE_STDERR->autoflush(1) if $self->{flush_stderr}; | |
796 | $_MCE_STDOUT->autoflush(1) if $self->{flush_stdout}; | |
806 | 797 | } |
807 | 798 | |
808 | 799 | ## ------------------------------------------------------------------------- |
873 | 864 | |
874 | 865 | ## Wait on requests *without* timeout capability. |
875 | 866 | |
876 | elsif ($^O eq 'MSWin32' && $_win32_ipc) { | |
877 | # The normal loop hangs on Windows when processes/threads start/exit. | |
878 | # Using ioctl() properly, http://www.perlmonks.org/?node_id=780083 | |
879 | ||
880 | my $_val_bytes = "\x00\x00\x00\x00"; | |
881 | my $_ptr_bytes = unpack( 'I', pack('P', $_val_bytes) ); | |
882 | my ($_done, $_count, $_nbytes, $_start) = (0); | |
883 | ||
884 | while (!$_done) { | |
885 | $_start = time, $_count = 1; | |
886 | ||
887 | # MSWin32 FIONREAD | |
888 | IOCTL: ioctl($_DAT_R_SOCK, 0x4004667f, $_ptr_bytes); | |
889 | ||
890 | unless ($_nbytes = unpack('I', $_val_bytes)) { | |
891 | if ($_count) { | |
892 | # delay after a while to not consume a CPU core | |
893 | $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.030; | |
894 | } else { | |
895 | sleep 0.030; | |
896 | } | |
897 | goto IOCTL; | |
898 | } | |
899 | ||
900 | do { | |
901 | sysread($_DAT_R_SOCK, $_func, 8); | |
902 | $_done = 1, last() unless length($_func) == 8; | |
903 | $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ]; | |
904 | ||
905 | if (exists $_core_output_function{$_func}) { | |
906 | $_core_output_function{$_func}(); | |
907 | } elsif (exists $_plugin_function->{$_func}) { | |
908 | $_plugin_function->{$_func}(); | |
909 | } | |
910 | ||
911 | } while (($_nbytes -= 8) >= 8); | |
912 | ||
913 | last unless $self->{_total_running}; | |
914 | } | |
915 | } | |
916 | ||
917 | 867 | elsif ($^O eq 'MSWin32') { |
868 | MCE::Util::_nonblocking($_DAT_R_SOCK, 1) if $_win32_ipc; | |
869 | ||
918 | 870 | while ($self->{_total_running}) { |
919 | sysread($_DAT_R_SOCK, $_func, 8); | |
871 | MCE::Util::_sysread($_DAT_R_SOCK, $_func, 8); | |
920 | 872 | last() unless length($_func) == 8; |
921 | 873 | $_DAU_R_SOCK = $_channels->[ substr($_func, -2, 2, '') ]; |
922 | 874 | |
927 | 879 | } |
928 | 880 | } |
929 | 881 | } |
930 | ||
931 | 882 | else { |
932 | 883 | while ($self->{_total_running}) { |
933 | 884 | $_func = <$_DAT_R_SOCK>; |
960 | 911 | delete $_sendto_fhs{$_p}; |
961 | 912 | } |
962 | 913 | |
963 | ## Restore the default handle. Close MCE STDOUT/STDERR handles. | |
964 | ||
965 | select $_old_hndl; | |
914 | ## Close MCE STDOUT/STDERR handles. | |
966 | 915 | |
967 | 916 | eval q{ |
968 | 917 | close $_MCE_STDOUT if (fileno $_MCE_STDOUT > 2); |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | ## Items below are folded into MCE. |
19 | 19 |
13 | 13 | use strict; |
14 | 14 | use warnings; |
15 | 15 | |
16 | our $VERSION = '1.837'; | |
16 | our $VERSION = '1.838'; | |
17 | 17 | |
18 | 18 | my $_has_threads = $INC{'threads.pm'} ? 1 : 0; |
19 | 19 | my $_tid = $_has_threads ? threads->tid() : 0; |
282 | 282 | # inlined for performance |
283 | 283 | $_dat_ex = sub { |
284 | 284 | my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; |
285 | sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
285 | MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
286 | 286 | unless $_DAT_LOCK->{ $_pid }; |
287 | 287 | }; |
288 | 288 | $_dat_un = sub { |
289 | 289 | my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; |
290 | syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
290 | MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
291 | 291 | if $_DAT_LOCK->{ $_pid }; |
292 | 292 | }; |
293 | 293 | } |
294 | 294 | |
295 | 295 | { |
296 | 296 | local $!; |
297 | # IO::Handle->autoflush not available in older Perl. | |
298 | select(( select(*STDERR), $| = 1 )[0]) if defined(fileno *STDERR); | |
299 | select(( select(*STDOUT), $| = 1 )[0]) if defined(fileno *STDOUT); | |
297 | (*STDERR)->autoflush(1) if defined( fileno *STDERR ); | |
298 | (*STDOUT)->autoflush(1) if defined( fileno *STDOUT ); | |
300 | 299 | } |
301 | 300 | |
302 | 301 | return; |
585 | 584 | _worker_do($self, {}), next if ($_response eq "_data\n"); |
586 | 585 | |
587 | 586 | ## Wait here until MCE completes job submission to all workers. |
588 | 1 until sysread($self->{_bse_r_sock}, my($_b), 1) || ($! && !$!{'EINTR'}); | |
587 | MCE::Util::_sysread($self->{_bse_r_sock}, my($_b), 1); | |
589 | 588 | |
590 | 589 | ## Normal request. |
591 | 590 | if (defined $_job_delay && $_job_delay > 0.0) { |
4 | 4 | |
5 | 5 | =head1 VERSION |
6 | 6 | |
7 | This document describes MCE::Core version 1.837 | |
7 | This document describes MCE::Core version 1.838 | |
8 | 8 | |
9 | 9 | =head1 SYNOPSIS |
10 | 10 |
4 | 4 | |
5 | 5 | =head1 VERSION |
6 | 6 | |
7 | This document describes MCE::Examples version 1.837 | |
7 | This document describes MCE::Examples version 1.838 | |
8 | 8 | |
9 | 9 | =head1 INCLUDED WITH THE DISTRIBUTION |
10 | 10 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
479 | 479 | |
480 | 480 | =head1 VERSION |
481 | 481 | |
482 | This document describes MCE::Flow version 1.837 | |
482 | This document describes MCE::Flow version 1.838 | |
483 | 483 | |
484 | 484 | =head1 DESCRIPTION |
485 | 485 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
434 | 434 | |
435 | 435 | =head1 VERSION |
436 | 436 | |
437 | This document describes MCE::Grep version 1.837 | |
437 | This document describes MCE::Grep version 1.838 | |
438 | 438 | |
439 | 439 | =head1 SYNOPSIS |
440 | 440 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
349 | 349 | |
350 | 350 | =head1 VERSION |
351 | 351 | |
352 | This document describes MCE::Loop version 1.837 | |
352 | This document describes MCE::Loop version 1.838 | |
353 | 353 | |
354 | 354 | =head1 DESCRIPTION |
355 | 355 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
434 | 434 | |
435 | 435 | =head1 VERSION |
436 | 436 | |
437 | This document describes MCE::Map version 1.837 | |
437 | This document describes MCE::Map version 1.838 | |
438 | 438 | |
439 | 439 | =head1 SYNOPSIS |
440 | 440 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized once ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | use base 'MCE::Mutex'; |
16 | 16 | use Scalar::Util qw(refaddr weaken); |
28 | 28 | sub DESTROY { |
29 | 29 | my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); |
30 | 30 | |
31 | syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 if $obj->{ $pid }; | |
31 | MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 | |
32 | if $obj->{ $pid }; | |
32 | 33 | |
33 | 34 | if ($obj->{'_init_pid'} eq $pid) { |
34 | 35 | my $addr = refaddr $obj; |
62 | 63 | ? MCE::Util::_pipe_pair(\%obj, qw(_r_sock _w_sock)) |
63 | 64 | : MCE::Util::_sock_pair(\%obj, qw(_r_sock _w_sock)); |
64 | 65 | |
65 | 1 until syswrite($obj{_w_sock}, '0') || ($! && !$!{'EINTR'}); | |
66 | MCE::Util::_syswrite($obj{_w_sock}, '0'); | |
66 | 67 | |
67 | 68 | if (caller !~ /^MCE:?/ || caller(1) !~ /^MCE:?/) { |
68 | 69 | push(@MUTEX, \%obj); weaken($MUTEX[-1]); |
74 | 75 | sub lock { |
75 | 76 | my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); |
76 | 77 | |
77 | sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1 | |
78 | MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1 | |
78 | 79 | unless $obj->{ $pid }; |
79 | 80 | |
80 | 81 | return; |
86 | 87 | sub unlock { |
87 | 88 | my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_); |
88 | 89 | |
89 | syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 | |
90 | MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0 | |
90 | 91 | if $obj->{ $pid }; |
91 | 92 | |
92 | 93 | return; |
99 | 100 | return unless ref($code) eq 'CODE'; |
100 | 101 | |
101 | 102 | # lock, run, unlock - inlined for performance |
102 | sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1 | |
103 | MCE::Util::_sysread($obj->{_r_sock}, my($b), 1), $obj->{ $pid } = 1 | |
103 | 104 | unless $obj->{ $pid }; |
104 | 105 | |
105 | 106 | (defined wantarray) |
106 | 107 | ? @ret = wantarray ? $code->(@_) : scalar $code->(@_) |
107 | 108 | : $code->(@_); |
108 | 109 | |
109 | syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0; | |
110 | MCE::Util::_syswrite($obj->{_w_sock}, '0'), $obj->{ $pid } = 0; | |
110 | 111 | |
111 | 112 | return wantarray ? @ret : $ret[-1]; |
112 | 113 | } |
129 | 130 | |
130 | 131 | =head1 VERSION |
131 | 132 | |
132 | This document describes MCE::Mutex::Channel version 1.837 | |
133 | This document describes MCE::Mutex::Channel version 1.838 | |
133 | 134 | |
134 | 135 | =head1 DESCRIPTION |
135 | 136 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized once ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | use base 'MCE::Mutex'; |
16 | 16 | use Fcntl ':flock'; |
185 | 185 | |
186 | 186 | =head1 VERSION |
187 | 187 | |
188 | This document describes MCE::Mutex::Flock version 1.837 | |
188 | This document describes MCE::Mutex::Flock version 1.838 | |
189 | 189 | |
190 | 190 | =head1 DESCRIPTION |
191 | 191 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (TestingAndDebugging::ProhibitNoStrict) |
17 | 17 | |
18 | 18 | use Carp (); |
19 | 19 | |
20 | use MCE::Mutex::Channel (); | |
21 | ||
22 | 20 | sub new { |
23 | 21 | my ($class, %argv) = @_; |
24 | 22 | |
27 | 25 | |
28 | 26 | $pkg = ucfirst( lc $pkg ); |
29 | 27 | |
30 | if ($INC{"MCE/Mutex/$pkg.pm"} || eval "require MCE::Mutex::$pkg; 1") { | |
28 | if (eval "require MCE::Mutex::$pkg; 1") { | |
31 | 29 | no strict 'refs'; $pkg = 'MCE::Mutex::'.$pkg; |
32 | ||
33 | 30 | return $pkg->new(%argv); |
34 | 31 | } |
35 | 32 | |
70 | 67 | |
71 | 68 | =head1 VERSION |
72 | 69 | |
73 | This document describes MCE::Mutex version 1.837 | |
70 | This document describes MCE::Mutex version 1.838 | |
74 | 71 | |
75 | 72 | =head1 SYNOPSIS |
76 | 73 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (Subroutines::ProhibitExplicitReturnUndef) |
16 | 16 | ## no critic (TestingAndDebugging::ProhibitNoStrict) |
117 | 117 | my $_tid = $_has_threads ? threads->tid() : 0; |
118 | 118 | |
119 | 119 | my %_valid_fields_new = map { $_ => 1 } qw( |
120 | await fast gather porder queue type | |
120 | await barrier fast gather porder queue type | |
121 | 121 | ); |
122 | 122 | |
123 | 123 | my $_all = {}; |
171 | 171 | $_Q->{_heap} = []; # Priority heap [ pN, p2, p1 ] in heap order |
172 | 172 | # fyi, _datp will always dequeue before _datq |
173 | 173 | |
174 | $_Q->{_await} = (exists $_argv{await} && defined $_argv{await}) | |
174 | $_Q->{_await} = (defined $_argv{await}) | |
175 | 175 | ? $_argv{await} : $_def->{$_pkg}{AWAIT} || 0; |
176 | $_Q->{_fast} = (exists $_argv{fast} && defined $_argv{fast}) | |
176 | $_Q->{_fast} = (defined $_argv{fast}) | |
177 | 177 | ? $_argv{fast} : $_def->{$_pkg}{FAST} || 0; |
178 | 178 | |
179 | $_Q->{_porder} = (exists $_argv{porder} && defined $_argv{porder}) | |
179 | $_Q->{_porder} = (defined $_argv{porder}) | |
180 | 180 | ? $_argv{porder} : $_def->{$_pkg}{PORDER} || $HIGHEST; |
181 | $_Q->{_type} = (exists $_argv{type} && defined $_argv{type}) | |
181 | $_Q->{_type} = (defined $_argv{type}) | |
182 | 182 | ? $_argv{type} : $_def->{$_pkg}{TYPE} || $FIFO; |
183 | 183 | |
184 | 184 | ## ------------------------------------------------------------------------- |
206 | 206 | $_Q->{_id} = ++$_qid; $_all->{$_qid} = $_Q; |
207 | 207 | $_Q->{_dsem} = 0 if $_Q->{_fast}; |
208 | 208 | |
209 | if ($^O ne 'MSWin32' && $_tid == 0 && !$_Q->{_fast}) { | |
209 | my $_barrier = defined $_argv{barrier} ? $_argv{barrier} : 1; | |
210 | ||
211 | if ($^O ne 'MSWin32' && $_tid == 0 && !$_Q->{_fast} && $_barrier) { | |
210 | 212 | if (caller() !~ /^MCE::/) { |
211 | 213 | for my $_i (0 .. MUTEX_LOCKS - 1) { |
212 | 214 | $_Q->{'_mutex_'.$_i} = MCE::Mutex->new( impl => 'Channel' ); |
218 | 220 | MCE::Util::_sock_pair($_Q, qw(_ar_sock _aw_sock)) if $_Q->{_await}; |
219 | 221 | |
220 | 222 | if (exists $_argv{queue} && scalar @{ $_argv{queue} }) { |
221 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
223 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
222 | 224 | } |
223 | 225 | |
224 | 226 | return $_Q; |
404 | 406 | $_Q->{_tsem} = $_t; |
405 | 407 | |
406 | 408 | if ($_Q->pending() <= $_t) { |
407 | 1 until syswrite($_Q->{_aw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
409 | MCE::Util::_syswrite($_Q->{_aw_sock}, $LF); | |
408 | 410 | } else { |
409 | 411 | $_Q->{_asem} += 1; |
410 | 412 | } |
457 | 459 | return; |
458 | 460 | } |
459 | 461 | if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { |
460 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
462 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
461 | 463 | } |
462 | 464 | push @{ $_Q->{_datq} }, @{ $_MCE->{thaw}($_buf) }; |
463 | 465 | } |
501 | 503 | return; |
502 | 504 | } |
503 | 505 | if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { |
504 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
506 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
505 | 507 | } |
506 | 508 | push @{ $_Q->{_datq} }, $_; |
507 | 509 | } |
561 | 563 | if ($_pending) { |
562 | 564 | $_pending = MAX_DQ_DEPTH if ($_pending > MAX_DQ_DEPTH); |
563 | 565 | for my $_i (1 .. $_pending) { |
564 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
566 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
565 | 567 | } |
566 | 568 | } |
567 | 569 | $_Q->{_dsem} = $_pending; |
572 | 574 | } |
573 | 575 | else { |
574 | 576 | ## Otherwise, never to exceed one byte in the channel |
575 | if ($_Q->_has_data()) { | |
576 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
577 | } | |
577 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data(); | |
578 | 578 | } |
579 | 579 | |
580 | 580 | if (exists $_Q->{_ended} && !$_Q->_has_data()) { |
581 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
581 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
582 | 582 | } |
583 | 583 | |
584 | 584 | if ($_cnt) { |
599 | 599 | |
600 | 600 | if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) { |
601 | 601 | for my $_i (1 .. $_Q->{_asem}) { |
602 | 1 until syswrite($_Q->{_aw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
602 | MCE::Util::_syswrite($_Q->{_aw_sock}, $LF); | |
603 | 603 | } |
604 | 604 | $_Q->{_asem} = 0; |
605 | 605 | } |
618 | 618 | $_Q = $_all->{$_id}; |
619 | 619 | |
620 | 620 | if (!$_Q->{_nb_flag} && $_Q->_has_data()) { |
621 | 1 until sysread($_Q->{_qr_sock}, my($_b), 1) || ($! && !$!{'EINTR'}); | |
621 | MCE::Util::_sysread($_Q->{_qr_sock}, my($_b), 1); | |
622 | 622 | } |
623 | 623 | |
624 | 624 | if ($_cnt == 1) { |
659 | 659 | |
660 | 660 | if ($_Q->{_await} && $_Q->{_asem} && $_Q->pending() <= $_Q->{_tsem}) { |
661 | 661 | for my $_i (1 .. $_Q->{_asem}) { |
662 | 1 until syswrite($_Q->{_aw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
662 | MCE::Util::_syswrite($_Q->{_aw_sock}, $LF); | |
663 | 663 | } |
664 | 664 | $_Q->{_asem} = 0; |
665 | 665 | } |
844 | 844 | sub _mce_m_clear { |
845 | 845 | my ($_Q) = @_; |
846 | 846 | |
847 | if ($_Q->{_fast}) { | |
848 | warn "Queue: (clear) is not allowed for fast => 1\n"; | |
849 | } | |
850 | else { | |
851 | if ($_Q->_has_data()) { | |
852 | 1 until sysread($_Q->{_qr_sock}, my($_buf), 1) || ($! && !$!{'EINTR'}); | |
853 | } | |
854 | %{ $_Q->{_datp} } = (); | |
855 | @{ $_Q->{_datq} } = (); | |
856 | @{ $_Q->{_heap} } = (); | |
857 | } | |
847 | %{ $_Q->{_datp} } = (); | |
848 | @{ $_Q->{_datq} } = (); | |
849 | @{ $_Q->{_heap} } = (); | |
858 | 850 | |
859 | 851 | return; |
860 | 852 | } |
865 | 857 | my ($_Q) = @_; |
866 | 858 | |
867 | 859 | if (!exists $_Q->{_ended}) { |
868 | if (!$_Q->{_nb_flag}) { | |
869 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
870 | } | |
860 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) unless $_Q->{_nb_flag}; | |
871 | 861 | $_Q->{_ended} = undef; |
872 | 862 | } |
873 | 863 | |
886 | 876 | return; |
887 | 877 | } |
888 | 878 | if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { |
889 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
879 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
890 | 880 | } |
891 | 881 | |
892 | 882 | ## Append item(s) into the queue. |
910 | 900 | return; |
911 | 901 | } |
912 | 902 | if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { |
913 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
903 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
914 | 904 | } |
915 | 905 | |
916 | 906 | $_Q->_enqueuep($_p, @_); |
925 | 915 | my ($_Q, $_cnt) = @_; |
926 | 916 | my (@_items, $_buf, $_next, $_pending); |
927 | 917 | |
928 | 1 until sysread($_Q->{_qr_sock}, $_next, 1) || ($! && !$!{'EINTR'}); | |
918 | MCE::Util::_sysread($_Q->{_qr_sock}, $_next, 1); | |
929 | 919 | |
930 | 920 | if (defined $_cnt && $_cnt ne '1') { |
931 | 921 | _croak('Queue: (dequeue count argument) is not valid') |
954 | 944 | if ($_pending) { |
955 | 945 | $_pending = MAX_DQ_DEPTH if ($_pending > MAX_DQ_DEPTH); |
956 | 946 | for my $_i (1 .. $_pending) { |
957 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
947 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
958 | 948 | } |
959 | 949 | } |
960 | 950 | $_Q->{_dsem} = $_pending; |
965 | 955 | } |
966 | 956 | else { |
967 | 957 | ## Otherwise, never to exceed one byte in the channel |
968 | if ($_Q->_has_data()) { | |
969 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
970 | } | |
958 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF) if $_Q->_has_data(); | |
971 | 959 | } |
972 | 960 | |
973 | 961 | if (exists $_Q->{_ended} && !$_Q->_has_data()) { |
974 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
962 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
975 | 963 | } |
976 | 964 | |
977 | 965 | $_Q->{_nb_flag} = 0; |
992 | 980 | } |
993 | 981 | |
994 | 982 | if (!$_Q->{_nb_flag} && $_Q->_has_data()) { |
995 | 1 until sysread($_Q->{_qr_sock}, my($_b), 1) || ($! && !$!{'EINTR'}); | |
983 | MCE::Util::_sysread($_Q->{_qr_sock}, my($_b), 1); | |
996 | 984 | } |
997 | 985 | |
998 | 986 | if (defined $_cnt && $_cnt ne '1') { |
1051 | 1039 | return; |
1052 | 1040 | } |
1053 | 1041 | if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { |
1054 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
1042 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
1055 | 1043 | } |
1056 | 1044 | |
1057 | 1045 | if (abs($_i) > scalar @{ $_Q->{_datq} }) { |
1099 | 1087 | return; |
1100 | 1088 | } |
1101 | 1089 | if (!$_Q->{_nb_flag} && !$_Q->_has_data()) { |
1102 | 1 until syswrite($_Q->{_qw_sock}, $LF) || ($! && !$!{'EINTR'}); | |
1090 | MCE::Util::_syswrite($_Q->{_qw_sock}, $LF); | |
1103 | 1091 | } |
1104 | 1092 | |
1105 | 1093 | if (exists $_Q->{_datp}->{$_p} && scalar @{ $_Q->{_datp}->{$_p} }) { |
1221 | 1209 | ); |
1222 | 1210 | |
1223 | 1211 | my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0; |
1224 | my $_rdy = \&MCE::Util::_sock_ready; | |
1225 | 1212 | |
1226 | 1213 | my $_req1 = sub { |
1227 | 1214 | local $\ = undef if (defined $\); |
1282 | 1269 | # inlined for performance |
1283 | 1270 | $_dat_ex = sub { |
1284 | 1271 | my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; |
1285 | sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
1272 | MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1 | |
1286 | 1273 | unless $_DAT_LOCK->{ $_pid }; |
1287 | 1274 | }; |
1288 | 1275 | $_dat_un = sub { |
1289 | 1276 | my $_pid = $_has_threads ? $$ .'.'. $_tid : $$; |
1290 | syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
1277 | MCE::Util::_syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0 | |
1291 | 1278 | if $_DAT_LOCK->{ $_pid }; |
1292 | 1279 | }; |
1293 | 1280 | } |
1329 | 1316 | $_t = 0 if ($_t < 0); |
1330 | 1317 | $_req2->(OUTPUT_W_QUE, $_Q->{_id}.$LF . $_t.$LF); |
1331 | 1318 | |
1332 | $_rdy->($_Q->{_ar_sock}) if $_is_MSWin32; | |
1333 | 1 until sysread($_Q->{_ar_sock}, $_next, 1) || ($! && !$!{'EINTR'}); | |
1319 | MCE::Util::_sock_ready($_Q->{_ar_sock}) if $_is_MSWin32; | |
1320 | MCE::Util::_sysread($_Q->{_ar_sock}, $_next, 1); | |
1334 | 1321 | |
1335 | 1322 | return; |
1336 | 1323 | } |
1340 | 1327 | |
1341 | 1328 | return $_Q->_mce_m_clear() if (exists $_all->{ $_Q->{_id} }); |
1342 | 1329 | |
1343 | ($_Q->{_fast}) | |
1344 | ? warn "Queue: (clear) is not allowed for fast => 1\n" | |
1345 | : $_req2->(OUTPUT_C_QUE, $_Q->{_id}.$LF); | |
1330 | $_req2->(OUTPUT_C_QUE, $_Q->{_id}.$LF); | |
1346 | 1331 | |
1347 | 1332 | return; |
1348 | 1333 | } |
1428 | 1413 | |
1429 | 1414 | if (exists $_Q->{'_mutex_0'}) { |
1430 | 1415 | $_Q->{'_mutex_'.$_mutexi}->lock(); |
1431 | ||
1432 | $_rdy->($_Q->{_qr_sock}) if $_is_MSWin32; | |
1433 | 1 until sysread($_Q->{_qr_sock}, $_next, 1) || ($! && !$!{'EINTR'}); | |
1416 | MCE::Util::_sysread($_Q->{_qr_sock}, $_next, 1); | |
1434 | 1417 | |
1435 | 1418 | $_dat_ex->() if $_lock_chn; |
1436 | print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF), | |
1437 | print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF); | |
1438 | ||
1439 | 1419 | $_Q->{'_mutex_'.$_mutexi}->unlock(); |
1440 | 1420 | } |
1441 | 1421 | else { |
1442 | $_rdy->($_Q->{_qr_sock}) if $_is_MSWin32; | |
1443 | 1 until sysread($_Q->{_qr_sock}, $_next, 1) || ($! && !$!{'EINTR'}); | |
1422 | MCE::Util::_sock_ready($_Q->{_qr_sock}) if $_is_MSWin32; | |
1423 | MCE::Util::_sysread($_Q->{_qr_sock}, $_next, 1); | |
1444 | 1424 | |
1445 | 1425 | $_dat_ex->() if $_lock_chn; |
1446 | print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF), | |
1447 | print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF); | |
1448 | } | |
1426 | } | |
1427 | ||
1428 | print({$_DAT_W_SOCK} OUTPUT_D_QUE.$LF . $_chn.$LF), | |
1429 | print({$_DAU_W_SOCK} $_Q->{_id}.$LF . $_cnt.$LF); | |
1449 | 1430 | |
1450 | 1431 | chomp($_len = <$_DAU_W_SOCK>); |
1451 | 1432 | |
1620 | 1601 | |
1621 | 1602 | =head1 VERSION |
1622 | 1603 | |
1623 | This document describes MCE::Queue version 1.837 | |
1604 | This document describes MCE::Queue version 1.838 | |
1624 | 1605 | |
1625 | 1606 | =head1 SYNOPSIS |
1626 | 1607 | |
1742 | 1723 | =head2 MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] ) |
1743 | 1724 | |
1744 | 1725 | This creates a new queue. Available options are queue, porder, type, await, |
1745 | fast, and gather. | |
1726 | barrier, fast, and gather. | |
1746 | 1727 | |
1747 | 1728 | use MCE; |
1748 | 1729 | use MCE::Queue; |
1756 | 1737 | my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO ); |
1757 | 1738 | my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO ); |
1758 | 1739 | |
1759 | my $q7 = MCE::Queue->new( await => 1 ); | |
1740 | my $q7 = MCE::Queue->new( await => 1, barrier => 0 ); | |
1760 | 1741 | my $q8 = MCE::Queue->new( fast => 1 ); |
1761 | 1742 | |
1762 | The 'await' option, when enabled, allows workers to block (semaphore-like) | |
1763 | until the number of items pending is equal or less than a threshold value. | |
1743 | The C<await> option, when enabled, allows workers to block (semaphore-like) | |
1744 | until the number of items pending is equal to or less than a threshold value. | |
1764 | 1745 | The $q->await method is described below. |
1765 | 1746 | |
1766 | The 'fast' option speeds up dequeues and is not enabled by default. It is | |
1767 | beneficial for queues not calling (->clear or ->dequeue_nb) and not altering | |
1768 | the optional count value while running; e.g. ->dequeue($count). Basically, | |
1769 | do not enable 'fast' if varying the count dynamically. | |
1770 | ||
1771 | The 'gather' option is mainly for running with MCE and wanting to pass item(s) | |
1747 | On Unix platforms, C<barrier> mode (enabled by default) prevents many workers | |
1748 | from dequeuing simultaneously to lessen overhead for the OS kernel. Specify 0 | |
1749 | to disable barrier mode and not allocate sockets. The barrier option has no | |
1750 | effect if constructing the queue inside a thread or enabling C<fast>. | |
1751 | ||
1752 | The C<fast> option speeds up dequeues and is not enabled by default. It is | |
1753 | beneficial for queues not calling (->dequeue_nb) and not altering the count | |
1754 | value while running; e.g. ->dequeue($count). | |
1755 | ||
1756 | The C<gather> option is mainly for running with MCE and wanting to pass item(s) | |
1772 | 1757 | to a callback function for appending to the queue. Multiple queues may point to |
1773 | 1758 | the same callback function. The callback receives the queue object as the first |
1774 | 1759 | argument and items after it. |
1784 | 1769 | ## Items are diverted to the callback function, not the queue. |
1785 | 1770 | $q7->enqueue( 'apple', 'orange' ); |
1786 | 1771 | |
1787 | Specifying the 'gather' option allows one to store items temporarily while | |
1772 | Specifying the C<gather> option allows one to store items temporarily while | |
1788 | 1773 | ensuring output order. Although a queue object is not required, this is |
1789 | 1774 | simply a demonstration of the gather option in the context of a queue. |
1790 | 1775 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
16 | 16 | |
342 | 342 | |
343 | 343 | =head1 VERSION |
344 | 344 | |
345 | This document describes MCE::Relay version 1.837 | |
345 | This document describes MCE::Relay version 1.838 | |
346 | 346 | |
347 | 347 | =head1 SYNOPSIS |
348 | 348 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | |
195 | 195 | $SIG{__DIE__} = $SIG{__WARN__} = \&_NOOP; |
196 | 196 | |
197 | 197 | if (exists $_sig_name_lkup{$_sig_name}) { |
198 | no warnings 'once'; | |
198 | 199 | $SIG{INT} = $SIG{$_sig_name} = \&_NOOP, |
199 | 200 | $_is_sig = $MCE::Signal::KILLED = 1; |
200 | 201 | $_exit_status = 255 if ($_sig_name eq '__DIE__'); |
426 | 427 | |
427 | 428 | =head1 VERSION |
428 | 429 | |
429 | This document describes MCE::Signal version 1.837 | |
430 | This document describes MCE::Signal version 1.838 | |
430 | 431 | |
431 | 432 | =head1 SYNOPSIS |
432 | 433 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
715 | 715 | |
716 | 716 | =head1 VERSION |
717 | 717 | |
718 | This document describes MCE::Step version 1.837 | |
718 | This document describes MCE::Step version 1.838 | |
719 | 719 | |
720 | 720 | =head1 DESCRIPTION |
721 | 721 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
671 | 671 | |
672 | 672 | =head1 VERSION |
673 | 673 | |
674 | This document describes MCE::Stream version 1.837 | |
674 | This document describes MCE::Stream version 1.838 | |
675 | 675 | |
676 | 676 | =head1 SYNOPSIS |
677 | 677 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
16 | 16 | ## no critic (TestingAndDebugging::ProhibitNoStrict) |
203 | 203 | |
204 | 204 | =head1 VERSION |
205 | 205 | |
206 | This document describes MCE::Subs version 1.837 | |
206 | This document describes MCE::Subs version 1.838 | |
207 | 207 | |
208 | 208 | =head1 SYNOPSIS |
209 | 209 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized numeric ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | |
17 | use IO::Handle (); | |
17 | 18 | use Socket qw( PF_UNIX PF_UNSPEC SOCK_STREAM SOL_SOCKET SO_SNDBUF SO_RCVBUF ); |
18 | 19 | use Time::HiRes qw( sleep time ); |
20 | use Errno (); | |
19 | 21 | use base qw( Exporter ); |
20 | 22 | use bytes; |
21 | 23 | |
45 | 47 | my $g_ncpu; |
46 | 48 | |
47 | 49 | sub get_ncpu { |
48 | ||
49 | 50 | return $g_ncpu if (defined $g_ncpu); |
50 | 51 | |
51 | 52 | local $ENV{PATH} = "/usr/sbin:/sbin:/usr/bin:/bin:$ENV{PATH}"; |
148 | 149 | ############################################################################### |
149 | 150 | |
150 | 151 | sub _destroy_pipes { |
151 | ||
152 | 152 | my ($_obj, @_params) = @_; |
153 | ||
154 | 153 | local ($!,$?); local $SIG{__DIE__}; |
155 | 154 | |
156 | 155 | for my $_p (@_params) { |
173 | 172 | } |
174 | 173 | |
175 | 174 | sub _destroy_socks { |
176 | ||
177 | 175 | my ($_obj, @_params) = @_; |
178 | ||
179 | 176 | local ($!,$?,$@); local $SIG{__DIE__}; |
180 | 177 | |
181 | 178 | for my $_p (@_params) { |
185 | 182 | for my $_i (0 .. @{ $_obj->{$_p} } - 1) { |
186 | 183 | next unless (defined $_obj->{$_p}[$_i]); |
187 | 184 | if (fileno $_obj->{$_p}[$_i]) { |
188 | syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv; | |
185 | MCE::Util::_syswrite($_obj->{$_p}[$_i], '0') if $_is_winenv; | |
189 | 186 | eval q{ CORE::shutdown($_obj->{$_p}[$_i], 2) }; |
190 | 187 | close $_obj->{$_p}[$_i]; |
191 | 188 | } |
194 | 191 | } |
195 | 192 | else { |
196 | 193 | if (fileno $_obj->{$_p}) { |
197 | syswrite($_obj->{$_p}, '0') if $_is_winenv; | |
194 | MCE::Util::_syswrite($_obj->{$_p}, '0') if $_is_winenv; | |
198 | 195 | eval q{ CORE::shutdown($_obj->{$_p}, 2) }; |
199 | 196 | close $_obj->{$_p}; |
200 | 197 | } |
206 | 203 | } |
207 | 204 | |
208 | 205 | sub _pipe_pair { |
209 | ||
210 | 206 | my ($_obj, $_r_sock, $_w_sock, $_i) = @_; |
211 | ||
212 | 207 | local $!; |
213 | 208 | |
214 | 209 | if (defined $_i) { |
215 | 210 | # remove tainted'ness |
216 | 211 | ($_i) = $_i =~ /(.*)/; |
217 | ||
218 | pipe($_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i]) | |
219 | or die "pipe: $!\n"; | |
220 | ||
221 | # IO::Handle->autoflush not available in older Perl. | |
222 | select(( select($_obj->{$_w_sock}[$_i]), $| = 1 )[0]); | |
212 | pipe($_obj->{$_r_sock}[$_i], $_obj->{$_w_sock}[$_i]) or die "pipe: $!\n"; | |
213 | $_obj->{$_w_sock}[$_i]->autoflush(1); | |
223 | 214 | } |
224 | 215 | else { |
225 | pipe($_obj->{$_r_sock}, $_obj->{$_w_sock}) | |
226 | or die "pipe: $!\n"; | |
227 | ||
228 | select(( select($_obj->{$_w_sock}), $| = 1 )[0]); # Ditto. | |
216 | pipe($_obj->{$_r_sock}, $_obj->{$_w_sock}) or die "pipe: $!\n"; | |
217 | $_obj->{$_w_sock}->autoflush(1); | |
229 | 218 | } |
230 | 219 | |
231 | 220 | return; |
232 | 221 | } |
233 | 222 | |
234 | 223 | sub _sock_pair { |
235 | ||
236 | 224 | my ($_obj, $_r_sock, $_w_sock, $_i) = @_; |
237 | ||
238 | 225 | my $_size = 16384; local $!; |
239 | 226 | |
240 | 227 | if (defined $_i) { |
251 | 238 | setsockopt($_obj->{$_w_sock}[$_i], SOL_SOCKET, SO_RCVBUF, int $_size); |
252 | 239 | } |
253 | 240 | |
254 | # IO::Handle->autoflush not available in older Perl. | |
255 | select(( select($_obj->{$_w_sock}[$_i]), $| = 1 )[0]); | |
256 | select(( select($_obj->{$_r_sock}[$_i]), $| = 1 )[0]); | |
241 | $_obj->{$_w_sock}[$_i]->autoflush(1); | |
242 | $_obj->{$_r_sock}[$_i]->autoflush(1); | |
257 | 243 | } |
258 | 244 | else { |
259 | 245 | socketpair( $_obj->{$_r_sock}, $_obj->{$_w_sock}, |
266 | 252 | setsockopt($_obj->{$_w_sock}, SOL_SOCKET, SO_RCVBUF, int $_size); |
267 | 253 | } |
268 | 254 | |
269 | select(( select($_obj->{$_w_sock}), $| = 1 )[0]); # Ditto. | |
270 | select(( select($_obj->{$_r_sock}), $| = 1 )[0]); | |
255 | $_obj->{$_w_sock}->autoflush(1); | |
256 | $_obj->{$_r_sock}->autoflush(1); | |
271 | 257 | } |
272 | 258 | |
273 | 259 | return; |
274 | 260 | } |
275 | 261 | |
276 | 262 | sub _sock_ready { |
277 | ||
278 | 263 | my ($_socket, $_timeout) = @_; |
279 | ||
280 | 264 | return '' if !defined $_timeout && exists $_sock_ready{"$_socket"}; |
281 | 265 | |
282 | 266 | my $_val_bytes = "\x00\x00\x00\x00"; |
283 | 267 | my $_ptr_bytes = unpack('I', pack('P', $_val_bytes)); |
284 | my ($_count, $_start) = (1, time); | |
268 | my ($_delay, $_start) = (0, time); | |
285 | 269 | |
286 | 270 | if (!defined $_timeout) { |
287 | 271 | $_sock_ready{"$_socket"} = undef; |
291 | 275 | } |
292 | 276 | |
293 | 277 | while (1) { |
294 | # MSWin32 FIONREAD | |
278 | # MSWin32 FIONREAD - from winsock2.h macro | |
295 | 279 | ioctl($_socket, 0x4004667f, $_ptr_bytes); |
296 | 280 | |
297 | 281 | return '' if $_val_bytes ne $_zero_bytes; |
298 | 282 | return 1 if $_timeout && time > $_timeout; |
299 | 283 | |
300 | if ($_count) { | |
301 | # delay after a while to not consume a CPU core | |
302 | $_count = 0 if ++$_count % 50 == 0 && time - $_start > 0.005; | |
303 | next; | |
304 | } | |
305 | ||
306 | sleep 0.030; | |
307 | } | |
284 | # delay after a while to not consume a CPU core | |
285 | sleep(0.030), next if $_delay; | |
286 | $_delay = 1 if time - $_start > 0.005; | |
287 | } | |
288 | } | |
289 | ||
290 | sub _sysread { | |
291 | my ($_delay, $_start); | |
292 | ||
293 | SYSREAD: ( @_ == 3 | |
294 | ? sysread($_[0], $_[1], $_[2]) | |
295 | : sysread($_[0], $_[1], $_[2], $_[3]) | |
296 | ) or do { | |
297 | goto SYSREAD if $! == Errno::EINTR(); | |
298 | ||
299 | # non-blocking operation could not be completed | |
300 | if ( $! == Errno::EWOULDBLOCK() || $! == Errno::EAGAIN() ) { | |
301 | sleep(0.030), goto SYSREAD if $_delay; | |
302 | ||
303 | # delay after a while to not consume a CPU core | |
304 | $_start = time unless $_start; | |
305 | $_delay = 1 if time - $_start > 0.005; | |
306 | ||
307 | goto SYSREAD; | |
308 | } | |
309 | }; | |
310 | ||
311 | return; | |
312 | } | |
313 | ||
314 | sub _sysseek { | |
315 | my $_pos; | |
316 | ||
317 | SYSSEEK: $_pos = sysseek($_[0], $_[1], $_[2]) or do { | |
318 | goto SYSSEEK if $! == Errno::EINTR(); | |
319 | }; | |
320 | ||
321 | return $_pos; | |
322 | } | |
323 | ||
324 | sub _syswrite { | |
325 | syswrite($_[0], $_[1]) or do { | |
326 | goto \&_syswrite if $! == Errno::EINTR(); | |
327 | }; | |
328 | return; | |
329 | } | |
330 | ||
331 | sub _nonblocking { | |
332 | if ($^O eq 'MSWin32') { | |
333 | my $nonblocking = ( $_[1] ) ? "\x00\x00\x00\x01" : "\x00\x00\x00\x00"; | |
334 | ||
335 | # MSWin32 FIONBIO - from winsock2.h macro | |
336 | ioctl($_[0], 0x8004667e, unpack("I", pack('P', $nonblocking))); | |
337 | } | |
338 | else { | |
339 | my $nonblocking = ( $_[1] ) ? 0 : 1; | |
340 | ||
341 | $_[0]->blocking($nonblocking); | |
342 | } | |
343 | ||
344 | return; | |
308 | 345 | } |
309 | 346 | |
310 | 347 | 1; |
323 | 360 | |
324 | 361 | =head1 VERSION |
325 | 362 | |
326 | This document describes MCE::Util version 1.837 | |
363 | This document describes MCE::Util version 1.838 | |
327 | 364 | |
328 | 365 | =head1 SYNOPSIS |
329 | 366 |
10 | 10 | |
11 | 11 | no warnings qw( threads recursion uninitialized ); |
12 | 12 | |
13 | our $VERSION = '1.837'; | |
13 | our $VERSION = '1.838'; | |
14 | 14 | |
15 | 15 | ## no critic (BuiltinFunctions::ProhibitStringyEval) |
16 | 16 | ## no critic (Subroutines::ProhibitSubroutinePrototypes) |
578 | 578 | my $_DAT_W_SOCK = $TOP_HDLR->{_dat_w_sock}->[0]; |
579 | 579 | print {$_DAT_W_SOCK} OUTPUT_S_IPC.$LF . '0'.$LF; |
580 | 580 | |
581 | 1 until sysread($_DAT_W_SOCK, my($_buf), 1) || ($! && !$!{'EINTR'}); | |
581 | MCE::Util::_sysread($_DAT_W_SOCK, my($_buf), 1); | |
582 | 582 | } |
583 | 583 | } |
584 | 584 | |
611 | 611 | : MCE::Util::_sock_pair($self, qw(_que_r_sock _que_w_sock)); |
612 | 612 | |
613 | 613 | if (defined $self->{init_relay}) { # relay |
614 | unless (exists $INC{'MCE/Relay.pm'}) { | |
614 | unless ($INC{'MCE/Relay.pm'}) { | |
615 | 615 | require MCE::Relay; MCE::Relay->import(); |
616 | 616 | } |
617 | 617 | MCE::Util::_sock_pair($self, qw(_rla_r_sock _rla_w_sock), $_) |
625 | 625 | ## Spawn workers. |
626 | 626 | $self->{_pids} = [], $self->{_thrs} = [], $self->{_tids} = []; |
627 | 627 | $self->{_status} = [], $self->{_state} = [], $self->{_task} = []; |
628 | ||
629 | local $SIG{TTIN} unless $_is_MSWin32; | |
630 | local $SIG{TTOU} unless $_is_MSWin32; | |
631 | local $SIG{WINCH} unless $_is_MSWin32; | |
628 | 632 | |
629 | 633 | if (!defined $self->{user_tasks}) { |
630 | 634 | $self->{_total_workers} = $_max_workers; |
757 | 761 | unless (defined $MCE->{init_relay}); |
758 | 762 | } |
759 | 763 | |
760 | *relay_unlock = \&relay; | |
764 | { | |
765 | no warnings 'once'; | |
766 | *relay_unlock = \&relay; | |
767 | } | |
761 | 768 | |
762 | 769 | sub AUTOLOAD { |
763 | 770 | # $AUTOLOAD = MCE::<method_name> |
1115 | 1122 | |
1116 | 1123 | ## Insert the first message into the queue if defined. |
1117 | 1124 | if (defined $_first_msg) { |
1118 | 1 until syswrite ( | |
1125 | MCE::Util::_syswrite( | |
1119 | 1126 | $self->{_que_w_sock}, pack($_que_template, 0, $_first_msg) |
1120 | ) || ($! && !$!{'EINTR'}); | |
1127 | ); | |
1121 | 1128 | } |
1122 | 1129 | |
1123 | 1130 | ## Submit params data to workers. |
1154 | 1161 | ## Notify workers to commence processing. |
1155 | 1162 | if ($_is_MSWin32) { |
1156 | 1163 | my $_buf = _sprintf("%${_total_workers}s", ""); |
1157 | syswrite $self->{_bse_w_sock}, $_buf; | |
1164 | MCE::Util::_syswrite($self->{_bse_w_sock}, $_buf); | |
1158 | 1165 | } else { |
1159 | 1166 | my $_BSE_W_SOCK = $self->{_bse_w_sock}; |
1160 | 1167 | for my $_i (1 .. $_total_workers) { |
1161 | 1 until syswrite($_BSE_W_SOCK, $LF) || ($! && !$!{'EINTR'}); | |
1168 | MCE::Util::_syswrite($_BSE_W_SOCK, $LF); | |
1162 | 1169 | } |
1163 | 1170 | } |
1164 | 1171 | } |
1172 | 1179 | |
1173 | 1180 | ## Remove the last message from the queue. |
1174 | 1181 | if (!$_send_cnt && $_run_mode ne 'nodata') { |
1175 | if (defined $self->{_que_r_sock}) { | |
1176 | 1 until sysread ( | |
1177 | $self->{_que_r_sock}, my($_buf), $_que_read_size | |
1178 | ) || ($! && !$!{'EINTR'}); | |
1179 | } | |
1182 | MCE::Util::_sysread($self->{_que_r_sock}, my($_buf), $_que_read_size) | |
1183 | if ( defined $self->{_que_r_sock} ); | |
1180 | 1184 | } |
1181 | 1185 | |
1182 | 1186 | $self->{_send_cnt} = 0; |
1398 | 1402 | |
1399 | 1403 | ## Wait until all workers from (task_id 0) have synced. |
1400 | 1404 | MCE::Util::_sock_ready($_BSB_R_SOCK, -1) if $_is_MSWin32; |
1401 | 1 until sysread($_BSB_R_SOCK, $_buf, 1) || ($! && !$!{'EINTR'}); | |
1405 | MCE::Util::_sysread($_BSB_R_SOCK, $_buf, 1); | |
1402 | 1406 | |
1403 | 1407 | ## Notify the manager process (barrier end). |
1404 | 1408 | print {$_DAT_W_SOCK} OUTPUT_E_SYN.$LF . $_chn.$LF; |
1405 | 1409 | |
1406 | 1410 | ## Wait until all workers from (task_id 0) have un-synced. |
1407 | 1 until sysread($_BSE_R_SOCK, $_buf, 1) || ($! && !$!{'EINTR'}); | |
1411 | MCE::Util::_sysread($_BSE_R_SOCK, $_buf, 1); | |
1408 | 1412 | |
1409 | 1413 | return; |
1410 | 1414 | } |
1454 | 1458 | local $\ = undef; |
1455 | 1459 | |
1456 | 1460 | if ($_abort_msg > 0) { |
1457 | 1 until sysread ( | |
1458 | $_QUE_R_SOCK, my($_next), $_que_read_size | |
1459 | ) || ($! && !$!{'EINTR'}); | |
1460 | 1 until syswrite ( | |
1461 | $_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg) | |
1462 | ) || ($! && !$!{'EINTR'}); | |
1461 | MCE::Util::_sysread($_QUE_R_SOCK, my($_next), $_que_read_size); | |
1462 | MCE::Util::_syswrite($_QUE_W_SOCK, pack($_que_template, 0, $_abort_msg)); | |
1463 | 1463 | } |
1464 | 1464 | |
1465 | 1465 | if ($self->{_wid} > 0) { |
1792 | 1792 | |
1793 | 1793 | threads->exit(0) if $self->{use_threads}; |
1794 | 1794 | |
1795 | $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { | |
1796 | $SIG{$_[0]} = $SIG{INT} = sub { }; | |
1797 | CORE::kill($_[0], getppid()) if ($_[0] eq 'INT' && !$_is_MSWin32); | |
1798 | CORE::kill('KILL', $$); | |
1799 | }; | |
1795 | if (! $_tid) { | |
1796 | $SIG{HUP} = $SIG{INT} = $SIG{QUIT} = $SIG{TERM} = sub { | |
1797 | $SIG{$_[0]} = $SIG{INT} = sub { }; | |
1798 | CORE::kill($_[0], getppid()) if ($_[0] eq 'INT' && !$_is_MSWin32); | |
1799 | CORE::kill('KILL', $$); | |
1800 | }; | |
1801 | } | |
1800 | 1802 | |
1801 | 1803 | if ($self->{posix_exit} && !$_is_MSWin32) { |
1802 | 1804 | eval { MCE::Mutex::Channel::_destroy() }; |
4 | 4 | |
5 | 5 | =head1 VERSION |
6 | 6 | |
7 | This document describes MCE version 1.837 | |
7 | This document describes MCE version 1.838 | |
8 | 8 | |
9 | 9 | Many-Core Engine (MCE) for Perl helps enable a new level of performance by |
10 | 10 | maximizing all available cores. |
281 | 281 | |
282 | 282 | =head1 COPYRIGHT AND LICENSE |
283 | 283 | |
284 | Copyright (C) 2012-2018 by Mario E. Roy | |
284 | Copyright (C) 2012-2019 by Mario E. Roy | |
285 | 285 | |
286 | 286 | MCE is released under the same license as Perl. |
287 | 287 |