1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.services.queuedjob;
20
21 import java.lang.reflect.Constructor;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.apache.logging.log4j.LogManager;
32 import org.apache.logging.log4j.Logger;
33 import org.mycore.backend.jpa.MCREntityManagerProvider;
34 import org.mycore.common.MCRSession;
35 import org.mycore.common.MCRSessionMgr;
36 import org.mycore.common.MCRSystemUserInformation;
37 import org.mycore.common.config.MCRConfiguration2;
38 import org.mycore.common.events.MCRShutdownHandler;
39 import org.mycore.common.events.MCRShutdownHandler.Closeable;
40 import org.mycore.common.processing.MCRProcessableCollection;
41 import org.mycore.common.processing.MCRProcessableDefaultCollection;
42 import org.mycore.common.processing.MCRProcessableRegistry;
43 import org.mycore.util.concurrent.processing.MCRProcessableExecutor;
44 import org.mycore.util.concurrent.processing.MCRProcessableFactory;
45
46 import jakarta.persistence.EntityManager;
47 import jakarta.persistence.EntityTransaction;
48 import jakarta.persistence.PersistenceException;
49 import jakarta.persistence.RollbackException;
50
51
52
53
54
55
56 public class MCRJobMaster implements Runnable, Closeable {
57
58 private static Map<String, MCRJobMaster> INSTANCES = new HashMap<>();
59
60 private static Logger LOGGER = LogManager.getLogger(MCRJobMaster.class);
61
62 private final MCRJobQueue jobQueue;
63
64 private Class<? extends MCRJobAction> action;
65
66 private MCRProcessableExecutor jobServe;
67
68 private MCRProcessableDefaultCollection processableCollection;
69
70 private volatile boolean running = true;
71
72 private ReentrantLock runLock;
73
74 private MCRJobMaster(Class<? extends MCRJobAction> action) {
75 MCRShutdownHandler.getInstance().addCloseable(this);
76 this.action = action;
77 runLock = new ReentrantLock();
78 jobQueue = MCRJobQueue.getInstance(action);
79
80 MCRProcessableRegistry registry = MCRProcessableRegistry.getSingleInstance();
81 processableCollection = new MCRProcessableDefaultCollection(getName());
82 registry.register(processableCollection);
83 }
84
85
86
87
88
89
90
91 public static MCRJobMaster getInstance(Class<? extends MCRJobAction> action) {
92 String key = action != null && !MCRJobQueue.singleQueue ? action.getName() : "single";
93 MCRJobMaster master = INSTANCES.computeIfAbsent(key,
94 k -> new MCRJobMaster(MCRJobQueue.singleQueue ? null : action));
95
96 if (!master.running) {
97 return null;
98 }
99 return master;
100 }
101
102
103
104
105
106
107 public static boolean isRunning(Class<? extends MCRJobAction> action) {
108 String key = action != null && !MCRJobQueue.singleQueue ? action.getName() : "single";
109 MCRJobMaster master = INSTANCES.get(key);
110
111 return master != null && master.running;
112 }
113
114
115
116
117
118
119 public static void startMasterThread(Class<? extends MCRJobAction> action) {
120 if (!isRunning(action)) {
121 LOGGER.info("Starting job master thread{}\".", action == null ? "" : " for action \"" + action.getName());
122 final Thread master = new Thread(getInstance(action));
123 master.start();
124 }
125 }
126
127
128
129
130
131
132 @Override
133 public void run() {
134 Thread.currentThread().setName(getName());
135
136 MCRSessionMgr.unlock();
137 MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
138 mcrSession.setUserInformation(MCRSystemUserInformation.getSystemUserInstance());
139
140 boolean activated = MCRConfiguration2.getBoolean(MCRJobQueue.CONFIG_PREFIX + "activated").orElse(true);
141 activated = activated
142 && MCRConfiguration2.getBoolean(MCRJobQueue.CONFIG_PREFIX + jobQueue.configPrefixAdd + "activated")
143 .orElse(true);
144
145 LOGGER.info("JobQueue{} is {}", MCRJobQueue.singleQueue ? "" : " for \"" + action.getName() + "\"",
146 activated ? "activated" : "deactivated");
147 if (activated) {
148 running = true;
149 int jobThreadCount = MCRConfiguration2.getInt(MCRJobQueue.CONFIG_PREFIX + "JobThreads").orElse(2);
150 jobThreadCount = MCRConfiguration2
151 .getInt(MCRJobQueue.CONFIG_PREFIX + jobQueue.configPrefixAdd + "JobThreads").orElse(jobThreadCount);
152
153 ThreadFactory slaveFactory = new ThreadFactory() {
154 AtomicInteger tNum = new AtomicInteger();
155
156 ThreadGroup tg = new ThreadGroup("MCRJob slave job thread group");
157
158 public Thread newThread(Runnable r) {
159 return new Thread(tg, r, getPreLabel() + "Slave#" + tNum.incrementAndGet());
160 }
161 };
162 final AtomicInteger activeThreads = new AtomicInteger();
163 final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
164 ThreadPoolExecutor executor = new ThreadPoolExecutor(jobThreadCount, jobThreadCount, 1, TimeUnit.DAYS,
165 workQueue,
166 slaveFactory) {
167
168 @Override
169 protected void afterExecute(Runnable r, Throwable t) {
170 super.afterExecute(r, t);
171 activeThreads.decrementAndGet();
172 }
173
174 @Override
175 protected void beforeExecute(Thread t, Runnable r) {
176 super.beforeExecute(t, r);
177 activeThreads.incrementAndGet();
178 }
179 };
180
181 jobServe = MCRProcessableFactory.newPool(executor, processableCollection);
182 processableCollection.setProperty("running", running);
183
184 LOGGER.info("JobMaster{} with {} thread(s) is started",
185 MCRJobQueue.singleQueue ? "" : " for \"" + action.getName() + "\"", jobThreadCount);
186 while (running) {
187 try {
188 while (activeThreads.get() < jobThreadCount) {
189 runLock.lock();
190 try {
191 if (!running) {
192 break;
193 }
194
195 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
196 EntityTransaction transaction = em.getTransaction();
197
198 MCRJob job = null;
199 MCRJobAction action = null;
200 try {
201 transaction.begin();
202
203 job = jobQueue.poll();
204 processableCollection.setProperty("queue size", jobQueue.size());
205
206 if (job != null) {
207 action = toMCRJobAction(job.getAction());
208
209 if (action != null && !action.isActivated()) {
210 job.setStatus(MCRJobStatus.NEW);
211 job.setStart(null);
212 }
213 }
214
215 transaction.commit();
216 } catch (RollbackException e) {
217 LOGGER.error("Error while getting next job.", e);
218 if (transaction != null) {
219 try {
220 transaction.rollback();
221 } catch (RuntimeException re) {
222 LOGGER.warn("Could not rollback transaction.", re);
223 }
224 }
225 } finally {
226 em.close();
227 }
228 if (job != null && action != null && action.isActivated()
229 && !jobServe.getExecutor().isShutdown()) {
230 LOGGER.info("Creating:{}", job);
231 jobServe.submit(new MCRJobThread(job));
232 } else {
233 try {
234 synchronized (jobQueue) {
235 if (running) {
236 LOGGER.debug("No job in queue going to sleep");
237
238
239 jobQueue.wait(60000);
240 }
241 }
242 } catch (InterruptedException e) {
243 LOGGER.error("Job thread was interrupted.", e);
244 }
245 }
246 } finally {
247 runLock.unlock();
248 }
249 }
250 if (activeThreads.get() < jobThreadCount) {
251 try {
252 LOGGER.info("Waiting for a job to finish");
253 Thread.sleep(1000);
254 } catch (InterruptedException e) {
255 LOGGER.error("Job thread was interrupted.", e);
256 }
257 }
258 } catch (PersistenceException e) {
259 LOGGER.warn("We have an database error, sleep and run later.", e);
260 try {
261 Thread.sleep(60000);
262 } catch (InterruptedException ie) {
263 LOGGER.error("Waiting for database was interrupted.", ie);
264 }
265 } catch (Throwable e) {
266 LOGGER.error("Keep running while catching exceptions.", e);
267 }
268 }
269 processableCollection.setProperty("running", running);
270 }
271 LOGGER.info("{} thread finished", getName());
272 MCRSessionMgr.releaseCurrentSession();
273 }
274
275
276
277
278 public void prepareClose() {
279 LOGGER.info("Closing master thread");
280
281 running = false;
282
283 synchronized (jobQueue) {
284 LOGGER.debug("Wake up queue");
285 jobQueue.notifyAll();
286 }
287 runLock.lock();
288 try {
289 if (jobServe != null) {
290 LOGGER.debug("Shutdown executor jobs.");
291 jobServe.getExecutor().shutdown();
292 try {
293 LOGGER.debug("Await termination of executor jobs.");
294 jobServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
295 LOGGER.debug("All jobs finished.");
296 } catch (InterruptedException e) {
297 LOGGER.debug("Could not wait 60 seconds...", e);
298 }
299 }
300 } finally {
301 runLock.unlock();
302 }
303 }
304
305
306
307
308 public void close() {
309 if (jobServe != null && !jobServe.getExecutor().isShutdown()) {
310 LOGGER.info("We are in a hurry, closing service right now");
311 jobServe.getExecutor().shutdownNow();
312 try {
313 jobServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
314 } catch (InterruptedException e) {
315 LOGGER.debug("Could not wait 60 seconds...", e);
316 }
317 }
318 }
319
320 @Override
321 public int getPriority() {
322 return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY - 1;
323 }
324
325 protected String getPreLabel() {
326 return (MCRJobQueue.singleQueue ? "Job" : action.getSimpleName());
327 }
328
329
330
331
332
333
334 public String getName() {
335 return getPreLabel() + " Master";
336 }
337
338
339
340
341
342
343 public MCRProcessableCollection getProcessableCollection() {
344 return processableCollection;
345 }
346
347 private static MCRJobAction toMCRJobAction(Class<? extends MCRJobAction> actionClass) {
348 try {
349 Constructor<? extends MCRJobAction> actionConstructor = actionClass.getConstructor();
350
351 return actionConstructor.newInstance();
352 } catch (Exception e) {
353 LOGGER.error(e.getMessage(), e);
354 }
355
356 return null;
357 }
358 }