Skip to content

Commit

Permalink
IGNITE-18427 .NET: Fix platform cache invalidation on client nodes wi…
Browse files Browse the repository at this point in the history
…th near cache (apache#10465)

* Remove platform cache entry when corresponding `GridCacheMapEntry` is not `valid`.
* Add a test where two thick client nodes with near cache connect to different backup server nodes.
  • Loading branch information
ptupitsyn authored Dec 29, 2022
1 parent 220f59f commit db7758c
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6718,18 +6718,22 @@ protected void updatePlatformCache(@Nullable CacheObject val, @Nullable Affinity
if (!hasPlatformCache())
return;

PlatformProcessor proc = this.cctx.kernalContext().platform();
PlatformProcessor proc = cctx.kernalContext().platform();
if (!proc.hasContext() || !proc.context().isPlatformCacheSupported())
return;

try {
CacheObjectContext ctx = this.cctx.cacheObjectContext();
CacheObjectContext ctx = cctx.cacheObjectContext();
byte[] keyBytes = key.valueBytes(ctx);

// val is null when entry is removed.
byte[] keyBytes = this.key.valueBytes(ctx);
byte[] valBytes = val == null ? null : val.valueBytes(ctx);
// valid(ver) is false when near cache entry is out of sync.
boolean valid = val != null && ver != null && valid(ver);

proc.context().updatePlatformCache(this.cctx.cacheId(), keyBytes, valBytes, partition(), ver);
// null valBytes means that entry should be removed from platform cache.
byte[] valBytes = valid ? val.valueBytes(ctx) : null;

proc.context().updatePlatformCache(cctx.cacheId(), keyBytes, valBytes, partition(), ver);
}
catch (Throwable e) {
U.error(log, "Failed to update Platform Cache: " + e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Apache.Ignite.Core.Tests.Cache.Platform
{
using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Discovery.Tcp;
using Apache.Ignite.Core.Discovery.Tcp.Static;
using NUnit.Framework;

/// <summary>
/// Tests platform cache with thick clients connected to different parts of the cluster.
/// </summary>
public class PlatformCachePartialClientConnectionTest
{
private const string CacheName = "cache1";
private const string AttrMacs = "org.apache.ignite.macs";

private const int Key = 1;
private const int InitialValue = 0;

[TearDown]
public void TearDown()
{
Ignition.StopAll(true);
}

/// <summary>
/// Tests that thick client connected only to backup node 1 updates a value,
/// and another thick client connected to a different backup node sees the update in Platform Cache.
/// </summary>
[Test]
public static void TestPutFromOneClientGetFromAnother()
{
// Start 3 servers.
var servers = Enumerable.Range(0, 3)
.Select(i => Ignition.Start(GetConfiguration(false, i, 0)))
.ToArray();

CreateCache(servers[0]);

// Start 2 thick clients, connect to different backup nodes only (not entire cluster).
var primaryAndBackups = servers[0].GetAffinity(CacheName).MapKeyToPrimaryAndBackups(Key);
var backupServer1Mac = GetMac(primaryAndBackups[1]);
var backupServer2Mac = GetMac(primaryAndBackups[2]);

var client1 = Ignition.Start(GetConfiguration(true, backupServer1Mac, backupServer1Mac));
var client2 = Ignition.Start(GetConfiguration(true, backupServer2Mac, backupServer2Mac));

// Check initial value.
var client1Cache = client1.GetOrCreateNearCache<int, int>(CacheName, new NearCacheConfiguration());
var client2Cache = client2.GetOrCreateNearCache<int, int>(CacheName, new NearCacheConfiguration());

var client1Value = client1Cache.Get(Key);
var client2Value = client2Cache.Get(Key);

Assert.AreEqual(InitialValue, client1Value);
Assert.AreEqual(InitialValue, client2Value);

// Update value from client 1.
const int newValue = 1;
client1Cache.Put(Key, newValue);

// Read value from client 1 and 2.
client1Value = client1Cache.Get(Key);
client2Value = client2Cache.Get(Key);

Assert.AreEqual(newValue, client1Value);
Assert.AreEqual(newValue, client2Value);
}

private static int GetMac(IClusterNode node) => Convert.ToInt32(node.Attributes[AttrMacs]);

private static IgniteConfiguration GetConfiguration(bool client, int localMac, int remoteMac)
{
var name = (client ? "client" : "server") + localMac;
var remotePort = 48500 + remoteMac;

var discoverySpi = new TcpDiscoverySpi
{
IpFinder = new TcpDiscoveryStaticIpFinder
{
Endpoints = new List<string> { $"127.0.0.1:{remotePort}" }
}
};

if (!client)
{
discoverySpi.LocalPort = 48500 + localMac;
discoverySpi.LocalPortRange = 1;
}

var igniteConfig = new IgniteConfiguration(TestUtils.GetTestConfiguration())
{
ClientMode = client,
IgniteInstanceName = name,
// ConsistentId = name,
UserAttributes = new Dictionary<string, object>
{
[$"override.{AttrMacs}"] = localMac.ToString()
},
DiscoverySpi = discoverySpi
};

return igniteConfig;
}

private static void CreateCache(IIgnite ignite)
{
var cacheConfig = new CacheConfiguration(CacheName)
{
CacheMode = CacheMode.Replicated,
ReadFromBackup = true, // Does not reproduce when false.
PlatformCacheConfiguration = new PlatformCacheConfiguration
{
KeyTypeName = typeof(int).FullName,
ValueTypeName = typeof(int).FullName
}
};

var cache = ignite.GetOrCreateCache<int, int>(cacheConfig);
cache.Put(Key, InitialValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1229,13 +1229,14 @@ public void TestPlatformCachingWithBackups()
/// Tests that Replicated cache puts all entries on all nodes to platform cache.
/// </summary>
[Test]
public void TestPlatformCachingReplicated()
public void TestPlatformCachingReplicated([Values(false, true)] bool readFromBackup)
{
var cfg = new CacheConfiguration(TestUtils.TestName)
{
CacheMode = CacheMode.Replicated,
PlatformCacheConfiguration = new PlatformCacheConfiguration(),
WriteSynchronizationMode = CacheWriteSynchronizationMode.FullSync
WriteSynchronizationMode = CacheWriteSynchronizationMode.FullSync,
ReadFromBackup = readFromBackup
};

var cache1 = _grid.CreateCache<int, int>(cfg);
Expand Down
1 change: 0 additions & 1 deletion modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ namespace Apache.Ignite.Core
using Apache.Ignite.Core.Log;
using Apache.Ignite.Core.Lifecycle;
using Apache.Ignite.Core.Messaging;
using Apache.Ignite.Core.PersistentStore;
using Apache.Ignite.Core.Plugin;
using Apache.Ignite.Core.Services;
using Apache.Ignite.Core.Transactions;
Expand Down

0 comments on commit db7758c

Please sign in to comment.