View Javadoc
1   package com.atlassian.activeobjects.internal;
2   
3   import com.atlassian.activeobjects.config.ActiveObjectsConfiguration;
4   import com.atlassian.activeobjects.external.ActiveObjects;
5   import com.atlassian.beehive.ClusterLock;
6   import com.atlassian.beehive.ClusterLockService;
7   import com.atlassian.sal.api.transaction.TransactionCallback;
8   import com.atlassian.sal.api.transaction.TransactionTemplate;
9   import com.atlassian.tenancy.api.Tenant;
10  import com.google.common.base.Supplier;
11  import net.java.ao.RawEntity;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  
15  import java.util.Collection;
16  import java.util.Set;
17  import java.util.concurrent.TimeUnit;
18  
19  import static com.google.common.base.Preconditions.checkNotNull;
20  
21  /**
22   * Abstract implementation of {@link com.atlassian.activeobjects.internal.ActiveObjectsFactory} that implements the
23   * basic contract for a single {@link com.atlassian.activeobjects.internal.DataSourceType}.
24   */
25  abstract class AbstractActiveObjectsFactory implements ActiveObjectsFactory {
26      protected final Logger logger = LoggerFactory.getLogger(this.getClass());
27  
28      private static final String LOCK_TIMEOUT_PROPERTY = "ao-plugin.upgrade.task.lock.timeout";
29      private static final String LOCK_PREFIX = "ao-plugin.upgrade.";
30      protected static final int LOCK_TIMEOUT_SECONDS = Integer.getInteger(LOCK_TIMEOUT_PROPERTY, 300000);
31  
32      private final DataSourceType supportedDataSourceType;
33      private final ActiveObjectUpgradeManager aoUpgradeManager;
34      protected final TransactionTemplate transactionTemplate;
35      private final ClusterLockService clusterLockService;
36  
37      AbstractActiveObjectsFactory(DataSourceType dataSourceType, ActiveObjectUpgradeManager aoUpgradeManager,
38                                   TransactionTemplate transactionTemplate, ClusterLockService clusterLockService) {
39          this.supportedDataSourceType = checkNotNull(dataSourceType);
40          this.aoUpgradeManager = checkNotNull(aoUpgradeManager);
41          this.transactionTemplate = checkNotNull(transactionTemplate);
42          this.clusterLockService = checkNotNull(clusterLockService);
43      }
44  
45      @Override
46      public final boolean accept(ActiveObjectsConfiguration configuration) {
47          return supportedDataSourceType.equals(configuration.getDataSourceType());
48      }
49  
50      @Override
51      public final ActiveObjects create(final ActiveObjectsConfiguration configuration, final Tenant tenant) {
52          if (!accept(configuration)) {
53              throw new IllegalStateException(configuration + " is not supported. Did you can #accept(ActiveObjectConfiguration) before calling me?");
54          }
55  
56          final String lockName = LOCK_PREFIX + configuration.getPluginKey().asString();
57          final ClusterLock lock = clusterLockService.getLockForName(lockName);
58          try {
59              if (!lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
60                  throw new ActiveObjectsInitException("unable to acquire cluster lock named '" + lockName + "' after waiting " + LOCK_TIMEOUT_SECONDS + " seconds; note that this timeout may be adjusted via the system property '" + LOCK_TIMEOUT_PROPERTY + "'");
61              }
62          } catch (InterruptedException e) {
63              throw new ActiveObjectsInitException("interrupted while trying to acquire cluster lock named '" + lockName + "'", e);
64          }
65  
66          try {
67              upgrade(configuration, tenant);
68  
69              final ActiveObjects ao = doCreate(configuration, tenant);
70              final Set<Class<? extends RawEntity<?>>> entitiesToMigrate = configuration.getEntities();
71  
72              return transactionTemplate.execute(new TransactionCallback<ActiveObjects>() {
73                  @Override
74                  public ActiveObjects doInTransaction() {
75                      logger.debug("Created active objects instance with configuration {}, now migrating entities {}",
76                              configuration, entitiesToMigrate);
77                      ao.migrate(asArray(entitiesToMigrate));
78                      return ao;
79                  }
80              });
81          } finally {
82              lock.unlock();
83          }
84      }
85  
86      private void upgrade(final ActiveObjectsConfiguration configuration, final Tenant tenant) {
87          aoUpgradeManager.upgrade(configuration.getTableNamePrefix(), configuration.getUpgradeTasks(), new Supplier<ActiveObjects>() {
88              @Override
89              public ActiveObjects get() {
90                  return doCreate(configuration, tenant);
91              }
92          });
93      }
94  
95      @SuppressWarnings("unchecked")
96      private Class<? extends RawEntity<?>>[] asArray(Collection<Class<? extends RawEntity<?>>> classes) {
97          return classes.toArray(new Class[classes.size()]);
98      }
99  
100     /**
101      * This has the same contract as {@link #create(ActiveObjectsConfiguration, com.atlassian.tenancy.api.Tenant)}
102      * except that checking the configuration type has already been taken care of.
103      *
104      * @param configuration the configuration to work with
105      * @return the new {@link com.atlassian.activeobjects.external.ActiveObjects}
106      */
107     protected abstract ActiveObjects doCreate(ActiveObjectsConfiguration configuration, Tenant tenant);
108 }