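In the previous post, we showed how to send event messages to Azure Event Hub programmatically:
Azure Event Hub Technical Research Series 2: Sending Events to Event Hub
In this post we continue with the next step: receiving events from Event Hub.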
1. Create a new console project named EventHubReceiver
2. Add the NuGet references
Microsoft.Azure.EventHubs
Microsoft.Azure.EventHubs.Processor
3. Implement the IEventProcessor interface
MyEventProcessor
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

public class MyEventProcessor : IEventProcessor
{
    // Called when the host stops processing this partition.
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }

    // Called once when the host starts processing this partition.
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"MyEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    // Called when an error occurs while receiving from this partition.
    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }

    // Called with each batch of events received from this partition.
    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // Body is an ArraySegment<byte>, so decode only the bytes inside its window.
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Event message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }

        // Record progress after the batch so a restart resumes from this point.
        return context.CheckpointAsync();
    }
}
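The body decoding in ProcessEventsAsync passes Offset and Count because the body is an ArraySegment<byte>, which is a window over a possibly larger backing array. The following is a minimal sketch of that point using only the .NET base class library; BodyDecoder and its method names are hypothetical helpers introduced for illustration and are not part of the Event Hubs SDK.

```csharp
using System;
using System.Text;

// Hypothetical helper (not part of the Event Hubs SDK) illustrating
// why Offset and Count matter when decoding an ArraySegment<byte>.
public static class BodyDecoder
{
    // Decodes only the bytes inside the segment's window as UTF-8.
    public static string DecodeSegment(ArraySegment<byte> body)
    {
        return Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);
    }

    // Decodes the whole backing array, ignoring Offset and Count.
    // Shown only for contrast: it can include bytes outside the payload.
    public static string DecodeWholeArray(ArraySegment<byte> body)
    {
        return Encoding.UTF8.GetString(body.Array);
    }

    // Builds a sample segment whose payload sits in the middle of a larger buffer.
    public static ArraySegment<byte> SampleSegment()
    {
        byte[] buffer = Encoding.UTF8.GetBytes("##hello##");
        return new ArraySegment<byte>(buffer, 2, 5);
    }
}
```

With the sample segment, DecodeSegment yields only the payload, while DecodeWholeArray also picks up the surrounding bytes of the buffer.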
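4. The Program class
Add constants for the Event Hubs connection string, the event hub name, the storage account container name, the storage account name, and the storage account key. Add the following code and replace the placeholders with your own values.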
private const string EhConnectionString = "{Event Hubs connection string}";
private const string EhEntityPath = "{Event Hub path/name}"; //MyEventHub
private const string StorageContainerName = "{Storage account container name}"; //eventhubcontainer
private const string StorageAccountName = "{Storage account name}"; //linux1
private const string StorageAccountKey = "{Storage account key}";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
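This step involves an Azure Storage account: a storage account and a storage container must be specified for the MyEventHub event hub created in the previous post. The Event Processor Host uses that container to store partition leases and checkpoints.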
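Because the constants start out as "{...}" placeholders, it can help to fail fast when one of them has not been replaced. The following is a minimal sketch under that assumption; StorageSettings, IsPlaceholder, and BuildConnectionString are hypothetical names introduced here, and only the connection string format itself comes from the code above.

```csharp
using System;

// Hypothetical helper: builds the storage connection string in the same
// format as the constants above and rejects unreplaced placeholders.
public static class StorageSettings
{
    // True when a value is empty or still looks like a "{...}" placeholder.
    public static bool IsPlaceholder(string value)
    {
        return string.IsNullOrWhiteSpace(value)
            || (value.StartsWith("{", StringComparison.Ordinal)
                && value.EndsWith("}", StringComparison.Ordinal));
    }

    // Returns the connection string, or throws if either input is a placeholder.
    public static string BuildConnectionString(string accountName, string accountKey)
    {
        if (IsPlaceholder(accountName))
            throw new ArgumentException("Storage account name has not been set.", nameof(accountName));
        if (IsPlaceholder(accountKey))
            throw new ArgumentException("Storage account key has not been set.", nameof(accountKey));

        return string.Format(
            "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
            accountName,
            accountKey);
    }
}
```

Calling BuildConnectionString once at startup surfaces a forgotten placeholder as a clear ArgumentException instead of a later authentication failure.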
Add the MainAsync method, which registers the event processor and processes event messages:
/// <summary>
/// Registers the event processor and receives events until ENTER is pressed.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
/// <returns>A task that completes when the processor has been unregistered.</returns>
private static async Task MainAsync(string[] args)
{
    Console.WriteLine("Registering EventProcessor...");

    var eventProcessorHost = new EventProcessorHost(
        EhEntityPath,
        PartitionReceiver.DefaultConsumerGroupName,
        EhConnectionString,
        StorageConnectionString,
        StorageContainerName);

    // Registers the event processor with the host and starts receiving messages
    await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();

    Console.WriteLine("Receiving. Press ENTER to stop worker.");
    Console.ReadLine();

    // Unregisters the event processor and stops receiving
    await eventProcessorHost.UnregisterEventProcessorAsync();
}
The Main function
static void Main(string[] args)
{
    MainAsync(args).GetAwaiter().GetResult();
}
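Main blocks on the task with GetAwaiter().GetResult() rather than Wait(). One practical difference is how failures surface: GetResult() rethrows the original exception, while Wait() wraps it in an AggregateException. The following is a minimal sketch of that behavior using only the .NET base class library; BlockingDemo and FailAsync are hypothetical names that stand in for a failing MainAsync.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo comparing how a failed task's exception surfaces
// when blocking with GetAwaiter().GetResult() versus Wait().
public static class BlockingDemo
{
    // Stands in for an async method that fails after it has started.
    private static async Task FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("receive failed");
    }

    // Returns the type name of the exception observed via GetResult().
    public static string ExceptionTypeFromGetResult()
    {
        try
        {
            FailAsync().GetAwaiter().GetResult();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Returns the type name of the exception observed via Wait().
    public static string ExceptionTypeFromWait()
    {
        try
        {
            FailAsync().Wait();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }
}
```

If the project targets C# 7.1 or later, declaring the entry point as static async Task Main(string[] args) and awaiting MainAsync directly is another option that avoids the blocking call altogether.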
Run
At this point, we have sent event messages to Event Hub and also received and processed them from Event Hub.
周国庆
2017/5/18