- ABP Framework version: v9.0.0
- UI Type: Blazor Server
- Database System: SQL Server
- Tiered (for MVC) or Auth Server Separated (for Angular): yes
Hi, I need multiple threads consuming the same RabbitMQ queue. The order in which messages are processed does not matter; processing only needs to be as fast as possible.
I see that the RabbitMQ client library has a ConsumerDispatchConcurrency parameter on ConnectionFactory that enables multithreaded consumers.
This feature requires messages to be consumed through IAsyncBasicConsumer.
ABP uses AsyncEventingBasicConsumer in AbpRabbitMqModule.
Is it possible to set ConsumerDispatchConcurrency to a value greater than 1 in AbpRabbitMqModule, and would that enable multithreaded consumption?
Many thanks.
Marco
6 Answer(s)
-
Hi,
You can try to override RabbitMqDistributedEventBus and set ConsumerDispatchConcurrency.
For example:
[Dependency(ReplaceServices = true)]
[ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus), typeof(IRabbitMqDistributedEventBus))]
public class MyRabbitMqDistributedEventBus : RabbitMqDistributedEventBus
{
    protected List<IRabbitMqMessageConsumer> Consumers { get; private set; } = default!;

    public override void Initialize()
    {
        // Create several consumers that all read from the same queue
        Consumers = new List<IRabbitMqMessageConsumer>();
        for (var i = 0; i < 3; i++)
        {
            Consumers.Add(MessageConsumerFactory.Create(
                new ExchangeDeclareConfiguration(
                    AbpRabbitMqEventBusOptions.ExchangeName,
                    type: AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
                    durable: true,
                    arguments: AbpRabbitMqEventBusOptions.ExchangeArguments
                ),
                new QueueDeclareConfiguration(
                    AbpRabbitMqEventBusOptions.ClientName,
                    durable: true,
                    exclusive: false,
                    autoDelete: false,
                    prefetchCount: AbpRabbitMqEventBusOptions.PrefetchCount,
                    arguments: AbpRabbitMqEventBusOptions.QueueArguments
                ),
                AbpRabbitMqEventBusOptions.ConnectionName
            ));
        }

        // Route messages from every consumer to the same handler
        foreach (var consumer in Consumers)
        {
            consumer.OnMessageReceived(ProcessEventAsync);
        }

        SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
    }

    public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
    {
        var handlerFactories = GetOrCreateHandlerFactories(eventType);

        if (factory.IsInFactories(handlerFactories))
        {
            return NullDisposable.Instance;
        }

        handlerFactories.Add(factory);

        if (handlerFactories.Count == 1) //TODO: Multi-threading!
        {
            // Bind the event name on each consumer
            foreach (var consumer in Consumers)
            {
                consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
            }
        }

        return new EventHandlerFactoryUnregistrar(this, eventType, factory);
    }
}
-
Hi, thanks.
Your example covers multiple consumers for the event bus.
What about multiple consumers for background jobs using RabbitMQ? Is it enough to set ConsumerDispatchConcurrency, as in this example?
[Dependency(ReplaceServices = true)]
public class MyAbpRabbitMqModule : AbpRabbitMqModule
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();
        Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
        Configure<AbpRabbitMqOptions>(options =>
        {
            foreach (var connectionFactory in options.Connections.Values)
            {
                connectionFactory.DispatchConsumersAsync = true;
                connectionFactory.ConsumerDispatchConcurrency = 3; // add this line to configure concurrency
            }
        });
    }
}
-
Hi,
I think you need to create multiple consumers.
-
Hi, looking inside the RabbitMQ client library, setting ConsumerDispatchConcurrency to a value greater than 1 makes the library create its own pool of workers, according to this documentation:
https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.ConnectionFactory.html#RabbitMQ_Client_ConnectionFactory_ConsumerDispatchConcurrency
So it seems that, when using IAsyncBasicConsumer, the library handles everything itself:
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
{
    _channel = channel;
    _concurrency = concurrency;
    var channelOpts = new System.Threading.Channels.UnboundedChannelOptions
    {
        SingleReader = _concurrency == 1,
        SingleWriter = false,
        AllowSynchronousContinuations = false
    };
    var workChannel = System.Threading.Channels.Channel.CreateUnbounded<WorkStruct>(channelOpts);
    _reader = workChannel.Reader;
    _writer = workChannel.Writer;
    Func<Task> loopStart = ProcessChannelAsync;
    if (_concurrency == 1)
    {
        _worker = Task.Run(loopStart);
    }
    else
    {
        var tasks = new Task[_concurrency];
        for (int i = 0; i < _concurrency; i++)
        {
            tasks[i] = Task.Run(loopStart);
        }
        _worker = Task.WhenAll(tasks);
    }
}
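To see what that constructor implies, here is a minimal standalone sketch of the same pattern: several reader loops started with Task.Run over one unbounded channel. It is not RabbitMQ code and needs no broker. The names DispatchSketch and RunAsync are hypothetical, and the 20 ms delay is only an assumed stand-in for a message handler.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DispatchSketch
{
    // Starts `concurrency` reader loops over one shared unbounded channel and
    // reports how many items were processed and the peak number in flight.
    public static async Task<(int Processed, int MaxInFlight)> RunAsync(int concurrency, int messageCount)
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
        {
            SingleReader = concurrency == 1,
            SingleWriter = false,
            AllowSynchronousContinuations = false
        });

        var gate = new object();
        int processed = 0, inFlight = 0, maxInFlight = 0;

        // One reader loop; every worker runs this same loop on the shared reader.
        async Task LoopAsync()
        {
            while (await channel.Reader.WaitToReadAsync())
            {
                while (channel.Reader.TryRead(out _))
                {
                    lock (gate)
                    {
                        inFlight++;
                        if (inFlight > maxInFlight) maxInFlight = inFlight;
                    }

                    await Task.Delay(20); // assumed stand-in for the real handler

                    lock (gate)
                    {
                        inFlight--;
                        processed++;
                    }
                }
            }
        }

        // Queue all work up front, then mark the channel as complete.
        for (var i = 0; i < messageCount; i++)
        {
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // Same shape as the constructor above: one Task.Run per worker.
        var workers = new Task[concurrency];
        for (var i = 0; i < concurrency; i++)
        {
            workers[i] = Task.Run(LoopAsync);
        }
        await Task.WhenAll(workers);

        return (processed, maxInFlight);
    }
}
```

Calling await DispatchSketch.RunAsync(1, 10) can never have more than one item in flight, while RunAsync(4, 20) allows up to four handlers to overlap. With a real consumer, the handler would therefore have to be safe to run concurrently, and message order is not preserved.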
-
You can give it a try and check how efficiently messages are consumed.
-
Thanks.
Marco