Hello,
We are using DistributedEventBus with Confluent Kafka.
We have wrapped _distributedEventBus.PublishAsync(event) in a try-catch block, but it doesn't catch the exception when the application can't connect to Confluent Kafka. (We followed your suggestion to use try-catch in https://support.abp.io/QA/Questions/3079/IDistributedEventBus-throw-exception-when-it-can-not-connect-to-Kafka)
try
{
    await _distributedEventBus.PublishAsync(eventData);
}
catch (Exception ex)
{
    // Pass the exception itself so the stack trace is logged too.
    _logger.LogError(ex, ex.Message);
    throw;
}
Additionally, we have to wait more than 5 minutes before the application throws the exception "An internal error occurred during your request!". That is far too long for a request.
Q1: Why can't we catch the exception?
Q2: How can we avoid waiting so long?
- ABP Framework version: v7.1.1
- UI Type: Flutter
- Database System: EF Core (SQL Server) / MongoDB
- Tiered (for MVC) or Auth Server Separated (for Angular):
- Exception message and full stack trace:
- Steps to reproduce the issue:
16 Answer(s)
-
Hi,
First:
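The event is published as part of the transaction (unit of work), so it is not sent immediately but only after the transaction completes. That is why your try-catch never sees the exception. If you want to catch it, publish the event immediately with onUnitOfWorkComplete: false, or complete the unit of work yourself inside the try block: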
try
{
    await _distributedEventBus.PublishAsync(
        new MyEventData() { Name = "test" },
        onUnitOfWorkComplete: false);
    // Or:
    // await UnitOfWorkManager.Current.CompleteAsync();
}
catch (Exception e)
{
    Logger.LogInformation("PublishAsync error...............: " + e.StackTrace);
}
Second:
By default, the message timeout (MessageTimeoutMs) is 300000 ms, i.e. 5 minutes: https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.ProducerConfig.html#Confluent_Kafka_ProducerConfig_MessageTimeoutMs
This means the producer waits until the timeout has elapsed before it throws an exception.
You can configure the timeout:
Configure<AbpKafkaOptions>(options =>
{
    options.ConfigureProducer = config =>
    {
        // The value is in milliseconds.
        config.MessageTimeoutMs = 60;
    };
});
-
Thanks, let me try it.
-
ok
-
Test OK.
One more question: can I configure ProducerConfig and ConsumerConfig through the appsettings.json file, or can they only be configured using the Options Classes? https://docs.abp.io/en/abp/latest/Distributed-Event-Bus-Kafka-Integration
-
Hi,
Or can they only be configured using the Options Classes?
Yes
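If you still want the values to live in appsettings.json, one possible approach is to read them from IConfiguration inside the options callback. This is only a sketch under assumptions: the module name MyKafkaSettingsModule and the section name "MyApp:KafkaProducer" are hypothetical and are not defined by ABP; only the MessageTimeoutMs property comes from this thread.

```csharp
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.Kafka;
using Volo.Abp.Modularity;

[DependsOn(typeof(AbpKafkaModule))]
public class MyKafkaSettingsModule : AbpModule // hypothetical module name
{
    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        var configuration = context.Services.GetConfiguration();

        // Hypothetical section, e.g. in appsettings.json:
        // "MyApp": { "KafkaProducer": { "MessageTimeoutMs": 10000 } }
        var producerSection = configuration.GetSection("MyApp:KafkaProducer");

        Configure<AbpKafkaOptions>(options =>
        {
            options.ConfigureProducer = config =>
            {
                // Override the client default only when the key is present.
                var timeoutMs = producerSection.GetValue<int?>("MessageTimeoutMs");
                if (timeoutMs.HasValue)
                {
                    config.MessageTimeoutMs = timeoutMs.Value;
                }
            };
        });
    }
}
```

The same idea should work for options.ConfigureConsumer. The configuration is still applied through the options class; appsettings.json only supplies the values.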
-
ok, thanks
-
:)
-
After discussing it internally, we found that changing MessageTimeoutMs is not the best option for us, because it is difficult to choose an appropriate timeout.
We would like to handle the case where the Kafka server cannot be reached without blocking the API. Currently, we have configured the producer as follows:
Configure<AbpKafkaOptions>(options =>
{
    options.ConfigureProducer = config =>
    {
        config.EnableIdempotence = true;
        config.MessageSendMaxRetries = 3;
        config.RetryBackoffMs = 1000;
    };
});
After the retries are exhausted, we still have to wait for the full MessageTimeoutMs (default = 300000 ms). Is it possible to give the retries priority, so that an error is reported as soon as they are exhausted?
-
Hi,
This has nothing to do with ABP :). This is the behavior of the Kafka API.
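For context, here is a sketch of how these settings relate, based on my reading of the librdkafka configuration documentation: message.timeout.ms (MessageTimeoutMs) is described there as the maximum time the client may use to deliver a message, including retries. So when the broker cannot be reached, it is this value that caps the wait. The 10000 ms below is an illustrative assumption, not a recommendation.

```csharp
Configure<AbpKafkaOptions>(options =>
{
    options.ConfigureProducer = config =>
    {
        config.EnableIdempotence = true;
        config.MessageSendMaxRetries = 3;
        config.RetryBackoffMs = 1000;

        // Illustrative value (assumption): upper bound, in milliseconds,
        // on how long a message may wait for delivery, retries included.
        config.MessageTimeoutMs = 10000;
    };
});
```

A rough starting point is to pick a value somewhat larger than the time the retries themselves can take (here 3 retries with a 1000 ms backoff), then adjust it for your network and broker setup.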
-
Got it, thanks.
In the event of connection issues with the Kafka server, what would you recommend doing?
-
Hi,
You should make sure the Kafka server is highly available.
A connection problem means there is something seriously wrong with your application or the Kafka server, and you must fix it. There is no other way.
-
We will follow the advice above, but I would like to know whether ABP has any mechanism for handling sudden Kafka connection failures, so that the API keeps working without blocking.
-
Hi,
No, you must do it yourself.
-
You can consider using the Outbox/Inbox pattern. It ensures that event data is not lost when the Kafka server is down.
https://docs.abp.io/en/abp/latest/Distributed-Event-Bus#outbox-inbox-for-transactional-events
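As a rough sketch of what enabling this looks like with EF Core, following the linked document: MyProjectDbContext is a placeholder for your own DbContext. As far as I recall, the document also requires the DbContext to implement IHasEventOutbox / IHasEventInbox and a distributed lock provider to be set up, so please check it for the full prerequisites.

```csharp
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.EventBus.Distributed;

// Inside ConfigureServices of your module:
Configure<AbpDistributedEventBusOptions>(options =>
{
    // Outgoing events are saved to the database in the same transaction
    // as your data and are sent to Kafka afterwards by a background worker.
    options.Outboxes.Configure(config =>
    {
        config.UseDbContext<MyProjectDbContext>(); // placeholder DbContext
    });

    // Incoming events are stored first and then handled.
    options.Inboxes.Configure(config =>
    {
        config.UseDbContext<MyProjectDbContext>(); // placeholder DbContext
    });
});
```

With the outbox enabled, PublishAsync only writes the event to the database, so the API request should no longer wait on the Kafka connection.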
-
OK, thank you.
-
:)