For HTTP uploads, the Apache Commons FileUpload component expects to read the request body from an InputStream. Netty, however, delivers the body in chunks (HttpChunk), so there is no stream to hand over directly. To use Apache Commons FileUpload inside Netty, the incoming HttpChunks therefore have to be adapted into an InputStream.
Extending Apache FileUpload
import java.io.IOException;

import org.apache.commons.fileupload.FileItemFactory;
import org.apache.commons.fileupload.FileItemIterator;
import org.apache.commons.fileupload.FileUpload;
import org.apache.commons.fileupload.FileUploadException;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;

/**
 * FileUpload implementation backed by Netty.
 */
public class NettyFileUpload extends FileUpload {

    private NettyRequestContext context;

    /**
     * Mirrors ServletFileUpload.isMultipartContent() for a Netty HttpRequest.
     */
    public static final boolean isMultipartContent(HttpRequest request) {
        if (!HttpMethod.POST.equals(request.getMethod())) {
            return false;
        }
        // No Content-Type header means this cannot be a multipart request.
        if (request.getHeaders("Content-Type") == null || request.getHeaders("Content-Type").isEmpty()) {
            return false;
        }
        String contentType = request.getHeaders("Content-Type").get(0);
        if (contentType == null) {
            return false;
        }
        return contentType.toLowerCase().startsWith("multipart/");
    }

    public NettyFileUpload(NettyRequestContext context) {
        // The streaming API (getItemIterator) does not need a FileItemFactory.
        this.context = context;
    }

    public NettyFileUpload(FileItemFactory fileItemFactory) {
        super(fileItemFactory);
    }

    public FileItemIterator getItemIterator() throws FileUploadException, IOException {
        return super.getItemIterator(context);
    }
}
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.fileupload.RequestContext;

public class NettyRequestContext implements RequestContext {

    private String encoding;
    private String contentType;
    private int contentLength = -1;
    /**
     * Stream carrying the uploaded content.
     */
    private InputStream inputStream;

    public NettyRequestContext(String encoding, String contentType,
            int contentLength, InputStream inputStream) {
        this.encoding = encoding;
        this.contentType = contentType;
        this.contentLength = contentLength;
        this.inputStream = inputStream;
    }

    @Override
    public String getCharacterEncoding() {
        return encoding;
    }

    @Override
    public String getContentType() {
        return contentType;
    }

    @Override
    public int getContentLength() {
        return contentLength;
    }

    @Override
    public InputStream getInputStream() throws IOException {
        // The request body cannot be used as a stream directly,
        // because Netty delivers it as HttpChunks.
        return inputStream;
    }

    @Override
    public String toString() {
        return "ContentLength=" + this.getContentLength() + ", ContentType="
                + this.getContentType();
    }

    public void closeInputStream() throws IOException {
        getInputStream().close();
    }
}
Adapting HttpChunk into an InputStream:
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.jboss.netty.handler.codec.http.HttpChunk;

public class NettyChunkInputStream extends InputStream {

    private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128);
    private HttpChunk currentChunk = null;
    private volatile boolean closed;

    public boolean putChunk(HttpChunk chunk) throws IOException {
        if (!closed) {
            try {
                chunkQueue.put(chunk);
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
            return true;
        }
        throw new IOException("This InputStream has been closed!");
    }

    @Override
    public int read() throws IOException {
        byte resultByte = -1;
        try {
            if (getChunk().getContent().readable()) {
                resultByte = getChunk().getContent().readByte();
            } else if (!getChunk().isLast()) {
                // The current chunk is exhausted; block until the next one arrives.
                nextChunk();
                if (getChunk().getContent().readable()) {
                    resultByte = getChunk().getContent().readByte();
                } else {
                    return -1;
                }
            } else {
                // Last chunk and no readable bytes left: end of stream.
                return -1;
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
        // InputStream.read() must return an int in the range 0-255.
        return resultByte >= 0 ? resultByte : 256 + resultByte;
    }

    private HttpChunk getChunk() throws InterruptedException {
        if (currentChunk == null) {
            currentChunk = chunkQueue.take();
        }
        return currentChunk;
    }

    private void nextChunk() throws InterruptedException {
        currentChunk = chunkQueue.take();
    }

    @Override
    public int available() throws IOException {
        throw new UnsupportedOperationException("available() is not supported");
    }

    @Override
    public void close() throws IOException {
        // Drop any chunks still queued and reject further putChunk() calls.
        chunkQueue = null;
        closed = true;
    }

    public boolean isClosed() {
        return closed;
    }
}
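Finally, the pieces have to be wired into the Netty pipeline: the decoded HttpRequest yields the NettyRequestContext, and every subsequent HttpChunk is pushed into the NettyChunkInputStream while a worker thread runs the blocking multipart parsing. The handler below is only a minimal sketch under Netty 3.x assumptions, not code from the original article; UploadHandler, its Executor, the "UTF-8" encoding and the call to HttpHeaders.getContentLength are my assumptions, and processUpload refers to the hypothetical helper sketched after the NettyFileUpload listing.

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;

public class UploadHandler extends SimpleChannelUpstreamHandler {

    // Worker pool for the blocking FileUpload parsing; an assumption,
    // not something prescribed by the article.
    private final Executor executor = Executors.newCachedThreadPool();

    private NettyChunkInputStream chunkStream;

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object msg = e.getMessage();
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            if (NettyFileUpload.isMultipartContent(request) && request.isChunked()) {
                chunkStream = new NettyChunkInputStream();
                final NettyRequestContext context = new NettyRequestContext(
                        "UTF-8",
                        request.getHeader(HttpHeaders.Names.CONTENT_TYPE),
                        (int) HttpHeaders.getContentLength(request, -1),
                        chunkStream);
                // NettyChunkInputStream.read() blocks until chunks arrive,
                // so the multipart parsing must run off the I/O thread.
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            // Hypothetical helper from the NettyFileUpload section.
                            processUpload(context);
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                    }
                });
            }
        } else if (msg instanceof HttpChunk) {
            // Feed each arriving chunk into the adapted stream; the parser
            // thread consumes it through InputStream.read().
            chunkStream.putChunk((HttpChunk) msg);
        } else {
            ctx.sendUpstream(e);
        }
    }
}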