Auto generated queues
Sometimes you have messages with the same structure but one or more discriminator properties makes them significantly different than others. Different enough that you want such message split up into multiple different queues. An example might be a message that has a Language
property and you want to have different queues per language so each language can be processed individually. Or if you want to enable PauseOnError
so that any error for that language pauses processing for any other message in that language.
It's possible to manually define a bunch of queues and jobs using inheritance to handle this, but it comes quite inelegant quickly because of the boilerplate needed.
To handle this use-case Nexus lets you auto-generate queues and jobs for those queues by giving Nexus a base class.
For example:
// Program.cs
[Queue("language")]
public class LanguageQueueMessage : IQueueMessage
{
}
public abstract class LanguageQueueMessageJob : IScheduledQueueJob<TMessage>
where TMessage : LanguageQueueMessage
{
public async Task<ProcessResults> ProcessMessageAsync(TMessage message, CancellationToken cancellationToken)
{
return ProcessResults.Processed();
}
}
var generatedQueues = new List<IGeneratedQueue>();
var generatedJobs = new List<IGeneratedScheduledQueueJob>();
foreach (var language in GetLanguages())
{
var queueName = "language_" + language;
generatedQueues.Add(new GeneratedQueue<LanguageQueueMessage>
{
Name = queueName,
DisplayName = "Language queue " + language,
Category = "Language",
ShouldEnqueue = message => message.Language == language,
});
generatedJobs.Add(new GeneratedScheduledQueueJob<LanguageQueueMessage>
{
ScheduledJobBaseType = typeof(LanguageQueueMessageJob<>),
Name = "language_job_" + language,
DisplayName = "Language job " + language,
Category = "Language",
QueueName = queueName,
});
}
builder.Services
.AddNexus()
.AddQueues(options =>
{
options.GeneratedQueues = generatedQueues;
})
.AddScheduledJobs(options =>
{
options.GeneratedScheduledJobs = generatedJobs;
});
With this code you you'll get one concrete queue and job per language returned from the fictional GetLanguages()
method.
The ShouldEnqueue
func you set on your GeneratedQueue
class determines if a message is relevant for your generated queue or not. Because when you want to enqueue to any of the generated queues you ask for an IEnqueuer<LanguageQueueMessage>
and that will iterate over all the generated queues and see which of them are interested in the message. You can also use the API to enqueue to the base LanguageQueueMessage
queue which will have the same effect.
The generated job needs to be an abstract class that is generic on the queue message like in the example above. The generated job will be generic on the generated queue message. The job can be a batch job and doesn't have to be a single message job like in the example above.
Enqueueing messages
Since the IQueueMessage
types for each generated queue is generated at runtime you can't ask the service provider for a IEnqueuer<X>
for a specific queue. Instead you should request IEnqueuer<BaseMessage>
where BaseMessage
is the type you used to define the generated queue in GeneratedQueue<ThisHere>
. Nexus will register a specific enqueuer for that type which then iterates over all the generated queues of that type, calls the ShouldEnqueue
func and enqueues it into the concrete queue if it returns true
.
If you explicitly want to get the enqueuer for a generated queue you'll need to request IEnumerable<IObjectEnqueuer>
from the service provider and then filter out based IObjectEnqueuer.QueueName
.
Other queue operations
If you need any of the other services for a generated queue you'll need to request a list of the non-generic version and filter out based on queue name. So let's say that we want the Swedish queue reader from the language example at the top on this page. We'd do something like this:
var queueReaders = serviceProvider.GetServices<IQueueReader>(); // Or IEnumerable<IQueueReader> in your constructor
var language = "sv";
var swedishQueueReader = queueReaders.Single(q => q.QueueName == "language_" + language);