1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.services.queuedjob;
20
21 import java.util.AbstractQueue;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Date;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NoSuchElementException;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.locks.ReentrantLock;
36 import java.util.function.Function;
37
38 import org.apache.logging.log4j.LogManager;
39 import org.apache.logging.log4j.Logger;
40 import org.mycore.backend.jpa.MCREntityManagerProvider;
41 import org.mycore.common.config.MCRConfiguration2;
42 import org.mycore.common.events.MCRShutdownHandler;
43 import org.mycore.common.events.MCRShutdownHandler.Closeable;
44
45 import jakarta.persistence.EntityManager;
46 import jakarta.persistence.NoResultException;
47 import jakarta.persistence.Query;
48 import jakarta.persistence.TypedQuery;
49 import jakarta.persistence.criteria.CriteriaBuilder;
50 import jakarta.persistence.criteria.CriteriaQuery;
51 import jakarta.persistence.criteria.JoinType;
52 import jakarta.persistence.criteria.MapJoin;
53 import jakarta.persistence.criteria.Path;
54 import jakarta.persistence.criteria.Predicate;
55 import jakarta.persistence.criteria.Root;
56
57 public class MCRJobQueue extends AbstractQueue<MCRJob> implements Closeable {
58 private static Logger LOGGER = LogManager.getLogger(MCRJobQueue.class);
59
60 protected static Map<String, MCRJobQueue> INSTANCES = new ConcurrentHashMap<>();
61
62 protected static String CONFIG_PREFIX = "MCR.QueuedJob.";
63
64 protected static boolean singleQueue = MCRConfiguration2.getBoolean(CONFIG_PREFIX + "SingleQueue").orElse(true);
65
66 protected String configPrefixAdd = "";
67
68 private Class<? extends MCRJobAction> action;
69
70 private Queue<MCRJob> preFetch;
71
72 private ScheduledExecutorService stalledJobScheduler;
73
74 private final ReentrantLock pollLock;
75
76 private boolean running;
77
78 private MCRJobQueue(Class<? extends MCRJobAction> action) {
79 int waitTime = MCRConfiguration2.getInt(CONFIG_PREFIX + "TimeTillReset").orElse(10);
80 if (!singleQueue && action != null) {
81 this.action = action;
82 configPrefixAdd = action.getSimpleName();
83 if (configPrefixAdd.length() > 0) {
84 configPrefixAdd = configPrefixAdd.concat(".");
85 }
86 waitTime = MCRConfiguration2.getInt(CONFIG_PREFIX + configPrefixAdd + "TimeTillReset").orElse(waitTime);
87 }
88 waitTime = waitTime * 60;
89
90 stalledJobScheduler = Executors.newSingleThreadScheduledExecutor();
91 stalledJobScheduler.scheduleAtFixedRate(MCRStalledJobResetter.getInstance(this.action), waitTime, waitTime,
92 TimeUnit.SECONDS);
93 preFetch = new ConcurrentLinkedQueue<>();
94 running = true;
95 pollLock = new ReentrantLock();
96 MCRShutdownHandler.getInstance().addCloseable(this);
97 }
98
99
100
101
102
103
104
105 public static MCRJobQueue getInstance(Class<? extends MCRJobAction> action) {
106 String key = action != null && !singleQueue ? action.getName() : "single";
107 MCRJobQueue queue = INSTANCES.computeIfAbsent(key, k -> new MCRJobQueue(singleQueue ? null : action));
108
109 if (!queue.running) {
110 return null;
111 }
112 return queue;
113 }
114
115
116
117
118 @Override
119 public MCRJob poll() {
120 if (!running) {
121 return null;
122 }
123 try {
124 pollLock.lock();
125 MCRJob job = getElement();
126 if (job != null) {
127 job.setStart(new Date(System.currentTimeMillis()));
128 job.setStatus(MCRJobStatus.PROCESSING);
129 if (!updateJob(job)) {
130 job = null;
131 }
132 }
133 return job;
134 } finally {
135 pollLock.unlock();
136 }
137 }
138
139
140
141
142
143
144 @Override
145 public MCRJob remove() throws NoSuchElementException {
146 if (!running) {
147 return null;
148 }
149 MCRJob job = poll();
150 if (job == null) {
151 throw new NoSuchElementException();
152 }
153 return job;
154 }
155
156
157
158
159
160 @Override
161 public MCRJob peek() {
162 if (!running) {
163 return null;
164 }
165 return getElement();
166 }
167
168
169
170
171
172
173 @Override
174 public MCRJob element() throws NoSuchElementException {
175 if (!running) {
176 return null;
177 }
178 MCRJob job = peek();
179 if (job == null) {
180 throw new NoSuchElementException();
181 }
182 return job;
183 }
184
185
186
187
188
189
190 @Override
191 public boolean offer(MCRJob job) {
192 if (!running) {
193 return false;
194 }
195
196 if (job.getAction() == null && action != null) {
197 job.setAction(action);
198 }
199
200 MCRJob oldJob = getJob(job.getAction(), job.getParameters());
201 if (oldJob != null) {
202 job = oldJob;
203 } else {
204 job.setAdded(new Date());
205 }
206 job.setStatus(MCRJobStatus.NEW);
207 job.setStart(null);
208 if ((job.getId() == 0 && addJob(job)) || (updateJob(job))) {
209 notifyListener();
210 return true;
211 } else {
212 return false;
213 }
214 }
215
216
217
218
219 @Override
220 public void clear() {
221 if (!running) {
222 return;
223 }
224
225 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
226
227 StringBuilder sb = new StringBuilder("DELETE FROM MCRJob");
228 if (action != null) {
229 sb.append(" WHERE action='").append(action.getName()).append('\'');
230 }
231
232 Query query = em.createQuery(sb.toString());
233 query.executeUpdate();
234 }
235
236
237
238
239
240
241 @Override
242 public Iterator<MCRJob> iterator() {
243 return iterator(MCRJobStatus.NEW);
244 }
245
246
247
248
249 public Iterator<MCRJob> iterator(MCRJobStatus status) {
250 if (!running) {
251 List<MCRJob> empty = Collections.emptyList();
252 return empty.iterator();
253 }
254 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
255
256 CriteriaBuilder cb = em.getCriteriaBuilder();
257 CriteriaQuery<MCRJob> cq = cb.createQuery(MCRJob.class);
258 Root<MCRJob> root = cq.from(MCRJob.class);
259
260 List<Predicate> predicates = new ArrayList<>();
261 if (status != null) {
262 predicates.add(cb.equal(root.get("status"), status));
263 }
264 if (action != null) {
265 predicates.add(cb.equal(root.get("action"), action));
266 }
267 cq.where(cb.and(predicates.toArray(new Predicate[] {})));
268 cq.orderBy(cb.asc(root.get("added")));
269 cq.distinct(true);
270
271 TypedQuery<MCRJob> query = em.createQuery(cq);
272
273 return query.getResultList().iterator();
274 }
275
276
277
278
279 @Override
280 public int size() {
281 if (!running) {
282 return 0;
283 }
284 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
285
286 StringBuilder sb = new StringBuilder("SELECT count(*) FROM MCRJob WHERE ");
287 if (action != null) {
288 sb.append("action='").append(action.getName()).append("' AND ");
289 }
290 sb.append("status='" + MCRJobStatus.NEW + "'");
291
292 return em.createQuery(sb.toString(), Number.class).getSingleResult().intValue();
293 }
294
295
296
297
298
299
300 public MCRJob getElementOutOfOrder(Class<? extends MCRJobAction> action, Map<String, String> params)
301 throws NoSuchElementException {
302 if (!running) {
303 return null;
304 }
305 MCRJob job = getJob(action, params);
306 if (job == null) {
307 return null;
308 }
309 job.setStart(new Date(System.currentTimeMillis()));
310 job.setStatus(MCRJobStatus.PROCESSING);
311 if (!updateJob(job)) {
312 throw new NoSuchElementException();
313 }
314 return job;
315 }
316
317
318
319
320
321
322
323 public MCRJob getJob(Map<String, String> params) {
324 return getJob(action, params);
325 }
326
327 private MCRJob getJob(Class<? extends MCRJobAction> action, Map<String, String> params) {
328 if (!running) {
329 return null;
330 }
331
332 return buildQuery(action, params, (q) -> {
333 try {
334 return q.getSingleResult();
335 } catch (NoResultException e) {
336 return null;
337 }
338
339 });
340 }
341
342
343
344
345
346
347
348
349 public List<MCRJob> getJobs(Map<String, String> params) {
350 return getJobs(action, params);
351 }
352
353 private List<MCRJob> getJobs(Class<? extends MCRJobAction> action, Map<String, String> params) {
354 if (!running) {
355 return null;
356 }
357
358 return buildQuery(action, params, TypedQuery::getResultList);
359 }
360
361
362
363
364
365
366
367 private <T> T buildQuery(Class<? extends MCRJobAction> action, Map<String, String> params,
368 Function<TypedQuery<MCRJob>, T> consumer) {
369 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
370 CriteriaBuilder cb = em.getCriteriaBuilder();
371
372 CriteriaQuery<MCRJob> query = cb.createQuery(MCRJob.class);
373 Root<MCRJob> jobRoot = query.from(MCRJob.class);
374 query.select(jobRoot);
375
376 params.keySet().forEach(key -> {
377 MapJoin<MCRJob, String, String> parameterJoin = jobRoot.join(MCRJob_.parameters, JoinType.INNER);
378 Path<String> keyPath = parameterJoin.key();
379 Path<String> valuePath = parameterJoin.value();
380 parameterJoin.on(cb.equal(keyPath, key), cb.equal(valuePath, params.get(key)));
381 });
382
383 query.where(cb.equal(jobRoot.get(MCRJob_.action), action));
384 T result = consumer.apply(em.createQuery(query));
385 clearPreFetch();
386 return result;
387 }
388
389 private MCRJob getElement() {
390 if (!running) {
391 return null;
392 }
393 MCRJob job = getNextPrefetchedElement();
394 if (job != null) {
395 return job;
396 }
397 LOGGER.debug("No prefetched jobs available");
398 if (preFetch(MCRConfiguration2.getInt(CONFIG_PREFIX + "preFetchAmount").orElse(50)) == 0) {
399 return null;
400 }
401 return getNextPrefetchedElement();
402 }
403
404 private MCRJob getNextPrefetchedElement() {
405 MCRJob job = preFetch.poll();
406 LOGGER.debug("Fetched job: {}", job);
407 return job;
408 }
409
410 private int preFetch(int amount) {
411 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
412
413 CriteriaBuilder cb = em.getCriteriaBuilder();
414 CriteriaQuery<MCRJob> cq = cb.createQuery(MCRJob.class);
415 Root<MCRJob> root = cq.from(MCRJob.class);
416
417 List<Predicate> predicates = new ArrayList<>();
418 predicates.add(cb.equal(root.get("status"), MCRJobStatus.NEW));
419 if (action != null) {
420 predicates.add(cb.equal(root.get("action"), action));
421 }
422 cq.where(cb.and(predicates.toArray(new Predicate[] {})));
423 cq.orderBy(cb.asc(root.get("added")));
424 cq.distinct(true);
425
426 TypedQuery<MCRJob> query = em.createQuery(cq);
427 query.setMaxResults(amount);
428
429 List<MCRJob> jobs = query.getResultList();
430
431 int i = 0;
432 for (MCRJob job : jobs) {
433 if (job.getParameters().isEmpty()) {
434 continue;
435 }
436
437 i++;
438 preFetch.add(job.clone());
439 em.detach(job);
440 }
441 LOGGER.debug("prefetched {} jobs", i);
442 return i;
443 }
444
445 private void clearPreFetch() {
446 preFetch.clear();
447 }
448
449 private boolean updateJob(MCRJob job) {
450 if (!running) {
451 return false;
452 }
453 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
454 em.merge(job);
455 return true;
456 }
457
458 private boolean addJob(MCRJob job) {
459 if (!running) {
460 return false;
461 }
462 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
463 em.persist(job);
464 return true;
465 }
466
467
468
469
470
471 public synchronized void notifyListener() {
472 this.notifyAll();
473
474 boolean autostart = MCRConfiguration2.getBoolean(CONFIG_PREFIX + "autostart").orElse(true);
475 autostart = MCRConfiguration2.getBoolean(CONFIG_PREFIX + configPrefixAdd + "autostart").orElse(autostart);
476
477 if (autostart) {
478 MCRJobMaster.startMasterThread(action);
479 }
480 }
481
482
483
484
485
486
487
488
489 public int remove(Class<? extends MCRJobAction> action, Map<String, String> params) {
490 if (!running) {
491 return 0;
492 }
493
494 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
495
496 StringBuilder qStr = new StringBuilder("FROM MCRJob job WHERE action = '" + action.getName() + "' ");
497 for (String paramKey : params.keySet()) {
498 qStr.append(" AND job.parameters['")
499 .append(paramKey)
500 .append("'] = '")
501 .append(params.get(paramKey))
502 .append('\'');
503 }
504
505 Query query = em.createQuery(qStr.toString());
506
507 @SuppressWarnings("unchecked")
508 Iterator<MCRJob> results = query.getResultList().iterator();
509 if (!results.hasNext()) {
510 return 0;
511 }
512
513 MCRJob job = results.next();
514
515 try {
516 em.remove(job);
517 em.detach(job);
518 return 1;
519 } finally {
520 clearPreFetch();
521 }
522 }
523
524
525
526
527
528
529
530 public int remove(Class<? extends MCRJobAction> action) {
531 if (!running) {
532 return 0;
533 }
534
535 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
536
537 Query query = em.createQuery("FROM MCRJob job WHERE action = '" + action.getName() + "'");
538
539 @SuppressWarnings("unchecked")
540 Iterator<MCRJob> results = query.getResultList().iterator();
541 if (!results.hasNext()) {
542 return 0;
543 }
544 try {
545 int delC = 0;
546 while (results.hasNext()) {
547 MCRJob job = results.next();
548
549 em.remove(job);
550 em.detach(job);
551 delC++;
552 }
553 return delC;
554 } finally {
555 clearPreFetch();
556 }
557 }
558
559
560
561
562 @Override
563 public void prepareClose() {
564 stalledJobScheduler.shutdownNow();
565 running = false;
566 try {
567 stalledJobScheduler.awaitTermination(60, TimeUnit.SECONDS);
568 } catch (InterruptedException e) {
569 LOGGER.info("Could not wait for 60 seconds...");
570 stalledJobScheduler.shutdownNow();
571 }
572 }
573
574
575
576
577 @Override
578 public void close() {
579
580 }
581
582
583
584
585 @Override
586 public String toString() {
587 return "MCRJobQueue";
588 }
589
590 @Override
591 public int getPriority() {
592 return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY;
593 }
594 }