1   /**
2    * Created by IntelliJ IDEA.
3    * User: Mike Cannon-Brookes
4    * Date: Jan 22, 2003
5    * Time: 12:39:32 AM
6    * To change this template use Options | File Templates.
7    */
8   package com.atlassian.mail.queue;
9   
10  import com.atlassian.mail.MailException;
11  import org.apache.log4j.Category;
12  
13  import java.sql.Timestamp;
14  import java.util.ArrayList;
15  import java.util.List;
16  import java.util.Queue;
17  import java.util.concurrent.PriorityBlockingQueue;
18  
/**
 * This is a volatile (in-memory only) queue of the outgoing emails from JIRA.
 * <p>
 * Note - queued emails are not persisted, so this class may lose emails if the server shuts down.
 */
24  public class MailQueueImpl implements MailQueue
25  {
26      private static final Category log = Category.getInstance(MailQueueImpl.class);
27      private static final int MAX_SEND_ATTEMPTS = 10;
28      private Queue<MailQueueItem> items;
29      private Queue<MailQueueItem> errorItems;
30      private boolean sending;
31      private MailQueueItem itemBeingSent;
32      private Timestamp sendingStarted;
33  
34      public MailQueueImpl()
35      {
36          items = new PriorityBlockingQueue<MailQueueItem>();
37          errorItems = new PriorityBlockingQueue<MailQueueItem>();
38      }
39  
40      public void sendBuffer()
41      {
42          if (sending)
43          {
44              log.warn("Already sending "+items.size()+" mails:");
45              for (final MailQueueItem item : items)
46              {
47                  log.warn("Queued to send: " + item + ", " + item.getClass());
48              }
49              return;
50          }
51  
52          sendingStarted();
53          List<MailQueueItem> failed = new ArrayList<MailQueueItem>();
54  
55          try
56          {
57              while (!items.isEmpty())
58              {
59                  String origThreadName = Thread.currentThread().getName();
60                  MailQueueItem item = items.poll();
61                  this.itemBeingSent = item;
62                  log.debug("Sending: " + item);
63                  try
64                  {
65                      Thread.currentThread().setName("Sending mailitem "+item);
66                      item.send();
67                  }
68                  catch (MailException e)
69                  {
70                      if (item.getSendCount() > MAX_SEND_ATTEMPTS)
71                          errorItems.add(item);
72                      else
73                          failed.add(item);
74  
75                      log.error("Error occurred in sending e-mail: " + item, e);
76                  }
77                  finally
78                  {
79                      Thread.currentThread().setName(origThreadName);
80                  }
81              }
82  
83              items.addAll(failed);
84          }
85          finally
86          {
87              // Set sending to false no matter what happens
88              sendingStopped();
89          }
90      }
91  
92      public int size()
93      {
94          return items.size();
95      }
96  
97      public int errorSize()
98      {
99          return errorItems.size();
100     }
101 
102     public void addItem(MailQueueItem item)
103     {
104         log.debug("Queued: " + item);
105         items.add(item);
106     }
107 
108     public void addErrorItem(MailQueueItem item)
109     {
110         log.debug("Queued error: " + item);
111         errorItems.add(item);
112     }
113 
114     public Queue<MailQueueItem> getQueue()
115     {
116         return items;
117     }
118 
119     public Queue<MailQueueItem> getErrorQueue()
120     {
121         return errorItems;
122     }
123 
124     public boolean isSending()
125     {
126         return sending;
127     }
128 
129     public Timestamp getSendingStarted()
130     {
131         return sendingStarted;
132     }
133 
134     public MailQueueItem getItemBeingSent()
135     {
136         return itemBeingSent;
137     }
138 
139     public void unstickQueue() {
140         log.error("Mail on queue was considered stuck: " + itemBeingSent);
141         sendingStopped();
142     }
143 
144     public void emptyErrorQueue()
145     {
146         errorItems.clear();
147     }
148 
149     public void resendErrorQueue()
150     {
151         items.addAll(errorItems);
152         emptyErrorQueue();
153     }
154 
155     public void sendingStarted()
156     {
157         sending = true;
158         sendingStarted = new Timestamp(System.currentTimeMillis());
159     }
160 
161     public void sendingStopped()
162     {
163         sending = false;
164         sendingStarted = null;
165     }
166 }