View Javadoc

1   package com.atlassian.core.task;
2   
3   import org.slf4j.Logger;
4   import org.slf4j.LoggerFactory;
5   
6   import javax.mail.MessagingException;
7   import java.io.Serializable;
8   import java.util.ArrayList;
9   import java.util.List;
10  import java.util.concurrent.atomic.AtomicInteger;
11  
12  public class AbstractErrorQueuedTaskQueue extends AbstractTaskQueue implements TaskQueueWithErrorQueue {
13      private static final transient Logger log = LoggerFactory.getLogger(AbstractErrorQueuedTaskQueue.class);
14  
15      private final TaskQueue errorQueue;
16  
17      private int retryCount = 5;
18  
19      private List<Task> failed;
20  
21      public AbstractErrorQueuedTaskQueue(TaskQueue errorQueue, FifoBuffer<Task> buffer) {
22          super(buffer);
23          this.errorQueue = errorQueue;
24      }
25  
26      public void flush() {
27          failed = new ArrayList<Task>();
28          super.flush();
29          for (Task task : failed) {
30              addTask(task);
31          }
32      }
33  
34      protected void handleException(Task task, Exception rootException) {
35          TaskDecorator theTask = (TaskDecorator) task;
36  
37          if (theTask.getExecutionCount() > retryCount) {
38  
39              errorQueue.addTask(theTask.getTask());
40          } else {
41  
42              failed.add(task);
43          }
44          if (rootException instanceof MessagingException) {
45              Exception e = rootException;
46              while (e instanceof MessagingException) {
47                  MessagingException me = (MessagingException) e;
48                  log.error(me.getMessage(), me);
49                  e = me.getNextException();
50              }
51          } else
52              log.error("Failed to execute task", rootException);
53      }
54  
55      public void addTask(Task task) {
56          if (task instanceof TaskDecorator) {
57              super.addTask(task);
58          } else {
59              super.addTask(new TaskDecorator(task));
60          }
61      }
62  
63      public TaskQueue getErrorQueue() {
64          return errorQueue;
65      }
66  
67      public int getRetryCount() {
68          return retryCount;
69      }
70  
71      public void setRetryCount(int retryCount) {
72          this.retryCount = retryCount;
73      }
74  
75      public static class TaskDecorator implements Task, Serializable {
76          private final Task task;
77          private final AtomicInteger executionCount = new AtomicInteger();
78  
79          public TaskDecorator(Task task) {
80              this.task = task;
81          }
82  
83          public void execute() throws Exception {
84              executionCount.incrementAndGet();
85              task.execute();
86          }
87  
88          public int getExecutionCount() {
89              return executionCount.get();
90          }
91  
92          public Task getTask() {
93              return task;
94          }
95      }
96  }