1
2
3
4
5
6
7
8 package com.atlassian.mail.queue;
9
10 import com.atlassian.mail.MailException;
11 import org.apache.log4j.Category;
12
13 import java.sql.Timestamp;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.Queue;
17 import java.util.concurrent.PriorityBlockingQueue;
18
19
20
21
22
23
24 public class MailQueueImpl implements MailQueue
25 {
26 private static final Category log = Category.getInstance(MailQueueImpl.class);
27 private static final int MAX_SEND_ATTEMPTS = 10;
28 private Queue<MailQueueItem> items;
29 private Queue<MailQueueItem> errorItems;
30 private boolean sending;
31 private MailQueueItem itemBeingSent;
32 private Timestamp sendingStarted;
33
34 public MailQueueImpl()
35 {
36 items = new PriorityBlockingQueue<MailQueueItem>();
37 errorItems = new PriorityBlockingQueue<MailQueueItem>();
38 }
39
40 public void sendBuffer()
41 {
42 if (sending)
43 {
44 log.warn("Already sending "+items.size()+" mails:");
45 for (final MailQueueItem item : items)
46 {
47 log.warn("Queued to send: " + item + ", " + item.getClass());
48 }
49 return;
50 }
51
52 sendingStarted();
53 List<MailQueueItem> failed = new ArrayList<MailQueueItem>();
54
55 try
56 {
57 while (!items.isEmpty())
58 {
59 String origThreadName = Thread.currentThread().getName();
60 MailQueueItem item = items.poll();
61 this.itemBeingSent = item;
62 log.debug("Sending: " + item);
63 try
64 {
65 Thread.currentThread().setName("Sending mailitem "+item);
66 item.send();
67 }
68 catch (MailException e)
69 {
70 if (item.getSendCount() > MAX_SEND_ATTEMPTS)
71 errorItems.add(item);
72 else
73 failed.add(item);
74
75 log.error("Error occurred in sending e-mail: " + item, e);
76 }
77 finally
78 {
79 Thread.currentThread().setName(origThreadName);
80 }
81 }
82
83 items.addAll(failed);
84 }
85 finally
86 {
87
88 sendingStopped();
89 }
90 }
91
92 public int size()
93 {
94 return items.size();
95 }
96
97 public int errorSize()
98 {
99 return errorItems.size();
100 }
101
102 public void addItem(MailQueueItem item)
103 {
104 log.debug("Queued: " + item);
105 items.add(item);
106 }
107
108 public void addErrorItem(MailQueueItem item)
109 {
110 log.debug("Queued error: " + item);
111 errorItems.add(item);
112 }
113
114 public Queue<MailQueueItem> getQueue()
115 {
116 return items;
117 }
118
119 public Queue<MailQueueItem> getErrorQueue()
120 {
121 return errorItems;
122 }
123
124 public boolean isSending()
125 {
126 return sending;
127 }
128
129 public Timestamp getSendingStarted()
130 {
131 return sendingStarted;
132 }
133
134 public MailQueueItem getItemBeingSent()
135 {
136 return itemBeingSent;
137 }
138
139 public void unstickQueue() {
140 log.error("Mail on queue was considered stuck: " + itemBeingSent);
141 sendingStopped();
142 }
143
144 public void emptyErrorQueue()
145 {
146 errorItems.clear();
147 }
148
149 public void resendErrorQueue()
150 {
151 items.addAll(errorItems);
152 emptyErrorQueue();
153 }
154
155 public void sendingStarted()
156 {
157 sending = true;
158 sendingStarted = new Timestamp(System.currentTimeMillis());
159 }
160
161 public void sendingStopped()
162 {
163 sending = false;
164 sendingStarted = null;
165 }
166 }