Open Closed

Distributed-Event-Bus-Azure-Integration-with-outbox-pattern #7116


User avatar
0
mariovh created
  • ABP Framework version: v7.4.4
  • UI Type: Blazor WASM
  • Database System: EF Core (SQL Server)

Hi, Im using distributed event bus with outbox pattern and Azure service bus provider. I have 3 requirements that I'm struggling to implement with AzureDistributedEventBus.

The first is the need to publish to different topics based on the event name of the Eto:

[EventName("projects")] public class ProjectEto

[EventName("campaigns")] public class CampaignEto

The second is whether there's a way to override part of the message publishing with AzureDistributedEventBus to achieve the following: With the default implementation of the outbox pattern, when publishing the message to Azure Service Bus, override the message publishing to choose which topic the message corresponds to. Additionally, be able to, if the topic or subscription don't exist, create them programmatically using ServiceBusAdministrationClient.CreateTopicAsync from Azure.Messaging.ServiceBus.Administration.

The last one, because is posible to use outbox pattern with no Sql database/context, so my last question is if i can configure the pattern to not delete the rows in AbpEventOutbox table after publishing on azure and mark as published instead of delete.

Thank you!


14 Answer(s)
  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    For these purposes, you need to customize AzureDistributedEventBus

    The first is the need to publish to different topics based on the event name of the Eto: The second is whether there's a way to override part of the message publishing with AzureDistributedEventBus to achieve the following: With the default implementation of the outbox pattern, when publishing the message to Azure Service Bus, override the message publishing to choose which topic the message corresponds to. Additionally, be able to, if the topic or subscription don't exist, create them programmatically using ServiceBusAdministrationClient.CreateTopicAsync from Azure.Messaging.ServiceBus.Administration.

    ABP does not support posting to multiple topics, this is the code:

    https://github.com/abpframework/abp/blob/dev/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs#L70 https://github.com/abpframework/abp/blob/dev/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs#L124 https://github.com/abpframework/abp/blob/dev/framework/src/Volo.Abp.EventBus.Azure/Volo/Abp/EventBus/Azure/AzureDistributedEventBus.cs#L287

    You can override the AzureDistributedEventBus

    Pseudo-code, I didn't test it, just provider an idea

    [Dependency(ReplaceServices = true)]
    [ExposeServices(typeof(IDistributedEventBus), typeof(AzureDistributedEventBus), typeof(MyAzureDistributedEventBus))]
    public class MyAzureDistributedEventBus : AzureDistributedEventBus, ISingletonDependency
    {
        protected Dictionary<string, IAzureServiceBusMessageConsumer> Consumers { get; } = new();
        
        public MyAzureDistributedEventBus(
            IServiceScopeFactory serviceScopeFactory, 
            ICurrentTenant currentTenant,
            IUnitOfWorkManager unitOfWorkManager, 
            IOptions<AbpDistributedEventBusOptions> abpDistributedEventBusOptions,
            IGuidGenerator guidGenerator,
            IClock clock,
            IOptions<AbpAzureEventBusOptions> abpAzureEventBusOptions,
            IAzureServiceBusSerializer serializer,
            IAzureServiceBusMessageConsumerFactory messageConsumerFactory,
            IPublisherPool publisherPool,
            IEventHandlerInvoker eventHandlerInvoker,
            ILocalEventBus localEventBus,
            ICorrelationIdProvider correlationIdProvider) : base(
            serviceScopeFactory, 
            currentTenant, 
            unitOfWorkManager,
            abpDistributedEventBusOptions,
            guidGenerator, 
            clock, 
            abpAzureEventBusOptions,
            serializer,
            messageConsumerFactory,
            publisherPool,
            eventHandlerInvoker,
            localEventBus, 
            correlationIdProvider)
        {
        }
    
        protected async override Task PublishAsync(
            string eventName,
            byte[] body,
            string? correlationId,
            Guid? eventId)
        {
            var topicName = eventName;
            await CheckTopicAndConsumer(topicName);
            
            var message = new ServiceBusMessage(body) { Subject = eventName };
    
            if (message.MessageId.IsNullOrWhiteSpace())
            {
                message.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
            }
    
            message.CorrelationId = correlationId;
    
            var publisher = await PublisherPool.GetAsync(
                topicName,
                Options.ConnectionName);
    
            await publisher.SendMessageAsync(message);
        }
    
        public async override Task PublishManyFromOutboxAsync(IEnumerable<OutgoingEventInfo> outgoingEvents, OutboxConfig outboxConfig)
        {
            var outgoingEventArray = outgoingEvents.ToArray();
    
            var eventGroup = outgoingEventArray.GroupBy(x => x.EventName);
            
            foreach (var events in eventGroup)
            {
                var topicName = events.Key;
                await CheckTopicAndConsumer(topicName);
                var publisher = await PublisherPool.GetAsync(
                    topicName,
                    Options.ConnectionName);
    
                using var messageBatch = await publisher.CreateMessageBatchAsync();
    
                foreach (var outgoingEvent in events)
                {
                    var message = new ServiceBusMessage(outgoingEvent.EventData) { Subject = outgoingEvent.EventName };
    
                    if (message.MessageId.IsNullOrWhiteSpace())
                    {
                        message.MessageId = outgoingEvent.Id.ToString();
                    }
    
                    message.CorrelationId = outgoingEvent.GetCorrelationId();
    
                    if (!messageBatch.TryAddMessage(message))
                    {
                        throw new AbpException(
                            "The message is too large to fit in the batch. Set AbpEventBusBoxesOptions.OutboxWaitingEventMaxCount to reduce the number");
                    }
    
                    using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
                    {
                        await TriggerDistributedEventSentAsync(new DistributedEventSent() {
                            Source = DistributedEventSource.Outbox,
                            EventName = outgoingEvent.EventName,
                            EventData = outgoingEvent.EventData
                        });
                    }
                }
    
                await publisher.SendMessagesAsync(messageBatch);
            }
            
        }
    
        private  async Task CheckTopicAndConsumer(string eventName)
        {
            // check topic is exists
            // if not exists, create topic
            // ServiceBusAdministrationClient.CreateTopicAsync(eventName
            
            // check consumer is exists
            
            if(!Consumers.ContainsKey(eventName))
            {
                var consumer = MessageConsumerFactory.CreateMessageConsumer(
                    eventName,
                    Options.SubscriberName,
                    Options.ConnectionName);
                consumer.OnMessageReceived(ProcessEventAsync);
                Consumers.Add(eventName, consumer);
            }
        }
        
        private async Task ProcessEventAsync(ServiceBusReceivedMessage message)
        {
            var eventName = message.Subject;
            if (eventName == null)
            {
                return;
            }
    
            var eventType = EventTypes.GetOrDefault(eventName);
            if (eventType == null)
            {
                return;
            }
    
            var eventData = Serializer.Deserialize(message.Body.ToArray(), eventType);
    
            if (await AddToInboxAsync(message.MessageId, eventName, eventType, eventData, message.CorrelationId))
            {
                return;
            }
    
            using (CorrelationIdProvider.Change(message.CorrelationId))
            {
                await TriggerHandlersDirectAsync(eventType, eventData);
            }
        }
    }
    
  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    The last one, because is posible to use outbox pattern with no Sql database/context, so my last question is if i can configure the pattern to not delete the rows in AbpEventOutbox table after publishing on azure and mark as published instead of delete.

    At the moment, it's not possible. If you don't remove it, it will cause performance problems as the data grows.

    But you can consider to create a redundant table to back it up, and when data is deleted, insert the data into the redundant table

  • User Avatar
    0
    mariovh created

    I will try to implement a custom AzureDistributedEventBus following your code.

    Thank you

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    ok

  • User Avatar
    0
    mariovh created

    Hello, works fine but i have doubts about it, following your approach of publishing in a single topic for all events generated by all aggregates. For a second application also using Abp, multitenancy, and the outbox and inbox patterns, which wants to integrate with the first application through Azure Service Bus and consume messages from a certain type of aggregate or event. How is the topology managed in Azure? How does the second application know which of its tenants each message published by the first application corresponds to? What is the standard way that Abp has to communicate applications with Service Bus and the outbox and inbox patterns?

    Thanks

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    How is the topology managed in Azure?

    Sorry, I didn't get it.

    How does the second application know which of its tenants each message published by the first application corresponds to

    If the event is multi-tenant, ABP will automatically switch the current tenant context. https://github.com/abpframework/abp/blob/9c4086792bd09af7ad4cbbeb2610b4641bd709ad/framework/src/Volo.Abp.EventBus/Volo/Abp/EventBus/EventBusBase.cs#L235

    Your event class implement from the IMultiTenant or IEventDataMayHaveTenantId interface.

    What is the standard way that Abp has to communicate applications with Service Bus and the outbox and inbox patterns?

    You can check these documents https://docs.abp.io/en/abp/latest/Distributed-Event-Bus#outbox-inbox-for-transactional-events https://microservices.io/patterns/data/transactional-outbox.html

  • User Avatar
    0
    mariovh created

    Hello,

    What I mean is whether applications by default can only publish to one topic. So, for two applications to communicate through Azure Service Bus, would they both publish and consume all their events from all their aggregates in the same topic? And what I mean is about the structure of Topics, Subscriptions, and Queues in Azure for the communication of both applications.

    Then, regarding multitenancy, I have a question. If I have Application 1 with tenants A and B, and Application 2 with tenants A and B, and the messages published in Azure Service Bus comply with the interfaces you mentioned (IMultitenant or IEventDataMayHaveTenantId), how does Application 2 know which tenant a message published by Application 1 belongs to if the internal IDs of their Tenants in their SaasTenants tables are different in both applications?

    Thanks, and sorry for the confusion.

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    What I mean is whether applications by default can only publish to one topic. So, for two applications to communicate through Azure Service Bus, would they both publish and consume all their events from all their aggregates

    Yes, it is. we have a simple project: https://github.com/abpframework/abp-samples/tree/master/AzureEventBus

    Then, regarding multitenancy, I have a question. If I have Application 1 with tenants A and B, and Application 2 with tenants A and B, and the messages published in Azure Service Bus comply with the interfaces you mentioned (IMultitenant or IEventDataMayHaveTenantId), how does Application 2 know which tenant a message published by Application 1 belongs to if the internal IDs of their Tenants in their SaasTenants tables are different in both applications?

    That won't work, They should use the same SAAS database to make sure the Ids are the same.

  • User Avatar
    0
    mariovh created

    Hello, I have two more questions, Im reviewing class DistributedEventBusBase.cs method protected virtual async Task<bool> AddToOutboxAsync(Type eventType, object eventData) and i cant insert to AbpEventOutbox table values on ExtraProperties column when overriding the method.

    My second question, Im working with google protobuff to publish binary messages on Azure Service Bus, using MyAzureDistributedEventBus from previous questions, and casting Etos to proto generated classes before publishing on Azure Service Bus. I dont know if it is possible to instead of working on distributed events with Etos and IEntityEto<TKey> interface, is possible to work with a custom interface or a IMessage interface from google protobuff protocol.

    Thanks

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    I have two more questions, Im reviewing class DistributedEventBusBase.cs method protected virtual async Task<bool> AddToOutboxAsync(Type eventType, object eventData) and i cant insert to AbpEventOutbox table values on ExtraProperties column when overriding the method.

    I think you can do it without any problem.

     var outgoingEventInfo = new OutgoingEventInfo(
        GuidGenerator.Create(),
        eventName,
        Serialize(eventData),
        Clock.Now
    );
    outgoingEventInfo.ExtraProperties["test"] = "value"
    

    My second question, Im working with google protobuff to publish binary messages on Azure Service Bus, using MyAzureDistributedEventBus from previous questions, and casting Etos to proto generated classes before publishing on Azure Service Bus. I dont know if it is possible to instead of working on distributed events with Etos and IEntityEto<TKey> interface, is possible to work with a custom interface or a IMessage interface from google protobuff protocol.

    I don't know much about google protobuff protocol, you can give it a try.

  • User Avatar
    0
    mariovh created

    Hello, It works with google protobuf instead of EntityEto replacing it on the Mapping.

    Following the documentation example using Etos:

    Configure<AbpDistributedEntityEventOptions>(options =>
    {
        options.AutoEventSelectors.Add<Product>();
        options.EtoMappings.Add<Product, ProductEto>();
    });
    

    Adds a selector to allow to publish the create, update and delete events for the Product entity.'

    Is possible for create and update events map to ProductEto, and for delete events mapping to other Eto, for example DeletedProductEto? Is there a way to configure the module with this aproach?

    Thank you for your help.

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    Unfortunately, It can't be done.

    You can consider creating a local event handler to subscribe to delete events to publish the DeletedProductEto.

    For example:

     public class MyHandler
            : ILocalEventHandler<EntityDeletedEventData<Product>>,
              ITransientDependency
    {
        public async Task HandleEventAsync(
            EntityDeletedEventData<Product> eventData)
        {
            // Publish DeletedProductEto here.
        }
    }
    
  • User Avatar
    0
    mariovh created

    Hello, It works with the handler.

    Now the same issue i had to publish in multiple topics than i have to implement MyAzureDistributedEventBus:AzureDistributedEventBus, i have to implement the receiver/consumer part with inbox pattern, where i need to add to inbox and process messages comming from multiple queues on Azure Service Bus. Is that possible?

    Thank you

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    You can give it a try.

Made with ❤️ on ABP v9.2.0-preview. Updated on January 15, 2025, 12:18