1 package com.atlassian.core.task;
2
3 import org.apache.log4j.Logger;
4
5 import javax.mail.MessagingException;
6 import java.io.Serializable;
7 import java.util.ArrayList;
8 import java.util.List;
9 import java.util.concurrent.atomic.AtomicInteger;
10
11 public class AbstractErrorQueuedTaskQueue extends AbstractTaskQueue implements TaskQueueWithErrorQueue
12 {
13 private static final transient Logger log = Logger.getLogger(AbstractErrorQueuedTaskQueue.class);
14
15 private final TaskQueue errorQueue;
16
17 private int retryCount = 5;
18
19 private List<Task> failed;
20
21 public AbstractErrorQueuedTaskQueue(TaskQueue errorQueue, FifoBuffer<Task> buffer)
22 {
23 super(buffer);
24 this.errorQueue = errorQueue;
25 }
26
27 public void flush()
28 {
29 failed = new ArrayList<Task>();
30 super.flush();
31 for (Task task : failed)
32 {
33 addTask(task);
34 }
35 }
36
37 protected void handleException(Task task, Exception rootException)
38 {
39 TaskDecorator theTask = (TaskDecorator)task;
40
41 if (theTask.getExecutionCount() > retryCount) {
42
43 errorQueue.addTask(theTask.getTask());
44 }else {
45
46 failed.add(task);
47 }
48 if (rootException instanceof MessagingException)
49 {
50 Exception e = rootException;
51 while (e instanceof MessagingException)
52 {
53 MessagingException me = (MessagingException)e;
54 log.error(me.getMessage(), me);
55 e = me.getNextException();
56 }
57 }
58 else
59 log.error(rootException, rootException);
60 }
61
62 public void addTask(Task task)
63 {
64 if(task instanceof TaskDecorator)
65 {
66 super.addTask(task);
67 } else {
68 super.addTask(new TaskDecorator(task));
69 }
70 }
71
72 public TaskQueue getErrorQueue()
73 {
74 return errorQueue;
75 }
76
77 public int getRetryCount()
78 {
79 return retryCount;
80 }
81
82 public void setRetryCount(int retryCount)
83 {
84 this.retryCount = retryCount;
85 }
86
87 public static class TaskDecorator implements Task, Serializable
88 {
89 private final Task task;
90 private final AtomicInteger executionCount = new AtomicInteger();
91
92 public TaskDecorator(Task task)
93 {
94 this.task = task;
95 }
96
97 public void execute() throws Exception
98 {
99 executionCount.incrementAndGet();
100 task.execute();
101 }
102
103 public int getExecutionCount()
104 {
105 return executionCount.get();
106 }
107
108 public Task getTask()
109 {
110 return task;
111 }
112 }
113 }