BrightSide Workbench Full Report + Source Code
MailQueueConsumer.java
Go to the documentation of this file.
1 /*
2  * TurrĂ³ i Cutiller Foundation. License notice.
3  * Copyright (C) 2022 Lluis TurrĂ³ Cutiller <http://www.turro.org/>
4  *
5  * This program is free software: you can redistribute it and/or modify
6  * it under the terms of the GNU Affero General Public License as published by
7  * the Free Software Foundation, either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU Affero General Public License for more details.
14  *
15  * You should have received a copy of the GNU Affero General Public License
16  * along with this program. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 package org.turro.mail.pool;
20 
21 import java.util.Collection;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.function.Consumer;
27 import org.turro.action.IElephantCloseable;
28 import org.turro.annotation.ElephantCloseable;
29 import org.turro.log.WebLoggers;
30 import org.turro.mail.message.MailMessage;
31 
36 @ElephantCloseable
37 public class MailQueueConsumer implements Runnable, IElephantCloseable {
38 
39  public void addToQueue(Collection<MailMessage> pool) {
40  addToQueue(pool, null);
41  }
42 
43  public void addToQueue(Collection<MailMessage> pool, Consumer onFinish) {
44  if(pool == null) return;
45  int index = 0, size = pool.size();
46  for(MailMessage mail : pool) {
47  if(++index == size) {
48  addToQueue(mail, onFinish);
49  } else {
50  addToQueue(mail);
51  }
52  }
53  }
54 
55  public void addToQueue(MailMessage mail) {
56  addToQueue(mail, null);
57  }
58 
59  public void addToQueue(MailMessage mail, Consumer onFinish) {
60  try {
61  if(!QUEUE.offer(new MailQueueItem(mail, onFinish), 60L, TimeUnit.MINUTES)) {
62  WebLoggers.warning(this).message("Too long attempt").log();
63  }
64  } catch (InterruptedException ex) {
65  WebLoggers.severe(this).exception(ex).log();
66  }
67  }
68 
69  public int size() {
70  return QUEUE.size();
71  }
72 
73  private void addTrigger() {
74  QUEUE.add(new MailQueueItem(null, null));
75  }
76 
77  /* Sending */
78 
79  public void run() {
80  try {
81  running.set(true);
82  while (running.get()) {
83  MailQueueItem item = QUEUE.take();
84  if(!running.get()) break;
85  try {
86  item.send();
87  } catch(Exception ex) {
88  WebLoggers.severe(this).exception(ex).log();
89  }
90  Thread.sleep(200);
91  }
92  } catch (InterruptedException e) {
93  Thread.currentThread().interrupt();
94  }
95  }
96 
97  /* Pool */
98 
99  private static final BlockingQueue<MailQueueItem> QUEUE = new LinkedBlockingQueue<>(500);
100 
101  /* Sender instance */
102 
103  private static MailQueueConsumer INSTANCE;
104  private static Thread MAIL_QUEUE;
105  private static final AtomicBoolean running = new AtomicBoolean(false);
106 
107  public static MailQueueConsumer getInstance() {
108  if(INSTANCE == null) {
109  INSTANCE = new MailQueueConsumer();
110  MAIL_QUEUE = new Thread(INSTANCE, "ElephantMailQueue");
111  MAIL_QUEUE.start();
112  }
113  return INSTANCE;
114  }
115 
116  public static void destroy() {
117  if(INSTANCE != null) {
118  running.set(false);
119  getInstance().addTrigger();
120  INSTANCE = null;
121  }
122  }
123 
128  @Deprecated(forRemoval = false)
129  public MailQueueConsumer() {
130  }
131 
132  /* Closeable */
133 
134  @Override
135  public void closeElephant() {
137  }
138 
139 }
140 
static WebLoggers warning(Object entity)
Definition: WebLoggers.java:47
WebLoggers message(String text, Object... parameters)
Definition: WebLoggers.java:34
static WebLoggers severe(Object entity)
Definition: WebLoggers.java:51
WebLoggers exception(Throwable throwable)
Definition: WebLoggers.java:29
static MailQueueConsumer getInstance()
void addToQueue(MailMessage mail, Consumer onFinish)
void addToQueue(Collection< MailMessage > pool)
void addToQueue(Collection< MailMessage > pool, Consumer onFinish)