View Javadoc

1   /***
2    *  Copyright 2003-2010 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import static net.sf.ehcache.util.RetryAssert.assertBy;
20  import static net.sf.ehcache.util.RetryAssert.elementAt;
21  import static net.sf.ehcache.util.RetryAssert.sizeOf;
22  import static org.hamcrest.core.Is.is;
23  import static org.hamcrest.core.IsNull.notNullValue;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertNotNull;
26  import static org.junit.Assert.assertNull;
27  import static org.junit.Assert.assertThat;
28  import static org.junit.Assert.assertTrue;
29  import static org.junit.Assert.fail;
30  
31  import java.io.IOException;
32  import java.io.Serializable;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.Collections;
36  import java.util.Date;
37  import java.util.List;
38  import java.util.Random;
39  import java.util.Set;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.TimeUnit;
42  
43  import net.sf.ehcache.AbstractCacheTest;
44  import net.sf.ehcache.Cache;
45  import net.sf.ehcache.CacheException;
46  import net.sf.ehcache.CacheManager;
47  import net.sf.ehcache.Ehcache;
48  import net.sf.ehcache.Element;
49  import net.sf.ehcache.ThreadKiller;
50  import net.sf.ehcache.event.CountingCacheEventListener;
51  import net.sf.ehcache.util.RetryAssert;
52  
53  import org.hamcrest.collection.IsEmptyCollection;
54  import org.junit.After;
55  import org.junit.AfterClass;
56  import org.junit.Assume;
57  import org.junit.Before;
58  import org.junit.BeforeClass;
59  import org.junit.Test;
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  /***
64   * Tests replication of Cache events
65   * <p/>
66   * Note these tests need a live network interface running in multicast mode to work
67   * <p/>
68   * If the tests involving RMIAsynchronousCacheReplicator are run individually they will fail because
69   * the VM will gobble up the SoftReferences rather than allocating more memory. In that case, uncomment
70   * the forceVMGrowth() call in setUp().
71   *
72   * @author Greg Luck
73   * @version $Id: RMICacheReplicatorTest.html 13146 2011-08-01 17:12:39Z oletizi $
74   */
75  public class RMICacheReplicatorTest extends AbstractRMITest {
76  
77      @BeforeClass
78      public static void enableHeapDump() {
79          setHeapDumpOnOutOfMemoryError(true);
80      }
81  
82      @AfterClass
83      public static void disableHeapDump() {
84          setHeapDumpOnOutOfMemoryError(false);
85      }
86  
87      /***
88       * A value to represent replicate asynchronously
89       */
90      protected static final boolean ASYNCHRONOUS = true;
91  
92      /***
93       * A value to represent replicate synchronously
94       */
95      protected static final boolean SYNCHRONOUS = false;
96  
97      private static final Logger LOG = LoggerFactory.getLogger(RMICacheReplicatorTest.class.getName());
98  
99  
100     /***
101      * CacheManager 1 in the cluster
102      */
103     protected CacheManager manager1;
104     /***
105      * CacheManager 2 in the cluster
106      */
107     protected CacheManager manager2;
108     /***
109      * CacheManager 3 in the cluster
110      */
111     protected CacheManager manager3;
112     /***
113      * CacheManager 4 in the cluster
114      */
115     protected CacheManager manager4;
116     /***
117      * CacheManager 5 in the cluster
118      */
119     protected CacheManager manager5;
120     /***
121      * CacheManager 6 in the cluster
122      */
123     protected CacheManager manager6;
124 
125     /***
126      * The name of the cache under test
127      */
128     protected String cacheName = "sampleCache1";
129     /***
130      * CacheManager 1 of 2s cache being replicated
131      */
132     protected Ehcache cache1;
133 
134     /***
135      * CacheManager 2 of 2s cache being replicated
136      */
137     protected Ehcache cache2;
138 
139     /***
140      * Allows setup to be the same
141      */
142     protected String cacheNameBase = "ehcache-distributed";
143 
144     /***
145      * {@inheritDoc}
146      * Sets up two caches: cache1 is local. cache2 is to be receive updates
147      *
148      * @throws Exception
149      */
150     @Before
151     public void setUp() throws Exception {
152         Assume.assumeThat(getActiveReplicationThreads(), IsEmptyCollection.<Thread>empty());
153 
154         //Required to get SoftReference tests to pass. The VM clean up SoftReferences rather than allocating
155         // memory to -Xmx!
156 //        forceVMGrowth();
157 //        System.gc();
158         MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
159 
160         manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed1.xml");
161         manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed2.xml");
162         manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
163         manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed4.xml");
164         manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed5.xml");
165 
166         //manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-jndi6.xml");
167 
168         //allow cluster to be established
169         waitForClusterMembership(10, TimeUnit.SECONDS, Collections.singleton(cacheName), manager1, manager2, manager3, manager4, manager5);
170 
171         manager1.getCache(cacheName).put(new Element("setup", "setup"));
172         for (CacheManager manager : new CacheManager[] {manager1, manager2, manager3, manager4, manager5}) {
173             assertBy(10, TimeUnit.SECONDS, elementAt(manager.getCache(cacheName), "setup"), notNullValue());
174         }
175 
176         manager1.getCache(cacheName).removeAll();
177         for (CacheManager manager : new CacheManager[] {manager1, manager2, manager3, manager4, manager5}) {
178             assertBy(10, TimeUnit.SECONDS, sizeOf(manager.getCache(cacheName)), is(0));
179         }
180 
181         CountingCacheEventListener.resetCounters();
182         cache1 = manager1.getCache(cacheName);
183         cache2 = manager2.getCache(cacheName);
184     }
185 
186     /***
187      * {@inheritDoc}
188      *
189      * @throws Exception
190      */
191     @After
192     public void tearDown() throws Exception {
193 
194         if (manager1 != null) {
195             manager1.shutdown();
196         }
197         if (manager2 != null) {
198             manager2.shutdown();
199         }
200         if (manager3 != null) {
201             manager3.shutdown();
202         }
203         if (manager4 != null) {
204             manager4.shutdown();
205         }
206         if (manager5 != null) {
207             manager5.shutdown();
208         }
209         if (manager6 != null) {
210             manager6.shutdown();
211         }
212 
213         RetryAssert.assertBy(30, TimeUnit.SECONDS, new Callable<Set<Thread>>() {
214             public Set<Thread> call() throws Exception {
215                 return getActiveReplicationThreads();
216             }
217         }, IsEmptyCollection.<Thread>empty());
218     }
219 
220     /***
221      * Does a new cache manager in the cluster get detected?
222      */
223     @Test
224     public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
225         //Add new CacheManager to cluster
226         manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed6.xml");
227 
228         //Allow detection to occur
229         waitForClusterMembership(10020, TimeUnit.MILLISECONDS, Collections.singleton(cache1.getName()), manager1, manager2, manager3, manager4, manager5, manager6);
230     }
231 
232     /***
233      * Does a down cache manager in the cluster get removed?
234      */
235     @Test
236     public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
237         //Drop a CacheManager from the cluster
238         manager5.shutdown();
239 
240         //Allow change detection to occur. Heartbeat 1 second and is not stale until 5000
241         waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Collections.singleton(cache1.getName()), manager1, manager2, manager3, manager4);
242     }
243 
244     /***
245      * Does a down cache manager in the cluster get removed?
246      */
247     @Test
248     public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
249         try {
250             CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
251             List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
252             assertEquals(4, remotePeersOfCache1.size());
253 
254             MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(2000);
255             Thread.sleep(2000);
256 
257             //Drop a CacheManager from the cluster
258             manager5.shutdown();
259 
260             //Insufficient time for it to timeout
261             remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
262             assertEquals(4, remotePeersOfCache1.size());
263         } finally {
264             MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
265             Thread.sleep(2000);
266         }
267     }
268 
269     /***
270      * Tests put and remove initiated from cache1 in a cluster
271      * <p/>
272      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
273      */
274     @Test
275     public void testPutPropagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
276 
277         //Put
278         final String[] cacheNames = manager1.getCacheNames();
279         Arrays.sort(cacheNames);
280         for (int i = 0; i < cacheNames.length; i++) {
281             String name = cacheNames[i];
282             manager1.getCache(name).put(new Element(Integer.toString(i), Integer.valueOf(i)));
283             //Add some non serializable elements that should not get propagated
284             manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
285         }
286 
287         assertBy(10, TimeUnit.SECONDS, new Callable<Boolean>() {
288 
289             public Boolean call() throws Exception {
290                 for (int i = 0; i < cacheNames.length; i++) {
291                     String name = cacheNames[i];
292                     if (manager1.getCacheManagerPeerProvider("RMI").listRemoteCachePeers(manager1.getCache(name)).isEmpty()) {
293                         continue;
294                     }
295                     if ("sampleCache2".equals(name)) {
296                         //sampleCache2 in manager1 replicates puts via invalidate, so the count will be 1 less
297                         for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
298                             assertNull(manager.getCache(name).get(Integer.toString(i)));
299                             assertNull(manager.getCache(name).get("nonSerializable" + i));
300                         }
301                     } else {
302                         for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
303                             assertNotNull("Cache : " + name, manager.getCache(name).get(Integer.toString(i)));
304                             assertNull(manager.getCache(name).get("nonSerializable" + i));
305                         }
306                     }
307                 }
308                 return Boolean.TRUE;
309             }
310         }, is(Boolean.TRUE));
311     }
312 
313     /***
314      * Tests what happens when a CacheManager in the cluster comes and goes. In ehcache-1.2.4 this would cause the new RMI CachePeers in the CacheManager to
315      * be permanently corrupt.
316      */
317     @Test
318     public void testPutPropagatesFromAndToEveryCacheManagerAndCacheDirty() throws CacheException, InterruptedException {
319 
320         manager3.shutdown();
321         waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Collections.singleton(cacheName), manager1, manager2, manager4, manager5);
322 
323         manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
324         waitForClusterMembership(11020, TimeUnit.MILLISECONDS, Collections.singleton(cacheName), manager1, manager2, manager3, manager4, manager5);
325 
326         //Put
327         final String[] cacheNames = manager1.getCacheNames();
328         Arrays.sort(cacheNames);
329         for (int i = 0; i < cacheNames.length; i++) {
330             String name = cacheNames[i];
331             manager1.getCache(name).put(new Element(Integer.toString(i), Integer.valueOf(i)));
332             //Add some non serializable elements that should not get propagated
333             manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
334         }
335 
336         assertBy(10, TimeUnit.SECONDS, new Callable<Boolean>() {
337 
338             public Boolean call() throws Exception {
339                 for (int i = 0; i < cacheNames.length; i++) {
340                     String name = cacheNames[i];
341                     if (manager1.getCacheManagerPeerProvider("RMI").listRemoteCachePeers(manager1.getCache(name)).isEmpty()) {
342                         continue;
343                     }
344                     if ("sampleCache2".equals(name)) {
345                         //sampleCache2 in manager1 replicates puts via invalidate, so the count will be 1 less
346                         for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
347                             assertNull(manager2.getCache(name).get(Integer.toString(i)));
348                             assertNull(manager2.getCache(name).get("nonSerializable" + i));
349                         }
350                     } else {
351                         for (CacheManager manager : new CacheManager[] {manager2, manager3, manager4, manager5}) {
352                             assertNotNull(manager2.getCache(name).get(Integer.toString(i)));
353                             assertNull(manager2.getCache(name).get("nonSerializable" + i));
354                         }
355                     }
356                 }
357                 return Boolean.TRUE;
358             }
359         }, is(Boolean.TRUE));
360     }
361 
362     /***
363      * manager1 adds a replicating cache, then manager2 and so on. Then we remove one. Does everything work as expected?
364      */
365     @Test
366     public void testPutWithNewCacheAddedProgressively() throws InterruptedException {
367 
368         manager1.addCache("progressiveAddCache");
369         manager2.addCache("progressiveAddCache");
370 
371         //The cluster will not have formed yet, so it will fail
372         try {
373             putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
374             fail();
375         } catch (AssertionError e) {
376             //expected
377         }
378 
379         //The cluster will now have formed yet, so it will succeed
380         putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
381 
382         Cache secondCache = manager2.getCache("progressiveAddCache");
383 
384         //The second peer disappears. The test will fail.
385         manager2.removeCache("progressiveAddCache");
386         try {
387             putTest(manager1.getCache("progressiveAddCache"), secondCache, ASYNCHRONOUS);
388             fail();
389         } catch (IllegalStateException e) {
390             //The second cache will not alive. Expected. But no other exception is caught and this will otherwise fail.
391 
392         }
393 
394 
395     }
396 
397 
398     /***
399      * Test various cache configurations for cache1 - explicit setting of:
400      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
401      */
402     @Test
403     public void testPutWithExplicitReplicationConfig() throws InterruptedException {
404 
405         putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
406     }
407 
408 
409     /***
410      * Test various cache configurations for cache1 - explicit setting of:
411      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
412      */
413     @Test
414     public void testPutWithThreadKiller() throws InterruptedException {
415 
416         putTestWithThreadKiller(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
417     }
418 
419     /***
420      * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
421      * of a remote event by a CachePeer.
422      */
423 
424     @Test
425     public void testRemotelyReceivedPutNotifiesCountingListener() throws InterruptedException {
426 
427         putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
428         assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager1.getCache("sampleCache1")).size());
429         assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager2.getCache("sampleCache1")).size());
430 
431     }
432 
433     /***
434      * Test various cache configurations for cache1 - explicit setting of:
435      * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
436      */
437     @Test
438     public void testPutWithExplicitReplicationSynchronousConfig() throws InterruptedException {
439         putTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
440     }
441 
442 
443     /***
444      * Test put replicated for cache4 - no properties.
445      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
446      */
447     @Test
448     public void testPutWithEmptyReplicationPropertiesConfig() throws InterruptedException {
449         putTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
450     }
451 
452     /***
453      * Test put replicated for cache4 - missing replicatePuts property.
454      * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
455      * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
456      */
457     @Test
458     public void testPutWithOneMissingReplicationPropertyConfig() throws InterruptedException {
459         putTest(manager1.getCache("sampleCache5"), manager2.getCache("sampleCache5"), ASYNCHRONOUS);
460     }
461 
462 
463     /***
464      * Tests put and remove initiated from cache1 in a cluster
465      * <p/>
466      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
467      */
468     public void putTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
469 
470         Serializable key = new Date();
471         Serializable value = new Date();
472         Element sourceElement = new Element(key, value);
473 
474         //Put
475         fromCache.put(sourceElement);
476         int i = 0;
477 
478         if (asynchronous) {
479             waitForPropagate();
480         }
481 
482         //Should have been replicated to toCache.
483         Element deliveredElement = toCache.get(key);
484         assertEquals(sourceElement, deliveredElement);
485 
486     }
487 
488 
489     /***
490      * Tests put and remove initiated from cache1 in a cluster
491      * <p/>
492      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
493      */
494     public void putTestWithThreadKiller(Ehcache fromCache, Ehcache toCache, boolean asynchronous)
495             throws CacheException, InterruptedException {
496 
497         fromCache.put(new Element("thread killer", new ThreadKiller()));
498         if (asynchronous) {
499             waitForPropagate();
500         }
501 
502         Serializable key = new Date();
503         Serializable value = new Date();
504         Element sourceElement = new Element(key, value);
505 
506         //Put
507         fromCache.put(sourceElement);
508 
509         if (asynchronous) {
510             waitForPropagate();
511         }
512 
513         //Should have been replicated to toCache.
514         Element deliveredElement = toCache.get(key);
515         assertEquals(sourceElement, deliveredElement);
516 
517     }
518 
519 
520     /***
521      * Checks that a put received from a remote cache notifies any registered listeners.
522      * <p/>
523      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
524      */
525     @Test
526     public void testRemotePutNotificationGetsToOtherListeners() throws CacheException, InterruptedException {
527 
528         Serializable key = new Date();
529         Serializable value = new Date();
530         Element element1 = new Element(key, value);
531 
532         //Put
533         cache1.put(new Element("1", new Date()));
534         cache1.put(new Element("2", new Date()));
535         cache1.put(new Element("3", new Date()));
536 
537         //Nonserializable and non deliverable put
538         Object nonSerializableObject = new Object();
539         cache1.put(new Element(nonSerializableObject, new Object()));
540 
541 
542         waitForPropagate();
543 
544         //local initiating cache's counting listener should have been notified
545         assertEquals(4, CountingCacheEventListener.getCacheElementsPut(cache1).size());
546         //remote receiving caches' counting listener should have been notified
547         assertEquals(3, CountingCacheEventListener.getCacheElementsPut(cache2).size());
548 
549         //Update
550         cache1.put(new Element("1", new Date()));
551         cache1.put(new Element("2", new Date()));
552         cache1.put(new Element("3", new Date()));
553 
554         //Nonserializable and non deliverable put
555         cache1.put(new Element(nonSerializableObject, new Object()));
556 
557         waitForPropagate();
558 
559         //local initiating cache's counting listener should have been notified
560         assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache1).size());
561         //remote receiving caches' counting listener should have been notified
562         assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache2).size());
563 
564         //Remove
565         cache1.remove("1");
566         cache1.remove("2");
567         cache1.remove("3");
568         cache1.remove(nonSerializableObject);
569 
570         waitForPropagate();
571 
572         //local initiating cache's counting listener should have been notified
573         assertEquals(4, CountingCacheEventListener.getCacheElementsRemoved(cache1).size());
574         //remote receiving caches' counting listener should have been notified
575         assertEquals(3, CountingCacheEventListener.getCacheElementsRemoved(cache2).size());
576 
577     }
578 
579 
580     /***
581      * Test various cache configurations for cache1 - explicit setting of:
582      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
583      */
584     @Test
585     public void testRemoveWithExplicitReplicationConfig() throws InterruptedException {
586         removeTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
587     }
588 
589     /***
590      * Test various cache configurations for cache1 - explicit setting of:
591      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
592      */
593     @Test
594     public void testRemoveWithExplicitReplicationSynchronousConfig() throws InterruptedException {
595         removeTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
596     }
597 
598 
599     /***
600      * Test put replicated for cache4 - no properties.
601      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
602      */
603     @Test
604     public void testRemoveWithEmptyReplicationPropertiesConfig() throws InterruptedException {
605         removeTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
606     }
607 
608     /***
609      * Tests put and remove initiated from a cache to another cache in a cluster
610      * <p/>
611      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
612      */
613     public void removeTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
614 
615         Serializable key = new Date();
616         Serializable value = new Date();
617         Element element1 = new Element(key, value);
618 
619         //Put
620         fromCache.put(element1);
621 
622         if (asynchronous) {
623             waitForPropagate();
624         }
625 
626         //Should have been replicated to cache2.
627         Element element2 = toCache.get(key);
628         assertEquals(element1, element2);
629 
630         //Remove
631         fromCache.remove(key);
632         if (asynchronous) {
633             waitForPropagate();
634         }
635 
636         //Should have been replicated to cache2.
637         element2 = toCache.get(key);
638         assertNull(element2);
639 
640     }
641 
642 
643     /***
644      * test removeAll sync
645      */
646     @Test
647     public void testRemoveAllAsynchronous() throws Exception {
648         removeAllTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
649     }
650 
651     /***
652      * test removeAll async
653      */
654     @Test
655     public void testRemoveAllSynchronous() throws Exception {
656         removeAllTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
657     }
658 
659     /***
660      * Tests removeAll initiated from a cache to another cache in a cluster
661      * <p/>
662      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
663      */
664     public void removeAllTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
665 
666         //removeAll is distributed. Stop it colliding with the rest of the test
667         waitForPropagate();
668 
669 
670         Serializable key = new Date();
671         Serializable value = new Date();
672         Element element1 = new Element(key, value);
673 
674         //Put
675         fromCache.put(element1);
676 
677 
678         if (asynchronous) {
679             waitForPropagate();
680         }
681 
682         //Should have been replicated to cache2.
683         Element element2 = toCache.get(key);
684         assertEquals(element1, element2);
685 
686         //Remove
687         fromCache.removeAll();
688         if (asynchronous) {
689             waitForPropagate();
690         }
691 
692         //Should have been replicated to cache2.
693         element2 = toCache.get(key);
694         assertNull(element2);
695         assertEquals(0, toCache.getSize());
696 
697     }
698 
699 
700     /***
701      * Test various cache configurations for cache1 - explicit setting of:
702      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
703      */
704     @Test
705     public void testUpdateWithExplicitReplicationConfig() throws Exception {
706         updateViaCopyTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
707     }
708 
709     /***
710      * Test various cache configurations for cache1 - explicit setting of:
711      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
712      */
713     @Test
714     public void testUpdateWithExplicitReplicationSynchronousConfig() throws Exception {
715         updateViaCopyTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
716     }
717 
718 
719     /***
720      * Test put replicated for cache4 - no properties.
721      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
722      */
723     @Test
724     public void testUpdateWithEmptyReplicationPropertiesConfig() throws Exception {
725         updateViaCopyTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
726     }
727 
728     /***
729      * Tests put and update through copy initiated from cache1 in a cluster
730      * <p/>
731      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
732      */
733     public void updateViaCopyTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
734 
735         fromCache.removeAll();
736         toCache.removeAll();
737 
738         //removeAll is distributed. Stop it colliding with the rest of the test
739         waitForPropagate();
740 
741         Serializable key = new Date();
742         Serializable value = new Date();
743         Element element1 = new Element(key, value);
744 
745         //Put
746         fromCache.put(element1);
747         if (asynchronous) {
748             waitForPropagate();
749         }
750 
751         //Should have been replicated to cache2.
752         Element element2 = toCache.get(key);
753         assertEquals(element1, element2);
754 
755         //Update
756         Element updatedElement1 = new Element(key, new Date());
757 
758         fromCache.put(updatedElement1);
759         if (asynchronous) {
760             waitForPropagate();
761         }
762 
763         //Should have been replicated to cache2.
764         Element receivedUpdatedElement2 = toCache.get(key);
765         assertEquals(updatedElement1, receivedUpdatedElement2);
766 
767     }
768 
769 
770     /***
771      * Checks puts on sampleCache2 in both directions: a put into cache2 must show up in cache1,
772      * whereas a put into cache1 must leave that key absent from cache2, on first put and on update.
773      * <p/>
774      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
775      */
776     @Test
777     public void testPutViaInvalidate() throws CacheException, InterruptedException, IOException {
778
779         final String cacheName = "sampleCache2";
780         cache1 = manager1.getCache(cacheName);
781         cache1.removeAll();
782         cache2 = manager2.getCache(cacheName);
783         cache2.removeAll();
784
785         // The removeAll calls are distributed as well; let them settle before going on.
786         waitForPropagate();
787
788         final String copiedKey = "1";
789         final String invalidatedKey = "key2";
790         Element copied = new Element(copiedKey, new Date());
791         Element invalidating = new Element(invalidatedKey, "two");
792
793         // Put into cache2, which is configured to replicate puts via copy.
794         cache2.put(copied);
795         assertNotNull(cache2.get(copiedKey));
796         waitForPropagate();
797
798         // cache1 must have received the element.
799         assertEquals(copied, cache1.get(copiedKey));
800
801         // First put into cache1: cache2 must not end up holding the key.
802         cache1.put(invalidating);
803         waitForPropagate();
804         assertNull(cache2.get(invalidatedKey));
805
806         // Putting the same element again is an update; cache2 must still not hold the key.
807         cache1.put(invalidating);
808         waitForPropagate();
809         assertNull(cache2.get(invalidatedKey));
810     }
818 
819 
820     /***
821      * Puts an element into cache2, then puts the same element into cache1 and checks that,
822      * once propagation has had time to run, cache2 no longer returns it.
823      * <p/>
824      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
825      */
826     @Test
827     public void testUpdateViaInvalidate() throws CacheException, InterruptedException, IOException {
828
829         final String cacheName = "sampleCache2";
830         cache1 = manager1.getCache(cacheName);
831         cache1.removeAll();
832         cache2 = manager2.getCache(cacheName);
833         cache2.removeAll();
834
835         // The removeAll calls are distributed as well; let them settle before going on.
836         waitForPropagate();
837
838         final String cacheKey = "1";
839         Element shared = new Element(cacheKey, new Date());
840
841         // Seed cache2 and confirm it holds the element locally.
842         cache2.put(shared);
843         assertEquals(shared, cache2.get(cacheKey));
844
845         // The same key is now put into cache1 ...
846         cache1.put(shared);
847         waitForPropagate();
848
849         // ... after which cache2 must no longer return it.
850         assertNull(cache2.get(cacheKey));
851     }
855 
856 
857     /***
858      * Tests update through invalidation when the element's value is not serializable:
859      * the element is put into cache2, then into cache1, and after propagation cache2 must
860      * no longer return it.
861      * <p/>
862      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
863      */
864     @Test
865     public void testUpdateViaInvalidateNonSerializableValue() throws CacheException, InterruptedException, IOException {
866
867         cache1 = manager1.getCache("sampleCache2");
868         cache1.removeAll();
869
870         cache2 = manager2.getCache("sampleCache2");
871         cache2.removeAll();
872
873         //removeAll is distributed. Stop it colliding with the rest of the test
874         waitForPropagate();
875
876         /***
877          * Non-serializable test class
878          */
879         class NonSerializable {
880             //
881         }
882
883         String key = "1";
884         Element element1 = new Element(key, new NonSerializable());
885
886         //Put: cache2 holds the element locally
887         cache2.put(element1);
888         Element element2 = cache2.get(key);
889         assertEquals(element1, element2);
890
891         //Update: the same key is put into cache1
892         cache1.put(element1);
893         waitForPropagate();
894
895         //Should have been removed in cache2.
896         element2 = cache2.get(key);
897         assertNull(element2);
898
899     }
901 
902 
903     /***
904      * What happens when two cache instances replicate to each other and a change is initiated:
905      * a put and a remove on cache1 must both reach cache2, and an element put into cache2 must
906      * still be returned by cache2 after the propagation wait.
907      */
908     @Test
909     public void testInfiniteNotificationsLoop() throws InterruptedException {
910
911         final Serializable cacheKey = "1";
912         Element first = new Element(cacheKey, new Date());
913
914         // A put on cache1 must be replicated to cache2.
915         cache1.put(first);
916         waitForPropagate();
917         assertEquals(first, cache2.get(cacheKey));
918
919         // A remove on cache1 is visible locally straight away ...
920         cache1.remove(cacheKey);
921         assertNull(cache1.get(cacheKey));
922
923         // ... and must be replicated to cache2.
924         waitForPropagate();
925         assertNull(cache2.get(cacheKey));
926
927         // Put into cache2 and read it back after the propagation wait.
928         final String secondKey = "3";
929         Element second = new Element(secondKey, "ddsfds");
930         cache2.put(second);
931         waitForPropagate();
932         assertEquals(second, cache2.get(secondKey));
933     }
938 
939 
940     /***
941      * Shows result of perf problem and fix in flushReplicationQueue
942      * <p/>
943      * Behaviour before change:
944      * <p/>
945      * INFO: Items written: 10381
946      * Oct 29, 2007 11:40:04 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
947      * INFO: Items written: 29712
948      * Oct 29, 2007 11:40:57 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
949      * INFO: Items written: 1
950      * Oct 29, 2007 11:40:58 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
951      * INFO: Items written: 32354
952      * Oct 29, 2007 11:42:34 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
953      * INFO: Items written: 322
954      * Oct 29, 2007 11:42:35 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
955      * INFO: Items written: 41909
956      * <p/>
957      * Behaviour after change:
958      * INFO: Items written: 26356
959      * Oct 29, 2007 11:44:39 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
960      * INFO: Items written: 33656
961      * Oct 29, 2007 11:44:40 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
962      * INFO: Items written: 32234
963      * Oct 29, 2007 11:44:42 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
964      * INFO: Items written: 38677
965      * Oct 29, 2007 11:44:43 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
966      * INFO: Items written: 43418
967      * Oct 29, 2007 11:44:44 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
968      * INFO: Items written: 31277
969      * Oct 29, 2007 11:44:45 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
970      * INFO: Items written: 27769
971      * Oct 29, 2007 11:44:46 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
972      * INFO: Items written: 29596
973      * Oct 29, 2007 11:44:47 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
974      * INFO: Items written: 17142
975      * Oct 29, 2007 11:44:48 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
976      * INFO: Items written: 14775
977      * Oct 29, 2007 11:44:49 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
978      * INFO: Items written: 4088
979      * Oct 29, 2007 11:44:51 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
980      * INFO: Items written: 5492
981      * Oct 29, 2007 11:44:52 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
982      * INFO: Items written: 10188
983      * <p/>
984      * Also no pauses noted.
985      */
986     @Test
987     public void testReplicatePerf() throws InterruptedException {
988
989         // Shut down every peer except manager1.
990         for (CacheManager peer : new CacheManager[] {manager2, manager3, manager4, manager5, manager6}) {
991             if (peer != null) {
992                 peer.shutdown();
993             }
994         }
995
996         //wait for cluster to drop back to just one: manager1
997         waitForPropagate();
998
999         long windowStart = System.currentTimeMillis();
1000         final String keyPrefix = Long.toString(windowStart);
1001         int writtenInWindow = 0;
1002
1003         for (int iteration = 0; iteration < 100000; iteration++) {
1004             final int suffix = (int) (Math.random() * 1000.0);
1005             final String key = keyPrefix + ':' + suffix;
1006             cache1.put(new Element(key, "My Test"));
1007             cache1.get(key);
1008             cache1.remove(key);
1009             writtenInWindow++;
1010
1011             final long now = System.currentTimeMillis();
1012             if (now - windowStart < 1000) {
1013                 continue;
1014             }
1015
1016             // At least a second has passed: report the tally and start a new window.
1017             windowStart = now;
1018             LOG.info("Items written: " + writtenInWindow);
1019             //make sure it does not choke
1020             assertTrue(writtenInWindow > 1000);
1021             writtenInWindow = 0;
1022         }
1023     }
1030 
1031 
1032     /***
1033      * Pauses the calling thread for 1500 ms to give asynchronous replication time to run.
1034      *
1035      * @throws InterruptedException if the thread is interrupted while sleeping
1036      */
1037     protected void waitForPropagate() throws InterruptedException {
1038         TimeUnit.MILLISECONDS.sleep(1500);
1039     }
1040 
1041     /***
1042      * Pauses the calling thread for 6000 ms, the longer of the two waits for asynchronous replication.
1043      *
1044      * @throws InterruptedException if the thread is interrupted while sleeping
1045      */
1046     protected void waitForSlowPropagate() throws InterruptedException {
1047         TimeUnit.MILLISECONDS.sleep(6000);
1048     }
1049 
1050 
1051     /***
1052      * Looks for distributed deadlock under synchronous replication by running one
1053      * {@link ClusterExecutable} per peer (manager1 to manager3) against sampleCache3.
1054      * The test passes when {@code runTasks} returns an empty collection.
1055      * <p/>
1056      * Carefully tailored to exercise:
1057      * <ol>
1058      * <li>overflow to disk. We put in 20 things and the memory size is 10
1059      * <li>each peer is working on the same set of keys thus maximising contention
1060      * <li>we do puts, gets and removes to explore all the execution paths
1061      * </ol>
1062      * If a deadlock occurs, processing will stop until a SocketTimeout exception is thrown and
1063      * the deadlock will be released.
1064      */
1065     @Test
1066     public void testCacheOperationsSynchronousMultiThreaded() throws Exception, InterruptedException {
1067
1068         // One task per peer, all targeting the same cache
1069         final List executables = new ArrayList();
1070         for (CacheManager peer : new CacheManager[] {manager1, manager2, manager3}) {
1071             executables.add(new ClusterExecutable(peer, "sampleCache3"));
1072         }
1073
1074         assertThat(runTasks(executables), IsEmptyCollection.<Throwable>empty());
1075     }
1079 
1080 
1081     /***
1082      * Looks for distributed deadlock under asynchronous replication by running one
1083      * {@link ClusterExecutable} per peer (manager1 to manager3) against sampleCache2, which is
1084      * configured to be asynchronous. The test passes when {@code runTasks} returns an empty collection.
1085      * <p/>
1086      * Carefully tailored to exercise:
1087      * <ol>
1088      * <li>overflow to disk. We put in 20 things and the memory size is 10
1089      * <li>each peer is working on the same set of keys thus maximising contention
1090      * <li>we do puts, gets and removes to explore all the execution paths
1091      * </ol>
1092      */
1093     @Test
1094     public void testCacheOperationsAynchronousMultiThreaded() throws Exception, InterruptedException {
1095
1096         // One task per peer, all targeting the same cache
1097         final List executables = new ArrayList();
1098         for (CacheManager peer : new CacheManager[] {manager1, manager2, manager3}) {
1099             executables.add(new ClusterExecutable(peer, "sampleCache2"));
1100         }
1101
1102         assertThat(runTasks(executables), IsEmptyCollection.<Throwable>empty());
1103     }
1108 
1109     /***
1110      * A Callable that runs a random mix of cache operations against one named cache of a CacheManager
1111      */
1112     class ClusterExecutable implements Callable<Void> {
1113
1114         private final CacheManager manager;
1115         private final String cacheName;
1116
1117         /***
1118          * Construct with the CacheManager and the name of the cache to operate on
1119          * @param manager   the CacheManager the cache is looked up in
1120          * @param cacheName the name of the cache to operate on
1121          */
1122         public ClusterExecutable(CacheManager manager, String cacheName) {
1123             this.manager = manager;
1124             this.cacheName = cacheName;
1125         }
1126
1127         /***
1128          * Runs one randomly selected operation for each key 0..19: selector 0 = get, 1 = remove,
1129          * 2 = put, 3 = removeAll (which only fires one time in three).
1130          * @return always null
1131          */
1132         public Void call() throws Exception {
1133             Random random = new Random();
1134             // random.nextInt(4) yields 0..3, so every branch below is reachable
1135             for (int i = 0; i < 20; i++) {
1136                 Integer key = Integer.valueOf(i);
1137                 int operationSelector = random.nextInt(4);
1138                 Cache cache = manager.getCache(cacheName);
1139                 if (operationSelector == 0) {
1140                     cache.get(key);
1141                     if (LOG.isDebugEnabled()) {
1142                         LOG.debug(cache.getGuid() + ": get " + key);
1143                     }
1144                 } else if (operationSelector == 1) {
1145                     cache.remove(key);
1146                     if (LOG.isDebugEnabled()) {
1147                         LOG.debug(cache.getGuid() + ": remove " + key);
1148                     }
1149                 } else if (operationSelector == 2) {
1150                     cache.put(new Element(key,
1151                             "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1152                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1153                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1154                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1155                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
1156                     if (LOG.isDebugEnabled()) {
1157                         LOG.debug(cache.getGuid() + ": put " + key);
1158                     }
1159                 } else {
1160                     //every twelfth time 1/4 * 1/3 = 1/12
1161                     if (random.nextInt(3) == 1) {
1162                         LOG.debug("cache.removeAll()");
1163                         cache.removeAll();
1164                     }
1165                 }
1166             }
1167             return null;
1168         }
1169     }
1170 }