Last active
November 5, 2020 16:51
-
-
Save iconara/5de0699b55818ac1d9b51424848b65a6 to your computer and use it in GitHub Desktop.
S3 GetObject InputStreamResponseTransformer using AWS SDK for Java v2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is an attempt to create a synchronous InputStream from a call to
// S3AsyncClient#getObject using a blocking queue.
//
// The purpose is to be able to make many S3 operations asynchronously while
// still being able to hand some results off to threads and to code that
// expects an InputStream or Reader, such as Commons CSV.
public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer<GetObjectResponse, InputStream>, Subscriber<ByteBuffer> { | |
private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0); | |
private final CompletableFuture<InputStream> future; | |
private final BlockingQueue<ByteBuffer> chunks; | |
private Subscription subscription; | |
private ByteBuffer readChunk; | |
private Throwable error; | |
private AtomicBoolean complete; | |
private AtomicInteger approximateBufferSize; | |
public InputStreamResponseTransformer() { | |
this.future = new CompletableFuture<>(); | |
this.chunks = new LinkedBlockingQueue<>(); | |
this.complete = new AtomicBoolean(false); | |
this.approximateBufferSize = new AtomicInteger(0); | |
} | |
@Override | |
public CompletableFuture<InputStream> prepare() { | |
return future; | |
} | |
@Override | |
public void onResponse(GetObjectResponse response) { | |
future.complete(this); | |
} | |
@Override | |
public void onStream(SdkPublisher<ByteBuffer> publisher) { | |
publisher.subscribe(this); | |
} | |
@Override | |
public void exceptionOccurred(Throwable t) { | |
error = t; | |
future.completeExceptionally(t); | |
} | |
@Override | |
public void onSubscribe(Subscription s) { | |
subscription = s; | |
// how much to request initially? what's the unit here? | |
subscription.request(10L); | |
} | |
@Override | |
public void onNext(ByteBuffer byteBuffer) { | |
chunks.offer(byteBuffer); | |
int size = approximateBufferSize.addAndGet(byteBuffer.remaining()); | |
maybeRequestMore(size); | |
} | |
private void maybeRequestMore(int currentSize) { | |
// this is an attempt to keep track of roughly how much is buffered | |
// and request more only if it's not too much (2^25 =~ 30 MB) | |
if (currentSize < (1 << 25)) { | |
// but what's the unit here? is it bytes, or chunks? how large | |
// should I expect the chunks to be? | |
subscription.request(1L); | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
exceptionOccurred(t); | |
} | |
@Override | |
public void onComplete() { | |
chunks.offer(END_MARKER); | |
complete.set(true); | |
} | |
@Override | |
public int available() throws IOException { | |
if (error != null) { | |
throw new IOException(error); | |
} | |
if (readChunk != null) { | |
return readChunk.remaining(); | |
} else { | |
return 0; | |
} | |
} | |
private boolean ensureChunk() throws IOException { | |
if (error != null) { | |
throw new IOException(error); | |
} | |
if (readChunk == END_MARKER) { | |
return false; | |
} else if (readChunk == null || !readChunk.hasRemaining()) { | |
try { | |
readChunk = chunks.take(); | |
if (readChunk == END_MARKER) { | |
return false; | |
} else { | |
int size = approximateBufferSize.addAndGet(-readChunk.remaining()); | |
maybeRequestMore(size); | |
} | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
return false; | |
} | |
} | |
return true; | |
} | |
@Override | |
public int read(byte[] destination, int offset, int length) throws IOException { | |
if (ensureChunk()) { | |
int actualLength = Math.min(length, readChunk.remaining()); | |
readChunk.get(destination, offset, actualLength); | |
return actualLength; | |
} else { | |
return -1; | |
} | |
} | |
@Override | |
public int read() throws IOException { | |
if (ensureChunk()) { | |
return Byte.toUnsignedInt(readChunk.get()); | |
} else { | |
return -1; | |
} | |
} | |
@Override | |
public void close() throws IOException { | |
if (!complete.get()) { | |
chunks.clear(); | |
chunks.offer(END_MARKER); | |
subscription.cancel(); | |
future.cancel(true); | |
} | |
super.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment