设为首页 收藏本站
查看: 1761|回复: 0

[经验分享] Adhesive框架系列文章--Mongodb数据服务模块实现(上)

[复制链接]

尚未签到

发表于 2015-7-6 05:58:51 | 显示全部楼层 |阅读模式
  Mongodb数据服务可以直接接受任何类型数据,并且它设计为可以承受大量数据的写入。为了能保存任何类型的数据,并且在后台可以查看任何类型的数据,因此我们必须在收到数据的时候对数据的元数据进行提取,随同主体数据一并保存在数据库中。对数据本身也需要重新组织结构,相当于进行一次序列化,然后保存到数据库中。虽然Mongodb是支持Json格式的,但是由于我们在保存数据的时候还有很多逻辑,因此我们必须手动进行这个工作。其实对于提交数据来说,应该是一个非常快的动作,应该以异步方式进行,在一个尽量短的时间内让方法的调用可以返回,之后可以在后台慢慢进行数据的转换和数据发送到远端。因此,开发了一个内存队列服务模块来进行异步队列处理工作,并且提交数据到远端也使用了框架内部的Wcf分布式服务模块。当然,在服务端道理也一样,我们可以通过一个内存队列来批量提交数据,并且让服务的调用尽快返回。Mongodb数据服务提交数据的过程如下:
DSC0000.png
  项目的结构如下:
DSC0001.png
  1、Mongodb项目是客户端部分的接口
  2、Mongodb.Imp项目是客户端部分的实现
  3、Mongodb.Server是服务端部分的接口,或者说是服务契约
  4、Mongodb.Server.Imp是服务端部分的实现
  可以看到Mongodb数据本身依赖应用程序信息中心模块、配置服务模块、内存队列服务模块、Wcf分布式服务模块,对于大部分客户端应用程序来说都应该只依赖Mongodb数据服务的客户端而不是服务端。我们把Mongodb数据服务分成两部分,插入数据的服务和查询服务,后者的使用者一般而言只有Mongodb数据服务的后台。本文主要介绍前者:

   public interface IMongodbInsertService : IDisposable
{
void Insert(object item);
}
  从接口本身来看非常简单,只有一个方法。我们来看看它的实现步骤:
  1、调用配置服务,查看这个数据类型对应的配置,说到这里,让我们来看一下Mongodb数据服务客户端的配置:

    [ConfigEntity(FriendlyName = "Mongodb客户端配置")]
public class MongodbServiceConfigurationEntity
{
[ConfigItem(FriendlyName = "插入服务配置项列表")]
public Dictionary MongodbInsertServiceConfigurationItems { get; set; }
}
  每一个类型的配置项如下:

    [ConfigEntity(FriendlyName = "Mongodb客户端针对每个数据类型的配置")]
public class MongodbInsertServiceConfigurationItem
{
[ConfigItem(FriendlyName = "类型完整名")]
public string TypeFullName { get; set; }
[ConfigItem(FriendlyName = "是否提交到服务端")]
public bool SubmitToServer { get; set; }
[ConfigItem(FriendlyName = "队列最大项数")]
public int MaxItemCount { get; set; }
[ConfigItem(FriendlyName = "消费的线程总数")]
public int ConsumeThreadCount { get; set; }
[ConfigItem(FriendlyName = "消费数据的时间间隔毫秒")]
public int ConsumeIntervalMilliseconds { get; set; }
[ConfigItem(FriendlyName = "遇到错误时消费数据的时间间隔毫秒")]
public int ConsumeIntervalWhenErrorMilliseconds { get; set; }
[ConfigItem(FriendlyName = "消费数据的批量项数")]
public int ConsumeItemCountInOneBatch { get; set; }
[ConfigItem(FriendlyName = "达到最大项数后的策略")]
public MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
[ConfigItem(FriendlyName = "消费数据时不足批次数的策略")]
public MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }
[ConfigItem(FriendlyName = "消费数据遇到错误的策略")]
public MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }
public MongodbInsertServiceConfigurationItem()
{
TypeFullName = "";
SubmitToServer = true;
ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems
.Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);
ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;
ConsumeThreadCount = 1;
ConsumeIntervalMilliseconds = 10;
ConsumeIntervalWhenErrorMilliseconds = 1000;
ConsumeItemCountInOneBatch = 100;
NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;
MaxItemCount = 10000;
}
}
  这里可以看到,除了是否提交到服务端这个配置,大多数的配置其实是内存队列服务的配置,在之后的文章中我们单独会介绍内存队列服务。之所以需要为Mongodb数据服务的客户端设置这样的配置,一方面是允许修改队列服务的配置,另一方面是为了限制没有经过配置随便什么数据都往服务端发送,只有在后台显式配置的数据类型,才会发生到服务端。
  2、如果没获取到配置的话返回,如果获取到配置的话,则为这个类型初始化内存队列服务,设置一系列队列服务的参数,并且把队列的处理委托挂载我们提交数据到服务端的处理方法。换句话说是每一个类型都会有自己的内存队列服务,我们在MongodbInsertService的实现定义了一个静态字典用于保存内存队列服务的实现:

private static Dictionary submitDataMemoryQueueServices = new Dictionary();
                if (!submitDataMemoryQueueServices.ContainsKey(typeFullName))
{
lock (submitDataMemoryQueueServices)
{
if (!submitDataMemoryQueueServices.ContainsKey(typeFullName))
{
var memoryQueueService = LocalServiceLocator.GetService();
memoryQueueService.Init(new MemoryQueueServiceConfiguration(string.Format("{0}_{1}", ServiceName, typeFullName), InternalSubmitData)
{
ConsumeErrorAction = config.ConsumeErrorAction,
ConsumeIntervalMilliseconds = config.ConsumeIntervalMilliseconds,
ConsumeIntervalWhenErrorMilliseconds = config.ConsumeIntervalWhenErrorMilliseconds,
ConsumeItemCountInOneBatch = config.ConsumeItemCountInOneBatch,
ConsumeThreadCount = config.ConsumeThreadCount,
MaxItemCount = config.MaxItemCount,
NotReachBatchCountConsumeAction = config.NotReachBatchCountConsumeAction,
ReachMaxItemCountAction = config.ReachMaxItemCountAction,
});
submitDataMemoryQueueServices.Add(typeFullName, memoryQueueService);
}
}
}
  3、然后会判断是否已经提取过这个类型元数据了,如果没提取过则尝试提取元数据并加入缓存:

                if (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))
{
lock (mongodbDatabaseDescriptionCache)
{
if (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))
{
MongodbDatabaseDescription mongodbDatabaseDescription = GetMongodbDatabaseDescription(item);
CheckMongodbDatabaseDescription(mongodbDatabaseDescription);
mongodbDatabaseDescriptionCache.Add(typeFullName, mongodbDatabaseDescription);
}
}
}
  4、把数据加入队列,等待队列服务在合适的时候调用处理方法(也就是发送到服务端):

if (config.SubmitToServer)
{
submitDataMemoryQueueServices[typeFullName].Enqueue(item);
}
  
  其实到这里为止,方法已经返回了,之后就是队列服务在后台的异步调用了。现在我们来深入一下细节,首先看一下GetMongodbDatabaseDescription是如何提取元数据的,这个方法返回的是MongodbDatabaseDescription,它的定义如下:

    [DataContract(Namespace = "Adhesive.Mongodb")]
public class MongodbDatabaseDescription
{
[DataMember]
public bool SentToServer { get; set; }
[DataMember]
public string TypeFullName { get; set; }
[DataMember]
public string DatabasePrefix { get; set; }
[DataMember]
public string CategoryName { get; set; }
[DataMember]
public string Name { get; set; }
[DataMember]
public string DisplayName { get; set; }
[DataMember]
public int ExpireDays { get; set; }
[DataMember]
public List MongodbColumnDescriptionList { get; set; }
[DataMember]
public List MongodbEnumColumnDescriptionList { get; set; }
}
  在这里可以看到,我们主要解析的是MongodbPersistenceEntityAttribute,对于下一级的MongodbColumnDescriptionList ,我们主要是解析每一个列的元数据,而MongodbEnumColumnDescriptionList则提取所有枚举的信息。MongodbColumnDescription的定义如下:

    [DataContract(Namespace = "Adhesive.Mongodb")]
public class MongodbColumnDescription
{
[DataMember]
public string Name { get; set; }
[DataMember]
public string TypeName { get; set; }
[DataMember]
public bool IsArrayColumn { get; set; }
[DataMember]
public bool IsEntityColumn { get; set; }
[DataMember]
public string ColumnName { get; set; }
[DataMember]
public string DisplayName { get; set; }
[DataMember]
public string Description { get; set; }
[DataMember]
public bool ShowInTableView { get; set; }
[DataMember]
public bool IsTableColumn { get; set; }
[DataMember]
public bool IsTimeColumn { get; set; }
[DataMember]
public bool IsContextIdentityColumn { get; set; }
[DataMember]
public bool IsPrimaryKey { get; set; }
[DataMember]
public MongodbIndexOption MongodbIndexOption { get; set; }
[DataMember]
public MongodbFilterOption MongodbFilterOption { get; set; }
[DataMember]
public MongodbCascadeFilterOption MongodbCascadeFilterOption { get; set; }
[DataMember]
public MongodbSortOption MongodbSortOption { get; set; }
}
  这里很多数据都来自MongodbPersistenceItemAttribute和MongodbPresentationItemAttribute。再来看看MongodbEnumColumnDescription:

    [DataContract(Namespace = "Adhesive.Mongodb")]
public class MongodbEnumColumnDescription
{
[DataMember]
public string Name { get; set; }
[DataMember]
public Dictionary EnumItems { get; set; }
}
  它就简单了,只是保存枚举的列名,和枚举每一项的数据。其实这些元数据提取本身没什么复杂的,可以想到是反射提取,并且其中还涉及到递归,需要深入每一个自定义类型,GetMongodbColumnDescription方法其中有一段这样的代码实现了递归:

                if (!type.Assembly.GlobalAssemblyCache && type != pi.DeclaringType)
{
columnDescription.IsEntityColumn = true;
var properties = GetPropertyListFromCache(type);
if (properties != null)
{
foreach (var property in properties)
{
GetMongodbColumnDescription(typeFullName, fullName, columnDescriptionList, enumColumnDescriptionList, property);
}
}
}
  在提取元数据的时候,另一个重要的工作是缓存一些关键的PropertyInfo的配置,以便后期处理数据的时候使用:

    internal class ProperyInfoConfig
{
public bool IsCascadeFilterLevelOne { get; set; }
public bool IsCascadeFilterLevelTwo { get; set; }
public bool IsCascadeFilterLevelThree { get; set; }
public bool IsDateColumn { get; set; }
public bool IsTableName { get; set; }
public bool IsIgnore { get; set; }
public string ColumnName { get; set; }
}
  因为我们在提交数据之前,需要针对级联下拉的数据进行处理,把第二级的值设置为第一级的值加上第二级的值,第三级的值设置为一加二加三,这样在筛选的时候就会很方便;此外还需要替换列名,计算表名等等,只有缓存了PropertyInfo才能无需重新读取元数据:

private static Dictionary propertyConfigCache = new Dictionary();
  之前说了元数据提取部分时的逻辑,然后来看一下格式化数据时的逻辑,之前为内存队列服务的提交数据的委托挂载的方法主要实现如下:

  var mongodbDataList = items.Select(_ => ConvertItemToMongodbData(_)).Where(_ => _ != null).ToList();
var desc = mongodbDatabaseDescriptionCache[typeFullName];
WcfServiceLocator.GetSafeService().SubmitData(mongodbDataList, desc.SentToServer ? null : desc);
  先是获取要提交的数据,然后再获取元数据,如果有的话和主体数据一并提交到服务端。通过Wcf分布式数据服务获取到IMongodbServer,并调用它的SubmitData方法,定义如下:

[OperationContract]
void SubmitData(IList dataList, MongodbDatabaseDescription databaseDescription);
  MongodbData的定义如下:

    [DataContract(Namespace = "Adhesive.Mongodb")]
public class MongodbData
{
[DataMember]
public string TypeFullName { get; set; }
[DataMember]
public string DatabaseName { get; set; }
[DataMember]
public string TableName { get; set; }
[DataMember]
public string Data { get; set; }
}
  在这里可以发现Data是字符串类型,那是因为我们把要提交的数据主体转换成了Json,否则我们是无法通过Wcf提交Dictionary构成的一颗无限级树的。在这里,我们略去介绍ConvertItemToMongodbData的实现,它其实并不复杂,也是通过递归和反射无限级获取类的所有属性的值,并转换为Dictionary,只不过在这里面需要处理列表类型、字典类型以及枚举。
  
  至此为止,客户端的部分介绍完了,现在我们来看一下服务端部分。首先,服务端也有根据每一个类型的配置:

    [ConfigEntity(FriendlyName = "Mongodb服务端针对每个数据类型的配置")]
public class MongodbServerConfigurationItem
{
[ConfigItem(FriendlyName = "类型完整名")]
public string TypeFullName { get; set; }
[ConfigItem(FriendlyName = "服务器名")]
public string MongodbServerUrlName { get; set; }
[ConfigItem(FriendlyName = "是否提交到数据库")]
public bool SubmitToDatabase { get; set; }
[ConfigItem(FriendlyName = "队列最大项数")]
public int MaxItemCount { get; set; }
[ConfigItem(FriendlyName = "消费的线程总数")]
public int ConsumeThreadCount { get; set; }
[ConfigItem(FriendlyName = "消费数据的时间间隔毫秒")]
public int ConsumeIntervalMilliseconds { get; set; }
[ConfigItem(FriendlyName = "遇到错误时消费数据的时间间隔毫秒")]
public int ConsumeIntervalWhenErrorMilliseconds { get; set; }
[ConfigItem(FriendlyName = "消费数据的批量项数")]
public int ConsumeItemCountInOneBatch { get; set; }
[ConfigItem(FriendlyName = "达到最大项数后的策略")]
public MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
[ConfigItem(FriendlyName = "消费数据时不足批次数的策略")]
public MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }
[ConfigItem(FriendlyName = "消费数据遇到错误的策略")]
public MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }
public MongodbServerConfigurationItem()
{
TypeFullName = "";
SubmitToDatabase = true;
ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems
.Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);
ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;
ConsumeThreadCount = Environment.ProcessorCount;
ConsumeIntervalMilliseconds = 10;
ConsumeIntervalWhenErrorMilliseconds = 1000;
ConsumeItemCountInOneBatch = 100;
NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;
MaxItemCount = 100000;
}
}
  这个配置和客户端的配置差不多,只不过这里把是否提交到服务端改为了是否提交到数据库。在获取了配置之后,同样把数据提交到内存队列,然后由内存队列提交到数据库。核心代码如下:

            try
{
var sw = Stopwatch.StartNew();
var server = CreateMasterMongoServer(typeFullName);
if (server != null)
{
var database = server.GetDatabase(item.DatabaseName);
var collection = database.GetCollection(item.TableName);
var documentList = new List();
JavaScriptSerializer s = new JavaScriptSerializer();
mongodbDataList.ForEach(i =>
{
var dic = s.DeserializeObject(i.Data) as IDictionary;
var document = new BsonDocument().Add(dic);
documentList.Add(document);
});
collection.InsertBatch(documentList);
LocalLoggingService.Debug("Mongodb服务端成功服务提交 {0} 条数据到数据库,类型是 '{1}',耗时 {2} 毫秒", documentList.Count, typeFullName, sw.ElapsedMilliseconds);
}
}
catch (Exception ex)
{
AppInfoCenterService.ExceptionService.Handle(ex, categoryName: ServiceName, subcategoryName: typeFullName, description: "写入数据出现错误", extraInfo: new ExtraInfo
{
DisplayItems = new Dictionary()
{
{"DatabaseName" , item.DatabaseName},
{"TableName", item.TableName}
}
});
}
}
  首先是Json反序列化获取到数据,然后转换为BsonDocument,最后批量提交到数据库中。
  本文介绍了Mongodb数据服务的插入数据部分在客户端和服务端之间的逻辑,下一篇将介绍Mongodb数据服务查询数据的部分。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-83483-1-1.html 上篇帖子: 我与mongodb 二三事(1) 下篇帖子: Adhesive框架系列文章--Mongodb数据服务模块使用(上)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表