设为首页 收藏本站
查看: 464|回复: 0

[经验分享] Netty中使用Apache Common FileUpload

[复制链接]

尚未签到

发表于 2017-1-8 07:26:40 | 显示全部楼层 |阅读模式
/**
* 用Netty来实现上传
*/
public class NettyFileUpload extends FileUpload {
private NettyRequestContext context;
public static final boolean isMultipartContent(HttpRequest request) {
if (HttpMethod.POST != request.getMethod()) {
return false;
}
if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) {
return false;
}
String contentType = request.getHeaders("Content-Type").get(0);
if (contentType == null) {
return false;
}
if (contentType.toLowerCase().startsWith("multipart/")) {
return true;
}
return false;
}
public NettyFileUpload(NettyRequestContext context) {
this.context = context;
}
public NettyFileUpload(FileItemFactory fileItemFactory) {
super(fileItemFactory);
}
public FileItemIterator getItemIterator() throws FileUploadException, IOException {
return super.getItemIterator(context);
}
}

public class NettyRequestContext implements RequestContext {
private String encoding;
private String contentType;
private int contentLength = -1;
/**
* 上传的内容流
*/
private InputStream inputStream;
public NettyRequestContext(String encoding, String contentType,
int contentLength, InputStream inputStream) {
this.encoding = encoding;
this.contentType = contentType;
this.contentLength = contentLength;
this.inputStream = inputStream;
}
@Override
public String getCharacterEncoding() {
return encoding;
}
@Override
public String getContentType() {
return contentType;
}
@Override
public int getContentLength() {
return contentLength;
}
@Override
public InputStream getInputStream() throws IOException {
// 不能直接用request的流,因为有HttpChunk
return inputStream;
}
@Override
public String toString() {
return "ContentLength=" + this.getContentLength() + ", ContentType="
+ this.getContentType();
}
public void closeInputStream() throws IOException {
getInputStream().close();
}
}

public class NettyChunkInputStream extends InputStream {
private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128);
private HttpChunk currentChunk = null;
private volatile boolean closed;
public boolean putChunk(HttpChunk chunk) throws IOException {
if (!closed) {
try {
chunkQueue.put(chunk);
} catch (InterruptedException e) {
throw new IOException(e);
}
return true;
}
throw new IOException(" this inputstream has been closed!");
}
@Override
public int read() throws IOException {
byte resultByte = -1;
try {
if (getChunk().getContent().readable()) {
resultByte = getChunk().getContent().readByte();
} else if (!getChunk().isLast()) {
nextChunk();
if (getChunk().getContent().readable()) {
resultByte = getChunk().getContent().readByte();
} else {
return -1;
}
} else {
return -1;
}
} catch (InterruptedException e) {
throw new IOException(e);
}
// InputStream.read()返回0-255之间的int
return resultByte >= 0 ? resultByte : 256 + resultByte;
}
private HttpChunk getChunk() throws InterruptedException {
if (currentChunk == null) {
currentChunk = chunkQueue.take();
}
return currentChunk;
}
private void nextChunk() throws InterruptedException {
currentChunk = chunkQueue.take();
}
@Override
public int available() throws IOException {
throw new UnsupportedOperationException("unsupport available()");
}
@Override
public void close() throws IOException {
chunkQueue = null;
closed = true;
}
public boolean isClosed() {
return closed;
}
}

应用:


public class NettyUploadHandler extends SimpleChannelUpstreamHandler {
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(32);
private boolean hasReadChunk;
private NettyChunkInputStream chunkStream = new NettyChunkInputStream();
private NettyRequestContext context;
private volatile Map<String, String> resultMap = null;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (!hasReadChunk) {
handleHttpRequest(ctx, e);
} else {
handleHttpChunk(e);
}
}
private void handleHttpRequest(ChannelHandlerContext ctx, MessageEvent e) throws IOException {
HttpRequest request = (HttpRequest) e.getMessage();
if (isUploadFile(request)) {
handleUploadRequest(request);
} else {
ctx.sendUpstream(e);
}
}
private void handleUploadRequest(HttpRequest request) throws IOException {
context = new NettyRequestContext("UTF-8", request.getHeader("Content-Type"), -1, chunkStream);
if (request.isChunked()) {
hasReadChunk = true;
} else {
HttpChunk chunk = new DefaultHttpChunk(request.getContent());
chunkStream.putChunk(chunk);
}
startUpload();
}
private void handleHttpChunk(MessageEvent e) throws IOException {
if (isUploadFinished()) {
writeResult(e.getChannel());
return;
}
HttpChunk chunk = (HttpChunk) e.getMessage();
chunkStream.putChunk(chunk);
if (chunk.isLast()) {
for (;;) {
if (isUploadFinished()) {
writeResult(e.getChannel());
return;
}
}
}
}
private boolean isUploadFinished() {
return resultMap != null || chunkStream.isClosed();
}
private boolean isUploadFile(HttpRequest request) {
return request.getUri().equals("/upload/uploadfile") && NettyFileUpload.isMultipartContent(request);
}
private void startUpload() {
EXECUTOR.execute(new UploadTask());
}
private void writeResult(Channel channel) {
String json = JsonUtil.beanToJson(resultMap);
byte[] data = json.getBytes();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setContent(buffer);
response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()));
channel.write(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
}
class UploadTask implements Runnable {
public UploadTask() {
super();
}
@Override
public void run() {
long start = System.currentTimeMillis();
try {
NettyFileUpload upload = new NettyFileUpload(context);
FileItemIterator iter = upload.getItemIterator();
while (iter.hasNext()) {
FileItemStream item = iter.next();
//这里处理逻辑
}
resultMap = handler.getResult();
context.closeInputStream();
long end = System.currentTimeMillis();
System.out.println("spend time : " + (end - start));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

该NettyChunkInputStream必须一个线程来putChunk(...),另一个线程使用getInputStream()来消耗数据。
PS:可以在NettyChunkInputStream中重写InputStream.read(bs,offset,len),避免每次调用read()都进行边界判断,使之效率更高。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-325262-1-1.html 上篇帖子: Apache Roller汉化研究之资源文件 下篇帖子: apache + mongrel +instantrails 部署 出现下面问题
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表