Codebase list httpcomponents-asyncclient / 45c6c8f
Update upstream source from tag 'upstream/4.1.5' Update to upstream version '4.1.5' with Debian dir d3cfd87bbd0a6b1cbc3dc4c04be63f7d0e456aa4 Jérôme Charaoui 1 year, 6 months ago
60 changed file(s) with 616 addition(s) and 600 deletion(s). Raw diff Collapse all Expand all
00 Apache HttpComponents AsyncClient
1 Copyright 2010-2017 The Apache Software Foundation
1 Copyright 2010-2021 The Apache Software Foundation
22
33 This product includes software developed at
44 The Apache Software Foundation (http://www.apache.org/).
0 Release 4.1.5
1 -------------------
2
3 This is a maintenance release that fixes a number of issues discovered since 4.1.4.
4
5 Changelog
6 -------------------
7
8 * Upgraded HttpCore to version 4.4.15 and HttpClient to version 4.5.13.
9 Contributed by Oleg Kalnichevski <olegk at apache.org>
10
11 * HTTPASYNC-160: HttpAsyncClient in INACTIVE or STOPPED state throws a IllegalStateException
12 causing the current thread to terminate.
13 Contributed by Oleg Kalnichevski <olegk at apache.org>
14
15 * HTTPASYNC-156: Possible deadlock condition in case of concurrent connection request
16 completion and cancellation.
17 Contributed by Oleg Kalnichevski <olegk at apache.org>
18
19 * HTTPASYNC-155: Runtime exception in AbstractClientExchangeHandler can cause the I/O reactor
20 to shut down.
21 Contributed by Oleg Kalnichevski <olegk at apache.org>
22
23 * Added connection time to live parameter to HttpAsyncClientBuilder.
24 Contributed by Oleg Kalnichevski <olegk at apache.org>
25
26 * Bug fix: PipeliningClientExchangeHandlerImpl to fail result future in case of an execution
27 failure.
28 Contributed by Oleg Kalnichevski <olegk at apache.org>
29
30 * HTTPASYNC-152: InternalIODispatch#createConnection to throw CancelledKeyException instead
31 of IllegalStateException.
32 Contributed by Oleg Kalnichevski <olegk at apache.org>
33
34
035 Release 4.1.4
136 -------------------
237
2727 <parent>
2828 <groupId>org.apache.httpcomponents</groupId>
2929 <artifactId>httpcomponents-asyncclient</artifactId>
30 <version>4.1.4</version>
30 <version>4.1.5</version>
3131 </parent>
3232 <artifactId>httpasyncclient</artifactId>
3333 <name>Apache HttpAsyncClient</name>
136136 <artifactId>maven-javadoc-plugin</artifactId>
137137 <version>${hc.javadoc.version}</version>
138138 <configuration>
139 <!-- reduce console output. Can override with -Dquiet=false -->
140 <quiet>true</quiet>
139141 <source>${maven.compiler.source}</source>
140142 <links>
141 <link>http://download.oracle.com/javase/1.5.0/docs/api/</link>
142 <link>http://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/</link>
143 <link>http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/</link>
143 <link>http://docs.oracle.com/javase/6/docs/api/</link>
144 <link>https://hc.apache.org/httpcomponents-core-4.4.x/current/httpcore/apidocs/</link>
145 <link>https://hc.apache.org/httpcomponents-client-4.5.x/current/httpclient/apidocs/</link>
144146 </links>
145147 </configuration>
146148 <reportSets>
154156
155157 <plugin>
156158 <artifactId>maven-project-info-reports-plugin</artifactId>
157 <version>${hc.project-info.version}</version>
158159 <inherited>false</inherited>
159160 <reportSets>
160161 <reportSet>
169170
170171 <plugin>
171172 <artifactId>maven-jxr-plugin</artifactId>
172 <version>${hc.jxr.version}</version>
173173 </plugin>
174174
175175 <plugin>
176176 <artifactId>maven-surefire-report-plugin</artifactId>
177 <version>${hc.surefire-report.version}</version>
178177 </plugin>
179178
180179 </plugins>
181180 </reporting>
182181
183 </project>
182 </project>
7070 }
7171
7272 @Override
73 protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
73 protected void onCharReceived(final CharBuffer buf, final IOControl ioControl) throws IOException {
7474 while (buf.hasRemaining()) {
7575 System.out.print(buf.get());
7676 }
112112 }
113113
114114 @Override
115 protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
115 protected void onCharReceived(final CharBuffer buf, final IOControl ioControl) throws IOException {
116116 while (buf.hasRemaining()) {
117117 System.out.print(buf.get());
118118 }
9797 }
9898
9999 @Override
100 protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
100 protected void onCharReceived(final CharBuffer buf, final IOControl ioControl) throws IOException {
101101 // Do something useful
102102 }
103103
3535 import org.apache.commons.logging.Log;
3636 import org.apache.http.ConnectionClosedException;
3737 import org.apache.http.ConnectionReuseStrategy;
38 import org.apache.http.HttpException;
3839 import org.apache.http.HttpHost;
3940 import org.apache.http.HttpResponse;
4041 import org.apache.http.client.config.RequestConfig;
4445 import org.apache.http.conn.ConnectionKeepAliveStrategy;
4546 import org.apache.http.conn.routing.HttpRoute;
4647 import org.apache.http.conn.routing.RouteTracker;
48 import org.apache.http.impl.conn.ConnectionShutdownException;
4749 import org.apache.http.nio.NHttpClientConnection;
4850 import org.apache.http.nio.conn.NHttpClientConnectionManager;
4951 import org.apache.http.nio.protocol.HttpAsyncClientExchangeHandler;
333335 managedConn.requestOutput();
334336 }
335337 }
338 } catch (final ConnectionShutdownException runex) {
339 failed(runex);
336340 } catch (final RuntimeException runex) {
337341 failed(runex);
338342 throw runex;
399403 }));
400404 }
401405
406 abstract void start() throws HttpException, IOException;
407
402408 abstract void releaseResources();
403409
404410 abstract void executionFailed(final Exception ex);
2626 package org.apache.http.impl.nio.client;
2727
2828 import java.io.IOException;
29 import java.util.concurrent.CancellationException;
2930 import java.util.concurrent.ThreadFactory;
3031 import java.util.concurrent.atomic.AtomicReference;
3132
3435 import org.apache.http.nio.NHttpClientEventHandler;
3536 import org.apache.http.nio.conn.NHttpClientConnectionManager;
3637 import org.apache.http.nio.reactor.IOEventDispatch;
37 import org.apache.http.util.Asserts;
3838
3939 abstract class CloseableHttpAsyncClientBase extends CloseableHttpPipeliningClient {
4040
8484 }
8585 }
8686
87 protected void ensureRunning() {
88 final Status currentStatus = this.status.get();
89 Asserts.check(currentStatus == Status.ACTIVE, "Request cannot be executed; " +
90 "I/O reactor status: %s", currentStatus);
91 }
92
9387 @Override
9488 public void close() {
9589 if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
113107 return this.status.get() == Status.ACTIVE;
114108 }
115109
110 final void execute(final AbstractClientExchangeHandler handler) {
111 try {
112 if (!isRunning()) {
113 throw new CancellationException("Request execution cancelled");
114 }
115 handler.start();
116 } catch (final Exception ex) {
117 handler.failed(ex);
118 }
119 }
120
116121 }
117117 return cancelled;
118118 }
119119
120 @Override
120121 public void start() throws HttpException, IOException {
121122 final HttpHost target = this.requestProducer.getTarget();
122123 final HttpRequest original = this.requestProducer.generateRequest();
135136
136137 @Override
137138 public void produceContent(
138 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
139 this.exec.produceContent(this.state, encoder, ioctrl);
139 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
140 this.exec.produceContent(this.state, encoder, ioControl);
140141 }
141142
142143 @Override
152153
153154 @Override
154155 public void consumeContent(
155 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
156 this.exec.consumeContent(this.state, decoder, ioctrl);
156 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
157 this.exec.consumeContent(this.state, decoder, ioControl);
157158 if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
158159 markConnectionNonReusable();
159160 try {
3131 import java.util.LinkedList;
3232 import java.util.concurrent.Executors;
3333 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
3435
3536 import javax.net.ssl.HostnameVerifier;
3637 import javax.net.ssl.SSLContext;
8485 import org.apache.http.impl.conn.DefaultProxyRoutePlanner;
8586 import org.apache.http.impl.conn.DefaultRoutePlanner;
8687 import org.apache.http.impl.conn.DefaultSchemePortResolver;
88 import org.apache.http.impl.conn.SystemDefaultDnsResolver;
8789 import org.apache.http.impl.conn.SystemDefaultRoutePlanner;
8890 import org.apache.http.impl.cookie.DefaultCookieSpecProvider;
8991 import org.apache.http.impl.cookie.IgnoreSpecProvider;
9092 import org.apache.http.impl.cookie.NetscapeDraftSpecProvider;
9193 import org.apache.http.impl.cookie.RFC6265CookieSpecProvider;
94 import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
9295 import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
9396 import org.apache.http.impl.nio.reactor.IOReactorConfig;
9497 import org.apache.http.nio.NHttpClientEventHandler;
186189
187190 private int maxConnTotal = 0;
188191 private int maxConnPerRoute = 0;
192 private long connTimeToLive = -1;
193 private TimeUnit connTimeToLiveTimeUnit = TimeUnit.MILLISECONDS;
189194
190195 public static HttpAsyncClientBuilder create() {
191196 return new HttpAsyncClientBuilder();
265270 */
266271 public final HttpAsyncClientBuilder setMaxConnPerRoute(final int maxConnPerRoute) {
267272 this.maxConnPerRoute = maxConnPerRoute;
273 return this;
274 }
275
276 /**
277 * Sets maximum time to live for persistent connections
278 * <p>
279 * Please note this value can be overridden by the {@link #setConnectionManager(
280 * org.apache.http.nio.conn.NHttpClientConnectionManager)} method.
281 *
282 * @since 4.1
283 */
284 public final HttpAsyncClientBuilder setConnectionTimeToLive(final long connTimeToLive, final TimeUnit connTimeToLiveTimeUnit) {
285 this.connTimeToLive = connTimeToLive;
286 this.connTimeToLiveTimeUnit = connTimeToLiveTimeUnit;
268287 return this;
269288 }
270289
663682 sslStrategy = new SSLIOSessionStrategy(
664683 sslcontext, supportedProtocols, supportedCipherSuites, hostnameVerifier);
665684 }
666 final ConnectingIOReactor ioreactor = IOReactorUtils.create(
685 final ConnectingIOReactor ioReactor = IOReactorUtils.create(
667686 defaultIOReactorConfig != null ? defaultIOReactorConfig : IOReactorConfig.DEFAULT, threadFactory);
668687 final PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(
669 ioreactor,
688 ioReactor,
689 ManagedNHttpClientConnectionFactory.INSTANCE,
670690 RegistryBuilder.<SchemeIOSessionStrategy>create()
671691 .register("http", NoopIOSessionStrategy.INSTANCE)
672692 .register("https", sslStrategy)
673 .build());
693 .build(),
694 DefaultSchemePortResolver.INSTANCE,
695 SystemDefaultDnsResolver.INSTANCE,
696 connTimeToLive,
697 connTimeToLiveTimeUnit);
674698 if (defaultConnectionConfig != null) {
675699 poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
676700 }
8585 * aspects only. This client does not support HTTP state management, authentication
8686 * and automatic redirects.
8787 */
88 public static CloseableHttpAsyncClient createMinimal(final ConnectingIOReactor ioreactor) {
89 Args.notNull(ioreactor, "I/O reactor");
90 return createMinimal(new PoolingNHttpClientConnectionManager(ioreactor), false);
88 public static CloseableHttpAsyncClient createMinimal(final ConnectingIOReactor ioReactor) {
89 Args.notNull(ioReactor, "I/O reactor");
90 return createMinimal(new PoolingNHttpClientConnectionManager(ioReactor), false);
9191 }
9292
9393 /**
137137 *
138138 * @since 4.1
139139 */
140 public static CloseableHttpPipeliningClient createPipelining(final ConnectingIOReactor ioreactor) {
141 return createPipelining(new PoolingNHttpClientConnectionManager(ioreactor), false);
140 public static CloseableHttpPipeliningClient createPipelining(final ConnectingIOReactor ioReactor) {
141 return createPipelining(new PoolingNHttpClientConnectionManager(ioReactor), false);
142142 }
143143
144144 /**
5050 void produceContent(
5151 InternalState state,
5252 ContentEncoder encoder,
53 IOControl ioctrl) throws IOException;
53 IOControl ioControl) throws IOException;
5454
5555 void requestCompleted(
5656 InternalState state,
6464 void consumeContent(
6565 InternalState state,
6666 ContentDecoder decoder,
67 IOControl ioctrl) throws IOException;
67 IOControl ioControl) throws IOException;
6868
6969 void responseCompleted(
7070 InternalState state,
119119 final HttpAsyncResponseConsumer<T> responseConsumer,
120120 final HttpContext context,
121121 final FutureCallback<T> callback) {
122 ensureRunning();
123122 final BasicFuture<T> future = new BasicFuture<T>(callback);
124123 final HttpClientContext localcontext = HttpClientContext.adapt(
125124 context != null ? context : new BasicHttpContext());
126125 setupContext(localcontext);
127126
128 @SuppressWarnings("resource")
129127 final DefaultClientExchangeHandlerImpl<T> handler = new DefaultClientExchangeHandlerImpl<T>(
130128 this.log,
131129 requestProducer,
136134 this.connReuseStrategy,
137135 this.keepaliveStrategy,
138136 this.exec);
139 try {
140 handler.start();
141 } catch (final Exception ex) {
142 handler.failed(ex);
143 }
137 execute(handler);
144138 return new FutureWrapper<T>(future, handler);
145139 }
146140
2727 package org.apache.http.impl.nio.client;
2828
2929 import java.io.IOException;
30 import java.nio.channels.CancelledKeyException;
3031
3132 import org.apache.commons.logging.Log;
3233 import org.apache.commons.logging.LogFactory;
5253
5354 @Override
5455 protected DefaultNHttpClientConnection createConnection(final IOSession session) {
55 throw new IllegalStateException("Connection must be created by connection manager");
56 // This method should never get called under normal circumstances
57 // Connection object should be created by the connection manager
58 // upon completion of the session request
59 log.debug("Unexpected invocation of #createConnection");
60 session.close();
61 throw new CancelledKeyException();
5662 }
5763
5864 @Override
252252 public void produceContent(
253253 final InternalState state,
254254 final ContentEncoder encoder,
255 final IOControl ioctrl) throws IOException {
255 final IOControl ioControl) throws IOException {
256256 if (this.log.isDebugEnabled()) {
257257 this.log.debug("[exchange: " + state.getId() + "] produce content");
258258 }
259259 final HttpAsyncRequestProducer requestProducer = state.getRequestProducer();
260260 state.setRequestContentProduced();
261 requestProducer.produceContent(encoder, ioctrl);
261 requestProducer.produceContent(encoder, ioControl);
262262 if (encoder.isCompleted()) {
263263 requestProducer.resetRequest();
264264 }
285285 this.log.debug("[exchange: " + state.getId() + "] Response received " + response.getStatusLine());
286286 }
287287 final HttpClientContext context = state.getLocalContext();
288 context.setAttribute(HttpClientContext.HTTP_RESPONSE, response);
288 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
289289 this.httpProcessor.process(response, context);
290290
291291 handler.setCurrentResponse(response);
319319 public void consumeContent(
320320 final InternalState state,
321321 final ContentDecoder decoder,
322 final IOControl ioctrl) throws IOException {
322 final IOControl ioControl) throws IOException {
323323 if (this.log.isDebugEnabled()) {
324324 this.log.debug("[exchange: " + state.getId() + "] Consume content");
325325 }
326326 if (state.getFinalResponse() != null) {
327327 final HttpAsyncResponseConsumer<?> responseConsumer = state.getResponseConsumer();
328 responseConsumer.consumeContent(decoder, ioctrl);
328 responseConsumer.consumeContent(decoder, ioControl);
329329 } else {
330330 final ByteBuffer tmpbuf = state.getTmpbuf();
331331 tmpbuf.clear();
513513 }
514514 }
515515
516 localContext.setAttribute(HttpClientContext.HTTP_REQUEST, currentRequest);
517 localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, target);
516 localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, currentRequest);
517 localContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
518518 localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
519519 this.httpProcessor.process(currentRequest, localContext);
520520 }
539539
540540 private boolean handleConnectResponse(
541541 final InternalState state,
542 final AbstractClientExchangeHandler handler) throws HttpException {
542 final AbstractClientExchangeHandler handler) {
543543 final HttpClientContext localContext = state.getLocalContext();
544544 final RequestConfig config = localContext.getRequestConfig();
545545 if (config.isAuthenticationEnabled()) {
598598
599599 private boolean needAuthentication(
600600 final InternalState state,
601 final AbstractClientExchangeHandler handler) throws HttpException {
601 final AbstractClientExchangeHandler handler) {
602602 final HttpClientContext localContext = state.getLocalContext();
603603 final CredentialsProvider credsProvider = localContext.getCredentialsProvider();
604604 if (credsProvider != null) {
119119 return cancelled;
120120 }
121121
122 @Override
122123 public void start() throws HttpException, IOException {
123124 final HttpHost target = this.requestProducer.getTarget();
124125 final HttpRequest original = this.requestProducer.generateRequest();
142143 setCurrentRequest(request);
143144 setRoute(route);
144145
145 this.localContext.setAttribute(HttpClientContext.HTTP_REQUEST, request);
146 this.localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, target);
146 this.localContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
147 this.localContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
147148 this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
148149
149150 this.httpProcessor.process(request, this.localContext);
170171
171172 @Override
172173 public void produceContent(
173 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
174 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
174175 if (this.log.isDebugEnabled()) {
175176 this.log.debug("[exchange: " + getId() + "] produce content");
176177 }
177 this.requestProducer.produceContent(encoder, ioctrl);
178 this.requestProducer.produceContent(encoder, ioControl);
178179 if (encoder.isCompleted()) {
179180 this.requestProducer.resetRequest();
180181 }
194195 if (this.log.isDebugEnabled()) {
195196 this.log.debug("[exchange: " + getId() + "] Response received " + response.getStatusLine());
196197 }
197 this.localContext.setAttribute(HttpClientContext.HTTP_RESPONSE, response);
198 this.localContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
198199 this.httpProcessor.process(response, this.localContext);
199200
200201 setCurrentResponse(response);
204205
205206 @Override
206207 public void consumeContent(
207 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
208 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
208209 if (this.log.isDebugEnabled()) {
209210 this.log.debug("[exchange: " + getId() + "] Consume content");
210211 }
211 this.responseConsumer.consumeContent(decoder, ioctrl);
212 this.responseConsumer.consumeContent(decoder, ioControl);
212213 if (!decoder.isCompleted() && this.responseConsumer.isDone()) {
213214 markConnectionNonReusable();
214215 try {
8989 final HttpAsyncResponseConsumer<T> responseConsumer,
9090 final HttpContext context,
9191 final FutureCallback<T> callback) {
92 ensureRunning();
9392 final BasicFuture<T> future = new BasicFuture<T>(callback);
9493 final HttpClientContext localcontext = HttpClientContext.adapt(
9594 context != null ? context : new BasicHttpContext());
9695
97 @SuppressWarnings("resource")
9896 final MinimalClientExchangeHandlerImpl<T> handler = new MinimalClientExchangeHandlerImpl<T>(
9997 this.log,
10098 requestProducer,
105103 this.httpProcessor,
106104 this.connReuseStrategy,
107105 this.keepaliveStrategy);
108 try {
109 handler.start();
110 } catch (final Exception ex) {
111 handler.failed(ex);
112 }
106 execute(handler);
113107 return new FutureWrapper<T>(future, handler);
114108 }
115109
120114 final List<? extends HttpAsyncResponseConsumer<T>> responseConsumers,
121115 final HttpContext context,
122116 final FutureCallback<List<T>> callback) {
123 ensureRunning();
124117 final BasicFuture<List<T>> future = new BasicFuture<List<T>>(callback);
125118 final HttpClientContext localcontext = HttpClientContext.adapt(
126119 context != null ? context : new BasicHttpContext());
127 @SuppressWarnings("resource")
128120 final PipeliningClientExchangeHandlerImpl<T> handler = new PipeliningClientExchangeHandlerImpl<T>(
129121 this.log,
130122 target,
136128 this.httpProcessor,
137129 this.connReuseStrategy,
138130 this.keepaliveStrategy);
139 try {
140 handler.start();
141 } catch (final Exception ex) {
142 handler.failed(ex);
143 }
131 execute(handler);
144132 return new FutureWrapper<List<T>>(future, handler);
145133 }
146134
144144
145145 @Override
146146 void executionFailed(final Exception ex) {
147 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
148 if (requestProducer != null) {
149 requestProducer.failed(ex);
150 }
151 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
152 if (responseConsumer != null) {
153 responseConsumer.failed(ex);
154 }
155 for (final HttpAsyncResponseConsumer<T> cancellable: this.responseConsumerQueue) {
156 cancellable.cancel();
147 try {
148 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
149 if (requestProducer != null) {
150 requestProducer.failed(ex);
151 }
152 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
153 if (responseConsumer != null) {
154 responseConsumer.failed(ex);
155 }
156 for (final HttpAsyncResponseConsumer<T> cancellable: this.responseConsumerQueue) {
157 cancellable.cancel();
158 }
159 } finally {
160 this.resultFuture.failed(ex);
157161 }
158162 }
159163
165169 return cancelled;
166170 }
167171
172 @Override
168173 public void start() throws HttpException, IOException {
169174 if (this.log.isDebugEnabled()) {
170175 this.log.debug("[exchange: " + getId() + "] start execution");
173178 final HttpRoute route = new HttpRoute(this.target);
174179 setRoute(route);
175180
176 this.localContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, this.target);
181 this.localContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, this.target);
177182 this.localContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
178183
179184 requestConnection();
213218
214219 @Override
215220 public void produceContent(
216 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
221 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
217222 if (this.log.isDebugEnabled()) {
218223 this.log.debug("[exchange: " + getId() + "] produce content");
219224 }
220225 final HttpAsyncRequestProducer requestProducer = this.requestProducerRef.get();
221226 Asserts.check(requestProducer != null, "Inconsistent state: request producer is null");
222 requestProducer.produceContent(encoder, ioctrl);
227 requestProducer.produceContent(encoder, ioControl);
223228 if (encoder.isCompleted()) {
224229 requestProducer.resetRequest();
225230 }
267272
268273 @Override
269274 public void consumeContent(
270 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
275 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
271276 if (this.log.isDebugEnabled()) {
272277 this.log.debug("[exchange: " + getId() + "] Consume content");
273278 }
274279 final HttpAsyncResponseConsumer<T> responseConsumer = this.responseConsumerRef.get();
275280 Asserts.check(responseConsumer != null, "Inconsistent state: response consumer is null");
276 responseConsumer.consumeContent(decoder, ioctrl);
281 responseConsumer.consumeContent(decoder, ioControl);
277282 }
278283
279284 @Override
4545 private final Log log = LogFactory.getLog(CPool.class);
4646
4747 private final long timeToLive;
48 private final TimeUnit tunit;
48 private final TimeUnit timeUnit;
4949
5050 public CPool(
51 final ConnectingIOReactor ioreactor,
51 final ConnectingIOReactor ioReactor,
5252 final NIOConnFactory<HttpRoute, ManagedNHttpClientConnection> connFactory,
5353 final SocketAddressResolver<HttpRoute> addressResolver,
5454 final int defaultMaxPerRoute, final int maxTotal,
55 final long timeToLive, final TimeUnit tunit) {
56 super(ioreactor, connFactory, addressResolver, defaultMaxPerRoute, maxTotal);
55 final long timeToLive, final TimeUnit timeUnit) {
56 super(ioReactor, connFactory, addressResolver, defaultMaxPerRoute, maxTotal);
5757 this.timeToLive = timeToLive;
58 this.tunit = tunit;
58 this.timeUnit = timeUnit;
5959 }
6060
6161 @Override
6262 protected CPoolEntry createEntry(final HttpRoute route, final ManagedNHttpClientConnection conn) {
63 final CPoolEntry entry = new CPoolEntry(this.log, conn.getId(), route, conn, this.timeToLive, this.tunit);
63 final CPoolEntry entry = new CPoolEntry(this.log, conn.getId(), route, conn, this.timeToLive, this.timeUnit);
6464 entry.setSocketTimeout(conn.getSocketTimeout());
6565 return entry;
6666 }
4848 final String id,
4949 final HttpRoute route,
5050 final ManagedNHttpClientConnection conn,
51 final long timeToLive, final TimeUnit tunit) {
52 super(id, route, conn, timeToLive, tunit);
51 final long timeToLive, final TimeUnit timeUnit) {
52 super(id, route, conn, timeToLive, timeUnit);
5353 this.log = log;
5454 }
5555
151151 @Override
152152 public boolean isOpen() {
153153 final CPoolEntry local = this.poolEntry;
154 if (local != null) {
155 return !local.isClosed();
156 } else {
157 return false;
158 }
154 return local != null ? !local.isClosed() : false;
159155 }
160156
161157 @Override
162158 public boolean isStale() {
163159 final NHttpClientConnection conn = getConnection();
164 if (conn != null) {
165 return !conn.isOpen();
166 } else {
167 return false;
168 }
160 return conn != null ? !conn.isOpen() : false;
169161 }
170162
171163 @Override
247239 }
248240
249241 @Override
250 public void bind(final IOSession iosession) {
251 getValidConnection().bind(iosession);
242 public void bind(final IOSession ioSession) {
243 getValidConnection().bind(ioSession);
252244 }
253245
254246 @Override
4242 private final ByteChannel channel;
4343 private final String id;
4444 private final Log log;
45 private final Wire wirelog;
46
47 public LoggingIOSession(final IOSession session, final String id, final Log log, final Log wirelog) {
45 private final Wire wireLog;
46
47 public LoggingIOSession(final IOSession session, final String id, final Log log, final Log wireLog) {
4848 super();
4949 this.session = session;
5050 this.channel = new LoggingByteChannel();
5151 this.id = id;
5252 this.log = log;
53 this.wirelog = new Wire(wirelog, this.id);
53 this.wireLog = new Wire(wireLog, this.id);
5454 }
5555
5656 @Override
204204 if (log.isDebugEnabled()) {
205205 log.debug(id + " " + session + ": " + bytesRead + " bytes read");
206206 }
207 if (bytesRead > 0 && wirelog.isEnabled()) {
207 if (bytesRead > 0 && wireLog.isEnabled()) {
208208 final ByteBuffer b = dst.duplicate();
209209 final int p = b.position();
210210 b.limit(p);
211211 b.position(p - bytesRead);
212 wirelog.input(b);
212 wireLog.input(b);
213213 }
214214 return bytesRead;
215215 }
220220 if (log.isDebugEnabled()) {
221221 log.debug(id + " " + session + ": " + byteWritten + " bytes written");
222222 }
223 if (byteWritten > 0 && wirelog.isEnabled()) {
223 if (byteWritten > 0 && wireLog.isEnabled()) {
224224 final ByteBuffer b = src.duplicate();
225225 final int p = b.position();
226226 b.limit(p);
227227 b.position(p - byteWritten);
228 wirelog.output(b);
228 wireLog.output(b);
229229 }
230230 return byteWritten;
231231 }
5454 */
5555 public class ManagedNHttpClientConnectionFactory implements NHttpConnectionFactory<ManagedNHttpClientConnection> {
5656
57 private final Log headerlog = LogFactory.getLog("org.apache.http.headers");
58 private final Log wirelog = LogFactory.getLog("org.apache.http.wire");
57 private final Log headerLog = LogFactory.getLog("org.apache.http.headers");
58 private final Log wireLog = LogFactory.getLog("org.apache.http.wire");
5959 private final Log log = LogFactory.getLog(ManagedNHttpClientConnectionImpl.class);
6060
6161 private static final AtomicLong COUNTER = new AtomicLong();
8484
8585 @Override
8686 public ManagedNHttpClientConnection create(
87 final IOSession iosession, final ConnectionConfig config) {
87 final IOSession ioSession, final ConnectionConfig config) {
8888 final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
89 CharsetDecoder chardecoder = null;
90 CharsetEncoder charencoder = null;
89 CharsetDecoder charDecoder = null;
90 CharsetEncoder charEncoder = null;
9191 final Charset charset = config.getCharset();
9292 final CodingErrorAction malformedInputAction = config.getMalformedInputAction() != null ?
9393 config.getMalformedInputAction() : CodingErrorAction.REPORT;
9494 final CodingErrorAction unmappableInputAction = config.getUnmappableInputAction() != null ?
9595 config.getUnmappableInputAction() : CodingErrorAction.REPORT;
9696 if (charset != null) {
97 chardecoder = charset.newDecoder();
98 chardecoder.onMalformedInput(malformedInputAction);
99 chardecoder.onUnmappableCharacter(unmappableInputAction);
100 charencoder = charset.newEncoder();
101 charencoder.onMalformedInput(malformedInputAction);
102 charencoder.onUnmappableCharacter(unmappableInputAction);
97 charDecoder = charset.newDecoder();
98 charDecoder.onMalformedInput(malformedInputAction);
99 charDecoder.onUnmappableCharacter(unmappableInputAction);
100 charEncoder = charset.newEncoder();
101 charEncoder.onMalformedInput(malformedInputAction);
102 charEncoder.onUnmappableCharacter(unmappableInputAction);
103103 }
104104 final ManagedNHttpClientConnection conn = new ManagedNHttpClientConnectionImpl(
105105 id,
106106 this.log,
107 this.headerlog,
108 this.wirelog,
109 iosession,
107 this.headerLog,
108 this.wireLog,
109 ioSession,
110110 config.getBufferSize(),
111111 config.getFragmentSizeHint(),
112112 this.allocator,
113 chardecoder,
114 charencoder,
113 charDecoder,
114 charEncoder,
115115 config.getMessageConstraints(),
116116 null,
117117 null,
118118 this.requestWriterFactory,
119119 this.responseParserFactory);
120 iosession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
120 ioSession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
121121 return conn;
122122 }
123123
4949 class ManagedNHttpClientConnectionImpl
5050 extends DefaultNHttpClientConnection implements ManagedNHttpClientConnection {
5151
52 private final Log headerlog;
53 private final Log wirelog;
52 private final Log headerLog;
53 private final Log wireLog;
5454 private final Log log;
5555
5656 private final String id;
5959 public ManagedNHttpClientConnectionImpl(
6060 final String id,
6161 final Log log,
62 final Log headerlog,
63 final Log wirelog,
64 final IOSession iosession,
65 final int buffersize,
62 final Log headerLog,
63 final Log wireLog,
64 final IOSession ioSession,
65 final int bufferSize,
6666 final int fragmentSizeHint,
6767 final ByteBufferAllocator allocator,
68 final CharsetDecoder chardecoder,
69 final CharsetEncoder charencoder,
68 final CharsetDecoder charDecoder,
69 final CharsetEncoder charEncoder,
7070 final MessageConstraints constraints,
7171 final ContentLengthStrategy incomingContentStrategy,
7272 final ContentLengthStrategy outgoingContentStrategy,
7373 final NHttpMessageWriterFactory<HttpRequest> requestWriterFactory,
7474 final NHttpMessageParserFactory<HttpResponse> responseParserFactory) {
75 super(iosession, buffersize, fragmentSizeHint, allocator, chardecoder, charencoder, constraints,
75 super(ioSession, bufferSize, fragmentSizeHint, allocator, charDecoder, charEncoder, constraints,
7676 incomingContentStrategy, outgoingContentStrategy,
7777 requestWriterFactory, responseParserFactory);
7878 this.id = id;
7979 this.log = log;
80 this.headerlog = headerlog;
81 this.wirelog = wirelog;
82 this.original = iosession;
83 if (this.log.isDebugEnabled() || this.wirelog.isDebugEnabled()) {
84 super.bind(new LoggingIOSession(iosession, this.id, this.log, this.wirelog));
80 this.headerLog = headerLog;
81 this.wireLog = wireLog;
82 this.original = ioSession;
83 if (this.log.isDebugEnabled() || this.wireLog.isDebugEnabled()) {
84 super.bind(new LoggingIOSession(ioSession, this.id, this.log, this.wireLog));
8585 }
8686 }
8787
8888 @Override
89 public void bind(final IOSession iosession) {
90 Args.notNull(iosession, "I/O session");
91 Asserts.check(!iosession.isClosed(), "I/O session is closed");
89 public void bind(final IOSession ioSession) {
90 Args.notNull(ioSession, "I/O session");
91 Asserts.check(!ioSession.isClosed(), "I/O session is closed");
9292 this.status = ACTIVE;
93 this.original = iosession;
94 if (this.log.isDebugEnabled() || this.wirelog.isDebugEnabled()) {
95 this.log.debug(this.id + " Upgrade session " + iosession);
96 super.bind(new LoggingIOSession(iosession, this.id, this.log, this.wirelog));
93 this.original = ioSession;
94 if (this.log.isDebugEnabled() || this.wireLog.isDebugEnabled()) {
95 this.log.debug(this.id + " Upgrade session " + ioSession);
96 super.bind(new LoggingIOSession(ioSession, this.id, this.log, this.wireLog));
9797 } else {
98 super.bind(iosession);
98 super.bind(ioSession);
9999 }
100100 }
101101
106106
107107 @Override
108108 public SSLSession getSSLSession() {
109 if (this.original instanceof SSLIOSession) {
110 return ((SSLIOSession) this.original).getSSLSession();
111 } else {
112 return null;
113 }
109 return this.original instanceof SSLIOSession
110 ? ((SSLIOSession) this.original).getSSLSession()
111 : null;
114112 }
115113
116114 @Override
120118
121119 @Override
122120 protected void onResponseReceived(final HttpResponse response) {
123 if (response != null && this.headerlog.isDebugEnabled()) {
124 this.headerlog.debug(this.id + " << " + response.getStatusLine().toString());
121 if (response != null && this.headerLog.isDebugEnabled()) {
122 this.headerLog.debug(this.id + " << " + response.getStatusLine().toString());
125123 final Header[] headers = response.getAllHeaders();
126124 for (final Header header : headers) {
127 this.headerlog.debug(this.id + " << " + header.toString());
125 this.headerLog.debug(this.id + " << " + header.toString());
128126 }
129127 }
130128 }
131129
132130 @Override
133131 protected void onRequestSubmitted(final HttpRequest request) {
134 if (request != null && this.headerlog.isDebugEnabled()) {
135 this.headerlog.debug(this.id + " >> " + request.getRequestLine().toString());
132 if (request != null && this.headerLog.isDebugEnabled()) {
133 this.headerLog.debug(this.id + " >> " + request.getRequestLine().toString());
136134 final Header[] headers = request.getAllHeaders();
137135 for (final Header header : headers) {
138 this.headerlog.debug(this.id + " >> " + header.toString());
136 this.headerLog.debug(this.id + " >> " + header.toString());
139137 }
140138 }
141139 }
9696
9797 private final Log log = LogFactory.getLog(getClass());
9898
99 static final String IOSESSION_FACTORY_REGISTRY = "http.iosession-factory-registry";
100
101 private final ConnectingIOReactor ioreactor;
99 static final String IOSESSION_FACTORY_REGISTRY = "http.ioSession-factory-registry";
100
101 private final ConnectingIOReactor ioReactor;
102102 private final ConfigData configData;
103103 private final CPool pool;
104 private final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry;
104 private final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry;
105105
106106 private static Registry<SchemeIOSessionStrategy> getDefaultRegistry() {
107107 return RegistryBuilder.<SchemeIOSessionStrategy>create()
110110 .build();
111111 }
112112
113 public PoolingNHttpClientConnectionManager(final ConnectingIOReactor ioreactor) {
114 this(ioreactor, getDefaultRegistry());
115 }
116
117 public PoolingNHttpClientConnectionManager(
118 final ConnectingIOReactor ioreactor,
119 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
120 this(ioreactor, null, iosessionFactoryRegistry, (DnsResolver) null);
121 }
122
123 public PoolingNHttpClientConnectionManager(
124 final ConnectingIOReactor ioreactor,
113 public PoolingNHttpClientConnectionManager(final ConnectingIOReactor ioReactor) {
114 this(ioReactor, getDefaultRegistry());
115 }
116
117 public PoolingNHttpClientConnectionManager(
118 final ConnectingIOReactor ioReactor,
119 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry) {
120 this(ioReactor, null, ioSessionFactoryRegistry, (DnsResolver) null);
121 }
122
123 public PoolingNHttpClientConnectionManager(
124 final ConnectingIOReactor ioReactor,
125125 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
126126 final DnsResolver dnsResolver) {
127 this(ioreactor, connFactory, getDefaultRegistry(), dnsResolver);
128 }
129
130 public PoolingNHttpClientConnectionManager(
131 final ConnectingIOReactor ioreactor,
127 this(ioReactor, connFactory, getDefaultRegistry(), dnsResolver);
128 }
129
130 public PoolingNHttpClientConnectionManager(
131 final ConnectingIOReactor ioReactor,
132132 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
133133 final SocketAddressResolver<HttpRoute> socketAddressResolver) {
134 this(ioreactor, connFactory, getDefaultRegistry(), socketAddressResolver);
135 }
136
137 public PoolingNHttpClientConnectionManager(
138 final ConnectingIOReactor ioreactor,
134 this(ioReactor, connFactory, getDefaultRegistry(), socketAddressResolver);
135 }
136
137 public PoolingNHttpClientConnectionManager(
138 final ConnectingIOReactor ioReactor,
139139 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
140 this(ioreactor, connFactory, getDefaultRegistry(), (DnsResolver) null);
141 }
142
143 public PoolingNHttpClientConnectionManager(
144 final ConnectingIOReactor ioreactor,
140 this(ioReactor, connFactory, getDefaultRegistry(), (DnsResolver) null);
141 }
142
143 public PoolingNHttpClientConnectionManager(
144 final ConnectingIOReactor ioReactor,
145145 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
146 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
147 this(ioreactor, connFactory, iosessionFactoryRegistry, (DnsResolver) null);
148 }
149
150 public PoolingNHttpClientConnectionManager(
151 final ConnectingIOReactor ioreactor,
146 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry) {
147 this(ioReactor, connFactory, ioSessionFactoryRegistry, (DnsResolver) null);
148 }
149
150 public PoolingNHttpClientConnectionManager(
151 final ConnectingIOReactor ioReactor,
152152 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
153 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
153 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
154154 final DnsResolver dnsResolver) {
155 this(ioreactor, connFactory, iosessionFactoryRegistry, null, dnsResolver,
155 this(ioReactor, connFactory, ioSessionFactoryRegistry, null, dnsResolver,
156156 -1, TimeUnit.MILLISECONDS);
157157 }
158158
159159 public PoolingNHttpClientConnectionManager(
160 final ConnectingIOReactor ioreactor,
160 final ConnectingIOReactor ioReactor,
161161 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
162 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
162 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
163163 final SocketAddressResolver<HttpRoute> socketAddressResolver) {
164 this(ioreactor, connFactory, iosessionFactoryRegistry, socketAddressResolver,
164 this(ioReactor, connFactory, ioSessionFactoryRegistry, socketAddressResolver,
165165 -1, TimeUnit.MILLISECONDS);
166166 }
167167
168168 public PoolingNHttpClientConnectionManager(
169 final ConnectingIOReactor ioreactor,
169 final ConnectingIOReactor ioReactor,
170170 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
171 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
171 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
172172 final SchemePortResolver schemePortResolver,
173173 final DnsResolver dnsResolver,
174 final long timeToLive, final TimeUnit tunit) {
175 this(ioreactor, connFactory, iosessionFactoryRegistry,
176 new InternalAddressResolver(schemePortResolver, dnsResolver), timeToLive, tunit);
177 }
178
179 public PoolingNHttpClientConnectionManager(
180 final ConnectingIOReactor ioreactor,
174 final long timeToLive, final TimeUnit timeUnit) {
175 this(ioReactor, connFactory, ioSessionFactoryRegistry,
176 new InternalAddressResolver(schemePortResolver, dnsResolver), timeToLive, timeUnit);
177 }
178
179 public PoolingNHttpClientConnectionManager(
180 final ConnectingIOReactor ioReactor,
181181 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
182 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
182 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry,
183183 final SocketAddressResolver<HttpRoute> socketAddressResolver,
184 final long timeToLive, final TimeUnit tunit) {
184 final long timeToLive, final TimeUnit timeUnit) {
185185 super();
186 Args.notNull(ioreactor, "I/O reactor");
187 Args.notNull(iosessionFactoryRegistry, "I/O session factory registry");
186 Args.notNull(ioReactor, "I/O reactor");
187 Args.notNull(ioSessionFactoryRegistry, "I/O session factory registry");
188188 Args.notNull(socketAddressResolver, "Socket address resolver");
189 this.ioreactor = ioreactor;
189 this.ioReactor = ioReactor;
190190 this.configData = new ConfigData();
191 this.pool = new CPool(ioreactor,
191 this.pool = new CPool(ioReactor,
192192 new InternalConnectionFactory(this.configData, connFactory),
193193 socketAddressResolver,
194 2, 20, timeToLive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
195 this.iosessionFactoryRegistry = iosessionFactoryRegistry;
194 2, 20, timeToLive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
195 this.ioSessionFactoryRegistry = ioSessionFactoryRegistry;
196196 }
197197
198198 PoolingNHttpClientConnectionManager(
199 final ConnectingIOReactor ioreactor,
199 final ConnectingIOReactor ioReactor,
200200 final CPool pool,
201 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
201 final Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry) {
202202 super();
203 this.ioreactor = ioreactor;
203 this.ioReactor = ioReactor;
204204 this.configData = new ConfigData();
205205 this.pool = pool;
206 this.iosessionFactoryRegistry = iosessionFactoryRegistry;
206 this.ioSessionFactoryRegistry = ioSessionFactoryRegistry;
207207 }
208208
209209 @Override
217217
218218 @Override
219219 public void execute(final IOEventDispatch eventDispatch) throws IOException {
220 this.ioreactor.execute(eventDispatch);
220 this.ioReactor.execute(eventDispatch);
221221 }
222222
223223 public void shutdown(final long waitMs) throws IOException {
271271 final Object state,
272272 final long connectTimeout,
273273 final long leaseTimeout,
274 final TimeUnit tunit,
274 final TimeUnit timeUnit,
275275 final FutureCallback<NHttpClientConnection> callback) {
276276 Args.notNull(route, "HTTP route");
277277 if (this.log.isDebugEnabled()) {
284284 } else {
285285 host = route.getTargetHost();
286286 }
287 final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
287 final SchemeIOSessionStrategy sf = this.ioSessionFactoryRegistry.lookup(
288288 host.getSchemeName());
289289 if (sf == null) {
290290 resultFuture.failed(new UnsupportedSchemeException(host.getSchemeName() +
292292 return resultFuture;
293293 }
294294 final Future<CPoolEntry> leaseFuture = this.pool.lease(route, state,
295 connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
295 connectTimeout, leaseTimeout, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS,
296296 new FutureCallback<CPoolEntry>() {
297297
298298 @Override
302302 log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
303303 }
304304 final NHttpClientConnection managedConn = CPoolProxy.newProxy(entry);
305 if (!resultFuture.completed(managedConn)) {
306 pool.release(entry, true);
305 synchronized (managedConn) {
306 if (!resultFuture.completed(managedConn)) {
307 pool.release(entry, true);
308 }
307309 }
308310 }
309311
361363 final NHttpClientConnection managedConn,
362364 final Object state,
363365 final long keepalive,
364 final TimeUnit tunit) {
366 final TimeUnit timeUnit) {
365367 Args.notNull(managedConn, "Managed connection");
366368 synchronized (managedConn) {
367369 final CPoolEntry entry = CPoolProxy.detach(managedConn);
375377 try {
376378 if (conn.isOpen()) {
377379 entry.setState(state);
378 entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
380 entry.updateExpiry(keepalive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
379381 if (this.log.isDebugEnabled()) {
380382 final String s;
381383 if (keepalive > 0) {
400402 Lookup<SchemeIOSessionStrategy> reg = (Lookup<SchemeIOSessionStrategy>) context.getAttribute(
401403 IOSESSION_FACTORY_REGISTRY);
402404 if (reg == null) {
403 reg = this.iosessionFactoryRegistry;
405 reg = this.ioSessionFactoryRegistry;
404406 }
405407 return reg;
406408 }
485487 }
486488
487489 @Override
488 public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
490 public void closeIdleConnections(final long idleTimeout, final TimeUnit timeUnit) {
489491 if (this.log.isDebugEnabled()) {
490 this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
491 }
492 this.pool.closeIdle(idleTimeout, tunit);
492 this.log.debug("Closing connections idle longer than " + idleTimeout + " " + timeUnit);
493 }
494 this.pool.closeIdle(idleTimeout, timeUnit);
493495 }
494496
495497 @Override
610612
611613 @Override
612614 public ManagedNHttpClientConnection create(
613 final HttpRoute route, final IOSession iosession) throws IOException {
615 final HttpRoute route, final IOSession ioSession) throws IOException {
614616 ConnectionConfig config = null;
615617 if (route.getProxyHost() != null) {
616618 config = this.configData.getConnectionConfig(route.getProxyHost());
624626 if (config == null) {
625627 config = ConnectionConfig.DEFAULT;
626628 }
627 final ManagedNHttpClientConnection conn = this.connFactory.create(iosession, config);
628 iosession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
629 final ManagedNHttpClientConnection conn = this.connFactory.create(ioSession, config);
630 ioSession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
629631 return conn;
630632 }
631633
6161 * if the consumer is temporarily unable to consume more content.
6262 *
6363 * @param buf chunk of content.
64 * @param ioctrl I/O control of the underlying connection.
64 * @param ioControl I/O control of the underlying connection.
6565 * @throws IOException in case of an I/O error
6666 */
6767 protected abstract void onByteReceived(
68 ByteBuffer buf, IOControl ioctrl) throws IOException;
68 ByteBuffer buf, IOControl ioControl) throws IOException;
6969
7070 @Override
7171 protected final void onEntityEnclosed(
7474
7575 @Override
7676 protected final void onContentReceived(
77 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
77 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
7878 Asserts.notNull(this.bbuf, "Byte buffer");
7979 final int bytesRead = decoder.read(this.bbuf);
8080 if (bytesRead <= 0) {
8181 return;
8282 }
8383 this.bbuf.flip();
84 onByteReceived(this.bbuf, ioctrl);
84 onByteReceived(this.bbuf, ioControl);
8585 this.bbuf.clear();
8686 }
8787
5252 private final ByteBuffer bbuf;
5353 private final CharBuffer cbuf;
5454
55 private CharsetDecoder chardecoder;
55 private CharsetDecoder charDecoder;
5656
5757 public AsyncCharConsumer(final int bufSize) {
5858 super();
7070 * if the consumer is temporarily unable to consume more content.
7171 *
7272 * @param buf chunk of content.
73 * @param ioctrl I/O control of the underlying connection.
73 * @param ioControl I/O control of the underlying connection.
7474 * @throws IOException in case of an I/O error
7575 */
7676 protected abstract void onCharReceived(
77 CharBuffer buf, IOControl ioctrl) throws IOException;
77 CharBuffer buf, IOControl ioControl) throws IOException;
7878
7979 /**
8080 * Invoked to create a @{link CharsetDecoder} for contentType.
9797 @Override
9898 protected final void onEntityEnclosed(
9999 final HttpEntity entity, final ContentType contentType) throws IOException {
100 this.chardecoder = createDecoder(contentType != null ? contentType : ContentType.DEFAULT_TEXT);
100 this.charDecoder = createDecoder(contentType != null ? contentType : ContentType.DEFAULT_TEXT);
101101 }
102102
103103 @Override
104104 protected final void onContentReceived(
105 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
105 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
106106 Asserts.notNull(this.bbuf, "Byte buffer");
107107
108108 final int bytesRead = decoder.read(this.bbuf);
111111 }
112112 this.bbuf.flip();
113113 final boolean completed = decoder.isCompleted();
114 CoderResult result = this.chardecoder.decode(this.bbuf, this.cbuf, completed);
115 handleDecodingResult(result, ioctrl);
114 CoderResult result = this.charDecoder.decode(this.bbuf, this.cbuf, completed);
115 handleDecodingResult(result, ioControl);
116116 this.bbuf.compact();
117117 if (completed) {
118 result = this.chardecoder.flush(this.cbuf);
119 handleDecodingResult(result, ioctrl);
118 result = this.charDecoder.flush(this.cbuf);
119 handleDecodingResult(result, ioControl);
120120 }
121121 }
122122
123123 private void handleDecodingResult(
124 final CoderResult result, final IOControl ioctrl) throws IOException {
124 final CoderResult result, final IOControl ioControl) throws IOException {
125125 if (result.isError()) {
126126 result.throwException();
127127 }
128128 this.cbuf.flip();
129129 if (this.cbuf.hasRemaining()) {
130 onCharReceived(this.cbuf, ioctrl);
130 onCharReceived(this.cbuf, ioControl);
131131 }
132132 this.cbuf.clear();
133133 }
9797
9898 @Override
9999 public synchronized void produceContent(
100 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
100 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
101101 if (this.fileChannel == null) {
102102 this.fileChannel = this.accessfile.getChannel();
103103 this.idx = 0;
8989
9090 @Override
9191 protected void onContentReceived(
92 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
92 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
9393 Asserts.notNull(this.fileChannel, "File channel");
9494 final long transferred;
9595 if (decoder instanceof FileContentDecoder) {
5050 /**
5151 * Binds connection to the given I/O session.
5252 */
53 void bind(IOSession iosession);
53 void bind(IOSession ioSession);
5454
5555 /**
5656 * Returns the underlying I/O session.
164164 * All expired connections will also be closed.
165165 *
166166 * @param idletime the idle time of connections to be closed
167 * @param tunit the unit for the {@code idletime}
167 * @param timeUnit the unit for the {@code idletime}
168168 *
169169 * @see #closeExpiredConnections()
170170 */
171 void closeIdleConnections(long idletime, TimeUnit tunit);
171 void closeIdleConnections(long idletime, TimeUnit timeUnit);
172172
173173 /**
174174 * Closes all expired connections in the pool.
3636 */
3737 public interface NHttpConnectionFactory<T extends NHttpConnection> {
3838
39 T create(IOSession iosession, ConnectionConfig config);
39 T create(IOSession ioSession, ConnectionConfig config);
4040
4141 }
3939 public static final NoopIOSessionStrategy INSTANCE = new NoopIOSessionStrategy();
4040
4141 @Override
42 public IOSession upgrade(final HttpHost host, final IOSession iosession) {
43 return iosession;
42 public IOSession upgrade(final HttpHost host, final IOSession ioSession) {
43 return ioSession;
4444 }
4545
4646 @Override
5252 * Decorates the original {@link IOSession} with a transport level security
5353 * protocol implementation.
5454 * @param host the target host.
55 * @param iosession the I/O session.
55 * @param ioSession the I/O session.
5656 * @return upgraded I/O session.
5757 */
58 IOSession upgrade(HttpHost host, IOSession iosession) throws IOException;
58 IOSession upgrade(HttpHost host, IOSession ioSession) throws IOException;
5959
6060 }
6262 */
6363 public class SSLIOSessionStrategy implements SchemeIOSessionStrategy {
6464
65 /**
66 * @deprecated Do not use.
67 */
6568 @Deprecated
6669 public static final X509HostnameVerifier ALLOW_ALL_HOSTNAME_VERIFIER =
6770 new AllowAllHostnameVerifier();
6871
72 /**
73 * @deprecated Do not use.
74 */
6975 @Deprecated
7076 public static final X509HostnameVerifier BROWSER_COMPATIBLE_HOSTNAME_VERIFIER =
7177 new BrowserCompatHostnameVerifier();
7278
79 /**
80 * @deprecated Do not use.
81 */
7382 @Deprecated
7483 public static final X509HostnameVerifier STRICT_HOSTNAME_VERIFIER =
7584 new StrictHostnameVerifier();
159168 }
160169
161170 @Override
162 public SSLIOSession upgrade(final HttpHost host, final IOSession iosession) throws IOException {
163 Asserts.check(!(iosession instanceof SSLIOSession), "I/O session is already upgraded to TLS/SSL");
164 final SSLIOSession ssliosession = new SSLIOSession(
165 iosession,
171 public SSLIOSession upgrade(final HttpHost host, final IOSession ioSession) throws IOException {
172 Asserts.check(!(ioSession instanceof SSLIOSession), "I/O session is already upgraded to TLS/SSL");
173 final SSLIOSession sslioSession = new SSLIOSession(
174 ioSession,
166175 SSLMode.CLIENT,
167176 host,
168177 this.sslContext,
182191
183192 @Override
184193 public void verify(
185 final IOSession iosession,
194 final IOSession ioSession,
186195 final SSLSession sslsession) throws SSLException {
187 verifySession(host, iosession, sslsession);
196 verifySession(host, ioSession, sslsession);
188197 }
189198
190199 });
191 iosession.setAttribute(SSLIOSession.SESSION_KEY, ssliosession);
192 ssliosession.initialize();
193 return ssliosession;
200 ioSession.setAttribute(SSLIOSession.SESSION_KEY, sslioSession);
201 sslioSession.initialize();
202 return sslioSession;
194203 }
195204
196205 protected void initializeEngine(final SSLEngine engine) {
198207
199208 protected void verifySession(
200209 final HttpHost host,
201 final IOSession iosession,
210 final IOSession ioSession,
202211 final SSLSession sslsession) throws SSLException {
203212 if (!this.hostnameVerifier.verify(host.getHostName(), sslsession)) {
204213 final Certificate[] certs = sslsession.getPeerCertificates();
131131
132132 protected AbstractHttpAsyncClient(final IOReactorConfig config) throws IOReactorException {
133133 super();
134 final DefaultConnectingIOReactor defaultioreactor = new DefaultConnectingIOReactor(config);
135 defaultioreactor.setExceptionHandler(new InternalIOReactorExceptionHandler(this.log));
136 this.connmgr = new PoolingClientAsyncConnectionManager(defaultioreactor);
134 final DefaultConnectingIOReactor defaultioReactor = new DefaultConnectingIOReactor(config);
135 defaultioReactor.setExceptionHandler(new InternalIOReactorExceptionHandler(this.log));
136 this.connmgr = new PoolingClientAsyncConnectionManager(defaultioReactor);
137137 this.queue = new ConcurrentLinkedQueue<HttpAsyncRequestExecutionHandler<?>>();
138138 }
139139
104104 private final HttpContext localContext;
105105 private final ResultCallback<T> resultCallback;
106106 private final ClientAsyncConnectionManager connmgr;
107 private final HttpProcessor httppocessor;
107 private final HttpProcessor httpPocessor;
108108 private final HttpRoutePlanner routePlanner;
109109 private final HttpRouteDirector routeDirector;
110110 private final ConnectionReuseStrategy reuseStrategy;
144144 final HttpContext localContext,
145145 final ResultCallback<T> callback,
146146 final ClientAsyncConnectionManager connmgr,
147 final HttpProcessor httppocessor,
147 final HttpProcessor httpPocessor,
148148 final HttpRoutePlanner routePlanner,
149149 final ConnectionReuseStrategy reuseStrategy,
150150 final ConnectionKeepAliveStrategy keepaliveStrategy,
160160 this.localContext = localContext;
161161 this.resultCallback = callback;
162162 this.connmgr = connmgr;
163 this.httppocessor = httppocessor;
163 this.httpPocessor = httpPocessor;
164164 this.routePlanner = routePlanner;
165165 this.reuseStrategy = reuseStrategy;
166166 this.keepaliveStrategy = keepaliveStrategy;
327327
328328 @Override
329329 public synchronized void produceContent(
330 final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
330 final ContentEncoder encoder, final IOControl ioControl) throws IOException {
331331 if (this.log.isDebugEnabled()) {
332332 this.log.debug("[exchange: " + this.id + "] produce content");
333333 }
334334 this.requestContentProduced = true;
335 this.requestProducer.produceContent(encoder, ioctrl);
335 this.requestProducer.produceContent(encoder, ioControl);
336336 if (encoder.isCompleted()) {
337337 this.requestProducer.resetRequest();
338338 }
403403
404404 @Override
405405 public synchronized void consumeContent(
406 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
406 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
407407 if (this.log.isDebugEnabled()) {
408408 this.log.debug("[exchange: " + this.id + "] Consume content");
409409 }
410410 if (this.finalResponse != null) {
411 this.responseConsumer.consumeContent(decoder, ioctrl);
411 this.responseConsumer.consumeContent(decoder, ioControl);
412412 } else {
413413 if (this.tmpbuf == null) {
414414 this.tmpbuf = ByteBuffer.allocate(2048);
686686 }
687687
688688 private RequestWrapper wrapRequest(final HttpRequest request) throws ProtocolException {
689 if (request instanceof HttpEntityEnclosingRequest) {
690 return new EntityEnclosingRequestWrapper((HttpEntityEnclosingRequest) request);
691 } else {
692 return new RequestWrapper(request);
693 }
689 return request instanceof HttpEntityEnclosingRequest
690 ? new EntityEnclosingRequestWrapper((HttpEntityEnclosingRequest) request)
691 : new RequestWrapper(request);
694692 }
695693
696694 protected void rewriteRequestURI(
771769 return null;
772770 }
773771
774 private RoutedRequest handleConnectResponse() throws HttpException {
772 private RoutedRequest handleConnectResponse() {
775773 RoutedRequest followup = null;
776774 if (HttpClientParams.isAuthenticating(this.params)) {
777775 final CredentialsProvider credsProvider = (CredentialsProvider) this.localContext.getAttribute(
840838 }
841839
842840 private RoutedRequest handleTargetChallenge(
843 final CredentialsProvider credsProvider) throws HttpException {
841 final CredentialsProvider credsProvider) {
844842 final HttpRoute route = this.mainRequest.getRoute();
845843 HttpHost target = (HttpHost) this.localContext.getAttribute(
846844 ExecutionContext.HTTP_TARGET_HOST);
853851 this.targetAuthStrategy, this.targetAuthState, this.localContext)) {
854852 // Re-try the same request via the same route
855853 return this.mainRequest;
856 } else {
857 return null;
858 }
854 }
855 return null;
859856 }
860857 return null;
861858 }
862859
863860 private RoutedRequest handleProxyChallenge(
864 final CredentialsProvider credsProvider) throws HttpException {
861 final CredentialsProvider credsProvider) {
865862 final HttpRoute route = this.mainRequest.getRoute();
866863 final HttpHost proxy = route.getProxyHost();
867864 if (this.authenticator.isAuthenticationRequested(proxy, this.currentResponse,
870867 this.proxyAuthStrategy, this.proxyAuthState, this.localContext)) {
871868 // Re-try the same request via the same route
872869 return this.mainRequest;
873 } else {
874 return null;
875 }
870 }
871 return null;
876872 }
877873 return null;
878874 }
884880
885881 @Override
886882 public HttpProcessor getHttpProcessor() {
887 return this.httppocessor;
883 return this.httpPocessor;
888884 }
889885
890886 @Override
4141 public class DefaultClientAsyncConnection
4242 extends DefaultNHttpClientConnection implements ClientAsyncConnection {
4343
44 private final Log headerlog = LogFactory.getLog("org.apache.http.headers");
45 private final Log wirelog = LogFactory.getLog("org.apache.http.wire");
44 private final Log headerLog = LogFactory.getLog("org.apache.http.headers");
45 private final Log wireLog = LogFactory.getLog("org.apache.http.wire");
4646 private final Log log;
4747
4848 private final String id;
5050
5151 public DefaultClientAsyncConnection(
5252 final String id,
53 final IOSession iosession,
53 final IOSession ioSession,
5454 final HttpResponseFactory responseFactory,
5555 final ByteBufferAllocator allocator,
5656 final HttpParams params) {
57 super(iosession, responseFactory, allocator, params);
57 super(ioSession, responseFactory, allocator, params);
5858 this.id = id;
59 this.original = iosession;
60 this.log = LogFactory.getLog(iosession.getClass());
61 if (this.log.isDebugEnabled() || this.wirelog.isDebugEnabled()) {
62 bind(new LoggingIOSession(iosession, this.id, this.log, this.wirelog));
59 this.original = ioSession;
60 this.log = LogFactory.getLog(ioSession.getClass());
61 if (this.log.isDebugEnabled() || this.wireLog.isDebugEnabled()) {
62 bind(new LoggingIOSession(ioSession, this.id, this.log, this.wireLog));
6363 }
6464 }
6565
6666 @Override
67 public void upgrade(final IOSession iosession) {
68 this.original = iosession;
69 if (this.log.isDebugEnabled() || this.wirelog.isDebugEnabled()) {
70 this.log.debug(this.id + " Upgrade session " + iosession);
71 bind(new LoggingIOSession(iosession, this.id, this.headerlog, this.wirelog));
67 public void upgrade(final IOSession ioSession) {
68 this.original = ioSession;
69 if (this.log.isDebugEnabled() || this.wireLog.isDebugEnabled()) {
70 this.log.debug(this.id + " Upgrade session " + ioSession);
71 bind(new LoggingIOSession(ioSession, this.id, this.headerLog, this.wireLog));
7272 } else {
73 bind(iosession);
73 bind(ioSession);
7474 }
7575 }
7676
8585
8686 @Override
8787 protected void onResponseReceived(final HttpResponse response) {
88 if (response != null && this.headerlog.isDebugEnabled()) {
89 this.headerlog.debug(this.id + " << " + response.getStatusLine().toString());
88 if (response != null && this.headerLog.isDebugEnabled()) {
89 this.headerLog.debug(this.id + " << " + response.getStatusLine().toString());
9090 final Header[] headers = response.getAllHeaders();
9191 for (final Header header : headers) {
92 this.headerlog.debug(this.id + " << " + header.toString());
92 this.headerLog.debug(this.id + " << " + header.toString());
9393 }
9494 }
9595 }
9696
9797 @Override
9898 protected void onRequestSubmitted(final HttpRequest request) {
99 if (request != null && this.headerlog.isDebugEnabled()) {
100 this.headerlog.debug(this.id + " >> " + request.getRequestLine().toString());
99 if (request != null && this.headerLog.isDebugEnabled()) {
100 this.headerLog.debug(this.id + " >> " + request.getRequestLine().toString());
101101 final Header[] headers = request.getAllHeaders();
102102 for (final Header header : headers) {
103 this.headerlog.debug(this.id + " >> " + header.toString());
103 this.headerLog.debug(this.id + " >> " + header.toString());
104104 }
105105 }
106106 }
5454 public class DefaultClientAsyncConnectionFactory
5555 implements ClientAsyncConnectionFactory, NHttpConnectionFactory<ManagedNHttpClientConnection> {
5656
57 private final Log headerlog = LogFactory.getLog("org.apache.http.headers");
58 private final Log wirelog = LogFactory.getLog("org.apache.http.wire");
57 private final Log headerLog = LogFactory.getLog("org.apache.http.headers");
58 private final Log wireLog = LogFactory.getLog("org.apache.http.wire");
5959 private final Log log = LogFactory.getLog(ManagedNHttpClientConnectionImpl.class);
6060
6161 public static final DefaultClientAsyncConnectionFactory INSTANCE = new DefaultClientAsyncConnectionFactory(null, null);
8888 @Deprecated
8989 public ClientAsyncConnection create(
9090 final String id,
91 final IOSession iosession,
91 final IOSession ioSession,
9292 final HttpParams params) {
9393 return new DefaultClientAsyncConnection(
94 id, iosession, this.responseFactory, this.allocator, params);
94 id, ioSession, this.responseFactory, this.allocator, params);
9595 }
9696
9797 @Deprecated
106106
107107 @Override
108108 public ManagedNHttpClientConnection create(
109 final IOSession iosession, final ConnectionConfig config) {
109 final IOSession ioSession, final ConnectionConfig config) {
110110 final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
111 CharsetDecoder chardecoder = null;
112 CharsetEncoder charencoder = null;
111 CharsetDecoder charDecoder = null;
112 CharsetEncoder charEncoder = null;
113113 final Charset charset = config.getCharset();
114114 final CodingErrorAction malformedInputAction = config.getMalformedInputAction() != null ?
115115 config.getMalformedInputAction() : CodingErrorAction.REPORT;
116116 final CodingErrorAction unmappableInputAction = config.getUnmappableInputAction() != null ?
117117 config.getUnmappableInputAction() : CodingErrorAction.REPORT;
118118 if (charset != null) {
119 chardecoder = charset.newDecoder();
120 chardecoder.onMalformedInput(malformedInputAction);
121 chardecoder.onUnmappableCharacter(unmappableInputAction);
122 charencoder = charset.newEncoder();
123 charencoder.onMalformedInput(malformedInputAction);
124 charencoder.onUnmappableCharacter(unmappableInputAction);
119 charDecoder = charset.newDecoder();
120 charDecoder.onMalformedInput(malformedInputAction);
121 charDecoder.onUnmappableCharacter(unmappableInputAction);
122 charEncoder = charset.newEncoder();
123 charEncoder.onMalformedInput(malformedInputAction);
124 charEncoder.onUnmappableCharacter(unmappableInputAction);
125125 }
126126 final ManagedNHttpClientConnection conn = new ManagedNHttpClientConnectionImpl(
127127 id,
128128 this.log,
129 this.headerlog,
130 this.wirelog,
131 iosession,
129 this.headerLog,
130 this.wireLog,
131 ioSession,
132132 config.getBufferSize(),
133133 config.getFragmentSizeHint(),
134134 this.allocator,
135 chardecoder, charencoder, config.getMessageConstraints(),
135 charDecoder, charEncoder, config.getMessageConstraints(),
136136 null, null, null,
137137 this.responseParserFactory);
138 iosession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
138 ioSession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
139139 return conn;
140140 }
141141
4747 private final Log log;
4848 private final AsyncSchemeRegistry schemeRegistry;
4949 private final long connTimeToLive;
50 private final TimeUnit tunit;
50 private final TimeUnit timeUnit;
5151
5252 HttpNIOConnPool(
5353 final Log log,
54 final ConnectingIOReactor ioreactor,
54 final ConnectingIOReactor ioReactor,
5555 final AsyncSchemeRegistry schemeRegistry,
56 final long connTimeToLive, final TimeUnit tunit) {
57 super(ioreactor, new HttpNIOConnPoolFactory(), 2, 20);
56 final long connTimeToLive, final TimeUnit timeUnit) {
57 super(ioReactor, new HttpNIOConnPoolFactory(), 2, 20);
5858 this.log = log;
5959 this.schemeRegistry = schemeRegistry;
6060 this.connTimeToLive = connTimeToLive;
61 this.tunit = tunit;
61 this.timeUnit = timeUnit;
6262 }
6363
6464 @Override
8484 @Override
8585 protected HttpPoolEntry createEntry(final HttpRoute route, final IOSession session) {
8686 final String id = Long.toString(COUNTER.getAndIncrement());
87 return new HttpPoolEntry(this.log, id, route, session, this.connTimeToLive, this.tunit);
87 return new HttpPoolEntry(this.log, id, route, session, this.connTimeToLive, this.timeUnit);
8888 }
8989
9090 }
4444 private final RouteTracker tracker;
4545
4646 HttpPoolEntry(final Log log, final String id, final HttpRoute route, final IOSession session,
47 final long timeToLive, final TimeUnit tunit) {
48 super(id, route, session, timeToLive, tunit);
47 final long timeToLive, final TimeUnit timeUnit) {
48 super(id, route, session, timeToLive, timeUnit);
4949 this.log = log;
5050 this.tracker = new RouteTracker(route);
5151 }
133133 @Override
134134 public boolean isOpen() {
135135 final ClientAsyncConnection conn = getConnection();
136 if (conn != null) {
137 return conn.isOpen();
138 } else {
139 return false;
140 }
136 return conn != null ? conn.isOpen() : false;
141137 }
142138
143139 @Override
274270 @Override
275271 public SSLSession getSSLSession() {
276272 final ClientAsyncConnection conn = ensureConnection();
277 final IOSession iosession = conn.getIOSession();
278 if (iosession instanceof SSLIOSession) {
279 return ((SSLIOSession) iosession).getSSLSession();
280 } else {
281 return null;
282 }
273 final IOSession ioSession = conn.getIOSession();
274 return ioSession instanceof SSLIOSession
275 ? ((SSLIOSession) ioSession).getSSLSession()
276 : null;
283277 }
284278
285279 @Override
340334
341335 final HttpHost target = route.getTargetHost();
342336 final HttpHost proxy = route.getProxyHost();
343 IOSession iosession = entry.getConnection();
337 IOSession ioSession = entry.getConnection();
344338
345339 if (proxy == null) {
346340 final AsyncScheme scheme = getSchemeRegistry(context).getScheme(target);
347341 final LayeringStrategy layeringStrategy = scheme.getLayeringStrategy();
348342 if (layeringStrategy != null) {
349 iosession = layeringStrategy.layer(iosession);
343 ioSession = layeringStrategy.layer(ioSession);
350344 }
351345 }
352346
353347 final ClientAsyncConnection conn = this.connFactory.create(
354348 "http-outgoing-" + entry.getId(),
355 iosession,
349 ioSession,
356350 params);
357 iosession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
351 ioSession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
358352
359353 if (proxy == null) {
360354 tracker.connectTarget(conn.getIOSession() instanceof SSLIOSession);
409403 throw new IllegalStateException(scheme.getName() +
410404 " scheme does not provider support for protocol layering");
411405 }
412 final IOSession iosession = entry.getConnection();
413 final ClientAsyncConnection conn = (ClientAsyncConnection) iosession.getAttribute(
406 final IOSession ioSession = entry.getConnection();
407 final ClientAsyncConnection conn = (ClientAsyncConnection) ioSession.getAttribute(
414408 IOEventDispatch.CONNECTION_KEY);
415 conn.upgrade((SSLIOSession) layeringStrategy.layer(iosession));
409 conn.upgrade(layeringStrategy.layer(ioSession));
416410 tracker.layerProtocol(layeringStrategy.isSecure());
417411 }
418412
431425 return;
432426 }
433427 this.reusable = false;
434 final IOSession iosession = this.poolEntry.getConnection();
435 final ClientAsyncConnection conn = (ClientAsyncConnection) iosession.getAttribute(
428 final IOSession ioSession = this.poolEntry.getConnection();
429 final ClientAsyncConnection conn = (ClientAsyncConnection) ioSession.getAttribute(
436430 IOEventDispatch.CONNECTION_KEY);
437431 try {
438432 conn.shutdown();
444438
445439 @Override
446440 public synchronized String toString() {
447 if (this.poolEntry != null) {
448 return this.poolEntry.toString();
449 } else {
450 return "released";
451 }
441 return this.poolEntry != null ? this.poolEntry.toString() : "released";
452442 }
453443
454444 }
5252
5353 private final Log log = LogFactory.getLog(getClass());
5454
55 private final ConnectingIOReactor ioreactor;
55 private final ConnectingIOReactor ioReactor;
5656 private final HttpNIOConnPool pool;
5757 private final AsyncSchemeRegistry schemeRegistry;
5858 private final ClientAsyncConnectionFactory connFactory;
5959
6060 public PoolingClientAsyncConnectionManager(
61 final ConnectingIOReactor ioreactor,
61 final ConnectingIOReactor ioReactor,
6262 final AsyncSchemeRegistry schemeRegistry,
63 final long timeToLive, final TimeUnit tunit) {
63 final long timeToLive, final TimeUnit timeUnit) {
6464 super();
65 Args.notNull(ioreactor, "I/O reactor");
65 Args.notNull(ioReactor, "I/O reactor");
6666 Args.notNull(schemeRegistry, "Scheme registory");
67 Args.notNull(tunit, "Time unit");
68 this.ioreactor = ioreactor;
69 this.pool = new HttpNIOConnPool(this.log, ioreactor, schemeRegistry, timeToLive, tunit);
67 Args.notNull(timeUnit, "Time unit");
68 this.ioReactor = ioReactor;
69 this.pool = new HttpNIOConnPool(this.log, ioReactor, schemeRegistry, timeToLive, timeUnit);
7070 this.schemeRegistry = schemeRegistry;
7171 this.connFactory = createClientAsyncConnectionFactory();
7272 }
7373
7474 public PoolingClientAsyncConnectionManager(
75 final ConnectingIOReactor ioreactor,
75 final ConnectingIOReactor ioReactor,
7676 final AsyncSchemeRegistry schemeRegistry) throws IOReactorException {
77 this(ioreactor, schemeRegistry, -1, TimeUnit.MILLISECONDS);
77 this(ioReactor, schemeRegistry, -1, TimeUnit.MILLISECONDS);
7878 }
7979
8080 public PoolingClientAsyncConnectionManager(
81 final ConnectingIOReactor ioreactor) throws IOReactorException {
82 this(ioreactor, AsyncSchemeRegistryFactory.createDefault());
81 final ConnectingIOReactor ioReactor) throws IOReactorException {
82 this(ioReactor, AsyncSchemeRegistryFactory.createDefault());
8383 }
8484
8585 @Override
102102
103103 @Override
104104 public void execute(final IOEventDispatch eventDispatch) throws IOException {
105 this.ioreactor.execute(eventDispatch);
105 this.ioReactor.execute(eventDispatch);
106106 }
107107
108108 @Override
109109 public IOReactorStatus getStatus() {
110 return this.ioreactor.getStatus();
110 return this.ioReactor.getStatus();
111111 }
112112
113113 @Override
161161 final HttpRoute route,
162162 final Object state,
163163 final long connectTimeout,
164 final TimeUnit tunit,
164 final TimeUnit timeUnit,
165165 final FutureCallback<ManagedClientAsyncConnection> callback) {
166166 Args.notNull(route, "HTTP route");
167 Args.notNull(tunit, "Time unit");
167 Args.notNull(timeUnit, "Time unit");
168168 if (this.log.isDebugEnabled()) {
169169 this.log.debug("Connection request: " + format(route, state) + formatStats(route));
170170 }
171171 final BasicFuture<ManagedClientAsyncConnection> future = new BasicFuture<ManagedClientAsyncConnection>(
172172 callback);
173 this.pool.lease(route, state, connectTimeout, tunit, new InternalPoolEntryCallback(future));
173 this.pool.lease(route, state, connectTimeout, timeUnit, new InternalPoolEntryCallback(future));
174174 return future;
175175 }
176176
178178 public void releaseConnection(
179179 final ManagedClientAsyncConnection conn,
180180 final long keepalive,
181 final TimeUnit tunit) {
181 final TimeUnit timeUnit) {
182182 Args.notNull(conn, "HTTP connection");
183183 if (!(conn instanceof ManagedClientAsyncConnectionImpl)) {
184184 throw new IllegalArgumentException("Connection class mismatch, " +
185185 "connection not obtained from this manager");
186186 }
187 Args.notNull(tunit, "Time unit");
187 Args.notNull(timeUnit, "Time unit");
188188 final ManagedClientAsyncConnectionImpl managedConn = (ManagedClientAsyncConnectionImpl) conn;
189189 final ClientAsyncConnectionManager manager = managedConn.getManager();
190190 if (manager != null && manager != this) {
210210 }
211211 }
212212 if (managedConn.isOpen()) {
213 entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
213 entry.updateExpiry(keepalive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
214214 if (this.log.isDebugEnabled()) {
215215 final String s;
216216 if (keepalive > 0) {
217 s = "for " + keepalive + " " + tunit;
217 s = "for " + keepalive + " " + timeUnit;
218218 } else {
219219 s = "indefinitely";
220220 }
272272 return this.pool.getMaxPerRoute(route);
273273 }
274274
275 public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
275 public void closeIdleConnections(final long idleTimeout, final TimeUnit timeUnit) {
276276 if (log.isDebugEnabled()) {
277 log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
278 }
279 this.pool.closeIdle(idleTimeout, tunit);
277 log.debug("Closing connections idle longer than " + idleTimeout + " " + timeUnit);
278 }
279 this.pool.closeIdle(idleTimeout, timeUnit);
280280 }
281281
282282 public void closeExpiredConnections() {
3232 @Deprecated
3333 public interface ClientAsyncConnection extends NHttpClientConnection, HttpInetConnection {
3434
35 void upgrade(IOSession iosession);
35 void upgrade(IOSession ioSession);
3636
3737 IOSession getIOSession();
3838
3131 @Deprecated
3232 public interface ClientAsyncConnectionFactory {
3333
34 ClientAsyncConnection create(String id, IOSession iosession, HttpParams params);
34 ClientAsyncConnection create(String id, IOSession ioSession, HttpParams params);
3535
3636 }
5858
5959 void layerProtocol(HttpContext context, HttpParams params) throws IOException;
6060
61 void setIdleDuration(long duration, TimeUnit tunit);
61 void setIdleDuration(long duration, TimeUnit timeUnit);
6262
6363 }
9898 return this.name.equals(that.name)
9999 && this.defaultPort == that.defaultPort
100100 && this.strategy.equals(that.strategy);
101 } else {
102 return false;
103101 }
102 return false;
104103 }
105104
106105 @Override
3232
3333 boolean isSecure();
3434
35 IOSession layer(IOSession iosession);
35 IOSession layer(IOSession ioSession);
3636
3737 }
183183 }
184184
185185 @Override
186 public SSLIOSession layer(final IOSession iosession) {
187 final SSLIOSession ssliosession = new SSLIOSession(
188 iosession,
186 public SSLIOSession layer(final IOSession ioSession) {
187 final SSLIOSession sslioSession = new SSLIOSession(
188 ioSession,
189189 SSLMode.CLIENT,
190190 this.sslContext,
191191 new SSLSetupHandler() {
198198
199199 @Override
200200 public void verify(
201 final IOSession iosession,
201 final IOSession ioSession,
202202 final SSLSession sslsession) throws SSLException {
203 verifySession(iosession, sslsession);
203 verifySession(ioSession, sslsession);
204204 }
205205
206206 });
207 iosession.setAttribute(SSLIOSession.SESSION_KEY, ssliosession);
208 return ssliosession;
207 ioSession.setAttribute(SSLIOSession.SESSION_KEY, sslioSession);
208 return sslioSession;
209209 }
210210
211211 protected void initializeEngine(final SSLEngine engine) {
212212 }
213213
214214 protected void verifySession(
215 final IOSession iosession,
215 final IOSession ioSession,
216216 final SSLSession sslsession) throws SSLException {
217 final InetSocketAddress address = (InetSocketAddress) iosession.getRemoteAddress();
217 final InetSocketAddress address = (InetSocketAddress) ioSession.getRemoteAddress();
218218
219219 final Certificate[] certs = sslsession.getPeerCertificates();
220220 final X509Certificate x509 = (X509Certificate) certs[0];
6060 import org.junit.Test;
6161 import org.mockito.ArgumentCaptor;
6262 import org.mockito.Captor;
63 import org.mockito.Matchers;
6364 import org.mockito.Mock;
6465 import org.mockito.Mockito;
6566 import org.mockito.MockitoAnnotations;
6768 public class TestPoolingHttpClientAsyncConnectionManager {
6869
6970 @Mock
70 private ConnectingIOReactor ioreactor;
71 private ConnectingIOReactor ioReactor;
7172 @Mock
7273 private CPool pool;
7374 @Mock
8990 @Mock
9091 private SessionRequest sessionRequest;
9192 @Mock
92 private IOSession iosession;
93 private IOSession ioSession;
9394
9495 private Registry<SchemeIOSessionStrategy> layeringStrategyRegistry;
9596 private PoolingNHttpClientConnectionManager connman;
104105 .register("https", sslStrategy)
105106 .build();
106107 connman = new PoolingNHttpClientConnectionManager(
107 ioreactor, pool, layeringStrategyRegistry);
108 ioReactor, pool, layeringStrategyRegistry);
108109 }
109110
110111 @Test
130131 Assert.assertNotNull(future);
131132
132133 Mockito.verify(pool).lease(
133 Mockito.same(route),
134 Mockito.eq("some state"),
135 Mockito.eq(1000L),
136 Mockito.eq(2000L),
137 Mockito.eq(TimeUnit.MILLISECONDS),
134 Matchers.same(route),
135 Matchers.eq("some state"),
136 Matchers.eq(1000L),
137 Matchers.eq(2000L),
138 Matchers.eq(TimeUnit.MILLISECONDS),
138139 poolEntryCallbackCaptor.capture());
139140 final FutureCallback<CPoolEntry> callaback = poolEntryCallbackCaptor.getValue();
140141 final Log log = Mockito.mock(Log.class);
144145
145146 Assert.assertTrue(future.isDone());
146147 final NHttpClientConnection managedConn = future.get();
147 Mockito.verify(connCallback).completed(Mockito.<NHttpClientConnection>any());
148 Mockito.verify(connCallback).completed(Matchers.<NHttpClientConnection>any());
148149
149150 Mockito.when(conn.isOpen()).thenReturn(Boolean.TRUE);
150151 connman.releaseConnection(managedConn, "new state", 5, TimeUnit.SECONDS);
166167 Assert.assertNotNull(future);
167168
168169 Mockito.verify(pool).lease(
169 Mockito.same(route),
170 Mockito.eq("some state"),
171 Mockito.eq(1000L),
172 Mockito.eq(2000L),
173 Mockito.eq(TimeUnit.MILLISECONDS),
170 Matchers.same(route),
171 Matchers.eq("some state"),
172 Matchers.eq(1000L),
173 Matchers.eq(2000L),
174 Matchers.eq(TimeUnit.MILLISECONDS),
174175 poolEntryCallbackCaptor.capture());
175176 final FutureCallback<CPoolEntry> callaback = poolEntryCallbackCaptor.getValue();
176177 final Log log = Mockito.mock(Log.class);
179180
180181 Assert.assertTrue(future.isDone());
181182 final NHttpClientConnection managedConn = future.get();
182 Mockito.verify(connCallback).completed(Mockito.<NHttpClientConnection>any());
183 Mockito.verify(connCallback).completed(Matchers.<NHttpClientConnection>any());
183184
184185 Mockito.when(conn.isOpen()).thenReturn(Boolean.TRUE);
185186 connman.releaseConnection(managedConn, "new state", 5, TimeUnit.SECONDS);
197198 future.cancel(true);
198199
199200 Mockito.verify(pool).lease(
200 Mockito.same(route),
201 Mockito.eq("some state"),
202 Mockito.eq(1000L),
203 Mockito.eq(2000L),
204 Mockito.eq(TimeUnit.MILLISECONDS),
201 Matchers.same(route),
202 Matchers.eq("some state"),
203 Matchers.eq(1000L),
204 Matchers.eq(2000L),
205 Matchers.eq(TimeUnit.MILLISECONDS),
205206 poolEntryCallbackCaptor.capture());
206207 final FutureCallback<CPoolEntry> callaback = poolEntryCallbackCaptor.getValue();
207208 final Log log = Mockito.mock(Log.class);
221222 Assert.assertNotNull(future);
222223
223224 Mockito.verify(pool).lease(
224 Mockito.same(route),
225 Mockito.eq("some state"),
226 Mockito.eq(1000L),
227 Mockito.eq(2000L),
228 Mockito.eq(TimeUnit.MILLISECONDS),
225 Matchers.same(route),
226 Matchers.eq("some state"),
227 Matchers.eq(1000L),
228 Matchers.eq(2000L),
229 Matchers.eq(TimeUnit.MILLISECONDS),
229230 poolEntryCallbackCaptor.capture());
230231 final FutureCallback<CPoolEntry> callaback = poolEntryCallbackCaptor.getValue();
231232 callaback.failed(new Exception());
243244 Assert.assertNotNull(future);
244245
245246 Mockito.verify(pool).lease(
246 Mockito.same(route),
247 Mockito.eq("some state"),
248 Mockito.eq(1000L),
249 Mockito.eq(2000L),
250 Mockito.eq(TimeUnit.MILLISECONDS),
247 Matchers.same(route),
248 Matchers.eq("some state"),
249 Matchers.eq(1000L),
250 Matchers.eq(2000L),
251 Matchers.eq(TimeUnit.MILLISECONDS),
251252 poolEntryCallbackCaptor.capture());
252253 final FutureCallback<CPoolEntry> callaback = poolEntryCallbackCaptor.getValue();
253254 callaback.cancelled();
267268 final CPoolEntry poolentry = new CPoolEntry(log, "some-id", route, conn, -1, TimeUnit.MILLISECONDS);
268269 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
269270
270 Mockito.when(conn.getIOSession()).thenReturn(iosession);
271 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
271 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
272 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
272273
273274 connman.startRoute(managedConn, route, context);
274275
275 Mockito.verify(noopStrategy, Mockito.never()).upgrade(target, iosession);
276 Mockito.verify(conn, Mockito.never()).bind(iosession);
276 Mockito.verify(noopStrategy, Mockito.never()).upgrade(target, ioSession);
277 Mockito.verify(conn, Mockito.never()).bind(ioSession);
277278
278279 Assert.assertFalse(connman.isRouteComplete(managedConn));
279280 }
289290 poolentry.markRouteComplete();
290291 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
291292
292 Mockito.when(conn.getIOSession()).thenReturn(iosession);
293 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
293 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
294 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
294295
295296 connman.startRoute(managedConn, route, context);
296297
297 Mockito.verify(sslStrategy).upgrade(target, iosession);
298 Mockito.verify(conn).bind(iosession);
298 Mockito.verify(sslStrategy).upgrade(target, ioSession);
299 Mockito.verify(conn).bind(ioSession);
299300 }
300301
301302 @Test
313314 final CPoolEntry poolentry = new CPoolEntry(log, "some-id", route, conn, -1, TimeUnit.MILLISECONDS);
314315 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
315316
316 Mockito.when(conn.getIOSession()).thenReturn(iosession);
317 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
317 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
318 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
318319
319320 connman.startRoute(managedConn, route, context);
320321
321 Mockito.verify(noopStrategy, Mockito.never()).upgrade(target, iosession);
322 Mockito.verify(conn, Mockito.never()).bind(iosession);
322 Mockito.verify(noopStrategy, Mockito.never()).upgrade(target, ioSession);
323 Mockito.verify(conn, Mockito.never()).bind(ioSession);
323324
324325 Assert.assertFalse(connman.isRouteComplete(managedConn));
325326 }
335336 poolentry.markRouteComplete();
336337 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
337338
338 Mockito.when(conn.getIOSession()).thenReturn(iosession);
339 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
339 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
340 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
340341
341342 connman.startRoute(managedConn, route, context);
342343 }
352353 poolentry.markRouteComplete();
353354 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
354355
355 Mockito.when(conn.getIOSession()).thenReturn(iosession);
356 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
356 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
357 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
357358
358359 connman.upgrade(managedConn, route, context);
359360
360 Mockito.verify(sslStrategy).upgrade(target, iosession);
361 Mockito.verify(conn).bind(iosession);
361 Mockito.verify(sslStrategy).upgrade(target, ioSession);
362 Mockito.verify(conn).bind(ioSession);
362363 }
363364
364365 @Test(expected=UnsupportedSchemeException.class)
372373 poolentry.markRouteComplete();
373374 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
374375
375 Mockito.when(conn.getIOSession()).thenReturn(iosession);
376 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
376 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
377 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
377378
378379 connman.upgrade(managedConn, route, context);
379380 }
389390 poolentry.markRouteComplete();
390391 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
391392
392 Mockito.when(conn.getIOSession()).thenReturn(iosession);
393 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
393 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
394 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
394395
395396 connman.upgrade(managedConn, route, context);
396397 }
406407 poolentry.markRouteComplete();
407408 final NHttpClientConnection managedConn = CPoolProxy.newProxy(poolentry);
408409
409 Mockito.when(conn.getIOSession()).thenReturn(iosession);
410 Mockito.when(sslStrategy.upgrade(target, iosession)).thenReturn(iosession);
410 Mockito.when(conn.getIOSession()).thenReturn(ioSession);
411 Mockito.when(sslStrategy.upgrade(target, ioSession)).thenReturn(ioSession);
411412
412413 connman.startRoute(managedConn, route, context);
413414 connman.routeComplete(managedConn, route, context);
456457 configData, connFactory);
457458
458459 final HttpRoute route = new HttpRoute(new HttpHost("somehost", 80));
459 internalConnFactory.create(route, iosession);
460
461 Mockito.verify(sslStrategy, Mockito.never()).upgrade(Mockito.eq(new HttpHost("somehost", 80)),
462 Mockito.<IOSession>any());
463 Mockito.verify(connFactory).create(Mockito.same(iosession), Mockito.<ConnectionConfig>any());
460 internalConnFactory.create(route, ioSession);
461
462 Mockito.verify(sslStrategy, Mockito.never()).upgrade(Matchers.eq(new HttpHost("somehost", 80)),
463 Matchers.<IOSession>any());
464 Mockito.verify(connFactory).create(Matchers.same(ioSession), Matchers.<ConnectionConfig>any());
464465 }
465466
466467 @Test
476477 final ConnectionConfig config = ConnectionConfig.custom().build();
477478 configData.setConnectionConfig(proxy, config);
478479
479 internalConnFactory.create(route, iosession);
480
481 Mockito.verify(connFactory).create(iosession, config);
480 internalConnFactory.create(route, ioSession);
481
482 Mockito.verify(connFactory).create(ioSession, config);
482483 }
483484
484485 @Test
493494 final ConnectionConfig config = ConnectionConfig.custom().build();
494495 configData.setConnectionConfig(target, config);
495496
496 internalConnFactory.create(route, iosession);
497
498 Mockito.verify(connFactory).create(iosession, config);
497 internalConnFactory.create(route, ioSession);
498
499 Mockito.verify(connFactory).create(ioSession, config);
499500 }
500501
501502 @Test
510511 final ConnectionConfig config = ConnectionConfig.custom().build();
511512 configData.setDefaultConnectionConfig(config);
512513
513 internalConnFactory.create(route, iosession);
514
515 Mockito.verify(connFactory).create(iosession, config);
514 internalConnFactory.create(route, ioSession);
515
516 Mockito.verify(connFactory).create(ioSession, config);
516517 }
517518
518519 @Test
526527
527528 configData.setDefaultConnectionConfig(null);
528529
529 internalConnFactory.create(route, iosession);
530
531 Mockito.verify(connFactory).create(iosession, ConnectionConfig.DEFAULT);
530 internalConnFactory.create(route, ioSession);
531
532 Mockito.verify(connFactory).create(ioSession, ConnectionConfig.DEFAULT);
532533 }
533534
534535 @Test
5454
5555 public abstract class AbstractAsyncTest {
5656
57 public enum ProtocolScheme { http, https };
57 public enum ProtocolScheme { http, https }
5858
5959 protected final ProtocolScheme scheme;
6060
216216 final BasicAsyncResponseConsumer responseConsumer = new BasicAsyncResponseConsumer() {
217217
218218 @Override
219 public void onContentReceived(final ContentDecoder decoder, final IOControl ioctrl)
219 public void onContentReceived(final ContentDecoder decoder, final IOControl ioControl)
220220 throws IOException {
221221 throw new IOException("Kaboom");
222222 }
199199 @Override
200200 public synchronized void produceContent(
201201 final ContentEncoder encoder,
202 final IOControl ioctrl) throws IOException {
203 ioctrl.shutdown();
202 final IOControl ioControl) throws IOException {
203 ioControl.shutdown();
204204 }
205205
206206 });
276276
277277 @Override
278278 public void consumeContent(
279 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
279 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
280280 throw new IllegalStateException();
281281 }
282282
358358
359359 @Override
360360 public void consumeContent(
361 final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
361 final ContentDecoder decoder, final IOControl ioControl) throws IOException {
362362 }
363363
364364 @Override
181181 @Override
182182 protected void onContentReceived(
183183 final ContentDecoder decoder,
184 final IOControl ioctrl) throws IOException {
184 final IOControl ioControl) throws IOException {
185185 final ByteBuffer buf = ByteBuffer.allocate(2048);
186186 decoder.read(buf);
187187 }
4747 import org.junit.Assert;
4848 import org.junit.Before;
4949 import org.junit.Test;
50 import org.mockito.Matchers;
5051 import org.mockito.Mockito;
5152
5253 public class TestAsyncConsumers extends HttpAsyncTestBase {
7576 }
7677
7778 @Override
78 protected void onByteReceived(final ByteBuffer buf, final IOControl ioctrl) {
79 protected void onByteReceived(final ByteBuffer buf, final IOControl ioControl) {
7980 this.count.addAndGet(buf.remaining());
8081 }
8182
127128 }
128129
129130 @Override
130 protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
131 protected void onCharReceived(final CharBuffer buf, final IOControl ioControl) throws IOException {
131132 while (buf.hasRemaining()) {
132133 this.sb.append(buf.get());
133134 }
205206 final Future<String> future = this.httpclient.execute(httppost, consumer, null);
206207 final String result = future.get();
207208 Assert.assertEquals(s, result);
208 Mockito.verify(consumer).buildResult(Mockito.any(HttpContext.class));
209 Mockito.verify(consumer).buildResult(Matchers.any(HttpContext.class));
209210 Mockito.verify(consumer).releaseResources();
210211 }
211212
217218 ContentType.create("text/plain", Consts.ASCII));
218219 final AsyncCharConsumer<String> consumer = Mockito.spy(new BufferingCharConsumer());
219220 Mockito.doThrow(new IOException("Kaboom")).when(consumer).onCharReceived(
220 Mockito.any(CharBuffer.class), Mockito.any(IOControl.class));
221 Matchers.any(CharBuffer.class), Matchers.any(IOControl.class));
221222
222223 final Future<String> future = this.httpclient.execute(httppost, consumer, null);
223224 try {
239240 target.toURI() + "/echo/stuff", "stuff",
240241 ContentType.create("text/plain", Consts.ASCII));
241242 final BufferingCharConsumer consumer = Mockito.spy(new BufferingCharConsumer());
242 Mockito.doThrow(new HttpException("Kaboom")).when(consumer).buildResult(Mockito.any(HttpContext.class));
243 Mockito.doThrow(new HttpException("Kaboom")).when(consumer).buildResult(Matchers.any(HttpContext.class));
243244
244245 final Future<String> future = this.httpclient.execute(httppost, consumer, null);
245246 try {
173173
174174 boolean ok = true;
175175
176 final InputStream instream = requestEntity.getContent();
176 final InputStream inStream = requestEntity.getContent();
177177 try {
178178 final ContentType contentType = ContentType.getOrDefault(requestEntity);
179179 Charset charset = contentType.getCharset();
180180 if (charset == null) {
181181 charset = Consts.ISO_8859_1;
182182 }
183 final LineIterator it = IOUtils.lineIterator(instream, charset.name());
183 final LineIterator it = IOUtils.lineIterator(inStream, charset.name());
184184 int count = 0;
185185 while (it.hasNext()) {
186186 final String line = it.next();
193193 count++;
194194 }
195195 } finally {
196 instream.close();
196 inStream.close();
197197 }
198198 if (ok) {
199199 final NFileEntity responseEntity = new NFileEntity(TEST_FILE,
221221 final Integer status = future.get();
222222 Assert.assertNotNull(status);
223223 Assert.assertEquals(HttpStatus.SC_OK, status.intValue());
224 final InputStream instream = new FileInputStream(this.tmpfile);
224 final InputStream inStream = new FileInputStream(this.tmpfile);
225225 try {
226 final LineIterator it = IOUtils.lineIterator(instream, ASCII.name());
226 final LineIterator it = IOUtils.lineIterator(inStream, ASCII.name());
227227 int count = 0;
228228 while (it.hasNext()) {
229229 final String line = it.next();
233233 count++;
234234 }
235235 } finally {
236 instream.close();
236 inStream.close();
237237 }
238238 }
239239
249249 final Integer status = future.get();
250250 Assert.assertNotNull(status);
251251 Assert.assertEquals(HttpStatus.SC_OK, status.intValue());
252 final InputStream instream = new FileInputStream(this.tmpfile);
252 final InputStream inStream = new FileInputStream(this.tmpfile);
253253 try {
254 final LineIterator it = IOUtils.lineIterator(instream, ASCII.name());
254 final LineIterator it = IOUtils.lineIterator(inStream, ASCII.name());
255255 int count = 0;
256256 while (it.hasNext()) {
257257 final String line = it.next();
261261 count++;
262262 }
263263 } finally {
264 instream.close();
264 inStream.close();
265265 }
266266 }
267267
2727 <parent>
2828 <groupId>org.apache.httpcomponents</groupId>
2929 <artifactId>httpcomponents-asyncclient</artifactId>
30 <version>4.1.4</version>
30 <version>4.1.5</version>
3131 </parent>
3232 <artifactId>httpasyncclient-cache</artifactId>
3333 <name>Apache HttpAsyncClient Cache</name>
118118 <artifactId>maven-javadoc-plugin</artifactId>
119119 <version>${hc.javadoc.version}</version>
120120 <configuration>
121 <!-- reduce console output. Can override with -Dquiet=false -->
122 <quiet>true</quiet>
121123 <source>${maven.compiler.source}</source>
122124 <links>
123 <link>http://download.oracle.com/javase/1.5.0/docs/api/</link>
124 <link>http://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/</link>
125 <link>http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/</link>
125 <link>http://docs.oracle.com/javase/6/docs/api/</link>
126 <link>https://hc.apache.org/httpcomponents-core-4.4.x/current/httpcore/apidocs/</link>
127 <link>https://hc.apache.org/httpcomponents-client-4.5.x/current/httpclient/apidocs/</link>
126128 </links>
127129 </configuration>
128130 <reportSets>
136138
137139 <plugin>
138140 <artifactId>maven-project-info-reports-plugin</artifactId>
139 <version>${hc.project-info.version}</version>
140141 <inherited>false</inherited>
141142 <reportSets>
142143 <reportSet>
151152
152153 <plugin>
153154 <artifactId>maven-jxr-plugin</artifactId>
154 <version>${hc.jxr.version}</version>
155155 </plugin>
156156
157157 <plugin>
158158 <artifactId>maven-surefire-report-plugin</artifactId>
159 <version>${hc.surefire-report.version}</version>
160159 </plugin>
161160
162161 </plugins>
163162 </reporting>
164163
165 </project>
164 </project>
7272 import org.apache.http.nio.reactor.IOReactorException;
7373 import org.apache.http.protocol.HTTP;
7474 import org.apache.http.protocol.HttpContext;
75 import org.apache.http.protocol.HttpCoreContext;
7576 import org.apache.http.util.Args;
7677 import org.apache.http.util.EntityUtils;
7778 import org.apache.http.util.VersionInfo;
364365 return;
365366 }
366367 clientContext.setAttribute(HttpClientContext.HTTP_ROUTE, new HttpRoute(target));
367 clientContext.setAttribute(HttpClientContext.HTTP_TARGET_HOST, target);
368 clientContext.setAttribute(HttpClientContext.HTTP_REQUEST, request);
369 clientContext.setAttribute(HttpClientContext.HTTP_RESPONSE, out);
370 clientContext.setAttribute(HttpClientContext.HTTP_REQ_SENT, Boolean.TRUE);
368 clientContext.setAttribute(HttpCoreContext.HTTP_TARGET_HOST, target);
369 clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
370 clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, out);
371 clientContext.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);
371372 future.completed(out);
372373 }
373374
2727 <parent>
2828 <groupId>org.apache.httpcomponents</groupId>
2929 <artifactId>httpcomponents-asyncclient</artifactId>
30 <version>4.1.4</version>
30 <version>4.1.5</version>
3131 </parent>
3232 <artifactId>httpasyncclient-osgi</artifactId>
3333 <name>Apache HttpAsyncClient OSGi bundle</name>
119119 org.apache.http.ssl;version=${httpcore.osgi.import.version},
120120 *
121121 </Import-Package>
122 <Include-Resource />
122 <Include-Resource/>
123123 <!-- Stop the JAVA_1_n_HOME variables from being treated as headers by Bnd -->
124124 <_removeheaders>JAVA_1_3_HOME,JAVA_1_4_HOME</_removeheaders>
125125 </instructions>
141141 <plugin>
142142 <groupId>org.codehaus.mojo</groupId>
143143 <artifactId>clirr-maven-plugin</artifactId>
144 <version>${hc.clirr.version}</version>
145144 <configuration>
146145 <skip>true</skip>
147146 </configuration>
148147 </plugin>
149148 <plugin>
150149 <artifactId>maven-project-info-reports-plugin</artifactId>
151 <version>${hc.project-info.version}</version>
152150 <inherited>false</inherited>
153151 <reportSets>
154152 <reportSet>
164162 </plugins>
165163 </reporting>
166164
167 </project>
165 </project>
2424 <http://www.apache.org />.
2525 --><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
2626 <parent>
27 <artifactId>project</artifactId>
28 <groupId>org.apache.httpcomponents</groupId>
29 <version>7</version>
30 <relativePath>../project/pom.xml</relativePath>
31 </parent>
32 <modelVersion>4.0.0</modelVersion>
27 <groupId>org.apache.httpcomponents</groupId>
28 <artifactId>httpcomponents-parent</artifactId>
29 <version>11</version>
30 </parent>
31 <modelVersion>4.0.0</modelVersion>
3332 <artifactId>httpcomponents-asyncclient</artifactId>
3433 <name>Apache HttpComponents AsyncClient</name>
35 <version>4.1.4</version>
34 <version>4.1.5</version>
3635 <description>Apache components to build asynchronous client side HTTP services</description>
3736 <url>http://hc.apache.org/httpcomponents-asyncclient</url>
3837 <inceptionYear>2010</inceptionYear>
6261 <url>https://svn.apache.org/repos/asf/httpcomponents/httpasyncclient/branches/4.1.x</url>
6362 </scm>
6463
64 <distributionManagement>
65 <site>
66 <id>apache.website</id>
67 <name>Apache HttpComponents Website</name>
68 <url>scm:svn:https://svn.apache.org/repos/asf/httpcomponents/site/components/httpcomponents-asyncclient-4.1.x/LATEST/</url>
69 </site>
70 </distributionManagement>
71
6572 <properties>
6673 <maven.compiler.source>1.6</maven.compiler.source>
6774 <maven.compiler.target>1.6</maven.compiler.target>
68 <httpcore.version>4.4.10</httpcore.version>
69 <httpclient.version>4.5.6</httpclient.version>
75 <httpcore.version>4.4.15</httpcore.version>
76 <httpclient.version>4.5.13</httpclient.version>
7077 <commons-logging.version>1.2</commons-logging.version>
7178 <commons-io.version>2.4</commons-io.version>
7279 <junit.version>4.11</junit.version>
7380 <easymock.version>2.5.2</easymock.version>
7481 <mockito.version>1.8.5</mockito.version>
75 <hc.stylecheck.version>1</hc.stylecheck.version>
76 <api.comparison.version>4.0</api.comparison.version>
82 <api.comparison.version>4.1</api.comparison.version>
7783 </properties>
7884
7985 <dependencyManagement>
185191 <source>${maven.compiler.source}</source>
186192 <links>
187193 <link>http://docs.oracle.com/javase/6/docs/api/</link>
188 <link>http://hc.apache.org/httpcomponents-core-ga/httpcore/apidocs/</link>
189 <link>http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/</link>
194 <link>https://hc.apache.org/httpcomponents-core-4.4.x/current/httpcore/apidocs/</link>
195 <link>https://hc.apache.org/httpcomponents-client-4.5.x/current/httpclient/apidocs/</link>
190196 </links>
191197 </configuration>
192198 </plugin>
196202 <plugin>
197203 <groupId>org.apache.maven.plugins</groupId>
198204 <artifactId>maven-checkstyle-plugin</artifactId>
199 <version>2.9.1</version>
200 <dependencies>
201 <dependency>
202 <groupId>org.apache.httpcomponents</groupId>
203 <artifactId>hc-stylecheck</artifactId>
204 <version>${hc.stylecheck.version}</version>
205 </dependency>
206 </dependencies>
207 <configuration>
208 <encoding>UTF-8</encoding>
209 </configuration>
210 <executions>
211 <execution>
212 <id>validate-main</id>
213 <phase>validate</phase>
214 <configuration>
215 <configLocation>hc-stylecheck/default.xml</configLocation>
216 <headerLocation>hc-stylecheck/asl2.header</headerLocation>
217 <consoleOutput>true</consoleOutput>
218 <failsOnError>true</failsOnError>
219 <linkXRef>false</linkXRef>
220 <sourceDirectory>${basedir}/src/main</sourceDirectory>
221 </configuration>
222 <goals>
223 <goal>checkstyle</goal>
224 </goals>
225 </execution>
226 <execution>
227 <id>validate-test</id>
228 <phase>validate</phase>
229 <configuration>
230 <configLocation>hc-stylecheck/default.xml</configLocation>
231 <headerLocation>hc-stylecheck/asl2.header</headerLocation>
232 <consoleOutput>true</consoleOutput>
233 <failsOnError>true</failsOnError>
234 <linkXRef>false</linkXRef>
235 <sourceDirectory>${basedir}/src/test</sourceDirectory>
236 </configuration>
237 <goals>
238 <goal>checkstyle</goal>
239 </goals>
240 </execution>
241 <execution>
242 <id>validate-examples</id>
243 <phase>validate</phase>
244 <configuration>
245 <configLocation>hc-stylecheck/minimal.xml</configLocation>
246 <headerLocation>hc-stylecheck/asl2.header</headerLocation>
247 <consoleOutput>true</consoleOutput>
248 <failsOnError>true</failsOnError>
249 <linkXRef>false</linkXRef>
250 <sourceDirectory>${basedir}/src/examples</sourceDirectory>
251 </configuration>
252 <goals>
253 <goal>checkstyle</goal>
254 </goals>
255 </execution>
256 </executions>
205 <configuration>
206 <configLocation>hc-stylecheck/default.xml</configLocation>
207 <headerLocation>hc-stylecheck/asl2.header</headerLocation>
208 <consoleOutput>true</consoleOutput>
209 <failsOnError>true</failsOnError>
210 <linkXRef>false</linkXRef>
211 <sourceDirectories>
212 <sourceDirectory>${basedir}/src/main/</sourceDirectory>
213 <sourceDirectory>${basedir}/src/test</sourceDirectory>
214 </sourceDirectories>
215 <excludes>**/java-deprecated/**</excludes>
216 </configuration>
257217 </plugin>
258218 <plugin>
259219 <groupId>org.codehaus.mojo</groupId>
260220 <artifactId>clirr-maven-plugin</artifactId>
261 <version>${hc.clirr.version}</version>
262221 <configuration>
263222 <comparisonVersion>${api.comparison.version}</comparisonVersion>
264223 </configuration>
266225 <plugin>
267226 <groupId>org.apache.rat</groupId>
268227 <artifactId>apache-rat-plugin</artifactId>
269 <version>0.12</version>
270228 <executions>
271229 <execution>
272230 <phase>verify</phase>
288246
289247 <reporting>
290248 <plugins>
291
292249 <plugin>
293250 <artifactId>maven-project-info-reports-plugin</artifactId>
294 <version>${hc.project-info.version}</version>
295251 <inherited>false</inherited>
296252 <reportSets>
297253 <reportSet>
298254 <reports>
255 <report>dependency-info</report>
299256 <report>dependency-management</report>
300 <report>issue-tracking</report>
301 <report>license</report>
257 <report>issue-management</report>
258 <report>licenses</report>
259 <report>mailing-lists</report>
302260 <report>scm</report>
303261 <report>summary</report>
304262 </reports>
306264 </reportSets>
307265 </plugin>
308266
267 <plugin>
268 <groupId>org.codehaus.mojo</groupId>
269 <artifactId>clirr-maven-plugin</artifactId>
270 <configuration>
271 <comparisonVersion>${api.comparison.version}</comparisonVersion>
272 </configuration>
273 </plugin>
274
309275 </plugins>
310276 </reporting>
311277
312 </project>
278 </project>