View Javadoc

1   package com.atlassian.cache.ehcache;
2   
3   import java.util.Properties;
4   
5   import org.slf4j.Logger;
6   import org.slf4j.LoggerFactory;
7   
8   import net.sf.ehcache.distribution.CacheReplicator;
9   import net.sf.ehcache.distribution.RMIAsynchronousCacheReplicator;
10  import net.sf.ehcache.event.CacheEventListener;
11  import net.sf.ehcache.event.CacheEventListenerFactory;
12  import net.sf.ehcache.util.PropertyUtil;
13  
14  
15  /**
16   * A factory for RMI-based {@link CacheReplicator}s, using properties. Config lines look like:
17   * <pre>&lt;cacheEventListenerFactory class="com.atlassian.cache.ehcache.RMICacheReplicatorFactory"
18   * properties="
19   * replicateAsynchronously=true,
20   * replicatePuts=true
21   * replicateUpdates=true
22   * replicateUpdatesViaCopy=true
23   * replicateRemovals=true
24   * "/&gt;</pre>
25   *
26   * This factory is a copy of the eponymous class from Ehcache, with the exception that any
27   * synchronous RMI replicators it creates do not suffer from https://jira.terracotta.org/jira/browse/EHC-683.
28   *
29   * @since 2.0
30   */
31  public class RMICacheReplicatorFactory extends CacheEventListenerFactory {
32  
33      /**
34       * A default for the amount of time the replication thread sleeps after it detects the replicationQueue is empty
35       * before checking again.
36       */
37      protected static final int DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS = 1000;
38  
39      /**
40       * A default for the maximum number of operations in an RMI message.
41       */
42      protected static final int DEFAULT_ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE = 1000;
43  
44      private static final Logger LOG = LoggerFactory.getLogger(RMICacheReplicatorFactory.class.getName());
45      private static final String REPLICATE_PUTS = "replicatePuts";
46      private static final String REPLICATE_PUTS_VIA_COPY = "replicatePutsViaCopy";
47      private static final String REPLICATE_UPDATES = "replicateUpdates";
48      private static final String REPLICATE_UPDATES_VIA_COPY = "replicateUpdatesViaCopy";
49      private static final String REPLICATE_REMOVALS = "replicateRemovals";
50      static final String REPLICATE_ASYNCHRONOUSLY = "replicateAsynchronously";
51      private static final String ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS = "asynchronousReplicationIntervalMillis";
52      private static final String ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE = "asynchronousReplicationMaximumBatchSize";
53      private static final int MINIMUM_REASONABLE_INTERVAL = 10;
54  
55      private static boolean extractBoolean(final String key, final Properties properties)
56      {
57          String booleanString = PropertyUtil.extractAndLogProperty(key, properties);
58          return booleanString == null || PropertyUtil.parseBoolean(booleanString);   // defaults to true
59      }
60  
61      /**
62       * Extracts the value of asynchronousReplicationIntervalMillis. Sets it to 1000ms if
63       * either not set or there is a problem parsing the number.
64       *
65       * @param properties the properties to parse (required)
66       */
67      private static int extractReplicationIntervalMilis(Properties properties)
68      {
69          int asynchronousReplicationIntervalMillis;
70          String asynchronousReplicationIntervalMillisString =
71                  PropertyUtil.extractAndLogProperty(ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS, properties);
72          if (asynchronousReplicationIntervalMillisString != null)
73          {
74              try
75              {
76                  int asynchronousReplicationIntervalMillisCandidate =
77                          Integer.parseInt(asynchronousReplicationIntervalMillisString);
78                  if (asynchronousReplicationIntervalMillisCandidate < MINIMUM_REASONABLE_INTERVAL)
79                  {
80                      LOG.debug("Trying to set the asynchronousReplicationIntervalMillis to an unreasonable number." +
81                              " Using the default instead.");
82                      asynchronousReplicationIntervalMillis = DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS;
83                  } else {
84                      asynchronousReplicationIntervalMillis = asynchronousReplicationIntervalMillisCandidate;
85                  }
86              }
87              catch (NumberFormatException e)
88              {
89                  LOG.warn("Number format exception trying to set asynchronousReplicationIntervalMillis. " +
90                          "Using the default instead. String value was: '" + asynchronousReplicationIntervalMillisString + "'");
91                  asynchronousReplicationIntervalMillis = DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS;
92              }
93          }
94          else
95          {
96              asynchronousReplicationIntervalMillis = DEFAULT_ASYNCHRONOUS_REPLICATION_INTERVAL_MILLIS;
97          }
98          return asynchronousReplicationIntervalMillis;
99      }
100 
101     /**
102      * Extracts the value of maximumBatchSize. Sets it to 1024 if
103      * either not set or there is a problem parsing the number.
104      *
105      * @param properties the properties to parse (required)
106      */
107     private static int extractMaximumBatchSize(final Properties properties)
108     {
109         String maximumBatchSizeString =
110                 PropertyUtil.extractAndLogProperty(ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE, properties);
111         if (maximumBatchSizeString == null)
112         {
113             return DEFAULT_ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE;
114         }
115         else
116         {
117             try
118             {
119                 return Integer.parseInt(maximumBatchSizeString);
120             }
121             catch (NumberFormatException e)
122             {
123                 LOG.warn("Number format exception trying to set maximumBatchSize. " +
124                         "Using the default instead. String value was: '" + maximumBatchSizeString + "'");
125                 return DEFAULT_ASYNCHRONOUS_REPLICATION_MAXIMUM_BATCH_SIZE;
126             }
127         }
128     }
129 
130     /**
131      * Create a <code>CacheEventListener</code> which is also a CacheReplicator.
132      * <p/>
133      * The defaults if properties are not specified are:
134      * <ul>
135      * <li>replicatePuts=true
136      * <li>replicatePutsViaCopy=true
137      * <li>replicateUpdates=true
138      * <li>replicateUpdatesViaCopy=true
139      * <li>replicateRemovals=true;
140      * <li>replicateAsynchronously=true
141      * <li>asynchronousReplicationIntervalMillis=1000
142      * </ul>
143      *
144      * @param properties implementation specific properties. These are configured as comma
145      *                   separated name value pairs in ehcache.xml e.g.
146      *                   <p/>
147      *                   <code>
148      *                   &lt;cacheEventListenerFactory class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"
149      *                   properties="
150      *                   replicateAsynchronously=true,
151      *                   replicatePuts=true
152      *                   replicateUpdates=true
153      *                   replicateUpdatesViaCopy=true
154      *                   replicateRemovals=true
155      *                   asynchronousReplicationIntervalMillis=1000
156      *                   "/&gt;</code>
157      * @return a constructed CacheEventListener
158      */
159     public CacheEventListener createCacheEventListener(final Properties properties)
160     {
161         final boolean replicatePuts = extractBoolean(REPLICATE_PUTS, properties);
162         final boolean replicatePutsViaCopy = extractBoolean(REPLICATE_PUTS_VIA_COPY, properties);
163         final boolean replicateUpdates = extractBoolean(REPLICATE_UPDATES, properties);
164         final boolean replicateUpdatesViaCopy = extractBoolean(REPLICATE_UPDATES_VIA_COPY, properties);
165         final boolean replicateRemovals = extractBoolean(REPLICATE_REMOVALS, properties);
166         final boolean replicateAsynchronously = extractBoolean(REPLICATE_ASYNCHRONOUSLY, properties);
167         final int replicationIntervalMillis = extractReplicationIntervalMilis(properties);
168         final int maximumBatchSize = extractMaximumBatchSize(properties);
169 
170         if (replicateAsynchronously)
171         {
172             return new RMIAsynchronousCacheReplicator(
173                     replicatePuts,
174                     replicatePutsViaCopy,
175                     replicateUpdates,
176                     replicateUpdatesViaCopy,
177                     replicateRemovals,
178                     replicationIntervalMillis,
179                     maximumBatchSize);
180         }
181         // Return our copy of this class in which https://jira.terracotta.org/jira/browse/EHC-683 is fixed
182         return new RMISynchronousCacheReplicator(
183                 replicatePuts,
184                 replicatePutsViaCopy,
185                 replicateUpdates,
186                 replicateUpdatesViaCopy,
187                 replicateRemovals);
188     }
189 }