Queue processing jobs

The SDK includes two interfaces for queue processing jobs: one that processes messages one by one and one that processes messages in batches.

Accessing the previous message

For some types of jobs and queues you want both the current and the previous version of a message. One example is a product information update, where you compare the previous message with the current one to find out what changed. You access the previous version using the IPreviousMessageProvider service from inside your job. Read more here.

Processing messages one by one

This is the easiest and safest way to process a message, since processing one by one never risks missing a message: each message either completes or fails.

You implement such a job by creating a class that implements IScheduledQueueJob<TMessage>. The job executor takes care of leasing messages, so all you have to do is implement the ProcessMessageAsync() method like this:

public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    public string DefaultSchedule => CronSchedule.TimesPerMinute(10);

    public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
    {
        await DoSomethingInteresting(message);

        return ProcessResults.Completed();
    }
}

The job runs until there are no more Pending messages in the queue. It then returns, as the job result, a short summary of how many messages were processed and how many of them failed.

Note that even though the ProcessResults class has a Failed() method you don't have to catch exceptions yourself. The executor will catch any exceptions that happen in ProcessMessageAsync() and mark that message with the Error status together with the exception details in its processing log.

Stopping processing on error

If you have a queue and a job where processing a message depends on previous messages having been processed successfully, you can tell Nexus to stop processing when an error occurs. By default, Nexus processes messages in the order they were created, but it continues with the next message if a previous message fails.

To stop processing when an error happens, implement IScheduledQueueJobBase.StopProcessingOnError in your job class and set it to true.

This means that as soon as any message in the queue has either the Error or the AwaitingRetry status, processing is paused and won't continue until those messages get another status.

Use this with caution as you risk pausing processing indefinitely unless you monitor the queue for errors.

public class ExampleQueueJob(ILogger<ExampleQueueJob> logger) : IScheduledQueueJob<ExampleQueueMessage>
{
    public bool StopProcessingOnError => true;

    public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
    {
        // Important to wrap the entire job in a try-catch with a retry. Otherwise you risk exceptions
        // stopping the queue processing until someone notices it.
        try
        {
            await DoSomethingInteresting(message);
            return ProcessResults.Completed();
        }
        catch (Exception e)
        {
            logger.LogError(e, "An error occurred");
            return ProcessResults.RetryLater(TimeSpan.FromMinutes(10));
        }
    }
}

Processing messages in batches

The other interface is IScheduledBatchQueueJob<TMessage> where you're passed a batch of messages instead of getting them one by one.

When you're creating a queue processing job that reads data from an external system, it's often more efficient to load that data in bulk than to request it one item at a time. One example is a queue for product data updates where your job needs to read the full product object.

Processing messages in a batch is a bit harder to do correctly than processing one by one, but the executor will take care of most of the complexity for you.

Here's an example implementation:

public class ExampleQueueJob : IScheduledBatchQueueJob<ExampleQueueMessage>
{
    public string DefaultSchedule => CronSchedule.TimesPerMinute(1);

    public async Task ProcessMessagesAsync(QueueMessageBatch<ExampleQueueMessage> batch, CancellationToken cancellationToken)
    {
        foreach (var message in batch.Messages)
        {
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                await DoSomethingInteresting(message);
                batch.Processed(message);
            }
            catch (Exception e)
            {
                batch.Failed(message, e);
            }
        }
    }
}

You are responsible for letting the system know which messages you've completed and which have failed. Without the try-catch, an exception while processing the fifth message in a batch of ten would make the system mark as failed every message that hasn't explicitly been passed to batch.Processed(message).

Queue job schedule

How often you schedule your job depends on the type of queue you have and whether you process it in batches. For some queues it's more efficient to run the job less frequently and in larger batches, while for others you might want to run it very frequently. There's also the option of requesting that processing start directly when enqueueing, to avoid any delay.

Batch size

By default the batch size of a job is 100. You can control this by implementing the BatchSize property from IScheduledBatchQueueJob, like this:

public class MyBatchJob : IScheduledBatchQueueJob<SomeMessage>
{
    public int BatchSize => 200;

    // Other methods and properties excluded for brevity
}

Number of messages per job run

By default a Nexus queue job runs until the queue is empty. Sometimes that can be too aggressive. It might put too much pressure on other systems that the job communicates with, and digging into the logs of that single run can be hard since it will contain a very large number of log entries. Nexus lets you set a maximum number of messages that a single job run can process by implementing the MaxNumberOfMessagesPerRun property in your queue job, like this:

public class MyQueueJob : IScheduledQueueJob<SomeMessage> // Or IScheduledBatchQueueJob<SomeMessage>
{
    public int MaxNumberOfMessagesPerRun => 1000;

    // Other methods and properties excluded for brevity
}

Multithreading

The interfaces IScheduledBatchQueueJob<TMessage> and IScheduledQueueJob<TMessage> have a property called NumberOfProcessingThreads, which returns 1 by default. If you want your job to process messages on multiple threads in parallel, implement the property in your job and return a higher value, like this:

public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    public int NumberOfProcessingThreads => 10;

    // Other methods and properties excluded for brevity
}

Make sure not to store any instance state that isn't thread safe when you turn on multithreading. The same instance of your job is used by all threads, so if you need to store instance state, make sure to use concurrent collections and/or locks.
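As an illustration of the point above, here is a minimal sketch of instance state that stays correct when several threads share it. It does not use the Nexus SDK: ProcessingStats and its members are hypothetical names, and Parallel.For is only a stand-in for the executor's processing threads.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Simulate 10 threads sharing one object, the way all processing threads
// share one job instance. Parallel.For is a stand-in, not the Nexus executor.
var stats = new ProcessingStats();
Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
{
    stats.Record(i % 2 == 0 ? "product" : "price");
});

// Prints "Total: 1000, product: 500"
Console.WriteLine($"Total: {stats.Total}, product: {stats.CountFor("product")}");

// Hypothetical helper (not part of the SDK) holding state that a job
// could keep in an instance field.
public class ProcessingStats
{
    private readonly ConcurrentDictionary<string, int> _countsByType = new();
    private int _total;

    public int Total => Volatile.Read(ref _total);

    public void Record(string messageType)
    {
        // Interlocked makes the increment atomic; a plain _total++ can lose updates.
        Interlocked.Increment(ref _total);

        // AddOrUpdate can be called from several threads at once without losing counts.
        _countsByType.AddOrUpdate(messageType, 1, (_, current) => current + 1);
    }

    public int CountFor(string messageType) =>
        _countsByType.TryGetValue(messageType, out var count) ? count : 0;
}
```

Replacing the ConcurrentDictionary with a plain Dictionary, or the Interlocked call with `_total++`, would compile but could lose updates or throw under parallel load.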

Writing your own queue processing

You don't have to use the built-in interfaces for processing queues in an IScheduledJob. You can implement it yourself using IQueueReader<TMessage>.LeaseAsync(), IQueueItemUpdater<TMessage>.ProcessedAsync(), and IQueueItemUpdater<TMessage>.FailedAsync().

You should, however, let your job implement the marker interface IScheduledQueueJobBase<TMessage>, as that is what the UI and API use to tell which job processes which queue.

Scheduling a processed message to be processed again

Sometimes when you're using idempotent messages to update external systems, your message contains dates that affect the outcome of the processing. One example is a product message that contains a list of prices, each with a valid-from and a valid-to date. At the time of processing you pick the price that is valid right now, but you want to run the processing again when the next price becomes valid.

For a batch processing job you can call:

batch.Processed(message, processAgainAtUtc: DateTime.UtcNow.AddHours(1));

And for a job that processes messages one by one you do:

return ProcessResults.Completed(processAgainAtUtc: DateTime.UtcNow.AddHours(1));

This marks the queue message as Processed and stores the date when it should be processed again. Note that it's not marked as Sleeping, as unprocessed future messages are. This makes it possible to tell messages that have already been processed apart from ones that are only scheduled for processing.

The message status is then changed to Pending again at the date you set, and the queue processing job is called again with the same message.

If a change is enqueued for the same product/message before the date you set, the status of the message becomes Pending directly, and it's up to the job to recalculate when, or if, the message should get processed again.
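The recalculation for the price example could look something like the sketch below. It is not SDK code: the Price record, the PriceSchedule.NextPriceChangeUtc helper, and the shape of the price list are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Small local helper to build UTC dates for the sample data.
static DateTime Utc(int year, int month, int day) =>
    new DateTime(year, month, day, 0, 0, 0, DateTimeKind.Utc);

var now = Utc(2024, 1, 15);
var prices = new List<Price>
{
    new(100m, Utc(2024, 1, 1), Utc(2024, 2, 1)),
    new(90m, Utc(2024, 2, 1), Utc(2024, 3, 1)),
};

DateTime? next = PriceSchedule.NextPriceChangeUtc(prices, now);
Console.WriteLine(next.HasValue ? $"Process again at {next.Value:u}" : "No further price change");

// Hypothetical message content: a price with a validity window.
public record Price(decimal Amount, DateTime ValidFromUtc, DateTime ValidToUtc);

public static class PriceSchedule
{
    // Returns the earliest instant after nowUtc at which any price starts or
    // stops being valid, or null when nothing changes after nowUtc.
    public static DateTime? NextPriceChangeUtc(IEnumerable<Price> prices, DateTime nowUtc)
    {
        var futureChanges = prices
            .SelectMany(p => new[] { p.ValidFromUtc, p.ValidToUtc })
            .Where(date => date > nowUtc)
            .OrderBy(date => date)
            .ToList();

        return futureChanges.Count > 0 ? futureChanges[0] : (DateTime?)null;
    }
}
```

A job could pass a non-null result as processAgainAtUtc to ProcessResults.Completed() or batch.Processed(), and complete the message without a date when the result is null.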