Last active
March 2, 2023 07:27
-
-
Save johnnyaug/b6066d26714bda20cc95fc0761720bc4 to your computer and use it in GitHub Desktop.
Hadoop FSInputStream for HTTP Byte-Range requests
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Objects;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
public class HttpRangeInputStream extends FSInputStream { | |
private static final int BUFFER_SIZE = 1024 * 1024; | |
private final String url; | |
private long start = Long.MAX_VALUE; | |
private long pos; | |
private long len = 0; | |
private byte[] rangeContent; | |
private boolean closed; | |
public HttpRangeInputStream(String url) throws IOException { | |
this.url = url; | |
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); | |
connection.setRequestMethod("GET"); | |
connection.setRequestProperty("Range", "bytes=0-0"); | |
String contentRangeHeader = connection.getHeaderField("Content-Range"); | |
if (contentRangeHeader == null || !contentRangeHeader.startsWith("bytes 0-0/")) { | |
// empty file | |
return; | |
} | |
len = Long.parseLong(contentRangeHeader.substring("bytes 0-0/".length())); | |
} | |
private void updateInputStream(long targetPos) throws MalformedURLException, IOException { | |
if (targetPos >= start && targetPos < start + BUFFER_SIZE) { | |
// no need to update the stream | |
return; | |
} | |
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection(); | |
connection.setRequestMethod("GET"); | |
long rangeEnd = Math.min(targetPos + BUFFER_SIZE, len); | |
connection.setRequestProperty("Range", "bytes=" + targetPos + "-" + rangeEnd); | |
rangeContent = new byte[(int) (rangeEnd - targetPos)]; | |
InputStream inputStream = connection.getInputStream(); | |
IOUtils.readFully(inputStream, rangeContent); | |
inputStream.close(); | |
start = targetPos; | |
} | |
@Override | |
public synchronized void seek(long targetPos) throws IOException { | |
if (closed) { | |
throw new IOException("Stream closed"); | |
} | |
if (targetPos < 0) { | |
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK | |
+ " " + targetPos); | |
} | |
this.pos = targetPos; | |
} | |
@Override | |
public synchronized long getPos() throws IOException { | |
return pos; | |
} | |
public synchronized int available() throws IOException { | |
if (closed) { | |
throw new IOException("Stream closed"); | |
} | |
if (len - pos > Integer.MAX_VALUE) { | |
return Integer.MAX_VALUE; | |
} | |
return (int) Math.max(len - pos, 0); | |
} | |
@Override | |
public synchronized boolean seekToNewSource(long targetPos) throws IOException { | |
return false; | |
} | |
@Override | |
public synchronized int read() throws IOException { | |
if (closed) { | |
throw new IOException("Stream closed"); | |
} | |
if (pos >= len) { | |
return -1; | |
} | |
updateInputStream(pos); | |
int res = rangeContent[(int) (pos - start)] & 0xff; | |
pos++; | |
return res; | |
} | |
@Override | |
public synchronized void close() throws IOException { | |
if (closed) { | |
return; | |
} | |
closed = true; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Used in lakeFS.