设为首页 收藏本站
查看: 730|回复: 0

[经验分享] hadoop源码解析copyFromLocal

[复制链接]

尚未签到

发表于 2016-12-10 06:07:52 | 显示全部楼层 |阅读模式
  好奇分布式存储是怎么实现的,如何能将一个文件存储到HDFS上,HDFS的文件目录只是一个空壳,真正存储数据的是DataNode,那么当我们把一个文件放到HDFS上的时候,集群都做了哪些工作呢 ?也就是执行命令copyFromLocal这个命令都做了哪些操作
  首先命令肯定对应着源码里面的某一个方法,这个方法是FsShell类的copyFromLocal,代码:

    void copyFromLocal(Path[] srcs, String dstf) throws IOException
{
// 创建目标路径
Path dstPath = new Path(dstf);
// 获取目录存储目标路径的文件系统
FileSystem dstFs = dstPath.getFileSystem(getConf());
if (srcs.length == 1 && srcs[0].toString().equals("-"))
{
copyFromStdin(dstPath, dstFs);
}
else
{
dstFs.copyFromLocalFile(false, false, srcs, dstPath);
}
}
  文件的拷贝是通过类FileUtil累的copy方法实现的:

    public static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS,
Path dst, boolean deleteSource, boolean overwrite,
Configuration conf) throws IOException
{
// 检查目标路径是否合法
dst = checkDest(src.getName(), dstFS, dst, overwrite);
if (srcFS.getFileStatus(src).isDir())
{
// 检查目标目录是否是合理的目录
checkDependencies(srcFS, src, dstFS, dst);
if (!dstFS.mkdirs(dst))
{
return false;
}
FileStatus contents[] = srcFS.listStatus(src);
for (int i = 0; i < contents.length; i++)
{
// 递归调用当前方法,如果原目标是文件,那么执行else if 代码块
copy(srcFS, contents.getPath(), dstFS, new Path(dst,
contents.getPath().getName()), deleteSource,
overwrite, conf);
}
}
else if (srcFS.isFile(src))
{
InputStream in = null;
OutputStream out = null;
try
{
in = srcFS.open(src);
// 创建目标路径,在分布式中如何创建很重要
out = dstFS.create(dst, overwrite);
IOUtils.copyBytes(in, out, conf, true);
}
catch (IOException e)
{
IOUtils.closeStream(out);
IOUtils.closeStream(in);
throw e;
}
}
}
  文件的拷贝需要打开源文件流和目标文件流,目标文件流是通过DFSClient的create方法实现,创建一个DFSOutputStream流:

    public OutputStream create(String src, FsPermission permission,
boolean overwrite, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize)
throws IOException
{
checkOpen();
if (permission == null)
{
permission = FsPermission.getDefault();
}
FsPermission masked = permission
.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);
// src 为要拷贝到的目标路径, 文件块大小blockSize应该是io.bytes.per.checksum
// 大小的n倍,否则会出现异常
OutputStream result = new DFSOutputStream(src, masked, overwrite,
createParent, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
leasechecker.put(src, result);
return result;
}
  在创建DFSOutputStream流的时候都做了什么工作,具体看创建方法,在DFSOutputStream中开启了DataStreamer进程,这个进程在后面的数据写入的时候扮演者重要的角色:

        DFSOutputStream(String src, FsPermission masked, boolean overwrite,
boolean createParent, short replication, long blockSize,
Progressable progress, int buffersize, int bytesPerChecksum)
throws IOException
{
this(src, blockSize, progress, bytesPerChecksum, replication);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try
{
namenode.create(src, masked, clientName, overwrite,
createParent, replication, blockSize);
}
catch (RemoteException re)
{
throw re.unwrapRemoteException(AccessControlException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
streamer.start();
}
  DataStreamer进程起来以后,开启与目标文件的通道,等待DataQueue队列有数据后,将数据写入到目标文件中,目标文件其实是DataNode上的文件,熟称block,关于如何寻找相应的block,可以从上面的另一条主线,创建文件的源码中查看。

......
// get packet to be sent.
if (dataQueue.isEmpty())
{        
one = new Packet(); // heartbeat packet
}
else
{
// 从队列中获取一个 Packet
one = dataQueue.getFirst(); // regular data
// packet
}
......
// 如果某一块的数据已经读取完,开启下一个块的连接
//
if (blockStream == null)
{
LOG.debug("Allocating new block");
nodes = nextBlockOutputStream(src);
this.setName("DataStreamer for file " + src
+ " block " + block);
response = new ResponseProcessor(nodes);
response.start();
}
......
// blockStream向clinet(也就是某个DataNode)发送数据
blockStream.write(buf.array(), buf.position(),
buf.remaining());
  这个进程会等待数据的来临,那么数据从何而来,看IOUtils的copyBytes方法,它判断是否是PrintStream流,这个用于打印到控制台:

    public static void copyBytes(InputStream in, OutputStream out, int buffSize)
throws IOException
{
PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
while (bytesRead >= 0)
{
// 这个另有乾坤,不要简单的把out想象成OutputStream
// 这个out方法最终会调用DFSClient.DFSOutputStream.writeChunk(..)
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError())
{
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
}
}
  这个out从刚才的原来看应该是FSDataOutputStream,追踪write方法,会到FSOutputSummer类的writeChecksumChunk方法中:

    private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
throws IOException
{
int tempChecksum = (int) sum.getValue();
if (!keep)
{
sum.reset();
}
int2byte(tempChecksum, checksum);
writeChunk(b, off, len, checksum);
}
  这个方法通过调用自身的抽象方法writeChunk方法来实现写数据,这个抽象的方法由DFSOutputStream实现,在writeChunk方法中将源文件的数据装载到DataQueue中,这样原先的DataStreamer进程就可以从DataQueue中读取数据并写如到指定的block中,具体可以看代码的实现。

        private synchronized void enqueueCurrentPacket()
{
synchronized (dataQueue)
{
if (currentPacket == null)
return;
dataQueue.addLast(currentPacket);
dataQueue.notifyAll();
lastQueuedSeqno = currentPacket.seqno;
currentPacket = null;
}
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311960-1-1.html 上篇帖子: Hadoop源码之JobTracker 下篇帖子: Hadoop文集收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表