Extending NServiceBus

NServiceBus (NSB) is a very popular enterprise service bus (ESB) for the .NET Framework. In this article I will analyze the extensibility facilities of NSB: what you can do with them and what you can’t.

Suppose we have a very simple application that sends messages from one endpoint to another. We want to extend the NSB framework to do the following:

  • Outgoing messages should be checked for confidential information, like credit card numbers, and censored if necessary.
  • Messages of some specific kinds should be augmented with copyright notices.
  • Some messages should be prevented from being sent. As an alternative, a simple notice message should be sent instead.
  • The outgoing bytes should be compressed and encrypted.
  • The incoming bytes should be decrypted and decompressed before being dispatched to the message handlers.

To meet those requirements I started by analyzing the UnicastBus class, which is responsible for taking messages and processing them before sending them over the configured transport.

The first extensibility point I found was the IMessageModule interface. Classes that implement this interface must provide three methods that represent three different points in the lifecycle of a message handler: one is executed before a message is processed, one after it has been processed, and one when a problem occurred during processing. The biggest shortcoming of this approach is that the methods take no parameters. You don’t know which message is being processed, and you don’t have access to the bus to send messages of your own. If you know the Alexandria application you will be very disappointed: cool things like caching responses and sending them when a matching request is sent are not possible. I’m not sure when to use the IMessageModule interface; the only usage I’m aware of is managing NHibernate sessions.

The plus side of this interface is that it is easy to use. Just register all implementations in your IoC container of choice and you’re set. NServiceBus will retrieve and call them for you. As I figured out, almost all extensibility points work the same way: implement an interface, register the implementation in the container, and you’re done.
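To make the three lifecycle hooks concrete, here is a minimal sketch of such a module. It does not reference NServiceBus: the interface is redeclared locally, the method names HandleBeginMessage, HandleEndMessage and HandleError are my recollection of the NSB 2.x interface and should be treated as an assumption, and LoggingModule and ModuleDemo are hypothetical names. The Process method only imitates, in a simplified way, what the bus does around a handler.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in so the sketch compiles without NServiceBus.
// The three method names are an assumption about the real interface.
public interface IMessageModule
{
    void HandleBeginMessage();
    void HandleEndMessage();
    void HandleError();
}

// Hypothetical module that only records which hooks were called.
// Note that none of the hooks receives the message or the bus.
public class LoggingModule : IMessageModule
{
    public readonly List<string> Log = new List<string>();

    public void HandleBeginMessage() { Log.Add("begin"); }
    public void HandleEndMessage() { Log.Add("end"); }
    public void HandleError() { Log.Add("error"); }
}

public static class ModuleDemo
{
    // Simplified imitation of the bus: begin, run the handler,
    // then either the end hook or the error hook.
    public static void Process(IList<IMessageModule> modules, Action handler)
    {
        foreach (var module in modules) module.HandleBeginMessage();
        try
        {
            handler();
        }
        catch (Exception)
        {
            foreach (var module in modules) module.HandleError();
            return;
        }
        foreach (var module in modules) module.HandleEndMessage();
    }
}
```

Because the hooks take no arguments, a module like this can manage ambient state (a session, a transaction, a log scope) but cannot inspect or replace the message itself.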

The second way to gain access to the message processing pipeline is the interface pair IMutateIncomingMessages and IMutateOutgoingMessages.

The IMutateOutgoingMessages interface has only one method:

IMessage MutateOutgoing(IMessage message);

As you see, you don’t just have write access to the message; you have to return the modified one. This can be used to return a wholly different message.

The following snippet shows how to watch sent messages and censor all messages containing the word “Visa”.

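public IMessage MutateOutgoing(IMessage message)
{
    // Only EventMessage carries text to inspect; pass everything else through.
    var msg = message as EventMessage;
    if (msg == null || msg.Text == null)
        return message;

    // Culture-independent, case-insensitive search
    // (ToLower() depends on the current culture).
    if (msg.Text.IndexOf("visa", StringComparison.OrdinalIgnoreCase) >= 0)
    {
        return new CensoredMessage
                   {
                       Text = "Message has been censored!"
                   };
    }
    return msg;
}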

Even though you can’t prevent a message from being sent, you can send a replacement message that does pretty much nothing. If you have multiple mutators and would like to execute them in a defined order, just register them with the container in that order. IoC containers usually return items in the order they were configured in.
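Why the order matters can be shown with a small self-contained sketch. The interfaces are redeclared locally as stand-ins, and CopyrightMutator, CensorMutator, MutatorChain and the notice text are hypothetical names invented for illustration. The Apply method is only my assumption of how a chain of mutators behaves: each one receives the result of the previous one.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the NServiceBus types named in the article.
public interface IMessage { }

public interface IMutateOutgoingMessages
{
    IMessage MutateOutgoing(IMessage message);
}

// Hypothetical message type with a single text field.
public class EventMessage : IMessage
{
    public string Text { get; set; }
}

// Appends a copyright notice (hypothetical wording).
public class CopyrightMutator : IMutateOutgoingMessages
{
    public IMessage MutateOutgoing(IMessage message)
    {
        var msg = message as EventMessage;
        if (msg != null) msg.Text += " (c) Example Corp";
        return message;
    }
}

// Replaces any message mentioning "visa", ignoring case.
public class CensorMutator : IMutateOutgoingMessages
{
    public IMessage MutateOutgoing(IMessage message)
    {
        var msg = message as EventMessage;
        if (msg != null && msg.Text != null &&
            msg.Text.IndexOf("visa", StringComparison.OrdinalIgnoreCase) >= 0)
        {
            return new EventMessage { Text = "Message has been censored!" };
        }
        return message;
    }
}

public static class MutatorChain
{
    // Feeds each mutator the result of the previous one, in list order.
    public static IMessage Apply(IEnumerable<IMutateOutgoingMessages> mutators, IMessage message)
    {
        foreach (var mutator in mutators)
            message = mutator.MutateOutgoing(message);
        return message;
    }
}
```

With the censor registered first, the replacement message still receives the copyright notice; with the copyright mutator first, the notice is discarded together with the censored text.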

Accordingly, IMutateIncomingMessages does the same for received messages.

 

What we cannot do with those interfaces is compress the outgoing bytes to save bandwidth.

To do this you have to use the IMapOutgoingTransportMessages interface. It has a single method:

void MapOutgoing(IMessage[] messages, TransportMessage transportMessage)

It grants you access to the original logical messages as well as to the serialized bytes. TransportMessage also gives you access to the message headers. Those headers can carry metadata that is useful when decompressing the messages.

Using this method we can implement the following compression service:

 

public void MapOutgoing(IMessage[] messages, TransportMessage transportMessage)
{
    if (!ShouldCompressMessage(transportMessage))
        return;

    // Mark the message so the receiving side knows the body is compressed.
    transportMessage.Headers["zipped"] = "1";

    var inputData = transportMessage.Body;

    using (var outMemoryStream = new MemoryStream())
    {
        // Disposing the DeflateStream flushes the remaining compressed bytes;
        // leaveOpen keeps outMemoryStream readable afterwards.
        using (var zipStream = new DeflateStream(outMemoryStream, CompressionMode.Compress, leaveOpen: true))
        {
            zipStream.Write(inputData, 0, inputData.Length);
        }
        transportMessage.Body = outMemoryStream.ToArray();
    }
}

private bool ShouldCompressMessage(TransportMessage transportMessage)
{
    return transportMessage.Body.Length > 300;
}

Now to the other side of the communication. All we have to do to decompress the messages is implement IMapIncomingTransportMessages, isn’t it? I was very surprised to discover that there is no such interface. I can’t imagine why there is an IMapOutgoingTransportMessages interface but no IMapIncomingTransportMessages. I guess it must be on the way. Nevertheless, NSB is open source and you can add this interface yourself. Simply by adding the interface and calling its implementations through the container in the right place, I could undo the compression and continue processing the message as if there had been no compression at all.

public void MapIncoming(TransportMessage transportMessage)
{
    // Only bodies marked by the outgoing mapper are compressed.
    if (!transportMessage.Headers.ContainsKey("zipped"))
        return;

    byte[] output;
    using (var inMemoryStream = new MemoryStream(transportMessage.Body))
    using (var zipStream = new DeflateStream(inMemoryStream, CompressionMode.Decompress))
    using (var outMemoryStream = new MemoryStream())
    {
        zipStream.CopyTo(outMemoryStream);
        output = outMemoryStream.ToArray();
    }
    transportMessage.Body = output;
    // Remove the marker so later stages see an ordinary message.
    transportMessage.Headers.Remove("zipped");
}
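The hand-made hook can be sketched roughly as follows. This is not code from NServiceBus: TransportMessage is reduced to a local stand-in with only the two members used in this article, IMapIncomingTransportMessages is the interface I added myself, and IncomingPipeline with its Run and Compress helpers is a hypothetical name that stands for the place in the receiving code where the implementations resolved from the container would be called.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

// Minimal stand-in for TransportMessage: only the members used here.
public class TransportMessage
{
    public byte[] Body { get; set; }
    public Dictionary<string, string> Headers = new Dictionary<string, string>();
}

// The interface added by hand; it is not part of NServiceBus.
public interface IMapIncomingTransportMessages
{
    void MapIncoming(TransportMessage transportMessage);
}

public class DecompressingMapper : IMapIncomingTransportMessages
{
    public void MapIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("zipped"))
            return;

        using (var input = new MemoryStream(transportMessage.Body))
        using (var zip = new DeflateStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            zip.CopyTo(output);
            transportMessage.Body = output.ToArray();
        }
        transportMessage.Headers.Remove("zipped");
    }
}

public static class IncomingPipeline
{
    // Hypothetical hook: call every registered mapper before the body
    // is deserialized and dispatched to the message handlers.
    public static void Run(IEnumerable<IMapIncomingTransportMessages> mappers, TransportMessage message)
    {
        foreach (var mapper in mappers)
            mapper.MapIncoming(message);
    }

    // Helper for trying the round trip: compresses a body and sets
    // the same "zipped" header the outgoing mapper uses.
    public static TransportMessage Compress(byte[] body)
    {
        var message = new TransportMessage();
        using (var output = new MemoryStream())
        {
            using (var zip = new DeflateStream(output, CompressionMode.Compress, true))
                zip.Write(body, 0, body.Length);
            message.Body = output.ToArray();
        }
        message.Headers["zipped"] = "1";
        return message;
    }
}
```

Passing a message produced by Compress through Run with a DecompressingMapper should restore the original bytes and remove the header, which is a cheap way to check both directions against each other before touching the NSB source.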

That’s all, folks. Using these techniques you can hook your own processing steps into the default pipeline of NSB without (big) modifications to the NSB source code.

September 13, 2010
Tags: ESB, extensibility, NServiceBus, Programming

About Me

This blog is kept alive by me, Moukarram Kabbash, a programmer and hobby photographer from Dortmund, Germany.

mouk.github.com