View Javadoc
1   package com.atlassian.activeobjects.osgi;
2   
3   import com.atlassian.activeobjects.config.ActiveObjectsConfiguration;
4   import com.atlassian.activeobjects.external.ActiveObjects;
5   import com.atlassian.activeobjects.external.ActiveObjectsModuleMetaData;
6   import com.atlassian.activeobjects.external.NoDataSourceException;
7   import com.atlassian.activeobjects.internal.ActiveObjectsFactory;
8   import com.atlassian.activeobjects.internal.ActiveObjectsInitException;
9   import com.atlassian.activeobjects.spi.DatabaseType;
10  import com.atlassian.sal.api.transaction.TransactionCallback;
11  import com.atlassian.tenancy.api.Tenant;
12  import com.atlassian.tenancy.api.TenantContext;
13  import com.atlassian.util.concurrent.Promise;
14  import com.atlassian.util.concurrent.Promises;
15  import com.google.common.annotations.VisibleForTesting;
16  import com.google.common.base.Function;
17  import com.google.common.cache.CacheBuilder;
18  import com.google.common.cache.CacheLoader;
19  import com.google.common.cache.LoadingCache;
20  import com.google.common.util.concurrent.SettableFuture;
21  import net.java.ao.DBParam;
22  import net.java.ao.EntityStreamCallback;
23  import net.java.ao.Query;
24  import net.java.ao.RawEntity;
25  import org.osgi.framework.Bundle;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import javax.annotation.Nonnull;
30  import javax.annotation.Nullable;
31  import java.util.Map;
32  import java.util.concurrent.Callable;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  
38  import static com.google.common.base.Preconditions.checkNotNull;
39  
40  /**
41   * Delegate for {@link com.atlassian.activeobjects.external.ActiveObjects}.
42   *
43   * Baby bear is not to eager, not to lazy.
44   *
45   * Any attempt to invoke this when no tenant is present will result in a {@link com.atlassian.activeobjects.external.NoDataSourceException}.
46   *
47   * Delegate calls will block when DDL / updgrade tasks are running.
48   *
49   * DDL / upgrade tasks will be initiated by the first call to the delegate or by a call to {@link #startActiveObjects}
50   */
51  class TenantAwareActiveObjects implements ActiveObjects {
52      private static final Logger logger = LoggerFactory.getLogger(TenantAwareActiveObjects.class);
53  
54      private final Bundle bundle;
55      private final TenantContext tenantContext;
56  
57      @VisibleForTesting
58      final SettableFuture<ActiveObjectsConfiguration> aoConfigFuture = SettableFuture.create();
59  
60      @VisibleForTesting
61      final LoadingCache<Tenant, Promise<ActiveObjects>> aoPromisesByTenant;
62  
63      TenantAwareActiveObjects(
64              @Nonnull final Bundle bundle,
65              @Nonnull final ActiveObjectsFactory factory,
66              @Nonnull final TenantContext tenantContext,
67              @Nonnull final Function<Tenant, ExecutorService> initExecutorFunction) {
68          this.bundle = checkNotNull(bundle);
69          this.tenantContext = checkNotNull(tenantContext);
70          checkNotNull(factory);
71          checkNotNull(initExecutorFunction);
72  
73          // loading cache for delegate promises by tenant
74          aoPromisesByTenant = CacheBuilder.newBuilder().build(new CacheLoader<Tenant, Promise<ActiveObjects>>() {
75              @Override
76              public Promise<ActiveObjects> load(@Nonnull final Tenant tenant) {
77                  logger.debug("bundle [{}] loading new AO promise for {}", bundle.getSymbolicName(), tenant);
78  
79                  return Promises.forFuture(aoConfigFuture).flatMap(new Function<ActiveObjectsConfiguration, Promise<ActiveObjects>>() {
80                      @Override
81                      public Promise<ActiveObjects> apply(@Nullable final ActiveObjectsConfiguration aoConfig) {
82                          logger.debug("bundle [{}] got ActiveObjectsConfiguration", bundle.getSymbolicName(), tenant);
83  
84                          final SettableFuture<ActiveObjects> aoFuture = SettableFuture.create();
85                          //noinspection ConstantConditions
86                          initExecutorFunction.apply(tenant).submit(new Callable<Void>() {
87                              @Override
88                              public Void call() {
89                                  try {
90                                      logger.debug("bundle [{}] creating ActiveObjects", bundle.getSymbolicName());
91                                      final ActiveObjects ao = factory.create(aoConfig, tenant);
92                                      logger.debug("bundle [{}] created ActiveObjects", bundle.getSymbolicName());
93                                      aoFuture.set(ao);
94                                  } catch (Throwable t) {
95                                      final ActiveObjectsInitException activeObjectsInitException = new ActiveObjectsInitException("bundle [" + bundle.getSymbolicName() + "]", t);
96                                      aoFuture.setException(activeObjectsInitException);
97                                      logger.warn("bundle [{}] failed to create ActiveObjects", bundle.getSymbolicName(), t);
98                                  }
99                                  return null;
100                             }
101                         });
102 
103                         return Promises.forFuture(aoFuture);
104                     }
105                 });
106             }
107         });
108     }
109 
110     public void init() {
111         logger.debug("init bundle [{}]", bundle.getSymbolicName());
112 
113         // start things up now if we have a tenant
114         Tenant tenant = tenantContext.getCurrentTenant();
115         if (tenant != null) {
116             aoPromisesByTenant.invalidate(tenant);
117             startActiveObjects(tenant);
118         }
119     }
120 
121     public void destroy() {
122         aoConfigFuture.cancel(false);
123         for (Promise<ActiveObjects> aoPromise : aoPromisesByTenant.asMap().values()) {
124             aoPromise.cancel(false);
125         }
126     }
127 
128     void setAoConfiguration(@Nonnull final ActiveObjectsConfiguration aoConfiguration) {
129         logger.debug("setAoConfiguration [{}]", bundle.getSymbolicName());
130 
131         if (aoConfigFuture.isDone()) {
132             final ActiveObjectsConfiguration currentAoConfiguration;
133             try {
134                 currentAoConfiguration = aoConfigFuture.get(0, TimeUnit.MILLISECONDS);
135             } catch (InterruptedException | ExecutionException | TimeoutException e) {
136                 // we've already checked the state of the future, this exception must be dire
137                 throw new IllegalStateException(e);
138             }
139 
140             if (currentAoConfiguration == aoConfiguration) {
141                 logger.debug("setAoConfiguration received same <ao> configuration twice [{}]", aoConfiguration);
142             } else {
143                 final RuntimeException e = new IllegalStateException("bundle [" + bundle.getSymbolicName() + "] has multiple active objects configurations - only one active objects module descriptor <ao> allowed per plugin!");
144                 aoConfigFuture.setException(e);
145                 throw e;
146             }
147         } else {
148             aoConfigFuture.set(aoConfiguration);
149         }
150     }
151 
152     void startActiveObjects(@Nonnull final Tenant tenant) {
153         checkNotNull(tenant);
154         aoPromisesByTenant.getUnchecked(tenant);
155     }
156 
157     void restartActiveObjects(@Nonnull final Tenant tenant) {
158         checkNotNull(tenant);
159         aoPromisesByTenant.invalidate(tenant);
160         aoPromisesByTenant.getUnchecked(tenant);
161     }
162 
163     @VisibleForTesting
164     protected Promise<ActiveObjects> delegate() {
165         if (!aoConfigFuture.isDone()) {
166             throw new IllegalStateException("plugin [{" + bundle.getSymbolicName() + "}] invoking ActiveObjects before <ao> configuration module is enabled or plugin is missing an <ao> configuration module. Note that scanning of entities from the ao.model package is no longer supported.");
167         }
168 
169         Tenant tenant = tenantContext.getCurrentTenant();
170         if (tenant != null) {
171             return aoPromisesByTenant.getUnchecked(tenant);
172         } else {
173             throw new NoDataSourceException();
174         }
175     }
176 
177     @Override
178     public ActiveObjectsModuleMetaData moduleMetaData() {
179         return new ActiveObjectsModuleMetaData() {
180             @Override
181             public void awaitInitialization() throws ExecutionException, InterruptedException {
182                 Tenant tenant = tenantContext.getCurrentTenant();
183                 if (tenant != null) {
184                     aoPromisesByTenant.getUnchecked(tenant).get();
185                 } else {
186                     throw new NoDataSourceException();
187                 }
188             }
189 
190             @Override
191             public void awaitInitialization(long timeout, TimeUnit unit)
192                     throws InterruptedException, ExecutionException, TimeoutException {
193                 Tenant tenant = tenantContext.getCurrentTenant();
194                 if (tenant != null) {
195                     aoPromisesByTenant.getUnchecked(tenant).get(timeout, unit);
196                 } else {
197                     throw new NoDataSourceException();
198                 }
199             }
200 
201             @Override
202             public boolean isInitialized() {
203                 Tenant tenant = tenantContext.getCurrentTenant();
204                 if (tenant != null) {
205                     Promise<ActiveObjects> aoPromise = aoPromisesByTenant.getUnchecked(tenant);
206                     if (aoPromise.isDone()) {
207                         try {
208                             aoPromise.claim();
209                             return true;
210                         } catch (Exception e) {
211                             // any exception indicates a failure in initialisation, or at least that the delegate is not usable
212                         }
213                     }
214                 }
215                 return false;
216             }
217 
218             @Override
219             public DatabaseType getDatabaseType() {
220                 return delegate().claim().moduleMetaData().getDatabaseType();
221             }
222 
223             @Override
224             public boolean isDataSourcePresent() {
225                 return tenantContext.getCurrentTenant() != null;
226             }
227 
228             @Override
229             public boolean isTablePresent(Class<? extends RawEntity<?>> type) {
230                 return delegate().claim().moduleMetaData().isTablePresent(type);
231             }
232         };
233     }
234 
235     @Override
236     public void migrate(final Class<? extends RawEntity<?>>... entities) {
237         delegate().claim().migrate(entities);
238     }
239 
240     @Override
241     public void migrateDestructively(final Class<? extends RawEntity<?>>... entities) {
242         delegate().claim().migrateDestructively(entities);
243     }
244 
245     @Override
246     public void flushAll() {
247         delegate().claim().flushAll();
248     }
249 
250     @Override
251     public void flush(final RawEntity<?>... entities) {
252         delegate().claim().flush(entities);
253     }
254 
255     @Override
256     public <T extends RawEntity<K>, K> T[] get(final Class<T> type, final K... keys) {
257         return delegate().claim().get(type, keys);
258     }
259 
260     @Override
261     public <T extends RawEntity<K>, K> T get(final Class<T> type, final K key) {
262         return delegate().claim().get(type, key);
263     }
264 
265     @Override
266     public <T extends RawEntity<K>, K> T create(final Class<T> type, final DBParam... params) {
267         return delegate().claim().create(type, params);
268     }
269 
270     @Override
271     public <T extends RawEntity<K>, K> T create(final Class<T> type, final Map<String, Object> params) {
272         return delegate().claim().create(type, params);
273     }
274 
275     @Override
276     public void delete(final RawEntity<?>... entities) {
277         delegate().claim().delete(entities);
278     }
279 
280     @Override
281     public <K> int deleteWithSQL(final Class<? extends RawEntity<K>> type, final String criteria, final Object... parameters) {
282         return delegate().claim().deleteWithSQL(type, criteria, parameters);
283     }
284 
285     @Override
286     public <T extends RawEntity<K>, K> T[] find(final Class<T> type) {
287         return delegate().claim().find(type);
288     }
289 
290     @Override
291     public <T extends RawEntity<K>, K> T[] find(final Class<T> type, final String criteria, final Object... parameters) {
292         return delegate().claim().find(type, criteria, parameters);
293     }
294 
295     @Override
296     public <T extends RawEntity<K>, K> T[] find(final Class<T> type, final Query query) {
297         return delegate().claim().find(type, query);
298     }
299 
300     @Override
301     public <T extends RawEntity<K>, K> T[] find(final Class<T> type, final String field, final Query query) {
302         return delegate().claim().find(type, field, query);
303     }
304 
305     @Override
306     public <T extends RawEntity<K>, K> T[] findWithSQL(final Class<T> type, final String keyField, final String sql, final Object... parameters) {
307         return delegate().claim().findWithSQL(type, keyField, sql, parameters);
308     }
309 
310     @Override
311     public <T extends RawEntity<K>, K> void stream(final Class<T> type, final EntityStreamCallback<T, K> streamCallback) {
312         delegate().claim().stream(type, streamCallback);
313     }
314 
315     @Override
316     public <T extends RawEntity<K>, K> void stream(final Class<T> type, final Query query, final EntityStreamCallback<T, K> streamCallback) {
317         delegate().claim().stream(type, query, streamCallback);
318     }
319 
320     @Override
321     public <K> int count(final Class<? extends RawEntity<K>> type) {
322         return delegate().claim().count(type);
323     }
324 
325     @Override
326     public <K> int count(final Class<? extends RawEntity<K>> type, final String criteria, final Object... parameters) {
327         return delegate().claim().count(type, criteria, parameters);
328     }
329 
330     @Override
331     public <K> int count(final Class<? extends RawEntity<K>> type, final Query query) {
332         return delegate().claim().count(type, query);
333     }
334 
335     @Override
336     public <T> T executeInTransaction(final TransactionCallback<T> callback) {
337         return delegate().claim().executeInTransaction(callback);
338     }
339 
340     public Bundle getBundle() {
341         return bundle;
342     }
343 }