设为首页 收藏本站
查看: 1651|回复: 0

[经验分享] mongodb源码分析--查询

[复制链接]

尚未签到

发表于 2015-7-5 14:30:29 | 显示全部楼层 |阅读模式
在之前的一篇文章中,介绍了mongodb的主程序入口main()的执行流程,其实main只是实始化一些参数信息并做了些后台线程任务的启动工作(包括数据准备和恢复),并最终启动一个线程进行循环侦听。今天将会介绍在mongodb中数据查询 (find)的流程,以了解mongodb是如果对message进行拆包分析,以及数据进行表扫描及索引使用的。
   
     好了,开始今天的正文吧!
     这里继续昨天的代码浏览过程,从connThread函数说起,看了上一篇文章的朋友都清楚了该函数主要工作就是不断循环[while ( 1 )]获取当前客户端发来的信息(上面已封装成了message)并将其信息进行分析,并根据相应操作标志位确定当前操作是CRUD或构建索引等[assembleResponse()],如果一些正常,则向客户端发送应答信息。而如果客户端连接提交了一个查询操作(也包括CUD及其它操作)的话,那么它就会调用assembleResponse方法来进行相关操作的处理,该方法声明如下(instance.cpp第224行):
   


    // 直接请求包括'end'字符,则返回false   
    void assembleResponse( Message &m /*客户端传来的(操作)信息*/,
                           DbResponse &dbresponse,/*响应结构体,用于绑定要响应的数据及状态*/
                           const SockAddr &client ) {
    // 获取操作符枚举信息
    int op = m.operation();
   
    注:枚举定义如下


    enum Operations {
        opReply = 1,     /* reply. responseTo is set. */
        dbMsg = 1000,    /* generic msg command followed by a string */
        dbUpdate = 2001, /* update object */
        dbInsert = 2002,  //dbGetByOID = 2003,
        dbQuery = 2004,
        dbGetMore = 2005,
        dbDelete = 2006,
        dbKillCursors = 2007
    };
      接着它会判断是否为$cmd命令,即以.$cmd为开头,形如 db.$cmd.findOne({getlasterror: 1}),并对一些特殊指令进行单独处理,包括inprog,killop,unlock。


        bool isCommand = false;
        const char *ns = m.singleData()->_data + 4;
        if ( op == dbQuery ) {
            if( strstr(ns, ".$cmd") ) {
                isCommand = true;
                opwrite(m);
                if( strstr(ns, ".$cmd.sys.") ) {
                    if( strstr(ns, "$cmd.sys.inprog") ) {
                        inProgCmd(m, dbresponse);
                        return;
                    }
                    if( strstr(ns, "$cmd.sys.killop") ) {
                        killOp(m, dbresponse);
                        return;
                    }
                    if( strstr(ns, "$cmd.sys.unlock") ) {
                        unlockFsync(ns, m, dbresponse);
                        return;
                    }
                }
            }
            else {
                opread(m);
            }
        }
        else if( op == dbGetMore ) {
            opread(m);
        }
      接着就是获取当前线程连接的客户端对象,如下:


    Client& c = cc();
      该方法实现代码如下:


    /** get the Client object for this thread. */
    inline Client& cc() {
        Client * c = currentClient.get();
        assert( c );
        return *c;
    }
     其主要用内联函数方式获取当前客户端操作的线程信息,而该线程默认就是上一篇文章中所创建的那个:
   


    Client::initThread("initandlisten");
     因为mongodb会为每一个客户端DB操作创建一个线程 Client对象,我个人把它理解为服务端持有的对应(每)客户端的操作对象。其主体函数如下:


    boost::thread_specific_ptr currentClient; //thread_specific_ptr对象为每个线程保持一个指针,每个线程都应该new出一个对 象交给thread_specific_ptr,当线程终结时,该对象释放。

    /* each thread which does db operations has a Client object in TLS.
       call this when your thread starts.
    */
    Client& Client::initThread(const char *desc, MessagingPort *mp) {
        assert( currentClient.get() == 0 );
        Client *c = new Client(desc, mp);
        currentClient.reset(c);
        mongo::lastError.initThread();
        return *c;
    }
     我们再回到assembleResponse函数,接下来的代码就是使用CurOp(一个提供了内部锁机制来保存当前客户端操作状态的对象)来把当前Client对象及相应操作(CRUD等)封装于其中,这样当以访问该对象进行原子操作时(Atomic)就可以通过其内置支持多线程并发访问和锁保护了。
   


    CurOp* currentOpP = c.curop();
    ......
    CurOp& currentOp = *currentOpP;
    currentOp.reset(client,op);
    OpDebug& debug = currentOp.debug();
    StringBuilder& ss = debug.str;
    ss _data + 4;
        char cl[256];
        nsToDatabase(ns, cl);
        //进行权限认证
        if( ! c.getAuthenticationInfo()->isAuthorized(cl) ) {
            uassert_nothrow("unauthorized");
        }
        else {
            try {
                if ( op == dbInsert ) {  //添加记录操作
                    receivedInsert(m, currentOp);
                }
                else if ( op == dbUpdate ) { //更新记录
                    receivedUpdate(m, currentOp);
                }
                else if ( op == dbDelete ) { //删除记录
                    receivedDelete(m, currentOp);
                }
                else if ( op == dbKillCursors ) { //删除Cursors(游标)对象
                    currentOp.ensureStarted();
                    logThreshold = 10;
                    ss setResultFlagsToOk();
                qr->len = bb.len();
                ss cursorId = 0;
                qr->startingFrom = 0;
                qr->nReturned = 1;
                result.setData( qr.release(), true );
            }
            else {
                uasserted(13530, "bad or malformed command request?");
            }
            return 0;
        }
        /* 普通查询分支(非指令式操作,也就是我们用c#客户端链接查询方式)*/
        ......
        BSONObj order = pq.getOrder();
        BSONObj query = pq.getFilter();
        /* 对查询对象大小进行判断,过滤错误的查询对象(为0)*/
        if ( query.objsize() == 0 ) {
            out() setResultFlagsToOk();
        // qr->len is updated automatically by appendData()
        ss setOperation(opReply);
        qr->startingFrom = 0;
        qr->nReturned = n;
        /*查询耗时统计*/
        int duration = curop.elapsedMillis();
        bool dbprofile = curop.shouldDBProfile( duration );
        if ( dbprofile || duration >= cmdLine.slowMS ) {
            ss qp().willScanTable() ) {/*设置表扫描标识*/
            _tableScanned = true;
        }
        //pop出or谓词/子句
        _fros.popOrClause( ret->qp().indexed() ? ret->qp().indexKey() : BSONObj() );
        return ret;
    }
    上面方面最终都是调用 _currentQps->runOp( op )来执行查询操作,下面就是方法的代码:   


    shared_ptr< QueryOp > QueryPlanSet::runOp( QueryOp &op ) {
        if ( _usingPrerecordedPlan ) { /*该变量貌似&#8220;是否使用预先记录的计划&#8221;,也就是索引*/
            Runner r( *this, op );
            shared_ptr< QueryOp > res = r.run();
            ......
        }
        Runner r( *this, op );
        return r.run();
    }
    上面代码主要是定义声明Runner实例并运行它,Runner本身为strcut类型,主要是用于对执行步骤进行封装(形成依次执行的操作流),这里不再多述了。下面是其r.run()方法的定义:
   


     shared_ptr< QueryOp > QueryPlanSet::Runner::run() {
        ......
        for( vector< shared_ptr< QueryOp > >::iterator i = ops.begin(); i != ops.end(); ++i ) {
            initOp( **i ); //初始化操作,声明如下
            if ( (*i)->complete() )
                return *i;
        }
        ......
    }
    void QueryPlanSet::Runner::initOp( QueryOp &op ) {
        GUARD_OP_EXCEPTION( op, op.init() );
    }
    上面op.init操作主要最终会执行下面方法(位于query.cpp 662行), 该方法会用查询条件构造一个游标,该游标记录着遍历数据集方式,查询起始位置等信息等
     


     virtual void _init() {
           ......
                _c = qp().newCursor( DiskLoc() , _pq.getNumToReturn() + _pq.getSkip() );/*构造*/
                _capped = _c->capped();
                // setup check for if we can only use index to extract
                if ( _c->modifiedKeys() == false && _c->isMultiKey() == false && _pq.getFields() ) {
                    _keyFieldsOnly.reset( _pq.getFields()->checkKey( _c->indexKeyPattern() ) );
                }
            }
          ......        
    }
    下面是其函数的代码(queryoptimizer.cpp 168 行):        
   


    shared_ptr QueryPlan::newCursor( const DiskLoc &startLoc , int numWanted ) const {
         .....
        if ( !_index ) { //非索引扫描
            if ( _fbs.nNontrivialRanges() )
                checkTableScanAllowed( _fbs.ns() );
            return findTableScan( _fbs.ns(), _order, startLoc ); /*进行表扫描*/
        }
        .....        
    }
    findTableScan方法(pdfile.cpp 687行)即开始表扫描指定磁盘位置信息,并根据相关条件指定相应类型的游标信息。   
   


shared_ptr findTableScan(const char *ns, const BSONObj& order, const DiskLoc &startLoc) {
        BSONElement el = order.getField("$natural"); // e.g., { $natural : -1 }

        if ( el.number() >= 0 )
            return DataFileMgr::findAll(ns, startLoc);  /*startLoc开始位置*/
        ......
    }
    返回的游标类型为Cursor,但findAll方法里构造的是BasicCursor,相应代码(pdfile.cpp 639行):


    shared_ptr DataFileMgr::findAll(const char *ns, const DiskLoc &startLoc) {
        NamespaceDetails * d = nsdetails( ns );
        if ( ! d )
            return shared_ptr(new BasicCursor(DiskLoc()));
        .....
        
        return shared_ptr(new BasicCursor( e->firstRecord ));
    }
    BasicCursor构造函数比较有意思,其引入了AdvanceStrategy对象指针,这个策略指针定义访问物理磁盘文件的方式,其操作单元是DiskLoc(DiskLoc实例对象实际是一个双向链接),访问方法虽然只有next一种,但mongodb却用它实现了向前和后转两种访问方式(详情参见cursor.cpp),如下:


    BasicCursor(DiskLoc dl, const AdvanceStrategy *_s = forward()) : curr(dl), s( _s ), _nscanned() {
            incNscanned();
            init();
    }
     /* these will be used outside of mutexes - really functors - thus the const */
    class Forward : public AdvanceStrategy {
        virtual DiskLoc next( const DiskLoc &prev ) const {
            return prev.rec()->getNext( prev );
        }
    } _forward;
    class Reverse : public AdvanceStrategy {
        virtual DiskLoc next( const DiskLoc &prev ) const {
            return prev.rec()->getPrev( prev );
        }
    } _reverse;
    上面的 prev.rec()方法调用最终会执行下面函数调用流程:
   


    //pdfile.h
    inline Record* DiskLoc::rec() const {
        return DataFileMgr::getRecord(*this);
    }
    inline Record* DataFileMgr::getRecord(const DiskLoc& dl) {
        assert( dl.a() != -1 );
        return cc().database()->getFile(dl.a())->recordAt(dl);
    }
   
     而最后&#8220;cc().database()->getFile(dl.a())->recordAt(dl)&#8221;方法会最终从数据库文件 mongodfile中获取记录信息(详见database.cpp):


    //pdfile.h
    inline Record* MongoDataFile::recordAt(DiskLoc dl) {
        int ofs = dl.getOfs();
        if( ofs < DataFileHeader::HeaderSize ) badOfs(ofs); // will uassert - external call to keep out of the normal code path
        return (Record*) (p()+ofs);
    }
   
     兜了一大圈,头都快大了,不是吗?呵呵。另外Record,DiskLoc这两个与数据访问/存储相关类以后会抽时间介绍。
     好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍客户端发起Insert操作时,Mongodb的执行流程和B树的相应部分实现。
    原文链接:http://www.iyunv.com/daizhj/archive/2011/03/18/1988288.html
    作者: daizhj, 代震军   
    微博: http://t.sina.com.cn/daizhj
   
Tags: mongodb,c++,source code

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-83423-1-1.html 上篇帖子: 开篇有益:为什么选择MongoDB? 下篇帖子: windows下mongodb安装与使用整理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表