View Javadoc
1   package com.atlassian.activeobjects.osgi;
2   
3   import com.atlassian.activeobjects.config.ActiveObjectsConfiguration;
4   import com.atlassian.activeobjects.external.ActiveObjects;
5   import com.atlassian.activeobjects.internal.ActiveObjectsFactory;
6   import com.atlassian.activeobjects.plugin.ActiveObjectModuleDescriptor;
7   import com.atlassian.activeobjects.spi.ContextClassLoaderThreadFactory;
8   import com.atlassian.activeobjects.spi.HotRestartEvent;
9   import com.atlassian.activeobjects.spi.InitExecutorServiceProvider;
10  import com.atlassian.event.api.EventListener;
11  import com.atlassian.event.api.EventPublisher;
12  import com.atlassian.plugin.ModuleDescriptor;
13  import com.atlassian.plugin.Plugin;
14  import com.atlassian.plugin.event.events.PluginDisabledEvent;
15  import com.atlassian.plugin.event.events.PluginEnabledEvent;
16  import com.atlassian.plugin.event.events.PluginModuleEnabledEvent;
17  import com.atlassian.plugin.osgi.factory.OsgiPlugin;
18  import com.atlassian.sal.api.executor.ThreadLocalDelegateExecutorFactory;
19  import com.atlassian.tenancy.api.Tenant;
20  import com.atlassian.tenancy.api.TenantContext;
21  import com.atlassian.tenancy.api.event.TenantArrivedEvent;
22  import com.google.common.annotations.VisibleForTesting;
23  import com.google.common.base.Function;
24  import com.google.common.cache.CacheBuilder;
25  import com.google.common.cache.CacheLoader;
26  import com.google.common.cache.LoadingCache;
27  import com.google.common.collect.ImmutableList;
28  import org.osgi.framework.Bundle;
29  import org.osgi.framework.ServiceFactory;
30  import org.osgi.framework.ServiceRegistration;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  import org.springframework.beans.factory.DisposableBean;
34  import org.springframework.beans.factory.InitializingBean;
35  
36  import javax.annotation.Nonnull;
37  import javax.annotation.Nullable;
38  import java.util.IdentityHashMap;
39  import java.util.Map;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.ThreadFactory;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import static com.google.common.base.Preconditions.checkNotNull;
47  
48  /**
49   * <p>This is the service factory that will create the {@link com.atlassian.activeobjects.external.ActiveObjects}
50   * instance for each plugin using active objects.</p>
51   *
52   * <p>The instance created by that factory is a delegating instance that works together with the
53   * {@link ActiveObjectsServiceFactory} to get a correctly configure instance according
54   * to the {@link com.atlassian.activeobjects.config.ActiveObjectsConfiguration plugin configuration} and
55   * the application configuration.</p>
56   */
57  // @NotFinalForTesting
58  public class ActiveObjectsServiceFactory implements ServiceFactory, InitializingBean, DisposableBean {
59      private static final Logger logger = LoggerFactory.getLogger(ActiveObjectsServiceFactory.class);
60  
61      private static final String INIT_TASK_TIMEOUT_MS_PROPERTY = "ao-plugin.init.task.timeout";
62  
63      @VisibleForTesting
64      protected static final int INIT_TASK_TIMEOUT_MS = Integer.getInteger(INIT_TASK_TIMEOUT_MS_PROPERTY, 30000);
65  
66      private final EventPublisher eventPublisher;
67      private final TenantContext tenantContext;
68  
69      @VisibleForTesting
70      final ThreadFactory aoContextThreadFactory;
71  
72      @VisibleForTesting
73      final LoadingCache<Tenant, ExecutorService> initExecutorsByTenant;
74  
75      @VisibleForTesting
76      volatile boolean destroying = false;
77  
78      @VisibleForTesting
79      volatile boolean cleaning = false;
80  
81      @VisibleForTesting
82      final Function<Tenant, ExecutorService> initExecutorFn;
83  
84      // use BundleRef to ensure that we key on reference equality of the bundles, not any object equality
85      @VisibleForTesting
86      final LoadingCache<BundleRef, TenantAwareActiveObjects> aoDelegatesByBundle;
87  
88      // note that we need an explicit lock here to allow aoDelegatesByBundle time to load the configuration during the
89      // invocation of onPluginModuleEnabledEvent
90      @VisibleForTesting
91      final Map<Bundle, ActiveObjectsConfiguration> unattachedConfigByBundle = new IdentityHashMap<>();
92  
93      private final Lock unattachedConfigsLock = new ReentrantLock();
94  
95      public ActiveObjectsServiceFactory(
96              @Nonnull final ActiveObjectsFactory factory,
97              @Nonnull final EventPublisher eventPublisher,
98              @Nonnull final TenantContext tenantContext,
99              @Nonnull final ThreadLocalDelegateExecutorFactory threadLocalDelegateExecutorFactory,
100             @Nonnull final InitExecutorServiceProvider initExecutorServiceProvider) {
101         this.eventPublisher = checkNotNull(eventPublisher);
102         this.tenantContext = checkNotNull(tenantContext);
103         checkNotNull(factory);
104         checkNotNull(threadLocalDelegateExecutorFactory);
105         checkNotNull(initExecutorServiceProvider);
106 
107         // store the CCL of the ao-plugin bundle for use by all shared thread pool executors
108         ClassLoader bundleContextClassLoader = Thread.currentThread().getContextClassLoader();
109         aoContextThreadFactory = new ContextClassLoaderThreadFactory(bundleContextClassLoader);
110 
111         // loading cache for init executors pools
112         initExecutorsByTenant = CacheBuilder.newBuilder().build(new CacheLoader<Tenant, ExecutorService>() {
113             @Override
114             public ExecutorService load(@Nonnull final Tenant tenant) throws Exception {
115                 logger.debug("creating new init executor for {}", tenant);
116                 return initExecutorServiceProvider.initExecutorService(tenant);
117             }
118         });
119 
120         // initExecutor retrieval function
121         initExecutorFn = new Function<Tenant, ExecutorService>() {
122             @Override
123             public ExecutorService apply(@Nullable final Tenant tenant) {
124                 if (destroying) {
125                     throw new IllegalStateException("applied initExecutorFn after ActiveObjectsServiceFactory destruction");
126                 } else if (cleaning) {
127                     throw new IllegalStateException("applied initExecutorFn during ActiveObjects cleaning");
128                 }
129 
130                 //noinspection ConstantConditions
131                 checkNotNull(tenant);
132                 return initExecutorsByTenant.getUnchecked(tenant);
133             }
134         };
135 
136         // loading cache for ActiveObjects delegates
137         aoDelegatesByBundle = CacheBuilder.newBuilder().build(new CacheLoader<BundleRef, TenantAwareActiveObjects>() {
138             @Override
139             public TenantAwareActiveObjects load(@Nonnull final BundleRef bundleRef) throws Exception {
140                 TenantAwareActiveObjects delegate = new TenantAwareActiveObjects(bundleRef.bundle, factory, tenantContext, initExecutorFn);
141                 delegate.init();
142                 unattachedConfigsLock.lock();
143                 try {
144                     final ActiveObjectsConfiguration aoConfig = unattachedConfigByBundle.get(bundleRef.bundle);
145                     if (aoConfig != null) {
146                         delegate.setAoConfiguration(aoConfig);
147                         unattachedConfigByBundle.remove(bundleRef.bundle);
148                     }
149                 } finally {
150                     unattachedConfigsLock.unlock();
151                 }
152                 return delegate;
153             }
154         });
155     }
156 
157     @Override
158     public void afterPropertiesSet() throws Exception {
159         logger.debug("afterPropertiesSet");
160 
161         // we want tenant arrival and hot restart event notifications
162         eventPublisher.register(this);
163     }
164 
165     @Override
166     public void destroy() throws Exception {
167         logger.debug("destroy");
168 
169         destroying = true;
170 
171         for (ExecutorService initExecutor : initExecutorsByTenant.asMap().values()) {
172             initExecutor.shutdownNow();
173         }
174 
175         for (TenantAwareActiveObjects aoDelegate : aoDelegatesByBundle.asMap().values()) {
176             aoDelegate.destroy();
177         }
178 
179         eventPublisher.unregister(this);
180     }
181 
182     /**
183      * Invoked when the Gemini Blueprints/Spring DM proxy first accesses the module. Note that Blueprints is lazy, so
184      * this may not be called. A "safety backup" is added in {@link #onPluginEnabledEvent(PluginEnabledEvent)} to
185      * eagerly initialise the lazy proxies that are not invoked during plugin startup.
186      */
187     @Override
188     public Object getService(Bundle bundle, ServiceRegistration serviceRegistration) {
189         checkNotNull(bundle);
190         logger.debug("getService bundle [{}]", bundle.getSymbolicName());
191 
192         if (destroying) {
193             throw new IllegalStateException("getService after ActiveObjectsServiceFactory destruction");
194         }
195 
196         return aoDelegatesByBundle.getUnchecked(new BundleRef(bundle));
197     }
198 
199     /**
200      * Invoked when the Gemini Blueprints/Spring DM proxy releases the module. Note that Blueprints is lazy, so the
201      * proxy may never have been realised, thus this may not be called. A "safety backup" is added in
202      * {@link #onPluginDisabledEvent(PluginDisabledEvent)} to release those references, for plugins that never
203      * realise the module.
204      */
205     @Override
206     public void ungetService(Bundle bundle, ServiceRegistration serviceRegistration, Object ao) {
207         checkNotNull(bundle);
208         logger.debug("ungetService bundle [{}]", bundle.getSymbolicName());
209 
210         aoDelegatesByBundle.invalidate(new BundleRef(bundle));
211         if (ao instanceof TenantAwareActiveObjects) {
212             ((TenantAwareActiveObjects) ao).destroy();
213         }
214     }
215 
216     public void startCleaning() {
217         logger.debug("startCleaning");
218 
219         cleaning = true;
220 
221         for (final ExecutorService initExecutor : initExecutorsByTenant.asMap().values()) {
222             initExecutor.shutdownNow();
223             try {
224                 if (!initExecutor.awaitTermination(INIT_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
225                     logger.error("startCleaning timed out after {}ms awaiting init thread completion, continuing; note that this timeout may be adjusted via the system property '{}'", INIT_TASK_TIMEOUT_MS, INIT_TASK_TIMEOUT_MS_PROPERTY);
226                 }
227             } catch (InterruptedException e) {
228                 logger.error("startCleaning interrupted while awaiting running init thread completion, continuing", e);
229             }
230         }
231     }
232 
233     public void stopCleaning() {
234         logger.debug("stopCleaning");
235 
236         cleaning = false;
237     }
238 
239     /**
240      * Listens for {@link TenantArrivedEvent} and allows initialisation of any uninitialised instances
241      */
242     @SuppressWarnings("UnusedDeclaration")
243     @EventListener
244     public void onTenantArrived(TenantArrivedEvent event) {
245         // ensure that the tenant is still present
246         Tenant tenant = tenantContext.getCurrentTenant();
247         logger.debug("onTenantArrived tenant arrived {}", tenant);
248 
249         if (tenant != null) {
250             for (TenantAwareActiveObjects aoDelegate : ImmutableList.copyOf(aoDelegatesByBundle.asMap().values())) {
251                 logger.debug("onTenantArrived starting AO delegate for bundle [{}]", aoDelegate.getBundle().getSymbolicName());
252                 aoDelegate.startActiveObjects(tenant);
253             }
254         }
255     }
256 
257     /**
258      * Listens for {@link HotRestartEvent} and recreate all {@link ActiveObjects} instances within the delegates with
259      * the possibly different configuration and data source
260      */
261     @SuppressWarnings("UnusedDeclaration")
262     @EventListener
263     public void onHotRestart(HotRestartEvent hotRestartEvent) {
264         Tenant tenant = tenantContext.getCurrentTenant();
265         logger.debug("onHotRestart performing hot restart with tenant {}", tenant);
266 
267         if (tenant != null) {
268             final ExecutorService initExecutor = initExecutorsByTenant.getIfPresent(tenant);
269             initExecutorsByTenant.invalidate(tenant);
270             for (TenantAwareActiveObjects aoDelegate : ImmutableList.copyOf(aoDelegatesByBundle.asMap().values())) {
271                 logger.debug("onHotRestart restarting AO delegate for bundle [{}]", aoDelegate.getBundle().getSymbolicName());
272                 aoDelegate.restartActiveObjects(tenant);
273             }
274 
275             if (initExecutor != null) {
276                 logger.debug("onHotRestart terminating any initExecutor threads");
277                 initExecutor.shutdownNow();
278             }
279         }
280     }
281 
282     /**
283      * Listens for {@link PluginModuleEnabledEvent} for {@link ActiveObjectModuleDescriptor}.
284      * Passes it to appropriate delegate i.e. the one for which the plugin/bundle key matches.
285      */
286     @SuppressWarnings("UnusedDeclaration")
287     @EventListener
288     public void onPluginModuleEnabledEvent(PluginModuleEnabledEvent pluginModuleEnabledEvent) {
289         final ModuleDescriptor moduleDescriptor = pluginModuleEnabledEvent.getModule();
290         if (moduleDescriptor instanceof ActiveObjectModuleDescriptor) {
291             final Plugin plugin = moduleDescriptor.getPlugin();
292             if (plugin instanceof OsgiPlugin) {
293                 final Bundle bundle = ((OsgiPlugin) plugin).getBundle();
294                 if (bundle != null) {
295 
296                     boolean attachedToDelegate = false;
297                     final ActiveObjectsConfiguration aoConfig = ((ActiveObjectModuleDescriptor) moduleDescriptor).getConfiguration();
298 
299                     unattachedConfigsLock.lock();
300                     try {
301                         for (TenantAwareActiveObjects aoDelegate : aoDelegatesByBundle.asMap().values()) {
302                             if (aoDelegate.getBundle().equals(bundle)) {
303                                 logger.debug("onPluginModuleEnabledEvent attaching <ao> configuration module to ActiveObjects service of [{}]", plugin);
304                                 aoDelegate.setAoConfiguration(aoConfig);
305                                 attachedToDelegate = true;
306                                 break;
307                             }
308                         }
309                         if (!attachedToDelegate) {
310                             logger.debug("onPluginModuleEnabledEvent storing unattached <ao> configuration module for [{}]", plugin);
311                             unattachedConfigByBundle.put(bundle, aoConfig);
312                         }
313                     } finally {
314                         unattachedConfigsLock.unlock();
315                     }
316                 }
317             }
318         }
319     }
320 
321     /**
322      * Listens for {@link PluginEnabledEvent}. If the plugin is present in the unattached configurations, it will tickle
323      * <code>aoDelegatesByBundle</code> to ensure that the configuration has been attached to a service, whether it has
324      * been OSGi registered or not.
325      */
326     @SuppressWarnings("UnusedDeclaration")
327     @EventListener
328     public void onPluginEnabledEvent(PluginEnabledEvent pluginEnabledEvent) {
329         final Plugin plugin = pluginEnabledEvent.getPlugin();
330         if (plugin instanceof OsgiPlugin) {
331             final Bundle bundle = ((OsgiPlugin) plugin).getBundle();
332             if (bundle != null) {
333                 if (unattachedConfigByBundle.containsKey(bundle)) {
334                     logger.debug("onPluginEnabledEvent attaching unbound <ao> to [{}]", plugin);
335 
336                     // the cacheloader will do the attaching, after locking first
337                     aoDelegatesByBundle.getUnchecked(new BundleRef(bundle));
338                 }
339             }
340         }
341     }
342 
343     /**
344      * Listens for {@link PluginDisabledEvent}. If the plugin is present in the unattached or attached configurations,
345      * it will be removed to ensure that we don't leak resources and, more importantly, don't retain it if the plugin
346      * is re-enabled.
347      */
348     @SuppressWarnings("UnusedDeclaration")
349     @EventListener
350     public void onPluginDisabledEvent(PluginDisabledEvent pluginDisabledEvent) {
351         final Plugin plugin = pluginDisabledEvent.getPlugin();
352         if (plugin instanceof OsgiPlugin) {
353             final Bundle bundle = ((OsgiPlugin) plugin).getBundle();
354             if (bundle != null) {
355                 logger.debug("onPluginDisabledEvent removing delegate for [{}]", plugin);
356                 aoDelegatesByBundle.invalidate(new BundleRef(bundle));
357 
358                 unattachedConfigsLock.lock();
359                 try {
360                     if (unattachedConfigByBundle.containsKey(bundle)) {
361                         logger.debug("onPluginDisabledEvent removing unbound <ao> for [{}]", plugin);
362                         unattachedConfigByBundle.remove(bundle);
363                     }
364                 } finally {
365                     unattachedConfigsLock.unlock();
366                 }
367             }
368         }
369     }
370 
371     /**
372      * Provides a wrapper that gives explicit object identity hashing and reference equality (via identity hasing)of a
373      * {@link Bundle}, for use in maps etc.
374      */
375     protected static class BundleRef {
376         final Bundle bundle;
377 
378         public BundleRef(Bundle bundle) {
379             this.bundle = checkNotNull(bundle);
380         }
381 
382         @Override
383         public boolean equals(final Object o) {
384             if (o == null || getClass() != o.getClass()) {
385                 return false;
386             }
387 
388             final BundleRef bundleRef = (BundleRef) o;
389 
390             return bundle == bundleRef.bundle;
391         }
392 
393         @Override
394         public int hashCode() {
395             return System.identityHashCode(bundle);
396         }
397     }
398 }