View Javadoc

1   /***
2    *  Copyright 2003-2009 Terracotta, Inc.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package net.sf.ehcache.distribution;
18  
19  import net.sf.ehcache.AbstractCacheTest;
20  import net.sf.ehcache.Cache;
21  import net.sf.ehcache.CacheException;
22  import net.sf.ehcache.CacheManager;
23  import net.sf.ehcache.Ehcache;
24  import net.sf.ehcache.Element;
25  import net.sf.ehcache.StopWatch;
26  import net.sf.ehcache.ThreadKiller;
27  import net.sf.ehcache.event.CountingCacheEventListener;
28  import net.sf.ehcache.management.ManagementService;
29  import org.junit.After;
30  import static org.junit.Assert.assertEquals;
31  import static org.junit.Assert.assertNotNull;
32  import static org.junit.Assert.assertNull;
33  import static org.junit.Assert.assertTrue;
34  import static org.junit.Assert.fail;
35  import org.junit.Before;
36  import org.junit.Test;
37  import org.junit.Ignore;
38  
39  import java.io.IOException;
40  import java.io.Serializable;
41  import java.rmi.RemoteException;
42  import java.util.ArrayList;
43  import java.util.Arrays;
44  import java.util.Date;
45  import java.util.List;
46  import java.util.Random;
47  
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  /***
52   * Tests replication of Cache events
53   * <p/>
54   * Note these tests need a live network interface running in multicast mode to work
55   * <p/>
56   * If the tests involving RMIAsynchronousCacheReplicator are run individually they will fail because
57   * the VM will gobble up the SoftReferences rather than allocating more memory. In that case, uncomment
58   * the forceVMGrowth() method usage in setUp().
59   *
60   * @author Greg Luck
61   * @version $Id: RMICacheReplicatorTest.java 1619 2010-01-08 15:46:20Z cdennis $
62   */
63  
64  
65  //
66  // Please close jira MNK-1377 after fixing ignored tests below
67  //
68  
69  public class RMICacheReplicatorTest extends AbstractCacheTest {
70  
71  
72      /***
73       * A value to represent replicate asynchronously
74       */
75      protected static final boolean ASYNCHRONOUS = true;
76  
77      /***
78       * A value to represent replicate synchronously
79       */
80      protected static final boolean SYNCHRONOUS = false;
81  
82      private static final Logger LOG = LoggerFactory.getLogger(RMICacheReplicatorTest.class.getName());
83  
84  
85      /***
86       * CacheManager 1 in the cluster
87       */
88      protected CacheManager manager1;
89      /***
90       * CacheManager 2 in the cluster
91       */
92      protected CacheManager manager2;
93      /***
94       * CacheManager 3 in the cluster
95       */
96      protected CacheManager manager3;
97      /***
98       * CacheManager 4 in the cluster
99       */
100     protected CacheManager manager4;
101     /***
102      * CacheManager 5 in the cluster
103      */
104     protected CacheManager manager5;
105     /***
106      * CacheManager 6 in the cluster
107      */
108     protected CacheManager manager6;
109 
110     /***
111      * The name of the cache under test
112      */
113     protected String cacheName = "sampleCache1";
114     /***
115      * CacheManager 1 of 2s cache being replicated
116      */
117     protected Ehcache cache1;
118 
119     /***
120      * CacheManager 2 of 2s cache being replicated
121      */
122     protected Ehcache cache2;
123 
124     /***
125      * Allows setup to be the same
126      */
127     protected String cacheNameBase = "ehcache-distributed";
128 
129     /***
130      * {@inheritDoc}
131      * Sets up two caches: cache1 is local. cache2 is to be receive updates
132      *
133      * @throws Exception
134      */
135     @Override
136     @Before
137     public void setUp() throws Exception {
138 
139         //Required to get SoftReference tests to pass. The VM clean up SoftReferences rather than allocating
140         // memory to -Xmx!
141 //        forceVMGrowth();
142 //        System.gc();
143         MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
144 
145         CountingCacheEventListener.resetCounters();
146         manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed1.xml");
147         manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed2.xml");
148         manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
149         manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed4.xml");
150         manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed5.xml");
151 
152         //manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed-jndi6.xml");
153 
154         //allow cluster to be established
155         Thread.sleep(1020);
156 
157         cache1 = manager1.getCache(cacheName);
158         cache1.removeAll();
159 
160         cache2 = manager2.getCache(cacheName);
161         cache2.removeAll();
162 
163         //enable distributed removeAlls to finish
164         waitForPropagate();
165 
166 
167     }
168 
169     /***
170      * {@inheritDoc}
171      *
172      * @throws Exception
173      */
174     @Override
175     @After
176     public void tearDown() throws Exception {
177 
178         if (manager1 != null) {
179             manager1.shutdown();
180         }
181         if (manager2 != null) {
182             manager2.shutdown();
183         }
184         if (manager3 != null) {
185             manager3.shutdown();
186         }
187         if (manager4 != null) {
188             manager4.shutdown();
189         }
190         if (manager5 != null) {
191             manager5.shutdown();
192         }
193         if (manager6 != null) {
194             manager6.shutdown();
195         }
196         Thread.sleep(2000);
197 
198         List threads = JVMUtil.enumerateThreads();
199         for (int i = 0; i < threads.size(); i++) {
200             Thread thread = (Thread) threads.get(i);
201             if (thread.getName().equals("Replication Thread")) {
202                 fail("There should not be any replication threads running after shutdown");
203             }
204         }
205 
206     }
207 
208     /***
209      * 5 cache managers should means that each cache has four remote peers
210      */
211     
212     @Test
213     public void testRemoteCachePeersEqualsNumberOfCacheManagersInCluster() {
214 
215         CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
216         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
217         assertEquals(4, remotePeersOfCache1.size());
218     }
219 
220     /***
221      * Does a new cache manager in the cluster get detected?
222      */
223     
224     @Test
225     public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
226 
227         CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
228         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
229         assertEquals(4, remotePeersOfCache1.size());
230 
231         //Add new CacheManager to cluster
232         manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed6.xml");
233 
234         //Allow detection to occur
235         Thread.sleep(10020);
236 
237         remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
238         assertEquals(5, remotePeersOfCache1.size());
239     }
240 
241     /***
242      * Does a down cache manager in the cluster get removed?
243      */
244     
245     @Test
246     public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
247 
248         CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
249         List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
250         assertEquals(4, remotePeersOfCache1.size());
251 
252         //Drop a CacheManager from the cluster
253         manager5.shutdown();
254 
255         //Allow change detection to occur. Heartbeat 1 second and is not stale until 5000
256         Thread.sleep(11020);
257         remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
258 
259 
260         assertEquals(3, remotePeersOfCache1.size());
261     }
262 
263     /***
264      * Does a down cache manager in the cluster get removed?
265      */
266 
267     @Test
268     public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
269 
270         try {
271             CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
272             List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
273             assertEquals(4, remotePeersOfCache1.size());
274 
275             MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(2000);
276             Thread.sleep(2000);
277 
278             //Drop a CacheManager from the cluster
279             manager5.shutdown();
280 
281             //Insufficient time for it to timeout
282             remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
283             assertEquals(4, remotePeersOfCache1.size());
284         } finally {
285             MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
286             Thread.sleep(2000);
287         }
288 
289 
290     }
291 
292     /***
293      * Tests put and remove initiated from cache1 in a cluster
294      * <p/>
295      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
296      */
297     @Test
298     public void testPutProgagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
299 
300         //Put
301         String[] cacheNames = manager1.getCacheNames();
302         int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
303         Arrays.sort(cacheNames);
304         for (int i = 0; i < cacheNames.length; i++) {
305             String name = cacheNames[i];
306             manager1.getCache(name).put(new Element("" + i, Integer.valueOf(i)));
307             //Add some non serializable elements that should not get propagated
308             manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
309         }
310 
311         waitForPropagate();
312 
313         int count2 = 0;
314         int count3 = 0;
315         int count4 = 0;
316         int count5 = 0;
317         for (int i = 0; i < cacheNames.length; i++) {
318             String name = cacheNames[i];
319             Element element2 = manager2.getCache(name).get("" + i);
320             if (element2 != null) {
321                 count2++;
322             }
323             Element nonSerializableElement2 = manager2.getCache(name).get("nonSerializable" + i);
324             if (nonSerializableElement2 != null) {
325                 count2++;
326             }
327             Element element3 = manager3.getCache(name).get("" + i);
328             if (element3 != null) {
329                 count3++;
330             }
331             Element element4 = manager4.getCache(name).get("" + i);
332             if (element4 != null) {
333                 count4++;
334             }
335             Element element5 = manager5.getCache(name).get("" + i);
336             if (element5 != null) {
337                 count5++;
338             }
339         }
340         //sampleCache2 in manager1 replicates puts via invalidate, so the count will be 1 less
341         assertEquals(numberOfCaches - 1, count2);
342         assertEquals(numberOfCaches - 1, count3);
343         assertEquals(numberOfCaches - 1, count4);
344         assertEquals(numberOfCaches - 1, count5);
345 
346 
347     }
348 
349     /***
350      * Tests what happens when a CacheManager in the cluster comes and goes. In ehcache-1.2.4 this would cause the new RMI CachePeers in the CacheManager to
351      * be permanently corrupt.
352      */
353     
354     @Test
355     public void testPutProgagatesFromAndToEveryCacheManagerAndCacheDirty() throws CacheException, InterruptedException {
356 
357         manager3.shutdown();
358 
359         Thread.sleep(11020);
360 
361         manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
362         Thread.sleep(11020);
363 
364         //Put
365         String[] cacheNames = manager1.getCacheNames();
366         int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
367         Arrays.sort(cacheNames);
368         for (int i = 0; i < cacheNames.length; i++) {
369             String name = cacheNames[i];
370             manager1.getCache(name).put(new Element("" + i, Integer.valueOf(i)));
371             //Add some non serializable elements that should not get propagated
372             manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
373         }
374 
375         waitForPropagate();
376 
377         int count2 = 0;
378         int count3 = 0;
379         int count4 = 0;
380         int count5 = 0;
381         for (int i = 0; i < cacheNames.length; i++) {
382             String name = cacheNames[i];
383             Element element2 = manager2.getCache(name).get("" + i);
384             if (element2 != null) {
385                 count2++;
386             }
387             Element nonSerializableElement2 = manager2.getCache(name).get("nonSerializable" + i);
388             if (nonSerializableElement2 != null) {
389                 count2++;
390             }
391             Element element3 = manager3.getCache(name).get("" + i);
392             if (element3 != null) {
393                 count3++;
394             }
395             Element element4 = manager4.getCache(name).get("" + i);
396             if (element4 != null) {
397                 count4++;
398             }
399             Element element5 = manager5.getCache(name).get("" + i);
400             if (element5 != null) {
401                 count5++;
402             }
403         }
404         //sampleCache2 in manager1 replicates puts via invalidate, so the count will be 1 less
405         assertEquals(numberOfCaches - 1, count2);
406         assertEquals(numberOfCaches - 1, count3);
407         assertEquals(numberOfCaches - 1, count4);
408         assertEquals(numberOfCaches - 1, count5);
409 
410 
411     }
412 
413     /***
414      * Enables long stabilty runs using replication to be done.
415      * <p/>
416      * This test has been run in a profile for 15 hours without any observed issues.
417      *
418      * @throws InterruptedException
419      */
420     public void manualStabilityTest() throws InterruptedException {
421         forceVMGrowth();
422 
423         ManagementService.registerMBeans(manager3, createMBeanServer(), true, true, true, true);
424         while (true) {
425             testBigPutsProgagatesAsynchronous();
426         }
427     }
428 
429     /***
430      * Non JUnit invocation of stability test to get cleaner run
431      *
432      * @param args
433      * @throws InterruptedException
434      */
435     public static void main(String[] args) throws Exception {
436         RMICacheReplicatorTest replicatorTest = new RMICacheReplicatorTest();
437         replicatorTest.setUp();
438         replicatorTest.manualStabilityTest();
439     }
440 
441     /***
442      * The number of caches there should be.
443      */
444     protected int getNumberOfReplicatingCachesInCacheManager() {
445         return 55;
446     }
447 
448 
449     /***
450      * Performance and capacity tests.
451      * <p/>
452      * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
453      * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
454      * it has fully received.
455      * <p/>
456      * r37 and earlier - initial implementation
457      * 38 seconds to get all notifications with 6 peers, 2000 Elements and 400 byte payload
458      * 18 seconds to get all notifications with 2 peers, 2000 Elements and 400 byte payload
459      * 40 seconds to get all notifications with 2 peers, 2000 Elements and 10k payload
460      * 22 seconds to get all notifications with 2 peers, 2000 Elements and 1k payload
461      * 26 seconds to get all notifications with 2 peers, 200 Elements and 100k payload
462      * <p/>
463      * r38 - RMI stub lookup on registration rather than at each lookup. Saves quite a few lookups. Also change to 5 second heartbeat
464      * 38 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (1 second heartbeat)
465      * 16 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
466      * 13 seconds to get 2000 notifications with 2 peers, Elements with 400 byte payload
467      * <p/>
468      * r39 - Batching asyn replicator. Send all queued messages in one RMI call once per second.
469      * 2 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
470      */
471     
472     @Test
473     public void testBigPutsProgagatesAsynchronous() throws CacheException, InterruptedException {
474 
475         //Give everything a chance to startup
476         //Thread.sleep(10000);
477         StopWatch stopWatch = new StopWatch();
478         Integer index = null;
479         for (int i = 0; i < 2; i++) {
480             for (int j = 0; j < 1000; j++) {
481                 index = Integer.valueOf(((1000 * i) + j));
482                 cache1.put(new Element(index,
483                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
484                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
485                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
486                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
487                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
488             }
489 
490         }
491         long elapsed = stopWatch.getElapsedTime();
492         long putTime = ((elapsed / 1000));
493         LOG.info("Put Elapsed time: " + putTime);
494         //assertTrue(putTime < 8);
495 
496         assertEquals(2000, cache1.getSize());
497 
498         Thread.sleep(2000);
499         assertEquals(2000, manager2.getCache("sampleCache1").getSize());
500         assertEquals(2000, manager3.getCache("sampleCache1").getSize());
501         assertEquals(2000, manager4.getCache("sampleCache1").getSize());
502         assertEquals(2000, manager5.getCache("sampleCache1").getSize());
503 
504         CountingCacheEventListener.resetCounters();
505 
506     }
507 
508 
509     /***
510      * Performance and capacity tests.
511      * <p/>
512      */
513     
514     @Test
515     public void testBootstrap() throws CacheException, InterruptedException, RemoteException {
516 
517         //load up some data
518         StopWatch stopWatch = new StopWatch();
519         Integer index = null;
520         for (int i = 0; i < 2; i++) {
521             for (int j = 0; j < 1000; j++) {
522                 index = Integer.valueOf(((1000 * i) + j));
523                 cache1.put(new Element(index,
524                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
525                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
526                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
527                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
528                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
529             }
530 
531         }
532         long elapsed = stopWatch.getElapsedTime();
533         long putTime = ((elapsed / 1000));
534         LOG.info("Put Elapsed time: " + putTime);
535 
536         assertEquals(2000, cache1.getSize());
537 
538         Thread.sleep(7000);
539         assertEquals(2000, manager2.getCache("sampleCache1").getSize());
540         assertEquals(2000, manager3.getCache("sampleCache1").getSize());
541         assertEquals(2000, manager4.getCache("sampleCache1").getSize());
542         assertEquals(2000, manager5.getCache("sampleCache1").getSize());
543 
544         //now test bootstrap
545         manager1.addCache("bootStrapResults");
546         Cache cache = manager1.getCache("bootStrapResults");
547         List cachePeers = manager1.getCacheManagerPeerProvider("RMI").listRemoteCachePeers(cache1);
548         CachePeer cachePeer = (CachePeer) cachePeers.get(0);
549 
550         List keys = cachePeer.getKeys();
551         assertEquals(2000, keys.size());
552 
553         Element firstElement = cachePeer.getQuiet((Serializable) keys.get(0));
554         long size = firstElement.getSerializedSize();
555         assertEquals(504, size);
556 
557         int chunkSize = (int) (5000000 / size);
558 
559         List requestChunk = new ArrayList();
560         for (int i = 0; i < keys.size(); i++) {
561             Serializable serializable = (Serializable) keys.get(i);
562             requestChunk.add(serializable);
563             if (requestChunk.size() == chunkSize) {
564                 fetchAndPutElements(cache, requestChunk, cachePeer);
565                 requestChunk.clear();
566             }
567         }
568         //get leftovers
569         fetchAndPutElements(cache, requestChunk, cachePeer);
570 
571         assertEquals(keys.size(), cache.getSize());
572 
573     }
574 
575     private void fetchAndPutElements(Ehcache cache, List requestChunk, CachePeer cachePeer) throws RemoteException {
576         List receivedChunk = cachePeer.getElements(requestChunk);
577         for (int i = 0; i < receivedChunk.size(); i++) {
578             Element element = (Element) receivedChunk.get(i);
579             assertNotNull(element);
580             cache.put(element, true);
581         }
582 
583     }
584 
585 
586     /***
587      * Drive everything to point of breakage within a 64MB VM.
588      */
589     public void xTestHugePutsBreaksAsynchronous() throws CacheException, InterruptedException {
590 
591         //Give everything a chance to startup
592         StopWatch stopWatch = new StopWatch();
593         Integer index = null;
594         for (int i = 0; i < 500; i++) {
595             for (int j = 0; j < 1000; j++) {
596                 index = Integer.valueOf(((1000 * i) + j));
597                 cache1.put(new Element(index,
598                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
599                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
600                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
601                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
602                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
603             }
604 
605         }
606         long elapsed = stopWatch.getElapsedTime();
607         long putTime = ((elapsed / 1000));
608         LOG.info("Put Elapsed time: " + putTime);
609         //assertTrue(putTime < 8);
610 
611         assertEquals(100000, cache1.getSize());
612 
613         Thread.sleep(100000);
614         assertEquals(20000, manager2.getCache("sampleCache1").getSize());
615         assertEquals(20000, manager3.getCache("sampleCache1").getSize());
616         assertEquals(20000, manager4.getCache("sampleCache1").getSize());
617         assertEquals(20000, manager5.getCache("sampleCache1").getSize());
618 
619     }
620 
621 
622     /***
623      * Performance and capacity tests.
624      * <p/>
625      * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
626      * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
627      * it has fully received.
628      * <p/>
629      * 4 seconds to get all remove notifications with 6 peers, 5000 Elements and 400 byte payload
630      */
631     @Test
632     public void testBigRemovesProgagatesAsynchronous() throws CacheException, InterruptedException {
633 
634         //Give everything a chance to startup
635         Integer index = null;
636         for (int i = 0; i < 5; i++) {
637             for (int j = 0; j < 1000; j++) {
638                 index = Integer.valueOf(((1000 * i) + j));
639                 cache1.put(new Element(index,
640                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
641                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
642                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
643                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
644                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
645             }
646 
647         }
648 
649 
650         Ehcache[] caches = {
651             cache1,
652             manager2.getCache("sampleCache1"),
653             manager3.getCache("sampleCache1"),
654             manager4.getCache("sampleCache1"),
655             manager5.getCache("sampleCache1") };
656 
657         waitForCacheSize(5000, 25, caches);
658         //Let the disk stores catch up before the next stage of the test
659         Thread.sleep(2000);
660 
661         for (int i = 0; i < 5; i++) {
662             for (int j = 0; j < 1000; j++) {
663                 cache1.remove(Integer.valueOf(((1000 * i) + j)));
664             }
665         }
666 
667         long timeForPropagate = waitForCacheSize(0, 25, caches);
668         LOG.info("Remove Elapsed time: " + timeForPropagate);
669 
670     }
671 
672     public long waitForCacheSize(long size, int maxSeconds, Ehcache... caches) throws InterruptedException {
673 
674         StopWatch stopWatch = new StopWatch();
675         while(checkForCacheSize(size, caches)) {
676             Thread.sleep(500);
677             if(stopWatch.getElapsedTime() > maxSeconds * 1000) {
678                 fail("Caches still haven't reached the expected size after " + maxSeconds + " seconds");
679             }
680         }
681 
682         return stopWatch.getElapsedTime();
683     }
684 
685     private boolean checkForCacheSize(long size, Ehcache... caches) {
686         boolean sizeReached = true;
687         for (Ehcache cache : caches) {
688             if(cache.getSize() != size) {
689                 sizeReached = false;
690                 break;
691             }
692         }
693         return sizeReached;
694     }
695 
696 
697     /***
698      * Performance and capacity tests.
699      * <p/>
700      * 5 seconds to send all notifications synchronously with 5 peers, 2000 Elements and 400 byte payload
701      * The numbers given below are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
702      * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
703      * it has fully received.
704      */
705     @Test
706     public void testBigPutsProgagatesSynchronous() throws CacheException, InterruptedException {
707 
708         //Give everything a chance to startup
709         StopWatch stopWatch = new StopWatch();
710         Integer index;
711         for (int i = 0; i < 2; i++) {
712             for (int j = 0; j < 1000; j++) {
713                 index = Integer.valueOf(((1000 * i) + j));
714                 manager1.getCache("sampleCache3").put(new Element(index,
715                         "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
716                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
717                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
718                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
719                                 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
720             }
721 
722         }
723         long elapsed = stopWatch.getElapsedTime();
724         long putTime = ((elapsed / 1000));
725         LOG.info("Put and Propagate Synchronously Elapsed time: " + putTime + " seconds");
726 
727         assertEquals(2000, manager1.getCache("sampleCache3").getSize());
728         assertEquals(2000, manager2.getCache("sampleCache3").getSize());
729         assertEquals(2000, manager3.getCache("sampleCache3").getSize());
730         assertEquals(2000, manager4.getCache("sampleCache3").getSize());
731         assertEquals(2000, manager5.getCache("sampleCache3").getSize());
732 
733     }
734 
735 
736     /***
737      * manager1 adds a replicating cache, then manager2 and so on. Then we remove one. Does everything work as expected?
738      */
739     @Test
740     public void testPutWithNewCacheAddedProgressively() throws InterruptedException {
741 
742         manager1.addCache("progressiveAddCache");
743         manager2.addCache("progressiveAddCache");
744 
745         //The cluster will not have formed yet, so it will fail
746         try {
747             putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
748             fail();
749         } catch (AssertionError e) {
750             //expected
751         }
752 
753         //The cluster will now have formed yet, so it will succeed
754         putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
755 
756         Cache secondCache = manager2.getCache("progressiveAddCache");
757 
758         //The second peer disappears. The test will fail.
759         manager2.removeCache("progressiveAddCache");
760         try {
761             putTest(manager1.getCache("progressiveAddCache"), secondCache, ASYNCHRONOUS);
762             fail();
763         } catch (IllegalStateException e) {
764             //The second cache will not alive. Expected. But no other exception is caught and this will otherwise fail.
765 
766         }
767 
768 
769     }
770 
771 
772     /***
773      * Test various cache configurations for cache1 - explicit setting of:
774      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
775      */
776     @Test
777     public void testPutWithExplicitReplicationConfig() throws InterruptedException {
778 
779         putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
780     }
781 
782 
783     
784     /***
785      * Test various cache configurations for cache1 - explicit setting of:
786      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
787      */
788     @Test
789     public void testPutWithThreadKiller() throws InterruptedException {
790 
791         putTestWithThreadKiller(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
792     }
793 
794     /***
795      * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
796      * of a remote event by a CachePeer.
797      */
798     
799     @Test
800     public void testRemotelyReceivedPutNotifiesCountingListener() throws InterruptedException {
801 
802         putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
803         assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager1.getCache("sampleCache1")).size());
804         assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager2.getCache("sampleCache1")).size());
805 
806     }
807 
808     /***
809      * Test various cache configurations for cache1 - explicit setting of:
810      * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
811      */
812     @Test
813     public void testPutWithExplicitReplicationSynchronousConfig() throws InterruptedException {
814         putTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
815     }
816 
817 
818     /***
819      * Test put replicated for cache4 - no properties.
820      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
821      */
822     @Test
823     public void testPutWithEmptyReplicationPropertiesConfig() throws InterruptedException {
824         putTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
825     }
826 
827     /***
828      * Test put replicated for cache4 - missing replicatePuts property.
829      * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
830      * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
831      */
832     @Test
833     public void testPutWithOneMissingReplicationPropertyConfig() throws InterruptedException {
834         putTest(manager1.getCache("sampleCache5"), manager2.getCache("sampleCache5"), ASYNCHRONOUS);
835     }
836 
837 
838     /***
839      * Tests put and remove initiated from cache1 in a cluster
840      * <p/>
841      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
842      */
843     public void putTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
844 
845         Serializable key = new Date();
846         Serializable value = new Date();
847         Element sourceElement = new Element(key, value);
848 
849         //Put
850         fromCache.put(sourceElement);
851         int i = 0;
852 
853         if (asynchronous) {
854             waitForPropagate();
855         }
856 
857         //Should have been replicated to toCache.
858         Element deliveredElement = toCache.get(key);
859         assertEquals(sourceElement, deliveredElement);
860 
861     }
862 
863 
864     /***
865      * Tests put and remove initiated from cache1 in a cluster
866      * <p/>
867      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
868      */
869     public void putTestWithThreadKiller(Ehcache fromCache, Ehcache toCache, boolean asynchronous)
870             throws CacheException, InterruptedException {
871 
872         fromCache.put(new Element("thread killer", new ThreadKiller()));
873         if (asynchronous) {
874             waitForPropagate();
875         }
876 
877         Serializable key = new Date();
878         Serializable value = new Date();
879         Element sourceElement = new Element(key, value);
880 
881         //Put
882         fromCache.put(sourceElement);
883 
884         if (asynchronous) {
885             waitForPropagate();
886         }
887 
888         //Should have been replicated to toCache.
889         Element deliveredElement = toCache.get(key);
890         assertEquals(sourceElement, deliveredElement);
891 
892     }
893 
894 
895     /***
896      * Checks that a put received from a remote cache notifies any registered listeners.
897      * <p/>
898      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
899      */
900     @Test
901     public void testRemotePutNotificationGetsToOtherListeners() throws CacheException, InterruptedException {
902 
903         Serializable key = new Date();
904         Serializable value = new Date();
905         Element element1 = new Element(key, value);
906 
907         //Put
908         cache1.put(new Element("1", new Date()));
909         cache1.put(new Element("2", new Date()));
910         cache1.put(new Element("3", new Date()));
911 
912         //Nonserializable and non deliverable put
913         Object nonSerializableObject = new Object();
914         cache1.put(new Element(nonSerializableObject, new Object()));
915 
916 
917         waitForPropagate();
918 
919         //local initiating cache's counting listener should have been notified
920         assertEquals(4, CountingCacheEventListener.getCacheElementsPut(cache1).size());
921         //remote receiving caches' counting listener should have been notified
922         assertEquals(3, CountingCacheEventListener.getCacheElementsPut(cache2).size());
923 
924         //Update
925         cache1.put(new Element("1", new Date()));
926         cache1.put(new Element("2", new Date()));
927         cache1.put(new Element("3", new Date()));
928 
929         //Nonserializable and non deliverable put
930         cache1.put(new Element(nonSerializableObject, new Object()));
931 
932         waitForPropagate();
933 
934         //local initiating cache's counting listener should have been notified
935         assertEquals(4, CountingCacheEventListener.getCacheElementsUpdated(cache1).size());
936         //remote receiving caches' counting listener should have been notified
937         assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache2).size());
938 
939         //Remove
940         cache1.remove("1");
941         cache1.remove("2");
942         cache1.remove("3");
943         cache1.remove(nonSerializableObject);
944 
945         waitForPropagate();
946 
947         //local initiating cache's counting listener should have been notified
948         assertEquals(4, CountingCacheEventListener.getCacheElementsRemoved(cache1).size());
949         //remote receiving caches' counting listener should have been notified
950         assertEquals(3, CountingCacheEventListener.getCacheElementsRemoved(cache2).size());
951 
952     }
953 
954 
955     /***
956      * Test various cache configurations for cache1 - explicit setting of:
957      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
958      */
959     @Test
960     public void testRemoveWithExplicitReplicationConfig() throws InterruptedException {
961         removeTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
962     }
963 
964     /***
965      * Test various cache configurations for cache1 - explicit setting of:
966      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
967      */
968     @Test
969     public void testRemoveWithExplicitReplicationSynchronousConfig() throws InterruptedException {
970         removeTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
971     }
972 
973 
974     /***
975      * Test put replicated for cache4 - no properties.
976      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
977      */
978     @Test
979     public void testRemoveWithEmptyReplicationPropertiesConfig() throws InterruptedException {
980         removeTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
981     }
982 
983     /***
984      * Tests put and remove initiated from a cache to another cache in a cluster
985      * <p/>
986      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
987      */
988     public void removeTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
989 
990         Serializable key = new Date();
991         Serializable value = new Date();
992         Element element1 = new Element(key, value);
993 
994         //Put
995         fromCache.put(element1);
996 
997         if (asynchronous) {
998             waitForPropagate();
999         }
1000 
1001         //Should have been replicated to cache2.
1002         Element element2 = toCache.get(key);
1003         assertEquals(element1, element2);
1004 
1005         //Remove
1006         fromCache.remove(key);
1007         if (asynchronous) {
1008             waitForPropagate();
1009         }
1010 
1011         //Should have been replicated to cache2.
1012         element2 = toCache.get(key);
1013         assertNull(element2);
1014 
1015     }
1016 
1017 
1018     /***
1019      * test removeAll sync
1020      */
1021     @Test
1022     public void testRemoveAllAsynchronous() throws Exception {
1023         removeAllTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
1024     }
1025 
1026     /***
1027      * test removeAll async
1028      */
1029     @Test
1030     public void testRemoveAllSynchronous() throws Exception {
1031         removeAllTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
1032     }
1033 
1034     /***
1035      * Tests removeAll initiated from a cache to another cache in a cluster
1036      * <p/>
1037      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1038      */
1039     public void removeAllTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
1040 
1041         //removeAll is distributed. Stop it colliding with the rest of the test
1042         waitForPropagate();
1043 
1044 
1045         Serializable key = new Date();
1046         Serializable value = new Date();
1047         Element element1 = new Element(key, value);
1048 
1049         //Put
1050         fromCache.put(element1);
1051 
1052 
1053         if (asynchronous) {
1054             waitForPropagate();
1055         }
1056 
1057         //Should have been replicated to cache2.
1058         Element element2 = toCache.get(key);
1059         assertEquals(element1, element2);
1060 
1061         //Remove
1062         fromCache.removeAll();
1063         if (asynchronous) {
1064             waitForPropagate();
1065         }
1066 
1067         //Should have been replicated to cache2.
1068         element2 = toCache.get(key);
1069         assertNull(element2);
1070         assertEquals(0, toCache.getSize());
1071 
1072     }
1073 
1074 
1075     /***
1076      * Test various cache configurations for cache1 - explicit setting of:
1077      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1078      */
1079     @Test
1080     public void testUpdateWithExplicitReplicationConfig() throws Exception {
1081         updateViaCopyTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
1082     }
1083 
1084     /***
1085      * Test various cache configurations for cache1 - explicit setting of:
1086      * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1087      */
1088     @Test
1089     public void testUpdateWithExplicitReplicationSynchronousConfig() throws Exception {
1090         updateViaCopyTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
1091     }
1092 
1093 
1094     /***
1095      * Test put replicated for cache4 - no properties.
1096      * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
1097      */
1098     @Test
1099     public void testUpdateWithEmptyReplicationPropertiesConfig() throws Exception {
1100         updateViaCopyTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
1101     }
1102 
1103     /***
1104      * Tests put and update through copy initiated from cache1 in a cluster
1105      * <p/>
1106      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1107      */
1108     public void updateViaCopyTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
1109 
1110         fromCache.removeAll();
1111         toCache.removeAll();
1112 
1113         //removeAll is distributed. Stop it colliding with the rest of the test
1114         waitForPropagate();
1115 
1116         Serializable key = new Date();
1117         Serializable value = new Date();
1118         Element element1 = new Element(key, value);
1119 
1120         //Put
1121         fromCache.put(element1);
1122         if (asynchronous) {
1123             waitForPropagate();
1124         }
1125 
1126         //Should have been replicated to cache2.
1127         Element element2 = toCache.get(key);
1128         assertEquals(element1, element2);
1129 
1130         //Update
1131         Element updatedElement1 = new Element(key, new Date());
1132 
1133         fromCache.put(updatedElement1);
1134         if (asynchronous) {
1135             waitForPropagate();
1136         }
1137 
1138         //Should have been replicated to cache2.
1139         Element receivedUpdatedElement2 = toCache.get(key);
1140         assertEquals(updatedElement1, receivedUpdatedElement2);
1141 
1142     }
1143 
1144 
1145 
1146     /***
1147      * Tests put through invalidation initiated from cache1 in a cluster
1148      * <p/>
1149      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1150      */
1151     @Test
1152     public void testPutViaInvalidate() throws CacheException, InterruptedException, IOException {
1153 
1154         cache1 = manager1.getCache("sampleCache2");
1155         cache1.removeAll();
1156 
1157         cache2 = manager2.getCache("sampleCache2");
1158         cache2.removeAll();
1159 
1160         //removeAll is distributed. Stop it colliding with the rest of the test
1161         waitForPropagate();
1162 
1163         String key = "1";
1164         Serializable value = new Date();
1165         Element element1 = new Element(key, value);
1166 
1167         Element element3 = new Element("key2", "two");
1168 
1169         //Put into 2. 2 is configured to replicate puts via copy
1170         cache2.put(element1);
1171         assertNotNull(cache2.get(key));
1172         waitForPropagate();
1173 
1174         //Should have been replicated to cache1.
1175         Element element2 = cache1.get(key);
1176         assertEquals(element1, element2);
1177 
1178         //Put
1179         cache1.put(element3);
1180         waitForPropagate();
1181 
1182         //Invalidate should have been replicated to cache2.
1183         assertNull(cache2.get("key2"));
1184 
1185         //Update
1186         cache1.put(element3);
1187         waitForPropagate();
1188 
1189         //Should have been removed in cache2.
1190         element2 = cache2.get("key2");
1191         assertNull(element2);
1192 
1193     }
1194 
1195 
1196     /***
1197      * Tests put and update through invalidation initiated from cache1 in a cluster
1198      * <p/>
1199      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1200      */
1201     @Test
1202     public void testUpdateViaInvalidate() throws CacheException, InterruptedException, IOException {
1203 
1204         cache1 = manager1.getCache("sampleCache2");
1205         cache1.removeAll();
1206 
1207         cache2 = manager2.getCache("sampleCache2");
1208         cache2.removeAll();
1209 
1210         //removeAll is distributed. Stop it colliding with the rest of the test
1211         waitForPropagate();
1212 
1213         String key = "1";
1214         Serializable value = new Date();
1215         Element element1 = new Element(key, value);
1216 
1217         //Put
1218         cache2.put(element1);
1219         Element element2 = cache2.get(key);
1220         assertEquals(element1, element2);
1221 
1222         //Update
1223         cache1.put(element1);
1224         waitForPropagate();
1225 
1226         //Should have been removed in cache2.
1227         element2 = cache2.get(key);
1228         assertNull(element2);
1229 
1230     }
1231 
1232 
1233     /***
1234      * Tests put and update through invalidation initiated from cache1 in a cluster
1235      * <p/>
1236      * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1237      */
1238     @Test
1239     public void testUpdateViaInvalidateNonSerializableValue() throws CacheException, InterruptedException, IOException {
1240 
1241         cache1 = manager1.getCache("sampleCache2");
1242         cache1.removeAll();
1243 
1244         cache2 = manager2.getCache("sampleCache2");
1245         cache2.removeAll();
1246 
1247         //removeAll is distributed. Stop it colliding with the rest of the test
1248         waitForPropagate();
1249 
1250         String key = "1";
1251         Serializable value = new Date();
1252 
1253         /***
1254          * Non-serializable test class
1255          */
1256         class NonSerializable {
1257             //
1258         }
1259 
1260         NonSerializable value1 = new NonSerializable();
1261         Element element1 = new Element(key, value1);
1262 
1263         //Put
1264         cache2.put(element1);
1265         Element element2 = cache2.get(key);
1266         assertEquals(element1, element2);
1267 
1268         //Update
1269         cache1.put(element1);
1270         waitForPropagate();
1271 
1272         //Should have been removed in cache2.
1273         element2 = cache2.get(key);
1274         assertNull(element2);
1275 
1276     }
1277 
1278 
1279     /***
1280      * What happens when two cache instances replicate to each other and a change is initiated
1281      */
1282     @Test
1283     public void testInfiniteNotificationsLoop() throws InterruptedException {
1284 
1285         Serializable key = "1";
1286         Serializable value = new Date();
1287         Element element = new Element(key, value);
1288 
1289         //Put
1290         cache1.put(element);
1291         waitForPropagate();
1292 
1293         //Should have been replicated to cache2.
1294         Element element2 = cache2.get(key);
1295         assertEquals(element, element2);
1296 
1297         //Remove
1298         cache1.remove(key);
1299         assertNull(cache1.get(key));
1300 
1301         //Should have been replicated to cache2.
1302         waitForPropagate();
1303         element2 = cache2.get(key);
1304         assertNull(element2);
1305 
1306         //Put into 2
1307         Element element3 = new Element("3", "ddsfds");
1308         cache2.put(element3);
1309         waitForPropagate();
1310         Element element4 = cache2.get("3");
1311         assertEquals(element3, element4);
1312 
1313     }
1314 
1315 
1316     /***
1317      * Shows result of perf problem and fix in flushReplicationQueue
1318      * <p/>
1319      * Behaviour before change:
1320      * <p/>
1321      * INFO: Items written: 10381
1322      * Oct 29, 2007 11:40:04 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1323      * INFO: Items written: 29712
1324      * Oct 29, 2007 11:40:57 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1325      * INFO: Items written: 1
1326      * Oct 29, 2007 11:40:58 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1327      * INFO: Items written: 32354
1328      * Oct 29, 2007 11:42:34 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1329      * INFO: Items written: 322
1330      * Oct 29, 2007 11:42:35 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1331      * INFO: Items written: 41909
1332      * <p/>
1333      * Behaviour after change:
1334      * INFO: Items written: 26356
1335      * Oct 29, 2007 11:44:39 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1336      * INFO: Items written: 33656
1337      * Oct 29, 2007 11:44:40 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1338      * INFO: Items written: 32234
1339      * Oct 29, 2007 11:44:42 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1340      * INFO: Items written: 38677
1341      * Oct 29, 2007 11:44:43 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1342      * INFO: Items written: 43418
1343      * Oct 29, 2007 11:44:44 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1344      * INFO: Items written: 31277
1345      * Oct 29, 2007 11:44:45 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1346      * INFO: Items written: 27769
1347      * Oct 29, 2007 11:44:46 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1348      * INFO: Items written: 29596
1349      * Oct 29, 2007 11:44:47 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1350      * INFO: Items written: 17142
1351      * Oct 29, 2007 11:44:48 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1352      * INFO: Items written: 14775
1353      * Oct 29, 2007 11:44:49 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1354      * INFO: Items written: 4088
1355      * Oct 29, 2007 11:44:51 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1356      * INFO: Items written: 5492
1357      * Oct 29, 2007 11:44:52 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1358      * INFO: Items written: 10188
1359      * <p/>
1360      * Also no pauses noted.
1361      */
1362     @Test
1363     public void testReplicatePerf() throws InterruptedException {
1364 
1365         if (manager2 != null) {
1366             manager2.shutdown();
1367         }
1368         if (manager3 != null) {
1369             manager3.shutdown();
1370         }
1371         if (manager4 != null) {
1372             manager4.shutdown();
1373         }
1374         if (manager5 != null) {
1375             manager5.shutdown();
1376         }
1377         if (manager6 != null) {
1378             manager6.shutdown();
1379         }
1380 
1381         //wait for cluster to drop back to just one: manager1
1382         waitForPropagate();
1383 
1384 
1385         long start = System.currentTimeMillis();
1386         final String keyBase = Long.toString(start);
1387         int count = 0;
1388 
1389         for (int i = 0; i < 100000; i++) {
1390             final String key = keyBase + ':' + Integer.toString((int) (Math.random() * 1000.0));
1391             cache1.put(new Element(key, "My Test"));
1392             cache1.get(key);
1393             cache1.remove(key);
1394             count++;
1395 
1396             final long end = System.currentTimeMillis();
1397             if (end - start >= 1000) {
1398                 start = end;
1399                 LOG.info("Items written: " + count);
1400                 //make sure it does not choke
1401                 assertTrue(count > 1000);
1402                 count = 0;
1403             }
1404         }
1405     }
1406 
1407 
1408     /***
1409      * Need to wait for async
1410      *
1411      * @throws InterruptedException
1412      */
1413     protected void waitForPropagate() throws InterruptedException {
1414         Thread.sleep(1500);
1415     }
1416 
1417     /***
1418      * Need to wait for async
1419      *
1420      * @throws InterruptedException
1421      */
1422     protected void waitForSlowPropagate() throws InterruptedException {
1423         Thread.sleep(6000);
1424     }
1425 
1426 
1427     /***
1428      * Distributed operations create extra scope for deadlock.
1429      * This test checks whether a distributed deadlock scenario exists for synchronous replication
1430      * of each distributed operation all at once.
1431      * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1432      * and multi process safe.
1433      * <p/>
1434      * Carefully tailored to exercise:
1435      * <ol>
1436      * <li>overflow to disk. We put in 20 things and the memory size is 10
1437      * <li>each peer is working on the same set of keys thus maximising contention
1438      * <li>we do puts, gets and removes to explore all the execution paths
1439      * </ol>
1440      * If a deadlock occurs, processing will stop until a SocketTimeout exception is thrown and
1441      * the deadlock will be released.
1442      */
1443     @Test
1444     public void testCacheOperationsSynchronousMultiThreaded() throws Exception, InterruptedException {
1445 
1446         // Run a set of threads, that attempt to fetch the elements
1447         final List executables = new ArrayList();
1448 
1449         executables.add(new ClusterExecutable(manager1, "sampleCache3"));
1450         executables.add(new ClusterExecutable(manager2, "sampleCache3"));
1451         executables.add(new ClusterExecutable(manager3, "sampleCache3"));
1452 
1453         runThreads(executables);
1454     }
1455 
1456 
1457     /***
1458      * Distributed operations create extra scope for deadlock.
1459      * This test checks whether a distributed deadlock scenario exists for asynchronous replication
1460      * of each distributed operation all at once.
1461      * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1462      * and multi process safe.
1463      * It uses sampleCache2, which is configured to be asynchronous
1464      * <p/>
1465      * Carefully tailored to exercise:
1466      * <ol>
1467      * <li>overflow to disk. We put in 20 things and the memory size is 10
1468      * <li>each peer is working on the same set of keys thus maximising contention
1469      * <li>we do puts, gets and removes to explore all the execution paths
1470      * </ol>
1471      */
1472     @Test
1473     public void testCacheOperationsAynchronousMultiThreaded() throws Exception, InterruptedException {
1474 
1475         // Run a set of threads, that attempt to fetch the elements
1476         final List executables = new ArrayList();
1477 
1478         executables.add(new ClusterExecutable(manager1, "sampleCache2"));
1479         executables.add(new ClusterExecutable(manager2, "sampleCache2"));
1480         executables.add(new ClusterExecutable(manager3, "sampleCache2"));
1481 
1482         runThreads(executables);
1483     }
1484 
1485     /***
1486      * An Exececutable which allows the CacheManager to be set
1487      */
1488     class ClusterExecutable implements Executable {
1489 
1490         private CacheManager manager;
1491         private String cacheName;
1492 
1493         /***
1494          * Construct with CacheManager
1495          *
1496          * @param manager
1497          */
1498         public ClusterExecutable(CacheManager manager, String cacheName) {
1499             this.manager = manager;
1500             this.cacheName = cacheName;
1501         }
1502 
1503         /***
1504          * Execute
1505          *
1506          * @throws Exception
1507          */
1508         public void execute() throws Exception {
1509             Random random = new Random();
1510 
1511             for (int i = 0; i < 20; i++) {
1512                 Integer key = Integer.valueOf((i));
1513                 int operationSelector = random.nextInt(4);
1514                 Cache cache = manager.getCache(cacheName);
1515                 if (operationSelector == 100) {
1516                     cache.get(key);
1517                     if (LOG.isDebugEnabled()) {
1518                         LOG.debug(cache.getGuid() + ": get " + key);
1519                     }
1520                 } else if (operationSelector == 100) {
1521                     cache.remove(key);
1522                     if (LOG.isDebugEnabled()) {
1523                         LOG.debug(cache.getGuid() + ": remove " + key);
1524                     }
1525                 } else if (operationSelector == 2) {
1526                     cache.put(new Element(key,
1527                             "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1528                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1529                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1530                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1531                                     + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
1532                     if (LOG.isDebugEnabled()) {
1533                         LOG.debug(cache.getGuid() + ": put " + key);
1534                     }
1535                 } else {
1536                     //every twelfth time 1/4 * 1/3 = 1/12
1537                     if (random.nextInt(3) == 1) {
1538                         LOG.debug("cache.removeAll()");
1539                         cache.removeAll();
1540                     }
1541                 }
1542             }
1543 
1544         }
1545     }
1546 
1547 }