Virtual queues

Depending on your application, you might not want the message producer to be in complete control of the message that is added to a queue. In some cases a single event should also cause multiple messages to be added to different queues, and you don't want to force the message producer to think about that.

To solve this, the system has a concept of virtual queues. A virtual queue is an IQueueMessage that doesn't have its own backing storage; it just acts as a proxy in front of one or more concrete queues. You then register one or more message transformers that place messages in the concrete queues.

Hopefully this all makes more sense with a concrete example. Let's say we're in an e-commerce context and we receive an event that an order has been placed. When an order is placed we want to do at least two things: send an order confirmation email and export the order to some other system, such as the ERP.

The naive implementation of this would look something like this:

[Queue("send_order_confirmation")]
public class SendOrderConfirmationQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

[Queue("export_order")]
public class ExportOrderQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

// ControllerBase (Microsoft.AspNetCore.Mvc) provides NoContent().
public class OrderController : ControllerBase
{
    private readonly IEnqueuer<SendOrderConfirmationQueueMessage> _sendOrderConfirmationEnqueuer;
    private readonly IEnqueuer<ExportOrderQueueMessage> _exportOrderQueueMessageEnqueuer;

    public OrderController(IEnqueuer<SendOrderConfirmationQueueMessage> sendOrderConfirmationEnqueuer, IEnqueuer<ExportOrderQueueMessage> exportOrderQueueMessageEnqueuer)
    {
        _sendOrderConfirmationEnqueuer = sendOrderConfirmationEnqueuer;
        _exportOrderQueueMessageEnqueuer = exportOrderQueueMessageEnqueuer;
    }

    [HttpPost]
    public async Task<IActionResult> OrderPlacedAsync(string orderId, string email)
    {
        await _sendOrderConfirmationEnqueuer.EnqueueAsync(new SendOrderConfirmationQueueMessage
        {
            Id = orderId,
        });
        await _exportOrderQueueMessageEnqueuer.EnqueueAsync(new ExportOrderQueueMessage
        {
            Id = orderId,
        });
        return NoContent();
    }
}

There are a couple of problems with this approach. The first is that the controller needs to know which queues are interested in being notified when an order is placed. If we add another queue for order processing, we need to update the controller to enqueue that message as well.

The second problem is that the controller is in charge of creating the message that is actually placed in the queue and setting its properties. In the example above that is really simple, but constructing a message can be more complex than that.

In simple applications this isn't really an issue, but in larger applications it becomes a problem. So let's update the example to use virtual queues instead!

[VirtualQueue("order_placed")]
public class OrderPlacedQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

[Queue("send_order_confirmation")]
public class SendOrderConfirmationQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

public class SendOrderConfirmationQueueMessageSyncTransformer : IVirtualQueueMessageSyncTransformer<OrderPlacedQueueMessage, SendOrderConfirmationQueueMessage>
{
    public SendOrderConfirmationQueueMessage? Transform(OrderPlacedQueueMessage message)
    {
        return new SendOrderConfirmationQueueMessage { Id = message.Id };
    }
}

[Queue("export_order")]
public class ExportOrderQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

public class ExportOrderQueueMessageSyncTransformer : IVirtualQueueMessageSyncTransformer<OrderPlacedQueueMessage, ExportOrderQueueMessage>
{
    public ExportOrderQueueMessage? Transform(OrderPlacedQueueMessage message)
    {
        return new ExportOrderQueueMessage { Id = message.Id };
    }
}

// ControllerBase (Microsoft.AspNetCore.Mvc) provides NoContent().
public class OrderController : ControllerBase
{
    private readonly IEnqueuer<OrderPlacedQueueMessage> _orderPlacedEnqueuer;

    public OrderController(IEnqueuer<OrderPlacedQueueMessage> orderPlacedEnqueuer)
    {
        _orderPlacedEnqueuer = orderPlacedEnqueuer;
    }

    [HttpPost]
    public async Task<IActionResult> OrderPlacedAsync(string orderId, string email)
    {
        await _orderPlacedEnqueuer.EnqueueAsync(new OrderPlacedQueueMessage
        {
            Id = orderId,
        });
        return NoContent();
    }
}

There's a lot to unpack here. First, we have a new attribute called [VirtualQueue]. It tells the system that this message should not get any concrete queue storage; it's only used for enqueuing messages to other queues.

The second thing is that we have two message transformers. These are just classes that implement either IVirtualQueueMessageSyncTransformer or its async counterpart IVirtualQueueMessageTransformer. They're responsible for translating a virtual message into a concrete message. The message transformers are automatically registered with the service collection as long as they exist in assemblies that are scanned for IQueueMessage implementations.

In the controller we now take in an IEnqueuer<OrderPlacedQueueMessage> for our virtual queue. Regular queues get more services registered, such as IQueueReader and IQueueItemUpdater, but the only thing you can do with a virtual queue is add messages to it.

When you call the enqueuer, the message transformers we created are invoked behind the scenes, and the resulting messages are passed on to the enqueuers for SendOrderConfirmationQueueMessage and ExportOrderQueueMessage. As you might have noticed, a transformer can return null from its Transform() method, which is a way to say "this message isn't interesting for this queue". If all transformers return null, nothing is enqueued.
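As a minimal sketch of the null-return behaviour described above, the transformer below skips orders that have no Id. The interface declaration is only a stand-in so that the snippet compiles on its own; its shape is assumed from the transformer examples earlier in this document. The class name FilteringExportOrderTransformer and the filter condition are hypothetical.

```csharp
#nullable enable

// Stand-in for the library interface so that the sketch compiles on its own.
// Its shape is assumed from the transformer examples earlier in this document.
public interface IVirtualQueueMessageSyncTransformer<TVirtual, TConcrete>
    where TConcrete : class
{
    TConcrete? Transform(TVirtual message);
}

// Plain classes; the [VirtualQueue]/[Queue] attributes and IQueueMessageWithId
// are left out here to keep the sketch self-contained.
public class OrderPlacedQueueMessage
{
    public string? Id { get; set; }
}

public class ExportOrderQueueMessage
{
    public string? Id { get; set; }
}

// Hypothetical transformer: only orders with a non-empty Id are exported.
public class FilteringExportOrderTransformer
    : IVirtualQueueMessageSyncTransformer<OrderPlacedQueueMessage, ExportOrderQueueMessage>
{
    public ExportOrderQueueMessage? Transform(OrderPlacedQueueMessage message)
    {
        if (string.IsNullOrWhiteSpace(message.Id))
        {
            // Returning null means "this message isn't interesting for this queue".
            return null;
        }

        return new ExportOrderQueueMessage { Id = message.Id };
    }
}
```

Other transformers registered for the same virtual queue are unaffected by this one returning null, so the order confirmation could still be enqueued even though the export is skipped.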

API

Virtual queues are especially useful if you let other systems POST messages into your queues through the API, since virtual queues are exposed there as well. This gives you a way to intercept the data sent by the other system and run logic before the message is stored.

Benefits

So when should you use virtual queues? If you have a smaller application with just a few developers, it might not be worth it. But if you have a larger application, and maybe even multiple teams, it will most likely be worth it. In such a case you should probably create a virtual queue for each queue, keeping the concrete queue message internal and the virtual queue message public. That prevents anyone outside from directly manipulating your underlying queue. Something like this:

[VirtualQueue("public_queue")]
public class PublicQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

[Queue("internal_queue")]
internal class InternalQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

internal class InternalQueueMessageSyncTransformer : IVirtualQueueMessageSyncTransformer<PublicQueueMessage, InternalQueueMessage>
{
    public InternalQueueMessage? Transform(PublicQueueMessage message)
    {
        return new InternalQueueMessage { Id = message.Id };
    }
}

Now other .NET projects will only be able to use IEnqueuer<PublicQueueMessage>, which gives you full control over your queue.

Another option is to start with a public IQueueMessage and at a later point convert it from a concrete queue to a virtual queue. Something like this in step 1:

[Queue("internal_queue")]
public class PublicQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

And then, when you need it, change it to:

[VirtualQueue("public_queue")]
public class PublicQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

[Queue("internal_queue")]
internal class InternalQueueMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
}

There are a couple of downsides to this that you should be aware of:

  1. Anyone using the API to enqueue messages needs to update their code to use public_queue instead of internal_queue.
  2. Since we moved the [Queue("internal_queue")] attribute to a different type, there might still be old JSON messages in that queue that were serialized as PublicQueueMessage, so InternalQueueMessage needs to be able to read them.
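
To illustrate the second point, here is a small sketch that assumes messages are stored as JSON and read back with System.Text.Json using its default options; the serializer the system actually uses may differ. The JSON payload, the RenamedInternalQueueMessage type and the LegacyPayloadCheck helper are made up for the example.

```csharp
#nullable enable
using System.Text.Json;

// Concrete message that now owns "internal_queue"; the attribute and interface
// are left out to keep the sketch self-contained.
public class InternalQueueMessage
{
    public string? Id { get; set; }
}

// Hypothetical later revision where the property has been renamed.
public class RenamedInternalQueueMessage
{
    public string? OrderId { get; set; }
}

public static class LegacyPayloadCheck
{
    // What an old message serialized from PublicQueueMessage might look like.
    public const string OldPayload = "{\"Id\":\"42\"}";

    // Same property name as before, so the old payload still maps onto the new type.
    public static string? ReadWithSameShape() =>
        JsonSerializer.Deserialize<InternalQueueMessage>(OldPayload)?.Id;

    // The old payload has no "OrderId" property, so the value is lost.
    public static string? ReadWithRenamedProperty() =>
        JsonSerializer.Deserialize<RenamedInternalQueueMessage>(OldPayload)?.OrderId;
}
```

Under these assumptions ReadWithSameShape() returns the original Id, while ReadWithRenamedProperty() returns null. Keeping the property names of the internal message compatible with the old public one until the old messages have been processed avoids that kind of silent data loss.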