Queues

Unlike popular message buses and event queues, this system stores messages in an SQL database. Message bus systems are typically built for very high message volumes and assume that you're not interested in looking at a single message, so they offer little support for finding and manipulating individual messages.

This system, on the other hand, assumes that you won't have millions of messages per second and that you are interested in looking at individual messages. For that, an SQL database is a perfect fit. At the time of writing we support Postgres, SQL Server and SQLite. Feel free to reach out to us if you need support for another database engine like MySQL or MariaDB.

Each registered queue is its own database table, which the system creates automatically once you've registered the queue. A message is a row in that table, containing the message in JSON format together with metadata such as the status and when it was created.

Defining and registering a queue

The first thing to do is to create a C# class representing the message type of the queue. It's just a .NET class implementing the interface IQueueMessage. Here's an example:

[Queue("send_order_confirmation")]
public class SendOrderConfirmationQueueMessage : IQueueMessage
{
    public string OrderId { get; set; }
    public string Email { get; set; }
}

You can add any properties you want to the class. The only important thing to remember is that the message will be serialized to and from JSON.

There's also the IQueueMessageWithId interface which you can read about in the section about message identity.

[Queue] attribute

The queue attribute isn't strictly required, but it's recommended because it sets metadata for the queue such as the queue name. That name is used for naming the database table and for accessing the queue in the API and admin UI. If the queue attribute isn't set, the name is taken from the class name. Without the [Queue] attribute, renaming the class also renames the queue, which creates a completely new and empty queue, and that is easy to do by accident. The [Queue] attribute also lets you set other metadata such as which database engine to use and how long to keep processed messages.

Registering the queue

Just like scheduled jobs, queues are registered automatically by finding all types that implement IQueueMessage (IQueueMessageWithId inherits from IQueueMessage). And just like for jobs, this is only done automatically for messages defined in the entry assembly. If you have multiple .NET projects that contain queue message classes, you need to register those assemblies with the service.

There's a property on the options object passed into AddNexus().AddQueues() called AssembliesToScanForQueueMessages where you can add additional assemblies to scan for queue messages like this:

builder.Services.AddNexus().AddQueues(options =>
{
    options.AssembliesToScanForQueueMessages.Add(typeof(MyQueueMessageInAnotherProject).Assembly);
});

Adding a message to a queue

There are two ways to add queue messages. The first is through the C# SDK using the IEnqueuer<TMessage> service. That service is registered with the IServiceProvider, so you can take it as a constructor dependency. The generic argument TMessage is the message type that you want to enqueue. An example:

public class OrderController : ControllerBase
{
    private readonly IEnqueuer<SendOrderConfirmationQueueMessage> _enqueuer;

    public OrderController(IEnqueuer<SendOrderConfirmationQueueMessage> enqueuer)
    {
        _enqueuer = enqueuer;
    }

    [HttpPost]
    public async Task<IActionResult> OrderPlacedAsync(string orderId, string email)
    {
        await _enqueuer.EnqueueAsync(new SendOrderConfirmationQueueMessage
        {
            OrderId = orderId,
            Email = email,
        });
        return NoContent();
    }
}

The message is now placed in the queue with the status Pending which means that it'll get processed by the queue processing job as soon as it starts. There might be a delay between the time the message is enqueued and when the queue processing job is scheduled to run. If no delay is acceptable you can use the EnqueueAndStartProcessingAsync() method rather than the EnqueueAsync() method on IEnqueuer<TMessage>. That will request the queue processing job to start as quickly as possible regardless of its schedule unless the job has been manually disabled.

There's also a non-generic version of IEnqueuer which lets you enqueue any object of type IQueueMessage. The non-generic IEnqueuer finds the correct enqueuer for each message you pass in and sends it to the correct queue.

Adding a message through the API

The other way of adding a message to a queue is by using the API. The API is just a way to access the same IEnqueuer<TMessage> over HTTP at the endpoint POST /queues/{queueName}.
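As a rough sketch of calling that endpoint from C#, the helper below builds a POST request for a given queue. Only the path POST /queues/{queueName} comes from the text above. The base address, the lack of authentication, and the idea that the request body is simply the message serialized as JSON are assumptions for illustration, so check the actual API contract before relying on them. QueueApiSketch and BuildEnqueueRequest are hypothetical names.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;

public static class QueueApiSketch
{
    // Builds (but does not send) a POST /queues/{queueName} request.
    // Assumption: the body is the message itself serialized as JSON.
    public static HttpRequestMessage BuildEnqueueRequest<T>(Uri baseAddress, string queueName, T message)
    {
        // baseAddress is expected to end with a slash, e.g. https://nexus.example.com/
        var uri = new Uri(baseAddress, $"queues/{Uri.EscapeDataString(queueName)}");
        return new HttpRequestMessage(HttpMethod.Post, uri)
        {
            Content = JsonContent.Create(message)
        };
    }
}
```

The returned request can then be passed to HttpClient.SendAsync, for example with the queue name send_order_confirmation and a body containing OrderId and Email.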

Adding multiple messages at once

Both the C# SDK and the API allow you to enqueue multiple messages at the same time. They use batch operations (except on SQLite, which doesn't have batch imports) to make enqueuing a lot of messages as fast as possible.

Message status

A message has a status and the default status is Pending. It means that the message is up-for-grabs for a queue processing job.

When a queue processing job starts, it checks whether there are any Pending messages and then tries to lease them. This is done with the IQueueReader<TMessage>.LeaseAsync() method, which updates the status of those Pending messages to Leased and records when the lease expires. Leasing a message ensures that multiple queue processors can never lease the same message at the same time.

If the lease expires, that is, if the processing job hasn't finished within the lease time, the message status will be set to Abandoned. When this happens it most likely means that the job that leased the message crashed and won't recover. When a message is set to Abandoned you need to look into what happened manually and decide whether to retry the message by changing its status back to Pending, or to delete the message and handle the work in some other way.

When a queue processing job is done with the message the status will either become Error or Processed. And if the queue is not configured to keep processed messages the message will be deleted rather than set to Processed.
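The status transitions described above can be modelled with a small in-memory sketch. This is not how Nexus implements leasing (the real system updates rows in the database); the lock here merely stands in for whatever atomic update the database provides, and all type and member names (SketchStatus, SketchRow, InMemoryQueueSketch) are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum SketchStatus { Pending, Leased, Processed, Error, Abandoned }

public sealed class SketchRow
{
    public int Id { get; init; }
    public SketchStatus Status { get; set; } = SketchStatus.Pending;
    public DateTime? LeasedUntil { get; set; }
}

public sealed class InMemoryQueueSketch
{
    private readonly object _gate = new();
    private readonly List<SketchRow> _rows = new();

    public void Add(SketchRow row) { lock (_gate) _rows.Add(row); }

    // Moves up to maxCount Pending rows to Leased and records when the lease expires.
    // Rows that are already Leased are never handed out a second time.
    public IReadOnlyList<SketchRow> Lease(int maxCount, TimeSpan leaseTime, DateTime now)
    {
        lock (_gate)
        {
            var leased = _rows.Where(r => r.Status == SketchStatus.Pending).Take(maxCount).ToList();
            foreach (var row in leased)
            {
                row.Status = SketchStatus.Leased;
                row.LeasedUntil = now + leaseTime;
            }
            return leased;
        }
    }

    // Marks rows whose lease has run out as Abandoned. They are not retried automatically.
    public int ExpireLeases(DateTime now)
    {
        lock (_gate)
        {
            var expired = _rows.Where(r => r.Status == SketchStatus.Leased && r.LeasedUntil <= now).ToList();
            foreach (var row in expired) row.Status = SketchStatus.Abandoned;
            return expired.Count;
        }
    }
}
```

In this model a second call to Lease made right after the first returns nothing, and a row that is still Leased when its expiry time passes ends up as Abandoned rather than going back to Pending.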

The system also allows you to set custom statuses on messages through the UI and the API which allows you to categorize messages for troubleshooting as well as prevent them from being processed by the system. Messages with custom statuses are invisible to the processing system and are only visible in the UI and API.

Updating status by property in message

Sometimes you might want to update the status of all messages where a certain property inside the message has a specific value. For example, you might have a queue of inventory messages that you don't want to process until the product data has come in, so your queue processing job for the inventory messages keeps rescheduling them. When the product data then comes in, you can update the inventory message(s) like this:

public async Task OnProductCreated(string sku)
{
    await _inventoryQueueItemUpdater.UpdateStatusWhereAsync(QueueItemStatus.Pending, m => m.Product.Sku == sku);
}

The lambda expression m => m.Product.Sku == sku is compiled into a SQL statement to use in a WHERE clause. In the case of Postgres it becomes WHERE message->'Product'->>'Sku'::text = '123'.
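To give a feel for how such a translation can work, here is a minimal sketch that turns a lambda of the form m => m.A.B == value into a Postgres JSON path. It is not the translator Nexus uses: it handles only ==, emits a parameter placeholder (@p0) instead of an inlined literal, and every name in it (JsonPredicateSketch, ToPostgresWhere, SketchInventoryMessage, SketchProduct) is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

// Hypothetical message types used only to exercise the sketch.
public class SketchProduct { public string Sku { get; set; } = ""; }
public class SketchInventoryMessage { public SketchProduct Product { get; set; } = new(); }

public static class JsonPredicateSketch
{
    // Handles only "m => m.A.B == value"; anything else throws.
    public static (string Sql, object? Parameter) ToPostgresWhere<T>(Expression<Func<T, bool>> predicate)
    {
        if (predicate.Body is not BinaryExpression { NodeType: ExpressionType.Equal } comparison)
            throw new NotSupportedException("This sketch only handles ==.");

        // Walk m.Product.Sku from the right, collecting ["Product", "Sku"].
        var path = new List<string>();
        Expression? node = comparison.Left;
        while (node is MemberExpression member)
        {
            path.Insert(0, member.Member.Name);
            node = member.Expression;
        }
        if (node is not ParameterExpression || path.Count == 0)
            throw new NotSupportedException("Left side must be a property path on the message.");

        // Evaluate the right side (a constant or captured variable) to a plain value.
        var value = Expression.Lambda(comparison.Right).Compile().DynamicInvoke();

        // Intermediate steps use -> (JSON), the last step uses ->> (text).
        var sql = "message";
        for (var i = 0; i < path.Count; i++)
            sql += (i == path.Count - 1 ? "->>" : "->") + $"'{path[i]}'";

        return ($"WHERE {sql} = @p0", value);
    }
}
```

Passing the value as a parameter rather than concatenating it into the SQL text is a deliberate choice in this sketch, since the value may come from outside the application.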

The lambda passed in must be a comparison between a property on the message and a primitive, string or null value. You can use ==, !=, <, >, <=, or >= as the comparison operator. Alternatively, it can be a lambda which calls Contains on a list, such as m => m.Skus.Contains(sku). Note that for Postgres only lists of strings are supported, as the ? operator in Postgres only allows the text data type.

If you pass in an invalid lambda you'll get a runtime error.

Performance impact

When updating messages based on properties inside the JSON message it can have less than optimal performance as it needs to process all rows in the table to find the matching rows. If you don't use idempotent messages and don't store processed messages that's not going to be a problem because the queue table will be quite small. If you're using Postgres you can create a jsonb index on the message column to improve performance but you'll need to add that index yourself as Nexus won't be aware of it. If you're using SQL Server you can create indexes on a computed column unless you plan on using Contains on a list as SQL Server can't create an index for that.

Default enqueue status

As mentioned above the default enqueue status is Pending which makes the message immediately available for processing jobs.

In some cases you might want to change this to a different status so that new messages that come in are kept in that status and become invisible to processing jobs. You can do this in the Admin UI for the queue or through the API, and you can use any text value you want such as Paused or OnHold.

This can be useful if a queue processing job is experiencing problems and you only want to run it on selected messages. Then you can change the default enqueue status and manually change only the messages you want to process to Pending.

After the issues have been resolved, you can filter out all the messages with status Paused or OnHold and update them to Pending again.

Previous message version

For messages that have identity, the previous version of a message is stored when enqueueing a new version. This is useful for change-based processing, since it allows you to compare the current message with the previous message and add additional processing logic when certain properties of the message change.

In order to access the previous message from inside a queue processing job you use the IPreviousMessageProvider service. Note that it's only usable inside queue processing jobs and will always return null if you try to use it outside of a job. If you need access to previous messages outside of a job you can read QueueItems using IQueueReader. QueueItem contains everything about the queue item including the current and previous message.

Previous version

The previous version is only updated when calling IQueueItemUpdater.ProcessedAsync() which is done for you when using the built in queue processing jobs. It's then updated to be the message passed to that method. This ensures that the previous version of a message will only ever be a version that was successfully processed. If the message processing fails the previous version stays untouched to let you retry the message.

Idempotent messages

If messages are fully self-contained, meaning that a processing job only uses data on the message itself to perform its processing and doesn't look at any external data, your messages are idempotent. Processing an idempotent message will always result in the same effect.

For queues that have this characteristic, you can mark the queue as idempotent in the [Queue] attribute like this:

[Queue("product_updates", IdempotentMessages = true)]
public class ProductUpdatedQueueMessage : IQueueMessageWithId
{
    ...
}

When IdempotentMessages is set to true, Nexus checks on enqueue whether it already has a message with the same message id and the same JSON. If it does, the incoming message is discarded.

This is useful for big data syncs such as a product catalog. You can have a job that reads the entire product catalog from another system and enqueues a message per product. If nothing has changed on the product since the last time it was enqueued the message won't get processed again.

The comparison is done at a JSON string level, so a message will only be discarded if the JSON is exactly the same. This means that the order of properties in the serialized message is important. It's only something you need to consider if your message contains dynamic collections such as dictionaries, where the order can change.
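One possible way to keep dictionary contents stable across enqueues is to sort the keys before serialization. The sketch below uses System.Text.Json and a SortedDictionary for that. Whether Nexus normalizes property order on its own is not stated here, so treat this as a precaution under that assumption. StableJsonSketch is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class StableJsonSketch
{
    // Copies the entries into a SortedDictionary so that keys are serialized in
    // ordinal order, independent of the order in which they were added.
    public static string Serialize(IDictionary<string, string> attributes) =>
        JsonSerializer.Serialize(new SortedDictionary<string, string>(attributes, StringComparer.Ordinal));
}
```

The same effect can be had by declaring the message property itself as a SortedDictionary, so that two messages with the same entries serialize to the same JSON string.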

Idempotency and processed messages

When you set IdempotentMessages to true the KeepProcessedMessagesFor property is automatically set to never delete processed messages unless you explicitly specify a timespan to keep processed messages for in the queue.

Partial messages

Sometimes you might have multiple data sources that are each responsible for their own section of a message. Such as in an e-commerce context where the ERP might be responsible for the price and the PIM is responsible for the product data, but you want to process the data from these systems in a single message.

In such a case you can use partial messages in Nexus, which are message classes where the top-level properties are set by different enqueuers. Take this message for example:

public class ProductUpdatedMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
    public ProductInformation? ProductInformation { get; set; }
    public ProductPrice? Price { get; set; }
}

When listening to events from the PIM you do this:

public class ProductInformationController(IEnqueuer<ProductUpdatedMessage> enqueuer) : Controller
{
    [HttpPost("{productNumber}")]
    public async Task<IActionResult> ProductUpdatedAsync(string productNumber, [FromBody] ProductInformation productInformation)
    {
        await enqueuer.EnqueueAsync(new ProductUpdatedMessage { Id = productNumber, ProductInformation = productInformation }, new EnqueueContext { PartialMessage = true });
        return Ok();
    }
}

And when listening for ERP price updates you do:

public class PriceController(IEnqueuer<ProductUpdatedMessage> enqueuer) : Controller
{
    [HttpPost("{productNumber}/price")]
    public async Task<IActionResult> PriceUpdatedAsync(string productNumber, [FromBody] ProductPrice price)
    {
        await enqueuer.EnqueueAsync(new ProductUpdatedMessage { Id = productNumber, Price = price }, new EnqueueContext { PartialMessage = true });
        return Ok();
    }
}

What happens in the background is that Nexus merges the incoming message with an existing message with the same id. This means that when the PIM data comes in it won't remove the price from the message, and vice versa.

If there's no existing message with the same id, a new message will be enqueued with only that data. If the processing job needs both parts in order to correctly process the product, it can just return a custom status or just return that it processed the message. The message will get the Pending status again as soon as any of the systems involved have sent their data.
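The merge can be pictured as copying top-level properties from the incoming JSON onto the stored JSON. The sketch below does this with System.Text.Json.Nodes. It is an illustration only: it assumes that null or missing top-level properties in the incoming message leave the stored value untouched, which matches the behaviour described above but may differ in detail from what Nexus actually does. PartialMergeSketch is a hypothetical name.

```csharp
using System.Text.Json.Nodes;

public static class PartialMergeSketch
{
    // Returns the existing JSON with every non-null top-level property of the
    // incoming JSON copied over it. Null or missing incoming properties are ignored.
    public static string Merge(string existingJson, string incomingJson)
    {
        var existing = JsonNode.Parse(existingJson)!.AsObject();
        var incoming = JsonNode.Parse(incomingJson)!.AsObject();

        foreach (var (name, value) in incoming)
        {
            if (value is null) continue;
            // Re-parse to get a detached copy; a JsonNode can only have one parent.
            existing[name] = JsonNode.Parse(value.ToJsonString());
        }
        return existing.ToJsonString();
    }
}
```

With a stored message that has ProductInformation set and Price null, merging an incoming message that has only Price set yields a message with both sections present.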

Scheduling messages for future processing

Messages that should be processed in the future get an additional status called Sleeping. When you enqueue a message you can specify a DateTime, or a TimeSpan relative to the current time, for when the message should be processed.

This is useful for scenarios where you know that something will happen at a certain time in the future and you want to run some processing at that time. A typical e-commerce scenario is that a product can have prices with a start and stop date. You want to export your prices to another system but that system might not support date based validity for prices so you need to export the current price at any given time.

To deal with this you can enqueue messages that should get processed at the time that one price stops being active and another starts. And your queue processing job can send the new price to the external system exactly when it starts being valid.

Processing the same message again in the future

Sometimes when you're using idempotent messages to update external systems, you might have dates in your message that affect the outcome of the processing. An example is a product message which contains a list of prices that each have a valid-from and valid-to date. At the time of processing you pick the price that is valid right now, but you want to run the processing again when the next price becomes valid.

In your queue processing job you can return an optional DateTime for when to process the message again. See more details in the section about queue jobs.
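The date logic for such a job could look roughly like the sketch below: pick the price that is valid now, and work out when the next price starts so that the message can be processed again at that time. How the DateTime is actually returned from a queue job is covered in the queue jobs section and is not shown here. PricePeriod and PriceScheduleSketch are hypothetical names, and the sketch assumes that ValidTo is exclusive.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical price entry; ValidTo is treated as exclusive.
public record PricePeriod(decimal Amount, DateTime ValidFrom, DateTime ValidTo);

public static class PriceScheduleSketch
{
    // The price that is valid at "now", or null if none is.
    public static PricePeriod? Current(IEnumerable<PricePeriod> prices, DateTime now) =>
        prices.FirstOrDefault(p => p.ValidFrom <= now && now < p.ValidTo);

    // The earliest future start date, i.e. when the message should be processed again.
    // Returns null when no later price exists.
    public static DateTime? NextProcessingTime(IEnumerable<PricePeriod> prices, DateTime now) =>
        prices.Where(p => p.ValidFrom > now)
              .Select(p => (DateTime?)p.ValidFrom)
              .Min();
}
```

This only looks at start dates. If a price can end without a successor, the job would also need to consider ValidTo when deciding when to run again.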

Example queue messages

The admin UI needs to create an example message to prefill the text box where you can manually enqueue a new message. By default Nexus will try to create an instance of your queue message class with default values in properties. If you want to customize that message, you can add a static method on your message class called GetExampleMessage like this:

public class MessageWithExampleMethod : IQueueMessage
{
    public string? SomeProperty { get; set; }

    public static IQueueMessage? GetExampleMessage()
    {
        return new MessageWithExampleMethod
        {
            SomeProperty = "From example"
        };
    }
}

If you want even more control over this you can register your own implementation of the interface IExampleQueueMessageFactory which is the service that will be called to create these example messages.

Leasing messages

The way that Nexus ensures that multiple queue processors won't process the same message is by requiring them to first lease messages before processing them. And Nexus ensures that if two processes try to lease the same message only one of them will win.

The default lease time is 30 minutes. If you have a batch job with a very large batch size and the job takes a long time to complete you might want to increase the lease time for that job. You do that by implementing the MessageLeaseTime property from IScheduledQueueJobBase, like this:

public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    public TimeSpan MessageLeaseTime => TimeSpan.FromMinutes(60);
}

Abandoned messages

If a server crashes in the middle of processing messages, the lease will eventually expire. Note that this only happens in extraordinary cases. Nexus catches exceptions thrown by the processing job and correctly transitions the message status from Leased to Error. But there can be a power outage, or the database server might become unavailable, which are things that Nexus can't handle automatically. In such cases the lease will eventually expire, and at that point Nexus will transition the message status from Leased to Abandoned.

If you see that any of your queues contain abandoned messages, you need to investigate what happened before resetting the status to run them again. It could be that the messages were fully processed but the server or database crashed right before Nexus was able to update the status to Processed.

The best way to start investigating is by looking at the correlation id set in the messages. That is the correlation id of the job that leased it which means that you can use the Admin UI or your log UI of choice to try to find out how far the job with that correlation id got. If the correlation id is "123" your logs related to that job run would have a log property called Nexus.CorrelationId with the value 123.

Health status

All queues automatically get a health check, and the default behavior is to mark the queue as unhealthy when there's a message with the Error status in it. However, that's not always what you want, because you can have queues for systems that you know have intermittent failures and you don't want the health check monitoring to keep bugging you about it.

In that case you can either disable the health check for the queue or you can change which health status the queue has when it has messages of Error status in it, like this:

[Queue("myqueue", HealthStatusWhenHasErrors = HealthStatus.Healthy)]
public class MyQueueMessage : IQueueMessage
{
    ...
}

Organizing the Admin UI

If you have a lot of queues you can group queues in the Admin UI by a category, just like you can with jobs. Set a category in the Queue attribute like this:

[Queue("myqueue", Category = "My category")]
public class MyQueueMessage : IQueueMessage
{
    ...
}