View Javadoc

1   package com.atlassian.event.internal;
2   
3   import com.atlassian.event.api.EventPublisher;
4   import com.atlassian.event.config.ListenerHandlersConfiguration;
5   import com.atlassian.event.spi.EventDispatcher;
6   import com.atlassian.event.spi.ListenerHandler;
7   import com.atlassian.event.spi.ListenerInvoker;
8   import com.google.common.base.Function;
9   import com.google.common.cache.CacheBuilder;
10  import com.google.common.cache.CacheLoader;
11  import com.google.common.cache.LoadingCache;
12  import com.google.common.collect.ImmutableList;
13  import com.google.common.collect.ImmutableSet;
14  import com.google.common.collect.MapMaker;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  import javax.annotation.Nonnull;
19  import java.util.List;
20  import java.util.Set;
21  import java.util.concurrent.ConcurrentMap;
22  
23  import static com.google.common.base.Preconditions.checkNotNull;
24  import static com.google.common.collect.Iterables.concat;
25  import static com.google.common.collect.Iterables.transform;
26  
27  /**
28   * A non-blocking implementation of the {@link com.atlassian.event.api.EventPublisher} interface.
29   * <p>
30   * This class is a drop-in replacement for {@link EventPublisherImpl} except that it does not
31   * synchronise on the internal map of event type to {@link ListenerInvoker}, and should handle
32   * much higher parallelism of event dispatch.
33   * <p>
34   * One can customise the event listening by instantiating with custom
35   * {@link com.atlassian.event.spi.ListenerHandler listener handlers} and the event dispatching through
36   * {@link com.atlassian.event.spi.EventDispatcher}. See the {@link com.atlassian.event.spi} package
37   * for more information.
38   *
39   * @see com.atlassian.event.spi.ListenerHandler
40   * @see com.atlassian.event.spi.EventDispatcher
41   * @since 2.0.2
42   */
43  public final class LockFreeEventPublisher implements EventPublisher {
44      /**
45       * Gets the {@link ListenerInvoker invokers} for a listener
46       */
47      private final InvokerBuilder invokerBuilder;
48  
49      /**
50       * Publishes an event.
51       */
52      private final Publisher publisher;
53  
54      /**
55       * <strong>Note:</strong> this field makes this implementation stateful
56       */
57      private final Listeners listeners = new Listeners();
58  
59      public LockFreeEventPublisher(final EventDispatcher eventDispatcher, final ListenerHandlersConfiguration listenerHandlersConfiguration) {
60          this(eventDispatcher, listenerHandlersConfiguration, new InvokerTransformer() {
61              @Nonnull
62              @Override
63              public Iterable<ListenerInvoker> transformAll(@Nonnull Iterable<ListenerInvoker> invokers, @Nonnull Object event) {
64                  return invokers;
65              }
66          });
67      }
68  
69      /**
70       * If you need to customise the asynchronous handling, you should use the
71       * {@link com.atlassian.event.internal.AsynchronousAbleEventDispatcher}
72       * together with a custom executor.
73       * <p>
74       * You might also want to have a look at using the
75       * {@link com.atlassian.event.internal.EventThreadFactory} to keep the naming
76       * of event threads consistent with the default naming of the Atlassian Event
77       * library.
78       *
79       * @param eventDispatcher               the event dispatcher to be used with the publisher
80       * @param listenerHandlersConfiguration the list of listener handlers to be used with this publisher
81       * @param transformer                   the batcher for batching up listener invocations
82       * @see com.atlassian.event.internal.AsynchronousAbleEventDispatcher
83       * @see com.atlassian.event.internal.EventThreadFactory
84       */
85  
86      public LockFreeEventPublisher(final EventDispatcher eventDispatcher,
87                                    final ListenerHandlersConfiguration listenerHandlersConfiguration,
88                                    final InvokerTransformer transformer) {
89          invokerBuilder = new InvokerBuilder(checkNotNull(listenerHandlersConfiguration).getListenerHandlers());
90          publisher = new Publisher(eventDispatcher, listeners, transformer);
91      }
92  
93      public void publish(final @Nonnull Object event) {
94          checkNotNull(event);
95          publisher.dispatch(event);
96      }
97  
98      public void register(final @Nonnull Object listener) {
99          checkNotNull(listener);
100         listeners.register(listener, invokerBuilder.build(listener));
101     }
102 
103     public void unregister(final @Nonnull Object listener) {
104         checkNotNull(listener);
105         listeners.remove(listener);
106     }
107 
108     public void unregisterAll() {
109         listeners.clear();
110     }
111 
112     //
113     // inner classes
114     //
115 
116     /**
117      * Maps classes to the relevant {@link Invokers}
118      */
119     static final class Listeners {
120         /**
121          * We always want an {@link Invokers} created for any class requested, even if it is empty.
122          * <b>Warning:</b> We need to use weakKeys here to ensure plugin event classes are GC'd, otherwise we leak...
123          */
124         private final LoadingCache<Class<?>, Invokers> invokers = CacheBuilder.newBuilder().build(new CacheLoader<Class<?>, Invokers>() {
125             @Override
126             public Invokers load(final Class<?> key) throws Exception {
127                 return new Invokers();
128             }
129         });
130 
131         void register(final Object listener, final Iterable<ListenerInvoker> invokers) {
132             for (final ListenerInvoker invoker : invokers) {
133                 register(listener, invoker);
134             }
135         }
136 
137         private void register(final Object listener, final ListenerInvoker invoker) {
138             // if supported classes is empty, then all events are supported.
139             if (invoker.getSupportedEventTypes().isEmpty()) {
140                 invokers.getUnchecked(Object.class).add(listener, invoker);
141             } else {
142                 // if it it empty, we won't loop, otherwise register the invoker against all its classes
143                 for (final Class<?> eventClass : invoker.getSupportedEventTypes()) {
144                     invokers.getUnchecked(eventClass).add(listener, invoker);
145                 }
146             }
147         }
148 
149         void remove(final Object listener) {
150             for (final Invokers entry : ImmutableList.copyOf(invokers.asMap().values())) {
151                 entry.remove(listener);
152             }
153         }
154 
155         void clear() {
156             invokers.invalidateAll();
157         }
158 
159         public Iterable<ListenerInvoker> get(final Class<?> eventClass) {
160             return invokers.getUnchecked(eventClass).all();
161         }
162     }
163 
164     /**
165      * map of Key to Set of ListenerInvoker
166      */
167     static final class Invokers {
168         private final ConcurrentMap<Object, ListenerInvoker> listeners = new MapMaker().weakKeys().makeMap();
169 
170         Iterable<ListenerInvoker> all() {
171             return listeners.values();
172         }
173 
174         public void remove(final Object key) {
175             listeners.remove(key);
176         }
177 
178         public void add(final Object key, final ListenerInvoker invoker) {
179             listeners.put(key, invoker);
180         }
181     }
182 
183     /**
184      * Responsible for publishing an event.
185      * <p>
186      * Must first get the Set of all ListenerInvokers that
187      * are registered for that event and then use the
188      * {@link EventDispatcher} to send the event to them.
189      */
190     static final class Publisher {
191         private final Logger log = LoggerFactory.getLogger(this.getClass());
192         private final Listeners listeners;
193         private final EventDispatcher dispatcher;
194         private final InvokerTransformer transformer;
195 
196         /**
197          * transform an event class into the relevant invokers
198          */
199         @SuppressWarnings("unchecked")
200         private final Function<Class, Iterable<ListenerInvoker>> eventClassToInvokersTransformer = new Function<Class, Iterable<ListenerInvoker>>() {
201             public Iterable<ListenerInvoker> apply(final Class eventClass) {
202                 return listeners.get(eventClass);
203             }
204         };
205 
206         Publisher(final EventDispatcher dispatcher, final Listeners listeners, final InvokerTransformer transformer) {
207             this.dispatcher = checkNotNull(dispatcher);
208             this.listeners = checkNotNull(listeners);
209             this.transformer = checkNotNull(transformer);
210         }
211 
212         public void dispatch(final Object event) {
213             Iterable<ListenerInvoker> invokers = getInvokers(event);
214             try {
215                 invokers = transformer.transformAll(invokers, event);
216             } catch (Exception e) {
217                 log.error("Exception while transforming invokers. Dispatching original invokers instead.", e);
218             }
219             for (final ListenerInvoker invoker : invokers) {
220                 // EVENT-14 -  we should continue to process all listeners even if one throws some horrible exception
221                 try {
222                     dispatcher.dispatch(invoker, event);
223                 } catch (Exception e) {
224                     log.error("There was an exception thrown trying to dispatch event '" + event +
225                             "' from the invoker '" + invoker + "'.", e);
226                 }
227             }
228         }
229 
230         /**
231          * Get all classes and interfaces an object extends or implements and then find all ListenerInvokers that apply
232          *
233          * @param event to find its classes/interfaces
234          * @return an iterable of the invokers for those classes.
235          */
236         Iterable<ListenerInvoker> getInvokers(final Object event) {
237             final Set<Class<?>> allEventTypes = ClassUtils.findAllTypes(event.getClass());
238             return ImmutableSet.copyOf(concat(transform(allEventTypes, eventClassToInvokersTransformer)));
239         }
240     }
241 
242     /**
243      * Holds all configured {@link ListenerHandler handlers}
244      */
245     static final class InvokerBuilder {
246         private final Iterable<ListenerHandler> listenerHandlers;
247 
248         InvokerBuilder(final @Nonnull Iterable<ListenerHandler> listenerHandlers) {
249             this.listenerHandlers = checkNotNull(listenerHandlers);
250         }
251 
252         Iterable<ListenerInvoker> build(final Object listener) throws IllegalArgumentException {
253             final ImmutableList.Builder<ListenerInvoker> builder = ImmutableList.builder();
254             for (final ListenerHandler listenerHandler : listenerHandlers) {
255                 builder.addAll(listenerHandler.getInvokers(listener));
256             }
257             final List<ListenerInvoker> invokers = builder.build();
258             if (invokers.isEmpty()) {
259                 throw new IllegalArgumentException("No listener invokers were found for listener <" + listener + ">");
260             }
261             return invokers;
262         }
263     }
264 }