Replicated Caching using JMS

Introduction

As of version 1.6, JMS can be used as the underlying mechanism for the replicated operations in Ehcache with the jmsreplication module.

JMS (Java Message Service) is a mechanism for interacting with message queues. Message queues themselves are a very mature piece of infrastructure used in many enterprise software contexts. Because they are a required part of the Java EE standard, the large enterprise vendors all provide their own implementations. There are also several open source choices including Open MQ and Active MQ. Ehcache is integration tested against both of these.

The Ehcache jmsreplication module lets organisations with a message queue investment leverage it for caching.

It provides:

  • replication between cache nodes using a replication topic, in accordance with ehcache's standard replication mechanism
  • pushing of data directly to cache nodes from external topic publishers, in any language. This is done by sending the data to the replication topic, where it is automatically picked up by the cache subscribers.
  • a JMSCacheLoader, which sends cache load requests to a queue. Either an Ehcache cluster node, or an external queue receiver can respond.

Ehcache Replication and External Publishers

Ehcache replicates using JMS as follows:

  • Each cache node subscribes to a predefined topic, configured as the <topicBindingName> in ehcache.xml.
  • Each replicated cache publishes cache Elements to that topic. Replication is configured per cache.

To set up replicated caching based on JMS you need to configure a JMSCacheManagerPeerProviderFactory which is done globally for a CacheManager.

For each cache that you wish to replicate, you add a JMSCacheReplicatorFactory element to the cache element.

Configuration

Message Queue Configuration

Each cluster needs to use a fixed topic name for replication. Set up a topic using the tools in your message queue. Out of the box, both ActiveMQ and Open MQ support auto creation of destinations, so this step may be optional.

Ehcache Configuration

Configuration is done in the ehcache.xml.

There are two things to configure:

  • The JMSCacheManagerPeerProviderFactory, which is configured once per CacheManager and therefore once per ehcache.xml file.
  • The JMSCacheReplicatorFactory which is added to each cache's configuration if you want that cache replicated.

The main configuration happens in the properties attribute of the JMSCacheManagerPeerProviderFactory. These properties supply the JNDI settings used to look up the TopicConnectionFactory and the replication topic.

Configuring the JMSCacheManagerPeerProviderFactory

Following are the configuration instructions as they appear in the sample ehcache.xml shipped with Ehcache:

Configuring JMS replication
===========================


The JMS PeerProviderFactory uses JNDI to maintain message queue independence.
Refer to the manual for full configuration examples using ActiveMQ and Open Message Queue.

Valid properties are:
* initialContextFactoryName (mandatory) - the name of the factory used to create 
  the message queue initial context.
* providerURL (mandatory) - the JNDI configuration information for the service 
  provider to use.
* topicConnectionFactoryBindingName (mandatory) - the JNDI binding name for the
  TopicConnectionFactory
* topicBindingName (mandatory) - the JNDI binding name for the topic name
* securityPrincipalName - the JNDI java.naming.security.principal
* securityCredentials - the JNDI java.naming.security.credentials
* urlPkgPrefixes - the JNDI java.naming.factory.url.pkgs
* userName - the user name to use when creating the TopicConnection to the Message 
  Queue
* password - the password to use when creating the TopicConnection to the Message
  Queue
* acknowledgementMode - the JMS Acknowledgement mode for both publisher and
  subscriber.
    The available choices are
      AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE and SESSION_TRANSACTED.
    The default is AUTO_ACKNOWLEDGE.
* listenToTopic - true or false. If false, this cache will send to the JMS topic
  but will not listen for updates. Default is true.
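
As a rough illustration of how the properties listed above fit together, the following sketch splits a properties string on the propertySeparator and reports which mandatory entries are missing. The class JmsPeerProviderProperties and its methods are hypothetical helpers written for this page, not part of Ehcache, and Ehcache's own parsing may differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of Ehcache. */
final class JmsPeerProviderProperties {

    /** The four properties marked mandatory in the list above. */
    static final List<String> MANDATORY = Arrays.asList(
            "initialContextFactoryName",
            "providerURL",
            "topicConnectionFactoryBindingName",
            "topicBindingName");

    /** Splits "a=1, b=2" on the separator, trimming whitespace around names and values. */
    static Map<String, String> parse(String properties, String separator) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : properties.split(Pattern.quote(separator))) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing separator
            }
            int eq = trimmed.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Missing '=' in: " + trimmed);
            }
            result.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
        }
        return result;
    }

    /** Returns the mandatory property names that are absent or empty. */
    static List<String> missingMandatory(Map<String, String> props) {
        List<String> missing = new ArrayList<>();
        for (String name : MANDATORY) {
            String value = props.get(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** listenToTopic defaults to true when it is not set. */
    static boolean listenToTopic(Map<String, String> props) {
        return Boolean.parseBoolean(props.getOrDefault("listenToTopic", "true"));
    }
}
```

Running missingMandatory over a parsed configuration before starting the CacheManager is one way to catch a misspelled property name early.
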

Example Configurations

Usage is best illustrated with concrete examples for Active MQ and Open MQ.

Configuring the JMSCacheManagerPeerProviderFactory for Active MQ

This configuration works with Active MQ out of the box.

<cacheManagerPeerProviderFactory
       class="net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory"
       properties="initialContextFactoryName=ExampleActiveMQInitialContextFactory,
           providerURL=tcp://localhost:61616,
           topicConnectionFactoryBindingName=topicConnectionFactory,
           topicBindingName=ehcache"
       propertySeparator=","
       />

You need to provide your own ActiveMQInitialContextFactory for the initialContextFactoryName. An example which should work for most purposes is:

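import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.Context;
import javax.naming.NamingException;
import net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory;
import org.apache.activemq.jndi.ActiveMQInitialContextFactory;

public class ExampleActiveMQInitialContextFactory 
  extends ActiveMQInitialContextFactory {
    /**
     * {@inheritDoc}
     */
    @Override
    @SuppressWarnings("unchecked")
    public Context getInitialContext(Hashtable environment)
      throws NamingException 
    {
      Map<String, Object> data = new ConcurrentHashMap<String, Object>();
      // Bind the connection factory under the name configured in ehcache.xml.
      String factoryBindingName = 
        (String)environment.get(JMSCacheManagerPeerProviderFactory
          .TOPIC_CONNECTION_FACTORY_BINDING_NAME);
      try {
        data.put(factoryBindingName, createConnectionFactory(environment));
      } catch (URISyntaxException e) {
        throw new NamingException("Error initialising ConnectionFactory"
                                  + " with message "
                                  + e.getMessage());
      }
      // Bind the replication topic under the configured topic binding name.
      String topicBindingName = 
        (String)environment.get(JMSCacheManagerPeerProviderFactory
          .TOPIC_BINDING_NAME);
      data.put(topicBindingName, createTopic(topicBindingName));
      return createContext(environment, data);
    }
}

Configuring the JMSCacheManagerPeerProviderFactory for Open MQ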

This configuration works with an out of the box Open MQ.

<cacheManagerPeerProviderFactory
  class="net.sf.ehcache.distribution.jms.JMSCacheManagerPeerProviderFactory"
  properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
           providerURL=file:///tmp,
           topicConnectionFactoryBindingName=MyConnectionFactory,
           topicBindingName=ehcache"
       propertySeparator=","
       />

To set up the Open MQ file system initial context to work with this example, use the following imqobjmgr commands to create the required objects in the context.

imqobjmgr add -t tf -l 'MyConnectionFactory' \
  -j java.naming.provider.url=file:///tmp \
  -j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory -f
imqobjmgr add -t t -l 'ehcache' -o 'imqDestinationName=EhcacheTopicDest' \
  -j java.naming.provider.url=file:///tmp \
  -j java.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory -f

Configuring the JMSCacheReplicatorFactory

This is the same as configuring any of the cache replicators. The class should be net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory.

See the following example:

<cache name="sampleCacheAsync"
 maxEntriesLocalHeap="1000"
 eternal="false"
 timeToIdleSeconds="1000"
 timeToLiveSeconds="1000"
 overflowToDisk="false">
  <cacheEventListenerFactory
     class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory"
     properties="replicateAsynchronously=true,
                  replicatePuts=true,
                  replicateUpdates=true,
                  replicateUpdatesViaCopy=true,
                  replicateRemovals=true,
                  asynchronousReplicationIntervalMillis=1000"
      propertySeparator=","/>
</cache>

External JMS Publishers

Anything that can publish to a message queue can also add cache entries to ehcache. These are called non-cache publishers.

Required Message Properties

Publishers need to set up to four String properties on each message: cacheName, action, mimeType and key.

cacheName Property

A JMS message property which contains the name of the cache to operate on. If no cacheName is set the message will be ignored. A warning log message will indicate that the message has been ignored.

action Property

A JMS message property which contains the action to perform on the cache.

Available actions are strings labeled PUT, REMOVE and REMOVE_ALL.

If not set no action is performed. A warning log message will indicate that the message has been ignored.

mimeType Property

A JMS message property which contains the mimeType of the message. Applies to the PUT action. If not set the message is interpreted as follows:

ObjectMessage - if it is a net.sf.ehcache.Element, then it is treated as such and stored in the cache. For other objects, a new Element is created using the object in the ObjectMessage as the value and the key property as a key. Because objects are already typed, the mimeType is ignored.

TextMessage - Stored in the cache as value of MimeTypeByteArray. The mimeType should be specified. If not specified it is stored as type text/plain.

BytesMessage - Stored in the cache as value of MimeTypeByteArray. The mimeType should be specified. If not specified it is stored as type application/octet-stream.

Other message types are not supported.

To send XML, use a TextMessage or BytesMessage and set the mimeType to application/xml. It will be stored in the cache as a value of MimeTypeByteArray.

The REMOVE and REMOVE_ALL actions do not require a mimeType property.
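
The defaulting rules above can be summarised in a few lines of code. This is only a model of the behaviour as described on this page: the class MimeTypeDefaults and the MessageKind enum are hypothetical names, and the sketch does not touch the JMS API or Ehcache's own classes.

```java
/** Hypothetical model of the defaulting rules described above; not Ehcache's own code. */
final class MimeTypeDefaults {

    /** The three JMS message types the text says are supported. */
    enum MessageKind { OBJECT, TEXT, BYTES }

    /**
     * Returns the mimeType that would be stored with the value, or null when
     * no mimeType applies (ObjectMessage payloads are already typed).
     */
    static String effectiveMimeType(MessageKind kind, String mimeTypeProperty) {
        switch (kind) {
            case OBJECT:
                return null; // the mimeType property is ignored
            case TEXT:
                return mimeTypeProperty != null ? mimeTypeProperty : "text/plain";
            case BYTES:
                return mimeTypeProperty != null ? mimeTypeProperty : "application/octet-stream";
            default:
                throw new IllegalArgumentException("Unsupported message type: " + kind);
        }
    }
}
```
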

key Property

The key in the cache on which to operate. The key is of type String.

The REMOVE_ALL action does not require a key property.

If an ObjectMessage of type net.sf.ehcache.Element is sent, the key is contained in the element. Any key set as a property is ignored.

If the key is required but not provided, a warning log message will indicate that the message has been ignored.
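
A publisher can check its own messages against these rules before sending. The sketch below models the property requirements described in this section using a plain Map in place of JMS message properties. PublisherMessageCheck is a hypothetical helper, and the handling of an unrecognised action is an assumption, since the text above does not say what happens in that case.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical pre-send check modelling the rules above; not part of Ehcache. */
final class PublisherMessageCheck {

    /**
     * Returns the reason a message would be ignored, or empty if it carries
     * every property its action needs.
     *
     * @param props         the String properties set on the message
     * @param bodyIsElement true if the body is a net.sf.ehcache.Element,
     *                      in which case the key travels inside the Element
     */
    static Optional<String> reasonIgnored(Map<String, String> props, boolean bodyIsElement) {
        if (props.get("cacheName") == null) {
            return Optional.of("cacheName not set");
        }
        String action = props.get("action");
        if (action == null) {
            return Optional.of("action not set");
        }
        switch (action) {
            case "REMOVE_ALL":
                return Optional.empty(); // needs neither key nor mimeType
            case "PUT":
            case "REMOVE":
                if (bodyIsElement || props.get("key") != null) {
                    return Optional.empty();
                }
                return Optional.of("key not set");
            default:
                // Assumption: anything other than the three documented actions is rejected.
                return Optional.of("unknown action: " + action);
        }
    }
}
```
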

Code Samples

These samples use Open MQ as the message queue and use it with out of the box defaults. They are heavily based on Ehcache's own JMS integration tests. See the test source for more details.

Messages should be sent to the topic that Ehcache is listening on. In these samples it is EhcacheTopicDest.

All samples get a Topic Connection using the following method:

private TopicConnection getMQConnection() throws JMSException {
  com.sun.messaging.ConnectionFactory factory = 
    new com.sun.messaging.ConnectionFactory();
  factory.setProperty(ConnectionConfiguration.imqAddressList, "localhost:7676");
  factory.setProperty(ConnectionConfiguration.imqReconnectEnabled, "true");
  TopicConnection myConnection = factory.createTopicConnection();
  return myConnection;
}

PUT a Java Object into an Ehcache Cluster

String payload = "this is an object";
TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession =
  connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

ObjectMessage message = publisherSession.createObjectMessage(payload);
message.setStringProperty(ACTION_PROPERTY, "PUT");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");

// mimeType is left unset: it is ignored for an ObjectMessage.
// The key property is used here because the payload is a plain object.
// It would be ignored if the payload were an Element.
message.setStringProperty(KEY_PROPERTY, "1234");

Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);

connection.stop();

Ehcache will create an Element in cache "sampleCacheAsync" with key "1234" and a Java class String value of "this is an object".

PUT XML into an Ehcache Cluster

TopicConnection connection = getMQConnection();
connection.start();
TopicSession publisherSession = connection.createTopicSession(false,
  Session.AUTO_ACKNOWLEDGE);

String value = "<?xml version=\"1.0\"?>\n" +
   "<oldjoke>\n" +
   "<burns>Say <quote>goodnight</quote>,\n" +
   "Gracie.</burns>\n" +
   "<allen><quote>Goodnight, \n" +
   "Gracie.</quote></allen>\n" +
   "<applause/>\n" +
   "</oldjoke>";

TextMessage message = publisherSession.createTextMessage(value);
message.setStringProperty(ACTION_PROPERTY, "PUT");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
message.setStringProperty(MIME_TYPE_PROPERTY, "application/xml");
message.setStringProperty(KEY_PROPERTY, "1234");

Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);

connection.stop();

Ehcache will create an Element in cache "sampleCacheAsync" with key "1234" and a value of type MimeTypeByteArray.

On a get from the cache the MimeTypeByteArray will be returned. It is an Ehcache value object from which a mimeType and byte[] can be retrieved. The mimeType will be "application/xml". The byte[] will contain the XML String encoded in bytes, using the platform's default charset.
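
Because the bytes are encoded with the platform's default charset, a reader on a JVM with a different default may decode them incorrectly. The following sketch shows the decoding step on a byte[] that is assumed to have already been retrieved from the MimeTypeByteArray. XmlBytes is a hypothetical helper, not an Ehcache class.

```java
import java.nio.charset.Charset;

/** Hypothetical helper for turning cached XML bytes back into a String. */
final class XmlBytes {

    /** Decodes with the platform default charset, matching the encoding described above. */
    static String decodeWithPlatformDefault(byte[] bytes) {
        return new String(bytes, Charset.defaultCharset());
    }

    /** Decodes with an explicit charset, for when writer and reader defaults differ. */
    static String decode(byte[] bytes, Charset charset) {
        return new String(bytes, charset);
    }
}
```

If the cache nodes and the consumers do not all share the same default charset, it may be safer to send a BytesMessage whose bytes were encoded with an agreed charset and to decode with that same charset on the way out.
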

PUT arbitrary bytes into an Ehcache Cluster

byte[] bytes = new byte[]{0x34, (byte) 0xe3, (byte) 0x88};
TopicConnection connection = getMQConnection();
connection.start();

TopicSession publisherSession = connection.createTopicSession(false,
  Session.AUTO_ACKNOWLEDGE);

BytesMessage message = publisherSession.createBytesMessage();
message.writeBytes(bytes);
message.setStringProperty(ACTION_PROPERTY, "PUT");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
message.setStringProperty(MIME_TYPE_PROPERTY, "application/octet-stream");
message.setStringProperty(KEY_PROPERTY, "1234");

Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);

Ehcache will create an Element in cache "sampleCacheAsync" with key "1234" and a value of type MimeTypeByteArray.

On a get from the cache the MimeTypeByteArray will be returned. It is an Ehcache value object from which a mimeType and byte[] can be retrieved. The mimeType will be "application/octet-stream". The byte[] will contain the original bytes.

REMOVE

TopicConnection connection = getMQConnection();
connection.start();

TopicSession publisherSession = connection.createTopicSession(false, 
  Session.AUTO_ACKNOWLEDGE);

ObjectMessage message = publisherSession.createObjectMessage();
message.setStringProperty(ACTION_PROPERTY, "REMOVE");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");
message.setStringProperty(KEY_PROPERTY, "1234");

Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);

Ehcache will remove the Element with key "1234" from cache "sampleCacheAsync" in the cluster.

REMOVE_ALL

TopicConnection connection = getMQConnection();
connection.start();

TopicSession publisherSession = connection.createTopicSession(false,
  Session.AUTO_ACKNOWLEDGE);

ObjectMessage message = publisherSession.createObjectMessage();
message.setStringProperty(ACTION_PROPERTY, "REMOVE_ALL");
message.setStringProperty(CACHE_NAME_PROPERTY, "sampleCacheAsync");

Topic topic = publisherSession.createTopic("EhcacheTopicDest");
TopicPublisher publisher = publisherSession.createPublisher(topic);
publisher.send(message);

connection.stop();

Ehcache will remove all Elements from cache "sampleCacheAsync" in the cluster.

Using the JMSCacheLoader

The JMSCacheLoader is a CacheLoader which loads objects into the cache by sending requests to a JMS Queue.

The loader places an ObjectMessage of type JMSEventMessage on the getQueue with an Action of type GET.

The request carries the following String property:

loaderArgument - the defaultLoaderArgument, or the loaderArgument if one is specified on the load request. To work with the JMSCacheManagerPeerProvider this should be the name of the cache to load from. For custom responders, it can be anything which has meaning to the responder.

A queue responder will respond to the request. You can either create your own or use the one built into the JMSCacheManagerPeerProviderFactory, which attempts to load the requested entry from its cache.

The JMSCacheLoader uses JNDI to maintain message queue independence. Refer to the manual for full configuration examples using ActiveMQ and Open Message Queue.

It is configured as per the following example:

<cacheLoaderFactory class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory"
  properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
  providerURL=file:///tmp,
  replicationTopicConnectionFactoryBindingName=MyConnectionFactory,
  replicationTopicBindingName=ehcache,
  getQueueConnectionFactoryBindingName=queueConnectionFactory,
  getQueueBindingName=ehcacheGetQueue,
  timeoutMillis=20000,
  defaultLoaderArgument="/>

Valid properties are:

  • initialContextFactoryName (mandatory) - the name of the factory used to create the message queue initial context.
  • providerURL (mandatory) - the JNDI configuration information for the service provider to use.
  • getQueueConnectionFactoryBindingName (mandatory) - the JNDI binding name for the QueueConnectionFactory
  • getQueueBindingName (mandatory) - the JNDI binding name for the queue name used to make requests.
  • defaultLoaderArgument - (optional) - an application specific argument. If not supplied as a cache.load() parameter this default value will be used. The argument is passed in the JMS request as a StringProperty called loaderArgument.
  • timeoutMillis - time in milliseconds to wait for a reply.
  • securityPrincipalName - the JNDI java.naming.security.principal
  • securityCredentials - the JNDI java.naming.security.credentials
  • urlPkgPrefixes - the JNDI java.naming.factory.url.pkgs
  • userName - the user name to use when creating the TopicConnection to the Message Queue
  • password - the password to use when creating the TopicConnection to the Message Queue
  • acknowledgementMode - the JMS Acknowledgement mode for both publisher and subscriber. The available choices are AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE and SESSION_TRANSACTED. The default is AUTO_ACKNOWLEDGE.
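
The request and reply pattern behind the loader, including the effect of timeoutMillis, can be modelled in-process with plain Java queues. The sketch below is not JMS and not Ehcache code: RequestReplyModel, GetRequest and the reply format are hypothetical, a per-request BlockingQueue stands in for a reply queue, and returning null on timeout is an assumption made for the model.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-process model of a load request with a timed wait for the reply. */
final class RequestReplyModel {

    /** A request carries the key, the loaderArgument and a private reply queue. */
    static final class GetRequest {
        final String key;
        final String loaderArgument;
        final BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();

        GetRequest(String key, String loaderArgument) {
            this.key = key;
            this.loaderArgument = loaderArgument;
        }
    }

    /** Places a request on the get queue and waits up to timeoutMillis for a reply. */
    static String load(BlockingQueue<GetRequest> getQueue, String key,
                       String loaderArgument, long timeoutMillis) {
        GetRequest request = new GetRequest(key, loaderArgument);
        if (!getQueue.offer(request)) {
            return null; // the request could not be queued
        }
        try {
            // poll returns null if no responder answers within the timeout
            return request.replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Starts a daemon thread that answers a single request from the get queue. */
    static Thread startResponder(BlockingQueue<GetRequest> getQueue) {
        Thread responder = new Thread(() -> {
            try {
                GetRequest request = getQueue.take();
                // A real responder would look the key up in the cache named by loaderArgument.
                request.replyQueue.offer(request.loaderArgument + ":" + request.key);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.setDaemon(true);
        responder.start();
        return responder;
    }
}
```

With no responder listening, load simply returns null once the timeout elapses, which is why a timeoutMillis value suited to the responder's latency matters.
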

Example Configuration Using Active MQ

<cache name="sampleCacheNorep"
  maxEntriesLocalHeap="1000"
  eternal="false"
  timeToIdleSeconds="1000"
  timeToLiveSeconds="1000"
  overflowToDisk="false">
  <cacheEventListenerFactory
   class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory"
   properties="replicateAsynchronously=false, replicatePuts=false,
   replicateUpdates=false, replicateUpdatesViaCopy=false,
   replicateRemovals=false, loaderArgument=sampleCacheNorep"
   propertySeparator=","/>
  <cacheLoaderFactory 
    class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory"
    properties="initialContextFactoryName=net.sf.ehcache.distribution.jms.TestActiveMQInitialContextFactory,
       providerURL=tcp://localhost:61616,
       replicationTopicConnectionFactoryBindingName=topicConnectionFactory,
       getQueueConnectionFactoryBindingName=queueConnectionFactory,
       replicationTopicBindingName=ehcache,
       getQueueBindingName=ehcacheGetQueue,
       timeoutMillis=10000"/>
</cache>

Example Configuration Using Open MQ

<cache name="sampleCacheNorep"
  maxEntriesLocalHeap="1000"
  eternal="false"
  timeToIdleSeconds="100000"
  timeToLiveSeconds="100000"
  overflowToDisk="false">
  <cacheEventListenerFactory
    class="net.sf.ehcache.distribution.jms.JMSCacheReplicatorFactory"
    properties="replicateAsynchronously=false, replicatePuts=false,
                replicateUpdates=false, replicateUpdatesViaCopy=false,
                replicateRemovals=false"
                propertySeparator=","/>
  <cacheLoaderFactory 
    class="net.sf.ehcache.distribution.jms.JMSCacheLoaderFactory"
    properties="initialContextFactoryName=com.sun.jndi.fscontext.RefFSContextFactory,
                providerURL=file:///tmp,
                replicationTopicConnectionFactoryBindingName=MyConnectionFactory,
                replicationTopicBindingName=ehcache,
                getQueueConnectionFactoryBindingName=queueConnectionFactory,
                getQueueBindingName=ehcacheGetQueue,
                timeoutMillis=10000,
                userName=test,
                password=test"/>
</cache>

Configuring Clients for Message Queue Reliability

Ehcache replication and cache loading are designed to degrade gracefully if the message queue infrastructure stops. Replication and loads will fail while it is down, but these operations start up again when the message queue comes back.

For this to work, the ConnectionFactory used with the specific message queue needs to be configured correctly. For example, with Open MQ, reconnection is configured as follows:

  • imqReconnectEnabled='true' - without this, reconnection will not happen.
  • imqPingInterval='5' - consumers will not reconnect until they notice the connection is down. The ping interval does this. The default is 30. Set it lower if you want the Ehcache cluster to reform more quickly.
  • Finally, unlimited retry attempts are recommended. This is also the default.

For greater reliability consider using a message queue cluster. Most message queues support clustering. The cluster configuration is once again placed in the ConnectionFactory configuration.

Tested Message Queues

Open MQ

This open source message queue is tested in integration tests. It works perfectly.

Active MQ

This open source message queue is tested in integration tests. It works perfectly other than having a problem with temporary reply queues which prevents the use of JMSCacheLoader. JMSCacheLoader is not used during replication.

Oracle AQ

Versions up to and including 0.4 do not work, due to Oracle not supporting the unified API (send) for topics.

JBoss Queue

Works as reported by a user.

Known Issues

Active MQ Temporary Destinations

ActiveMQ seems to have a bug in at least ActiveMQ 5.1 where it does not clean up temporary queues, even though they have been deleted. That bug appears to be long standing but was thought to have been fixed. See http://issues.apache.org/activemq/browse/AMQ-1255.

The JMSCacheLoader uses temporary reply queues when loading. The Active MQ issue is readily reproduced in Ehcache integration testing. Accordingly, use of the JMSCacheLoader with ActiveMQ is not recommended. Open MQ tests fine.

Active MQ works fine for replication.

WebSphere 5 and 6

WebSphere Application Server prevents MessageListeners, which are not MDBs, from being created in the container.

While this is a general Java EE limitation, most other app servers either are permissive or can be configured to be permissive. WebSphere 4 worked, but 5 and 6 enforce the restriction.

Accordingly, Ehcache together with JMS cannot be used with WebSphere 5 and 6.