
Automatically add prefix to RabbitMQ message #9292


nguyenngoc.son created

Hi, is it possible to add a prefix to every RabbitMQ message before it is sent, and to verify that the event name contains that prefix before processing it? For example, I have about 200 RabbitMQ events at the moment. I want to add the prefix NewEvent to every message before sending, without changing the EventName in the ETO class. In the IDistributedEventHandler, can I check whether the event name contains the NewEvent prefix before processing? We need this because we are in the process of migrating one service to a new architecture, and cloning the ETO classes does not seem like a good idea.


1 Answer(s)
  • enisn created
    Support Team .NET Developer

    Hi,

    I investigated how the system works and found the following.

    The following static method is called wherever an event name is required: https://github.com/abpframework/abp/blob/97e10cc7b2b849b0aebf2aa00881616d036b0af1/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventNameAttribute.cs#L22 Because it is a static method, it cannot be overridden in your project.

    Another option is to override RabbitMqDistributedEventBus in your project. Here is the original source code of that class: https://github.com/abpframework/abp/blob/rel-9.1/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.cs
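
    To illustrate why a static lookup leaves no extension point, here is a simplified stand-in. It is not ABP's actual source: SketchEventNameAttribute and OrderPlacedEto are hypothetical names, and the real method may resolve names differently. The point is only that callers reach the method through the type name, so a derived class or a replaced service cannot intercept the call.

    ```csharp
    using System;
    using System.Reflection;

    // Hypothetical stand-in for an event-name attribute, reduced to the essentials.
    [AttributeUsage(AttributeTargets.Class)]
    public sealed class SketchEventNameAttribute : Attribute
    {
        public string Name { get; }

        public SketchEventNameAttribute(string name) => Name = name;

        // Static: every caller writes SketchEventNameAttribute.GetNameOrDefault(...),
        // so there is no virtual member or DI registration to replace.
        public static string GetNameOrDefault(Type eventType)
        {
            var attribute = eventType.GetCustomAttribute<SketchEventNameAttribute>();
            // Fall back to the full type name when no attribute is present.
            return attribute?.Name ?? eventType.FullName!;
        }
    }

    // Hypothetical ETO used only for this illustration.
    [SketchEventName("MyApp.OrderPlaced")]
    public class OrderPlacedEto { }
    ```

    With this sketch, SketchEventNameAttribute.GetNameOrDefault(typeof(OrderPlacedEto)) returns "MyApp.OrderPlaced", and a type without the attribute falls back to its full name.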

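    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Options;
    using RabbitMQ.Client;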
    using Volo.Abp;
    using Volo.Abp.DependencyInjection;
    using Volo.Abp.EventBus;
    using Volo.Abp.EventBus.Distributed;
    using Volo.Abp.EventBus.Local;
    using Volo.Abp.EventBus.RabbitMq;
    using Volo.Abp.Guids;
    using Volo.Abp.MultiTenancy;
    using Volo.Abp.RabbitMQ;
    using Volo.Abp.Timing;
    using Volo.Abp.Tracing;
    using Volo.Abp.Uow;
    
    namespace MyCompanyName.MyApplicationName;
    
    [Dependency(ReplaceServices = true)]
    [ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus), typeof(IRabbitMqDistributedEventBus))]
    public class MyRabbitMqDistributedEventBus : RabbitMqDistributedEventBus
    {
        public MyRabbitMqDistributedEventBus(
            IOptions<AbpRabbitMqEventBusOptions> options,
            IConnectionPool connectionPool,
            IRabbitMqSerializer serializer,
            IServiceScopeFactory serviceScopeFactory,
            IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
            IRabbitMqMessageConsumerFactory messageConsumerFactory,
            ICurrentTenant currentTenant,
            IUnitOfWorkManager unitOfWorkManager,
            IGuidGenerator guidGenerator,
            IClock clock,
            IEventHandlerInvoker eventHandlerInvoker,
            ILocalEventBus localEventBus,
            ICorrelationIdProvider correlationIdProvider) : base(options, connectionPool, serializer, serviceScopeFactory, distributedEventBusOptions, messageConsumerFactory, currentTenant, unitOfWorkManager, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider)
        {
        }
    
        public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
        {
            var handlerFactories = GetOrCreateHandlerFactories(eventType);
    
            if (factory.IsInFactories(handlerFactories))
            {
                return NullDisposable.Instance;
            }
    
            handlerFactories.Add(factory);
    
            if (handlerFactories.Count == 1)
            {
                // First handler for this event type: bind the queue to the prefixed routing key.
                // BindAsync is not awaited because Subscribe is synchronous.
                Consumer.BindAsync("SomePrefix_" + EventNameAttribute.GetNameOrDefault(eventType));
            }
    
            return new EventHandlerFactoryUnregistrar(this, eventType, factory);
        }
    
        protected override Task PublishAsync(IModel channel, string eventName, byte[] body, Dictionary<string, object>? headersArguments = null, Guid? eventId = null, string? correlationId = null)
        {
            // Prepend the prefix once. Using += here would produce "<name>SomePrefix_<name>".
            eventName = "SomePrefix_" + eventName;
            return base.PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId);
        }
    
        private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
        {
            return HandlerFactories.GetOrAdd(
                eventType,
                type =>
                {
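                    var eventName = EventNameAttribute.GetNameOrDefault(type);
                    EventTypes.GetOrAdd(eventName, eventType);
                    // Assumption: the base class resolves incoming messages by routing key via EventTypes,
                    // so the prefixed name is registered as well. Verify this against your ABP version.
                    EventTypes.GetOrAdd("SomePrefix_" + eventName, eventType);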
                    return new List<IEventHandlerFactory>();
                }
            );
        }
    }
    

    In that case, you need to register this replacement in both the consumer and publisher projects so that the event names stay consistent.
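
    Since both sides have to agree on the prefix, one way to avoid drift is to keep the prefix logic in a small helper shared by the publisher and consumer projects. The sketch below is only an illustration: EventNamePrefix and its members are hypothetical names, not part of ABP, and "SomePrefix_" is the placeholder value used above.

    ```csharp
    using System;

    // Hypothetical shared helper; not part of ABP.
    public static class EventNamePrefix
    {
        // Placeholder prefix; replace with the value agreed on by both services.
        public const string Value = "SomePrefix_";

        // True when the event name already starts with the prefix.
        public static bool HasPrefix(string eventName) =>
            eventName.StartsWith(Value, StringComparison.Ordinal);

        // Adds the prefix once; an already prefixed name is returned unchanged.
        public static string Add(string eventName) =>
            HasPrefix(eventName) ? eventName : Value + eventName;

        // Strips the prefix when present, e.g. to look up the original event name.
        public static string Remove(string eventName) =>
            HasPrefix(eventName) ? eventName.Substring(Value.Length) : eventName;
    }
    ```

    For example, EventNamePrefix.Add("MyApp.OrderPlaced") returns "SomePrefix_MyApp.OrderPlaced", and calling Add on that result again leaves it unchanged. HasPrefix could serve as the check the question asks for, wherever the incoming event name is available on the consumer side.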

Made with ❤️ on ABP v9.3.0-preview. Updated on June 13, 2025, 11:37