Using MassTransit to improve EPiServer data import on Azure
Introduction
More than a month ago I wrote the article Azure infrastructure usage for EPiServer data import, in which I described how to use Azure Service Bus Queues to import EPiServer data.
While Azure Service Bus Queues did the job, they have several issues. Queues are only a transport layer: they just pass messages around, which means you have to handle errors, retry policies, and transactions on your own.
Luckily, there are several frameworks available to help with these issues. The most popular ones are MassTransit and NServiceBus, but there are others, such as Rebus. In this article I describe how to use MassTransit for my task.
While writing this article I used David Prothero's tutorial - MassTransit on Microsoft Azure.
Solution
Setup
I am using the project from the previous article, so read that one first to fully understand the solution.
This time I decided to share common initialization and configuration between projects and created a new project for it - Configuration.
Then install the NuGet package for MassTransit with Azure Service Bus into all projects which use Azure Service Bus - Configuration, ImportArticleProcessor, ImportDataProcessor, and NewsSite.
Install-Package MassTransit.AzureServiceBus
After installing the NuGet package, add the common configuration to the Configuration project. I called the class AzureBusConfiguration and put the namespace and queue names there. In a production system you would probably want to make these configurable (at least the namespace name).
public static class AzureBusConfiguration
{
    public const string Namespace = "epinewssite";
    public const string ImportDataQueueName = "importqueue";
    public const string ImportArticleQueueName = "importarticlequeue";
}
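To make the namespace configurable, as suggested above, the constant could be replaced with a property that falls back to an appSettings entry. This is only a sketch: the "AzureBusNamespace" key is my own invention and is not part of the original project.

```csharp
using System.Configuration;

// Hypothetical variant: read the namespace from Web.config instead of a constant.
// The "AzureBusNamespace" appSettings key is an assumption, not part of the project.
public static class AzureBusConfiguration
{
    public static string Namespace
    {
        get
        {
            // Fall back to the original hard-coded value when no setting exists.
            return ConfigurationManager.AppSettings["AzureBusNamespace"]
                ?? "epinewssite";
        }
    }

    public const string ImportDataQueueName = "importqueue";
    public const string ImportArticleQueueName = "importarticlequeue";
}
```

Because the rest of the code only reads AzureBusConfiguration.Namespace, switching from a const field to a property does not require changes elsewhere.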
Next, create the MassTransit bus initialization class AzureBusInitializer with a factory method which creates an IServiceBus instance. Here I am just wrapping the bus initialization logic for the whole application. Each bus instance is created listening for messages on some queue, with additional initialization if needed, and connects to the Azure Service Bus queue using a connection string.
public class AzureBusInitializer
{
    public static IServiceBus CreateBus(
        string queueName,
        Action<ServiceBusConfigurator> moreInitialization,
        string connectionString)
    {
        var bus = ServiceBusFactory.New(sbc =>
        {
            sbc.UseLibLog();
            var queueUri = "azure-sb://" + AzureBusConfiguration.Namespace + "/" + queueName;
            sbc.ReceiveFrom(queueUri);
            sbc.UseAzureServiceBus(a => a.ConfigureNamespace(
                AzureBusConfiguration.Namespace, h =>
                {
                    h.SetKeyName("RootManageSharedAccessKey");
                    h.SetKey(CnBuilder(connectionString).SharedAccessKey);
                }));
            sbc.UseAzureServiceBusRouting();
            moreInitialization(sbc);
        });
        return bus;
    }

    private static ServiceBusConnectionStringBuilder CnBuilder(string connectionString)
    {
        return new ServiceBusConnectionStringBuilder(connectionString);
    }
}
Scheduled Job for import initialization
The previous Scheduled Job can be found here. I changed the Execute method to use the newly created bus initializer. The bus is created by providing a queue name to listen on, additional initialization, and a connection string. This Scheduled Job does not listen for any messages, so it doesn't matter which queue name is provided. It also does not require additional initialization; the connection string is retrieved from Web.config. We can publish a message without wrapping it in another class as with Azure Queues (which require messages to be wrapped in BrokeredMessage).
public override string Execute()
{
    var cn = ConfigurationManager
        .ConnectionStrings["EPiServerAzureEvents"]
        .ConnectionString;
    var container = CreateStorageContainer();
    using (var bus = AzureBusInitializer.CreateBus(
        AzureBusConfiguration.ImportDataQueueName, x => { }, cn))
    {
        foreach (var item in container.ListBlobs()
            .OfType<CloudBlockBlob>())
        {
            var importFile = new ImportFile
            {
                Name = item.Name,
                Uri = item.Uri
            };
            bus.Publish(importFile, x => { x.SetDeliveryMode(DeliveryMode.Persistent); });
        }
    }
    return "Success";
}
Now if you run your Scheduled Job, it will complete successfully, but you will not see any messages on the Azure queues because MassTransit requires at least one subscriber for a particular message type.
Import data processor
First, create a message consumer in the ImportDataProcessor project. It will watch for ImportFile messages. The consumer class should inherit from Consumes&lt;T&gt;.Context and implement the Consume method. The Consume method receives the message context as a parameter; the received data is held in its Message property. The context also has a Bus property, which is a reference to the bus instance the message was received on. As I am publishing another message here, I reuse it. I am not sure this is a good solution; in a production system I would inject the bus instance into the consumer's constructor.
public class ImportFileConsumer : Consumes<ImportFile>.Context
{
    public void Consume(IConsumeContext<ImportFile> message)
    {
        var importFile = message.Message;
        var container = CreateStorageContainer();
        var blob = container.GetBlockBlobReference(importFile.Name);
        var articles = ReadArticles(blob).ToList();
        articles.ForEach(article =>
        {
            message.Bus.Publish(article);
        });
    }

    // other code omitted
}
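The constructor-injection variant mentioned above could look roughly like this. It is only a sketch: how the bus instance gets registered in and resolved from an IoC container is assumed and omitted here.

```csharp
// Sketch: the consumer receives the bus via its constructor instead of
// reaching for message.Bus; registration in an IoC container is assumed.
public class ImportFileConsumer : Consumes<ImportFile>.Context
{
    private readonly IServiceBus _bus;

    public ImportFileConsumer(IServiceBus bus)
    {
        _bus = bus;
    }

    public void Consume(IConsumeContext<ImportFile> message)
    {
        var importFile = message.Message;
        var container = CreateStorageContainer();
        var blob = container.GetBlockBlobReference(importFile.Name);
        var articles = ReadArticles(blob).ToList();

        // Publish on the injected bus rather than the context's Bus property.
        articles.ForEach(article => _bus.Publish(article));
    }

    // other code omitted
}
```

This makes the dependency explicit and the consumer easier to test, at the cost of wiring up a container that knows how to supply the IServiceBus instance.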
Next, configure the bus to run in the worker process. The worker process does not need the Run method anymore. Now just create the bus and provide additional initialization logic which subscribes ImportFileConsumer to listen for messages.
public class WorkerRole : RoleEntryPoint
{
    readonly ManualResetEvent CompletedEvent = new ManualResetEvent(false);
    private IServiceBus _bus;

    public override bool OnStart()
    {
        ServicePointManager.DefaultConnectionLimit = 12;
        var cn = CloudConfigurationManager
            .GetSetting("Microsoft.ServiceBus.ConnectionString");
        _bus = AzureBusInitializer.CreateBus(
            AzureBusConfiguration.ImportDataQueueName, sbc =>
            {
                sbc.SetConcurrentConsumerLimit(64);
                sbc.Subscribe(subs =>
                {
                    subs.Consumer<ImportFileConsumer>().Permanent();
                });
            }, cn);
        return base.OnStart();
    }

    public override void OnStop()
    {
        if (_bus != null)
            _bus.Dispose();
        CompletedEvent.Set();
        base.OnStop();
    }
}
Import article processor
ImportArticleProcessor is similar to ImportDataProcessor. It defines an Article message consumer and initializes the bus the same way, but listens on another queue.
public class ImportArticleConsumer : Consumes<Article>.Context
{
    public void Consume(IConsumeContext<Article> message)
    {
        var article = message.Message;
        using (var client = CreateClient())
        {
            var str = JsonConvert.SerializeObject(article);
            var content = new StringContent(str, Encoding.UTF8, "text/json");
            var result = client.PostAsync("api/article", content).Result;
            result.EnsureSuccessStatusCode();
        }
    }

    // omitted code
}
public class WorkerRole : RoleEntryPoint
{
    readonly ManualResetEvent CompletedEvent = new ManualResetEvent(false);
    private IServiceBus _bus;

    public override bool OnStart()
    {
        ServicePointManager.DefaultConnectionLimit = 12;
        var cn = CloudConfigurationManager
            .GetSetting("Microsoft.ServiceBus.ConnectionString");
        _bus = AzureBusInitializer.CreateBus(
            AzureBusConfiguration.ImportArticleQueueName, sbc =>
            {
                sbc.SetConcurrentConsumerLimit(64);
                sbc.Subscribe(subs =>
                {
                    subs.Consumer<ImportArticleConsumer>().Permanent();
                });
            }, cn);
        return base.OnStart();
    }

    public override void OnStop()
    {
        if (_bus != null)
            _bus.Dispose();
        CompletedEvent.Set();
        base.OnStop();
    }
}
Now the application is ready to run. After you publish it to Azure and run it, you will see that two new queues are created, and MassTransit also creates two new topics with one subscriber each. When you run the import job, the queues receive messages and the articles get imported. If your job or site hangs during the run, MassTransit will handle it, and the next time the consumer gets back to work it will consume the missed messages. These scenarios can easily be tested locally with the Azure Compute emulator.
Source code can be found on GitHub.
Summary
While working with Azure Service Bus Queues is not hard, complicated scenarios might not work or might require additional work. MassTransit helps deal with that, and getting started with it is no harder than working directly with Azure queues.