Queue processing jobs

The SDK includes two interfaces for queue processing jobs: one that processes messages one by one and one that processes messages in batches.

Accessing the previous message

For some types of jobs and queues you want both the current and the previous version of a message. An example is a product information update, where you compare the previous and the current message to see what changed. You access the previous version using the IPreviousMessageProvider service from inside your job. Read more here.

Processing messages one by one

This is the easiest and safest way to process a message, since processing one by one never risks missing a message: each message either completes or fails.

You implement such a job by creating a class that implements IScheduledQueueJob<TMessage>. The job executor takes care of leasing messages and all you have to do is to implement the ProcessMessageAsync() method like this:

public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    public string DefaultSchedule => CronSchedule.TimesPerMinute(10);

    public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
    {
        await DoSomethingInteresting(message);

        return ProcessResults.Completed();
    }
}

The job will run until there are no more Pending messages in the queue. It then returns a short summary of how many messages were processed and how many of them failed as the job result.

Note that even though the ProcessResults class has a Failed() method you don't have to catch exceptions yourself. The executor will catch any exceptions that happen in ProcessMessageAsync() and mark that message with the Error status together with the exception details in its processing log.

Updating message during processing

Sometimes you want to store metadata about the processing of a message and use that metadata the next time you process it. For example, when syncing entities to another system you can store the id the entity got in that system, so that the next time you process the message you can use that id when communicating with the external system. You might also want to store error details that the next run can use when retrying a message.

This can be achieved by returning an updated message as part of the returned ProcessResults. For example:

public class ExampleQueueJob(ILogger<ExampleQueueJob> logger) : IScheduledQueueJob<ExampleQueueMessage>
{
    public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
    {
        var id = await SendToExternalSystemAsync(message);
        message.IdInExternalSystem = id;
        return ProcessResults.Processed(updatedMessage: message);
    }
}

Note that if a new version of the message is enqueued, the default behavior is to replace the whole message, which means that the id you stored in your updated message is lost. You can work around that by using partial messages, since a partial message only replaces the properties it contains instead of replacing the whole message.

Pausing on error

If the processing of a message depends on previous messages having been processed successfully, you can tell Nexus to pause processing when an error occurs. By default Nexus processes messages in the order they were created, but continues with the next message if a previous message fails.

To pause processing when an error happens, set PauseOnError to true in the [ScheduledJob] attribute on your job.

This means that as soon as any message in the queue has either the Error or the AwaitingRetry status, processing is paused and won't continue until those messages get another status.

Use this with caution as you risk pausing processing indefinitely unless you monitor the queue for errors.

[ScheduledJob(PauseOnError = true)]
public class ExampleQueueJob(ILogger<ExampleQueueJob> logger) : IScheduledQueueJob<ExampleQueueMessage>
{
    public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken)
    {
        // Important to wrap the entire job in a try-catch with a retry. Otherwise you risk exceptions
        // stopping the queue processing until someone notices it.
        try
        {
            await DoSomethingInteresting(message);
            return ProcessResults.Processed();
        }
        catch (Exception e)
        {
            logger.LogError(e, "An error occurred");
            return ProcessResults.RetryLater(TimeSpan.FromMinutes(10));
        }
    }
}

Note that this can be overridden in the Admin UI/API under the More button in the job details page.

Processing messages in batches

The other interface is IScheduledBatchQueueJob<TMessage> where you're passed a batch of messages instead of getting them one by one.

When you're creating a queue processing job that reads data from an external system, it's often more efficient to load that data in batches rather than requesting it one item at a time. An example is a queue for product data updates where your job needs to read the full product object for each message.

Processing messages in a batch is a bit harder to do correctly than processing one by one, but the executor will take care of most of the complexity for you.

Here's an example implementation:

public class ExampleQueueJob : IScheduledBatchQueueJob<ExampleQueueMessage>
{
    public string DefaultSchedule => CronSchedule.TimesPerMinute(1);

    public async Task ProcessMessagesAsync(QueueMessageBatch<ExampleQueueMessage> batch, CancellationToken cancellationToken)
    {
        foreach (var message in batch.Messages)
        {
            cancellationToken.ThrowIfCancellationRequested();

            try
            {
                await DoSomethingInteresting(message);
                batch.Processed(message);
            }
            catch (Exception e)
            {
                batch.Failed(message, e);
            }
        }
    }
}

You are responsible for letting the system know which messages you've completed and which failed. Without the try-catch, an exception while processing the fifth message in a batch of ten would make the system mark every message that hasn't explicitly been passed to batch.Processed(message) as failed, in this case messages five through ten.
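The batch loading idea mentioned earlier in this section can be sketched as a small helper that is independent of the SDK. Everything in it is an assumption made for illustration: ProductMessage, Product, BatchLoading and the loadProductsAsync delegate are hypothetical names, and the delegate stands in for whatever bulk endpoint your external system offers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical types for illustration only; they are not part of the SDK.
public record ProductMessage(string ProductId);
public record Product(string Id, string Name);

public static class BatchLoading
{
    // Loads all products for the batch in one call and pairs each message
    // with its product, or with null when the external system has no match.
    public static async Task<IReadOnlyList<(ProductMessage Message, Product? Product)>> LoadAsync(
        IReadOnlyCollection<ProductMessage> messages,
        Func<IReadOnlyCollection<string>, CancellationToken, Task<IReadOnlyCollection<Product>>> loadProductsAsync,
        CancellationToken cancellationToken)
    {
        // Several messages may point at the same product, so request each id once.
        var ids = messages.Select(m => m.ProductId).Distinct().ToList();
        var products = await loadProductsAsync(ids, cancellationToken);

        // Assumes the loader returns at most one product per id.
        var byId = products.ToDictionary(p => p.Id);

        return messages
            .Select(m => (m, byId.GetValueOrDefault(m.ProductId)))
            .ToList();
    }
}
```

Inside ProcessMessagesAsync you could pass the messages of the batch to a helper like this, then call batch.Processed(message) for each pair that has a product and batch.Failed(message, exception) for each pair that doesn't. How batch.Messages maps to the collection type used here depends on the SDK and is not shown.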

Queue job schedule

How often you schedule your job depends on the type of queue you have and whether you process it in batches. For some queues it's more efficient to run the job less frequently and in larger batches, while for others you might want to run it very frequently. There's also the option of requesting processing to start directly when enqueueing, which avoids waiting for the next scheduled run.

Filtering messages

In some cases you might want multiple queue jobs to process messages in a single queue. For example, your queue might contain messages for all languages while you want to be able to process the messages for each language individually.

This functionality builds on filterable message properties so make sure to read that section.

To implement filtering on your job you implement one of these interfaces:

IScheduledFilteredQueueJob<TMessage>
IScheduledFilteredQueueJob<TMessage, TParameters>
IScheduledFilteredBatchQueueJob<TMessage>
IScheduledFilteredBatchQueueJob<TMessage, TParameters>

Read more about jobs with parameters here if you're wondering what TParameters means.

These interfaces extend the base queue job interfaces with a way for you to return a lambda that filters which messages you want to process. For example:

class MyMessage : IQueueMessage
{
    [Filterable]
    public required string Language { get; set; }
    public required SomeDataType Data { get; set; }
}

class EnglishQueueJob : IScheduledFilteredQueueJob<MyMessage>
{
    public Expression<Func<MyMessage, bool>> MessageFilter => m => m.Language == "en";

    public async Task<ProcessResults> ProcessMessageAsync(MyMessage message, CancellationToken cancellationToken)
    {
        // Only English messages will be passed to this job

        return ProcessResults.Completed();
    }
}

The interface with parameters lets you create a dynamic lambda based on the passed in parameters which can be useful if you want to process all languages by default, but want to be able to limit to one language in a parameter:

class MyParameters
{
    public required string? Language { get; set; }
}

class MyQueueJob : IScheduledFilteredQueueJob<MyMessage, MyParameters>
{
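    // Language is a required member, so it must be set explicitly; null means all languages.
    public MyParameters DefaultParameters => new() { Language = null };

    public Expression<Func<MyMessage, bool>> GetMessageFilter(MyParameters parameters) =>
        parameters.Language != null ? m => m.Language == parameters.Language : null;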

    public async Task<ProcessResults> ProcessMessageAsync(MyMessage message, CancellationToken cancellationToken)
    {
        return ProcessResults.Completed();
    }
}

Returning a custom status

Sometimes it's not specific enough to say that a queue message was either processed or failed. You might need to take different actions for different outcomes and want to monitor those outcomes specifically. Nexus allows you to process a message and set it directly to a custom status rather than one of the built-in statuses.

In a batched queue job you call the CustomStatus method on the batch:

var processingLog = "... something descriptive of what happened ...";
batch.CustomStatus(message, "MyCustomStatus", processingLog, isError: true);

And in a regular queue job you return ProcessResults.CustomStatus():

var processingLog = "... something descriptive of what happened ...";
return ProcessResults.CustomStatus("MyCustomStatus", processingLog, isError: true);

The isError argument lets you determine if the job should be marked as failed or not because of this custom status.

Note that it's up to you to monitor queues for these custom statuses and take action. If you want to be notified about them, you can implement a custom health check that checks if any queue has messages with such a status.

Batch size

By default the batch size of a job is 100. You can control this by implementing the BatchSize property from IScheduledBatchQueueJob, like this:

public class MyBatchJob : IScheduledBatchQueueJob<SomeMessage>
{
    public int BatchSize => 200;

    // Other methods and properties excluded for brevity
}

Number of messages per job run

By default a Nexus queue job will run until the queue is empty. Sometimes that can be too aggressive. It might put too much pressure on other systems that the job communicates with, and digging into the logs of that single run can be hard since it will contain a very large number of log entries. Nexus lets you set a maximum number of messages that a single job run can process by implementing the property MaxNumberOfMessagesPerRun in your queue job, like this:

public class MyQueueJob : IScheduledQueueJob<SomeMessage> // Or IScheduledBatchQueueJob<SomeMessage>
{
    public int MaxNumberOfMessagesPerRun => 1000;
}

Multithreading

The interfaces IScheduledBatchQueueJob<TMessage> and IScheduledQueueJob<TMessage> have a property called NumberOfProcessingThreads which by default returns 1. If you want your job to process messages on multiple threads in parallel, you can implement the property in your job to increase parallelism like this:

public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    public int NumberOfProcessingThreads => 10;

    // Other methods and properties excluded for brevity
}

Make sure not to store any instance state that isn't thread-safe when you turn on multithreading. The same instance of your job is used by all threads, so if you need to store instance state make sure to use concurrent collections and/or locks.
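As a minimal sketch of the advice above, the following class keeps a counter and a lookup table that can be shared safely between threads. ThreadSafeSyncState is a hypothetical helper and not part of the SDK; it only uses ConcurrentDictionary, Interlocked and Volatile from the .NET base class library.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper, not part of the SDK: instance state that is safe to
// share between the threads that call into the same job instance.
public sealed class ThreadSafeSyncState
{
    private readonly ConcurrentDictionary<string, string> _externalIds = new();
    private int _processedCount;

    public int ProcessedCount => Volatile.Read(ref _processedCount);

    // Each of the two operations is individually thread-safe, so no lock is needed.
    public void Record(string messageKey, string externalId)
    {
        _externalIds[messageKey] = externalId;
        Interlocked.Increment(ref _processedCount);
    }

    public bool TryGetExternalId(string messageKey, out string? externalId) =>
        _externalIds.TryGetValue(messageKey, out externalId);
}
```

A job could hold one such object in a readonly field and call Record from its processing method. A plain Dictionary or an unguarded counter++ in the same place could lose updates or corrupt the collection once NumberOfProcessingThreads is greater than 1.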

Controlling the job result message

By default Nexus will create a job result message based on the number of messages processed by the job run. In case you want to create your own job result message you can implement the method IScheduledQueueJobBase.GetResultsText() and return your own job result message:

public class ExampleQueueJob : IScheduledQueueJob<ExampleQueueMessage>
{
    public string DefaultSchedule => CronSchedule.TimesPerMinute(10);

    public async Task<ProcessResults> ProcessMessageAsync(ExampleQueueMessage message, CancellationToken cancellationToken) => ProcessResults.Completed();

    public string GetResultsText(QueueJobProcessResult result)
    {
        if (result.FailedCount == 0)
        {
            return null; // returning null means we'll get the default result message
        }

        return $"ERROR! {result.FailedCount} messages failed";
    }
}

Writing your own queue processing

You don't have to use the built-in interfaces for processing queues in an IScheduledJob. You can implement it yourself using IQueueReader<TMessage>.LeaseAsync(), IQueueItemUpdater<TMessage>.ProcessedAsync(), and IQueueItemUpdater<TMessage>.FailedAsync().

You should, however, let your job implement the marker interface IScheduledQueueJobBase<TMessage>, as that is what the UI and API use to tell which job processes which queue.

Scheduling a processed message to be processed again

Sometimes when you're using idempotent messages to update external systems, you might have dates in your message that affect the outcome of the processing. An example is a product message that contains a list of prices, each with a valid-from and a valid-to date. At the time of processing you pick the price that is valid right now, but you want to run the processing again when the next price becomes valid.

For a batch processing job you can call:

batch.Processed(message, processAgainAtUtc: DateTime.UtcNow.AddHours(1));

And for a job that processes messages one by one you do:

return ProcessResults.Completed(processAgainAtUtc: DateTime.UtcNow.AddHours(1));

This will mark the queue message as Processed together with the date for when it should be processed again. Note that it's not marked as Sleeping like unprocessed, future messages. This makes it possible to differentiate messages that have already been processed from ones that are only scheduled for processing.

The message status is then changed to Pending again at the date you set, and the queue processing job is called again with the same message.

If a change is enqueued for the same product/message before the date you set, the status for the message becomes Pending directly, and it's up to the job to recalculate when and if the message should be processed again.
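Working out the date to process again can be sketched as below for the price example in this section. The Price record and the PriceSchedule class are hypothetical and only illustrate the calculation; your own message will have a different shape.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message content, used only for this illustration.
public record Price(decimal Amount, DateTime ValidFromUtc, DateTime ValidToUtc);

public static class PriceSchedule
{
    // The price that is valid at the given moment, or null if there is none.
    public static Price? CurrentPrice(IEnumerable<Price> prices, DateTime nowUtc) =>
        prices.FirstOrDefault(p => p.ValidFromUtc <= nowUtc && nowUtc < p.ValidToUtc);

    // The earliest moment after nowUtc when another price becomes valid,
    // or null when there is no future price and no re-processing is needed.
    public static DateTime? NextChangeUtc(IEnumerable<Price> prices, DateTime nowUtc) =>
        prices
            .Select(p => p.ValidFromUtc)
            .Where(validFrom => validFrom > nowUtc)
            .OrderBy(validFrom => validFrom)
            .Select(validFrom => (DateTime?)validFrom)
            .FirstOrDefault();
}
```

When NextChangeUtc returns a value, the job could pass it as the processAgainAtUtc argument instead of a fixed offset such as DateTime.UtcNow.AddHours(1). When it returns null, the message can be completed without a date. Because the calculation runs on every processing, it also covers the case where a newly enqueued change moves the dates.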