Open Closed

Distributed Event Outbox (RabbitMQ) not work #10359


User avatar
0
duyan11110 created

Hi,

I set up the Event Outbox as below: EntityFrameworkCoreModule

/// <summary>
/// EF Core DbContext of the job management service. Besides the service's own
/// model, it declares the event inbox and outbox sets and maps both in
/// <see cref="OnModelCreating"/>.
/// </summary>
[ConnectionStringName(JobManagementServiceDbProperties.ConnectionStringName)]
public class JobManagementServiceDbContext
    : AbpDbContext<JobManagementServiceDbContext>, IHasEventInbox, IHasEventOutbox
{
    /// <summary>Set of incoming (inbox) event records.</summary>
    public DbSet<IncomingEventRecord> IncomingEvents { get; set; }

    /// <summary>Set of outgoing (outbox) event records.</summary>
    public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }

    /// <summary>Creates the context, passing <paramref name="options"/> to the base class.</summary>
    /// <param name="options">EF Core options for this context type.</param>
    public JobManagementServiceDbContext(DbContextOptions<JobManagementServiceDbContext> options)
        : base(options)
    {
    }

    /// <summary>
    /// Runs the base model configuration, then adds the inbox, outbox and
    /// job-management mappings, in that order.
    /// </summary>
    /// <param name="builder">Model builder supplied by EF Core.</param>
    protected override void OnModelCreating(ModelBuilder builder)
    {
        base.OnModelCreating(builder);

        // Inbox/outbox tables first, then the service's own entities.
        builder.ConfigureEventInbox();
        builder.ConfigureEventOutbox();
        builder.ConfigureJobManagementService();
    }
}

HostModule

[DependsOn(
    typeof(AbpAspNetCoreSerilogModule),
    typeof(AbpSwashbuckleModule),
    typeof(AbpBackgroundJobsRabbitMqModule),
    typeof(AbpAspNetCoreMultiTenancyModule),
    typeof(AbpDistributedLockingModule),
    typeof(AbpEventBusRabbitMqModule),
    typeof(AbpCachingStackExchangeRedisModule),
/// <summary>
/// Configures <see cref="AbpDistributedEventBusOptions"/> so that both the event
/// inbox and the event outbox use <see cref="JobManagementServiceDbContext"/>.
/// </summary>
private void ConfigureDistributedEventBus()
{
    Configure<AbpDistributedEventBusOptions>(eventBusOptions =>
    {
        // Inbox: only the DbContext is set.
        eventBusOptions.Inboxes.Configure(
            inbox => inbox.UseDbContext<JobManagementServiceDbContext>());

        // Outbox: same DbContext; the two disabled lines below are kept from the
        // original troubleshooting attempt (sending switch and event-type selector).
        eventBusOptions.Outboxes.Configure(outbox =>
        {
            //outbox.IsSendingEnabled = false;
            //outbox.Selector = type => type == typeof(JobOperationEto) || type == typeof(JobConfigurationChangedEto);
            outbox.UseDbContext<JobManagementServiceDbContext>();
        });
    });
}

I publish the events as below:

/// <summary>
/// CRUD application service for <see cref="JobConfiguration"/> that also publishes
/// distributed events: a <see cref="JobConfigurationChangedEto"/> after create,
/// update and delete, and a <see cref="JobOperationEto"/> for trigger, pause and resume.
/// </summary>
[Authorize(JobManagementServicePermissions.JobConfigurations.Default)]
public class JobConfigurationAppService :
    CrudAppService<JobConfiguration, JobConfigurationDto, Guid, PagedAndSortedResultRequestDto, CreateUpdateJobConfigurationDto>,
    IJobConfigurationAppService
{
    private readonly IDistributedEventBus _distributedEventBus;

    // NOTE(review): injected and stored, but not read by any method in this class.
    private readonly IUnitOfWorkManager _unitOfWorkManager;

    /// <summary>Creates the service with its repository, event bus and unit-of-work manager.</summary>
    /// <param name="repository">Repository passed to the CRUD base class.</param>
    /// <param name="distributedEventBus">Bus used to publish the job events.</param>
    /// <param name="unitOfWorkManager">Unit-of-work manager (currently only stored).</param>
    public JobConfigurationAppService(
        IRepository<JobConfiguration, Guid> repository,
        IDistributedEventBus distributedEventBus,
        IUnitOfWorkManager unitOfWorkManager)
        : base(repository)
    {
        _distributedEventBus = distributedEventBus;
        _unitOfWorkManager = unitOfWorkManager;
    }

    /// <summary>
    /// Creates the job configuration through the base implementation, then publishes
    /// a <see cref="JobConfigurationChangedEto"/> built from <paramref name="input"/>.
    /// </summary>
    /// <param name="input">Values of the configuration to create.</param>
    /// <returns>The DTO returned by the base create operation.</returns>
    [Authorize(JobManagementServicePermissions.JobConfigurations.Create)]
    public override async Task<JobConfigurationDto> CreateAsync(CreateUpdateJobConfigurationDto input)
    {
        var result = await base.CreateAsync(input);

        await _distributedEventBus.PublishAsync(CreateConfigurationChangedEto(input));

        return result;
    }

    /// <summary>
    /// Updates the job configuration through the base implementation, then publishes
    /// a <see cref="JobConfigurationChangedEto"/> built from <paramref name="input"/>.
    /// </summary>
    /// <param name="id">Id of the configuration to update.</param>
    /// <param name="input">New values of the configuration.</param>
    /// <returns>The DTO returned by the base update operation.</returns>
    [Authorize(JobManagementServicePermissions.JobConfigurations.Edit)]
    public override async Task<JobConfigurationDto> UpdateAsync(Guid id, CreateUpdateJobConfigurationDto input)
    {
        var result = await base.UpdateAsync(id, input);

        await _distributedEventBus.PublishAsync(CreateConfigurationChangedEto(input));

        return result;
    }

    /// <summary>
    /// Deletes the job configuration and publishes a
    /// <see cref="JobConfigurationChangedEto"/> with <c>IsActive = false</c>.
    /// </summary>
    /// <param name="id">Id of the configuration to delete.</param>
    [Authorize(JobManagementServicePermissions.JobConfigurations.Delete)]
    public override async Task DeleteAsync(Guid id)
    {
        // Load the entity first: its values are needed for the event after deletion.
        var job = await Repository.GetAsync(id);
        await base.DeleteAsync(id);

        await _distributedEventBus.PublishAsync(new JobConfigurationChangedEto
        {
            JobName = job.JobName,
            JobGroup = job.JobGroup,
            TenantId = job.TenantId,
            IsActive = false, // Worker sees this flag and will remove Job from Quartz
            Description = job.Description,
            JobDataJson = job.JobDataJson,
            CronExpression = job.CronExpression,
            JobType = job.JobType
        });
    }

    /// <summary>
    /// Publishes a <see cref="JobActionType.TriggerNow"/> operation event for the job.
    /// This is the only publish call in the class that passes <c>useOutbox: false</c>.
    /// </summary>
    /// <param name="id">Id of the job configuration to trigger.</param>
    [Authorize(JobManagementServicePermissions.JobConfigurations.Trigger)]
    public async Task TriggerNowAsync(Guid id)
    {
        var job = await Repository.GetAsync(id);

        await _distributedEventBus.PublishAsync(
            CreateJobOperationEto(job, JobActionType.TriggerNow),
            useOutbox: false);
    }

    /// <summary>
    /// Marks the job inactive, saves it and publishes a
    /// <see cref="JobActionType.Pause"/> operation event.
    /// </summary>
    /// <param name="id">Id of the job configuration to pause.</param>
    [Authorize(JobManagementServicePermissions.JobConfigurations.Pause)]
    public async Task PauseAsync(Guid id)
    {
        await ChangeActiveStateAsync(id, false, JobActionType.Pause);
    }

    /// <summary>
    /// Marks the job active, saves it and publishes a
    /// <see cref="JobActionType.Resume"/> operation event.
    /// </summary>
    /// <param name="id">Id of the job configuration to resume.</param>
    [Authorize(JobManagementServicePermissions.JobConfigurations.Resume)]
    public async Task ResumeAsync(Guid id)
    {
        await ChangeActiveStateAsync(id, true, JobActionType.Resume);
    }

    /// <summary>
    /// Shared body of pause/resume: loads the job, sets <c>IsActive</c>, updates it
    /// through the repository and publishes the matching operation event.
    /// </summary>
    private async Task ChangeActiveStateAsync(Guid id, bool isActive, JobActionType actionType)
    {
        var job = await Repository.GetAsync(id);
        job.IsActive = isActive;
        await Repository.UpdateAsync(job);

        await _distributedEventBus.PublishAsync(CreateJobOperationEto(job, actionType));
    }

    /// <summary>
    /// Builds the change event for create/update from the request DTO; the tenant id
    /// is taken from <c>CurrentTenant</c>, not from the DTO.
    /// </summary>
    private JobConfigurationChangedEto CreateConfigurationChangedEto(CreateUpdateJobConfigurationDto input)
    {
        return new JobConfigurationChangedEto
        {
            JobName = input.JobName,
            JobGroup = input.JobGroup,
            TenantId = CurrentTenant.Id,
            IsActive = input.IsActive,
            Description = input.Description,
            JobDataJson = input.JobDataJson,
            CronExpression = input.CronExpression,
            JobType = input.JobType
        };
    }

    /// <summary>Builds an operation event that identifies the job and carries the requested action.</summary>
    private static JobOperationEto CreateJobOperationEto(JobConfiguration job, JobActionType actionType)
    {
        return new JobOperationEto
        {
            JobName = job.JobName,
            JobGroup = job.JobGroup,
            TenantId = job.TenantId,
            ActionType = actionType
        };
    }
}

Only the TriggerNowAsync method, which publishes with useOutbox: false, works fine; the other methods cannot publish events. I tried setting config.IsSendingEnabled = false; in the HostModule, but did not find any record in AbpEventOutbox. Please help.


3 Answer(s)
  • User Avatar
    0
    maliming created
    Support Team Fullstack Developer

    hi

    Can you share the full debug Logs.txt?

    Maybe some errors occur. If you can share a demo project, that would be best.

    liming.ma@volosoft.com

    Thanks.

  • User Avatar
    0
    duyan11110 created

    Hi,

    I don't see any errors in the log file, but the behavior is very strange: it works, but it is unstable. Sometimes it publishes the event, but mostly it does not. Do you have any suggestions for what I should check in my code?

  • User Avatar
    0
    maliming created
    Support Team Fullstack Developer

    hi

    but mostly it does not.

    Can you share a demo app to reproduce?

    liming.ma@volosoft.com

    Thanks.

Boost Your Development
ABP Live Training
Packages
See Trainings
Mastering ABP Framework Book
The Official Guide
Mastering
ABP Framework
Learn More
Mastering ABP Framework Book
Made with ❤️ on ABP v10.2.0-preview. Updated on February 05, 2026, 13:24
1
ABP Assistant
🔐 You need to be logged in to use the chatbot. Please log in first.