Open Closed

DistributedEventBus doesn't catch the exception when it can't connect to Confluence Kafka #6022


User avatar
0
paykoolbackend created

Hello,

We are using DistributedEventBus with Confluent Kafka.

We have wrapped _distributedEventBus.PublishAsync(event) in a try-catch block, but it doesn't catch the exception when it can't connect to Confluence Kafka. (Following your suggestion, use try-catch in https://support.abp.io/QA/Questions/3079/IDistributedEventBus-throw-exception-when-it-can-not-connect-to-Kafka)

try
{
    await _distributedEventBus.PublishAsync(eventData);
}
catch (Exception ex)
{
    _logger.LogError(ex.Message);
    throw;
}

Additionally, we need to wait for 5 more minutes, then application throw exception as "An internal error occurred during your request!". For the request, it took too much time.

Q1: Why can't we catch the exception? Q2: How to avoid waiting for too long?

  • ABP Framework version: v7.1.1
  • UI Type: Flutter
  • Database System: EF Core (SQL Server) / MongoDB
  • Tiered (for MVC) or Auth Server Separated (for Angular):
  • Exception message and full stack trace:
  • Steps to reproduce the issue:

16 Answer(s)
  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    First:

    The event is published in a transaction, it will not be published immediately, but after the transaction is completed, if you want to catch exceptions, you can try it.

    try
    {
        await _distributedEventBus.PublishAsync(new MyEventData()
        {
            Name = "test"
        }, onUnitOfWorkComplete:false);
        
        //Or
        //await UnitOfWorkManager.Current.CompleteAsync();
    }
    catch(Exception e)
    {
        Logger.LogInformation("PublishAsync error...............: " + e.StackTrace);
    }
    

    Second:

    By default, the message time out is 300000ms: https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html#Confluent_Kafka_ProducerConfig_MessageTimeoutMs

    This means that it will wait for the end of the timeout to throw an exception.

    You can config the timeout:

    Configure<AbpKafkaOptions>(options =>
    {
        options.ConfigureProducer = config =>
        {
            config.MessageTimeoutMs = 60;
        };
    });
    
  • User Avatar
    0
    paykoolbackend created

    Thanks, let me try it.

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    ok

  • User Avatar
    0
    paykoolbackend created

    Test OK.

    There is another question: Can I configure ProducerConfig and ConsumerConfig using the appsettings.json file? Or can they only be configured using the Options Classes? https://docs.abp.io/en/abp/latest/Distributed-Event-Bus-Kafka-Integration

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    Or can they only be configured using the Options Classes?

    Yes

  • User Avatar
    0
    paykoolbackend created

    ok, thanks

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    :)

  • User Avatar
    0
    paykoolbackend created

    After our discussion, modifying the MessageTimeoutMs is not the best option for us because it's difficult to set an appropriate timeout.

    We would like to handle the issue of not being able to connect to the Kafka server and do not want the API to block. Currently, we have configured the settings as follows:

    Configure<AbpKafkaOptions>(options =>
    {
        options.ConfigureProducer = config =>
        {
            config.EnableIdempotence = true;
            config.MessageSendMaxRetries = 3;
            config.RetryBackoffMs = 1000;
        };
    });
    

    After the retries are exhausted, we still need to wait for the MessageTimeoutMs time.(default = 300000ms) Is it possible to prioritize the retry time, so that an error is reported when the retries are exhausted?

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    This has nothing to do with ABP. :). This is the behavior of Kafka API

  • User Avatar
    0
    paykoolbackend created

    Got it, thanks.

    In the event of connection issues with the Kafka server, what would you recommend doing?

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    You should make sure the Kafka server is highly available.

    When there is a problem with the connection it means there are serious problems with your application or Kafka server, and you must fix them. there is no other way.

  • User Avatar
    0
    paykoolbackend created

    The strategies mentioned above are things we will do, but I would like to know if ABP has any mechanisms to handle sudden Kafka server connection failures, so that the API can operate smoothly without blocking.

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    Hi,

    No, you must do it yourself.

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    You can consider using the Outbox/Inbox pattern, It can ensure that event data will not be lost when the Kafka server is in error.

    https://docs.abp.io/en/abp/latest/Distributed-Event-Bus#outbox-inbox-for-transactional-events

  • User Avatar
    0
    paykoolbackend created

    OK, thank you.

  • User Avatar
    0
    liangshiwei created
    Support Team Fullstack Developer

    :)

Made with ❤️ on ABP v9.2.0-preview. Updated on January 08, 2025, 14:09