1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.atlassian.util.concurrent;
18
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.FutureTask;
24
25 import net.jcip.annotations.ThreadSafe;
26
27 @ThreadSafe
28 public class ConcurrentOperationMapImpl<K, R> implements ConcurrentOperationMap<K, R> {
29
30 private final ConcurrentMap<K, CallerRunsFuture<R>> map = new ConcurrentHashMap<K, CallerRunsFuture<R>>();
31 private final Function<Callable<R>, CallerRunsFuture<R>> futureFactory;
32
33 public ConcurrentOperationMapImpl() {
34 this(new Function<Callable<R>, CallerRunsFuture<R>>() {
35 public CallerRunsFuture<R> get(final Callable<R> input) {
36 return new CallerRunsFuture<R>(input);
37 }
38 });
39 }
40
41 ConcurrentOperationMapImpl(final Function<Callable<R>, CallerRunsFuture<R>> futureFactory) {
42 this.futureFactory = Assertions.notNull("futureFactory", futureFactory);
43 }
44
45 public R runOperation(final K key, final Callable<R> operation) throws ExecutionException {
46 CallerRunsFuture<R> future = map.get(key);
47 while (future == null) {
48 map.putIfAbsent(key, futureFactory.get(operation));
49 future = map.get(key);
50 }
51 try {
52 return future.get();
53 } finally {
54 map.remove(key, future);
55 }
56 }
57
58 static class CallerRunsFuture<T> extends FutureTask<T> {
59 CallerRunsFuture(final Callable<T> callable) {
60 super(callable);
61 }
62
63 @Override
64 public T get() throws ExecutionException {
65 run();
66 try {
67 return super.get();
68 } catch (final InterruptedException e) {
69
70
71 throw new RuntimeInterruptedException(e);
72
73 } catch (final ExecutionException e) {
74 final Throwable cause = e.getCause();
75 if (cause instanceof RuntimeException) {
76 throw (RuntimeException) cause;
77 } else if (cause instanceof Error) {
78 throw (Error) cause;
79 } else {
80 throw e;
81 }
82 }
83 }
84 }
85 }