View Javadoc

1   package com.atlassian.core.task;
2   
3   import org.apache.log4j.Logger;
4   
5   import javax.mail.MessagingException;
6   import java.io.Serializable;
7   import java.util.ArrayList;
8   import java.util.List;
9   import java.util.concurrent.atomic.AtomicInteger;
10  
11  public class AbstractErrorQueuedTaskQueue extends AbstractTaskQueue implements TaskQueueWithErrorQueue
12  {
13      private static final transient Logger log = Logger.getLogger(AbstractErrorQueuedTaskQueue.class);
14  
15      private final TaskQueue errorQueue;
16  
17      private int retryCount = 5;
18  
19      private List<Task> failed;
20  
21      public AbstractErrorQueuedTaskQueue(TaskQueue errorQueue, FifoBuffer<Task> buffer)
22      {
23          super(buffer);
24          this.errorQueue = errorQueue;
25      }
26  
27      public void flush()
28      {
29          failed = new ArrayList<Task>();
30          super.flush();
31          for (Task task : failed)
32          {
33              addTask(task);
34          }
35      }
36  
37      protected void handleException(Task task, Exception rootException)
38      {
39          TaskDecorator theTask = (TaskDecorator)task;
40  
41          if (theTask.getExecutionCount() > retryCount) {
42  
43              errorQueue.addTask(theTask.getTask());
44          }else {
45  
46              failed.add(task);
47          }
48          if (rootException instanceof MessagingException)
49          {
50              Exception e = rootException;
51              while (e instanceof MessagingException)
52              {
53                  MessagingException me = (MessagingException)e;
54                  log.error(me.getMessage(), me);
55                  e = me.getNextException();
56              }
57          }
58          else
59              log.error(rootException, rootException);
60      }
61  
62      public void addTask(Task task)
63      {
64          if(task instanceof TaskDecorator)
65          {
66              super.addTask(task);
67          } else {
68              super.addTask(new TaskDecorator(task));
69          }
70      }
71  
72      public TaskQueue getErrorQueue()
73      {
74          return errorQueue;
75      }
76  
77      public int getRetryCount()
78      {
79          return retryCount;
80      }
81  
82      public void setRetryCount(int retryCount)
83      {
84          this.retryCount = retryCount;
85      }
86  
87      public static class TaskDecorator implements Task, Serializable
88      {
89          private final Task task;
90          private final AtomicInteger executionCount = new AtomicInteger();
91  
92          public TaskDecorator(Task task)
93          {
94              this.task = task;
95          }
96  
97          public void execute() throws Exception
98          {
99              executionCount.incrementAndGet();
100             task.execute();
101         }
102 
103         public int getExecutionCount()
104         {
105             return executionCount.get();
106         }
107 
108         public Task getTask()
109         {
110             return task;
111         }
112     }
113 }