Queues
Unlike popular message buses and event queues this system uses an SQL database to store messages. Message bus systems are typically built to be able to handle tons of messages and have the assumption that you're not really interested in looking at a single message and don't offer great support for finding and manipulating messages.
This system on the other hand assumes that you won't have millions of messages per second and that you are interested in looking at individual messages. And for that an SQL database is a perfect fit. At the time of writing we support Postgres, SQL Server and SQLite. Feel free to reach out to us if you need support for another database engine like MySQL or MariaDB.
Each registered queue is its own database table which is automatically created by then system when you've registered your queue. And a message is a row in that database table which contains the message in JSON format together with meta data such as the status and when it was created.
Defining and registering a queue
The first thing to do is to create a C# class representing the message type of the queue. It's just a .NET class implementing the interface IQueueMessage
. Here's an example:
[Queue("send_order_confirmation")]
public class SendOrderConfirmationQueueMessage : IQueueMessage
{
public string OrderId { get; set; }
public string Email { get; set; }
}
You can add any properties you want on the class, the only important thing to remember is that the message will get serialized to and from JSON.
There's also the IQueueMessageWithId
interface which you can read about in the section about message identity.
[Queue] attribute
The queue attribute isn't strictly required but it's recommended as it sets meta data for the queue such as the queue name. That name is used for naming the database table and to access the queue in the API and admin UI. If the queue attribute isn't set the name is taken from the class name. If you don't use the [Queue]
attribute it's easy to accidentally rename the class which will create a completely new and empty queue. The [Queue]
attribute also lets you set other types of meta data such as which database engine to use and for how long we should keep processed messages.
Registering the queue
Just like scheduled jobs the queues are automatically registered by finding all types that implements IQueueMessage
(IQueueMessageWithId
inherits from IQueueMessage
). And just like for jobs this is only done automatically for messages defined in the entry assembly. If you have multiple .NET projects that contains queue message classes you need to register the assemblies with the service.
There's a property on the options object passed into AddNexusQueues()
called AssembliesToScanForQueueMessages
where you can add additional assemblies to scan for queue messages like this:
builder.Services.AddNexusQueues(options =>
{
options.AssembliesToScanForQueueMessages.Add(typeof(MyQueueMessageInAnotherProject).Assembly);
});
Adding a message to a queue
There's two ways to add queue messages. The first is through the C# SDK using the IEnqueuer<TMessage>
service. That service is injectable through the IServiceProvider
so you can take it as a constructor dependency. The generic argument TMessage
is the message type that you want to add a message for. An example:
public class OrderController
{
private readonly IEnqueuer<SendOrderConfirmationQueueMessage> _enqueuer;
public OrderController(IEnqueuer<SendOrderConfirmationQueueMessage> enqueuer)
{
_enqueuer = enqueuer;
}
[HttpPost]
public async Task<IActionResult> OrderPlacedAsync(string orderId, string email)
{
await _enqueuer.EnqueueAsync(new SendOrderConfirmationQueueMessage
{
OrderId = orderId,
Email = email,
});
return NoContent();
}
}
The message is now placed in the queue with the status Pending
which means that it'll get processed by the queue processing job as soon as it starts. There might be a delay between the time the message is enqueued and when the queue processing job is scheduled to run. If no delay is acceptable you can use the EnqueueAndStartProcessingAsync()
method rather than the EnqueueAsync()
method on IEnqueuer<TMessage>
. That will request the queue processing job to start as quickly as possible regardless of its schedule unless the job has been manually disabled.
Adding a message through the API
The other way of adding a message to a queue is by using the API. The API is just an way to access the same IEnqueuer<TMessage>
over HTTP at the endpoint POST /queues/{queueName}
.
Adding multiple messages at once
Both the C# SDK and the API allows you to enqueue multiple messages at the same time and will use batch operations (except SQLite which doesn't have batch imports) to make enqueuing a lot messages as fast as possible.
Message status
A message has a status and the default status is Pending
. It means that the message is up-for-grabs for a queue processing job.
When a queue processing job starts it will see if there's any Pending
messages and then try to lease them. This is done by using the IQueueReader<TMessage>.LeaseAsync()
method which will update the status of any Pending
messages to Leased
together with setting the date for how long they've been leased for. Leasing a message ensures that multiple queue processors can never lease the same message at the same time. Once the lease has expired the system will put it back in the Pending
status.
When a queue processing job is done with the message the status will either become Error
or Processed
. And if the queue is not configured to keep processed messages the message will be deleted rather than set to Processed
.
The system also allows you to set custom statuses on messages through the UI and the API which allows you to categorize messages for troubleshooting as well as prevent them from being processed by the system. Messages with custom statuses are invisible to the processing system and are only visible in the UI and API.
Updating status by property in message
Sometimes you might want to update the status of messages where a certain property inside messages contains a specific value. An example would be that you have a queue of inventory messages and you don't want to process it until the product data has come in. So your queue processing job for the inventory messages keeps rescheduling such messages. When the product data then comes in you can update the inventory message(s) like this:
public async Task OnProductCreated(string sku)
{
await _inventoryQueueItemUpdater.UpdateStatusWhereAsync(QueueItemStatus.Pending, m => m.Product.Sku == sku);
}
The lambda expression m => m.Product.Sku == sku
is compiled into a SQL statement to used in a WHERE clause. In the case of Postgres it becomes WHERE message->'Product'->>'Sku'::text = '123'
.
Note that such a query can have less than optimal performance as it needs to process all rows in the table to find the matching rows. If you don't use idempotent messages and don't store processed messages that's not going to be a problem because the queue table will be quite small.
If you're using Postgres you can create a jsonb
index on the message
column to improve performance but you'll need to add that index yourself as Nexus won't be aware of it.
The lambda passed in must be a comparison between a property on the message and a primitive, string or null
value. You can use ==
, !=
, <
, >
, <=
, or >=
as comparison operator. If you pass in an invalid lambda you'll get a runtime error.
Default enqueue status
As mentioned above the default enqueue status is Pending
which makes the message immediately available for processing jobs.
In some cases you might want to change this to a different status so that new messages that comes in are kept in a different status and becomes invisible to processing jobs. You can do this in the Admin UI for the queue or through the API and you can use any text value you want such as Paused
or OnHold
.
This can be useful if a queue processing job is experiencing problems and you only want to run it on selected messages. Then you can change the default enqueue status and manually change only the messages you want to process to Pending
.
After the issues have been resolved you can filter out all the messages with status Paused
or OnHold
and update to Pending
again.
Previous message version
For messages that has identity the previous version of a message is stored when enqueueing a new version. This is useful for doing change based processing since it allows you to compare the current message with the previous message and add additional processing logic when certain properties of the message changes.
In order to access the previous message from inside a queue processing job you use the IPreviousMessageProvider
service. Note that it's only usable inside queue processing jobs and will always return null
if you try to use it outside of a job. If you need access to previous messages outside of a job you can read QueueItem
s using IQueueReader
. QueueItem
contains everything about the queue item including the current and previous message.
Previous version
The previous version is only updated when calling IQueueItemUpdater.ProcessedAsync()
which is done for you when using the built in queue processing jobs. It's then updated to be the message passed to that method. This ensures that the previous version of a message will only ever be a version that was successfully processed. If the message processing fails the previous version stays untouched to let you retry the message.
Idempotent messages
If messages are fully self-contained, meaning that a processing job will only use data on the message itself to perform it's processing and not look at any external data your messages are idempotent. Processing an idempotent message will always result in the same effect.
For the queues you have which has this characteristic you can tell the [Queue]
attribute that it's idempotent like this:
[Queue("product_updates", IdempotentMessages = true)]
public class ProductUpdatedQueueMessage : IQueueMessageWithId
{
...
}
When IdempotentMessages
is set to true
it means that if a message is enqueued Nexus will check if we already have that message with the same message id and JSON and then discard the message.
This is useful for big data syncs such as a product catalog. You can have a job that reads the entire product catalog from another system and enqueues a message per product. If nothing has changed on the product since the last time it was enqueued the message won't get processed again.
The comparison is done at a JSON string level so a message will only be discarded if the JSON is exactly the same. Which means that the order of properties in the serialized message is important. This is only something you need to consider if your message contains dynamic collections such as dictionaries where the order can change.
Idempotency and processed messages
When you set IdempotentMessages
to true
the KeepProcessedMessagesFor
property is automatically set to never delete processed messages unless you explicitly specify a timespan to keep processed messages for in the queue.
Scheduling messages for future processing
There's an additional status called Sleeping
that messages that should be processed in the future will have. When you enqueue a message you can specify a DateTime
or a TimeSpan
(relative to right now) for when a message should be processed.
This is useful for scenarios where you know that something will happen at a certain time in the future and you want to run some processing at that time. A typical e-commerce scenario is that a product can have prices with a start and stop date. You want to export your prices to another system but that system might not support date based validity for prices so you need to export the current price at any given time.
To deal with this you can enqueue messages that should get processed at the time that one price stops being active and another starts. And your queue processing job can send the new price to the external system exactly when it starts being valid.
Processing the same message again in the future
Sometimes when you're using idempotent messages to update external systems you might have dates in your message that affects the outcome of the processing. Such as a product message which contains a list of prices that have a valid from and to date. At the time of processing you pick the price that is valid right now, but you want to run the processing again when the next price becomes valid.
In your queue processing job you can return an optional DateTime
for when to process the message again. See more details in the section about queue jobs.