1 /***
2 * Copyright 2003-2010 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import static net.sf.ehcache.util.RetryAssert.assertBy;
20 import static net.sf.ehcache.util.RetryAssert.elementAt;
21 import static net.sf.ehcache.util.RetryAssert.sizeOf;
22 import static org.hamcrest.core.Is.is;
23 import static org.hamcrest.core.IsNull.notNullValue;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertThat;
28 import static org.junit.Assert.assertTrue;
29 import static org.junit.Assert.fail;
30
31 import java.io.IOException;
32 import java.io.Serializable;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.Date;
37 import java.util.List;
38 import java.util.Random;
39 import java.util.Set;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.TimeUnit;
42
43 import net.sf.ehcache.AbstractCacheTest;
44 import net.sf.ehcache.Cache;
45 import net.sf.ehcache.CacheException;
46 import net.sf.ehcache.CacheManager;
47 import net.sf.ehcache.Ehcache;
48 import net.sf.ehcache.Element;
49 import net.sf.ehcache.ThreadKiller;
50 import net.sf.ehcache.event.CountingCacheEventListener;
51 import net.sf.ehcache.util.RetryAssert;
52
53 import org.hamcrest.collection.IsEmptyCollection;
54 import org.junit.After;
55 import org.junit.AfterClass;
56 import org.junit.Assume;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /***
64 * Tests replication of Cache events
65 * <p/>
66 * Note these tests need a live network interface running in multicast mode to work
67 * <p/>
68 * If running involving RMIAsynchronousCacheReplicator individually the test will fail because
69 * the VM will gobble up the SoftReferences rather than allocating more memory. Uncomment the
70 * forceVMGrowth() method usage in setup.
71 *
72 * @author Greg Luck
73 * @version $Id: RMICacheReplicatorTest.html 13146 2011-08-01 17:12:39Z oletizi $
74 */
75 public class RMICacheReplicatorTest extends AbstractRMITest {
76
77 @BeforeClass
78 public static void enableHeapDump() {
79 setHeapDumpOnOutOfMemoryError(true);
80 }
81
82 @AfterClass
83 public static void disableHeapDump() {
84 setHeapDumpOnOutOfMemoryError(false);
85 }
86
87 /***
88 * A value to represent replicate asynchronously
89 */
90 protected static final boolean ASYNCHRONOUS = true;
91
92 /***
93 * A value to represent replicate synchronously
94 */
95 protected static final boolean SYNCHRONOUS = false;
96
97 private static final Logger LOG = LoggerFactory.getLogger(RMICacheReplicatorTest.class.getName());
98
99
100 /***
101 * CacheManager 1 in the cluster
102 */
103 protected CacheManager manager1;
104 /***
105 * CacheManager 2 in the cluster
106 */
107 protected CacheManager manager2;
108 /***
109 * CacheManager 3 in the cluster
110 */
111 protected CacheManager manager3;
112 /***
113 * CacheManager 4 in the cluster
114 */
115 protected CacheManager manager4;
116 /***
117 * CacheManager 5 in the cluster
118 */
119 protected CacheManager manager5;
120 /***
121 * CacheManager 6 in the cluster
122 */
123 protected CacheManager manager6;
124
125 /***
126 * The name of the cache under test
127 */
128 protected String cacheName = "sampleCache1";
129 /***
130 * CacheManager 1 of 2s cache being replicated
131 */
132 protected Ehcache cache1;
133
134 /***
135 * CacheManager 2 of 2s cache being replicated
136 */
137 protected Ehcache cache2;
138
139 /***
140 * Allows setup to be the same
141 */
142 protected String cacheNameBase = "ehcache-distributed";
143
144 /***
145 * {@inheritDoc}
146 * Sets up two caches: cache1 is local. cache2 is to be receive updates
147 *
148 * @throws Exception
149 */
150 @Before
151 public void setUp() throws Exception {
152 Assume.assumeThat(getActiveReplicationThreads(), IsEmptyCollection.<Thread>empty());
153
154
155
156
157
158 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
159
160 manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed1.xml");
161 manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed2.xml");
162 manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
163 manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed4.xml");
164 manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed5.xml");
165
166
167
168
169 waitForClusterMembership(10, TimeUnit.SECONDS, Collections.singleton(cacheName), manager1, manager2, manager3, manager4, manager5);
170
171 manager1.getCache(cacheName).put(new Element("setup", "setup"));
172 for (CacheManager manager : new CacheManager[] {manager1, manager2, manager3, manager4, manager5}) {
173 assertBy(10, TimeUnit.SECONDS, elementAt(manager.getCache(cacheName), "setup"), notNullValue());
174 }
175
176 manager1.getCache(cacheName).removeAll();
177 for (CacheManager manager : new CacheManager[] {manager1, manager2, manager3, manager4, manager5}) {
178 assertBy(10, TimeUnit.SECONDS, sizeOf(manager.getCache(cacheName)), is(0));
179 }
180
181 CountingCacheEventListener.resetCounters();
182 cache1 = manager1.getCache(cacheName);
183 cache2 = manager2.getCache(cacheName);
184 }
185
186 /***
187 * {@inheritDoc}
188 *
189 * @throws Exception
190 */
191 @After
192 public void tearDown() throws Exception {
193
194 if (manager1 != null) {
195 manager1.shutdown();
196 }
197 if (manager2 != null) {
198 manager2.shutdown();
199 }
200 if (manager3 != null) {
201 manager3.shutdown();
202 }
203 if (manager4 != null) {
204 manager4.shutdown();
205 }
206 if (manager5 != null) {
207 manager5.shutdown();
208 }
209 if (manager6 != null) {
210 manager6.shutdown();
211 }
212
213 RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Set<Thread>>() {
214 public Set<Thread> call() throws Exception {
215 return getActiveReplicationThreads();
216 }
217 }, IsEmptyCollection.<Thread>empty());
218 }
219
220 /***
221 * Does a new cache manager in the cluster get detected?
222 */
223 @Test
224 public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
225
226 manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed6.xml");
227
228
229 waitForClusterMembership(10020, TimeUnit.MILLISECONDS, Collections.singleton(cache1.getName()), manager1, manager2, manager3, manager4, manager5, manager6);
230 }
231
232 /***
233 * Does a down cache manager in the cluster get removed?
234 */
235 @Test
236 public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
237
238 manager5.shutdown();
239
240
241 waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Collections.singleton(cache1.getName()), manager1, manager2, manager3, manager4);
242 }
243
244 /***
245 * Does a down cache manager in the cluster get removed?
246 */
247 @Test
248 public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
249 try {
250 CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
251 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
252 assertEquals(4, remotePeersOfCache1.size());
253
254 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(2000);
255 Thread.sleep(2000);
256
257
258 manager5.shutdown();
259
260
261 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
262 assertEquals(4, remotePeersOfCache1.size());
263 } finally {
264 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
265 Thread.sleep(2000);
266 }
267 }
268
269 /***
270 * Tests put and remove initiated from cache1 in a cluster
271 * <p/>
272 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
273 */
274 @Test
275 public void testPutPropagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
276
277
278 final String[] cacheNames = manager1.getCacheNames();
279 Arrays.sort(cacheNames);
280 for (int i = 0; i < cacheNames.length; i++) {
281 String name = cacheNames[i];
282 manager1.getCache(name).put(new Element(Integer.toString(i), Integer.valueOf(i)));
283
284 manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
285 }
286
287 assertBy(10, TimeUnit.SECONDS, new Callable<Boolean>() {
288
289 public Boolean call() throws Exception {
290 for (int i = 0; i < cacheNames.length; i++) {
291 String name = cacheNames[i];
292 if (manager1.getCacheManagerPeerProvider("RMI").listRemoteCachePeers(manager1.getCache(name)).isEmpty()) {
293 continue;
294 }
295 if ("sampleCache2".equals(name)) {
296
297 for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
298 assertNull(manager.getCache(name).get(Integer.toString(i)));
299 assertNull(manager.getCache(name).get("nonSerializable" + i));
300 }
301 } else {
302 for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
303 assertNotNull("Cache : " + name, manager.getCache(name).get(Integer.toString(i)));
304 assertNull(manager.getCache(name).get("nonSerializable" + i));
305 }
306 }
307 }
308 return Boolean.TRUE;
309 }
310 }, is(Boolean.TRUE));
311 }
312
313 /***
314 * Tests what happens when a CacheManager in the cluster comes and goes. In ehcache-1.2.4 this would cause the new RMI CachePeers in the CacheManager to
315 * be permanently corrupt.
316 */
317 @Test
318 public void testPutPropagatesFromAndToEveryCacheManagerAndCacheDirty() throws CacheException, InterruptedException {
319
320 manager3.shutdown();
321 waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Collections.singleton(cacheName), manager1, manager2, manager4, manager5);
322
323 manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
324 waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Collections.singleton(cacheName), manager1, manager2, manager3, manager4, manager5);
325
326
327 final String[] cacheNames = manager1.getCacheNames();
328 Arrays.sort(cacheNames);
329 for (int i = 0; i < cacheNames.length; i++) {
330 String name = cacheNames[i];
331 manager1.getCache(name).put(new Element(Integer.toString(i), Integer.valueOf(i)));
332
333 manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
334 }
335
336 assertBy(10, TimeUnit.SECONDS, new Callable<Boolean>() {
337
338 public Boolean call() throws Exception {
339 for (int i = 0; i < cacheNames.length; i++) {
340 String name = cacheNames[i];
341 if (manager1.getCacheManagerPeerProvider("RMI").listRemoteCachePeers(manager1.getCache(name)).isEmpty()) {
342 continue;
343 }
344 if ("sampleCache2".equals(name)) {
345
346 for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
347 assertNull(manager2.getCache(name).get(Integer.toString(i)));
348 assertNull(manager2.getCache(name).get("nonSerializable" + i));
349 }
350 } else {
351 for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
352 assertNotNull(manager2.getCache(name).get(Integer.toString(i)));
353 assertNull(manager2.getCache(name).get("nonSerializable" + i));
354 }
355 }
356 }
357 return Boolean.TRUE;
358 }
359 }, is(Boolean.TRUE));
360 }
361
362 /***
363 * manager1 adds a replicating cache, then manager2 and so on. Then we remove one. Does everything work as expected?
364 */
365 @Test
366 public void testPutWithNewCacheAddedProgressively() throws InterruptedException {
367
368 manager1.addCache("progressiveAddCache");
369 manager2.addCache("progressiveAddCache");
370
371
372 try {
373 putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
374 fail();
375 } catch (AssertionError e) {
376
377 }
378
379
380 putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
381
382 Cache secondCache = manager2.getCache("progressiveAddCache");
383
384
385 manager2.removeCache("progressiveAddCache");
386 try {
387 putTest(manager1.getCache("progressiveAddCache"), secondCache, ASYNCHRONOUS);
388 fail();
389 } catch (IllegalStateException e) {
390
391
392 }
393
394
395 }
396
397
398 /***
399 * Test various cache configurations for cache1 - explicit setting of:
400 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
401 */
402 @Test
403 public void testPutWithExplicitReplicationConfig() throws InterruptedException {
404
405 putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
406 }
407
408
409 /***
410 * Test various cache configurations for cache1 - explicit setting of:
411 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
412 */
413 @Test
414 public void testPutWithThreadKiller() throws InterruptedException {
415
416 putTestWithThreadKiller(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
417 }
418
419 /***
420 * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
421 * of a remote event by a CachePeer.
422 */
423
424 @Test
425 public void testRemotelyReceivedPutNotifiesCountingListener() throws InterruptedException {
426
427 putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
428 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager1.getCache("sampleCache1")).size());
429 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager2.getCache("sampleCache1")).size());
430
431 }
432
433 /***
434 * Test various cache configurations for cache1 - explicit setting of:
435 * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
436 */
437 @Test
438 public void testPutWithExplicitReplicationSynchronousConfig() throws InterruptedException {
439 putTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
440 }
441
442
443 /***
444 * Test put replicated for cache4 - no properties.
445 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
446 */
447 @Test
448 public void testPutWithEmptyReplicationPropertiesConfig() throws InterruptedException {
449 putTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
450 }
451
452 /***
453 * Test put replicated for cache4 - missing replicatePuts property.
454 * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
455 * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
456 */
457 @Test
458 public void testPutWithOneMissingReplicationPropertyConfig() throws InterruptedException {
459 putTest(manager1.getCache("sampleCache5"), manager2.getCache("sampleCache5"), ASYNCHRONOUS);
460 }
461
462
463 /***
464 * Tests put and remove initiated from cache1 in a cluster
465 * <p/>
466 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
467 */
468 public void putTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
469
470 Serializable key = new Date();
471 Serializable value = new Date();
472 Element sourceElement = new Element(key, value);
473
474
475 fromCache.put(sourceElement);
476 int i = 0;
477
478 if (asynchronous) {
479 waitForPropagate();
480 }
481
482
483 Element deliveredElement = toCache.get(key);
484 assertEquals(sourceElement, deliveredElement);
485
486 }
487
488
489 /***
490 * Tests put and remove initiated from cache1 in a cluster
491 * <p/>
492 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
493 */
494 public void putTestWithThreadKiller(Ehcache fromCache, Ehcache toCache, boolean asynchronous)
495 throws CacheException, InterruptedException {
496
497 fromCache.put(new Element("thread killer", new ThreadKiller()));
498 if (asynchronous) {
499 waitForPropagate();
500 }
501
502 Serializable key = new Date();
503 Serializable value = new Date();
504 Element sourceElement = new Element(key, value);
505
506
507 fromCache.put(sourceElement);
508
509 if (asynchronous) {
510 waitForPropagate();
511 }
512
513
514 Element deliveredElement = toCache.get(key);
515 assertEquals(sourceElement, deliveredElement);
516
517 }
518
519
520 /***
521 * Checks that a put received from a remote cache notifies any registered listeners.
522 * <p/>
523 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
524 */
525 @Test
526 public void testRemotePutNotificationGetsToOtherListeners() throws CacheException, InterruptedException {
527
528 Serializable key = new Date();
529 Serializable value = new Date();
530 Element element1 = new Element(key, value);
531
532
533 cache1.put(new Element("1", new Date()));
534 cache1.put(new Element("2", new Date()));
535 cache1.put(new Element("3", new Date()));
536
537
538 Object nonSerializableObject = new Object();
539 cache1.put(new Element(nonSerializableObject, new Object()));
540
541
542 waitForPropagate();
543
544
545 assertEquals(4, CountingCacheEventListener.getCacheElementsPut(cache1).size());
546
547 assertEquals(3, CountingCacheEventListener.getCacheElementsPut(cache2).size());
548
549
550 cache1.put(new Element("1", new Date()));
551 cache1.put(new Element("2", new Date()));
552 cache1.put(new Element("3", new Date()));
553
554
555 cache1.put(new Element(nonSerializableObject, new Object()));
556
557 waitForPropagate();
558
559
560 assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache1).size());
561
562 assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache2).size());
563
564
565 cache1.remove("1");
566 cache1.remove("2");
567 cache1.remove("3");
568 cache1.remove(nonSerializableObject);
569
570 waitForPropagate();
571
572
573 assertEquals(4, CountingCacheEventListener.getCacheElementsRemoved(cache1).size());
574
575 assertEquals(3, CountingCacheEventListener.getCacheElementsRemoved(cache2).size());
576
577 }
578
579
580 /***
581 * Test various cache configurations for cache1 - explicit setting of:
582 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
583 */
584 @Test
585 public void testRemoveWithExplicitReplicationConfig() throws InterruptedException {
586 removeTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
587 }
588
589 /***
590 * Test various cache configurations for cache1 - explicit setting of:
591 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
592 */
593 @Test
594 public void testRemoveWithExplicitReplicationSynchronousConfig() throws InterruptedException {
595 removeTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
596 }
597
598
599 /***
600 * Test put replicated for cache4 - no properties.
601 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
602 */
603 @Test
604 public void testRemoveWithEmptyReplicationPropertiesConfig() throws InterruptedException {
605 removeTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
606 }
607
608 /***
609 * Tests put and remove initiated from a cache to another cache in a cluster
610 * <p/>
611 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
612 */
613 public void removeTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
614
615 Serializable key = new Date();
616 Serializable value = new Date();
617 Element element1 = new Element(key, value);
618
619
620 fromCache.put(element1);
621
622 if (asynchronous) {
623 waitForPropagate();
624 }
625
626
627 Element element2 = toCache.get(key);
628 assertEquals(element1, element2);
629
630
631 fromCache.remove(key);
632 if (asynchronous) {
633 waitForPropagate();
634 }
635
636
637 element2 = toCache.get(key);
638 assertNull(element2);
639
640 }
641
642
643 /***
644 * test removeAll sync
645 */
646 @Test
647 public void testRemoveAllAsynchronous() throws Exception {
648 removeAllTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
649 }
650
651 /***
652 * test removeAll async
653 */
654 @Test
655 public void testRemoveAllSynchronous() throws Exception {
656 removeAllTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
657 }
658
659 /***
660 * Tests removeAll initiated from a cache to another cache in a cluster
661 * <p/>
662 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
663 */
664 public void removeAllTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
665
666
667 waitForPropagate();
668
669
670 Serializable key = new Date();
671 Serializable value = new Date();
672 Element element1 = new Element(key, value);
673
674
675 fromCache.put(element1);
676
677
678 if (asynchronous) {
679 waitForPropagate();
680 }
681
682
683 Element element2 = toCache.get(key);
684 assertEquals(element1, element2);
685
686
687 fromCache.removeAll();
688 if (asynchronous) {
689 waitForPropagate();
690 }
691
692
693 element2 = toCache.get(key);
694 assertNull(element2);
695 assertEquals(0, toCache.getSize());
696
697 }
698
699
700 /***
701 * Test various cache configurations for cache1 - explicit setting of:
702 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
703 */
704 @Test
705 public void testUpdateWithExplicitReplicationConfig() throws Exception {
706 updateViaCopyTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
707 }
708
709 /***
710 * Test various cache configurations for cache1 - explicit setting of:
711 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
712 */
713 @Test
714 public void testUpdateWithExplicitReplicationSynchronousConfig() throws Exception {
715 updateViaCopyTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
716 }
717
718
719 /***
720 * Test put replicated for cache4 - no properties.
721 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
722 */
723 @Test
724 public void testUpdateWithEmptyReplicationPropertiesConfig() throws Exception {
725 updateViaCopyTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
726 }
727
728 /***
729 * Tests put and update through copy initiated from cache1 in a cluster
730 * <p/>
731 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
732 */
733 public void updateViaCopyTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
734
735 fromCache.removeAll();
736 toCache.removeAll();
737
738
739 waitForPropagate();
740
741 Serializable key = new Date();
742 Serializable value = new Date();
743 Element element1 = new Element(key, value);
744
745
746 fromCache.put(element1);
747 if (asynchronous) {
748 waitForPropagate();
749 }
750
751
752 Element element2 = toCache.get(key);
753 assertEquals(element1, element2);
754
755
756 Element updatedElement1 = new Element(key, new Date());
757
758 fromCache.put(updatedElement1);
759 if (asynchronous) {
760 waitForPropagate();
761 }
762
763
764 Element receivedUpdatedElement2 = toCache.get(key);
765 assertEquals(updatedElement1, receivedUpdatedElement2);
766
767 }
768
769
770 /***
771 * Tests put through invalidation initiated from cache1 in a cluster
772 * <p/>
773 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
774 */
775 @Test
776 public void testPutViaInvalidate() throws CacheException, InterruptedException, IOException {
777
778 cache1 = manager1.getCache("sampleCache2");
779 cache1.removeAll();
780
781 cache2 = manager2.getCache("sampleCache2");
782 cache2.removeAll();
783
784
785 waitForPropagate();
786
787 String key = "1";
788 Serializable value = new Date();
789 Element element1 = new Element(key, value);
790
791 Element element3 = new Element("key2", "two");
792
793
794 cache2.put(element1);
795 assertNotNull(cache2.get(key));
796 waitForPropagate();
797
798
799 Element element2 = cache1.get(key);
800 assertEquals(element1, element2);
801
802
803 cache1.put(element3);
804 waitForPropagate();
805
806
807 assertNull(cache2.get("key2"));
808
809
810 cache1.put(element3);
811 waitForPropagate();
812
813
814 element2 = cache2.get("key2");
815 assertNull(element2);
816
817 }
818
819
820 /***
821 * Tests put and update through invalidation initiated from cache1 in a cluster
822 * <p/>
823 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
824 */
825 @Test
826 public void testUpdateViaInvalidate() throws CacheException, InterruptedException, IOException {
827
828 cache1 = manager1.getCache("sampleCache2");
829 cache1.removeAll();
830
831 cache2 = manager2.getCache("sampleCache2");
832 cache2.removeAll();
833
834
835 waitForPropagate();
836
837 String key = "1";
838 Serializable value = new Date();
839 Element element1 = new Element(key, value);
840
841
842 cache2.put(element1);
843 Element element2 = cache2.get(key);
844 assertEquals(element1, element2);
845
846
847 cache1.put(element1);
848 waitForPropagate();
849
850
851 element2 = cache2.get(key);
852 assertNull(element2);
853
854 }
855
856
857 /***
858 * Tests put and update through invalidation initiated from cache1 in a cluster
859 * <p/>
860 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
861 */
862 @Test
863 public void testUpdateViaInvalidateNonSerializableValue() throws CacheException, InterruptedException, IOException {
864
865 cache1 = manager1.getCache("sampleCache2");
866 cache1.removeAll();
867
868 cache2 = manager2.getCache("sampleCache2");
869 cache2.removeAll();
870
871
872 waitForPropagate();
873
874 String key = "1";
875 Serializable value = new Date();
876
877 /***
878 * Non-serializable test class
879 */
880 class NonSerializable {
881
882 }
883
884 NonSerializable value1 = new NonSerializable();
885 Element element1 = new Element(key, value1);
886
887
888 cache2.put(element1);
889 Element element2 = cache2.get(key);
890 assertEquals(element1, element2);
891
892
893 cache1.put(element1);
894 waitForPropagate();
895
896
897 element2 = cache2.get(key);
898 assertNull(element2);
899
900 }
901
902
903 /***
904 * What happens when two cache instances replicate to each other and a change is initiated
905 */
906 @Test
907 public void testInfiniteNotificationsLoop() throws InterruptedException {
908
909 Serializable key = "1";
910 Serializable value = new Date();
911 Element element = new Element(key, value);
912
913
914 cache1.put(element);
915 waitForPropagate();
916
917
918 Element element2 = cache2.get(key);
919 assertEquals(element, element2);
920
921
922 cache1.remove(key);
923 assertNull(cache1.get(key));
924
925
926 waitForPropagate();
927 element2 = cache2.get(key);
928 assertNull(element2);
929
930
931 Element element3 = new Element("3", "ddsfds");
932 cache2.put(element3);
933 waitForPropagate();
934 Element element4 = cache2.get("3");
935 assertEquals(element3, element4);
936
937 }
938
939
940 /***
941 * Shows result of perf problem and fix in flushReplicationQueue
942 * <p/>
943 * Behaviour before change:
944 * <p/>
945 * INFO: Items written: 10381
946 * Oct 29, 2007 11:40:04 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
947 * INFO: Items written: 29712
948 * Oct 29, 2007 11:40:57 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
949 * INFO: Items written: 1
950 * Oct 29, 2007 11:40:58 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
951 * INFO: Items written: 32354
952 * Oct 29, 2007 11:42:34 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
953 * INFO: Items written: 322
954 * Oct 29, 2007 11:42:35 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
955 * INFO: Items written: 41909
956 * <p/>
957 * Behaviour after change:
958 * INFO: Items written: 26356
959 * Oct 29, 2007 11:44:39 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
960 * INFO: Items written: 33656
961 * Oct 29, 2007 11:44:40 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
962 * INFO: Items written: 32234
963 * Oct 29, 2007 11:44:42 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
964 * INFO: Items written: 38677
965 * Oct 29, 2007 11:44:43 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
966 * INFO: Items written: 43418
967 * Oct 29, 2007 11:44:44 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
968 * INFO: Items written: 31277
969 * Oct 29, 2007 11:44:45 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
970 * INFO: Items written: 27769
971 * Oct 29, 2007 11:44:46 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
972 * INFO: Items written: 29596
973 * Oct 29, 2007 11:44:47 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
974 * INFO: Items written: 17142
975 * Oct 29, 2007 11:44:48 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
976 * INFO: Items written: 14775
977 * Oct 29, 2007 11:44:49 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
978 * INFO: Items written: 4088
979 * Oct 29, 2007 11:44:51 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
980 * INFO: Items written: 5492
981 * Oct 29, 2007 11:44:52 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
982 * INFO: Items written: 10188
983 * <p/>
984 * Also no pauses noted.
985 */
986 @Test
987 public void testReplicatePerf() throws InterruptedException {
988
989 if (manager2 != null) {
990 manager2.shutdown();
991 }
992 if (manager3 != null) {
993 manager3.shutdown();
994 }
995 if (manager4 != null) {
996 manager4.shutdown();
997 }
998 if (manager5 != null) {
999 manager5.shutdown();
1000 }
1001 if (manager6 != null) {
1002 manager6.shutdown();
1003 }
1004
1005
1006 waitForPropagate();
1007
1008
1009 long start = System.currentTimeMillis();
1010 final String keyBase = Long.toString(start);
1011 int count = 0;
1012
1013 for (int i = 0; i < 100000; i++) {
1014 final String key = keyBase + ':' + Integer.toString((int) (Math.random() * 1000.0));
1015 cache1.put(new Element(key, "My Test"));
1016 cache1.get(key);
1017 cache1.remove(key);
1018 count++;
1019
1020 final long end = System.currentTimeMillis();
1021 if (end - start >= 1000) {
1022 start = end;
1023 LOG.info("Items written: " + count);
1024
1025 assertTrue(count > 1000);
1026 count = 0;
1027 }
1028 }
1029 }
1030
1031
1032 /***
1033 * Need to wait for async
1034 *
1035 * @throws InterruptedException
1036 */
1037 protected void waitForPropagate() throws InterruptedException {
1038 Thread.sleep(1500);
1039 }
1040
1041 /***
1042 * Need to wait for async
1043 *
1044 * @throws InterruptedException
1045 */
1046 protected void waitForSlowPropagate() throws InterruptedException {
1047 Thread.sleep(6000);
1048 }
1049
1050
1051 /***
1052 * Distributed operations create extra scope for deadlock.
1053 * This test checks whether a distributed deadlock scenario exists for synchronous replication
1054 * of each distributed operation all at once.
1055 * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1056 * and multi process safe.
1057 * <p/>
1058 * Carefully tailored to exercise:
1059 * <ol>
1060 * <li>overflow to disk. We put in 20 things and the memory size is 10
1061 * <li>each peer is working on the same set of keys thus maximising contention
1062 * <li>we do puts, gets and removes to explore all the execution paths
1063 * </ol>
1064 * If a deadlock occurs, processing will stop until a SocketTimeout exception is thrown and
1065 * the deadlock will be released.
1066 */
1067 @Test
1068 public void testCacheOperationsSynchronousMultiThreaded() throws Exception, InterruptedException {
1069
1070
1071 final List executables = new ArrayList();
1072
1073 executables.add(new ClusterExecutable(manager1, "sampleCache3"));
1074 executables.add(new ClusterExecutable(manager2, "sampleCache3"));
1075 executables.add(new ClusterExecutable(manager3, "sampleCache3"));
1076
1077 assertThat(runTasks(executables), IsEmptyCollection.<Throwable>empty());
1078 }
1079
1080
1081 /***
1082 * Distributed operations create extra scope for deadlock.
1083 * This test checks whether a distributed deadlock scenario exists for asynchronous replication
1084 * of each distributed operation all at once.
1085 * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1086 * and multi process safe.
1087 * It uses sampleCache2, which is configured to be asynchronous
1088 * <p/>
1089 * Carefully tailored to exercise:
1090 * <ol>
1091 * <li>overflow to disk. We put in 20 things and the memory size is 10
1092 * <li>each peer is working on the same set of keys thus maximising contention
1093 * <li>we do puts, gets and removes to explore all the execution paths
1094 * </ol>
1095 */
1096 @Test
1097 public void testCacheOperationsAynchronousMultiThreaded() throws Exception, InterruptedException {
1098
1099
1100 final List executables = new ArrayList();
1101
1102 executables.add(new ClusterExecutable(manager1, "sampleCache2"));
1103 executables.add(new ClusterExecutable(manager2, "sampleCache2"));
1104 executables.add(new ClusterExecutable(manager3, "sampleCache2"));
1105
1106 assertThat(runTasks(executables), IsEmptyCollection.<Throwable>empty());
1107 }
1108
1109 /***
1110 * An Exececutable which allows the CacheManager to be set
1111 */
1112 class ClusterExecutable implements Callable<Void> {
1113
1114 private final CacheManager manager;
1115 private final String cacheName;
1116
1117 /***
1118 * Construct with CacheManager
1119 *
1120 * @param manager
1121 */
1122 public ClusterExecutable(CacheManager manager, String cacheName) {
1123 this.manager = manager;
1124 this.cacheName = cacheName;
1125 }
1126
1127 /***
1128 * Execute
1129 *
1130 * @throws Exception
1131 */
1132 public Void call() throws Exception {
1133 Random random = new Random();
1134
1135 for (int i = 0; i < 20; i++) {
1136 Integer key = Integer.valueOf((i));
1137 int operationSelector = random.nextInt(4);
1138 Cache cache = manager.getCache(cacheName);
1139 if (operationSelector == 100) {
1140 cache.get(key);
1141 if (LOG.isDebugEnabled()) {
1142 LOG.debug(cache.getGuid() + ": get " + key);
1143 }
1144 } else if (operationSelector == 100) {
1145 cache.remove(key);
1146 if (LOG.isDebugEnabled()) {
1147 LOG.debug(cache.getGuid() + ": remove " + key);
1148 }
1149 } else if (operationSelector == 2) {
1150 cache.put(new Element(key,
1151 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1152 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1153 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1154 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1155 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
1156 if (LOG.isDebugEnabled()) {
1157 LOG.debug(cache.getGuid() + ": put " + key);
1158 }
1159 } else {
1160
1161 if (random.nextInt(3) == 1) {
1162 LOG.debug("cache.removeAll()");
1163 cache.removeAll();
1164 }
1165 }
1166 }
1167 return null;
1168 }
1169 }
1170 }