View Javadoc

1   package com.atlassian.cache.ehcache;
2   
3   import net.sf.ehcache.distribution.CacheReplicator;
4   import net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator;
5   import net.sf.ehcache.event.CacheEventListener;
6   import net.sf.ehcache.event.CacheEventListenerFactory;
7   import net.sf.ehcache.util.PropertyUtil;
8   import org.slf4j.Logger;
9   import org.slf4j.LoggerFactory;
10  
11  import java.util.Properties;
12  
13  
14  /**
15   * A factory for RMI-based {@link CacheReplicator}s, using properties. Config lines look like:
16   * <pre>&lt;cacheEventListenerFactory class="com.atlassian.cache.ehcache.RMICacheReplicatorFactory"
17   * properties="
18   * replicateAsynchronously=true,
19   * replicatePuts=true
20   * replicateUpdates=true
21   * replicateUpdatesViaCopy=true
22   * replicateRemovals=true
23   * "/&gt;</pre>
24   *
25   * This factory is a copy of the eponymous class from Ehcache, with the exception that any
26   * synchronous RMI replicators it creates do not suffer from https://jira.terracotta.org/jira/browse/EHC-683.
27   *
28   * @since 2.0
29   */
30  public class RMICacheReplicatorFactory extends CacheEventListenerFactory {
31  
32      /**
33       * A default for the amount of time the replication thread sleeps after it detects the replicationQueue is empty
34       * before checking again.
35       */
36      protected static final int DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS = 1000;
37  
38      /**
39       * A default for the maximum number of operations in an RMI message.
40       */
41      protected static final int DEFAULT_ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE = 1000;
42  
43      private static final Logger LOG = LoggerFactory.getLogger(RMICacheReplicatorFactory.class.getName());
44      private static final String REPLICATE_PUTS = "replicatePuts";
45      private static final String REPLICATE_PUTS_VIA_COPY = "replicatePutsViaCopy";
46      private static final String REPLICATE_UPDATES = "replicateUpdates";
47      private static final String REPLICATE_UPDATES_VIA_COPY = "replicateUpdatesViaCopy";
48      private static final String REPLICATE_REMOVALS = "replicateRemovals";
49      static final String REPLICATE_ASYNCHRONOUSLY = "replicateAsynchronously";
50      private static final String ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS = "asynchronousReplicationIntervalMillis";
51      private static final String ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE = "asynchronousReplicationMaximumBatchSize";
52      private static final int MINIMUM_REASONABLE_INTERVAL = 10;
53  
54      private static boolean extractBoolean(final String key, final Properties properties) {
55          String booleanString = PropertyUtil.extractAndLogProperty(key, properties);
56          return booleanString == null || PropertyUtil.parseBoolean(booleanString);   // defaults to true
57      }
58  
59      /**
60       * Extracts the value of asynchronousReplicationIntervalMillis. Sets it to 1000ms if
61       * either not set or there is a problem parsing the number.
62       *
63       * @param properties the properties to parse (required)
64       */
65      private static int extractReplicationIntervalMilis(Properties properties) {
66          int asynchronousReplicationIntervalMillis;
67          String asynchronousReplicationIntervalMillisString =
68                  PropertyUtil.extractAndLogProperty(ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS, properties);
69          if (asynchronousReplicationIntervalMillisString != null) {
70              try {
71                  int asynchronousReplicationIntervalMillisCandidate =
72                          Integer.parseInt(asynchronousReplicationIntervalMillisString);
73                  if (asynchronousReplicationIntervalMillisCandidate < MINIMUM_REASONABLE_INTERVAL) {
74                      LOG.debug("Trying to set the asynchronousReplicationIntervalMillis to an unreasonable number." +
75                              " Using the default instead.");
76                      asynchronousReplicationIntervalMillis = DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS;
77                  } else {
78                      asynchronousReplicationIntervalMillis = asynchronousReplicationIntervalMillisCandidate;
79                  }
80              } catch (NumberFormatException e) {
81                  LOG.warn("Number format exception trying to set asynchronousReplicationIntervalMillis. " +
82                          "Using the default instead. String value was: '" + asynchronousReplicationIntervalMillisString + "'");
83                  asynchronousReplicationIntervalMillis = DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS;
84              }
85          } else {
86              asynchronousReplicationIntervalMillis = DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS;
87          }
88          return asynchronousReplicationIntervalMillis;
89      }
90  
91      /**
92       * Extracts the value of maximumBatchSize. Sets it to 1024 if
93       * either not set or there is a problem parsing the number.
94       *
95       * @param properties the properties to parse (required)
96       */
97      private static int extractMaximumBatchSize(final Properties properties) {
98          String maximumBatchSizeString =
99                  PropertyUtil.extractAndLogProperty(ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE, properties);
100         if (maximumBatchSizeString == null) {
101             return DEFAULT_ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE;
102         } else {
103             try {
104                 return Integer.parseInt(maximumBatchSizeString);
105             } catch (NumberFormatException e) {
106                 LOG.warn("Number format exception trying to set maximumBatchSize. " +
107                         "Using the default instead. String value was: '" + maximumBatchSizeString + "'");
108                 return DEFAULT_ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE;
109             }
110         }
111     }
112 
113     /**
114      * Create a <code>CacheEventListener</code> which is also a CacheReplicator.
115      * <p/>
116      * The defaults if properties are not specified are:
117      * <ul>
118      * <li>replicatePuts=true
119      * <li>replicatePutsViaCopy=true
120      * <li>replicateUpdates=true
121      * <li>replicateUpdatesViaCopy=true
122      * <li>replicateRemovals=true;
123      * <li>replicateAsynchronously=true
124      * <li>asynchronousReplicationIntervalMillis=1000
125      * </ul>
126      *
127      * @param properties implementation specific properties. These are configured as comma
128      *                   separated name value pairs in ehcache.xml e.g.
129      *                   <p/>
130      *                   <code>
131      *                   &lt;cacheEventListenerFactory class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
132      *                   properties="
133      *                   replicateAsynchronously=true,
134      *                   replicatePuts=true
135      *                   replicateUpdates=true
136      *                   replicateUpdatesViaCopy=true
137      *                   replicateRemovals=true
138      *                   asynchronousReplicationIntervalMillis=1000
139      *                   "/&gt;</code>
140      * @return a constructed CacheEventListener
141      */
142     public CacheEventListener createCacheEventListener(final Properties properties) {
143         final boolean replicatePuts = extractBoolean(REPLICATE_PUTS, properties);
144         final boolean replicatePutsViaCopy = extractBoolean(REPLICATE_PUTS_VIA_COPY, properties);
145         final boolean replicateUpdates = extractBoolean(REPLICATE_UPDATES, properties);
146         final boolean replicateUpdatesViaCopy = extractBoolean(REPLICATE_UPDATES_VIA_COPY, properties);
147         final boolean replicateRemovals = extractBoolean(REPLICATE_REMOVALS, properties);
148         final boolean replicateAsynchronously = extractBoolean(REPLICATE_ASYNCHRONOUSLY, properties);
149         final int replicationIntervalMillis = extractReplicationIntervalMilis(properties);
150         final int maximumBatchSize = extractMaximumBatchSize(properties);
151 
152         if (replicateAsynchronously) {
153             return new RMIAsynchronousCacheReplicator(
154                     replicatePuts,
155                     replicatePutsViaCopy,
156                     replicateUpdates,
157                     replicateUpdatesViaCopy,
158                     replicateRemovals,
159                     replicationIntervalMillis,
160                     maximumBatchSize);
161         }
162         // Return our copy of this class in which https://jira.terracotta.org/jira/browse/EHC-683 is fixed
163         return new RMISynchronousCacheReplicator(
164                 replicatePuts,
165                 replicatePutsViaCopy,
166                 replicateUpdates,
167                 replicateUpdatesViaCopy,
168                 replicateRemovals);
169     }
170 }