Up and Running with Azure Event Hubs

This past month I finally had a business case that called for Azure Event Hubs to be part of my overall solution to a real-world problem.  Up until this point I had always been able to get by with plain old Azure Storage Queues and the occasional Service Bus Topic, but the business case I started working on this past month involves sending many megabytes of information from potentially thousands of devices around the clock.

Specifically, I am building a solution in C# that takes raw POS (Point of Sale) data from a register and streams it to the cloud in real time.  A single register is capable of producing roughly 2 MB of data every 24 hours.  Each store can have up to 5 registers running, and there will be at least 2000 stores using this solution.  So the platform I'm building needs to handle 2000 stores × 5 registers × 2 MB/day = 20 GB per day, or roughly 830 MB per hour, at the very minimum.  In addition, the data coming off the register arrives in packets which may or may not (mostly not) contain a full transaction line from the receipt, so I need to be able to send the data up to the cloud in chunks and reassemble the complete transactions there.

To emulate the register I built a quick console app that streams POS data from a historical capture file, taken from a real customer, which contains thousands of POS transactions from a real register.  Here is my Program.cs:

using Microsoft.Azure.EventHubs;
using System.Threading.Tasks;
using System;
using System.IO;
using System.Reflection;

namespace Tests.EventPumper
{
    class Program
    {
        private static EventHubClient eventHubClient;
        private const string EhConnectionString = "YOUR CONNECTION STRING HERE";
        private const string EhEntityPath = "incomingtest";
        private static readonly Guid MachineID = Guid.Parse("ddf7a4b8-c754-44e3-abb9-62cb376ad1ce");
        private static readonly Guid CustomerID = Guid.Parse("89465c71-a67e-4e6b-a417-deedb82c747e");

        public static void Main(string[] args)
        {
            MainAsync(args).GetAwaiter().GetResult();
        }

        private static async Task MainAsync(string[] args)
        {
            var connectionStringBuilder = new EventHubsConnectionStringBuilder(EhConnectionString)
            {
                EntityPath = EhEntityPath
            };

            eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

            await SendMessagesToEventHub(1024 * 5000); // limit to ~5 MB for testing

            await eventHubClient.CloseAsync();

            Console.WriteLine("Press ENTER to exit.");
            Console.ReadLine();
        }

        private static async Task SendMessagesToEventHub(int numBytesToRead)
        {
            var assembly = typeof(Program).GetTypeInfo().Assembly;
            Stream resource = assembly.GetManifestResourceStream("Tests.EventPumper.Resources.PosSample.capture");

            byte[] buffer = new byte[2048]; // read in chunks of 2KB
            int bytesRead;
            var totalBytesRead = 0;
            var messagesSent = 0;
            while ((bytesRead = resource.Read(buffer, 0, buffer.Length)) > 0)
            {
                try
                {
                    totalBytesRead += bytesRead;

                    Console.WriteLine($"Sending message {++messagesSent}. Total bytes sent: {totalBytesRead}");

                    // Copy only the bytes actually read; the final chunk is
                    // usually smaller than the full 2 KB buffer.
                    var chunk = new byte[bytesRead];
                    Array.Copy(buffer, chunk, bytesRead);
                    var ed = new EventData(chunk);
                    ed.Properties.Add("MachineID", MachineID);
                    ed.Properties.Add("CustomerID", CustomerID);
                    ed.Properties.Add("MachineName", Environment.MachineName);

                    await eventHubClient.SendAsync(ed, MachineID.ToString());
                }
                catch(Exception ex)
                {
                    Console.WriteLine($"{DateTime.Now} > Exception: {ex.Message}");
                    break;
                }

                await Task.Delay(10);

                if (totalBytesRead >= numBytesToRead)
                    break;
            }

            resource.Dispose();
        }
    }
}

What this program does is simply take 2 KB at a time from a preexisting capture file (POS data) and send it to the Event Hub with a slight delay between sends.

One of the important pieces here is where I tag the EventData with property values (MachineID, CustomerID, and MachineName) so that I know where the data came from.  Otherwise, when I receive it in my cloud service, I'll have no way to identify which store or customer it came from.

Another important piece is the call to SendAsync, where I use an overload that lets me pass a partitionKey.  If you don't understand how partitioning works in Event Hubs, what you need to know is that by default Event Hubs distributes incoming events across its partitions in a round-robin fashion.  An Event Hub is created with multiple partitions up front so that data can be ingested and read in parallel.  Normally it's fine to just use the default round-robin behavior.  But in my case, remember, I might be receiving partial POS messages each time a chunk of data is sent.  I want to keep these chunks tied together as best I can, so I tell it to use the MachineID (think of this as a device id) as the partitionKey.  Now this does not mean that I control which partition the data lands on (because the value gets hashed), but it does guarantee that all data from this device will go to the same partition every time.  In other words, I can't tell Event Hubs to send this data to Partition 0, or Partition 8, etc.  If you need that kind of control you will probably want to look at the PartitionSender class.
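For illustration, here is a minimal sketch of what sending to an explicit partition with PartitionSender could look like.  The partition id "0" and the message body are made up for the example:

// Sketch only: pin a send to one specific partition.
// The partition id "0" is arbitrary; valid ids depend on how the hub was provisioned.
PartitionSender sender = eventHubClient.CreatePartitionSender("0");
try
{
    await sender.SendAsync(new EventData(Encoding.UTF8.GetBytes("pinned to partition 0")));
}
finally
{
    await sender.CloseAsync();
}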

Now that our data is being sent up to the cloud, we need a way to ingest it and do something interesting with it.  For now, it is good enough to grab the data and save it to Azure Blob Storage.  I am going to save the data in hourly chunks per MachineID.  So if I have 2000 machines sending data, I will have 2000 × 24 files/blobs created every day to store the POS data.

I've created a Cloud Service and have it running locally on my machine.  Here are the important pieces of my worker role.

In WorkerRole.cs, I've created an event processor host by modifying the RunAsync method (which is boilerplate generated for you when you create a new worker role).  My RunAsync method now looks like this:

        private async Task RunAsync(CancellationToken cancellationToken)
        {
            var eventProcessorHost = new EventProcessorHost(
               EhEntityPath,
               PartitionReceiver.DefaultConsumerGroupName,
               EhConnectionString,
               StorageConnectionString,
               StorageContainerName);

            // Registers the Event Processor Host and starts receiving messages
            var options = new EventProcessorOptions()
            {
                MaxBatchSize = 100
            };
            await eventProcessorHost.RegisterEventProcessorAsync<CloudEventProcessor>(options);

            while (!cancellationToken.IsCancellationRequested)
            {
                Trace.TraceInformation("Receiving event hub data...");
                await Task.Delay(1000);
            }

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }

The important thing to note here is that you will need to create a new Storage Account in Azure that the Event Processor Host will use internally for partition leases and checkpoints (this is separate from wherever you store your actual data).  There is nothing special about the setup, just create a Storage Account like you normally would and create a new container for the Event Processor Host to use.
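If you'd rather not create the container by hand, a small sketch like this could create it at startup.  It uses the same WindowsAzure.Storage SDK the processor already references, and assumes the StorageConnectionString/StorageContainerName constants from RunAsync:

// Sketch only: make sure the lease/checkpoint container exists before the host starts.
var storageAccount = CloudStorageAccount.Parse(StorageConnectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var leaseContainer = blobClient.GetContainerReference(StorageContainerName);
await leaseContainer.CreateIfNotExistsAsync();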

You can also see that I'm passing PartitionReceiver.DefaultConsumerGroupName to the constructor.  If you needed your event data to go to multiple consumers, you could manually create a new Consumer Group using the portal or PowerShell and pass that name in here.  For my purposes the default consumer group is just fine, since I am only going to be sending the data to this one cloud service.
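If you did go the multiple-consumer route, pointing a host at a custom group is just a different constructor argument.  A sketch, where "analytics" is a hypothetical consumer group created ahead of time:

// Sketch only: "analytics" is a made-up consumer group name.
var analyticsHost = new EventProcessorHost(
    EhEntityPath,
    "analytics", // instead of PartitionReceiver.DefaultConsumerGroupName
    EhConnectionString,
    StorageConnectionString,
    StorageContainerName);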

The most important line of code here is the call to RegisterEventProcessorAsync, where I register my event processor.  You only need to do this one time for the lifetime of the worker role.  As a side note, the while loop is not a good choice here; it would be better to use a reset event or some other mechanism to wait for the program to end or be cancelled, but it was convenient :)  One such alternative is sketched below.
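For example, one way to wait without polling is to await the cancellation token directly:

// Sketch only: wait for cancellation without a polling loop.
var stopped = new TaskCompletionSource<bool>();
using (cancellationToken.Register(() => stopped.TrySetResult(true)))
{
    await stopped.Task; // completes when the role is asked to shut down
}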

All that's left now is to look at my CloudEventProcessor class to see how the magic really happens:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using System.Diagnostics;
using Digiop.CloudServices.Core.Interfaces;
using Digiop.CloudServices.Core;
using Roles.PosEventHub.Models;
using Microsoft.WindowsAzure.Storage.Blob;

namespace Roles.PosEventHub
{
    internal class CloudEventProcessor : IEventProcessor
    {
        ICloudFactory _cloudFactory;
        CloudBlobContainer _rootContainer;
        string _rootContainerName = "eventhubdata";
        char CR = '\xD';
        char LF = '\xA';

        public CloudEventProcessor()
        {
            _cloudFactory = new CloudFactory(DigiopStorageAccounts.DigiopPosData);
            _rootContainer = null;
        }

        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }

        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }

        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }

        private async Task<CloudBlobContainer> GetRootContainer()
        {
            if (_rootContainer == null)
                _rootContainer = await _cloudFactory.GetBlobContainerReferenceAsync(_rootContainerName);

            return _rootContainer;
        }

        public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            var eventList = new List<EventHubEvent>();
            foreach (var eventData in messages)
            {
                var hasMachineID = eventData.Properties.Keys.Contains("MachineID");
                var hasCustomerID = eventData.Properties.Keys.Contains("CustomerID");
                var hasMachineName = eventData.Properties.Keys.Contains("MachineName");
                if (!hasMachineID || !hasCustomerID || !hasMachineName)
                {
                    //todo: log exception
                    continue;
                }

                //Convert the raw bytes to a UTF8 string suitable for storage
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

                //strip out control characters but keep CR and LF (newline)
                var stripped = new string(data.Where(c => !char.IsControl(c) || c == CR || c == LF).ToArray());

                var hubEvent = new EventHubEvent()
                {
                    CustomerID = (Guid)eventData.Properties["CustomerID"],
                    MachineID = (Guid)eventData.Properties["MachineID"],
                    MachineName = eventData.Properties["MachineName"].ToString(),
                    RawData = stripped
                };
                eventList.Add(hubEvent);
            }

            var root = await GetRootContainer();
            foreach (var machine in eventList.GroupBy(x => x.MachineID))
            {
                try
                {
                    var customerRoot     = root.GetDirectoryReference(machine.First().CustomerID.ToString());
                    var machineRoot      = customerRoot.GetDirectoryReference(machine.First().MachineID.ToString());
                    var fileNameThisHour = $"{DateTime.UtcNow:yyyy.MM.dd.HH}.txt";
                    var appendBlob       = machineRoot.GetAppendBlobReference(fileNameThisHour);

                    if (!await appendBlob.ExistsAsync())
                        await appendBlob.CreateOrReplaceAsync();

                    var rawText = new StringBuilder();
                    foreach (var e in machine)
                    {
                        rawText.Append(e.RawData);
                    }

                    await appendBlob.AppendTextAsync(rawText.ToString());
                }
                catch(Exception ex)
                {
                    Trace.TraceError(ex.Message);
                }
            }

            await context.CheckpointAsync();
        }
    }
}

Notice that I've implemented the IEventProcessor interface.  This is the only requirement in order to be able to register your class as an event processor, and it's very simple to implement.  As you can see, the exciting stuff happens in ProcessEventsAsync.

In this method you are handed an IEnumerable of EventData messages from the Event Hub.  You can't completely control how many messages you will receive, but as long as there is a steady supply of data and you haven't changed any of the defaults, you will receive up to 10 messages at a time.  However, you can see in my WorkerRole.cs that I told the Event Processor Host it's ok to send me up to 100 messages at a time.  I'm doing this because I know that when messages come in they will likely be from the same machine/device, and I want to optimize my write operations to Blob Storage.  In other words, I'm writing the data in larger batches by doing it this way.
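MaxBatchSize isn't the only knob on EventProcessorOptions.  A sketch of a few others (the values here are illustrative, not recommendations):

// Sketch only: illustrative EventProcessorOptions values.
var options = new EventProcessorOptions
{
    MaxBatchSize = 100,                        // cap on events per ProcessEventsAsync call
    PrefetchCount = 300,                       // how many events to buffer ahead of processing
    ReceiveTimeout = TimeSpan.FromSeconds(30)  // how long a receive waits before giving up
};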

The interesting code really starts where I group the events by MachineID.  After I've scrubbed the data a little to make sure it's a list of valid events, I group it by MachineID (because the events could potentially have come from multiple machines/devices) and then create an AppendBlob in my storage account based on the customer, the machine, and finally the current hour.  I'm using an AppendBlob rather than a normal CloudBlockBlob here because the AppendBlob is optimized for appending (duh) data, which is exactly what I'm doing in this event processor.  In fact, that's really all this event processor does all day: append data to a blob.

The other important point is the call to context.CheckpointAsync at the end of ProcessEventsAsync.  Basically, this creates a bookmark into the partition that I can come back to in case something happens to this processor (it crashes) and I need to resume processing events where I left off.  The details are beyond the scope of this blog post, but the important thing to know is that Event Hubs are fundamentally different from queues in that you are reading from a partitioned stream of retained data rather than consuming individual messages off a queue.  You need to be able to pick up where you left off in order to not process the same data all over again.
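If checkpointing after every single batch ever becomes too chatty against the storage account, one pattern (sketched below, not what my processor currently does) is to checkpoint only every Nth batch, accepting that a crash may replay a few batches:

// Sketch only: checkpoint every 10th batch.
// _batchCounter would be an int field on the processor.
if (++_batchCounter % 10 == 0)
{
    await context.CheckpointAsync();
}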

That is really all there is to it!  Now I'm successfully ingesting many megabytes of raw POS data in real time.  My next step in this solution is to create a separate cloud service that crawls the hourly data at regular intervals and does something interesting with it.  For example, it could send alerts to managers when something exceptional happens in the POS feed (fraud detection).  It will also be used to create beautiful sales dashboards and user audit trails.

Let me know if you need any help with Event Hubs and I would be glad to share my knowledge and experiences with you!