- ABP Framework version: v8.1.4
- UI Type: Angular
- Database System: EF Core ( PostgreSQL.)
- Tiered (for MVC) or Auth Server Separated (for Angular): yes
- Exception message and full stack trace:
StackExchange.Redis.RedisCommandException: Multi-key operations must involve a single slot; keys can use 'hash tags' to help this, i.e. '{/users/12345}/account' and '{/users/12345}/contacts' will always be in the same slot
at StackExchange.Redis.ConnectionMultiplexer.PrepareToPushMessageToBridge[T](Message message, ResultProcessor
1 processor, IResultBox
1 resultBox, ServerEndPoint& server) in //src/StackExchange.Redis/ConnectionMultiplexer.cs:line 1966at StackExchange.Redis.ConnectionMultiplexer.TryPushMessageToBridgeAsync[T](Message message, ResultProcessor1 processor, IResultBox
1 resultBox, ServerEndPoint& server) in //src/StackExchange.Redis/ConnectionMultiplexer.cs:line 2004 at StackExchange.Redis.ConnectionMultiplexer.ExecuteAsyncImpl[T](Message message, ResultProcessor1 processor, Object state, ServerEndPoint server) in /_/src/StackExchange.Redis/ConnectionMultiplexer.cs:line 2182 at StackExchange.Redis.RedisBase.ExecuteAsync[T](Message message, ResultProcessor
1 processor, ServerEndPoint server) in //src/StackExchange.Redis/RedisBase.cs:line 54at StackExchange.Redis.RedisDatabase.KeyDeleteAsync(RedisKey[] keys, CommandFlags flags) in //src/StackExchange.Redis/RedisDatabase.cs:line 769 at Volo.Abp.Caching.StackExchangeRedis.AbpRedisCache.RemoveManyAsync(IEnumerable1 keys, CancellationToken token) at Volo.Abp.Caching.DistributedCache
2.<>c__DisplayClass63_0.<g__RemoveRealCache|0>d.MoveNext() - Steps to reproduce the issue: Create a redis cluster by using helm bitnami chart inside kubernetes environment. Give the redis connection string to your abp app to use this redis-cluster for distributed cache. Then try to change the database connection string and save. Then switch back to shared database. You will get the above error.
The Problem i am facing is related with redis cache. If you use Redis Cluster instead of single instance of redis, you are facing an issue that some values are remaining in redis cache or they are not the same with the values in db. This brings the unstable behavior in the application. To give an example. 1- Change Database Connection Strings for a tenant. 2- Then try to login with that tenant, it will use the new connection string. 3- Afterwards try to revert it back to shared database. 4- You will get the error and it is not going to delete it from redis cache. 5- Even in database it is using the shared database, it will still use the connection string, since it stayed in cache in that way.
ps: it can be also vice versa( from redis cache it is gonna come an empty connection strings while in db there are connection strings). So the point is cache not becoming stable with database.
/// <summary>
/// Apply pending EF Core schema migrations to the database.
/// Returns true if any migration has applied.
/// </summary>
protected virtual async Task<bool> MigrateDatabaseSchemaAsync(Guid? tenantId)
{
var result = false;
Logger.LogError($"MigrateDatabaseSchemaAsync tenantId: {tenantId}");
using (CurrentTenant.Change(tenantId))
{
using (var uow = UnitOfWorkManager.Begin(requiresNew: true, isTransactional: false))
{
async Task<bool> MigrateDatabaseSchemaWithDbContextAsync()
{
var dbContext = await uow.ServiceProvider
.GetRequiredService<IDbContextProvider<TDbContext>>()
.GetDbContextAsync();
if ((await dbContext.Database.GetPendingMigrationsAsync()).Any())
{
await dbContext.Database.MigrateAsync();
Logger.LogError($"Migrated Database for tenant: {tenantId}");
return true;
}
return false;
}
Logger.LogError($"Starting migration for {tenantId}");
if (tenantId == null)
{
//Migrating the host database
Logger.LogInformation($"Migrating database of host. Database Name = {DatabaseName}");
Logger.LogError($"Migrating database of host. Database Name = {DatabaseName}");
result = await MigrateDatabaseSchemaWithDbContextAsync();
}
else
{
var tenantConfiguration = await TenantStore.FindAsync(tenantId.Value);
Logger.LogError("Tenant Configuration: "+tenantConfiguration?.Name);
Logger.LogError("Connection string values: " + (tenantConfiguration?.ConnectionStrings?.Values != null ? string.Join(", ", tenantConfiguration.ConnectionStrings.Values) : "null"));
Logger.LogError($"Connections strings is null: {tenantConfiguration?.ConnectionStrings != null}");
Logger.LogError($"tenantConfiguration.ConnectionStrings.Default is null or whitespace: {tenantConfiguration?.ConnectionStrings?.Default.IsNullOrWhiteSpace()}");
Logger.LogError($"tenantConfiguration.ConnectionStrings.GetOrDefault is null or whitespace: {tenantConfiguration?.ConnectionStrings?.GetOrDefault(DatabaseName).IsNullOrWhiteSpace()}");
if (tenantConfiguration != null
&& tenantConfiguration.ConnectionStrings != null
&& (!tenantConfiguration.ConnectionStrings.Default.IsNullOrWhiteSpace() || !tenantConfiguration.ConnectionStrings.GetOrDefault(DatabaseName).IsNullOrWhiteSpace()))
{
//Migrating the tenant database (only if tenant has a separate database)
Logger.LogInformation($"Migrating separate database of tenant. Database Name = {DatabaseName}, TenantId = {tenantId}");
Logger.LogError($"Migrating separate database of tenant. Database Name = {DatabaseName}, TenantId = {tenantId}");
result = await MigrateDatabaseSchemaWithDbContextAsync();
Logger.LogError($"Migrated separate database of tenant. Database Name = {DatabaseName}, TenantId = {tenantId}");
Logger.LogError("Connection string values: " + (tenantConfiguration?.ConnectionStrings?.Values != null ? string.Join(", ", tenantConfiguration.ConnectionStrings.Values) : "null"));
}
}
await uow.CompleteAsync();
}
}
return result;
}
The tenant configuration here
var tenantConfiguration = await TenantStore.FindAsync(tenantId.Value);
is giving me an empty connection strings time to time even if connection strings are defined in my database. I fix this by changing it to a single instance of redis cache, but it could be nice to use redis-cluster for performance. if you want to try, you can use helm chart that is provided by bitnami from this link. https://github.com/bitnami/charts/tree/main/bitnami/redis-cluster
10 Answer(s)
-
0
Hi,
I think this is a problem of redis cluster.
As I understand, the redis cluster should be responsible for the data consistency of each node.
Will this error occur if you use the redis cluster provided by aws or azure?
-
0
Hello again, Redis cluster is already responsible of data consistency, but the problem is when you do multikey operations (like RemoveMany), if they are on different nodes on the cluster, it is always going to throw an error. It doesn't matter from where you get your redis-cluster. So application needs to handle these cases. There are couple of ways to do this. Here are 2 short articles that you can look at. https://www.dragonflydb.io/error-solutions/crossslot-keys-in-request-dont-hash-to-the-same-slot https://medium.com/@mbh023/redis-multi-key-command-in-cluster-mode-feat-cross-slot-ec27b999f169
As i see there are 4 different solutions to fix the problem. 1- use hash-tags (mostly used) while you are inserting your data you should insert it with curly braces {}, so redis can set the data to the same node and when you do removemany then it won't throw an error since all the data are going to be at the same node. 2- you could skip to use removemany instead you can do single operations (like Remove() ) but that's going to be more slow if you have lots of things to remove. 3- another solution on the second article is to use the same algorithm that redis using to decide about where the data is so you can group your keys to delete according to where the nodes are. 4- Use single instance of redis since every record is going to be at the same node.(If you need performance, this can be a bottleneck in your app)
Hope i could make my point.
-
0
Hi,
Okay, I will check it.
-
0
Hi,
you can try replace the
AbpRedisCache
withMyAbpRedisCache
, we will fix it in the next patch version. your ticket was refunded.[DisableConventionalRegistration] public class MyAbpRedisCache : RedisCache, ICacheSupportsMultipleItems { protected static readonly string AbsoluteExpirationKey; protected static readonly string SlidingExpirationKey; protected static readonly string DataKey; protected static readonly long NotPresent; protected static readonly RedisValue[] HashMembersAbsoluteExpirationSlidingExpirationData; protected static readonly RedisValue[] HashMembersAbsoluteExpirationSlidingExpiration; private readonly static FieldInfo SetScriptField; private readonly static FieldInfo RedisDatabaseField; private readonly static MethodInfo ConnectMethod; private readonly static MethodInfo ConnectAsyncMethod; private readonly static MethodInfo MapMetadataMethod; private readonly static MethodInfo GetAbsoluteExpirationMethod; private readonly static MethodInfo GetExpirationInSecondsMethod; private readonly static MethodInfo OnRedisErrorMethod; protected RedisKey InstancePrefix { get; } static MyAbpRedisCache() { var type = typeof(RedisCache); RedisDatabaseField = Check.NotNull(type.GetField("_cache", BindingFlags.Instance | BindingFlags.NonPublic), nameof(RedisDatabaseField)); SetScriptField = Check.NotNull(type.GetField("_setScript", BindingFlags.Instance | BindingFlags.NonPublic), nameof(SetScriptField)); ConnectMethod = Check.NotNull(type.GetMethod("Connect", BindingFlags.Instance | BindingFlags.NonPublic), nameof(ConnectMethod)); ConnectAsyncMethod = Check.NotNull(type.GetMethod("ConnectAsync", BindingFlags.Instance | BindingFlags.NonPublic), nameof(ConnectAsyncMethod)); MapMetadataMethod = Check.NotNull(type.GetMethod("MapMetadata", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Static), nameof(MapMetadataMethod)); GetAbsoluteExpirationMethod = Check.NotNull(type.GetMethod("GetAbsoluteExpiration", BindingFlags.Static | BindingFlags.NonPublic), nameof(GetAbsoluteExpirationMethod)); GetExpirationInSecondsMethod = Check.NotNull(type.GetMethod("GetExpirationInSeconds", BindingFlags.Static | BindingFlags.NonPublic), nameof(GetExpirationInSecondsMethod)); OnRedisErrorMethod = Check.NotNull(type.GetMethod("OnRedisError", BindingFlags.Instance | BindingFlags.NonPublic), nameof(OnRedisErrorMethod)); AbsoluteExpirationKey = type.GetField("AbsoluteExpirationKey", BindingFlags.Static | BindingFlags.NonPublic)!.GetValue(null)!.ToString()!; SlidingExpirationKey = type.GetField("SlidingExpirationKey", BindingFlags.Static | BindingFlags.NonPublic)!.GetValue(null)!.ToString()!; DataKey = type.GetField("DataKey", BindingFlags.Static | BindingFlags.NonPublic)!.GetValue(null)!.ToString()!; NotPresent = type.GetField("NotPresent", BindingFlags.Static | BindingFlags.NonPublic)!.GetValue(null)!.To<int>(); HashMembersAbsoluteExpirationSlidingExpirationData = [AbsoluteExpirationKey, SlidingExpirationKey, DataKey]; HashMembersAbsoluteExpirationSlidingExpiration = [AbsoluteExpirationKey, SlidingExpirationKey]; } public MyAbpRedisCache(IOptions<RedisCacheOptions> optionsAccessor) : base(optionsAccessor) { var instanceName = optionsAccessor.Value.InstanceName; if (!string.IsNullOrEmpty(instanceName)) { InstancePrefix = (RedisKey)Encoding.UTF8.GetBytes(instanceName); } } protected virtual IDatabase Connect() { return (IDatabase)ConnectMethod.Invoke(this, Array.Empty<object>())!; } protected virtual async ValueTask<IDatabase> ConnectAsync(CancellationToken token = default) { return await (ValueTask<IDatabase>)ConnectAsyncMethod.Invoke(this, new object[] { token })!; } public virtual byte[]?[] GetMany( IEnumerable<string> keys) { keys = Check.NotNull(keys, nameof(keys)); return GetAndRefreshMany(keys, true); } public virtual async Task<byte[]?[]> GetManyAsync( IEnumerable<string> keys, CancellationToken token = default) { keys = Check.NotNull(keys, nameof(keys)); return await GetAndRefreshManyAsync(keys, true, token); } public virtual void SetMany( IEnumerable<KeyValuePair<string, byte[]>> items, DistributedCacheEntryOptions options) { var cache = Connect(); try { Task.WaitAll(PipelineSetMany(cache, items, options)); } catch (Exception ex) { OnRedisError(ex, cache); throw; } } public virtual async Task SetManyAsync( IEnumerable<KeyValuePair<string, byte[]>> items, DistributedCacheEntryOptions options, CancellationToken token = default) { token.ThrowIfCancellationRequested(); var cache = await ConnectAsync(token); try { await Task.WhenAll(PipelineSetMany(cache, items, options)); } catch (Exception ex) { OnRedisError(ex, cache); throw; } } public virtual void RefreshMany( IEnumerable<string> keys) { keys = Check.NotNull(keys, nameof(keys)); GetAndRefreshMany(keys, false); } public virtual async Task RefreshManyAsync( IEnumerable<string> keys, CancellationToken token = default) { keys = Check.NotNull(keys, nameof(keys)); await GetAndRefreshManyAsync(keys, false, token); } public virtual void RemoveMany(IEnumerable<string> keys) { keys = Check.NotNull(keys, nameof(keys)); var cache = Connect(); try { Task.WaitAll(PipelineRemoveMany(cache, keys)); } catch (Exception ex) { OnRedisError(ex, cache); throw; } } public async Task RemoveManyAsync(IEnumerable<string> keys, CancellationToken token = default) { keys = Check.NotNull(keys, nameof(keys)); token.ThrowIfCancellationRequested(); var cache = await ConnectAsync(token); try { await Task.WhenAll(PipelineRemoveMany(cache, keys)); } catch (Exception ex) { OnRedisError(ex, cache); throw; } } protected virtual Task[] PipelineRemoveMany( IDatabase cache, IEnumerable<string> keys) { return keys.Select(key => cache.KeyDeleteAsync(InstancePrefix.Append(key))).ToArray<Task>(); } protected virtual byte[]?[] GetAndRefreshMany( IEnumerable<string> keys, bool getData) { var cache = Connect(); var keyArray = keys.Select(key => InstancePrefix.Append( key)).ToArray(); byte[]?[] bytes; try { var results = cache.HashMemberGetMany(keyArray, GetHashFields(getData)); Task.WaitAll(PipelineRefreshManyAndOutData(cache, keyArray, results, out bytes)); } catch (Exception ex) { OnRedisError(ex, cache); throw; } return bytes; } protected virtual async Task<byte[]?[]> GetAndRefreshManyAsync( IEnumerable<string> keys, bool getData, CancellationToken token = default) { token.ThrowIfCancellationRequested(); var cache = await ConnectAsync(token); var keyArray = keys.Select(key => InstancePrefix.Append(key)).ToArray(); byte[]?[] bytes; try { var results = await cache.HashMemberGetManyAsync(keyArray, GetHashFields(getData)); await Task.WhenAll(PipelineRefreshManyAndOutData(cache, keyArray, results, out bytes)); } catch (Exception ex) { OnRedisError(ex, cache); throw; } return bytes; } protected virtual Task[] PipelineRefreshManyAndOutData( IDatabase cache, RedisKey[] keys, RedisValue[][] results, out byte[]?[] bytes) { bytes = new byte[keys.Length][]; var tasks = new Task[keys.Length]; for (var i = 0; i < keys.Length; i++) { if (results[i].Length >= 2) { MapMetadata(results[i], out var absExpr, out var sldExpr); if (sldExpr.HasValue) { TimeSpan? expr; if (absExpr.HasValue) { var relExpr = absExpr.Value - DateTimeOffset.Now; expr = relExpr <= sldExpr.Value ? relExpr : sldExpr; } else { expr = sldExpr; } tasks[i] = cache.KeyExpireAsync(keys[i], expr); } else { tasks[i] = Task.CompletedTask; } } if (results[i].Length >= 3 && results[i][2].HasValue) { bytes[i] = results[i][2]; } else { bytes[i] = null; } } return tasks; } protected virtual Task[] PipelineSetMany( IDatabase cache, IEnumerable<KeyValuePair<string, byte[]>> items, DistributedCacheEntryOptions options) { items = Check.NotNull(items, nameof(items)); options = Check.NotNull(options, nameof(options)); var itemArray = items.ToArray(); var tasks = new Task[itemArray.Length]; var creationTime = DateTimeOffset.UtcNow; var absoluteExpiration = GetAbsoluteExpiration(creationTime, options); for (var i = 0; i < itemArray.Length; i++) { tasks[i] = cache.ScriptEvaluateAsync(GetSetScript(), new RedisKey[] { InstancePrefix.Append(itemArray[i].Key) }, [ absoluteExpiration?.Ticks ?? NotPresent, options.SlidingExpiration?.Ticks ?? NotPresent, GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent, itemArray[i].Value ]); } return tasks; } protected virtual void MapMetadata( RedisValue[] results, out DateTimeOffset? absoluteExpiration, out TimeSpan? slidingExpiration) { var parameters = new object?[] { results, null, null }; MapMetadataMethod.Invoke(this, parameters); absoluteExpiration = (DateTimeOffset?)parameters[1]; slidingExpiration = (TimeSpan?)parameters[2]; } protected virtual long? GetExpirationInSeconds( DateTimeOffset creationTime, DateTimeOffset? absoluteExpiration, DistributedCacheEntryOptions options) { return (long?)GetExpirationInSecondsMethod.Invoke(null, new object?[] { creationTime, absoluteExpiration, options }); } protected virtual DateTimeOffset? GetAbsoluteExpiration( DateTimeOffset creationTime, DistributedCacheEntryOptions options) { return (DateTimeOffset?)GetAbsoluteExpirationMethod.Invoke(null, new object[] { creationTime, options }); } protected virtual void OnRedisError(Exception ex, IDatabase cache) { OnRedisErrorMethod.Invoke(this, [ex, cache]); } private string GetSetScript() { return SetScriptField.GetValue(this)!.ToString()!; } private static RedisValue[] GetHashFields(bool getData) { return getData ? HashMembersAbsoluteExpirationSlidingExpirationData : HashMembersAbsoluteExpirationSlidingExpiration; } }
public override void ConfigureServices(ServiceConfigurationContext context) { var configuration = context.Services.GetConfiguration(); var redisEnabled = configuration["Redis:IsEnabled"]; if (string.IsNullOrEmpty(redisEnabled) || bool.Parse(redisEnabled)) { context.Services.Replace(ServiceDescriptor.Singleton<IDistributedCache, MyAbpRedisCache>()); } }
-
0
please let me know if it can help you
-
0
Hello again, Can you explain little what has changed in the code, so it can solve the problem? Maybe i am wrong, but i couldn't see any hashtags in the code. I couldn't try your code because my app is in production right now, i need to create a staging environment to try it so i will try it at the weekend. It seems like changes are related with expiration date? I was expecting something like this instead
protected virtual Task[] PipelineSetMany( IEnumerable<KeyValuePair<string, byte[]>> items, DistributedCacheEntryOptions options) { items = Check.NotNull(items, nameof(items)); options = Check.NotNull(options, nameof(options)); var itemArray = items.ToArray(); var tasks = new Task[itemArray.Length]; var creationTime = DateTimeOffset.UtcNow; var absoluteExpiration = GetAbsoluteExpiration(creationTime, options); for (var i = 0; i < itemArray.Length; i++) { var keyWithHashTag = $"{{{Instance}}}{itemArray[i].Key}"; tasks[i] = RedisDatabase.ScriptEvaluateAsync(GetSetScript(), new RedisKey[] { keyWithHashTag }, new RedisValue[] { absoluteExpiration?.Ticks ?? NotPresent, options.SlidingExpiration?.Ticks ?? NotPresent, GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent, itemArray[i].Value }); } return tasks; }
important part is
var keyWithHashTag = $"{{{Instance}}}{itemArray[i].Key}";
probably it shouldn't be instance but sth similar to this so the keys can go to the db with hashtags. Actually i have seen key normalizer class while i am looking at the code, I think that should be the one that i need to override. sth similar like this.public class DistributedCacheKeyNormalizer : IDistributedCacheKeyNormalizer, ITransientDependency { protected ICurrentTenant CurrentTenant { get; } protected AbpDistributedCacheOptions DistributedCacheOptions { get; } public DistributedCacheKeyNormalizer( ICurrentTenant currentTenant, IOptions<AbpDistributedCacheOptions> distributedCacheOptions) { CurrentTenant = currentTenant; DistributedCacheOptions = distributedCacheOptions.Value; } public virtual string NormalizeKey(DistributedCacheKeyNormalizeArgs args) { var normalizedKey = $"c:{args.CacheName},k:{DistributedCacheOptions.KeyPrefix}{args.Key}"; if (!args.IgnoreMultiTenancy && CurrentTenant.Id.HasValue) { normalizedKey = $"t:{{{CurrentTenant.Id.Value}}},{normalizedKey}"; } return normalizedKey; } }
so all the values with same tenant can be at the same slot. But as i say i will try your code and this code at the weekend to test, then i can post it over here what i have found.
-
0
-
0
Ok i think i get it now.
public async Task RemoveManyAsync(IEnumerable<string> keys, CancellationToken token = default) { keys = Check.NotNull(keys, nameof(keys)); token.ThrowIfCancellationRequested(); await ConnectAsync(token); await RedisDatabase.KeyDeleteAsync(keys.Select(key => (RedisKey)(Instance + key)).ToArray()); }
is changed to
protected virtual Task[] PipelineRemoveMany( IDatabase cache, IEnumerable<string> keys) { return keys.Select(key => cache.KeyDeleteAsync(InstancePrefix.Append(key))).ToArray<Task>(); }
so it does single key operations one by one even if it has multi keys. i will try this and let you know, thank you for the assistance.
-
0
: )
-
0
I have tested it. it works now thank you @liangshiwei