1 package com.atlassian.activeobjects.osgi;
2
3 import com.atlassian.activeobjects.config.ActiveObjectsConfiguration;
4 import com.atlassian.activeobjects.external.ActiveObjects;
5 import com.atlassian.activeobjects.external.ActiveObjectsModuleMetaData;
6 import com.atlassian.activeobjects.external.NoDataSourceException;
7 import com.atlassian.activeobjects.internal.ActiveObjectsFactory;
8 import com.atlassian.activeobjects.internal.ActiveObjectsInitException;
9 import com.atlassian.activeobjects.spi.DatabaseType;
10 import com.atlassian.sal.api.transaction.TransactionCallback;
11 import com.atlassian.tenancy.api.Tenant;
12 import com.atlassian.tenancy.api.TenantContext;
13 import com.atlassian.util.concurrent.Promise;
14 import com.atlassian.util.concurrent.Promises;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Function;
17 import com.google.common.cache.CacheBuilder;
18 import com.google.common.cache.CacheLoader;
19 import com.google.common.cache.LoadingCache;
20 import com.google.common.util.concurrent.SettableFuture;
21 import net.java.ao.DBParam;
22 import net.java.ao.EntityStreamCallback;
23 import net.java.ao.Query;
24 import net.java.ao.RawEntity;
25 import org.osgi.framework.Bundle;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import java.util.Map;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37
38 import static com.google.common.base.Preconditions.checkNotNull;
39
40
41
42
43
44
45
46
47
48
49
50
51 class TenantAwareActiveObjects implements ActiveObjects {
52 private static final Logger logger = LoggerFactory.getLogger(TenantAwareActiveObjects.class);
53
54 private final Bundle bundle;
55 private final TenantContext tenantContext;
56
57 @VisibleForTesting
58 final SettableFuture<ActiveObjectsConfiguration> aoConfigFuture = SettableFuture.create();
59
60 @VisibleForTesting
61 final LoadingCache<Tenant, Promise<ActiveObjects>> aoPromisesByTenant;
62
63 TenantAwareActiveObjects(
64 @Nonnull final Bundle bundle,
65 @Nonnull final ActiveObjectsFactory factory,
66 @Nonnull final TenantContext tenantContext,
67 @Nonnull final Function<Tenant, ExecutorService> initExecutorFunction) {
68 this.bundle = checkNotNull(bundle);
69 this.tenantContext = checkNotNull(tenantContext);
70 checkNotNull(factory);
71 checkNotNull(initExecutorFunction);
72
73
74 aoPromisesByTenant = CacheBuilder.newBuilder().build(new CacheLoader<Tenant, Promise<ActiveObjects>>() {
75 @Override
76 public Promise<ActiveObjects> load(@Nonnull final Tenant tenant) {
77 logger.debug("bundle [{}] loading new AO promise for {}", bundle.getSymbolicName(), tenant);
78
79 return Promises.forFuture(aoConfigFuture).flatMap(new Function<ActiveObjectsConfiguration, Promise<ActiveObjects>>() {
80 @Override
81 public Promise<ActiveObjects> apply(@Nullable final ActiveObjectsConfiguration aoConfig) {
82 logger.debug("bundle [{}] got ActiveObjectsConfiguration", bundle.getSymbolicName(), tenant);
83
84 final SettableFuture<ActiveObjects> aoFuture = SettableFuture.create();
85
86 initExecutorFunction.apply(tenant).submit(new Callable<Void>() {
87 @Override
88 public Void call() {
89 try {
90 logger.debug("bundle [{}] creating ActiveObjects", bundle.getSymbolicName());
91 final ActiveObjects ao = factory.create(aoConfig, tenant);
92 logger.debug("bundle [{}] created ActiveObjects", bundle.getSymbolicName());
93 aoFuture.set(ao);
94 } catch (Throwable t) {
95 final ActiveObjectsInitException activeObjectsInitException = new ActiveObjectsInitException("bundle [" + bundle.getSymbolicName() + "]", t);
96 aoFuture.setException(activeObjectsInitException);
97 logger.warn("bundle [{}] failed to create ActiveObjects", bundle.getSymbolicName(), t);
98 }
99 return null;
100 }
101 });
102
103 return Promises.forFuture(aoFuture);
104 }
105 });
106 }
107 });
108 }
109
110 public void init() {
111 logger.debug("init bundle [{}]", bundle.getSymbolicName());
112
113
114 Tenant tenant = tenantContext.getCurrentTenant();
115 if (tenant != null) {
116 aoPromisesByTenant.invalidate(tenant);
117 startActiveObjects(tenant);
118 }
119 }
120
121 public void destroy() {
122 aoConfigFuture.cancel(false);
123 for (Promise<ActiveObjects> aoPromise : aoPromisesByTenant.asMap().values()) {
124 aoPromise.cancel(false);
125 }
126 }
127
128 void setAoConfiguration(@Nonnull final ActiveObjectsConfiguration aoConfiguration) {
129 logger.debug("setAoConfiguration [{}]", bundle.getSymbolicName());
130
131 if (aoConfigFuture.isDone()) {
132 final ActiveObjectsConfiguration currentAoConfiguration;
133 try {
134 currentAoConfiguration = aoConfigFuture.get(0, TimeUnit.MILLISECONDS);
135 } catch (InterruptedException | ExecutionException | TimeoutException e) {
136
137 throw new IllegalStateException(e);
138 }
139
140 if (currentAoConfiguration == aoConfiguration) {
141 logger.debug("setAoConfiguration received same <ao> configuration twice [{}]", aoConfiguration);
142 } else {
143 final RuntimeException e = new IllegalStateException("bundle [" + bundle.getSymbolicName() + "] has multiple active objects configurations - only one active objects module descriptor <ao> allowed per plugin!");
144 aoConfigFuture.setException(e);
145 throw e;
146 }
147 } else {
148 aoConfigFuture.set(aoConfiguration);
149 }
150 }
151
152 void startActiveObjects(@Nonnull final Tenant tenant) {
153 checkNotNull(tenant);
154 aoPromisesByTenant.getUnchecked(tenant);
155 }
156
157 void restartActiveObjects(@Nonnull final Tenant tenant) {
158 checkNotNull(tenant);
159 aoPromisesByTenant.invalidate(tenant);
160 aoPromisesByTenant.getUnchecked(tenant);
161 }
162
163 @VisibleForTesting
164 protected Promise<ActiveObjects> delegate() {
165 if (!aoConfigFuture.isDone()) {
166 throw new IllegalStateException("plugin [{" + bundle.getSymbolicName() + "}] invoking ActiveObjects before <ao> configuration module is enabled or plugin is missing an <ao> configuration module. Note that scanning of entities from the ao.model package is no longer supported.");
167 }
168
169 Tenant tenant = tenantContext.getCurrentTenant();
170 if (tenant != null) {
171 return aoPromisesByTenant.getUnchecked(tenant);
172 } else {
173 throw new NoDataSourceException();
174 }
175 }
176
177 @Override
178 public ActiveObjectsModuleMetaData moduleMetaData() {
179 return new ActiveObjectsModuleMetaData() {
180 @Override
181 public void awaitInitialization() throws ExecutionException, InterruptedException {
182 Tenant tenant = tenantContext.getCurrentTenant();
183 if (tenant != null) {
184 aoPromisesByTenant.getUnchecked(tenant).get();
185 } else {
186 throw new NoDataSourceException();
187 }
188 }
189
190 @Override
191 public void awaitInitialization(long timeout, TimeUnit unit)
192 throws InterruptedException, ExecutionException, TimeoutException {
193 Tenant tenant = tenantContext.getCurrentTenant();
194 if (tenant != null) {
195 aoPromisesByTenant.getUnchecked(tenant).get(timeout, unit);
196 } else {
197 throw new NoDataSourceException();
198 }
199 }
200
201 @Override
202 public boolean isInitialized() {
203 Tenant tenant = tenantContext.getCurrentTenant();
204 if (tenant != null) {
205 Promise<ActiveObjects> aoPromise = aoPromisesByTenant.getUnchecked(tenant);
206 if (aoPromise.isDone()) {
207 try {
208 aoPromise.claim();
209 return true;
210 } catch (Exception e) {
211
212 }
213 }
214 }
215 return false;
216 }
217
218 @Override
219 public DatabaseType getDatabaseType() {
220 return delegate().claim().moduleMetaData().getDatabaseType();
221 }
222
223 @Override
224 public boolean isDataSourcePresent() {
225 return tenantContext.getCurrentTenant() != null;
226 }
227
228 @Override
229 public boolean isTablePresent(Class<? extends RawEntity<?>> type) {
230 return delegate().claim().moduleMetaData().isTablePresent(type);
231 }
232 };
233 }
234
235 @Override
236 public void migrate(final Class<? extends RawEntity<?>>... entities) {
237 delegate().claim().migrate(entities);
238 }
239
240 @Override
241 public void migrateDestructively(final Class<? extends RawEntity<?>>... entities) {
242 delegate().claim().migrateDestructively(entities);
243 }
244
245 @Override
246 public void flushAll() {
247 delegate().claim().flushAll();
248 }
249
250 @Override
251 public void flush(final RawEntity<?>... entities) {
252 delegate().claim().flush(entities);
253 }
254
255 @Override
256 public <T extends RawEntity<K>, K> T[] get(final Class<T> type, final K... keys) {
257 return delegate().claim().get(type, keys);
258 }
259
260 @Override
261 public <T extends RawEntity<K>, K> T get(final Class<T> type, final K key) {
262 return delegate().claim().get(type, key);
263 }
264
265 @Override
266 public <T extends RawEntity<K>, K> T create(final Class<T> type, final DBParam... params) {
267 return delegate().claim().create(type, params);
268 }
269
270 @Override
271 public <T extends RawEntity<K>, K> T create(final Class<T> type, final Map<String, Object> params) {
272 return delegate().claim().create(type, params);
273 }
274
275 @Override
276 public void delete(final RawEntity<?>... entities) {
277 delegate().claim().delete(entities);
278 }
279
280 @Override
281 public <K> int deleteWithSQL(final Class<? extends RawEntity<K>> type, final String criteria, final Object... parameters) {
282 return delegate().claim().deleteWithSQL(type, criteria, parameters);
283 }
284
285 @Override
286 public <T extends RawEntity<K>, K> T[] find(final Class<T> type) {
287 return delegate().claim().find(type);
288 }
289
290 @Override
291 public <T extends RawEntity<K>, K> T[] find(final Class<T> type, final String criteria, final Object... parameters) {
292 return delegate().claim().find(type, criteria, parameters);
293 }
294
295 @Override
296 public <T extends RawEntity<K>, K> T[] find(final Class<T> type, final Query query) {
297 return delegate().claim().find(type, query);
298 }
299
300 @Override
301 public <T extends RawEntity<K>, K> T[] find(final Class<T> type, final String field, final Query query) {
302 return delegate().claim().find(type, field, query);
303 }
304
305 @Override
306 public <T extends RawEntity<K>, K> T[] findWithSQL(final Class<T> type, final String keyField, final String sql, final Object... parameters) {
307 return delegate().claim().findWithSQL(type, keyField, sql, parameters);
308 }
309
310 @Override
311 public <T extends RawEntity<K>, K> void stream(final Class<T> type, final EntityStreamCallback<T, K> streamCallback) {
312 delegate().claim().stream(type, streamCallback);
313 }
314
315 @Override
316 public <T extends RawEntity<K>, K> void stream(final Class<T> type, final Query query, final EntityStreamCallback<T, K> streamCallback) {
317 delegate().claim().stream(type, query, streamCallback);
318 }
319
320 @Override
321 public <K> int count(final Class<? extends RawEntity<K>> type) {
322 return delegate().claim().count(type);
323 }
324
325 @Override
326 public <K> int count(final Class<? extends RawEntity<K>> type, final String criteria, final Object... parameters) {
327 return delegate().claim().count(type, criteria, parameters);
328 }
329
330 @Override
331 public <K> int count(final Class<? extends RawEntity<K>> type, final Query query) {
332 return delegate().claim().count(type, query);
333 }
334
335 @Override
336 public <T> T executeInTransaction(final TransactionCallback<T> callback) {
337 return delegate().claim().executeInTransaction(callback);
338 }
339
340 public Bundle getBundle() {
341 return bundle;
342 }
343 }