View Javadoc

1   /**
2    * Copyright 2008 Atlassian Pty Ltd 
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License"); 
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at 
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0 
9    * 
10   * Unless required by applicable law or agreed to in writing, software 
11   * distributed under the License is distributed on an "AS IS" BASIS, 
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
13   * See the License for the specific language governing permissions and 
14   * limitations under the License.
15   */
16  
17  package com.atlassian.util.concurrent;
18  
19  import java.util.concurrent.Callable;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.FutureTask;
24  
25  import net.jcip.annotations.ThreadSafe;
26  
27  @ThreadSafe
28  public class ConcurrentOperationMapImpl<K, R> implements ConcurrentOperationMap<K, R> {
29  
30      private final ConcurrentMap<K, CallerRunsFuture<R>> map = new ConcurrentHashMap<K, CallerRunsFuture<R>>();
31      private final Function<Callable<R>, CallerRunsFuture<R>> futureFactory;
32  
33      public ConcurrentOperationMapImpl() {
34          this(new Function<Callable<R>, CallerRunsFuture<R>>() {
35              public CallerRunsFuture<R> get(final Callable<R> input) {
36                  return new CallerRunsFuture<R>(input);
37              }
38          });
39      }
40  
41      ConcurrentOperationMapImpl(final Function<Callable<R>, CallerRunsFuture<R>> futureFactory) {
42          this.futureFactory = Assertions.notNull("futureFactory", futureFactory);
43      }
44  
45      public R runOperation(final K key, final Callable<R> operation) throws ExecutionException {
46          CallerRunsFuture<R> future = map.get(key);
47          while (future == null) {
48              map.putIfAbsent(key, futureFactory.get(operation));
49              future = map.get(key);
50          }
51          try {
52              return future.get();
53          } finally {
54              map.remove(key, future);
55          }
56      }
57  
58      static class CallerRunsFuture<T> extends FutureTask<T> {
59          CallerRunsFuture(final Callable<T> callable) {
60              super(callable);
61          }
62  
63          @Override
64          public T get() throws ExecutionException {
65              run();
66              try {
67                  return super.get();
68              } catch (final InterruptedException e) {
69                  // /CLOVER:OFF
70                  // how to test?
71                  throw new RuntimeInterruptedException(e);
72                  // /CLOVER:ON
73              } catch (final ExecutionException e) {
74                  final Throwable cause = e.getCause();
75                  if (cause instanceof RuntimeException) {
76                      throw (RuntimeException) cause;
77                  } else if (cause instanceof Error) {
78                      throw (Error) cause;
79                  } else {
80                      throw e;
81                  }
82              }
83          }
84      }
85  }