1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.atlassian.util.concurrent;
18
19 import net.jcip.annotations.ThreadSafe;
20
21 import java.util.concurrent.Callable;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.FutureTask;
26
27 @ThreadSafe public class ConcurrentOperationMapImpl<K, R> implements ConcurrentOperationMap<K, R> {
28
29 private final ConcurrentMap<K, CallerRunsFuture<R>> map = new ConcurrentHashMap<K, CallerRunsFuture<R>>();
30 private final Function<Callable<R>, CallerRunsFuture<R>> futureFactory;
31
32 public ConcurrentOperationMapImpl() {
33 this(new Function<Callable<R>, CallerRunsFuture<R>>() {
34 public CallerRunsFuture<R> get(final Callable<R> input) {
35 return new CallerRunsFuture<R>(input);
36 }
37 });
38 }
39
40 ConcurrentOperationMapImpl(final Function<Callable<R>, CallerRunsFuture<R>> futureFactory) {
41 this.futureFactory = Assertions.notNull("futureFactory", futureFactory);
42 }
43
44 public R runOperation(final K key, final Callable<R> operation) throws ExecutionException {
45 CallerRunsFuture<R> future = map.get(key);
46 while (future == null) {
47 map.putIfAbsent(key, futureFactory.get(operation));
48 future = map.get(key);
49 }
50 try {
51 return future.get();
52 }
53 finally {
54 map.remove(key, future);
55 }
56 }
57
58 static class CallerRunsFuture<T> extends FutureTask<T> {
59 CallerRunsFuture(final Callable<T> callable) {
60 super(callable);
61 }
62
63 @Override public T get() throws ExecutionException {
64 run();
65 try {
66 return super.get();
67 }
68 catch (final InterruptedException e) {
69 throw new RuntimeInterruptedException(e);
70 }
71 catch (final ExecutionException e) {
72 final Throwable cause = e.getCause();
73 if (cause instanceof RuntimeException) {
74 throw (RuntimeException) cause;
75 }
76 else if (cause instanceof Error) {
77 throw (Error) cause;
78 }
79 else {
80 throw e;
81 }
82 }
83 }
84 }
85 }