Hi, Is this possible to add the prefix to all rabbitmq message before send and verify the event name contains that prefix before processing in general. For example I have 200 rabbitmq event at the moments, I want to add the prefix NewEvent for every message before send without changing the EventName in the ETO class, And in the IDistributedEventHandler, before processing can I check the message contains the prefix NewEvent in event name or not? The reason why I need that is because we're in processing of migrate one service to new architure and clone the ETO class seen not the good idea.
1 Answer(s)
-
0
Hi,
I investigated how system works and found the following points.
The following method is called from everywhere whenever event name required: https://github.com/abpframework/abp/blob/97e10cc7b2b849b0aebf2aa00881616d036b0af1/framework/src/Volo.Abp.EventBus.Abstractions/Volo/Abp/EventBus/EventNameAttribute.cs#L22 Since this is a static method, it cannot be overriden in your project.
Another option seems like overriding
RabbitMqDistributedEventBus
in your project. Here is the original source code of that class: https://github.com/abpframework/abp/blob/rel-9.1/framework/src/Volo.Abp.EventBus.RabbitMQ/Volo/Abp/EventBus/RabbitMq/RabbitMqDistributedEventBus.csusing Microsoft.Extensions.Options; using RabbitMQ.Client; using Volo.Abp; using Volo.Abp.DependencyInjection; using Volo.Abp.EventBus; using Volo.Abp.EventBus.Distributed; using Volo.Abp.EventBus.Local; using Volo.Abp.EventBus.RabbitMq; using Volo.Abp.Guids; using Volo.Abp.MultiTenancy; using Volo.Abp.RabbitMQ; using Volo.Abp.Timing; using Volo.Abp.Tracing; using Volo.Abp.Uow; namespace MyCompanyName.MyApplicationName; [Dependency(ReplaceServices = true)] [ExposeServices(typeof(IDistributedEventBus), typeof(RabbitMqDistributedEventBus), typeof(IRabbitMqDistributedEventBus))] public class MyRabbitMqDistributedEventBus : RabbitMqDistributedEventBus { public MyRabbitMqDistributedEventBus( IOptions<AbpRabbitMqEventBusOptions> options, IConnectionPool connectionPool, IRabbitMqSerializer serializer, IServiceScopeFactory serviceScopeFactory, IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions, IRabbitMqMessageConsumerFactory messageConsumerFactory, ICurrentTenant currentTenant, IUnitOfWorkManager unitOfWorkManager, IGuidGenerator guidGenerator, IClock clock, IEventHandlerInvoker eventHandlerInvoker, ILocalEventBus localEventBus, ICorrelationIdProvider correlationIdProvider) : base(options, connectionPool, serializer, serviceScopeFactory, distributedEventBusOptions, messageConsumerFactory, currentTenant, unitOfWorkManager, guidGenerator, clock, eventHandlerInvoker, localEventBus, correlationIdProvider) { } public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) { var handlerFactories = GetOrCreateHandlerFactories(eventType); if (factory.IsInFactories(handlerFactories)) { return NullDisposable.Instance; } handlerFactories.Add(factory); if (handlerFactories.Count == 1) { // Here we are registering the consumer for the first time. Consumer.BindAsync( "SomePrefix_" + EventNameAttribute.GetNameOrDefault(eventType)); } return new EventHandlerFactoryUnregistrar(this, eventType, factory); } protected override Task PublishAsync(IModel channel, string eventName, byte[] body, Dictionary<string, object>? headersArguments = null, Guid? eventId = null, string? correlationId = null) { eventName += "SomePrefix_" + eventName; return base.PublishAsync(channel, eventName, body, headersArguments, eventId, correlationId); } private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType) { return HandlerFactories.GetOrAdd( eventType, type => { var eventName = EventNameAttribute.GetNameOrDefault(type); EventTypes.GetOrAdd(eventName, eventType); return new List<IEventHandlerFactory>(); } ); } }
In that case, you'll override this service both consumer & publisher projects to make consistency.