Codebase list jboss-threads / d5d77ad
New upstream version 2.3.3 Markus Koschany 5 years ago
4 changed file(s) with 89 addition(s) and 48 deletion(s). Raw diff Collapse all Expand all
2626 <groupId>org.jboss.threads</groupId>
2727 <artifactId>jboss-threads</artifactId>
2828 <packaging>jar</packaging>
29 <version>2.3.2.Final</version>
29 <version>2.3.3.Final</version>
3030
3131 <name>JBoss Threads</name>
3232
1818 package org.jboss.threads;
1919
2020 import static java.lang.Math.max;
21 import static java.lang.Thread.currentThread;
2122 import static java.lang.Thread.holdsLock;
2223 import static java.security.AccessController.doPrivileged;
2324 import static java.security.AccessController.getContext;
5253 import org.jboss.threads.management.StandardThreadPoolMXBean;
5354 import org.wildfly.common.Assert;
5455 import org.wildfly.common.annotation.NotNull;
55 import sun.misc.Contended;
5656
5757 /**
5858 * A task-or-thread queue backed thread pool executor service. Tasks are added in a FIFO manner, and consumers in a LIFO manner.
6666 *
6767 * @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
6868 */
69 @Contended
7069 public final class EnhancedQueueExecutor extends AbstractExecutorService implements ManageableThreadPoolExecutorService {
7170 static {
7271 Version.getVersionString();
220219 volatile TaskNode tail;
221220
222221 /**
222 * The linked list of threads waiting for termination of this thread pool.
223 */
224 @SuppressWarnings("unused") // used by field updater
225 volatile Waiter terminationWaiters;
226
227 /**
223228 * Queue size:
224229 * <ul>
225230 * <li>Bit 00..1F: current queue length</li>
304309
305310 private static final AtomicReferenceFieldUpdater<EnhancedQueueExecutor, TaskNode> headUpdater = AtomicReferenceFieldUpdater.newUpdater(EnhancedQueueExecutor.class, TaskNode.class, "head");
306311 private static final AtomicReferenceFieldUpdater<EnhancedQueueExecutor, TaskNode> tailUpdater = AtomicReferenceFieldUpdater.newUpdater(EnhancedQueueExecutor.class, TaskNode.class, "tail");
312 private static final AtomicReferenceFieldUpdater<EnhancedQueueExecutor, Waiter> terminationWaitersUpdater = AtomicReferenceFieldUpdater.newUpdater(EnhancedQueueExecutor.class, Waiter.class, "terminationWaiters");
307313
308314 private static final AtomicLongFieldUpdater<EnhancedQueueExecutor> queueSizeUpdater = AtomicLongFieldUpdater.newUpdater(EnhancedQueueExecutor.class, "queueSize");
309315 private static final AtomicLongFieldUpdater<EnhancedQueueExecutor> threadStatusUpdater = AtomicLongFieldUpdater.newUpdater(EnhancedQueueExecutor.class, "threadStatus");
342348 // Marker objects
343349 // =======================================================
344350
351 static final QNode TERMINATE_REQUESTED = new TerminateWaiterNode(null);
345352 static final QNode TERMINATE_COMPLETE = new TerminateWaiterNode(null);
353
354 static final Waiter TERMINATE_COMPLETE_WAITER = new Waiter(null);
346355
347356 static final Runnable WAITING = new NullRunnable();
348357 static final Runnable GAVE_UP = new NullRunnable();
379388 public ObjectInstance run() {
380389 try {
381390 final Hashtable<String, String> table = new Hashtable<>();
382 table.put("name", finalName);
391 table.put("name", ObjectName.quote(finalName));
383392 table.put("type", "thread-pool");
384393 return ManagementFactory.getPlatformMBeanServer().registerMBean(mxBean, new ObjectName("jboss.threads", table));
385394 } catch (Throwable ignored) {
825834 if (runningThreads.contains(thread)) {
826835 throw Messages.msg.cannotAwaitWithin();
827836 }
828 final TerminateWaiterNode node = new TerminateWaiterNode(thread);
829 // stick it on the queue
830 QNode tail = this.tail;
831 QNode tailNext;
832 for (;;) {
833 tailNext = tail.getNext();
834 if (tailNext == null) {
835 if (tail.compareAndSetNext(null, node)) {
836 // now we wait!
837 break;
838 }
839 } else if (tailNext == TERMINATE_COMPLETE) {
840 // nothing more to be done!
837 Waiter waiters = this.terminationWaiters;
838 if (waiters == TERMINATE_COMPLETE_WAITER) {
839 return true;
840 }
841 final Waiter waiter = new Waiter(waiters);
842 waiter.setThread(currentThread());
843 while (! compareAndSetTerminationWaiters(waiters, waiter)) {
844 waiters = this.terminationWaiters;
845 if (waiters == TERMINATE_COMPLETE_WAITER) {
841846 return true;
842 } else {
843 if (UPDATE_TAIL && tailNext instanceof TaskNode) {
844 assert tail instanceof TaskNode; // else tailNext couldn't possibly be a TaskNode
845 compareAndSetTail(((TaskNode) tail), ((TaskNode) tailNext));
846 }
847 tail = tailNext;
848847 }
848 waiter.setNext(waiters);
849849 }
850850 try {
851851 parkNanos(this, unit.toNanos(timeout));
852852 } finally {
853853 // prevent future spurious unparks without sabotaging the queue's integrity
854 node.getAndClearThread();
854 waiter.setThread(null);
855855 }
856856 }
857857 if (Thread.interrupted()) throw new InterruptedException();
910910 TaskNode tail = this.tail;
911911 QNode tailNext;
912912 // a marker to indicate that termination was requested
913 final TerminateWaiterNode terminateNode = new TerminateWaiterNode(null);
914913 for (;;) {
915914 tailNext = tail.getNext();
916915 if (tailNext instanceof TaskNode) {
926925 // tail(snapshot) is a task node
927926 // tail(snapshot).next is a (list of) pool thread node(s) or null
928927 // postconditions (succeed):
929 // tail(snapshot).next is terminateNode(null)
930 if (tail.compareAndSetNext(node, terminateNode)) {
928 // tail(snapshot).next is TERMINATE_REQUESTED
929 if (tail.compareAndSetNext(node, TERMINATE_REQUESTED)) {
931930 // got it!
932931 // state change sh3:
933932 // node.task ← EXIT
17591758 this.terminationTask = null;
17601759 safeRun(terminationTask);
17611760 // notify all waiters
1762 QNode tail = EnhancedQueueExecutor.this.tail;
1763 QNode tailNext = tail.getAndSetNext(TERMINATE_COMPLETE);
1764 while (tailNext != null) {
1765 // state change ct1:
1766 // tail(snapshot).next(snapshot).thread ← null
1767 // succeeds: sh2
1768 // preconditions:
1769 // threadStatus is shutdown (because sh1 ≺ … ≺ ct1)
1770 // postconditions: -
1771 // post-actions:
1772 // unpark(twn)
1773 if (tailNext instanceof TerminateWaiterNode) {
1774 unpark(((TerminateWaiterNode) tailNext).getAndClearThread());
1775 }
1776 tailNext = tailNext.getNext();
1777 }
1761 Waiter waiters = getAndSetTerminationWaiters(TERMINATE_COMPLETE_WAITER);
1762 while (waiters != null) {
1763 unpark(waiters.getThread());
1764 waiters = waiters.getNext();
1765 }
1766 tail.getAndSetNext(TERMINATE_COMPLETE);
17781767 final ObjectInstance handle = this.handle;
17791768 if (handle != null) {
17801769 doPrivileged(new PrivilegedAction<Void>() {
18251814 tailUpdater.compareAndSet(this, expect, update);
18261815 }
18271816
1817 boolean compareAndSetTerminationWaiters(final Waiter expect, final Waiter update) {
1818 return terminationWaitersUpdater.compareAndSet(this, expect, update);
1819 }
1820
1821 Waiter getAndSetTerminationWaiters(final Waiter update) {
1822 return terminationWaitersUpdater.getAndSet(this, update);
1823 }
1824
18281825 // =======================================================
18291826 // Queue size operations
18301827 // =======================================================
20362033 // Node classes
20372034 // =======================================================
20382035
2039 @Contended
20402036 abstract static class QNode {
20412037 // in 9, use VarHandle
20422038 private static final AtomicReferenceFieldUpdater<QNode, QNode> nextUpdater = AtomicReferenceFieldUpdater.newUpdater(QNode.class, QNode.class, "next");
20612057 }
20622058 }
20632059
2064 @Contended
20652060 static final class PoolThreadNode extends QNode {
20662061 private final Thread thread;
20672062
20932088 }
20942089 }
20952090
2096 @Contended
20972091 static final class TerminateWaiterNode extends QNode {
20982092 private volatile Thread thread;
20992093
21172111 }
21182112 }
21192113
2120 @Contended
21212114 static final class TaskNode extends QNode {
21222115 volatile Runnable task;
21232116
7878 case 'g': builder.append(globalThreadSequenceNum); break;
7979 case 'f': builder.append(factorySequenceNum); break;
8080 case 'p': if (group != null) appendGroupPath(group, builder); break;
81 case 'i': builder.append(thread.getId());
81 case 'i': builder.append(thread.getId()); break;
8282 case 'G': if (group != null) builder.append(group.getName()); break;
8383 }
8484 }
0 /*
1 * JBoss, Home of Professional Open Source.
2 * Copyright 2018 Red Hat, Inc., and individual contributors
3 * as indicated by the @author tags.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.jboss.threads;
19
20 /**
21 */
22 final class Waiter {
23 private volatile Thread thread;
24 private Waiter next;
25
26 Waiter(final Waiter next) {
27 this.next = next;
28 }
29
30 Thread getThread() {
31 return thread;
32 }
33
34 Waiter setThread(final Thread thread) {
35 this.thread = thread;
36 return this;
37 }
38
39 Waiter getNext() {
40 return next;
41 }
42
43 Waiter setNext(final Waiter next) {
44 this.next = next;
45 return this;
46 }
47 }