Multi-threaded consumers for the same queue on RabbitMQ #8640


castellazzi created
  • ABP Framework version: v9.0.0

  • UI Type: Blazor Server

  • Database System: SQL Server

  • Tiered (for MVC) or Auth Server Separated (for Angular): yes

Hi, I need several threads consuming the same queue on RabbitMQ. The order in which the messages are processed is not important; processing only has to be as fast as possible.
I see that the RabbitMQ client library has a ConsumerDispatchConcurrency parameter on ConnectionFactory that enables multi-threaded consumers.
This feature requires that messages be consumed using IAsyncBasicConsumer.
ABP uses AsyncEventingBasicConsumer in AbpRabbitMqModule.
Is it possible to set ConsumerDispatchConcurrency to a value greater than 1 in AbpRabbitMqModule, and would that enable multi-threaded consumption?
Many thanks.
Marco


6 Answer(s)
  • liangshiwei created
    Support Team Fullstack Developer

    Hi,

    You can try overriding RabbitMqDistributedEventBus and setting ConsumerDispatchConcurrency.

    For example:

    [Dependency(ReplaceServices = true)]
    [ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus), typeof(IRabbitMqDistributedEventBus))]
    public class MyRabbitMqDistributedEventBus : RabbitMqDistributedEventBus
    {
        protected List<IRabbitMqMessageConsumer> Consumers { get; private set; } = default!;
        
        public override void Initialize()
        {
            // Create several consumers on the same queue so messages can be processed in parallel.
            Consumers = new List<IRabbitMqMessageConsumer>();
            for (var i = 0; i < 3; i++)
            {
                Consumers.Add(MessageConsumerFactory.Create(
                    new ExchangeDeclareConfiguration(
                        AbpRabbitMqEventBusOptions.ExchangeName,
                        type: AbpRabbitMqEventBusOptions.GetExchangeTypeOrDefault(),
                        durable: true,
                        arguments: AbpRabbitMqEventBusOptions.ExchangeArguments
                    ),
                    new QueueDeclareConfiguration(
                        AbpRabbitMqEventBusOptions.ClientName,
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        prefetchCount: AbpRabbitMqEventBusOptions.PrefetchCount,
                        arguments: AbpRabbitMqEventBusOptions.QueueArguments
                    ),
                    AbpRabbitMqEventBusOptions.ConnectionName
                ));
            }
            
            foreach(var consumer in Consumers)
            {
                consumer.OnMessageReceived(ProcessEventAsync);
            }        
    
            SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
        }
        
        public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
        {
            var handlerFactories = GetOrCreateHandlerFactories(eventType);
    
            if (factory.IsInFactories(handlerFactories))
            {
                return NullDisposable.Instance;
            }
    
            handlerFactories.Add(factory);
    
            if (handlerFactories.Count == 1) //TODO: Multi-threading!
            {
                foreach(var consumer in Consumers)
                {
                    consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
                }  
            }
            return new EventHandlerFactoryUnregistrar(this, eventType, factory);
        } 
    }
    
  • castellazzi created

    Hi, thanks.
    Your example covers multiple consumers for the event bus.
    What about multiple consumers for background jobs using RabbitMQ? Is it enough to set ConsumerDispatchConcurrency, as in this example:

    [Dependency(ReplaceServices = true)]
    public class MyAbpRabbitMqModule : AbpRabbitMqModule
    {
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();
            Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
            Configure<AbpRabbitMqOptions>(options =>
            {
                foreach (var connectionFactory in options.Connections.Values)
                {
                    connectionFactory.DispatchConsumersAsync = true;
                    connectionFactory.ConsumerDispatchConcurrency = 3; // add this line to enable concurrent dispatch
                }
            });
        }
    }
    

  • liangshiwei created
    Support Team Fullstack Developer

    Hi,

    I think you need to create multiple consumers.

  • castellazzi created

    Hi, looking inside the RabbitMQ client library, setting ConsumerDispatchConcurrency > 1 makes the library start its own pool of worker tasks, according to this documentation:
    https://rabbitmq.github.io/rabbitmq-dotnet-client/api/RabbitMQ.Client.ConnectionFactory.html#RabbitMQ_Client_ConnectionFactory_ConsumerDispatchConcurrency
    So it seems that, when IAsyncBasicConsumer is used, the library does everything itself:

     internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
     {
         _channel = channel;
         _concurrency = concurrency;
    
         var channelOpts = new System.Threading.Channels.UnboundedChannelOptions
         {
             SingleReader = _concurrency == 1,
             SingleWriter = false,
             AllowSynchronousContinuations = false
         };
    
         var workChannel = System.Threading.Channels.Channel.CreateUnbounded<WorkStruct>(channelOpts);
         _reader = workChannel.Reader;
         _writer = workChannel.Writer;
    
         Func<Task> loopStart = ProcessChannelAsync;
         if (_concurrency == 1)
         {
             _worker = Task.Run(loopStart);
         }
         else
         {
             var tasks = new Task[_concurrency];
             for (int i = 0; i < _concurrency; i++)
             {
                 tasks[i] = Task.Run(loopStart);
             }
             _worker = Task.WhenAll(tasks);
         }
     }
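
The pattern in the constructor above can be tried in isolation. The following is a minimal sketch that uses only System.Threading.Channels and has no RabbitMQ dependency; it is not the client library's code. The names DispatchSketch and RunAsync, and the 10 ms delay standing in for a message handler, are hypothetical and chosen only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DispatchSketch
{
    // Hypothetical helper: starts `concurrency` reader loops over one
    // unbounded channel and returns how many items were processed.
    public static async Task<int> RunAsync(int concurrency, int messageCount)
    {
        var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
        {
            SingleReader = concurrency == 1, // several loops read when concurrency > 1
            SingleWriter = false,
            AllowSynchronousContinuations = false
        });

        var processed = 0;

        // Each loop competes with the others for the next available item.
        async Task LoopAsync()
        {
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                await Task.Delay(10); // stand-in for the real message handler
                Interlocked.Increment(ref processed);
            }
        }

        var workers = Enumerable.Range(0, concurrency)
            .Select(_ => Task.Run(() => LoopAsync()))
            .ToArray();

        for (var i = 0; i < messageCount; i++)
        {
            channel.Writer.TryWrite(i); // always succeeds on an unbounded, open channel
        }
        channel.Writer.Complete(); // lets the reader loops finish once drained

        await Task.WhenAll(workers);
        return processed;
    }
}
```

Calling `await DispatchSketch.RunAsync(3, 20)` processes all 20 items with three loops running side by side. Because the loops compete for items, completion order is not guaranteed, which fits the requirement in the question that processing order does not matter.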
    
  • liangshiwei created
    Support Team Fullstack Developer

    You can give it a try and check how efficiently the messages are consumed.

  • castellazzi created

    Thanks.
    Marco

Made with ❤️ on ABP v9.2.0-preview. Updated on January 23, 2025, 12:17