设为首页 收藏本站
查看: 1062|回复: 0

[经验分享] 第一次将内容添加到azure event hubs

[复制链接]

尚未签到

发表于 2017-6-30 13:01:59 | 显示全部楼层 |阅读模式
  由于每秒数据吞吐量巨大,需要将实时数据存到event hubs,再由event hubs定时定量保存到document DB。
  event hubs的介绍详见微软官页:https://azure.microsoft.com/zh-tw/services/event-hubs/



事件中樞

從網站、應用程式和裝置擷取雲端等級的遙測數據






  • 幾乎每秒即時地記錄數百萬個事件
  • 使用靈活的授權與節流來連接裝置
  • 以時間為基礎處理事件緩衝




  • 利用彈性化的規模來管理服務
  • 使用原生用戶端程式庫連接廣泛的平台
  • 其他雲端服務隨插即用的配接器





每秒串流處理數百萬個事件
  Azure 事件中樞是極具調整規模彈性的「發佈-訂閱」服務,其每秒可吸取上百萬項事件,並將這些事件串流至多項應用程式。如此可讓您處理及分析由所連接之裝置與應用程式所產生的大量資料。當事件中樞收集到資料之後,即可利用任何即時分析提供者或以批次/儲存裝置介面卡,來轉換及儲存資料。





處理具變動式負載數據來源的事件
  現今的互聯世界即是由巨量資料所構成。巨量資料源自許多變動式負載數據來源,像是每隔幾分鐘即產生遙測資料的互聯車輛與控溫器、每秒產生事件的應用程式效能計數器,以及擷取每位使用者個別動作遙測資料的行動應用程式。事件中樞是一款受管理的服務,其能吸取規模具彈性的事件,以容納這些變動式負載數據來源以及由間歇連接所引發的尖峰情況。





跨平台連接數百萬個裝置
  因為所連接的裝置種類成長快速,且涉及各種平台與通訊協定,所以讓吸取來自各種裝置的資料形成相當大的挑戰。而事件中樞不僅能處理各式規模的彙總串流,同時也可迎接這項連接不同資料來源的挑戰。事件中樞可輕鬆佈建容量,處理來自上百萬項裝置的事件,同時也能以裝置為單位維持事件順序。對進階訊息佇列通訊協定 (AMQP) 與 HTTP 所提供的支援,能讓許多平台皆可使用事件中樞。常見的平台也有原生用戶端程式庫可用。


  定义event hubs:



private EventHubClientMapping _eventHubClient = new EventHubClientMapping();
  保存:



  protected void SaveDomainUserToFile(DomainUser user, bool isReceive = false)
         {
             System.Runtime.Serialization.Json.DataContractJsonSerializer json =
                    new System.Runtime.Serialization.Json.DataContractJsonSerializer(typeof(DomainUser));

             json.WriteObject(stream, user);
             stream.Write(_cn, 0, _cn.Length);

             byte[] bs = stream.ToArray();
             String contentStr = System.Text.Encoding.UTF8.GetString(bs);
             //Event Hub
             try
             {
                 Notify(contentStr);
             }
             catch (Exception ex)
             {
                 throw ex;
             }

             StartSave();
         }
  调用接口方法:



  public void Notify(string cookie)
         {
             _eventHubClient.AddMessage(cookie);
         }
  webconfig中要加入这两句:


DSC0000.gif DSC0001.gif


</configuration>
<appSettings>
     <add key="MappingDataFlowEventHubConnection" value="Endpoint=sb://xxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=sender;SharedAccessKey=xxx;TransportType=Amqp"/>
     <add key="MappingDataFlowEventHubName" value="mapping-response-eventhub" />
   </appSettings>
</configuration>
View Code  azure cloud service上的配置:
  继承的接口azure core中建立interface:





  public interface IMappingNotify
     {
         void Notify(string cookie);
     }

     public class MappingNotify
     {
         public CookieSource CookieSource { get; set; }

         public string Company { get; set; }

         public string Domain { get; set; }

         public string IP { get; set; }

         public string Lang { get; set; }

         public string PUID { get; set; }

         public string Refer { get; set; }

         public string ScreenHeight { get; set; }

         public string ScreenWidth { get; set; }

         public string TimeStamp { get; set; }

         public string Title { get; set; }

         public string UA { get; set; }

         public string UID { get; set; }

         public string URL { get; set; }

         public string Version { get; set; }

         public static byte[] Serialize(string cookie)
         {
             var message = string.Format("{0}", cookie);

             return Encoding.UTF8.GetBytes(message);
         }

         public static IList<MappingNotify> Deserialize(byte[] message)
         {
             if (message == null || message.Length == 0)
                 throw new Exception("message is null.");

             var mns = new List<MappingNotify>();

             var str = Encoding.UTF8.GetString(message);
             string[] jsons = str.Trim('\n').Trim('\r').Split(new string[] { "\r\n" }, StringSplitOptions.RemoveEmptyEntries);

             foreach (string json in jsons)
             {
                 var jObject = JObject.Parse(json);

                 var mn = new MappingNotify();

                 if (jObject["company"] != null)
                     mn.Company = jObject["company"].ToString();
                 if (jObject["domain"] != null)
                     mn.Domain = jObject["domain"].ToString();
                 if (jObject["ip"] != null)
                     mn.IP = jObject["ip"].ToString();
                 if (jObject["lang"] != null)
                     mn.Lang = jObject["lang"].ToString();
                 if (jObject["puid"] != null)
                     mn.PUID = jObject["puid"].ToString();
                 if (jObject["refer"] != null)
                     mn.Refer = jObject["refer"].ToString();
                 if (jObject["screenHeight"] != null)
                     mn.ScreenHeight = jObject["screenHeight"].ToString();
                 if (jObject["screenWidth"] != null)
                     mn.ScreenWidth = jObject["screenWidth"].ToString();
                 if (jObject["timestamp"] != null)
                     mn.TimeStamp = jObject["timestamp"].ToString();
                 if (jObject["title"] != null)
                     mn.Title = jObject["title"].ToString();
                 if (jObject["ua"] != null)
                     mn.UA = jObject["ua"].ToString();
                 if (jObject["uid"] != null)
                     mn.UID = jObject["uid"].ToString();
                 if (jObject["url"] != null)
                     mn.URL = jObject["url"].ToString();
                 if (jObject["version"] != null)
                     mn.Version = jObject["version"].ToString();

                 mn.CookieSource = CookieSource.mapping;

                 mns.Add(mn);
             }

             return mns;
         }
     }
View Code  在azure cloud service上的configuration文件加上这两句:





<ConfigurationSettings>      
       <Setting name="MappingDataFlowEventHubConnection" value="Endpoint=sb://xxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=sender;SharedAccessKey=xxx;TransportType=Amqp"/>
       <Setting name="MappingDataFlowEventHubName" value="mapping-response-eventhub" />
     </ConfigurationSettings>
View Code  在azure business中加入一个cs文件:





using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.ComponentModel.Composition;

using xxx.Core.Interface;
using xxx.Core.Entity;
using xxx.Core;
using xxx.Core.Client;

namespace xxx.Business
{
     [Export(typeof(IMappingNotify))]
     public class EventHubMappingNotify : IMappingNotify
     {
         private EventHubClientMapping _eventHubClient = new EventHubClientMapping();

         public void Notify(string cookie)
         {
             _eventHubClient.AddMessage(cookie);
         }
     }

}
View Code  在azure core中config.cs文件中加入config信息:





//EventHub链接
         public static string MappingDataFlowEventHubConnection
         {
             get
             {
                 var config = CloudConfigurationManager.GetSetting("MappingDataFlowEventHubConnection");
                 if (string.IsNullOrEmpty(config))
                     throw new Exception("MappingDataFlowEventHubConnection hasn't configured!");

                 return config;
             }            
         }
EventHub链接




//EventHub存储配置
         public static string MappingDataFlowEventHubName
         {
             get
             {
                 var config = CloudConfigurationManager.GetSetting("MappingDataFlowEventHubName");
                 if (string.IsNullOrEmpty(config))
                 {
                     return "mapping-response-eventhub";
                 }

                 return config;
             }            
         }
EventHub存储配置




//EventHub消费者组
         public static string MappingDataFlowEventHubConsumer
         {
             get
             {
                 var config = CloudConfigurationManager.GetSetting("MappingDataFlowEventHubConsumer");
                 if (string.IsNullOrEmpty(config))
                     throw new Exception("MappingDataFlowEventHubConsumer hasn't configured!");

                 return config;
             }
         }
EventHub消费者组  在azure core中加入client.cs文件:





using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Microsoft.Azure;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

using xxx.Core.Entity;
using xxx.Core;
using xxx.Core.Interface;

namespace xxx.Core.Client
{
     public class EventHubClientMapping
     {
         private Microsoft.ServiceBus.Messaging.EventHubClient _eventHubClient;

         public EventHubClientMapping()
         {
             var factory = MessagingFactory.CreateFromConnectionString(DANDaasConfig.MappingDataFlowEventHubConnection);
             _eventHubClient = factory.CreateEventHubClient(DANDaasConfig.MappingDataFlowEventHubName);

             _eventHubClient.RetryPolicy = RetryPolicy.Default;
         }

         public void AddMessage(string cookie)
         {
             var message = new EventData(MappingNotify.Serialize(cookie));

             _eventHubClient.Send(message);
         }
     }

}
View Code  额。。差不多了。。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-389655-1-1.html 上篇帖子: 《转》最全的Windows Azure学习教程汇总 下篇帖子: 使用powershell链接到Azure
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表