Queues
Unlike popular message buses and event queues, this system stores messages in an SQL database, and it also offers in-memory storage. Message bus systems are typically built to handle huge volumes of messages on the assumption that you're not really interested in looking at a single message, so they don't offer great support for finding and manipulating messages.
Nexus on the other hand assumes that you won't have millions of messages per second and that you are interested in looking at individual messages. And for that an SQL database is a perfect fit. At the time of writing we support Postgres, SQL Server and SQLite. Feel free to reach out to us if you need support for another database engine like MySQL or MariaDB.
Each registered queue is its own database table, which the system creates automatically when you register the queue. A message is a row in that table, containing the message in JSON format together with metadata such as the status and when it was created.
Defining and registering a queue
The first thing to do is to create a C# class representing the message type of the queue. It's just a .NET class implementing the interface IQueueMessage. Here's an example:
[Queue("send_order_confirmation")]
public class SendOrderConfirmationQueueMessage : IQueueMessage
{
    public string OrderId { get; set; }
    public string Email { get; set; }
}
You can add any properties you want to the class. The only important thing to remember is that the message is serialized to and from JSON.
There's also the IQueueMessageWithId interface, which you can read about in the section about message identity.
[Queue] attribute
The queue attribute isn't strictly required but it's recommended as it sets metadata for the queue such as the queue name. That name is used for naming the database table and to access the queue in the API and admin UI. If the queue attribute isn't set the name is taken from the class name. If you don't use the [Queue] attribute, renaming the class will create a completely new and empty queue, which is easy to do by accident. The [Queue] attribute also lets you set other metadata such as which database engine to use and how long to keep processed messages.
Registering the queue
Just like scheduled jobs, queues are automatically registered by finding all types that implement IQueueMessage (IQueueMessageWithId inherits from IQueueMessage). And just like for jobs, this is only done automatically for messages defined in the entry assembly. If you have multiple .NET projects that contain queue message classes you need to register the assemblies with the service.
There's a property on the options object passed into AddNexus().AddQueues() called AssembliesToScanForQueueMessages where you can add additional assemblies to scan for queue messages like this:
builder.Services.AddNexus().AddQueues(options =>
{
    options.AssembliesToScanForQueueMessages.Add(typeof(MyQueueMessageInAnotherProject).Assembly);
});
Adding a message to a queue
There are two ways to add queue messages. The first is through the C# SDK using the IEnqueuer<TMessage> service. That service is injectable through the IServiceProvider so you can take it as a constructor dependency. The generic argument TMessage is the message type that you want to add a message for. An example:
using Microsoft.AspNetCore.Mvc;

// Inherits ControllerBase so that NoContent() is available.
public class OrderController : ControllerBase
{
    private readonly IEnqueuer<SendOrderConfirmationQueueMessage> _enqueuer;

    public OrderController(IEnqueuer<SendOrderConfirmationQueueMessage> enqueuer)
    {
        _enqueuer = enqueuer;
    }

    [HttpPost]
    public async Task<IActionResult> OrderPlacedAsync(string orderId, string email)
    {
        // Adds the message to the queue with the default enqueue status.
        await _enqueuer.EnqueueAsync(new SendOrderConfirmationQueueMessage
        {
            OrderId = orderId,
            Email = email,
        });
        return NoContent();
    }
}
The message is now placed in the queue with the status Pending, which means that it'll get processed by the queue processing job as soon as it starts. There might be a delay between the time the message is enqueued and when the queue processing job is scheduled to run. If no delay is acceptable you can use the EnqueueAndStartProcessingAsync() method rather than the EnqueueAsync() method on IEnqueuer<TMessage>. That will request the queue processing job to start as quickly as possible regardless of its schedule, unless the job has been manually disabled.
There's also the non-generic version of IEnqueuer which lets you enqueue any object of type IQueueMessage. The non-generic IEnqueuer will find the correct enqueuer for each message you pass in and send it to the correct queue.
Adding a message through the API
The other way of adding a message to a queue is by using the API. The API is just a way to access the same IEnqueuer<TMessage> over HTTP at the endpoint POST /queues/{queueName}.
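As a rough sketch of what a client call could look like, the helper below builds a POST request for the endpoint above. Only the route comes from this page. The request body (the message serialized as JSON), the helper name BuildEnqueueRequest and the base address in the usage note are assumptions made for illustration, and authentication is not covered.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Json;

public static class QueueApiSketch
{
    // Hypothetical helper, not part of Nexus: builds a POST /queues/{queueName} request.
    // Assumption: the body is simply the queue message serialized as JSON.
    public static HttpRequestMessage BuildEnqueueRequest(string queueName, object message)
    {
        // Escape the queue name so it is safe to use as a path segment.
        var path = $"queues/{Uri.EscapeDataString(queueName)}";

        return new HttpRequestMessage(HttpMethod.Post, path)
        {
            Content = JsonContent.Create(message)
        };
    }
}
```

You would send the request with an HttpClient whose BaseAddress points at your own Nexus installation (for example a placeholder such as https://nexus.example.com/), then check the response status code.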
Adding multiple messages at once
Both the C# SDK and the API allow you to enqueue multiple messages at the same time and will use batch operations (except on SQLite, which doesn't have batch imports) to make enqueuing a lot of messages as fast as possible.
Message status
A message has a status and the default status is Pending. It means that the message is up for grabs for a queue processing job.
When a queue processing job starts it checks whether there are any Pending messages and then tries to lease them. This is done with the IQueueReader<TMessage>.LeaseAsync() method, which updates the status of the Pending messages to Leased and sets the date until which they are leased. Leasing a message ensures that multiple queue processors can never lease the same message at the same time.
If the lease expires (that is, if the processing job hasn't finished within the time it set for the lease) the message status will be set to Abandoned. When this happens it most likely means that the job that leased the message crashed and won't recover. When a message is set as Abandoned you need to manually look into what happened and decide if you want to retry the message by changing its status back to Pending or if you want to delete the message and handle the work in some other way.
In the Admin UI it's possible to configure a queue (under the More button on the queue details page) to retry messages when the lease expires instead of setting them to Abandoned. This should only be enabled for messages where it's safe to process the same message twice. The message might have been processed once, but the server crashed before Nexus was able to update the status to Processed.
When a queue processing job is done with the message the status will become either Error or Processed. If the queue is not configured to keep processed messages, the message will be deleted rather than set to Processed.
The system also allows you to set custom statuses on messages through the UI and the API which allows you to categorize messages for troubleshooting as well as prevent them from being processed by the system. Messages with custom statuses are invisible to the processing system and are only visible in the UI and API.
Updating status or reading by property in message
Sometimes you might want to update the status of messages where a certain property inside the message contains a specific value. For example, you might have a queue of inventory messages that you don't want to process until the product data has come in, so your queue processing job keeps rescheduling them. When the product data then comes in you can update the inventory message(s) like this:
public async Task OnProductCreated(string sku)
{
    await _inventoryQueueItemUpdater.UpdateStatusWhereAsync(QueueItemStatus.Pending, m => m.Product.Sku == sku);
}
See more in the docs about filterable properties.
Default enqueue status
As mentioned above the default enqueue status is Pending, which makes the message immediately available for processing jobs.
In some cases you might want to change this to a different status so that new messages that come in are kept in that status and become invisible to processing jobs. You can do this in the Admin UI for the queue or through the API, and you can use any text value you want such as Paused or OnHold.
This can be useful if a queue processing job is experiencing problems and you only want to run it on selected messages. Then you can change the default enqueue status and manually change only the messages you want to process to Pending.
After the issues have been resolved you can filter out all the messages with status Paused or OnHold and update them to Pending again.
Previous message version
For messages that have an identity, the previous version of a message is stored when a new version is enqueued. This is useful for change-based processing since it lets you compare the current message with the previous one and add processing logic for when certain properties of the message change.
In order to access the previous message from inside a queue processing job you use the IPreviousMessageProvider service. Note that it's only usable inside queue processing jobs and will always return null if you try to use it outside of a job. If you need access to previous messages outside of a job you can read QueueItems using IQueueReader. QueueItem contains everything about the queue item including the current and previous message.
Previous version
The previous version is only updated when calling IQueueItemUpdater.ProcessedAsync(), which is done for you when using the built-in queue processing jobs. It's then updated to be the message passed to that method. This ensures that the previous version of a message will only ever be a version that was successfully processed. If the message processing fails the previous version stays untouched to let you retry the message.
Idempotent messages
If messages are fully self-contained, meaning that a processing job only uses data on the message itself to perform its processing and doesn't look at any external data, your messages are idempotent. Processing an idempotent message will always result in the same effect.
For queues that have this characteristic you can tell the [Queue] attribute that the messages are idempotent like this:
[Queue("product_updates", IdempotentMessages = true)]
public class ProductUpdatedQueueMessage : IQueueMessageWithId
{
    ...
}
When IdempotentMessages is set to true, Nexus checks on enqueue whether it already has a message with the same message id and JSON, and if so discards the new message.
This is useful for big data syncs such as a product catalog. You can have a job that reads the entire product catalog from another system and enqueues a message per product. If nothing has changed on the product since the last time it was enqueued the message won't get processed again.
The comparison is done at a JSON string level so a message will only be discarded if the JSON is exactly the same, which means that the order of properties in the serialized message matters. This is only something you need to consider if your message contains dynamic collections such as dictionaries where the order can change.
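One way to keep dictionary content from changing the JSON text is to serialize it in a fixed key order. The sketch below uses System.Text.Json with a SortedDictionary. The helper name SerializeSorted is made up for this example, and it assumes that sorting the keys is acceptable for your message and that your queue messages are serialized in a comparable way.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class StableJsonSketch
{
    // Hypothetical helper, not part of Nexus: copies the entries into a
    // SortedDictionary so the keys are always written in ordinal order,
    // regardless of the order in which they were added.
    public static string SerializeSorted(IDictionary<string, string> attributes)
    {
        var sorted = new SortedDictionary<string, string>(attributes, StringComparer.Ordinal);
        return JsonSerializer.Serialize(sorted);
    }
}
```

Two dictionaries holding the same entries, added in a different order, then produce the same JSON string. Declaring the property on the message class as a SortedDictionary from the start would have a similar effect.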
Idempotency and processed messages
When you set IdempotentMessages to true the KeepProcessedMessagesFor property is automatically set to never delete processed messages, unless you explicitly specify a timespan to keep processed messages for in the queue.
Partial messages
Sometimes you might have multiple data sources that are each responsible for their own section of a message. In an e-commerce context, for example, the ERP might be responsible for the price and the PIM for the product data, but you want to process the data from both systems in a single message.
In such a case you can use partial messages in Nexus, which are message classes where the top level properties are set by different enqueuers. Take this message for example:
public class ProductUpdatedMessage : IQueueMessageWithId
{
    public string? Id { get; set; }
    public ProductInformation? ProductInformation { get; set; }
    public ProductPrice? Price { get; set; }
}
When listening to events from the PIM you do this:
public class ProductInformationController(IEnqueuer<ProductUpdatedMessage> enqueuer) : Controller
{
    [HttpPost("{productNumber}")]
    public async Task<IActionResult> ProductUpdatedAsync(string productNumber, [FromBody] ProductInformation productInformation)
    {
        // Only the ProductInformation section is set; PartialMessage = true asks Nexus to merge it.
        await enqueuer.EnqueueAsync(new ProductUpdatedMessage { Id = productNumber, ProductInformation = productInformation }, new EnqueueContext { PartialMessage = true });
        return Ok();
    }
}
And when listening for ERP price updates you do:
public class PriceController(IEnqueuer<ProductUpdatedMessage> enqueuer) : Controller
{
    [HttpPost("{productNumber}/price")]
    public async Task<IActionResult> PriceUpdatedAsync(string productNumber, [FromBody] ProductPrice price)
    {
        // Only the Price section is set; PartialMessage = true asks Nexus to merge it.
        await enqueuer.EnqueueAsync(new ProductUpdatedMessage { Id = productNumber, Price = price }, new EnqueueContext { PartialMessage = true });
        return Ok();
    }
}
In the background, Nexus merges the incoming message with an existing message with the same id. That means that when the PIM data comes in it won't remove the price from the message, and vice versa.
If there's no existing message with the same id a new message will be enqueued with only that data. If the processing job needs both in order to correctly process the product it can just return a custom status or just return that it processed the message. The message will get the Pending status again as soon as any of the systems involved sends its data.
Only top level properties are merged
When using partial messages Nexus won't do a deep, recursive merge. This means that if you have a message that looks like this: {"Key1": {"DeepKey1": "deepvalue1"}, "Key2": "value2"} and enqueue a partial message that looks like this: {"Key1": {"DeepKey2": "deepvalue2"}, "Key3": "value3"} the end result will be: {"Key1": {"DeepKey2": "deepvalue2"}, "Key2": "value2", "Key3": "value3"}. That is, Key1 is replaced and Key1.DeepKey1 is removed.
Note however that top level dictionaries are supported, so if Key1 on your message is a Dictionary<string, string> it would instead be merged into: {"Key1": {"DeepKey1": "deepvalue1", "DeepKey2": "deepvalue2"}, "Key2": "value2", "Key3": "value3"}. Top level dictionaries can have any value type and not just strings as in this example.
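To make the shallow merge concrete, here is a small stand-alone sketch of the same semantics using System.Text.Json.Nodes. It is not Nexus's implementation: MergeTopLevel is a hypothetical helper, it only covers the default case where a top level value is replaced wholesale (not the dictionary case), and skipping null properties is an assumption made so that an unset section does not erase a stored one.

```csharp
using System.Text.Json.Nodes;

public static class PartialMessageMergeSketch
{
    // Hypothetical helper, not part of Nexus: returns a copy of `existing`
    // where every non-null top level property of `incoming` replaces the
    // stored value as a whole. Nested objects are never merged.
    public static JsonObject MergeTopLevel(JsonObject existing, JsonObject incoming)
    {
        // Re-parse so the result owns its nodes (a JsonNode can only have one parent).
        var merged = JsonNode.Parse(existing.ToJsonString())!.AsObject();

        foreach (var property in incoming)
        {
            // Assumption: a null section means "not provided" and is left alone.
            if (property.Value is null)
            {
                continue;
            }

            merged[property.Key] = JsonNode.Parse(property.Value.ToJsonString());
        }

        return merged;
    }
}
```

Merging the two example documents above with this helper keeps Key2, adds Key3, and replaces Key1 so that DeepKey1 is gone.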
Scheduling messages for future processing
There's an additional status called Sleeping for messages that should be processed in the future. When you enqueue a message you can specify a DateTime or a TimeSpan (relative to right now) for when the message should be processed.
This is useful for scenarios where you know that something will happen at a certain time in the future and you want to run some processing at that time. A typical e-commerce scenario is that a product can have prices with a start and stop date. You want to export your prices to another system but that system might not support date based validity for prices so you need to export the current price at any given time.
To deal with this you can enqueue messages that should get processed at the time that one price stops being active and another starts. And your queue processing job can send the new price to the external system exactly when it starts being valid.
Processing the same message again in the future
Sometimes when you're using idempotent messages to update external systems you might have dates in your message that affect the outcome of the processing, such as a product message containing a list of prices that each have a valid from and to date. At the time of processing you pick the price that is valid right now, but you want to run the processing again when the next price becomes valid.
In your queue processing job you can return an optional DateTime for when to process the message again. See more details in the section about queue jobs.
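The date to return can be derived from the prices on the message itself. The sketch below shows one way to pick the currently valid price and the next moment at which the valid price can change. The PriceWindow record and both helper methods are hypothetical, it assumes ValidTo is exclusive, and it leaves out how the date is handed back to Nexus, which is covered in the section about queue jobs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical shape of one price entry on a product message.
public record PriceWindow(decimal Amount, DateTime ValidFrom, DateTime ValidTo);

public static class PriceScheduleSketch
{
    // The first price whose window contains `now`, or null if none is active.
    public static PriceWindow? CurrentPrice(IEnumerable<PriceWindow> prices, DateTime now) =>
        prices.FirstOrDefault(p => p.ValidFrom <= now && now < p.ValidTo);

    // The earliest start or end date after `now`, or null if nothing changes anymore.
    public static DateTime? NextChange(IEnumerable<PriceWindow> prices, DateTime now)
    {
        var boundaries = prices
            .SelectMany(p => new[] { p.ValidFrom, p.ValidTo })
            .Where(t => t > now)
            .ToList();

        return boundaries.Count == 0 ? (DateTime?)null : boundaries.Min();
    }
}
```

A processing job could export CurrentPrice and use NextChange as the date for processing the message again.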
Example queue messages
The admin UI needs to create an example message to prefill the text box where you can manually enqueue a new message. By default Nexus will try to create an instance of your queue message class with default values in properties. If you want to customize that message, you can add a static method on your message class called GetExampleMessage like this:
public class MessageWithExampleMethod : IQueueMessage
{
    public string? SomeProperty { get; set; }

    public static IQueueMessage? GetExampleMessage()
    {
        return new MessageWithExampleMethod
        {
            SomeProperty = "From example"
        };
    }
}
If you want even more control over this you can register your own implementation of the interface IExampleQueueMessageFactory, which is the service that will be called to create these example messages.
Leasing messages
The way that Nexus ensures that multiple queue processors won't process the same message is by requiring them to first lease messages before processing them. And Nexus ensures that if two processes try to lease the same message only one of them will win.
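The "only one of them will win" guarantee can be pictured as an atomic compare-and-swap on the message status. The in-memory sketch below is only an analogy: Nexus does this against the database, how it does so is not described on this page, and the SketchStatus and SketchQueueItem types are made up for the example.

```csharp
using System;
using System.Threading;

public enum SketchStatus { Pending = 0, Leased = 1 }

// Hypothetical in-memory stand-in for a queue item; not a Nexus type.
public class SketchQueueItem
{
    private int _status = (int)SketchStatus.Pending;

    public SketchStatus Status => (SketchStatus)Volatile.Read(ref _status);

    public DateTime? LeasedUntil { get; private set; }

    // Atomically flips Pending to Leased. Only the caller that performs
    // the flip gets true; every other concurrent caller gets false.
    public bool TryLease(TimeSpan leaseTime, DateTime now)
    {
        var previous = Interlocked.CompareExchange(
            ref _status, (int)SketchStatus.Leased, (int)SketchStatus.Pending);

        if (previous != (int)SketchStatus.Pending)
        {
            return false;
        }

        // Only the winner reaches this point, so the write is not contended.
        LeasedUntil = now + leaseTime;
        return true;
    }
}
```

If several threads call TryLease on the same item at once, exactly one call returns true and the others return false.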
The default lease time is 30 minutes. If you have a batch job with a very large batch size and the job takes a long time to complete you might want to increase the lease time for that job. You do that by implementing the MessageLeaseTime property from IScheduledQueueJobBase, like this:
public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    // Messages leased by this job stay leased for 60 minutes instead of the default 30.
    public TimeSpan MessageLeaseTime => TimeSpan.FromMinutes(60);
}
Abandoned messages
If a server crashes in the middle of processing messages the lease will eventually expire. Note that this only happens in extraordinary cases. Exceptions that are thrown by the processing job are caught by Nexus, which correctly transitions the message status from Leased to Error. But there can be a power outage or the database server might become unavailable, which are things that Nexus can't automatically handle. In such cases the lease will eventually expire and at that point Nexus will transition the message status from Leased to Abandoned.
If you see that any of your queues contains abandoned messages you need to investigate what happened before resetting the status to run them again. It could be that the messages were fully processed but the server or database crashed right before Nexus was able to update the status to Processed.
The best way to start investigating is by looking at the correlation id set in the messages. That is the correlation id of the job that leased the message, which means that you can use the Admin UI or your log UI of choice to try to find out how far the job with that correlation id got. If the correlation id is "123" your logs related to that job run would have a log property called Nexus.CorrelationId with the value 123.
Health status
All queues automatically get a health check and the default behavior is to mark the queue as unhealthy when there's a message with the Error status in it. However, that's not always what you want, because you can have queues for systems that you know have intermittent failures and you don't want the health check monitoring to keep bugging you about it.
In that case you can either disable the health check for the queue or change which health status the queue has when it contains messages with the Error status, like this:
[Queue("myqueue", HealthStatusWhenHasErrors = HealthStatus.Healthy)]
public class MyQueueMessage : IQueueMessage
{
    ...
}
Organizing the Admin UI
If you have a lot of queues you can group queues in the Admin UI by a category, just like you can with jobs. Set a category in the Queue attribute like this:
[Queue("myqueue", Category = "My category")]
public class MyQueueMessage : IQueueMessage
{
    ...
}