Queue processing jobs
There's two interfaces for queue processing jobs included in the SDK. One implementation that process messages one by one and one that process messages in batch.
Accessing the previous message
For some types of jobs and queues you want to get both the current and the previous version of a message. Such as a message for product information updates where you want to compare the previous and the current message to know what changed. You access it using the IPreviousMessageProvider
service from inside your job. Read more here.
Processing messages one by one
This is the easiest and safest way to process a message since processing one by one never risks missing a message, it will either complete or fail.
You implement such a job by creating a class that implements IScheduledQueueJob<TMessage>
. The job executor takes care of leasing messages and all you have to do is to implement the ProcessMessageAsync()
method like this:
public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
public string DefaultSchedule => CronSchedule.TimesPerMinute(10);
public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
{
await DoSomethingInteresting(message);
return ProcessResults.Completed();
}
}
The job will run until there are no more Pending
messages in the queue and will then return a short summary of how many messages that got processed and how many of them that failed as the job results.
Note that even though the ProcessResults
class has a Failed()
method you don't have to catch exceptions yourself. The executor will catch any exceptions that happen in ProcessMessageAsync()
and mark that message with the Error
status together with the exception details in its processing log.
Stopping processing on error
If you have a queue and a job where the processing of a message is dependent on that previous messages have been successfully processed you can tell Nexus to stop processing when an error occur. Nexus will by default process messages in the order they are created, but will continue processing the next message if a previous message fails.
To stop processing when an error happen you implement IScheduledQueueJobBase.StopProcessingOnError
in your job class and set it to true
.
This means that as soon as any message has either Error
or AwaitingRetry
status in the queue the processing will be paused and won't continue until those messages gets another status.
Use this with caution as you risk pausing processing indefinitely unless you monitor the queue for errors.
public class ExampleQueueJob(ILogger<ExampleQueueJob> logger) : IScheduledQueueJob<ExampleQueueMessage>
{
public bool StopProcessingOnError => true;
public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
{
// Important to wrap the entire job in a try-catch with a retry. Otherwise you risk exceptions
// stopping the queue processing until someone notices it.
try
{
await DoSomethingInteresting(message);
return ProcessResults.Completed();
}
catch (Exception e)
{
logger.LogError(e, "An error occured");
return ProcessResults.RetryLater(TimeSpan.FromMinutes(10));
}
}
}
Processing messages in batches
The other interface is IScheduledBatchQueueJob<TMessage>
where you're passed a batch of messages instead of getting them one by one.
When you're creating a queue processing job that will read data from an external system it's often more efficient to batch load such data rather than requesting it one by one. Such as having a queue for when product data is updated and your job needs to read the full product object.
Processing messages in a batch is a bit harder to do correctly than processing one by one, but the executor will take care of most of the complexity for you.
Here's an example implementation:
public class ExampleQueueJob : IScheduledBatchQueueJob<ExampleQueueMessage>
{
public string DefaultSchedule => CronSchedule.TimesPerMinute(1);
public async Task ProcessMessagesAsync(QueueMessageBatch<ExampleQueueMessage> batch, CancellationToken cancellationToken)
{
foreach (var message in batch.Messages)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
await DoSomethingInteresting(message);
batch.Processed(message);
}
catch (Exception e)
{
batch.Failed(message, e);
}
}
}
}
You are responsible for letting the system know which messages you've completed and which failed. If you didn't have the try-catch
and there was an exception when processing the fifth message in a batch of ten the system will mark all messages as failed that hasn't explicitly been passed to batch.Processed(message)
.
Queue job schedule
How often you schedule your job depends on the type of queue you have and if you process it in batches or not. For some queues it's more efficient to run it less frequently and in larger batches but for other jobs you might want to run the job very frequently. There's also the option of requesting processing to start directly when enqueueing to avoid any type of delay.
Returning a custom status
Sometimes it's not specific enough to say that a queue message was either processed or failed. You might have different actions you need to take for different outcomes that you want to specifically monitor. Nexus allows you to process a message and set it directly in a custom status rather than one of the built in statuses.
In a batched queue job you call the CustomStatus
method on the batch:
var processingLog = "... something descriptive of what happened ...";
batch.CustomStatus(message, "MyCustomStatus", processingLog, isError: true);
And in a regular queue job you return ProcessResults.CustomStatus()
:
var processingLog = "... something descriptive of what happened ...";
return ProcessResults.CustomStatus("MyCustomStatus", processingLog, isError: true);
The isError
argument lets you determine if the job should be marked as failed or not because of this custom status.
Note that it's up to you to monitor queues for these custom statuses and take action. If you want to be notified about it you can implement a custom healthcheck that checks if any queue has such a status.
Batch size
By default the batch size of a job is 100. You can control this by implementing the BatchSize
property from IScheduledBatchQueueJob
. Like this:
public class MyBatchJob : IScheduledBatchQueueJob<SomeMessage>
{
public int BatchSize => 200;
}
Number of messages per job run
By default a Nexus queue job will run until the queue is empty. Sometimes that can be too aggressive. It might put too much pressure on other systems that the job is communicating with, and digging into the logs of that single run can be hard since it'll contain tons fo logs. Nexus lets you set a max number of messages that a single job run can process by implementing the property MaxNumberOfMessagesPerRun
in your queue job. Like this:
public class MyQueueJob : IScheduledQueueJob<SomeMessage> // Or IScheduledBatchQueueJob<SomeMessage>
{
public int MaxNumberOfMessagesPerRun => 1000;
}
Multithreading
The interfaces IScheduledBatchQueueJob<TMessage>
and IScheduledQueueJob<TMessage>
have a property called NumberOfProcessingThreads
which by default returns 1
. If you want your job to process messages in multiple threads in parallel you can implement the method in your job to increase parallelism like this:
public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
public int NumberOfProcessingThreads => 10;
// Other methods and properties excluded for brevity
}
Make sure not to store any instance state that isn't thread safe when you turn on multi threading. The same instance of your job will be used by all threads so if you need to store instance state make sure to use concurrent collections and/or locks.
Writing your own queue processing
You don't have to use the built in interfaces for processing queues in a IScheduledJob
. You can implement it yourself using IQueueReader<TMessage>.LeaseAsync()
, IQueueItemUpdater<TMessage>.ProcessedAsync()
, and IQueueItemUpdater<TMessage>.FailedAsync()
.
Should however let your job implement the marker interface IScheduledQueueJobBase<TMessage>
as that is what the UI and API uses to tell which job processes which queue.
Scheduling a processed message to be processed again
Sometimes when you're using idempotent messages to update external systems you might have dates in your message that affects the outcome of the processing. Such as a product message which contains a list of prices that have a valid from and to date. At the time of processing you pick the price that is valid right now, but you want to run the processing again when the next price becomes valid.
For a batch processing job you can call:
batch.Processed(message, processAgainAtUtc: DateTime.UtcNow.AddHours(1));
And for a job that process messages one by one you do:
return ProcessResults.Completed(processAgainAtUtc: DateTime.UtcNow.AddHours(1));
This will mark the queue message as Processed
together with the date for when it should be processed again. Note that it's not marked as Sleeping
like unprocessed, future messages. This to be able to differentiate between messages that have been processed from ones that are just scheduled for processing.
The message status is then changed to Pending
again at the date you set, and the queue processing job is called again with the same message.
If a change is enqueued for the same product/message before the date you set the status for the message becomes Pending
directly and it's up to the job to recalculate when/if the message should get processed again.