diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index d939635cc2763..f45f803136d74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -6718,18 +6718,22 @@ protected void updatePlatformCache(@Nullable CacheObject val, @Nullable Affinity if (!hasPlatformCache()) return; - PlatformProcessor proc = this.cctx.kernalContext().platform(); + PlatformProcessor proc = cctx.kernalContext().platform(); if (!proc.hasContext() || !proc.context().isPlatformCacheSupported()) return; try { - CacheObjectContext ctx = this.cctx.cacheObjectContext(); + CacheObjectContext ctx = cctx.cacheObjectContext(); + byte[] keyBytes = key.valueBytes(ctx); // val is null when entry is removed. - byte[] keyBytes = this.key.valueBytes(ctx); - byte[] valBytes = val == null ? null : val.valueBytes(ctx); + // valid(ver) is false when near cache entry is out of sync. + boolean valid = val != null && ver != null && valid(ver); - proc.context().updatePlatformCache(this.cctx.cacheId(), keyBytes, valBytes, partition(), ver); + // null valBytes means that entry should be removed from platform cache. + byte[] valBytes = valid ? val.valueBytes(ctx) : null; + + proc.context().updatePlatformCache(cctx.cacheId(), keyBytes, valBytes, partition(), ver); } catch (Throwable e) { U.error(log, "Failed to update Platform Cache: " + e); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCachePartialClientConnectionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCachePartialClientConnectionTest.cs new file mode 100644 index 0000000000000..da9bdd7f6e758 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCachePartialClientConnectionTest.cs @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Platform +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Discovery.Tcp; + using Apache.Ignite.Core.Discovery.Tcp.Static; + using NUnit.Framework; + + /// + /// Tests platform cache with thick clients connected to different parts of the cluster. + /// + public class PlatformCachePartialClientConnectionTest + { + private const string CacheName = "cache1"; + private const string AttrMacs = "org.apache.ignite.macs"; + + private const int Key = 1; + private const int InitialValue = 0; + + [TearDown] + public void TearDown() + { + Ignition.StopAll(true); + } + + /// + /// Tests that thick client connected only to backup node 1 updates a value, + /// and another thick client connected to a different backup node sees the update in Platform Cache. + /// + [Test] + public static void TestPutFromOneClientGetFromAnother() + { + // Start 3 servers. + var servers = Enumerable.Range(0, 3) + .Select(i => Ignition.Start(GetConfiguration(false, i, 0))) + .ToArray(); + + CreateCache(servers[0]); + + // Start 2 thick clients, connect to different backup nodes only (not entire cluster). + var primaryAndBackups = servers[0].GetAffinity(CacheName).MapKeyToPrimaryAndBackups(Key); + var backupServer1Mac = GetMac(primaryAndBackups[1]); + var backupServer2Mac = GetMac(primaryAndBackups[2]); + + var client1 = Ignition.Start(GetConfiguration(true, backupServer1Mac, backupServer1Mac)); + var client2 = Ignition.Start(GetConfiguration(true, backupServer2Mac, backupServer2Mac)); + + // Check initial value. + var client1Cache = client1.GetOrCreateNearCache(CacheName, new NearCacheConfiguration()); + var client2Cache = client2.GetOrCreateNearCache(CacheName, new NearCacheConfiguration()); + + var client1Value = client1Cache.Get(Key); + var client2Value = client2Cache.Get(Key); + + Assert.AreEqual(InitialValue, client1Value); + Assert.AreEqual(InitialValue, client2Value); + + // Update value from client 1. + const int newValue = 1; + client1Cache.Put(Key, newValue); + + // Read value from client 1 and 2. + client1Value = client1Cache.Get(Key); + client2Value = client2Cache.Get(Key); + + Assert.AreEqual(newValue, client1Value); + Assert.AreEqual(newValue, client2Value); + } + + private static int GetMac(IClusterNode node) => Convert.ToInt32(node.Attributes[AttrMacs]); + + private static IgniteConfiguration GetConfiguration(bool client, int localMac, int remoteMac) + { + var name = (client ? "client" : "server") + localMac; + var remotePort = 48500 + remoteMac; + + var discoverySpi = new TcpDiscoverySpi + { + IpFinder = new TcpDiscoveryStaticIpFinder + { + Endpoints = new List { $"127.0.0.1:{remotePort}" } + } + }; + + if (!client) + { + discoverySpi.LocalPort = 48500 + localMac; + discoverySpi.LocalPortRange = 1; + } + + var igniteConfig = new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + ClientMode = client, + IgniteInstanceName = name, + // ConsistentId = name, + UserAttributes = new Dictionary + { + [$"override.{AttrMacs}"] = localMac.ToString() + }, + DiscoverySpi = discoverySpi + }; + + return igniteConfig; + } + + private static void CreateCache(IIgnite ignite) + { + var cacheConfig = new CacheConfiguration(CacheName) + { + CacheMode = CacheMode.Replicated, + ReadFromBackup = true, // Does not reproduce when false. + PlatformCacheConfiguration = new PlatformCacheConfiguration + { + KeyTypeName = typeof(int).FullName, + ValueTypeName = typeof(int).FullName + } + }; + + var cache = ignite.GetOrCreateCache(cacheConfig); + cache.Put(Key, InitialValue); + } + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs index ccfc834dee020..ea77dc96809d2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Platform/PlatformCacheTest.cs @@ -1229,13 +1229,14 @@ public void TestPlatformCachingWithBackups() /// Tests that Replicated cache puts all entries on all nodes to platform cache. /// [Test] - public void TestPlatformCachingReplicated() + public void TestPlatformCachingReplicated([Values(false, true)] bool readFromBackup) { var cfg = new CacheConfiguration(TestUtils.TestName) { CacheMode = CacheMode.Replicated, PlatformCacheConfiguration = new PlatformCacheConfiguration(), - WriteSynchronizationMode = CacheWriteSynchronizationMode.FullSync + WriteSynchronizationMode = CacheWriteSynchronizationMode.FullSync, + ReadFromBackup = readFromBackup }; var cache1 = _grid.CreateCache(cfg); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs index 0b9c67ddcdad9..dd72c71a94ee0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs @@ -33,7 +33,6 @@ namespace Apache.Ignite.Core using Apache.Ignite.Core.Log; using Apache.Ignite.Core.Lifecycle; using Apache.Ignite.Core.Messaging; - using Apache.Ignite.Core.PersistentStore; using Apache.Ignite.Core.Plugin; using Apache.Ignite.Core.Services; using Apache.Ignite.Core.Transactions;