|
21 | 21 | import java.util.Collections;
|
22 | 22 | import java.util.HashMap;
|
23 | 23 | import java.util.Map;
|
24 |
| -import java.util.Set; |
25 | 24 | import java.util.UUID;
|
| 25 | +import java.util.concurrent.ExecutionException; |
26 | 26 | import java.util.concurrent.atomic.AtomicInteger;
|
27 | 27 | import java.util.function.Consumer;
|
28 | 28 | import org.apache.ignite.Ignition;
|
@@ -176,10 +176,62 @@ public void testCacheOperation() throws Exception {
|
176 | 176 | checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAll(Collections.singleton(3)));
|
177 | 177 |
|
178 | 178 | checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemove(5));
|
| 179 | + } |
179 | 180 |
|
180 |
| - checkCacheOperation(CACHE_PUT_ALL_CONFLICT, putAllConflict(Collections.singletonMap(6, 1))); |
| 181 | + /** |
| 182 | + * Cache {@link TcpClientCache#putAllConflict} and {@link TcpClientCache#removeAllConflict} operations performed. |
| 183 | + * @throws Exception If failed. |
| 184 | + */ |
| 185 | + @Test |
| 186 | + public void testCacheAllConflictOperations() throws Exception { |
| 187 | + checkCacheAllConflictOperations(false); |
| 188 | + } |
181 | 189 |
|
182 |
| - checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, removeAllConflict(Collections.singleton(6))); |
| 190 | + /** |
| 191 | + * Cache {@link TcpClientCache#putAllConflictAsync} and {@link TcpClientCache#removeAllConflictAsync} operations performed. |
| 192 | + * @throws Exception If failed. |
| 193 | + */ |
| 194 | + @Test |
| 195 | + public void testCacheAllConflictOperationsAsync() throws Exception { |
| 196 | + checkCacheAllConflictOperations(true); |
| 197 | + } |
| 198 | + |
| 199 | + /** |
| 200 | + * @param async boolean flag for asynchronous cache operation processing. |
| 201 | + */ |
| 202 | + private void checkCacheAllConflictOperations(boolean async) throws Exception { |
| 203 | + int key = 6; |
| 204 | + int val = 1; |
| 205 | + |
| 206 | + GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2); |
| 207 | + |
| 208 | + Map<?, T3<?, GridCacheVersion, Long>> putMap = F.asMap(key, new T3<>(val, confl, CU.EXPIRE_TIME_ETERNAL)); |
| 209 | + Map<?, GridCacheVersion> rmvMap = F.asMap(key, confl); |
| 210 | + |
| 211 | + if (async) { |
| 212 | + checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache -> ((TcpClientCache<Object, Object>)cache).putAllConflict(putMap)); |
| 213 | + |
| 214 | + checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache -> ((TcpClientCache<Object, Object>)cache).removeAllConflict(rmvMap)); |
| 215 | + } |
| 216 | + else { |
| 217 | + checkCacheOperation(CACHE_PUT_ALL_CONFLICT, cache -> { |
| 218 | + try { |
| 219 | + ((TcpClientCache<Object, Object>)cache).putAllConflictAsync(putMap).get(); |
| 220 | + } |
| 221 | + catch (InterruptedException | ExecutionException e) { |
| 222 | + throw new RuntimeException(e); |
| 223 | + } |
| 224 | + }); |
| 225 | + |
| 226 | + checkCacheOperation(CACHE_REMOVE_ALL_CONFLICT, cache -> { |
| 227 | + try { |
| 228 | + ((TcpClientCache<Object, Object>)cache).removeAllConflictAsync(rmvMap).get(); |
| 229 | + } |
| 230 | + catch (InterruptedException | ExecutionException e) { |
| 231 | + throw new RuntimeException(e); |
| 232 | + } |
| 233 | + }); |
| 234 | + } |
183 | 235 | }
|
184 | 236 |
|
185 | 237 | /** Checks cache operation. */
|
@@ -210,36 +262,6 @@ private void checkCacheOperation(OperationType op, Consumer<ClientCache<Object,
|
210 | 262 | assertEquals(1, ops.get());
|
211 | 263 | }
|
212 | 264 |
|
213 |
| - /** |
214 |
| - * Cache {@link TcpClientCache#putAllConflict} operation perfomed |
215 |
| - * @param map {@link Map} with entries for cache put all. |
216 |
| - * @return cache {@link Consumer<ClientCache>}. |
217 |
| - */ |
218 |
| - private Consumer<ClientCache<Object, Object>> putAllConflict(Map<Integer, Object> map) { |
219 |
| - Map<Integer, T3<Object, GridCacheVersion, Long>> drMap = new HashMap<>(); |
220 |
| - |
221 |
| - GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2); |
222 |
| - |
223 |
| - map.forEach((key, value) -> drMap.put(key, new T3<>(value, confl, CU.EXPIRE_TIME_ETERNAL))); |
224 |
| - |
225 |
| - return cache -> ((TcpClientCache<Object, Object>)cache).putAllConflict(drMap); |
226 |
| - } |
227 |
| - |
228 |
| - /** |
229 |
| - * Cache {@link TcpClientCache#putAllConflict} operation perfomed |
230 |
| - * @param keys {@link Set} with keys for cache remove all. |
231 |
| - * @return cache {@link Consumer<ClientCache>}. |
232 |
| - */ |
233 |
| - private Consumer<ClientCache<Object, Object>> removeAllConflict(Set<Integer> keys) { |
234 |
| - Map<Integer, GridCacheVersion> drMap = new HashMap<>(); |
235 |
| - |
236 |
| - GridCacheVersion confl = new GridCacheVersion(1, 0, 1, (byte)2); |
237 |
| - |
238 |
| - keys.forEach(key -> drMap.put(key, confl)); |
239 |
| - |
240 |
| - return cache -> ((TcpClientCache<Object, Object>)cache).removeAllConflict(drMap); |
241 |
| - } |
242 |
| - |
243 | 265 | /** @throws Exception If failed. */
|
244 | 266 | @Test
|
245 | 267 | public void testTransaction() throws Exception {
|
|
0 commit comments