Processing directly from Kafka topics
Nexus allows you to use Kafka as an external pending storage, which means that Nexus reads and processes messages directly from Kafka without first storing them in an SQL backed Nexus queue.
Use it by first installing CommerceMind.Nexus.Kafka and then configuring the external storage:
builder
.AddNexus()
.AddKafkaExternalPendingStorage();
And then on your queue:
[Queue("export_order", StorageMode = QueueStorageMode.ExternalPending)]
public class ExportOrderQueueMessage : IQueueMessageWithId
{ }
Read more about external pending storage here.
Enqueueing from Kafka topics
Nexus has built-in support to use the very popular Apache Kafka platform for both instance events as well as taking messages from a Kafka topic and enqueueing into a Nexus queue.
In order to use this you need to install the NuGet package for RabbitMQ called CommerceMind.Nexus.Kafka.
You set it up like this:
builder.Services
.AddNexus()
.AddQueues()
.AddKafka(options =>
{
options.ConsumerConfig.BootstrapServers = "...";
options.ConsumerConfig.GroupId = "...";
})
.AddNexusKafkaQueues(options =>
{
// This allows you to control things like which status or when messages are processed
options.EnqueueContextFactory = message => new EnqueueContext { Priority = message.Id != null ? 10 : 0 }
})
.EnqueueFrom<KafkaQueueMessage>("kafka-topic");
With this examples all messages posted to the kafka-topic will get placed in the Nexus queue for KafkaQueueMessage.
Instance events
You can also use Kafka as the way to send signals between Nexus instances in a scaled out environment.
builder.Services
.AddNexus()
.AddQueues()
.AddKafka(options =>
{
options.ConsumerConfig.BootstrapServers = "...";
options.ConsumerConfig.GroupId = "...";
options.ProducerConfig.BootstrapServers = "...";
})
.AddKafkaInstanceEvents(options =>
{
// Change this to something unique if you're using the same Kafka instance for multiple
// different Nexus applications
options.TopicName = "nexus_instance_events";
});