Monday, May 25, 2009

MassTransit Host & Setup

[Intro]

The glue that ties all of MassTransit's moving pieces together has to be applied when your application starts. We need to configure the service so it knows what to start up, how to find it and in what context to run it.

MassTransit has split the host service into a separate project, namely TopShelf. You will see TopShelf being used to set up our MassTransit programs in the entry points of our applications, typically the Program.Main(string[] args) method.

The basic set up steps for creating a runner configuration are:

  • Describe the Service
  • Instruct how the service will be run
  • Configure the service

Once you have done this, the TopShelf Runner can host the service.

Describing the Service means giving the service a name, a display name and a description. The display name and description are visible from the Service Control Manager, while the service name is intended for command-line interactions.

Instructing how the service will be run: define any known dependencies (MSMQ, IIS, SqlServer etc.), any actions that should be performed prior to running the service/host, and how the service is to be run, i.e. what credentials the service will run under. We can also use UseWinFormHost<T>, where we supply the WinForm that acts as the host. This is great for demos, but I am not sure if it is intended for production use... Chris and Dru may care to comment on this; either way it's handy when coming to grips with the stack.

Next we need to configure the service(s) we are hosting. In here we can define delegates for certain events in the service's life (WhenStarted, WhenStopped etc.) and we can also weave some of our IoC voodoo magic by defining our service locator. Again the authors have decided to use Castle Windsor for the sample, however I believe you can use any of the CommonServiceLocator containers. As this method needs to return something that implements IServiceLocator, using the DefaultMassTransitContainer type makes life a little easier, as it does a fair bit of the plumbing for you, including setting the current service locator to itself.

[STAThread]
private static void Main(string[] args)
{
    //from Starbucks.Barista.Program.Main(string[] args) - Modified for readability
    var cfg = RunnerConfigurator.New(configurator =>
    {
        //Describe the Service
        configurator.SetServiceName("StarbucksBarista");
        configurator.SetDisplayName("Starbucks Barista");
        configurator.SetDescription("A Mass Transit sample service for making orders of coffee.");

        //Instruct How the service will be run
        configurator.DependencyOnMsmq();
        configurator.RunAsFromInteractive();
        configurator.BeforeStart(a => { });

        //Configure the service(s)
        configurator.ConfigureService<BaristaService>(serviceConfigurator =>
        {
            serviceConfigurator.CreateServiceLocator(() =>
            {
                //Use MassTransit's built in Container (Castle Windsor specific), described earlier
                IWindsorContainer container = new DefaultMassTransitContainer("Starbucks.Barista.Castle.xml");

                //Add the components to the container
                container.AddComponent("sagaRepository", typeof(ISagaRepository<>), typeof(InMemorySagaRepository<>));
                container.AddComponent<DrinkPreparationSaga>();
                container.AddComponent<BaristaService>();

                //Tracing - not super important in this context
                Trace.Listeners.Add(new TextWriterTraceListener(Console.Out));
                StateMachineInspector.Trace(new DrinkPreparationSaga(CombGuid.Generate()));

                //Return the Current ServiceLocator, which has been assigned in the DefaultMassTransitContainer ctor
                return ServiceLocator.Current;
            });
            //Define delegates (specifically service methods) to fire on given ServiceConfigurator events
            serviceConfigurator.WhenStarted(baristaService => baristaService.Start());
            serviceConfigurator.WhenStopped(baristaService => baristaService.Stop());
        });
    });
    Runner.Host(cfg, args);
}

Sunday, May 24, 2009

MassTransit End Points

[Intro]

Many people will be familiar with the notion of an "End Point", especially those who use WCF or other web service frameworks. An end point is "the entry point to a service, a process, or a queue or topic destination". My WCF background has had the ABC drilled into me (Address, Binding and Contract) as the 3 things that basically define an end point. MT is pretty much the same. Also like WCF, the endpoints are a configuration aspect of the solution, so it seems valid to put this information in a config file. The MT boys are clearly Castle fans (although other IoC frameworks can be used) and they have chosen in most of the samples to use Castle Windsor to configure the endpoints.

SIDE NOTE: For those unaware of Castle Windsor (an IoC implementation), it allows you to write loosely coupled code and specify the concrete implementation detail via config, a little bit like the Asp.Net Membership Provider, which is a plug-in pattern. Using MT without understanding IoC may prove to be difficult... in fact I would say you are almost certainly biting off more than you can chew. Look into the Castle stack; it really is a great OSS framework to help pick up good habits.

Moving on...

Defining the endpoints should not be confused with the Castle implementation. It is just as easy to do this in code. Anyway, let's walk through a typical Castle config file for MT:

From the Starbucks Sample:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <facilities>
    <facility id="masstransit">
      <bus id="customer"
           endpoint="msmq://localhost/mt_client">
        <subscriptionService endpoint="msmq://localhost/mt_subscriptions" />
        <managementService heartbeatInterval="3" />
      </bus>
      <transports>
        <transport>MassTransit.Transports.Msmq.MsmqEndpoint, MassTransit.Transports.Msmq</transport>
      </transports>
    </facility>
  </facilities>
</configuration>


First and foremost this is a Castle config. The name of the file, "Starbucks.Customer.Castle.xml", is a pretty good hint, and I know "facilities" is a Castle concept. MassTransit has embraced the concept of facilities, which you can investigate here. MassTransit has its own facility, namely MassTransit.WindsorIntegration.MassTransitFacility, which helps us get up and running without having to know about all the plumbing. In this MassTransit-specific facility we define the Bus and the Transports. The transports child node is equivalent to our "Binding"; it is essential so we know what transport mechanism to use. You will see standard .Net notation for expressing a type in XML, i.e. "Fully.Qualified.Namespace.TypeName, Assembly.Name". This type must implement the interface MassTransit.IEndpoint. Currently there are adapters for MSMQ, NMS, Amazon SQS and WCF.
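
If the "TypeName, Assembly.Name" notation is new to you, the following is a minimal sketch of how such a string round-trips through Type.GetType in plain .Net. FakeEndpoint and TypeNames are hypothetical names made up for the illustration; they are not MassTransit types, and this is not a claim about how the facility itself resolves the transport.

```csharp
using System;

namespace Sample.Transports
{
    // Hypothetical stand-in for a transport type; not part of MassTransit.
    public class FakeEndpoint { }

    public static class TypeNames
    {
        // Builds the short "Namespace.TypeName, AssemblyName" form used in the config file.
        public static string ShortName(Type type)
        {
            return type.FullName + ", " + type.Assembly.GetName().Name;
        }

        public static void Main()
        {
            string name = ShortName(typeof(FakeEndpoint));
            Console.WriteLine(name);

            // Type.GetType turns the string back into a Type,
            // or returns null when the type or assembly cannot be found.
            Type resolved = Type.GetType(name);
            Console.WriteLine(resolved == typeof(FakeEndpoint));
        }
    }
}
```

A typo in either half of the string gives you a null type rather than a compile error, which is worth remembering when a transport mysteriously fails to load.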



The other child node in the facility defines the Bus. Here we give the Bus an identifier and its end point; both are mandatory. The end point is the URI on which the bus receives messages. The id hints that multiple buses can be configured, which they can. The bus can also have several child nodes, specifically:

  • controlBus
  • dispatcher
  • subscriptionService
  • managementService



The Control Bus is involved in managing the disparate system. For example, the Starbucks sample uses a control bus to manage the interaction amongst the server-side consumers: the Cashier and the Barista. For more info on a Control Bus see page 540 in Enterprise Integration Patterns.

The Dispatcher is a means to control the use of threads. High volume message interaction can be handled using multithreading, specifically with the attributes maxThreads and readThreads, both of which are self-explanatory integer values.

The Subscription Service is the common service that provides an endpoint for subscriptions. The only value required is the endpoint attribute.

The Management Service allows for specifying a heartbeat monitor for checking the health of your service's queue. The samples use the SubscriptionManagerGUI to show the queues that are being listened to and the health of the subscriptions.

I do not believe any of these bus child nodes are mandatory; from looking into the code, the only requirements are that the bus must have an id and an end point, and that the facility has a defined transport.



There are a couple of notes for newcomers to Castle and MassTransit. Like most config files, the XML file shown above should have its build action set to "Content, Copy Always". The queues that each service uses also need to be set up (e.g. in MSMQ) before they can be used. Luckily the exception handling in MassTransit is pretty good and will let you know that an endpoint is not set up if it is required; just be sure to read the queue name correctly. I spent about 15 minutes trying to figure out why a sample subscription was failing when the exception was saying I had not set up "mt_server1". I thought it was saying "mt_server". If in doubt, read the exception! We will cover how the Castle config is tied up in the Host And End points post.
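
To avoid squinting at queue names in exceptions, a small check at start-up can tell you which queues are missing. This is only a sketch of my own, not something MassTransit provides: QueueCheck and ToPrivateQueuePath are hypothetical names, and the mapping from msmq://localhost/<name> to the private queue .\private$\<name> is an assumption based on the queue names used in the samples. It needs a reference to System.Messaging and a machine with MSMQ installed.

```csharp
using System;
using System.Messaging; // needs a reference to System.Messaging.dll (.NET Framework, MSMQ installed)

public static class QueueCheck
{
    // Assumption: msmq://localhost/<name> refers to the local private queue .\private$\<name>.
    public static string ToPrivateQueuePath(Uri endpoint)
    {
        return @".\private$\" + endpoint.AbsolutePath.Trim('/');
    }

    public static void Main()
    {
        // Endpoint URIs copied from the config file above; adjust to match your own.
        string[] endpoints =
        {
            "msmq://localhost/mt_client",
            "msmq://localhost/mt_subscriptions"
        };

        foreach (string endpoint in endpoints)
        {
            string path = ToPrivateQueuePath(new Uri(endpoint));
            string status = MessageQueue.Exists(path) ? "ok" : "MISSING";
            Console.WriteLine(path + " : " + status);
        }
    }
}
```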



End points and their configuration may be a bit tricky for newcomers, but if you break each piece down it becomes more manageable.

Thursday, May 21, 2009

OLE DB oracle drivers

I have had issues in the past with the standard .Net OLE DB driver for Oracle with regard to transactions. Switching to the Oracle drivers fixed the issue, and I have now found the reason why: the M$ one explicitly does not support nested transactions!

Microsoft's OLE DB Provider for Oracle: "...At this time, the provider does not support nested transactions, which is how it would expose save points."

First link here: http://tinyurl.com/qm6fpq

Note that navigating directly to the Experts Exchange site will not show the answer; Google has forced them to show answers at the bottom of the page, hence the indirect link.
This post is mostly here so I can find this link again.

Basically, if you are using .Net and Oracle, use the Oracle drivers.

Tuesday, May 19, 2009

MassTransit Publishers

[Intro]

So we feel we have something that the world needs to know about: we have messages to publish. This is what kicks off the events that make up the Pub/Sub system. The IT division have told you they are sick of modifying the HR application to call a growing number of web services to let those services know about new or updated employee information. You decide this may be a good candidate for some Pub/Sub love. We will start with new employees. Firstly we need to create a suitable message to publish, say "NewEmployeeNotificationMessage", which carries all the relevant info. As part of the employee creation process, all we need to do is create a message of the given type and publish it.

var message = CreateNewEmployeeMessage();
_serviceBus.Publish(message);


That is it. Well... it's not, but as far as the publishing code goes that's all there is to it. There is a little bit of infrastructure set up that goes on at start up, but publishing a message is really that simple.

There are times when you may want to receive a response if a subscriber sends one. This can be done by setting a response address in a delegate as part of the publish, e.g.:



_bus.Publish(message, x=> x.SetResponseAddress(_bus.Endpoint.Uri));


If a response is expected then the service publishing the message should also be a consumer of the response message type; see the consumers post.



The bus is a MassTransit.IServiceBus that is injected into the service. We will cover setting up the bus later on in the series.



*This may be a bit of an over-the-top example. If you are building enterprise-wide services and integrating systems, perhaps MT is a little too lightweight; judge for yourself. Personally I am angling at using it for intra-component messaging.

MassTransit Consumers/Subscribers

[Intro]

A messaging system does not make a lot of sense if no one or nothing is listening, consuming or subscribing to those sent messages. If you are interested in a particular event that a message represents then you subscribe to that event.

Continuing on with the idea of a new employee at a company, let's assume that head office have decided that all staff members must do a new online intranet-based safety course and any new employees must do the safety course as part of their induction. We can create this online application and send out the notifications to all existing staff, but how do we ensure all new staff do the course? Well, we know that HR publish a New Employee Notification when an employee joins the company, so we decide to subscribe to that message so our application notifies the new employee and their supervisor that this course must be completed as part of the induction.

Ok, so how do we do this in MassTransit?

Well, one option is to create a consumer: a service that subscribes to the message and acts on it when it arrives.

public class NewEmployeeService : Consumes<NewEmployeeNotificationMessage>.All
{
    private IServiceBus _serviceBus;
    private UnsubscribeAction _unsubscribeToken;
    public void Consume(NewEmployeeNotificationMessage message)
    {
        //Notify user and supervisor of course requirement
    }
    public void Dispose()
    {
        _serviceBus.Dispose();
    }
    public void Start(IServiceBus bus)
    {
        _serviceBus = bus;
        _unsubscribeToken = _serviceBus.Subscribe(this);
    }
    public void Stop()
    {
        _unsubscribeToken();
    }
}


A couple of things to note here:



The NewEmployeeService implements the "Consumes<T>.All" interface. This means we are subscribing to any message published of type T, in this case NewEmployeeNotificationMessage. By doing so we must implement Consume(T message); this is the method that will be called when the message arrives. Start and Stop are methods we have defined that get called when the host starts and stops the hosting service (we will cover this in later posts). More importantly, and something that may not be obvious, is the unsubscribeToken. When subscribing to the bus, the Subscribe method returns an UnsubscribeAction delegate that can be called when the subscription is no longer required. Therefore calling this delegate when the service stops would be a good idea :)
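
If the idea of a subscribe call handing back its own "undo" delegate is unfamiliar, here is a minimal self-contained sketch of the pattern. TokenBus and the Unsubscribe delegate are hypothetical names invented for the illustration; MassTransit's own types and internals differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical delegate and bus, for illustration only; MassTransit's own types differ.
public delegate bool Unsubscribe();

public class TokenBus
{
    private readonly List<Action<string>> _handlers = new List<Action<string>>();

    // Subscribing hands back a delegate that undoes exactly this subscription.
    public Unsubscribe Subscribe(Action<string> handler)
    {
        _handlers.Add(handler);
        return () => _handlers.Remove(handler);
    }

    public void Publish(string message)
    {
        // Copy first so a handler may unsubscribe while we are iterating.
        foreach (Action<string> handler in _handlers.ToArray())
            handler(message);
    }
}

public static class TokenDemo
{
    public static void Main()
    {
        TokenBus bus = new TokenBus();
        int received = 0;
        Unsubscribe token = bus.Subscribe(m => received++);

        bus.Publish("first");   // delivered
        token();                // the equivalent of Stop() in the service above
        bus.Publish("second");  // no longer delivered

        Console.WriteLine(received); // prints 1
    }
}
```

The nice part of this design is that the subscriber never needs to know how the bus tracks it; it just holds on to the delegate and calls it when it is done.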



A service can subscribe to many messages by specifying and implementing more of the Consumes interfaces; as they are interfaces rather than a base class, you are not limited by single inheritance. So you may want to define the class as:



public class NewEmployeeService :
    Consumes<NewEmployeeNotificationMessage>.All,
    Consumes<EmployeeChangedLocationNotificationMessage>.All
{
    //...etc


It is also worthwhile to note that the message can be responded to:



CurrentMessage.Respond(responseMessage);


This will send the message back to the response address specified by the client; see the Starbucks example: CashierSaga.ProcessNewOrder(..) and OrderDrinkForm. NB: The OrderDrinkForm also implements the Consumes interface for the response message, otherwise it would not know what to do with the message.

MassTransit Messages

[Intro]

Messages are the backbone of MassTransit; without them there would not really be a need for the solution. Message names, IMO, should be built around a verb: "Customer" is not a suitable message name as it has no intent, whereas "NewCustomerCreated" is. As far as MassTransit goes, a message just needs to be a class that is marked as [Serializable]. For most scenarios I have encountered I actually want to track a specific message, i.e. I want to know its identity (which we will cover soon), so I have my message implement the interface "MassTransit.CorrelatedBy<T>", which gives the message a correlation id so I can track it. It is probably a good time to mention that messages are immutable, dumb DTOs. I have worked on several systems now that try to ignore this and every time it has ended in trouble. The message is a trigger; it should never be the entity you are manipulating.

An Example from the MassTransit Pub/Sub Sample is below:

[Serializable]
public class RequestPasswordUpdate :
    CorrelatedBy<Guid>
{
    private readonly string _newPassword;
    private readonly Guid _correlationId;
    public RequestPasswordUpdate(string newPassword)
    {
        _correlationId = Guid.NewGuid();
        _newPassword = newPassword;
    }
    public string NewPassword
    {
        get { return _newPassword; }
    }
    public Guid CorrelationId
    {
        get { return _correlationId; }
    }
}


Using the correlation id means that later on, when I want to listen for associated messages, I can. This will be covered in [Consumers/Publishers].
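
To show why carrying an id around is useful, here is a small sketch of matching a later message back to the request that caused it. RequestTracker is a hypothetical helper of my own for illustration; it is not part of MassTransit, which has its own mechanisms for correlated messages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, for illustration only: remembers outstanding requests by correlation id.
public class RequestTracker
{
    private readonly Dictionary<Guid, string> _pending = new Dictionary<Guid, string>();

    // Records a request and returns the id that should travel with the outgoing message.
    public Guid Track(string description)
    {
        Guid id = Guid.NewGuid();
        _pending.Add(id, description);
        return id;
    }

    // Returns true, and the original description, if a tracked request matches this id.
    public bool TryComplete(Guid correlationId, out string description)
    {
        if (!_pending.TryGetValue(correlationId, out description))
            return false;
        _pending.Remove(correlationId);
        return true;
    }
}

public static class TrackerDemo
{
    public static void Main()
    {
        RequestTracker tracker = new RequestTracker();
        Guid id = tracker.Track("password update for user 42");

        // Later, a message arrives carrying the same correlation id.
        string description;
        if (tracker.TryComplete(id, out description))
            Console.WriteLine("Completed: " + description);

        // An unrelated id matches nothing.
        Console.WriteLine(tracker.TryComplete(Guid.NewGuid(), out description));
    }
}
```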

Getting started with MassTransit

Ok, so I continue to play with MassTransit and I really like it. Unfortunately I still think there is a small barrier to entry that is stopping people from using it. The guys who have written it have done a great job of building an easy-to-use stack, but as it grows it may feel a bit like you don't know where to start. What I aim to do here is break the whole thing down into easy-to-understand pieces (theory) and then put the pieces together (practice!).

MassTransit leans on the concept of Publish/Subscribe, or Pub/Sub. The idea is that I can raise an event by sending a message (publishing) and any number of consumers that are interested in that message can listen in and consume it (subscribing). This means that as new subscribers come along, the publisher itself does not have to be aware of their existence; the Bus (MT) will deal with it, providing a nice sense of loose coupling.
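
To make the loose coupling concrete, here is a toy in-memory bus. It is purely a sketch of the Pub/Sub idea under my own assumptions: ToyBus and NewEmployeeNotification are hypothetical names, nothing leaves the process, and it says nothing about how MassTransit is actually implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type for the illustration.
public class NewEmployeeNotification
{
    private readonly string _name;
    public NewEmployeeNotification(string name) { _name = name; }
    public string Name { get { return _name; } }
}

// A toy in-memory bus; an illustration only, not how MassTransit works internally.
public class ToyBus
{
    private readonly Dictionary<Type, List<Action<object>>> _subscribers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<T>(Action<T> handler)
    {
        List<Action<object>> handlers;
        if (!_subscribers.TryGetValue(typeof(T), out handlers))
        {
            handlers = new List<Action<object>>();
            _subscribers[typeof(T)] = handlers;
        }
        handlers.Add(message => handler((T)message));
    }

    public void Publish<T>(T message)
    {
        List<Action<object>> handlers;
        if (!_subscribers.TryGetValue(typeof(T), out handlers))
            return; // nobody is listening, and the publisher does not care

        foreach (Action<object> handler in handlers)
            handler(message);
    }
}

public static class PubSubDemo
{
    public static void Main()
    {
        ToyBus bus = new ToyBus();

        // Two departments register their interest; the publisher knows about neither.
        bus.Subscribe<NewEmployeeNotification>(m => Console.WriteLine("Facilities: set up a desk for " + m.Name));
        bus.Subscribe<NewEmployeeNotification>(m => Console.WriteLine("Security: start clearance checks for " + m.Name));

        bus.Publish(new NewEmployeeNotification("Sam"));
    }
}
```

Adding a third interested party is one more Subscribe call; the publishing line does not change, which is the whole point.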

An example could be a new person starting at your place of work. His new boss goes into the HR system and creates a new employee request*. This goes off to HR, where it is actioned and a new employee notification* is made. A slew of processes are now kicked off that HR has no idea about, nor do they care. These could include the new employee's desk set up, identification preparation, security clearance checks, various induction bookings... who knows? If a department or application is interested in that message, it just subscribes to it.

*These are the potential published messages

Right, let's delve into the specifics:

Messages

Consumers/Subscribers

Publishers

EndPoints

Host & Set up

Sagas

 

 

For background on MassTransit see my previous intro post here and some here

Thursday, May 14, 2009

PowerShell to set up MSMQ private queues for MassTransit

A pretty self-explanatory title: I want to be able to create MSMQ private queues on the fly. I am currently playing with Mass Transit 0.6 and there are a couple of queues I needed to create and would like them to be done upfront. As far as I am aware you have to go into the config of each project, get the names of each queue and manually create them. This script means I don't have to (I switch machines a lot):

CAVEAT: I am not a PS guru, in fact I am a complete n00b; combine that with my 101 knowledge of MSMQ and this is probably a disaster. You have been warned.

#Beginning of MassTransitSetup.ps1#
param(
    [string] $nameofQ, [string]$username
)
[void][System.Reflection.Assembly]::LoadWithPartialName("System.Messaging")

function CreatePrivateMSMQQueue {
    param ( [string]$queuename = $(throw "Please specify the queue name!"),
            [string]$user)

    if ([System.Messaging.MessageQueue]::Exists($queuename))
    {
        write-host "$queuename already exists"
    }
    else
    {
        $newQueue = [System.Messaging.MessageQueue]::Create($queuename)
        if ([System.Messaging.MessageQueue]::Exists($queuename))
        {
            write-host "$queuename has been created"
            $newQueue.Label = $queuename
            #Default to everyone if no user is specified
            if([string]::IsNullOrEmpty($user)){$user = "Everyone"}
            write-host "Setting permissions for user : $user"
            $newQueue.SetPermissions(
                $user,
                [System.Messaging.MessageQueueAccessRights] "ReceiveMessage, PeekMessage, GetQueueProperties, GetQueuePermissions")
        }
        else
        {
            write-host "$queuename could not be created!!!"
        }
    }
}
function CreateDefaultMassTransitQueues {
    param ( [string]$user)

    $defaultQueues = ".\private$\mt_client", ".\private$\mt_server", ".\private$\mt_server1", ".\private$\mt_subscriptions"
    foreach($i in $defaultQueues)
    {
        CreatePrivateMSMQQueue $i $user
    }
}

#Beginning of script
if([string]::IsNullOrEmpty($nameofQ))
{
    CreateDefaultMassTransitQueues $username
}
else
{
    CreatePrivateMSMQQueue $nameofQ $username
}


For more info on MassTransit see the Google Code home page and be sure to check out the wiki on how to set up the Starbucks sample. If you are running the Starbucks sample, set the queues in the script above to:

$defaultQueues = ".\private$\mt_client",
    ".\private$\mt_server",
    ".\private$\mt_server1",
    ".\private$\mt_subscriptions",
    ".\private$\mt_subscription_ui",
    ".\private$\mt_health",
    ".\private$\mt_timeout"