|
在之前的文章中,介绍了关于master-slave模式下的主从端代码的执行流程,因为当时篇幅所限,未对oplog的数据结构以及mongodb的local数据库作过多阐述,而这可能会让不知道其内容的朋友看代码时云里雾里找不到头绪,今天我专门用一篇文章来大致解释一下(这些内容可能会在后面章节中有所涉及)。
首先了解一个local数据库:
在mongod中,出于特殊目的(复制机制),保留性使用了local数据库。当使用认证机制时,对local数据库等同于认证admin数据库。
当使用复制集(Replica sets)模式时,其会使用下面的local数据库:
local.system.replset 用于复制集配置对象存储 (通过shell下的rs.conf()或直接查询)
local.oplog.rs 一个capped collection集合.可在命令行下使用--oplogSize 选项设置该集合大小尺寸.
local.replset.minvalid 通常在复制集内使用,用于跟踪同步状态(sync status)
主从复制模式(Master/Slave):
* Master
o local.oplog.$main 存储"oplog"信息
o local.slaves 存储在master结点上相应slave结点的同步情况(比如syncTo时间戳等)
* Slave
o local.sources 从结点所要链接的master结点信息(可通过--source配置参数指定)
* Other
o local.me 未知待查:)
o local.pair.* (replica pairs选项,目前已不推荐使用)
除了了解local之外,还有oplog的数据结构(存储在local.oplog.$main)要解释一下:
{ ts : ..., op: ..., ns: ..., o: ... o2: ... }
上面就是一条oplog信息,复制机制就是通过这些信息来进行节点间的数据同步并维护数据一致性的,其中:
ts:8字节的时间戳,由4字节unix timestamp + 4字节自增计数表示。
这个值很重要,在选举(如master宕机时)新primary时,会选择ts最大的那个secondary作为新primary。
op:1字节的操作类型,例如i表示insert,d表示delete。
ns:操作所在的namespace。
o:操作所对应的document,即当前操作的内容(比如更新操作时要更新的的字段和值)
o2: 在执行更新操作时的where条件,仅限于update时才有该属性
其中op,可以是如下几种情形之一:
"i": insert
"u": update
"d": delete
"c": db cmd
"db":声明当前数据库 (其中ns 被设置成为=>数据库名称+ '.')
"n": no op,即空操作,其会定期执行以确保时效性
了解了这些内容之后,大家通过下面方式来验证一下上面的这些内容:
1.本地构造复制集
D:\mongodb\bin>mongod --replSet myoplogs --dbpath d:\mongodb\db
2.使用mongo连接到上面结点并初始化
D:\mongodb\bin>mongo
MongoDB shell version: ...
connecting to: test
> rs.initiate()
3.查看oplog复制集信息
> use local
switched to db local
> db.oplog.rs.find() {"ts" : { "t" : 1306207268000, "i" : 1 }, "h" : NumberLong(0), "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
4.开始添加记录
> use test
switched to db test
> db.foo.insert({x:1})
> db.foo.update({x:1}, {$set : {y:1}})
> db.foo.update({x:2}, {$set : {y:1}}, true)
> db.foo.remove({x:1})
5.查看上面操作生成的oplog信息:
> use local
switched to db local
> db.oplog.rs.find()
{ "ts" : { "t" : 1306207268000, "i" : 1 }, "h" : NumberLong(0), "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : { "t" : 1306207310000, "i" : 1 }, "h" : NumberLong("3138280161636515857"), "op" : "i", "ns" : "test.foo", "o" : { "_id" : ObjectId("4ddb244d2d0d00000000551a"), "x" : 1 } }
{ "ts" : { "t" : 1306207314000, "i" : 1 }, "h" : NumberLong("772196482295043060"), "op" : "u", "ns" : "test.foo", "o2" : { "_id" : ObjectId("4ddb244d2d0d00000000551a") }, "o" : { "$set" : { "y" : 1 } } }
{ "ts" : { "t" : 1306207317000, "i" : 1 }, "h" : NumberLong("7272647888810218413"), "op" : "i", "ns" : "test.foo", "o" : { "_id" : ObjectId("4ddb2455be10000000002f6a"), "x" : 2, "y" : 1 } }
{ "ts" : { "t" : 1306207325000, "i" : 1 }, "h" : NumberLong("3083832920223263240"), "op" : "d", "ns" : "test.foo", "b" : true, "o" : { "_id" : ObjectId("4ddb244d2d0d00000000551a") } }
注:其它常用命令:db.oplog.$main.help() db.printReplicationInfo();
知道上面这些内容之后,我们来大致看一下mongod在源码层面上是如何存储oplog的,先请看下面文件:
oplog.cpp:顾名思义,它的作用就是存储和读取oplog,其主要包括方法:
createOplog():初始化本地local.oplog.$main 集合信息,包括大小尺寸等
_logOpOld(): 用于在master/slave模式下向local.oplog.$main 添加oplog信息
_logOpObjRS(): 用于在replset模式下向local.oplog.rs模式下添加oplog 信息
append_O_Obj():向已存在的obj对象后追加新的对象元素信息(主要用于更新类型的oplog)
pretouchN()及pretouchOperation():某些情况下MongoDB会锁住数据库。如果此时正有数百个请求,则它们会堆积起来,造成许多问题。这里使用下面的优化方式来避免锁定:每次更新前,先查询记录。查询操作会将对象放入内存,于是更新则会尽可能的迅速。在"主/从"部署方案中,从节点可以使用“-pretouch”参数运行,这也可以得到相同的效果。
下面就借助源码来进一步深入了解,首先回顾一下之前这篇文章中的所描述的这段代码:
//repl.cpp
void startReplication() {
/* if we are going to be a replica set, we aren't doing other forms of replication. */
if( !cmdLine._replSet.empty() ) {
......
//绑定函数,oplog.cpp ->_logOpRS(), 用于处理replset类型的oplog操作
newRepl();
return;
}
//绑定函数,oplog.cpp ->_logOpOld(), 用于处理master-slave类型的oplog操作
oldRepl();
上面的newRepl()及oldRepl()主要是实现函数绑定,它会将oplog.cpp文件中的_logOpRS和_logOpOld方法加以绑定(本人猜测可能因为1.6版之后引入了replset,造成方法名称的重新命名和分配,导致这里用这种方式对老的方法进行“过渡”)。
接着上面方面会调用oplog.cpp文件的createOplog()来构造master的“local.oplog.$main”集合,如下:
//如果是master,则构造并启动线程方法replMasterThread
if ( replSettings.master || replPair ) {
if ( replSettings.master )
log(1) = 0 );
if( countdown > 0 ) {
countdown--; // was pretouched on a prev pass
}
else {
const int m = 4;
if( tp.get() == 0 ) {
int nthr = min(8, cmdLine.pretouch);
nthr = max(nthr, 1);
tp.reset( new ThreadPool(nthr) );
}
vector v;
oplogReader.peek(v, cmdLine.pretouch);
unsigned a = 0;
while( 1 ) {
if( a >= v.size() ) break;
unsigned b = a + m - 1; // v[a..b]
if( b >= v.size() ) b = v.size() - 1;
tp->schedule(pretouchN, v, a, b);
DEV cout |
|