Kafka subscribe into multiple topics #3412


tahseelit1 created

May I ask whether the current version of the ABP Framework supports subscribing to multiple topics? If not, please advise on a possible workaround, and please let me know whether you are planning to add it in your next release.

Thanks

3 Answer(s)
  • liangshiwei created
    Support Team Fullstack Developer

    Hi,

    Can you explain it in more detail? Thanks.

  • tahseelit1 created

    I want to implement KafkaDistributedEventBus so that one microservice subscribes to multiple topics.

  • liangshiwei created
    Support Team Fullstack Developer

    Hi,

    I'm sorry to say that with the current design this is not possible. Kafka is different from RabbitMQ: it does not have exchanges and routing keys.

    However, a Kafka consumer can subscribe to multiple topics, so you can try this:

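    using System.Reflection;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using Volo.Abp.DependencyInjection;
    using Volo.Abp.ExceptionHandling;
    using Volo.Abp.Kafka;
    using Volo.Abp.Threading;

    // Replaces the default consumer registration with this subclass.
    [ExposeServices(typeof(KafkaMessageConsumer), typeof(IKafkaMessageConsumer))]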
    public class MyKafkaMessageConsumer : KafkaMessageConsumer
    {
        private static readonly PropertyInfo ConsumerProperty;
    
        static MyKafkaMessageConsumer()
        {
            var type = typeof(KafkaMessageConsumer);
            ConsumerProperty = type.GetProperty("Consumer", BindingFlags.Instance | BindingFlags.NonPublic);
        }
    
        public MyKafkaMessageConsumer(IConsumerPool consumerPool, IExceptionNotifier exceptionNotifier,
            IOptions<AbpKafkaOptions> options, IProducerPool producerPool, AbpAsyncTimer timer) : base(consumerPool,
            exceptionNotifier, options, producerPool, timer)
        {
        }
    
        protected override void Consume()
        {
            // The base class does not let subclasses assign Consumer, so set it via reflection.
            ConsumerProperty.SetValue(this, ConsumerPool.Get(GroupId, ConnectionName));
    
            Task.Factory.StartNew(async () =>
            {
                // Subscribe to every topic this service should consume; add more names as needed.
                Consumer.Subscribe(new[] { "MyTopicName", "MyTopicName2" /* , ... */ });
    
                while (true)
                {
                    try
                    {
                        var consumeResult = Consumer.Consume();
    
                        if (consumeResult.IsPartitionEOF)
                        {
                            continue;
                        }
    
                        await HandleIncomingMessage(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        Logger.LogException(ex, LogLevel.Warning);
                        await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }
    }
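
    To see the underlying client behaviour on its own, here is a minimal sketch that uses the Confluent.Kafka client directly, outside of ABP's event bus. The broker address, group id and the per-topic handling are assumptions for illustration only; the topic names are the placeholders from the snippet above. It shows that a single consumer can subscribe to several topics and that `ConsumeResult.Topic` tells you which topic each message came from.

    ```csharp
    using System;
    using System.Text;
    using System.Threading;
    using Confluent.Kafka;

    public static class MultiTopicConsumerSketch
    {
        public static void Main()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092", // assumption: local broker
                GroupId = "my-service",              // hypothetical consumer group
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            // Stop the loop cleanly on Ctrl+C.
            using var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();

            // One consumer, several topics.
            consumer.Subscribe(new[] { "MyTopicName", "MyTopicName2" });

            try
            {
                while (true)
                {
                    var result = consumer.Consume(cts.Token);
                    var payload = Encoding.UTF8.GetString(result.Message.Value ?? Array.Empty<byte>());

                    // Dispatch on the topic the message was read from.
                    switch (result.Topic)
                    {
                        case "MyTopicName":
                            Console.WriteLine($"[first topic] {payload}");
                            break;
                        case "MyTopicName2":
                            Console.WriteLine($"[second topic] {payload}");
                            break;
                        default:
                            Console.WriteLine($"[{result.Topic}] {payload}");
                            break;
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Ctrl+C cancels the token.
            }
            finally
            {
                // Leave the consumer group and commit final offsets.
                consumer.Close();
            }
        }
    }
    ```

    Inside the ABP consumer above, the same `Topic` property is available on the `consumeResult` you pass to `HandleIncomingMessage`, so you could branch on it there if the topics need different handling.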
    
Made with ❤️ on ABP v9.2.0-preview. Updated on January 08, 2025, 14:09