1 package com.atlassian.activeobjects.osgi;
2
3 import com.atlassian.activeobjects.config.ActiveObjectsConfiguration;
4 import com.atlassian.activeobjects.external.ActiveObjects;
5 import com.atlassian.activeobjects.internal.ActiveObjectsFactory;
6 import com.atlassian.activeobjects.plugin.ActiveObjectModuleDescriptor;
7 import com.atlassian.activeobjects.spi.ContextClassLoaderThreadFactory;
8 import com.atlassian.activeobjects.spi.HotRestartEvent;
9 import com.atlassian.activeobjects.spi.InitExecutorServiceProvider;
10 import com.atlassian.event.api.EventListener;
11 import com.atlassian.event.api.EventPublisher;
12 import com.atlassian.plugin.ModuleDescriptor;
13 import com.atlassian.plugin.Plugin;
14 import com.atlassian.plugin.event.events.PluginDisabledEvent;
15 import com.atlassian.plugin.event.events.PluginEnabledEvent;
16 import com.atlassian.plugin.event.events.PluginModuleEnabledEvent;
17 import com.atlassian.plugin.osgi.factory.OsgiPlugin;
18 import com.atlassian.sal.api.executor.ThreadLocalDelegateExecutorFactory;
19 import com.atlassian.tenancy.api.Tenant;
20 import com.atlassian.tenancy.api.TenantContext;
21 import com.atlassian.tenancy.api.event.TenantArrivedEvent;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Function;
24 import com.google.common.cache.CacheBuilder;
25 import com.google.common.cache.CacheLoader;
26 import com.google.common.cache.LoadingCache;
27 import com.google.common.collect.ImmutableList;
28 import org.osgi.framework.Bundle;
29 import org.osgi.framework.ServiceFactory;
30 import org.osgi.framework.ServiceRegistration;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.springframework.beans.factory.DisposableBean;
34 import org.springframework.beans.factory.InitializingBean;
35
36 import javax.annotation.Nonnull;
37 import javax.annotation.Nullable;
38 import java.util.IdentityHashMap;
39 import java.util.Map;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.ThreadFactory;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45
46 import static com.google.common.base.Preconditions.checkNotNull;
47
48
49
50
51
52
53
54
55
56
57
58 public class ActiveObjectsServiceFactory implements ServiceFactory, InitializingBean, DisposableBean {
59 private static final Logger logger = LoggerFactory.getLogger(ActiveObjectsServiceFactory.class);
60
61 private static final String INIT_TASK_TIMEOUT_MS_PROPERTY = "ao-plugin.init.task.timeout";
62
63 @VisibleForTesting
64 protected static final int INIT_TASK_TIMEOUT_MS = Integer.getInteger(INIT_TASK_TIMEOUT_MS_PROPERTY, 30000);
65
66 private final EventPublisher eventPublisher;
67 private final TenantContext tenantContext;
68
69 @VisibleForTesting
70 final ThreadFactory aoContextThreadFactory;
71
72 @VisibleForTesting
73 final LoadingCache<Tenant, ExecutorService> initExecutorsByTenant;
74
75 @VisibleForTesting
76 volatile boolean destroying = false;
77
78 @VisibleForTesting
79 volatile boolean cleaning = false;
80
81 @VisibleForTesting
82 final Function<Tenant, ExecutorService> initExecutorFn;
83
84
85 @VisibleForTesting
86 final LoadingCache<BundleRef, TenantAwareActiveObjects> aoDelegatesByBundle;
87
88
89
90 @VisibleForTesting
91 final Map<Bundle, ActiveObjectsConfiguration> unattachedConfigByBundle = new IdentityHashMap<>();
92
93 private final Lock unattachedConfigsLock = new ReentrantLock();
94
95 public ActiveObjectsServiceFactory(
96 @Nonnull final ActiveObjectsFactory factory,
97 @Nonnull final EventPublisher eventPublisher,
98 @Nonnull final TenantContext tenantContext,
99 @Nonnull final ThreadLocalDelegateExecutorFactory threadLocalDelegateExecutorFactory,
100 @Nonnull final InitExecutorServiceProvider initExecutorServiceProvider) {
101 this.eventPublisher = checkNotNull(eventPublisher);
102 this.tenantContext = checkNotNull(tenantContext);
103 checkNotNull(factory);
104 checkNotNull(threadLocalDelegateExecutorFactory);
105 checkNotNull(initExecutorServiceProvider);
106
107
108 ClassLoader bundleContextClassLoader = Thread.currentThread().getContextClassLoader();
109 aoContextThreadFactory = new ContextClassLoaderThreadFactory(bundleContextClassLoader);
110
111
112 initExecutorsByTenant = CacheBuilder.newBuilder().build(new CacheLoader<Tenant, ExecutorService>() {
113 @Override
114 public ExecutorService load(@Nonnull final Tenant tenant) throws Exception {
115 logger.debug("creating new init executor for {}", tenant);
116 return initExecutorServiceProvider.initExecutorService(tenant);
117 }
118 });
119
120
121 initExecutorFn = new Function<Tenant, ExecutorService>() {
122 @Override
123 public ExecutorService apply(@Nullable final Tenant tenant) {
124 if (destroying) {
125 throw new IllegalStateException("applied initExecutorFn after ActiveObjectsServiceFactory destruction");
126 } else if (cleaning) {
127 throw new IllegalStateException("applied initExecutorFn during ActiveObjects cleaning");
128 }
129
130
131 checkNotNull(tenant);
132 return initExecutorsByTenant.getUnchecked(tenant);
133 }
134 };
135
136
137 aoDelegatesByBundle = CacheBuilder.newBuilder().build(new CacheLoader<BundleRef, TenantAwareActiveObjects>() {
138 @Override
139 public TenantAwareActiveObjects load(@Nonnull final BundleRef bundleRef) throws Exception {
140 TenantAwareActiveObjects delegate = new TenantAwareActiveObjects(bundleRef.bundle, factory, tenantContext, initExecutorFn);
141 delegate.init();
142 unattachedConfigsLock.lock();
143 try {
144 final ActiveObjectsConfiguration aoConfig = unattachedConfigByBundle.get(bundleRef.bundle);
145 if (aoConfig != null) {
146 delegate.setAoConfiguration(aoConfig);
147 unattachedConfigByBundle.remove(bundleRef.bundle);
148 }
149 } finally {
150 unattachedConfigsLock.unlock();
151 }
152 return delegate;
153 }
154 });
155 }
156
157 @Override
158 public void afterPropertiesSet() throws Exception {
159 logger.debug("afterPropertiesSet");
160
161
162 eventPublisher.register(this);
163 }
164
165 @Override
166 public void destroy() throws Exception {
167 logger.debug("destroy");
168
169 destroying = true;
170
171 for (ExecutorService initExecutor : initExecutorsByTenant.asMap().values()) {
172 initExecutor.shutdownNow();
173 }
174
175 for (TenantAwareActiveObjects aoDelegate : aoDelegatesByBundle.asMap().values()) {
176 aoDelegate.destroy();
177 }
178
179 eventPublisher.unregister(this);
180 }
181
182
183
184
185
186
187 @Override
188 public Object getService(Bundle bundle, ServiceRegistration serviceRegistration) {
189 checkNotNull(bundle);
190 logger.debug("getService bundle [{}]", bundle.getSymbolicName());
191
192 if (destroying) {
193 throw new IllegalStateException("getService after ActiveObjectsServiceFactory destruction");
194 }
195
196 return aoDelegatesByBundle.getUnchecked(new BundleRef(bundle));
197 }
198
199
200
201
202
203
204
205 @Override
206 public void ungetService(Bundle bundle, ServiceRegistration serviceRegistration, Object ao) {
207 checkNotNull(bundle);
208 logger.debug("ungetService bundle [{}]", bundle.getSymbolicName());
209
210 aoDelegatesByBundle.invalidate(new BundleRef(bundle));
211 if (ao instanceof TenantAwareActiveObjects) {
212 ((TenantAwareActiveObjects) ao).destroy();
213 }
214 }
215
216 public void startCleaning() {
217 logger.debug("startCleaning");
218
219 cleaning = true;
220
221 for (final ExecutorService initExecutor : initExecutorsByTenant.asMap().values()) {
222 initExecutor.shutdownNow();
223 try {
224 if (!initExecutor.awaitTermination(INIT_TASK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
225 logger.error("startCleaning timed out after {}ms awaiting init thread completion, continuing; note that this timeout may be adjusted via the system property '{}'", INIT_TASK_TIMEOUT_MS, INIT_TASK_TIMEOUT_MS_PROPERTY);
226 }
227 } catch (InterruptedException e) {
228 logger.error("startCleaning interrupted while awaiting running init thread completion, continuing", e);
229 }
230 }
231 }
232
233 public void stopCleaning() {
234 logger.debug("stopCleaning");
235
236 cleaning = false;
237 }
238
239
240
241
242 @SuppressWarnings("UnusedDeclaration")
243 @EventListener
244 public void onTenantArrived(TenantArrivedEvent event) {
245
246 Tenant tenant = tenantContext.getCurrentTenant();
247 logger.debug("onTenantArrived tenant arrived {}", tenant);
248
249 if (tenant != null) {
250 for (TenantAwareActiveObjects aoDelegate : ImmutableList.copyOf(aoDelegatesByBundle.asMap().values())) {
251 logger.debug("onTenantArrived starting AO delegate for bundle [{}]", aoDelegate.getBundle().getSymbolicName());
252 aoDelegate.startActiveObjects(tenant);
253 }
254 }
255 }
256
257
258
259
260
261 @SuppressWarnings("UnusedDeclaration")
262 @EventListener
263 public void onHotRestart(HotRestartEvent hotRestartEvent) {
264 Tenant tenant = tenantContext.getCurrentTenant();
265 logger.debug("onHotRestart performing hot restart with tenant {}", tenant);
266
267 if (tenant != null) {
268 final ExecutorService initExecutor = initExecutorsByTenant.getIfPresent(tenant);
269 initExecutorsByTenant.invalidate(tenant);
270 for (TenantAwareActiveObjects aoDelegate : ImmutableList.copyOf(aoDelegatesByBundle.asMap().values())) {
271 logger.debug("onHotRestart restarting AO delegate for bundle [{}]", aoDelegate.getBundle().getSymbolicName());
272 aoDelegate.restartActiveObjects(tenant);
273 }
274
275 if (initExecutor != null) {
276 logger.debug("onHotRestart terminating any initExecutor threads");
277 initExecutor.shutdownNow();
278 }
279 }
280 }
281
282
283
284
285
286 @SuppressWarnings("UnusedDeclaration")
287 @EventListener
288 public void onPluginModuleEnabledEvent(PluginModuleEnabledEvent pluginModuleEnabledEvent) {
289 final ModuleDescriptor moduleDescriptor = pluginModuleEnabledEvent.getModule();
290 if (moduleDescriptor instanceof ActiveObjectModuleDescriptor) {
291 final Plugin plugin = moduleDescriptor.getPlugin();
292 if (plugin instanceof OsgiPlugin) {
293 final Bundle bundle = ((OsgiPlugin) plugin).getBundle();
294 if (bundle != null) {
295
296 boolean attachedToDelegate = false;
297 final ActiveObjectsConfiguration aoConfig = ((ActiveObjectModuleDescriptor) moduleDescriptor).getConfiguration();
298
299 unattachedConfigsLock.lock();
300 try {
301 for (TenantAwareActiveObjects aoDelegate : aoDelegatesByBundle.asMap().values()) {
302 if (aoDelegate.getBundle().equals(bundle)) {
303 logger.debug("onPluginModuleEnabledEvent attaching <ao> configuration module to ActiveObjects service of [{}]", plugin);
304 aoDelegate.setAoConfiguration(aoConfig);
305 attachedToDelegate = true;
306 break;
307 }
308 }
309 if (!attachedToDelegate) {
310 logger.debug("onPluginModuleEnabledEvent storing unattached <ao> configuration module for [{}]", plugin);
311 unattachedConfigByBundle.put(bundle, aoConfig);
312 }
313 } finally {
314 unattachedConfigsLock.unlock();
315 }
316 }
317 }
318 }
319 }
320
321
322
323
324
325
326 @SuppressWarnings("UnusedDeclaration")
327 @EventListener
328 public void onPluginEnabledEvent(PluginEnabledEvent pluginEnabledEvent) {
329 final Plugin plugin = pluginEnabledEvent.getPlugin();
330 if (plugin instanceof OsgiPlugin) {
331 final Bundle bundle = ((OsgiPlugin) plugin).getBundle();
332 if (bundle != null) {
333 if (unattachedConfigByBundle.containsKey(bundle)) {
334 logger.debug("onPluginEnabledEvent attaching unbound <ao> to [{}]", plugin);
335
336
337 aoDelegatesByBundle.getUnchecked(new BundleRef(bundle));
338 }
339 }
340 }
341 }
342
343
344
345
346
347
348 @SuppressWarnings("UnusedDeclaration")
349 @EventListener
350 public void onPluginDisabledEvent(PluginDisabledEvent pluginDisabledEvent) {
351 final Plugin plugin = pluginDisabledEvent.getPlugin();
352 if (plugin instanceof OsgiPlugin) {
353 final Bundle bundle = ((OsgiPlugin) plugin).getBundle();
354 if (bundle != null) {
355 logger.debug("onPluginDisabledEvent removing delegate for [{}]", plugin);
356 aoDelegatesByBundle.invalidate(new BundleRef(bundle));
357
358 unattachedConfigsLock.lock();
359 try {
360 if (unattachedConfigByBundle.containsKey(bundle)) {
361 logger.debug("onPluginDisabledEvent removing unbound <ao> for [{}]", plugin);
362 unattachedConfigByBundle.remove(bundle);
363 }
364 } finally {
365 unattachedConfigsLock.unlock();
366 }
367 }
368 }
369 }
370
371
372
373
374
375 protected static class BundleRef {
376 final Bundle bundle;
377
378 public BundleRef(Bundle bundle) {
379 this.bundle = checkNotNull(bundle);
380 }
381
382 @Override
383 public boolean equals(final Object o) {
384 if (o == null || getClass() != o.getClass()) {
385 return false;
386 }
387
388 final BundleRef bundleRef = (BundleRef) o;
389
390 return bundle == bundleRef.bundle;
391 }
392
393 @Override
394 public int hashCode() {
395 return System.identityHashCode(bundle);
396 }
397 }
398 }