sszxf 发表于 2017-6-30 08:26:47

Azure Event Hub 技术研究系列3-Event Hub接收事件

  上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub:
  Azure Event Hub 技术研究系列2-发送事件到Event Hub
  本篇文章中,我们继续:从Event Hub中接收事件。
  1. 新建控制台工程 EventHubReceiver
  2. 添加Nuget引用
  Microsoft.Azure.EventHubs
  Microsoft.Azure.EventHubs.Processor

  3. 实现IEventProcessor接口
  MyEventProcessor



1   using Microsoft.Azure.EventHubs;
2   using Microsoft.Azure.EventHubs.Processor;
3   using System.Threading.Tasks;
4
5   public class MyEventProcessor : IEventProcessor
6   {
7         public Task CloseAsync(PartitionContext context, CloseReason reason)
8         {
9             Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
10             return Task.CompletedTask;
11         }
12
13         public Task OpenAsync(PartitionContext context)
14         {
15             Console.WriteLine($"MyEventProcessor initialized. Partition: '{context.PartitionId}'");
16             return Task.CompletedTask;
17         }
18
19         public Task ProcessErrorAsync(PartitionContext context, Exception error)
20         {
21             Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
22             return Task.CompletedTask;
23         }
24
25         public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
26         {
27             foreach (var eventData in messages)
28             {
29               var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
30               Console.WriteLine($"Event message received. Partition: '{context.PartitionId}', Data: '{data}'");
31             }
32
33             return context.CheckpointAsync();
34         }
35   }
  4. Program程序
  添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值。



      private const string EhConnectionString = "{Event Hubs connection string}";
private const string EhEntityPath = "{Event Hub path/name}"; //MyEventHub
private const string StorageContainerName = "{Storage account container name}"; //eventhubcontainer
private const string StorageAccountName = "{Storage account name}"; //linux1
private const string StorageAccountKey = "{Storage account key}";
      private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
  这里涉及到Azure Storage Account,必须为上篇博文中创建的事件中心MyEventHub指定一个存储账户和存储容器

  增加MainAysnc方法:注册事件处理器,处理事件消息



1         /// <summary>
2         /// 注册事件处理器
3         /// </summary>
4         /// <param name="args"></param>
5         /// <returns></returns>
6         private static async Task MainAsync(string[] args)
7         {
8             Console.WriteLine("Registering EventProcessor...");
9
10             var eventProcessorHost = new EventProcessorHost(
11               EhEntityPath,
12               PartitionReceiver.DefaultConsumerGroupName,
13               EhConnectionString,
14               StorageConnectionString,
15               StorageContainerName);
16
17             // Registers the Event Processor Host and starts receiving messages
18             await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();
19
20             Console.WriteLine("Receiving. Press ENTER to stop worker.");
21             Console.ReadLine();
22
23             // Disposes of the Event Processor Host
24             await eventProcessorHost.UnregisterEventProcessorAsync();
25         }
  Main函数



1         static void Main(string[] args)
2         {
3             MainAsync(args).GetAwaiter().GetResult();
4         }
  Run

  至此,我们实现了事件消息发送到Event Hub,同时从Event Hub接收处理事件消息。
  周国庆
  2017/5/18
页: [1]
查看完整版本: Azure Event Hub 技术研究系列3-Event Hub接收事件