0
duyan11110 created
Hi,
I setup Event Outbox as below: EntityFrameworkCoreModule
[ConnectionStringName(JobManagementServiceDbProperties.ConnectionStringName)]
public class JobManagementServiceDbContext : AbpDbContext<JobManagementServiceDbContext>,
IHasEventInbox,
IHasEventOutbox
{
public JobManagementServiceDbContext(DbContextOptions<JobManagementServiceDbContext> options)
: base(options)
{
}
public DbSet<IncomingEventRecord> IncomingEvents { get; set; }
public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
builder.ConfigureEventInbox();
builder.ConfigureEventOutbox();
builder.ConfigureJobManagementService();
}
}
HostModule
[DependsOn(
typeof(AbpAspNetCoreSerilogModule),
typeof(AbpSwashbuckleModule),
typeof(AbpBackgroundJobsRabbitMqModule),
typeof(AbpAspNetCoreMultiTenancyModule),
typeof(AbpDistributedLockingModule),
typeof(AbpEventBusRabbitMqModule),
typeof(AbpCachingStackExchangeRedisModule),
private void ConfigureDistributedEventBus()
{
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Inboxes.Configure(config =>
{
config.UseDbContext<JobManagementServiceDbContext>();
});
options.Outboxes.Configure(config =>
{
//config.IsSendingEnabled = false;
//config.Selector = type => type == typeof(JobOperationEto) || type == typeof(JobConfigurationChangedEto);
config.UseDbContext<JobManagementServiceDbContext>();
});
});
}
I published event as below:
[Authorize(JobManagementServicePermissions.JobConfigurations.Default)]
public class JobConfigurationAppService :
CrudAppService<JobConfiguration, JobConfigurationDto, Guid, PagedAndSortedResultRequestDto, CreateUpdateJobConfigurationDto>,
IJobConfigurationAppService
{
private readonly IDistributedEventBus _distributedEventBus;
private readonly IUnitOfWorkManager _unitOfWorkManager;
public JobConfigurationAppService(
IRepository<JobConfiguration, Guid> repository,
IDistributedEventBus distributedEventBus,
IUnitOfWorkManager unitOfWorkManager)
: base(repository)
{
_distributedEventBus = distributedEventBus;
_unitOfWorkManager = unitOfWorkManager;
}
[Authorize(JobManagementServicePermissions.JobConfigurations.Create)]
public override async Task<JobConfigurationDto> CreateAsync(CreateUpdateJobConfigurationDto input)
{
var result = await base.CreateAsync(input);
await _distributedEventBus.PublishAsync(new JobConfigurationChangedEto
{
JobName = input.JobName,
JobGroup = input.JobGroup,
TenantId = CurrentTenant.Id,
IsActive = input.IsActive,
Description = input.Description,
JobDataJson = input.JobDataJson,
CronExpression = input.CronExpression,
JobType = input.JobType
});
return result;
}
[Authorize(JobManagementServicePermissions.JobConfigurations.Edit)]
public override async Task<JobConfigurationDto> UpdateAsync(Guid id, CreateUpdateJobConfigurationDto input)
{
var result = await base.UpdateAsync(id, input);
await _distributedEventBus.PublishAsync(new JobConfigurationChangedEto
{
JobName = input.JobName,
JobGroup = input.JobGroup,
TenantId = CurrentTenant.Id,
IsActive = input.IsActive,
Description = input.Description,
JobDataJson = input.JobDataJson,
CronExpression = input.CronExpression,
JobType = input.JobType
});
return result;
}
[Authorize(JobManagementServicePermissions.JobConfigurations.Delete)]
public override async Task DeleteAsync(Guid id)
{
var job = await Repository.GetAsync(id);
await base.DeleteAsync(id);
await _distributedEventBus.PublishAsync(new JobConfigurationChangedEto
{
JobName = job.JobName,
JobGroup = job.JobGroup,
TenantId = job.TenantId,
IsActive = false, // Worker sees this flag and will remove Job from Quartz
Description = job.Description,
JobDataJson = job.JobDataJson,
CronExpression = job.CronExpression,
JobType = job.JobType
});
}
[Authorize(JobManagementServicePermissions.JobConfigurations.Trigger)]
public async Task TriggerNowAsync(Guid id)
{
var job = await Repository.GetAsync(id);
await _distributedEventBus.PublishAsync(new JobOperationEto
{
JobName = job.JobName,
JobGroup = job.JobGroup,
TenantId = job.TenantId,
ActionType = JobActionType.TriggerNow
}, useOutbox: false);
}
[Authorize(JobManagementServicePermissions.JobConfigurations.Pause)]
public async Task PauseAsync(Guid id)
{
var job = await Repository.GetAsync(id);
job.IsActive = false;
await Repository.UpdateAsync(job);
await _distributedEventBus.PublishAsync(new JobOperationEto
{
JobName = job.JobName,
JobGroup = job.JobGroup,
TenantId = job.TenantId,
ActionType = JobActionType.Pause
});
}
[Authorize(JobManagementServicePermissions.JobConfigurations.Resume)]
public async Task ResumeAsync(Guid id)
{
var job = await Repository.GetAsync(id);
job.IsActive = true;
await Repository.UpdateAsync(job);
await _distributedEventBus.PublishAsync(new JobOperationEto
{
JobName = job.JobName,
JobGroup = job.JobGroup,
TenantId = job.TenantId,
ActionType = JobActionType.Resume
});
}
}
Only TriggerNowAsync method with useOutbox: false work fine, other methods can not publish events. I tried to set config.IsSendingEnabled = false; in HostModule but not found any record in AbpEventOutbox. Please help.
3 Answer(s)
-
0
hi
Can you share the full debug
Logs.txt?Maybe some errors occur. If you can share a demo project, that would be best.
liming.ma@volosoft.com
Thanks.
-
0
Hi,
I don't see any error in the log file. But it's very strange. It works but unstable. Sometime it can publish event, but mostly it does not. Do you have any suggestion for me to check my code?
-
0
hi
but mostly it does not.
Can you share a demo app to reproduce?
liming.ma@volosoft.com
Thanks.