New upstream version 1.8.15
Emmanuel Bourg
5 years ago
8 | 8 | <groupId>com.ning</groupId> |
9 | 9 | <artifactId>async-http-client</artifactId> |
10 | 10 | <name>Asynchronous Http Client</name> |
11 | <version>1.8.14</version> | |
11 | <version>1.8.15</version> | |
12 | 12 | <packaging>jar</packaging> |
13 | 13 | <description> |
14 | 14 | Async Http Client library purpose is to allow Java applications to easily execute HTTP requests and |
38 | 38 | |
39 | 39 | public InputStreamBodyGenerator(InputStream inputStream) { |
40 | 40 | this.inputStream = inputStream; |
41 | ||
42 | if (inputStream.markSupported()) { | |
43 | inputStream.mark(0); | |
44 | } else { | |
45 | logger.info("inputStream.markSupported() not supported. Some features will not work."); | |
46 | } | |
47 | 41 | } |
48 | 42 | |
49 | 43 | /** |
95 | 89 | |
96 | 90 | return buffer.position(); |
97 | 91 | } else { |
98 | if (inputStream.markSupported()) { | |
99 | inputStream.reset(); | |
100 | } | |
101 | 92 | eof = false; |
102 | 93 | } |
103 | 94 | return -1; |
113 | 104 | buffer.put(chunk, 0, read); |
114 | 105 | // Was missing the final chunk \r\n. |
115 | 106 | buffer.put(END_PADDING); |
116 | } else { | |
117 | if (read > 0) { | |
118 | buffer.put(chunk, 0, read); | |
119 | } else { | |
120 | if (inputStream.markSupported()) { | |
121 | inputStream.reset(); | |
122 | } | |
123 | } | |
107 | } else if (read > 0) { | |
108 | buffer.put(chunk, 0, read); | |
124 | 109 | } |
125 | 110 | return read; |
126 | 111 | } |
+33
-38
894 | 894 | builder.method(Method.CONNECT); |
895 | 895 | builder.uri(AsyncHttpProviderUtils.getAuthority(uri)); |
896 | 896 | } else if ((secure || httpCtx.isWSRequest) && config.isUseRelativeURIsWithConnectProxies()){ |
897 | builder.uri(uri.getPath()); | |
897 | builder.uri(uri.getRawPath()); | |
898 | 898 | } else { |
899 | 899 | builder.uri(uri.toString()); |
900 | 900 | } |
901 | 901 | } else { |
902 | builder.uri(uri.getPath()); | |
902 | builder.uri(uri.getRawPath()); | |
903 | 903 | } |
904 | 904 | |
905 | 905 | final BodyHandler bodyHandler = isPayloadAllowed(method) ? |
2008 | 2008 | final byte[] data = request.getByteData(); |
2009 | 2009 | final Buffer gBuffer = Buffers.wrap(mm, data); |
2010 | 2010 | if (requestPacket.getContentLength() == -1) { |
2011 | if (!clientConfig.isCompressionEnabled()) { | |
2012 | requestPacket.setContentLengthLong(data.length); | |
2013 | } | |
2014 | } | |
2015 | final HttpContent content = requestPacket.httpContentBuilder().content(gBuffer).build(); | |
2016 | content.setLast(true); | |
2011 | requestPacket.setContentLengthLong(data.length); | |
2012 | } | |
2013 | final HttpContent content = requestPacket.httpContentBuilder() | |
2014 | .content(gBuffer) | |
2015 | .last(true) | |
2016 | .build(); | |
2017 | 2017 | ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); |
2018 | 2018 | return true; |
2019 | 2019 | } |
2020 | 2020 | |
2021 | 2021 | @Override |
2022 | 2022 | protected long getContentLength(final Request request) { |
2023 | if (request.getContentLength() >= 0) { | |
2024 | return request.getContentLength(); | |
2025 | } | |
2026 | ||
2027 | return clientConfig.isCompressionEnabled() | |
2028 | ? -1 | |
2023 | return request.getContentLength() >= 0 | |
2024 | ? request.getContentLength() | |
2029 | 2025 | : request.getByteData().length; |
2030 | 2026 | } |
2031 | 2027 | } |
2055 | 2051 | final MemoryManager mm = ctx.getMemoryManager(); |
2056 | 2052 | final Buffer gBuffer = Buffers.wrap(mm, data); |
2057 | 2053 | if (requestPacket.getContentLength() == -1) { |
2058 | if (!clientConfig.isCompressionEnabled()) { | |
2059 | requestPacket.setContentLengthLong(data.length); | |
2060 | } | |
2061 | } | |
2062 | final HttpContent content = requestPacket.httpContentBuilder().content(gBuffer).build(); | |
2063 | content.setLast(true); | |
2054 | requestPacket.setContentLengthLong(data.length); | |
2055 | } | |
2056 | final HttpContent content = requestPacket.httpContentBuilder() | |
2057 | .content(gBuffer) | |
2058 | .last(true) | |
2059 | .build(); | |
2064 | 2060 | ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); |
2065 | 2061 | return true; |
2066 | 2062 | } |
2115 | 2111 | final byte[] data = sb.toString().getBytes(charset); |
2116 | 2112 | final MemoryManager mm = ctx.getMemoryManager(); |
2117 | 2113 | final Buffer gBuffer = Buffers.wrap(mm, data); |
2118 | final HttpContent content = requestPacket.httpContentBuilder().content(gBuffer).build(); | |
2114 | final HttpContent content = requestPacket.httpContentBuilder() | |
2115 | .content(gBuffer) | |
2116 | .last(true) | |
2117 | .build(); | |
2119 | 2118 | if (requestPacket.getContentLength() == -1) { |
2120 | if (!clientConfig.isCompressionEnabled()) { | |
2121 | requestPacket.setContentLengthLong(data.length); | |
2122 | } | |
2123 | } | |
2124 | content.setLast(true); | |
2119 | requestPacket.setContentLengthLong(data.length); | |
2120 | } | |
2125 | 2121 | ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); |
2126 | 2122 | } |
2127 | 2123 | return true; |
2153 | 2149 | b = o.getBuffer(); |
2154 | 2150 | b.trim(); |
2155 | 2151 | if (b.hasRemaining()) { |
2156 | final HttpContent content = requestPacket.httpContentBuilder().content(b).build(); | |
2157 | content.setLast(true); | |
2152 | final HttpContent content = requestPacket.httpContentBuilder() | |
2153 | .content(b) | |
2154 | .last(true) | |
2155 | .build(); | |
2158 | 2156 | ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); |
2159 | 2157 | } |
2160 | 2158 | |
2203 | 2201 | } |
2204 | 2202 | buffer.trim(); |
2205 | 2203 | if (buffer.hasRemaining()) { |
2206 | final HttpContent content = requestPacket.httpContentBuilder().content(buffer).build(); | |
2204 | final HttpContent content = requestPacket.httpContentBuilder() | |
2205 | .content(buffer) | |
2206 | .last(true) | |
2207 | .build(); | |
2207 | 2208 | buffer.allowBufferDispose(false); |
2208 | content.setLast(true); | |
2209 | 2209 | ctx.write(content, ((!requestPacket.isCommitted()) ? ctx.getTransportContext().getCompletionHandler() : null)); |
2210 | 2210 | } |
2211 | 2211 | |
2305 | 2305 | final File f = request.getFile(); |
2306 | 2306 | requestPacket.setContentLengthLong(f.length()); |
2307 | 2307 | final HttpTransactionContext context = HttpTransactionContext.get(ctx.getConnection()); |
2308 | if (clientConfig.isCompressionEnabled() || !SEND_FILE_SUPPORT || | |
2309 | requestPacket.isSecure()) { | |
2308 | if (!SEND_FILE_SUPPORT || requestPacket.isSecure()) { | |
2310 | 2309 | |
2311 | 2310 | final FileInputStream fis = new FileInputStream(request.getFile()); |
2312 | 2311 | final MemoryManager mm = ctx.getMemoryManager(); |
2361 | 2360 | |
2362 | 2361 | @Override |
2363 | 2362 | protected long getContentLength(final Request request) { |
2364 | if (request.getContentLength() >= 0) { | |
2365 | return request.getContentLength(); | |
2366 | } | |
2367 | ||
2368 | return clientConfig.isCompressionEnabled() | |
2369 | ? -1 | |
2363 | return request.getContentLength() >= 0 | |
2364 | ? request.getContentLength() | |
2370 | 2365 | : request.getFile().length(); |
2371 | 2366 | } |
2372 | 2367 | } // END FileBodyHandler |
0 | package com.ning.http.client.providers.netty; | |
1 | ||
2 | import com.ning.http.client.Body; | |
3 | import com.ning.http.client.BodyGenerator; | |
4 | ||
5 | import java.io.IOException; | |
6 | import java.io.UnsupportedEncodingException; | |
7 | import java.nio.ByteBuffer; | |
8 | import java.util.Queue; | |
9 | import java.util.concurrent.ConcurrentLinkedQueue; | |
10 | import java.util.concurrent.atomic.AtomicInteger; | |
11 | ||
12 | /** | |
13 | * {@link com.ning.http.client.BodyGenerator} which may return just part of the payload at the time handler is requesting it. | |
14 | * If it happens, PartialBodyGenerator becomes responsible for finishing payload transferring asynchronously. | |
15 | */ | |
16 | public class FeedableBodyGenerator implements BodyGenerator { | |
17 | private static final String US_ASCII = "US-ASCII"; | |
18 | private final static byte[] END_PADDING = getBytes("\r\n"); | |
19 | private final static byte[] ZERO = getBytes("0"); | |
20 | private final Queue<BodyPart> queue = new ConcurrentLinkedQueue<BodyPart>(); | |
21 | private final AtomicInteger queueSize = new AtomicInteger(); | |
22 | private FeedListener listener; | |
23 | ||
24 | @Override | |
25 | public Body createBody() throws IOException { | |
26 | return new PushBody(); | |
27 | } | |
28 | ||
29 | public void feed(final ByteBuffer buffer, final boolean isLast) throws IOException { | |
30 | queue.offer(new BodyPart(buffer, isLast)); | |
31 | queueSize.incrementAndGet(); | |
32 | if (listener != null) { | |
33 | listener.onContentAdded(); | |
34 | } | |
35 | } | |
36 | ||
37 | public static interface FeedListener { | |
38 | void onContentAdded(); | |
39 | } | |
40 | ||
41 | public void setListener(FeedListener listener) { | |
42 | this.listener = listener; | |
43 | } | |
44 | ||
45 | private final class PushBody implements Body { | |
46 | private final int ONGOING = 0; | |
47 | private final int CLOSING = 1; | |
48 | private final int FINISHED = 2; | |
49 | ||
50 | private int finishState = 0; | |
51 | ||
52 | @Override | |
53 | public long getContentLength() { | |
54 | return -1; | |
55 | } | |
56 | ||
57 | @Override | |
58 | public long read(final ByteBuffer buffer) throws IOException { | |
59 | BodyPart nextPart = queue.peek(); | |
60 | if (nextPart == null) { | |
61 | // Nothing in the queue | |
62 | switch (finishState) { | |
63 | case ONGOING: | |
64 | return 0; | |
65 | case CLOSING: | |
66 | buffer.put(ZERO); | |
67 | buffer.put(END_PADDING); | |
68 | finishState = FINISHED; | |
69 | return buffer.position(); | |
70 | case FINISHED: | |
71 | buffer.put(END_PADDING); | |
72 | return -1; | |
73 | } | |
74 | } | |
75 | int capacity = buffer.remaining() - 10; // be safe (we'll have to add size, ending, etc.) | |
76 | int size = Math.min(nextPart.buffer.remaining(), capacity); | |
77 | buffer.put(getBytes(Integer.toHexString(size))); | |
78 | buffer.put(END_PADDING); | |
79 | for (int i = 0; i < size; i++) { | |
80 | buffer.put(nextPart.buffer.get()); | |
81 | } | |
82 | buffer.put(END_PADDING); | |
83 | if (!nextPart.buffer.hasRemaining()) { | |
84 | if (nextPart.isLast) { | |
85 | finishState = CLOSING; | |
86 | } | |
87 | queue.remove(); | |
88 | } | |
89 | return size; | |
90 | } | |
91 | ||
92 | @Override | |
93 | public void close() throws IOException { | |
94 | } | |
95 | ||
96 | } | |
97 | ||
98 | private final static class BodyPart { | |
99 | private final boolean isLast; | |
100 | private final ByteBuffer buffer; | |
101 | ||
102 | public BodyPart(final ByteBuffer buffer, final boolean isLast) { | |
103 | this.buffer = buffer; | |
104 | this.isLast = isLast; | |
105 | } | |
106 | } | |
107 | ||
108 | private static byte[] getBytes(String s) { | |
109 | // for compatibility with java5, we cannot use s.getBytes(Charset) | |
110 | try { | |
111 | return s.getBytes(US_ASCII); | |
112 | } catch (UnsupportedEncodingException e) { | |
113 | throw new RuntimeException(e); | |
114 | } | |
115 | } | |
116 | }⏎ |
94 | 94 | import org.jboss.netty.handler.codec.http.HttpVersion; |
95 | 95 | import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; |
96 | 96 | import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame; |
97 | import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame; | |
98 | import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame; | |
97 | 99 | import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame; |
98 | 100 | import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder; |
99 | 101 | import org.jboss.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder; |
2273 | 2275 | private static final byte OPCODE_CONT = 0x0; |
2274 | 2276 | private static final byte OPCODE_TEXT = 0x1; |
2275 | 2277 | private static final byte OPCODE_BINARY = 0x2; |
2278 | private static final byte OPCODE_PING = 0x9; | |
2279 | private static final byte OPCODE_PONG = 0xa; | |
2276 | 2280 | private static final byte OPCODE_UNKNOWN = -1; |
2277 | 2281 | |
2278 | 2282 | // We don't need to synchronize as replacing the "ws-decoder" will process using the same thread. |
2375 | 2379 | pendingOpcode = OPCODE_TEXT; |
2376 | 2380 | } else if (frame instanceof BinaryWebSocketFrame) { |
2377 | 2381 | pendingOpcode = OPCODE_BINARY; |
2382 | } else if (frame instanceof PingWebSocketFrame) { | |
2383 | pendingOpcode = OPCODE_PING; | |
2384 | } else if (frame instanceof PongWebSocketFrame) { | |
2385 | pendingOpcode = OPCODE_PONG; | |
2378 | 2386 | } |
2379 | 2387 | |
2380 | 2388 | HttpChunk webSocketChunk = new HttpChunk() { |
2408 | 2416 | webSocket.onBinaryFragment(rp.getBodyPartBytes(), frame.isFinalFragment()); |
2409 | 2417 | } else if (pendingOpcode == OPCODE_TEXT) { |
2410 | 2418 | webSocket.onTextFragment(frame.getBinaryData().toString(UTF8), frame.isFinalFragment()); |
2419 | } else if (pendingOpcode == OPCODE_PING) { | |
2420 | webSocket.onPing(rp.getBodyPartBytes()); | |
2421 | } else if (pendingOpcode == OPCODE_PONG) { | |
2422 | webSocket.onPong(rp.getBodyPartBytes()); | |
2411 | 2423 | } |
2412 | 2424 | |
2413 | 2425 | if (frame instanceof CloseWebSocketFrame) { |
15 | 15 | import com.ning.http.client.websocket.WebSocketByteListener; |
16 | 16 | import com.ning.http.client.websocket.WebSocketCloseCodeReasonListener; |
17 | 17 | import com.ning.http.client.websocket.WebSocketListener; |
18 | import com.ning.http.client.websocket.WebSocketPingListener; | |
19 | import com.ning.http.client.websocket.WebSocketPongListener; | |
18 | 20 | import com.ning.http.client.websocket.WebSocketTextListener; |
19 | 21 | import org.jboss.netty.channel.Channel; |
20 | 22 | import org.jboss.netty.channel.ChannelFutureListener; |
212 | 214 | } |
213 | 215 | } |
214 | 216 | |
217 | public void onPing(byte[] payload) { | |
218 | for (WebSocketListener listener : listeners) { | |
219 | if (listener instanceof WebSocketPingListener) | |
220 | WebSocketPingListener.class.cast(listener).onPing(payload); | |
221 | } | |
222 | } | |
223 | ||
224 | public void onPong(byte[] payload) { | |
225 | for (WebSocketListener listener : listeners) { | |
226 | if (listener instanceof WebSocketPongListener) | |
227 | WebSocketPongListener.class.cast(listener).onPong(payload); | |
228 | } | |
229 | } | |
230 | ||
215 | 231 | protected void onError(Throwable t) { |
216 | 232 | for (WebSocketListener l : listeners) { |
217 | 233 | try { |
127 | 127 | pos += read; |
128 | 128 | } |
129 | 129 | |
130 | httpResponse.getOutputStream().write(bytes); | |
130 | httpResponse.getOutputStream().write(bytes, 0, pos + 1); // (pos + 1) because last read added -1 | |
131 | 131 | } |
132 | 132 | |
133 | 133 | httpResponse.setStatus(200); |
17 | 17 | import com.ning.http.client.RequestBuilder; |
18 | 18 | import com.ning.http.client.Response; |
19 | 19 | import com.ning.http.client.generators.InputStreamBodyGenerator; |
20 | ||
20 | 21 | import org.testng.annotations.Test; |
21 | 22 | |
22 | 23 | import java.io.BufferedInputStream; |
23 | 24 | import java.io.ByteArrayOutputStream; |
24 | 25 | import java.io.File; |
25 | 26 | import java.io.FileInputStream; |
27 | import java.io.IOException; | |
26 | 28 | import java.io.InputStream; |
29 | import java.net.URISyntaxException; | |
27 | 30 | import java.net.URL; |
28 | 31 | import java.util.Random; |
29 | 32 | |
30 | import static org.testng.Assert.assertNotNull; | |
31 | import static org.testng.AssertJUnit.assertEquals; | |
32 | import static org.testng.AssertJUnit.assertTrue; | |
33 | import static org.testng.Assert.*; | |
33 | 34 | import static org.testng.FileAssert.fail; |
34 | 35 | |
35 | 36 | /** |
68 | 69 | * Tests that the custom chunked stream result in success and content returned that is unchunked |
69 | 70 | */ |
70 | 71 | @Test() |
71 | public void testCustomChunking() throws Throwable { | |
72 | doTest(true); | |
72 | public void testBufferLargerThanFile() throws Throwable { | |
73 | doTest(new BufferedInputStream(new FileInputStream(getTestFile()), 400000)); | |
73 | 74 | } |
74 | 75 | |
75 | private void doTest(boolean customChunkedInputStream) throws Exception { | |
76 | AsyncHttpClient c = null; | |
76 | @Test() | |
77 | public void testBufferSmallThanFile() throws Throwable { | |
78 | doTest(new BufferedInputStream(new FileInputStream(getTestFile()))); | |
79 | } | |
80 | ||
81 | @Test() | |
82 | public void testDirectFile() throws Throwable { | |
83 | doTest(new FileInputStream(getTestFile())); | |
84 | } | |
85 | ||
86 | public void doTest(InputStream is) throws Throwable { | |
87 | AsyncHttpClientConfig.Builder bc = new AsyncHttpClientConfig.Builder()// | |
88 | .setAllowPoolingConnection(true)// | |
89 | .setMaximumConnectionsPerHost(1)// | |
90 | .setMaximumConnectionsTotal(1)// | |
91 | .setConnectionTimeoutInMs(1000)// | |
92 | .setRequestTimeoutInMs(1000)// | |
93 | .setFollowRedirects(true); | |
94 | ||
95 | AsyncHttpClient client = getAsyncHttpClient(bc.build()); | |
77 | 96 | try { |
78 | AsyncHttpClientConfig.Builder bc = new AsyncHttpClientConfig.Builder(); | |
79 | ||
80 | bc.setAllowPoolingConnection(true); | |
81 | bc.setMaximumConnectionsPerHost(1); | |
82 | bc.setMaximumConnectionsTotal(1); | |
83 | bc.setConnectionTimeoutInMs(1000); | |
84 | bc.setRequestTimeoutInMs(1000); | |
85 | bc.setFollowRedirects(true); | |
86 | ||
87 | c = getAsyncHttpClient(bc.build()); | |
88 | ||
89 | 97 | RequestBuilder builder = new RequestBuilder("POST"); |
90 | 98 | builder.setUrl(getTargetUrl()); |
91 | if (customChunkedInputStream) { | |
92 | // made buff in stream big enough to mark. | |
93 | builder.setBody(new InputStreamBodyGenerator(new BufferedInputStream(new FileInputStream(getTestFile()), 400000))); | |
99 | // made buff in stream big enough to mark. | |
100 | builder.setBody(new InputStreamBodyGenerator(is)); | |
101 | ||
102 | ListenableFuture<Response> response = client.executeRequest(builder.build()); | |
103 | Response res = response.get(); | |
104 | assertNotNull(res.getResponseBodyAsStream()); | |
105 | if (500 == res.getStatusCode()) { | |
106 | assertEquals(res.getStatusCode(), 500, "Should have 500 status code"); | |
107 | assertTrue(res.getHeader("X-Exception").contains("invalid.chunk.length"), "Should have failed due to chunking"); | |
108 | fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + res.getHeader("X-Exception")); | |
94 | 109 | } else { |
95 | // made buff in stream big enough to mark. | |
96 | builder.setBody(new InputStreamBodyGenerator(new BufferedInputStream(new FileInputStream(getTestFile()), 400000))); | |
97 | } | |
98 | com.ning.http.client.Request r = builder.build(); | |
99 | Response res = null; | |
100 | ||
101 | try { | |
102 | ListenableFuture<Response> response = c.executeRequest(r); | |
103 | res = response.get(); | |
104 | assertNotNull(res.getResponseBodyAsStream()); | |
105 | if (500 == res.getStatusCode()) { | |
106 | System.out.println("=============="); | |
107 | System.out.println("500 response from call"); | |
108 | System.out.println("Headers:" + res.getHeaders()); | |
109 | System.out.println("=============="); | |
110 | System.out.flush(); | |
111 | assertEquals("Should have 500 status code", 500, res.getStatusCode()); | |
112 | assertTrue("Should have failed due to chunking", res.getHeader("X-Exception").contains("invalid.chunk.length")); | |
113 | fail("HARD Failing the test due to provided InputStreamBodyGenerator, chunking incorrectly:" + res.getHeader("X-Exception")); | |
114 | } else { | |
115 | assertEquals(LARGE_IMAGE_BYTES, readInputStreamToBytes(res.getResponseBodyAsStream())); | |
116 | } | |
117 | } catch (Exception e) { | |
118 | ||
119 | fail("Exception Thrown:" + e.getMessage()); | |
110 | assertEquals(readInputStreamToBytes(res.getResponseBodyAsStream()), LARGE_IMAGE_BYTES); | |
120 | 111 | } |
121 | 112 | } finally { |
122 | if (c != null) | |
123 | c.close(); | |
113 | if (client != null) | |
114 | client.close(); | |
124 | 115 | } |
125 | 116 | } |
126 | ||
127 | private byte[] readInputStreamToBytes(InputStream stream) { | |
128 | byte[] data = new byte[0]; | |
117 | ||
118 | ||
119 | private byte[] readInputStreamToBytes(InputStream stream) throws IOException { | |
129 | 120 | ByteArrayOutputStream buffer = new ByteArrayOutputStream(); |
130 | 121 | try { |
131 | 122 | int nRead; |
134 | 125 | while ((nRead = stream.read(tmp, 0, tmp.length)) != -1) { |
135 | 126 | buffer.write(tmp, 0, nRead); |
136 | 127 | } |
128 | ||
137 | 129 | buffer.flush(); |
138 | data = buffer.toByteArray(); | |
139 | } catch (Exception e) { | |
130 | return buffer.toByteArray(); | |
140 | 131 | |
141 | 132 | } finally { |
142 | 133 | try { |
143 | 134 | stream.close(); |
144 | 135 | } catch (Exception e2) { |
145 | 136 | } |
146 | return data; | |
147 | 137 | } |
148 | 138 | } |
149 | 139 | |
150 | private static File getTestFile() { | |
140 | private static File getTestFile() throws URISyntaxException { | |
151 | 141 | String testResource1 = "300k.png"; |
152 | ||
153 | File testResource1File = null; | |
154 | try { | |
155 | ClassLoader cl = ChunkingTest.class.getClassLoader(); | |
156 | URL url = cl.getResource(testResource1); | |
157 | testResource1File = new File(url.toURI()); | |
158 | } catch (Throwable e) { | |
159 | // TODO Auto-generated catch block | |
160 | fail("unable to find " + testResource1); | |
161 | } | |
162 | ||
163 | return testResource1File; | |
142 | URL url = ChunkingTest.class.getClassLoader().getResource(testResource1); | |
143 | return new File(url.toURI()); | |
164 | 144 | } |
165 | ||
166 | } | |
145 | }⏎ |
+128
-0
0 | /* | |
1 | * Copyright (c) 2013-2014 Sonatype, Inc. All rights reserved. | |
2 | * | |
3 | * This program is licensed to you under the Apache License Version 2.0, | |
4 | * and you may not use this file except in compliance with the Apache License Version 2.0. | |
5 | * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. | |
6 | * | |
7 | * Unless required by applicable law or agreed to in writing, | |
8 | * software distributed under the Apache License Version 2.0 is distributed on an | |
9 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
10 | * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. | |
11 | */ | |
12 | ||
13 | package com.ning.http.client.async.netty; | |
14 | ||
15 | import com.ning.http.client.*; | |
16 | import com.ning.http.client.async.AbstractBasicTest; | |
17 | import com.ning.http.client.async.ChunkingTest; | |
18 | import com.ning.http.client.async.ProviderUtil; | |
19 | import com.ning.http.client.providers.netty.FeedableBodyGenerator; | |
20 | import org.eclipse.jetty.server.handler.AbstractHandler; | |
21 | import org.testng.Assert; | |
22 | import org.testng.annotations.Test; | |
23 | ||
24 | import javax.servlet.ServletException; | |
25 | import javax.servlet.ServletInputStream; | |
26 | import javax.servlet.http.HttpServletRequest; | |
27 | import javax.servlet.http.HttpServletResponse; | |
28 | import java.io.File; | |
29 | import java.io.FileInputStream; | |
30 | import java.io.IOException; | |
31 | import java.net.URL; | |
32 | import java.nio.ByteBuffer; | |
33 | import java.nio.channels.FileChannel; | |
34 | ||
35 | import static org.testng.FileAssert.fail; | |
36 | ||
37 | public class NettyFeedableBodyGeneratorTest extends AbstractBasicTest { | |
38 | ||
39 | @Override | |
40 | public AsyncHttpClient getAsyncHttpClient(AsyncHttpClientConfig config) { | |
41 | return ProviderUtil.nettyProvider(config); | |
42 | } | |
43 | ||
44 | @Test(groups = { "standalone", "default_provider" }, enabled = true) | |
45 | public void testPutImageFile() throws Exception { | |
46 | File largeFile = getTestFile(); | |
47 | final FileChannel fileChannel = new FileInputStream(largeFile).getChannel(); | |
48 | ||
49 | AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder().setRequestTimeoutInMs(100 * 6000).build(); | |
50 | AsyncHttpClient client = getAsyncHttpClient(config); | |
51 | final FeedableBodyGenerator bodyGenerator = new FeedableBodyGenerator(); | |
52 | ||
53 | try { | |
54 | RequestBuilder builder = new RequestBuilder("PUT") | |
55 | .setUrl(getTargetUrl()) | |
56 | .setBody(bodyGenerator); | |
57 | ||
58 | ListenableFuture<Response> listenableFuture = client.executeRequest(builder.build()); | |
59 | ||
60 | boolean repeat = true; | |
61 | while (repeat) { | |
62 | final ByteBuffer buffer = ByteBuffer.allocate(1024); | |
63 | if (fileChannel.read(buffer) > 0) { | |
64 | buffer.flip(); | |
65 | bodyGenerator.feed(buffer, false); | |
66 | } else { | |
67 | repeat = false; | |
68 | } | |
69 | } | |
70 | ByteBuffer emptyBuffer = ByteBuffer.wrap(new byte[0]); | |
71 | bodyGenerator.feed(emptyBuffer, true); | |
72 | ||
73 | Response response = listenableFuture.get(); | |
74 | Assert.assertEquals(200, response.getStatusCode()); | |
75 | Assert.assertEquals("" + largeFile.length(), response.getHeader("X-TRANSFERRED")); | |
76 | } finally { | |
77 | fileChannel.close(); | |
78 | client.close(); | |
79 | } | |
80 | } | |
81 | ||
82 | private static File getTestFile() { | |
83 | String testResource1 = "300k.png"; | |
84 | ||
85 | File testResource1File = null; | |
86 | try { | |
87 | ClassLoader cl = ChunkingTest.class.getClassLoader(); | |
88 | URL url = cl.getResource(testResource1); | |
89 | testResource1File = new File(url.toURI()); | |
90 | } catch (Throwable e) { | |
91 | // TODO Auto-generated catch block | |
92 | fail("unable to find " + testResource1); | |
93 | } | |
94 | ||
95 | return testResource1File; | |
96 | } | |
97 | ||
98 | @Override | |
99 | public AbstractHandler configureHandler() throws Exception { | |
100 | return new AbstractHandler() { | |
101 | ||
102 | public void handle(String arg0, org.eclipse.jetty.server.Request arg1, HttpServletRequest req, HttpServletResponse resp) throws IOException, ServletException { | |
103 | ||
104 | ServletInputStream in = req.getInputStream(); | |
105 | byte[] b = new byte[8192]; | |
106 | ||
107 | int count = -1; | |
108 | int total = 0; | |
109 | while ((count = in.read(b)) != -1) { | |
110 | b = new byte[8192]; | |
111 | total += count; | |
112 | } | |
113 | System.err.println("consumed " + total + " bytes."); | |
114 | ||
115 | resp.setStatus(200); | |
116 | resp.addHeader("X-TRANSFERRED", String.valueOf(total)); | |
117 | resp.getOutputStream().flush(); | |
118 | resp.getOutputStream().close(); | |
119 | ||
120 | arg1.setHandled(true); | |
121 | ||
122 | } | |
123 | }; | |
124 | } | |
125 | ||
126 | ||
127 | }⏎ |