May I ask whether the current version of the abp.io framework supports subscribing to multiple topics? If not, please advise on any possible workaround, and please let me know whether you are planning to add it in your next release.
Thanks
3 Answer(s)
-
0
Hi,
Could you explain it in more detail? Thanks.
-
0
-
0
Hi,
I'm sorry to say that with the current design it's not possible. Kafka is different from RabbitMQ: it does not have exchanges and routing keys.
However, Kafka consumers can subscribe to multiple topics, so you can try this:
    using System.Reflection;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using Volo.Abp.DependencyInjection;
    using Volo.Abp.ExceptionHandling;
    using Volo.Abp.Kafka;
    using Volo.Abp.Threading;

    // Replaces the default consumer so that it subscribes to several topics.
    [ExposeServices(typeof(KafkaMessageConsumer), typeof(IKafkaMessageConsumer))]
    public class MyKafkaMessageConsumer : KafkaMessageConsumer
    {
        // The base class's Consumer property has a non-public setter, so it is set via reflection.
        private static readonly PropertyInfo ConsumerProperty;

        static MyKafkaMessageConsumer()
        {
            var type = typeof(KafkaMessageConsumer);
            ConsumerProperty = type.GetProperty("Consumer", BindingFlags.Instance | BindingFlags.NonPublic);
        }

        public MyKafkaMessageConsumer(
            IConsumerPool consumerPool,
            IExceptionNotifier exceptionNotifier,
            IOptions<AbpKafkaOptions> options,
            IProducerPool producerPool,
            AbpAsyncTimer timer)
            : base(consumerPool, exceptionNotifier, options, producerPool, timer)
        {
        }

        protected override void Consume()
        {
            ConsumerProperty.SetValue(this, ConsumerPool.Get(GroupId, ConnectionName));

            Task.Factory.StartNew(async () =>
            {
                // List every topic this consumer should receive messages from.
                Consumer.Subscribe(new[] { "MyTopicName", "MyTopicName2" /* , ..... */ });

                while (true)
                {
                    try
                    {
                        var consumeResult = Consumer.Consume();

                        // Skip end-of-partition markers; they carry no message.
                        if (consumeResult.IsPartitionEOF)
                        {
                            continue;
                        }

                        await HandleIncomingMessage(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        Logger.LogException(ex, LogLevel.Warning);
                        await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }
    }
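Once a single consumer is subscribed to several topics, messages from all of them arrive through the same loop, so the handling code usually needs to branch on the topic name. Below is a minimal sketch of one way to do that in plain C#. `TopicDispatcher`, `Register`, and `Dispatch` are hypothetical names made up for illustration; they are not part of ABP or Confluent.Kafka. In the consumer above, the topic and payload would presumably come from `consumeResult.Topic` and `consumeResult.Message.Value`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not an ABP type): maps a topic name to its handler.
public class TopicDispatcher
{
    private readonly Dictionary<string, Action<byte[]>> _handlers =
        new Dictionary<string, Action<byte[]>>(StringComparer.Ordinal);

    // Registers (or replaces) the handler for a topic.
    public void Register(string topic, Action<byte[]> handler)
    {
        _handlers[topic] = handler;
    }

    // Invokes the handler for the topic; returns false if none is registered.
    public bool Dispatch(string topic, byte[] payload)
    {
        if (_handlers.TryGetValue(topic, out var handler))
        {
            handler(payload);
            return true;
        }

        return false;
    }
}

public static class Program
{
    public static void Main()
    {
        var dispatcher = new TopicDispatcher();
        dispatcher.Register("MyTopicName", p => Console.WriteLine($"MyTopicName: {p.Length} bytes"));
        dispatcher.Register("MyTopicName2", p => Console.WriteLine($"MyTopicName2: {p.Length} bytes"));

        // Stand-ins for values that would come from the Kafka consume result.
        dispatcher.Dispatch("MyTopicName", new byte[] { 1, 2, 3 });

        // No handler registered for this topic, so Dispatch returns false.
        var handled = dispatcher.Dispatch("UnknownTopic", new byte[0]);
        Console.WriteLine($"UnknownTopic handled: {handled}");
    }
}
```

Whether to log, skip, or throw for a topic with no registered handler is a design choice; the sketch just returns `false` and leaves that decision to the caller.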