1 /***
2 * Copyright 2003-2009 Terracotta, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package net.sf.ehcache.distribution;
18
19 import net.sf.ehcache.AbstractCacheTest;
20 import net.sf.ehcache.Cache;
21 import net.sf.ehcache.CacheException;
22 import net.sf.ehcache.CacheManager;
23 import net.sf.ehcache.Ehcache;
24 import net.sf.ehcache.Element;
25 import net.sf.ehcache.StopWatch;
26 import net.sf.ehcache.ThreadKiller;
27 import net.sf.ehcache.event.CountingCacheEventListener;
28 import net.sf.ehcache.management.ManagementService;
29 import org.junit.After;
30 import static org.junit.Assert.assertEquals;
31 import static org.junit.Assert.assertNotNull;
32 import static org.junit.Assert.assertNull;
33 import static org.junit.Assert.assertTrue;
34 import static org.junit.Assert.fail;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.junit.Ignore;
38
39 import java.io.IOException;
40 import java.io.Serializable;
41 import java.rmi.RemoteException;
42 import java.util.ArrayList;
43 import java.util.Arrays;
44 import java.util.Date;
45 import java.util.List;
46 import java.util.Random;
47
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /***
52 * Tests replication of Cache events
53 * <p/>
54 * Note these tests need a live network interface running in multicast mode to work
55 * <p/>
56 * If running involving RMIAsynchronousCacheReplicator individually the test will fail because
57 * the VM will gobble up the SoftReferences rather than allocating more memory. Uncomment the
58 * forceVMGrowth() method usage in setup.
59 *
60 * @author Greg Luck
61 * @version $Id: RMICacheReplicatorTest.java 1619 2010-01-08 15:46:20Z cdennis $
62 */
63
64
65
66
67
68
69 public class RMICacheReplicatorTest extends AbstractCacheTest {
70
71
72 /***
73 * A value to represent replicate asynchronously
74 */
75 protected static final boolean ASYNCHRONOUS = true;
76
77 /***
78 * A value to represent replicate synchronously
79 */
80 protected static final boolean SYNCHRONOUS = false;
81
82 private static final Logger LOG = LoggerFactory.getLogger(RMICacheReplicatorTest.class.getName());
83
84
85 /***
86 * CacheManager 1 in the cluster
87 */
88 protected CacheManager manager1;
89 /***
90 * CacheManager 2 in the cluster
91 */
92 protected CacheManager manager2;
93 /***
94 * CacheManager 3 in the cluster
95 */
96 protected CacheManager manager3;
97 /***
98 * CacheManager 4 in the cluster
99 */
100 protected CacheManager manager4;
101 /***
102 * CacheManager 5 in the cluster
103 */
104 protected CacheManager manager5;
105 /***
106 * CacheManager 6 in the cluster
107 */
108 protected CacheManager manager6;
109
110 /***
111 * The name of the cache under test
112 */
113 protected String cacheName = "sampleCache1";
114 /***
115 * CacheManager 1 of 2s cache being replicated
116 */
117 protected Ehcache cache1;
118
119 /***
120 * CacheManager 2 of 2s cache being replicated
121 */
122 protected Ehcache cache2;
123
124 /***
125 * Allows setup to be the same
126 */
127 protected String cacheNameBase = "ehcache-distributed";
128
129 /***
130 * {@inheritDoc}
131 * Sets up two caches: cache1 is local. cache2 is to be receive updates
132 *
133 * @throws Exception
134 */
135 @Override
136 @Before
137 public void setUp() throws Exception {
138
139
140
141
142
143 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
144
145 CountingCacheEventListener.resetCounters();
146 manager1 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed1.xml");
147 manager2 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed2.xml");
148 manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
149 manager4 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed4.xml");
150 manager5 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed5.xml");
151
152
153
154
155 Thread.sleep(1020);
156
157 cache1 = manager1.getCache(cacheName);
158 cache1.removeAll();
159
160 cache2 = manager2.getCache(cacheName);
161 cache2.removeAll();
162
163
164 waitForPropagate();
165
166
167 }
168
169 /***
170 * {@inheritDoc}
171 *
172 * @throws Exception
173 */
174 @Override
175 @After
176 public void tearDown() throws Exception {
177
178 if (manager1 != null) {
179 manager1.shutdown();
180 }
181 if (manager2 != null) {
182 manager2.shutdown();
183 }
184 if (manager3 != null) {
185 manager3.shutdown();
186 }
187 if (manager4 != null) {
188 manager4.shutdown();
189 }
190 if (manager5 != null) {
191 manager5.shutdown();
192 }
193 if (manager6 != null) {
194 manager6.shutdown();
195 }
196 Thread.sleep(2000);
197
198 List threads = JVMUtil.enumerateThreads();
199 for (int i = 0; i < threads.size(); i++) {
200 Thread thread = (Thread) threads.get(i);
201 if (thread.getName().equals("Replication Thread")) {
202 fail("There should not be any replication threads running after shutdown");
203 }
204 }
205
206 }
207
208 /***
209 * 5 cache managers should means that each cache has four remote peers
210 */
211
212 @Test
213 public void testRemoteCachePeersEqualsNumberOfCacheManagersInCluster() {
214
215 CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
216 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
217 assertEquals(4, remotePeersOfCache1.size());
218 }
219
220 /***
221 * Does a new cache manager in the cluster get detected?
222 */
223
224 @Test
225 public void testRemoteCachePeersDetectsNewCacheManager() throws InterruptedException {
226
227 CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
228 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
229 assertEquals(4, remotePeersOfCache1.size());
230
231
232 manager6 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed6.xml");
233
234
235 Thread.sleep(10020);
236
237 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
238 assertEquals(5, remotePeersOfCache1.size());
239 }
240
241 /***
242 * Does a down cache manager in the cluster get removed?
243 */
244
245 @Test
246 public void testRemoteCachePeersDetectsDownCacheManager() throws InterruptedException {
247
248 CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
249 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
250 assertEquals(4, remotePeersOfCache1.size());
251
252
253 manager5.shutdown();
254
255
256 Thread.sleep(11020);
257 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
258
259
260 assertEquals(3, remotePeersOfCache1.size());
261 }
262
263 /***
264 * Does a down cache manager in the cluster get removed?
265 */
266
267 @Test
268 public void testRemoteCachePeersDetectsDownCacheManagerSlow() throws InterruptedException {
269
270 try {
271 CacheManagerPeerProvider provider = manager1.getCacheManagerPeerProvider("RMI");
272 List remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
273 assertEquals(4, remotePeersOfCache1.size());
274
275 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(2000);
276 Thread.sleep(2000);
277
278
279 manager5.shutdown();
280
281
282 remotePeersOfCache1 = provider.listRemoteCachePeers(cache1);
283 assertEquals(4, remotePeersOfCache1.size());
284 } finally {
285 MulticastKeepaliveHeartbeatSender.setHeartBeatInterval(1000);
286 Thread.sleep(2000);
287 }
288
289
290 }
291
292 /***
293 * Tests put and remove initiated from cache1 in a cluster
294 * <p/>
295 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
296 */
297 @Test
298 public void testPutProgagatesFromAndToEveryCacheManagerAndCache() throws CacheException, InterruptedException {
299
300
301 String[] cacheNames = manager1.getCacheNames();
302 int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
303 Arrays.sort(cacheNames);
304 for (int i = 0; i < cacheNames.length; i++) {
305 String name = cacheNames[i];
306 manager1.getCache(name).put(new Element("" + i, Integer.valueOf(i)));
307
308 manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
309 }
310
311 waitForPropagate();
312
313 int count2 = 0;
314 int count3 = 0;
315 int count4 = 0;
316 int count5 = 0;
317 for (int i = 0; i < cacheNames.length; i++) {
318 String name = cacheNames[i];
319 Element element2 = manager2.getCache(name).get("" + i);
320 if (element2 != null) {
321 count2++;
322 }
323 Element nonSerializableElement2 = manager2.getCache(name).get("nonSerializable" + i);
324 if (nonSerializableElement2 != null) {
325 count2++;
326 }
327 Element element3 = manager3.getCache(name).get("" + i);
328 if (element3 != null) {
329 count3++;
330 }
331 Element element4 = manager4.getCache(name).get("" + i);
332 if (element4 != null) {
333 count4++;
334 }
335 Element element5 = manager5.getCache(name).get("" + i);
336 if (element5 != null) {
337 count5++;
338 }
339 }
340
341 assertEquals(numberOfCaches - 1, count2);
342 assertEquals(numberOfCaches - 1, count3);
343 assertEquals(numberOfCaches - 1, count4);
344 assertEquals(numberOfCaches - 1, count5);
345
346
347 }
348
349 /***
350 * Tests what happens when a CacheManager in the cluster comes and goes. In ehcache-1.2.4 this would cause the new RMI CachePeers in the CacheManager to
351 * be permanently corrupt.
352 */
353
354 @Test
355 public void testPutProgagatesFromAndToEveryCacheManagerAndCacheDirty() throws CacheException, InterruptedException {
356
357 manager3.shutdown();
358
359 Thread.sleep(11020);
360
361 manager3 = new CacheManager(AbstractCacheTest.TEST_CONFIG_DIR + "distribution/ehcache-distributed3.xml");
362 Thread.sleep(11020);
363
364
365 String[] cacheNames = manager1.getCacheNames();
366 int numberOfCaches = getNumberOfReplicatingCachesInCacheManager();
367 Arrays.sort(cacheNames);
368 for (int i = 0; i < cacheNames.length; i++) {
369 String name = cacheNames[i];
370 manager1.getCache(name).put(new Element("" + i, Integer.valueOf(i)));
371
372 manager1.getCache(name).put(new Element("nonSerializable" + i, new Object()));
373 }
374
375 waitForPropagate();
376
377 int count2 = 0;
378 int count3 = 0;
379 int count4 = 0;
380 int count5 = 0;
381 for (int i = 0; i < cacheNames.length; i++) {
382 String name = cacheNames[i];
383 Element element2 = manager2.getCache(name).get("" + i);
384 if (element2 != null) {
385 count2++;
386 }
387 Element nonSerializableElement2 = manager2.getCache(name).get("nonSerializable" + i);
388 if (nonSerializableElement2 != null) {
389 count2++;
390 }
391 Element element3 = manager3.getCache(name).get("" + i);
392 if (element3 != null) {
393 count3++;
394 }
395 Element element4 = manager4.getCache(name).get("" + i);
396 if (element4 != null) {
397 count4++;
398 }
399 Element element5 = manager5.getCache(name).get("" + i);
400 if (element5 != null) {
401 count5++;
402 }
403 }
404
405 assertEquals(numberOfCaches - 1, count2);
406 assertEquals(numberOfCaches - 1, count3);
407 assertEquals(numberOfCaches - 1, count4);
408 assertEquals(numberOfCaches - 1, count5);
409
410
411 }
412
413 /***
414 * Enables long stabilty runs using replication to be done.
415 * <p/>
416 * This test has been run in a profile for 15 hours without any observed issues.
417 *
418 * @throws InterruptedException
419 */
420 public void manualStabilityTest() throws InterruptedException {
421 forceVMGrowth();
422
423 ManagementService.registerMBeans(manager3, createMBeanServer(), true, true, true, true);
424 while (true) {
425 testBigPutsProgagatesAsynchronous();
426 }
427 }
428
429 /***
430 * Non JUnit invocation of stability test to get cleaner run
431 *
432 * @param args
433 * @throws InterruptedException
434 */
435 public static void main(String[] args) throws Exception {
436 RMICacheReplicatorTest replicatorTest = new RMICacheReplicatorTest();
437 replicatorTest.setUp();
438 replicatorTest.manualStabilityTest();
439 }
440
441 /***
442 * The number of caches there should be.
443 */
444 protected int getNumberOfReplicatingCachesInCacheManager() {
445 return 55;
446 }
447
448
449 /***
450 * Performance and capacity tests.
451 * <p/>
452 * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
453 * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
454 * it has fully received.
455 * <p/>
456 * r37 and earlier - initial implementation
457 * 38 seconds to get all notifications with 6 peers, 2000 Elements and 400 byte payload
458 * 18 seconds to get all notifications with 2 peers, 2000 Elements and 400 byte payload
459 * 40 seconds to get all notifications with 2 peers, 2000 Elements and 10k payload
460 * 22 seconds to get all notifications with 2 peers, 2000 Elements and 1k payload
461 * 26 seconds to get all notifications with 2 peers, 200 Elements and 100k payload
462 * <p/>
463 * r38 - RMI stub lookup on registration rather than at each lookup. Saves quite a few lookups. Also change to 5 second heartbeat
464 * 38 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (1 second heartbeat)
465 * 16 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
466 * 13 seconds to get 2000 notifications with 2 peers, Elements with 400 byte payload
467 * <p/>
468 * r39 - Batching asyn replicator. Send all queued messages in one RMI call once per second.
469 * 2 seconds to get 2000 notifications with 6 peers, Elements with 400 byte payload (5 second heartbeat)
470 */
471
472 @Test
473 public void testBigPutsProgagatesAsynchronous() throws CacheException, InterruptedException {
474
475
476
477 StopWatch stopWatch = new StopWatch();
478 Integer index = null;
479 for (int i = 0; i < 2; i++) {
480 for (int j = 0; j < 1000; j++) {
481 index = Integer.valueOf(((1000 * i) + j));
482 cache1.put(new Element(index,
483 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
484 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
485 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
486 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
487 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
488 }
489
490 }
491 long elapsed = stopWatch.getElapsedTime();
492 long putTime = ((elapsed / 1000));
493 LOG.info("Put Elapsed time: " + putTime);
494
495
496 assertEquals(2000, cache1.getSize());
497
498 Thread.sleep(2000);
499 assertEquals(2000, manager2.getCache("sampleCache1").getSize());
500 assertEquals(2000, manager3.getCache("sampleCache1").getSize());
501 assertEquals(2000, manager4.getCache("sampleCache1").getSize());
502 assertEquals(2000, manager5.getCache("sampleCache1").getSize());
503
504 CountingCacheEventListener.resetCounters();
505
506 }
507
508
509 /***
510 * Performance and capacity tests.
511 * <p/>
512 */
513
514 @Test
515 public void testBootstrap() throws CacheException, InterruptedException, RemoteException {
516
517
518 StopWatch stopWatch = new StopWatch();
519 Integer index = null;
520 for (int i = 0; i < 2; i++) {
521 for (int j = 0; j < 1000; j++) {
522 index = Integer.valueOf(((1000 * i) + j));
523 cache1.put(new Element(index,
524 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
525 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
526 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
527 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
528 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
529 }
530
531 }
532 long elapsed = stopWatch.getElapsedTime();
533 long putTime = ((elapsed / 1000));
534 LOG.info("Put Elapsed time: " + putTime);
535
536 assertEquals(2000, cache1.getSize());
537
538 Thread.sleep(7000);
539 assertEquals(2000, manager2.getCache("sampleCache1").getSize());
540 assertEquals(2000, manager3.getCache("sampleCache1").getSize());
541 assertEquals(2000, manager4.getCache("sampleCache1").getSize());
542 assertEquals(2000, manager5.getCache("sampleCache1").getSize());
543
544
545 manager1.addCache("bootStrapResults");
546 Cache cache = manager1.getCache("bootStrapResults");
547 List cachePeers = manager1.getCacheManagerPeerProvider("RMI").listRemoteCachePeers(cache1);
548 CachePeer cachePeer = (CachePeer) cachePeers.get(0);
549
550 List keys = cachePeer.getKeys();
551 assertEquals(2000, keys.size());
552
553 Element firstElement = cachePeer.getQuiet((Serializable) keys.get(0));
554 long size = firstElement.getSerializedSize();
555 assertEquals(504, size);
556
557 int chunkSize = (int) (5000000 / size);
558
559 List requestChunk = new ArrayList();
560 for (int i = 0; i < keys.size(); i++) {
561 Serializable serializable = (Serializable) keys.get(i);
562 requestChunk.add(serializable);
563 if (requestChunk.size() == chunkSize) {
564 fetchAndPutElements(cache, requestChunk, cachePeer);
565 requestChunk.clear();
566 }
567 }
568
569 fetchAndPutElements(cache, requestChunk, cachePeer);
570
571 assertEquals(keys.size(), cache.getSize());
572
573 }
574
575 private void fetchAndPutElements(Ehcache cache, List requestChunk, CachePeer cachePeer) throws RemoteException {
576 List receivedChunk = cachePeer.getElements(requestChunk);
577 for (int i = 0; i < receivedChunk.size(); i++) {
578 Element element = (Element) receivedChunk.get(i);
579 assertNotNull(element);
580 cache.put(element, true);
581 }
582
583 }
584
585
586 /***
587 * Drive everything to point of breakage within a 64MB VM.
588 */
589 public void xTestHugePutsBreaksAsynchronous() throws CacheException, InterruptedException {
590
591
592 StopWatch stopWatch = new StopWatch();
593 Integer index = null;
594 for (int i = 0; i < 500; i++) {
595 for (int j = 0; j < 1000; j++) {
596 index = Integer.valueOf(((1000 * i) + j));
597 cache1.put(new Element(index,
598 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
599 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
600 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
601 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
602 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
603 }
604
605 }
606 long elapsed = stopWatch.getElapsedTime();
607 long putTime = ((elapsed / 1000));
608 LOG.info("Put Elapsed time: " + putTime);
609
610
611 assertEquals(100000, cache1.getSize());
612
613 Thread.sleep(100000);
614 assertEquals(20000, manager2.getCache("sampleCache1").getSize());
615 assertEquals(20000, manager3.getCache("sampleCache1").getSize());
616 assertEquals(20000, manager4.getCache("sampleCache1").getSize());
617 assertEquals(20000, manager5.getCache("sampleCache1").getSize());
618
619 }
620
621
622 /***
623 * Performance and capacity tests.
624 * <p/>
625 * The numbers given are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
626 * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
627 * it has fully received.
628 * <p/>
629 * 4 seconds to get all remove notifications with 6 peers, 5000 Elements and 400 byte payload
630 */
631 @Test
632 public void testBigRemovesProgagatesAsynchronous() throws CacheException, InterruptedException {
633
634
635 Integer index = null;
636 for (int i = 0; i < 5; i++) {
637 for (int j = 0; j < 1000; j++) {
638 index = Integer.valueOf(((1000 * i) + j));
639 cache1.put(new Element(index,
640 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
641 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
642 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
643 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
644 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
645 }
646
647 }
648
649
650 Ehcache[] caches = {
651 cache1,
652 manager2.getCache("sampleCache1"),
653 manager3.getCache("sampleCache1"),
654 manager4.getCache("sampleCache1"),
655 manager5.getCache("sampleCache1") };
656
657 waitForCacheSize(5000, 25, caches);
658
659 Thread.sleep(2000);
660
661 for (int i = 0; i < 5; i++) {
662 for (int j = 0; j < 1000; j++) {
663 cache1.remove(Integer.valueOf(((1000 * i) + j)));
664 }
665 }
666
667 long timeForPropagate = waitForCacheSize(0, 25, caches);
668 LOG.info("Remove Elapsed time: " + timeForPropagate);
669
670 }
671
672 public long waitForCacheSize(long size, int maxSeconds, Ehcache... caches) throws InterruptedException {
673
674 StopWatch stopWatch = new StopWatch();
675 while(checkForCacheSize(size, caches)) {
676 Thread.sleep(500);
677 if(stopWatch.getElapsedTime() > maxSeconds * 1000) {
678 fail("Caches still haven't reached the expected size after " + maxSeconds + " seconds");
679 }
680 }
681
682 return stopWatch.getElapsedTime();
683 }
684
685 private boolean checkForCacheSize(long size, Ehcache... caches) {
686 boolean sizeReached = true;
687 for (Ehcache cache : caches) {
688 if(cache.getSize() != size) {
689 sizeReached = false;
690 break;
691 }
692 }
693 return sizeReached;
694 }
695
696
697 /***
698 * Performance and capacity tests.
699 * <p/>
700 * 5 seconds to send all notifications synchronously with 5 peers, 2000 Elements and 400 byte payload
701 * The numbers given below are for the remote peer tester (java -jar ehcache-1.x-remote-debugger.jar ehcache-distributed1.xml)
702 * running on a 10Mbit ethernet network and are measured from the time the peer starts receiving to when
703 * it has fully received.
704 */
705 @Test
706 public void testBigPutsProgagatesSynchronous() throws CacheException, InterruptedException {
707
708
709 StopWatch stopWatch = new StopWatch();
710 Integer index;
711 for (int i = 0; i < 2; i++) {
712 for (int j = 0; j < 1000; j++) {
713 index = Integer.valueOf(((1000 * i) + j));
714 manager1.getCache("sampleCache3").put(new Element(index,
715 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
716 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
717 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
718 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
719 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
720 }
721
722 }
723 long elapsed = stopWatch.getElapsedTime();
724 long putTime = ((elapsed / 1000));
725 LOG.info("Put and Propagate Synchronously Elapsed time: " + putTime + " seconds");
726
727 assertEquals(2000, manager1.getCache("sampleCache3").getSize());
728 assertEquals(2000, manager2.getCache("sampleCache3").getSize());
729 assertEquals(2000, manager3.getCache("sampleCache3").getSize());
730 assertEquals(2000, manager4.getCache("sampleCache3").getSize());
731 assertEquals(2000, manager5.getCache("sampleCache3").getSize());
732
733 }
734
735
736 /***
737 * manager1 adds a replicating cache, then manager2 and so on. Then we remove one. Does everything work as expected?
738 */
739 @Test
740 public void testPutWithNewCacheAddedProgressively() throws InterruptedException {
741
742 manager1.addCache("progressiveAddCache");
743 manager2.addCache("progressiveAddCache");
744
745
746 try {
747 putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
748 fail();
749 } catch (AssertionError e) {
750
751 }
752
753
754 putTest(manager1.getCache("progressiveAddCache"), manager2.getCache("progressiveAddCache"), ASYNCHRONOUS);
755
756 Cache secondCache = manager2.getCache("progressiveAddCache");
757
758
759 manager2.removeCache("progressiveAddCache");
760 try {
761 putTest(manager1.getCache("progressiveAddCache"), secondCache, ASYNCHRONOUS);
762 fail();
763 } catch (IllegalStateException e) {
764
765
766 }
767
768
769 }
770
771
772 /***
773 * Test various cache configurations for cache1 - explicit setting of:
774 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
775 */
776 @Test
777 public void testPutWithExplicitReplicationConfig() throws InterruptedException {
778
779 putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
780 }
781
782
783
784 /***
785 * Test various cache configurations for cache1 - explicit setting of:
786 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
787 */
788 @Test
789 public void testPutWithThreadKiller() throws InterruptedException {
790
791 putTestWithThreadKiller(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
792 }
793
794 /***
795 * CacheEventListeners that are not CacheReplicators should receive cache events originated from receipt
796 * of a remote event by a CachePeer.
797 */
798
799 @Test
800 public void testRemotelyReceivedPutNotifiesCountingListener() throws InterruptedException {
801
802 putTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
803 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager1.getCache("sampleCache1")).size());
804 assertEquals(1, CountingCacheEventListener.getCacheElementsPut(manager2.getCache("sampleCache1")).size());
805
806 }
807
808 /***
809 * Test various cache configurations for cache1 - explicit setting of:
810 * properties="replicateAsynchronously=false, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
811 */
812 @Test
813 public void testPutWithExplicitReplicationSynchronousConfig() throws InterruptedException {
814 putTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
815 }
816
817
818 /***
819 * Test put replicated for cache4 - no properties.
820 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
821 */
822 @Test
823 public void testPutWithEmptyReplicationPropertiesConfig() throws InterruptedException {
824 putTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
825 }
826
827 /***
828 * Test put replicated for cache4 - missing replicatePuts property.
829 * replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
830 * should equal replicateAsynchronously=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
831 */
832 @Test
833 public void testPutWithOneMissingReplicationPropertyConfig() throws InterruptedException {
834 putTest(manager1.getCache("sampleCache5"), manager2.getCache("sampleCache5"), ASYNCHRONOUS);
835 }
836
837
838 /***
839 * Tests put and remove initiated from cache1 in a cluster
840 * <p/>
841 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
842 */
843 public void putTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
844
845 Serializable key = new Date();
846 Serializable value = new Date();
847 Element sourceElement = new Element(key, value);
848
849
850 fromCache.put(sourceElement);
851 int i = 0;
852
853 if (asynchronous) {
854 waitForPropagate();
855 }
856
857
858 Element deliveredElement = toCache.get(key);
859 assertEquals(sourceElement, deliveredElement);
860
861 }
862
863
864 /***
865 * Tests put and remove initiated from cache1 in a cluster
866 * <p/>
867 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
868 */
869 public void putTestWithThreadKiller(Ehcache fromCache, Ehcache toCache, boolean asynchronous)
870 throws CacheException, InterruptedException {
871
872 fromCache.put(new Element("thread killer", new ThreadKiller()));
873 if (asynchronous) {
874 waitForPropagate();
875 }
876
877 Serializable key = new Date();
878 Serializable value = new Date();
879 Element sourceElement = new Element(key, value);
880
881
882 fromCache.put(sourceElement);
883
884 if (asynchronous) {
885 waitForPropagate();
886 }
887
888
889 Element deliveredElement = toCache.get(key);
890 assertEquals(sourceElement, deliveredElement);
891
892 }
893
894
895 /***
896 * Checks that a put received from a remote cache notifies any registered listeners.
897 * <p/>
898 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
899 */
900 @Test
901 public void testRemotePutNotificationGetsToOtherListeners() throws CacheException, InterruptedException {
902
903 Serializable key = new Date();
904 Serializable value = new Date();
905 Element element1 = new Element(key, value);
906
907
908 cache1.put(new Element("1", new Date()));
909 cache1.put(new Element("2", new Date()));
910 cache1.put(new Element("3", new Date()));
911
912
913 Object nonSerializableObject = new Object();
914 cache1.put(new Element(nonSerializableObject, new Object()));
915
916
917 waitForPropagate();
918
919
920 assertEquals(4, CountingCacheEventListener.getCacheElementsPut(cache1).size());
921
922 assertEquals(3, CountingCacheEventListener.getCacheElementsPut(cache2).size());
923
924
925 cache1.put(new Element("1", new Date()));
926 cache1.put(new Element("2", new Date()));
927 cache1.put(new Element("3", new Date()));
928
929
930 cache1.put(new Element(nonSerializableObject, new Object()));
931
932 waitForPropagate();
933
934
935 assertEquals(4, CountingCacheEventListener.getCacheElementsUpdated(cache1).size());
936
937 assertEquals(3, CountingCacheEventListener.getCacheElementsUpdated(cache2).size());
938
939
940 cache1.remove("1");
941 cache1.remove("2");
942 cache1.remove("3");
943 cache1.remove(nonSerializableObject);
944
945 waitForPropagate();
946
947
948 assertEquals(4, CountingCacheEventListener.getCacheElementsRemoved(cache1).size());
949
950 assertEquals(3, CountingCacheEventListener.getCacheElementsRemoved(cache2).size());
951
952 }
953
954
955 /***
956 * Test various cache configurations for cache1 - explicit setting of:
957 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
958 */
959 @Test
960 public void testRemoveWithExplicitReplicationConfig() throws InterruptedException {
961 removeTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
962 }
963
964 /***
965 * Test various cache configurations for cache1 - explicit setting of:
966 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
967 */
968 @Test
969 public void testRemoveWithExplicitReplicationSynchronousConfig() throws InterruptedException {
970 removeTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
971 }
972
973
974 /***
975 * Test put replicated for cache4 - no properties.
976 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
977 */
978 @Test
979 public void testRemoveWithEmptyReplicationPropertiesConfig() throws InterruptedException {
980 removeTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
981 }
982
983 /***
984 * Tests put and remove initiated from a cache to another cache in a cluster
985 * <p/>
986 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
987 */
988 public void removeTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws CacheException, InterruptedException {
989
990 Serializable key = new Date();
991 Serializable value = new Date();
992 Element element1 = new Element(key, value);
993
994
995 fromCache.put(element1);
996
997 if (asynchronous) {
998 waitForPropagate();
999 }
1000
1001
1002 Element element2 = toCache.get(key);
1003 assertEquals(element1, element2);
1004
1005
1006 fromCache.remove(key);
1007 if (asynchronous) {
1008 waitForPropagate();
1009 }
1010
1011
1012 element2 = toCache.get(key);
1013 assertNull(element2);
1014
1015 }
1016
1017
1018 /***
1019 * test removeAll sync
1020 */
1021 @Test
1022 public void testRemoveAllAsynchronous() throws Exception {
1023 removeAllTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
1024 }
1025
1026 /***
1027 * test removeAll async
1028 */
1029 @Test
1030 public void testRemoveAllSynchronous() throws Exception {
1031 removeAllTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
1032 }
1033
1034 /***
1035 * Tests removeAll initiated from a cache to another cache in a cluster
1036 * <p/>
1037 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1038 */
1039 public void removeAllTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
1040
1041
1042 waitForPropagate();
1043
1044
1045 Serializable key = new Date();
1046 Serializable value = new Date();
1047 Element element1 = new Element(key, value);
1048
1049
1050 fromCache.put(element1);
1051
1052
1053 if (asynchronous) {
1054 waitForPropagate();
1055 }
1056
1057
1058 Element element2 = toCache.get(key);
1059 assertEquals(element1, element2);
1060
1061
1062 fromCache.removeAll();
1063 if (asynchronous) {
1064 waitForPropagate();
1065 }
1066
1067
1068 element2 = toCache.get(key);
1069 assertNull(element2);
1070 assertEquals(0, toCache.getSize());
1071
1072 }
1073
1074
1075 /***
1076 * Test various cache configurations for cache1 - explicit setting of:
1077 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1078 */
1079 @Test
1080 public void testUpdateWithExplicitReplicationConfig() throws Exception {
1081 updateViaCopyTest(manager1.getCache("sampleCache1"), manager2.getCache("sampleCache1"), ASYNCHRONOUS);
1082 }
1083
1084 /***
1085 * Test various cache configurations for cache1 - explicit setting of:
1086 * properties="replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true "/>
1087 */
1088 @Test
1089 public void testUpdateWithExplicitReplicationSynchronousConfig() throws Exception {
1090 updateViaCopyTest(manager1.getCache("sampleCache3"), manager2.getCache("sampleCache3"), SYNCHRONOUS);
1091 }
1092
1093
1094 /***
1095 * Test put replicated for cache4 - no properties.
1096 * Defaults should be replicateAsynchronously=true, replicatePuts=true, replicateUpdates=true, replicateUpdatesViaCopy=true, replicateRemovals=true
1097 */
1098 @Test
1099 public void testUpdateWithEmptyReplicationPropertiesConfig() throws Exception {
1100 updateViaCopyTest(manager1.getCache("sampleCache4"), manager2.getCache("sampleCache4"), ASYNCHRONOUS);
1101 }
1102
1103 /***
1104 * Tests put and update through copy initiated from cache1 in a cluster
1105 * <p/>
1106 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1107 */
1108 public void updateViaCopyTest(Ehcache fromCache, Ehcache toCache, boolean asynchronous) throws Exception {
1109
1110 fromCache.removeAll();
1111 toCache.removeAll();
1112
1113
1114 waitForPropagate();
1115
1116 Serializable key = new Date();
1117 Serializable value = new Date();
1118 Element element1 = new Element(key, value);
1119
1120
1121 fromCache.put(element1);
1122 if (asynchronous) {
1123 waitForPropagate();
1124 }
1125
1126
1127 Element element2 = toCache.get(key);
1128 assertEquals(element1, element2);
1129
1130
1131 Element updatedElement1 = new Element(key, new Date());
1132
1133 fromCache.put(updatedElement1);
1134 if (asynchronous) {
1135 waitForPropagate();
1136 }
1137
1138
1139 Element receivedUpdatedElement2 = toCache.get(key);
1140 assertEquals(updatedElement1, receivedUpdatedElement2);
1141
1142 }
1143
1144
1145
1146 /***
1147 * Tests put through invalidation initiated from cache1 in a cluster
1148 * <p/>
1149 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1150 */
1151 @Test
1152 public void testPutViaInvalidate() throws CacheException, InterruptedException, IOException {
1153
1154 cache1 = manager1.getCache("sampleCache2");
1155 cache1.removeAll();
1156
1157 cache2 = manager2.getCache("sampleCache2");
1158 cache2.removeAll();
1159
1160
1161 waitForPropagate();
1162
1163 String key = "1";
1164 Serializable value = new Date();
1165 Element element1 = new Element(key, value);
1166
1167 Element element3 = new Element("key2", "two");
1168
1169
1170 cache2.put(element1);
1171 assertNotNull(cache2.get(key));
1172 waitForPropagate();
1173
1174
1175 Element element2 = cache1.get(key);
1176 assertEquals(element1, element2);
1177
1178
1179 cache1.put(element3);
1180 waitForPropagate();
1181
1182
1183 assertNull(cache2.get("key2"));
1184
1185
1186 cache1.put(element3);
1187 waitForPropagate();
1188
1189
1190 element2 = cache2.get("key2");
1191 assertNull(element2);
1192
1193 }
1194
1195
1196 /***
1197 * Tests put and update through invalidation initiated from cache1 in a cluster
1198 * <p/>
1199 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1200 */
1201 @Test
1202 public void testUpdateViaInvalidate() throws CacheException, InterruptedException, IOException {
1203
1204 cache1 = manager1.getCache("sampleCache2");
1205 cache1.removeAll();
1206
1207 cache2 = manager2.getCache("sampleCache2");
1208 cache2.removeAll();
1209
1210
1211 waitForPropagate();
1212
1213 String key = "1";
1214 Serializable value = new Date();
1215 Element element1 = new Element(key, value);
1216
1217
1218 cache2.put(element1);
1219 Element element2 = cache2.get(key);
1220 assertEquals(element1, element2);
1221
1222
1223 cache1.put(element1);
1224 waitForPropagate();
1225
1226
1227 element2 = cache2.get(key);
1228 assertNull(element2);
1229
1230 }
1231
1232
1233 /***
1234 * Tests put and update through invalidation initiated from cache1 in a cluster
1235 * <p/>
1236 * This test goes into an infinite loop if the chain of notifications is not somehow broken.
1237 */
1238 @Test
1239 public void testUpdateViaInvalidateNonSerializableValue() throws CacheException, InterruptedException, IOException {
1240
1241 cache1 = manager1.getCache("sampleCache2");
1242 cache1.removeAll();
1243
1244 cache2 = manager2.getCache("sampleCache2");
1245 cache2.removeAll();
1246
1247
1248 waitForPropagate();
1249
1250 String key = "1";
1251 Serializable value = new Date();
1252
1253 /***
1254 * Non-serializable test class
1255 */
1256 class NonSerializable {
1257
1258 }
1259
1260 NonSerializable value1 = new NonSerializable();
1261 Element element1 = new Element(key, value1);
1262
1263
1264 cache2.put(element1);
1265 Element element2 = cache2.get(key);
1266 assertEquals(element1, element2);
1267
1268
1269 cache1.put(element1);
1270 waitForPropagate();
1271
1272
1273 element2 = cache2.get(key);
1274 assertNull(element2);
1275
1276 }
1277
1278
1279 /***
1280 * What happens when two cache instances replicate to each other and a change is initiated
1281 */
1282 @Test
1283 public void testInfiniteNotificationsLoop() throws InterruptedException {
1284
1285 Serializable key = "1";
1286 Serializable value = new Date();
1287 Element element = new Element(key, value);
1288
1289
1290 cache1.put(element);
1291 waitForPropagate();
1292
1293
1294 Element element2 = cache2.get(key);
1295 assertEquals(element, element2);
1296
1297
1298 cache1.remove(key);
1299 assertNull(cache1.get(key));
1300
1301
1302 waitForPropagate();
1303 element2 = cache2.get(key);
1304 assertNull(element2);
1305
1306
1307 Element element3 = new Element("3", "ddsfds");
1308 cache2.put(element3);
1309 waitForPropagate();
1310 Element element4 = cache2.get("3");
1311 assertEquals(element3, element4);
1312
1313 }
1314
1315
1316 /***
1317 * Shows result of perf problem and fix in flushReplicationQueue
1318 * <p/>
1319 * Behaviour before change:
1320 * <p/>
1321 * INFO: Items written: 10381
1322 * Oct 29, 2007 11:40:04 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1323 * INFO: Items written: 29712
1324 * Oct 29, 2007 11:40:57 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1325 * INFO: Items written: 1
1326 * Oct 29, 2007 11:40:58 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1327 * INFO: Items written: 32354
1328 * Oct 29, 2007 11:42:34 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1329 * INFO: Items written: 322
1330 * Oct 29, 2007 11:42:35 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1331 * INFO: Items written: 41909
1332 * <p/>
1333 * Behaviour after change:
1334 * INFO: Items written: 26356
1335 * Oct 29, 2007 11:44:39 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1336 * INFO: Items written: 33656
1337 * Oct 29, 2007 11:44:40 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1338 * INFO: Items written: 32234
1339 * Oct 29, 2007 11:44:42 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1340 * INFO: Items written: 38677
1341 * Oct 29, 2007 11:44:43 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1342 * INFO: Items written: 43418
1343 * Oct 29, 2007 11:44:44 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1344 * INFO: Items written: 31277
1345 * Oct 29, 2007 11:44:45 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1346 * INFO: Items written: 27769
1347 * Oct 29, 2007 11:44:46 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1348 * INFO: Items written: 29596
1349 * Oct 29, 2007 11:44:47 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1350 * INFO: Items written: 17142
1351 * Oct 29, 2007 11:44:48 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1352 * INFO: Items written: 14775
1353 * Oct 29, 2007 11:44:49 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1354 * INFO: Items written: 4088
1355 * Oct 29, 2007 11:44:51 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1356 * INFO: Items written: 5492
1357 * Oct 29, 2007 11:44:52 AM net.sf.ehcache.distribution.RMICacheReplicatorTest testReplicatePerf
1358 * INFO: Items written: 10188
1359 * <p/>
1360 * Also no pauses noted.
1361 */
1362 @Test
1363 public void testReplicatePerf() throws InterruptedException {
1364
1365 if (manager2 != null) {
1366 manager2.shutdown();
1367 }
1368 if (manager3 != null) {
1369 manager3.shutdown();
1370 }
1371 if (manager4 != null) {
1372 manager4.shutdown();
1373 }
1374 if (manager5 != null) {
1375 manager5.shutdown();
1376 }
1377 if (manager6 != null) {
1378 manager6.shutdown();
1379 }
1380
1381
1382 waitForPropagate();
1383
1384
1385 long start = System.currentTimeMillis();
1386 final String keyBase = Long.toString(start);
1387 int count = 0;
1388
1389 for (int i = 0; i < 100000; i++) {
1390 final String key = keyBase + ':' + Integer.toString((int) (Math.random() * 1000.0));
1391 cache1.put(new Element(key, "My Test"));
1392 cache1.get(key);
1393 cache1.remove(key);
1394 count++;
1395
1396 final long end = System.currentTimeMillis();
1397 if (end - start >= 1000) {
1398 start = end;
1399 LOG.info("Items written: " + count);
1400
1401 assertTrue(count > 1000);
1402 count = 0;
1403 }
1404 }
1405 }
1406
1407
1408 /***
1409 * Need to wait for async
1410 *
1411 * @throws InterruptedException
1412 */
1413 protected void waitForPropagate() throws InterruptedException {
1414 Thread.sleep(1500);
1415 }
1416
1417 /***
1418 * Need to wait for async
1419 *
1420 * @throws InterruptedException
1421 */
1422 protected void waitForSlowPropagate() throws InterruptedException {
1423 Thread.sleep(6000);
1424 }
1425
1426
1427 /***
1428 * Distributed operations create extra scope for deadlock.
1429 * This test checks whether a distributed deadlock scenario exists for synchronous replication
1430 * of each distributed operation all at once.
1431 * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1432 * and multi process safe.
1433 * <p/>
1434 * Carefully tailored to exercise:
1435 * <ol>
1436 * <li>overflow to disk. We put in 20 things and the memory size is 10
1437 * <li>each peer is working on the same set of keys thus maximising contention
1438 * <li>we do puts, gets and removes to explore all the execution paths
1439 * </ol>
1440 * If a deadlock occurs, processing will stop until a SocketTimeout exception is thrown and
1441 * the deadlock will be released.
1442 */
1443 @Test
1444 public void testCacheOperationsSynchronousMultiThreaded() throws Exception, InterruptedException {
1445
1446
1447 final List executables = new ArrayList();
1448
1449 executables.add(new ClusterExecutable(manager1, "sampleCache3"));
1450 executables.add(new ClusterExecutable(manager2, "sampleCache3"));
1451 executables.add(new ClusterExecutable(manager3, "sampleCache3"));
1452
1453 runThreads(executables);
1454 }
1455
1456
1457 /***
1458 * Distributed operations create extra scope for deadlock.
1459 * This test checks whether a distributed deadlock scenario exists for asynchronous replication
1460 * of each distributed operation all at once.
1461 * It shows that no distributed deadlock exists for asynchronous replication. It is multi thread
1462 * and multi process safe.
1463 * It uses sampleCache2, which is configured to be asynchronous
1464 * <p/>
1465 * Carefully tailored to exercise:
1466 * <ol>
1467 * <li>overflow to disk. We put in 20 things and the memory size is 10
1468 * <li>each peer is working on the same set of keys thus maximising contention
1469 * <li>we do puts, gets and removes to explore all the execution paths
1470 * </ol>
1471 */
1472 @Test
1473 public void testCacheOperationsAynchronousMultiThreaded() throws Exception, InterruptedException {
1474
1475
1476 final List executables = new ArrayList();
1477
1478 executables.add(new ClusterExecutable(manager1, "sampleCache2"));
1479 executables.add(new ClusterExecutable(manager2, "sampleCache2"));
1480 executables.add(new ClusterExecutable(manager3, "sampleCache2"));
1481
1482 runThreads(executables);
1483 }
1484
1485 /***
1486 * An Exececutable which allows the CacheManager to be set
1487 */
1488 class ClusterExecutable implements Executable {
1489
1490 private CacheManager manager;
1491 private String cacheName;
1492
1493 /***
1494 * Construct with CacheManager
1495 *
1496 * @param manager
1497 */
1498 public ClusterExecutable(CacheManager manager, String cacheName) {
1499 this.manager = manager;
1500 this.cacheName = cacheName;
1501 }
1502
1503 /***
1504 * Execute
1505 *
1506 * @throws Exception
1507 */
1508 public void execute() throws Exception {
1509 Random random = new Random();
1510
1511 for (int i = 0; i < 20; i++) {
1512 Integer key = Integer.valueOf((i));
1513 int operationSelector = random.nextInt(4);
1514 Cache cache = manager.getCache(cacheName);
1515 if (operationSelector == 100) {
1516 cache.get(key);
1517 if (LOG.isDebugEnabled()) {
1518 LOG.debug(cache.getGuid() + ": get " + key);
1519 }
1520 } else if (operationSelector == 100) {
1521 cache.remove(key);
1522 if (LOG.isDebugEnabled()) {
1523 LOG.debug(cache.getGuid() + ": remove " + key);
1524 }
1525 } else if (operationSelector == 2) {
1526 cache.put(new Element(key,
1527 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1528 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1529 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1530 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
1531 + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"));
1532 if (LOG.isDebugEnabled()) {
1533 LOG.debug(cache.getGuid() + ": put " + key);
1534 }
1535 } else {
1536
1537 if (random.nextInt(3) == 1) {
1538 LOG.debug("cache.removeAll()");
1539 cache.removeAll();
1540 }
1541 }
1542 }
1543
1544 }
1545 }
1546
1547 }