View Javadoc
1   package com.atlassian.streams.refapp;
2   
3   import com.atlassian.sal.api.ApplicationProperties;
4   import com.atlassian.sal.api.UrlMode;
5   import com.atlassian.streams.api.ActivityRequest;
6   import com.atlassian.streams.api.StreamsEntry;
7   import com.atlassian.streams.api.UserProfile;
8   import com.atlassian.streams.api.common.NonEmptyIterables;
9   import com.atlassian.streams.refapp.api.StreamsActivityManager;
10  import com.atlassian.streams.refapp.api.StreamsEntryRequest;
11  import com.atlassian.streams.spi.Filters;
12  import com.atlassian.streams.spi.StreamsI18nResolver;
13  import com.google.common.base.Predicate;
14  import com.google.common.collect.ImmutableList;
15  
16  import java.net.URI;
17  import java.util.Arrays;
18  import java.util.concurrent.locks.Lock;
19  import java.util.concurrent.locks.ReentrantLock;
20  
21  import static com.atlassian.streams.api.common.Iterables.take;
22  import static com.atlassian.streams.api.common.Option.some;
23  import static com.atlassian.streams.spi.Filters.anyInUsers;
24  import static com.atlassian.streams.spi.Filters.entriesInActivities;
25  import static com.atlassian.streams.spi.Filters.entryAuthors;
26  import static com.atlassian.streams.spi.Filters.notInUsers;
27  import static com.google.common.base.Preconditions.checkNotNull;
28  import static com.google.common.base.Predicates.and;
29  import static com.google.common.collect.Iterables.filter;
30  import static com.google.common.collect.Iterables.find;
31  
32  /**
33   * An in-memory storage service to developer to test features of activities streams in refapp, e.g. create entry, filter
34   * entry....
35   */
36  public class RefappStreamsActivityManager implements StreamsActivityManager {
37      /**
38       * Size of the memory buffer to store events.
39       */
40      public static final int BUFFER_SIZE = 100;
41  
42      private final ApplicationProperties applicationProperties;
43      private final StreamsI18nResolver i18nResolver;
44      private final RefappRenderer refappRenderer;
45  
46      /**
47       * Array implementation of circular buffer.
48       */
49      private StreamsEntry[] buffer = new StreamsEntry[BUFFER_SIZE];
50      /**
51       * A global log to control concurrent access to the buffer.
52       */
53      private final Lock bufferLock = new ReentrantLock();
54      /**
55       * This keep position to put the next element into the buffer. After adding a new element, this position is increased or circled to the beginning when the buffer is full.
56       */
57      private int position = 0;
58  
59      public RefappStreamsActivityManager(ApplicationProperties applicationProperties,
60                                          StreamsI18nResolver i18nResolver,
61                                          RefappRenderer refappRenderer) {
62          this.applicationProperties = checkNotNull(applicationProperties, "applicationProperties");
63          this.i18nResolver = checkNotNull(i18nResolver, "i18nResolver");
64          this.refappRenderer = checkNotNull(refappRenderer, "refappRenderer");
65      }
66  
67  
68      @Override
69      public Iterable<StreamsEntry> getEntries(final ActivityRequest activityRequest) {
70          // Standard filters: posted date and user
71          // Provider filter: activity.
72          final Iterable filteredResult = filter(getBufferSnapshot(), and(
73                          inDateRange(activityRequest),
74                          entryAuthors(notInUsers(activityRequest)),
75                          entryAuthors(anyInUsers(activityRequest)),
76                          entriesInActivities(activityRequest))
77          );
78          return take(activityRequest.getMaxResults(), filteredResult);
79      }
80  
81  
82      @Override
83      public void addEntry(StreamsEntryRequest entryRequest) {
84          bufferLock.lock();
85          try {
86              buffer[position] = createEntry(entryRequest);
87              // Move to next position, circle at the end of the array.
88              // With this, the next position will overwrite the oldest item when the buffer is full.
89              position = (position + 1) % buffer.length;
90          } finally {
91              bufferLock.unlock();
92          }
93      }
94  
95      @Override
96      public boolean removeEntry(final int id) {
97          // After removing, we reorder the buffer by item's age decreasing, null elements are placed at the end of the buffer.
98          // This is quite complicated, but it make we perform add operation more faster. Add is much more frequent operation, as expected.
99          bufferLock.lock();
100         try {
101             final boolean isBufferCircled = buffer[position] != null;
102             // Copy and uncircle the buffer if it is already circled.
103             final StreamsEntry[] copy;
104             if (isBufferCircled) {
105                 copy = copyUncircledBuffer();
106             } else {
107                 copy = buffer;
108             }
109 
110             // Find the position of the element to remove.
111             int removePos = 0;
112             Predicate<StreamsEntry> findEntry = findEntryById(id);
113             for (; removePos < copy.length && copy[removePos] != null; removePos++) {
114                 if (findEntry.apply(copy[removePos])) {
115                     break;
116                 }
117             }
118             if (removePos == copy.length || copy[removePos] == null) {
119                 return false;
120             }
121 
122             // Shift back to remove the position and move the null element to the end of the array.
123             int numberOfElements = isBufferCircled ? buffer.length : position;
124             int shiftSize = numberOfElements - removePos - 1;
125             System.arraycopy(copy, removePos + 1, copy, removePos, shiftSize);
126             copy[removePos + shiftSize] = null;
127 
128             position = removePos + shiftSize;
129             buffer = copy;
130             return true;
131         } finally {
132             bufferLock.unlock();
133         }
134     }
135 
136     @Override
137     public StreamsEntry getEntry(final int id) {
138         return find(getBufferSnapshot(), findEntryById(id), null);
139     }
140 
141     @Override
142     public URI buildUriId(final int id) {
143         return URI.create(applicationProperties.getBaseUrl(UrlMode.ABSOLUTE) + "/refapp-streams/" + id);
144     }
145 
146     private Predicate<StreamsEntry> findEntryById(final int id) {
147         return new Predicate<StreamsEntry>() {
148             @Override
149             public boolean apply(final StreamsEntry input) {
150                 return input.getId().equals(buildUriId(id));
151             }
152         };
153     }
154 
155     private StreamsEntry createEntry(StreamsEntryRequest entryInput) {
156         return new StreamsEntry(StreamsEntry.params()
157                 .id(buildUriId(entryInput.getId()))
158                 .postedDate(entryInput.getPostedDate())
159                 .applicationType("com.atlassian.refimpl")
160                 .alternateLinkUri(buildUriId(entryInput.getId()))
161                 .authors(NonEmptyIterables.from(ImmutableList.of(new UserProfile.Builder(entryInput.getUser()).build())).get())
162                 .verb(entryInput.getVerb())
163                 .addActivityObject(new StreamsEntry.ActivityObject(StreamsEntry.ActivityObject.params()
164                         .id("activity-object-" + entryInput.getId())
165                         .activityObjectType(entryInput.getType())
166                         .title(some(entryInput.getTitle()))))
167                 .renderer(refappRenderer), i18nResolver);
168     }
169 
170     private Iterable<StreamsEntry> getBufferSnapshot() {
171         // Take a snapshot of buffer before search on that snapshot to get rid of concurrent access on the buffer.
172         final StreamsEntry[] copy;
173         bufferLock.lock();
174         try {
175             // When the next position is non-null, it means that the buffer is already full and circled.
176             final boolean isBufferCircled = buffer[position] != null;
177             if (isBufferCircled) {
178                 // Copy the buffer and uncycle it in case it is full and circled.
179                 copy = copyUncircledBuffer();
180             } else {
181                 // In case it is not full yet, we do want to exclude the null items at the end of the buffers.
182                 copy = new StreamsEntry[position];
183                 System.arraycopy(buffer, 0, copy, 0, position);
184             }
185 
186         } finally {
187             bufferLock.unlock();
188         }
189         return Arrays.asList(copy);
190     }
191 
192     private StreamsEntry[] copyUncircledBuffer() {
193         final StreamsEntry[] copy = new StreamsEntry[buffer.length];
194         System.arraycopy(buffer, position, copy, 0, buffer.length - position);
195         System.arraycopy(buffer, 0, copy, buffer.length - position, position);
196         return copy;
197     }
198 
199     private static Predicate<StreamsEntry> inDateRange(final ActivityRequest activityRequest) {
200         return new Predicate<StreamsEntry>() {
201             @Override
202             public boolean apply(final StreamsEntry input) {
203                 if (input.getPostedDate() == null) {
204                     return false;
205                 }
206                 return Filters.inDateRange(activityRequest).apply(input.getPostedDate().toDate());
207             }
208         };
209     }
210 }