View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.services.queuedjob;
20  
21  import java.util.AbstractQueue;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Date;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.NoSuchElementException;
29  import java.util.Queue;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.ScheduledExecutorService;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.locks.ReentrantLock;
36  import java.util.function.Function;
37  
38  import org.apache.logging.log4j.LogManager;
39  import org.apache.logging.log4j.Logger;
40  import org.mycore.backend.jpa.MCREntityManagerProvider;
41  import org.mycore.common.config.MCRConfiguration2;
42  import org.mycore.common.events.MCRShutdownHandler;
43  import org.mycore.common.events.MCRShutdownHandler.Closeable;
44  
45  import jakarta.persistence.EntityManager;
46  import jakarta.persistence.NoResultException;
47  import jakarta.persistence.Query;
48  import jakarta.persistence.TypedQuery;
49  import jakarta.persistence.criteria.CriteriaBuilder;
50  import jakarta.persistence.criteria.CriteriaQuery;
51  import jakarta.persistence.criteria.JoinType;
52  import jakarta.persistence.criteria.MapJoin;
53  import jakarta.persistence.criteria.Path;
54  import jakarta.persistence.criteria.Predicate;
55  import jakarta.persistence.criteria.Root;
56  
57  public class MCRJobQueue extends AbstractQueue<MCRJob> implements Closeable {
58      private static Logger LOGGER = LogManager.getLogger(MCRJobQueue.class);
59  
60      protected static Map<String, MCRJobQueue> INSTANCES = new ConcurrentHashMap<>();
61  
62      protected static String CONFIG_PREFIX = "MCR.QueuedJob.";
63  
64      protected static boolean singleQueue = MCRConfiguration2.getBoolean(CONFIG_PREFIX + "SingleQueue").orElse(true);
65  
66      protected String configPrefixAdd = "";
67  
68      private Class<? extends MCRJobAction> action;
69  
70      private Queue<MCRJob> preFetch;
71  
72      private ScheduledExecutorService stalledJobScheduler;
73  
74      private final ReentrantLock pollLock;
75  
76      private boolean running;
77  
78      private MCRJobQueue(Class<? extends MCRJobAction> action) {
79          int waitTime = MCRConfiguration2.getInt(CONFIG_PREFIX + "TimeTillReset").orElse(10);
80          if (!singleQueue && action != null) {
81              this.action = action;
82              configPrefixAdd = action.getSimpleName();
83              if (configPrefixAdd.length() > 0) {
84                  configPrefixAdd = configPrefixAdd.concat(".");
85              }
86              waitTime = MCRConfiguration2.getInt(CONFIG_PREFIX + configPrefixAdd + "TimeTillReset").orElse(waitTime);
87          }
88          waitTime = waitTime * 60;
89  
90          stalledJobScheduler = Executors.newSingleThreadScheduledExecutor();
91          stalledJobScheduler.scheduleAtFixedRate(MCRStalledJobResetter.getInstance(this.action), waitTime, waitTime,
92              TimeUnit.SECONDS);
93          preFetch = new ConcurrentLinkedQueue<>();
94          running = true;
95          pollLock = new ReentrantLock();
96          MCRShutdownHandler.getInstance().addCloseable(this);
97      }
98  
99      /**
100      * Returns an singleton instance of this class.
101      *
102      * @param action the {@link MCRJobAction} or <code>null</code>
103      * @return singleton instance of this class
104      */
105     public static MCRJobQueue getInstance(Class<? extends MCRJobAction> action) {
106         String key = action != null && !singleQueue ? action.getName() : "single";
107         MCRJobQueue queue = INSTANCES.computeIfAbsent(key, k -> new MCRJobQueue(singleQueue ? null : action));
108 
109         if (!queue.running) {
110             return null;
111         }
112         return queue;
113     }
114 
115     /**
116      * @return next available job instance
117      */
118     @Override
119     public MCRJob poll() {
120         if (!running) {
121             return null;
122         }
123         try {
124             pollLock.lock();
125             MCRJob job = getElement();
126             if (job != null) {
127                 job.setStart(new Date(System.currentTimeMillis()));
128                 job.setStatus(MCRJobStatus.PROCESSING);
129                 if (!updateJob(job)) {
130                     job = null;
131                 }
132             }
133             return job;
134         } finally {
135             pollLock.unlock();
136         }
137     }
138 
139     /**
140      * removes next job.
141      * same as {@link #poll()} but never returns null
142      * @throws NoSuchElementException if {@link #poll()} would return null
143      */
144     @Override
145     public MCRJob remove() throws NoSuchElementException {
146         if (!running) {
147             return null;
148         }
149         MCRJob job = poll();
150         if (job == null) {
151             throw new NoSuchElementException();
152         }
153         return job;
154     }
155 
156     /**
157      * get next job without modifying it state to {@link MCRJobStatus#PROCESSING}
158      * @return the next job
159      */
160     @Override
161     public MCRJob peek() {
162         if (!running) {
163             return null;
164         }
165         return getElement();
166     }
167 
168     /**
169      * removes next job.
170      * same as {@link #peek()} but never returns null
171      * @throws NoSuchElementException if {@link #peek()} would return null
172      */
173     @Override
174     public MCRJob element() throws NoSuchElementException {
175         if (!running) {
176             return null;
177         }
178         MCRJob job = peek();
179         if (job == null) {
180             throw new NoSuchElementException();
181         }
182         return job;
183     }
184 
185     /**
186      * adds {@link MCRJob} to queue and starts {@link MCRJobMaster} if
187      * <code>"MCR.QueuedJob.autostart"</code> is set <code>true</code>.
188      * alters date added to current time and status of job to {@link MCRJobStatus#NEW}
189      */
190     @Override
191     public boolean offer(MCRJob job) {
192         if (!running) {
193             return false;
194         }
195 
196         if (job.getAction() == null && action != null) {
197             job.setAction(action);
198         }
199 
200         MCRJob oldJob = getJob(job.getAction(), job.getParameters());
201         if (oldJob != null) {
202             job = oldJob;
203         } else {
204             job.setAdded(new Date());
205         }
206         job.setStatus(MCRJobStatus.NEW);
207         job.setStart(null);
208         if ((job.getId() == 0 && addJob(job)) || (updateJob(job))) {
209             notifyListener();
210             return true;
211         } else {
212             return false;
213         }
214     }
215 
216     /**
217      * Deletes all jobs no matter what the current state is.
218      */
219     @Override
220     public void clear() {
221         if (!running) {
222             return;
223         }
224 
225         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
226 
227         StringBuilder sb = new StringBuilder("DELETE FROM MCRJob");
228         if (action != null) {
229             sb.append(" WHERE action='").append(action.getName()).append('\'');
230         }
231 
232         Query query = em.createQuery(sb.toString());
233         query.executeUpdate();
234     }
235 
236     /**
237      * iterates over jobs of status {@link MCRJobStatus#NEW}
238      *
239      * does not change the status.
240      */
241     @Override
242     public Iterator<MCRJob> iterator() {
243         return iterator(MCRJobStatus.NEW);
244     }
245 
246     /**
247      * Builds iterator for jobs with given {@link MCRJobStatus} or <code>null</code> for all jobs.
248      */
249     public Iterator<MCRJob> iterator(MCRJobStatus status) {
250         if (!running) {
251             List<MCRJob> empty = Collections.emptyList();
252             return empty.iterator();
253         }
254         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
255 
256         CriteriaBuilder cb = em.getCriteriaBuilder();
257         CriteriaQuery<MCRJob> cq = cb.createQuery(MCRJob.class);
258         Root<MCRJob> root = cq.from(MCRJob.class);
259 
260         List<Predicate> predicates = new ArrayList<>();
261         if (status != null) {
262             predicates.add(cb.equal(root.get("status"), status));
263         }
264         if (action != null) {
265             predicates.add(cb.equal(root.get("action"), action));
266         }
267         cq.where(cb.and(predicates.toArray(new Predicate[] {})));
268         cq.orderBy(cb.asc(root.get("added")));
269         cq.distinct(true);
270 
271         TypedQuery<MCRJob> query = em.createQuery(cq);
272 
273         return query.getResultList().iterator();
274     }
275 
276     /**
277      * returns the current size of this queue
278      */
279     @Override
280     public int size() {
281         if (!running) {
282             return 0;
283         }
284         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
285 
286         StringBuilder sb = new StringBuilder("SELECT count(*) FROM MCRJob WHERE ");
287         if (action != null) {
288             sb.append("action='").append(action.getName()).append("' AND ");
289         }
290         sb.append("status='" + MCRJobStatus.NEW + "'");
291 
292         return em.createQuery(sb.toString(), Number.class).getSingleResult().intValue();
293     }
294 
295     /**
296      * get the specific job and alters it status to {@link MCRJobStatus#PROCESSING}
297      *
298      * @param action the {@link MCRJobAction}
299      */
300     public MCRJob getElementOutOfOrder(Class<? extends MCRJobAction> action, Map<String, String> params)
301         throws NoSuchElementException {
302         if (!running) {
303             return null;
304         }
305         MCRJob job = getJob(action, params);
306         if (job == null) {
307             return null;
308         }
309         job.setStart(new Date(System.currentTimeMillis()));
310         job.setStatus(MCRJobStatus.PROCESSING);
311         if (!updateJob(job)) {
312             throw new NoSuchElementException();
313         }
314         return job;
315     }
316 
317     /**
318      * returns a specific job from given parameters or null if not found.
319      *
320      * @param params the parameters
321      * @return the job
322      */
323     public MCRJob getJob(Map<String, String> params) {
324         return getJob(action, params);
325     }
326 
327     private MCRJob getJob(Class<? extends MCRJobAction> action, Map<String, String> params) {
328         if (!running) {
329             return null;
330         }
331 
332         return buildQuery(action, params, (q) -> {
333             try {
334                 return q.getSingleResult();
335             } catch (NoResultException e) {
336                 return null;
337             }
338 
339         });
340     }
341 
342     /**
343      * Returns specific jobs by the given parameters or an empty list.
344      *
345      * @param params the parameters
346      *
347      * @return the job
348      */
349     public List<MCRJob> getJobs(Map<String, String> params) {
350         return getJobs(action, params);
351     }
352 
353     private List<MCRJob> getJobs(Class<? extends MCRJobAction> action, Map<String, String> params) {
354         if (!running) {
355             return null;
356         }
357 
358         return buildQuery(action, params, TypedQuery::getResultList);
359     }
360 
361     /**
362      * @param action
363      * @param params
364      *
365      * @return the query for the given parameters
366      * */
367     private <T> T buildQuery(Class<? extends MCRJobAction> action, Map<String, String> params,
368         Function<TypedQuery<MCRJob>, T> consumer) {
369         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
370         CriteriaBuilder cb = em.getCriteriaBuilder();
371 
372         CriteriaQuery<MCRJob> query = cb.createQuery(MCRJob.class);
373         Root<MCRJob> jobRoot = query.from(MCRJob.class);
374         query.select(jobRoot);
375 
376         params.keySet().forEach(key -> {
377             MapJoin<MCRJob, String, String> parameterJoin = jobRoot.join(MCRJob_.parameters, JoinType.INNER);
378             Path<String> keyPath = parameterJoin.key();
379             Path<String> valuePath = parameterJoin.value();
380             parameterJoin.on(cb.equal(keyPath, key), cb.equal(valuePath, params.get(key)));
381         });
382 
383         query.where(cb.equal(jobRoot.get(MCRJob_.action), action));
384         T result = consumer.apply(em.createQuery(query));
385         clearPreFetch();
386         return result;
387     }
388 
389     private MCRJob getElement() {
390         if (!running) {
391             return null;
392         }
393         MCRJob job = getNextPrefetchedElement();
394         if (job != null) {
395             return job;
396         }
397         LOGGER.debug("No prefetched jobs available");
398         if (preFetch(MCRConfiguration2.getInt(CONFIG_PREFIX + "preFetchAmount").orElse(50)) == 0) {
399             return null;
400         }
401         return getNextPrefetchedElement();
402     }
403 
404     private MCRJob getNextPrefetchedElement() {
405         MCRJob job = preFetch.poll();
406         LOGGER.debug("Fetched job: {}", job);
407         return job;
408     }
409 
410     private int preFetch(int amount) {
411         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
412 
413         CriteriaBuilder cb = em.getCriteriaBuilder();
414         CriteriaQuery<MCRJob> cq = cb.createQuery(MCRJob.class);
415         Root<MCRJob> root = cq.from(MCRJob.class);
416 
417         List<Predicate> predicates = new ArrayList<>();
418         predicates.add(cb.equal(root.get("status"), MCRJobStatus.NEW));
419         if (action != null) {
420             predicates.add(cb.equal(root.get("action"), action));
421         }
422         cq.where(cb.and(predicates.toArray(new Predicate[] {})));
423         cq.orderBy(cb.asc(root.get("added")));
424         cq.distinct(true);
425 
426         TypedQuery<MCRJob> query = em.createQuery(cq);
427         query.setMaxResults(amount);
428 
429         List<MCRJob> jobs = query.getResultList();
430 
431         int i = 0;
432         for (MCRJob job : jobs) {
433             if (job.getParameters().isEmpty()) {
434                 continue;
435             }
436 
437             i++;
438             preFetch.add(job.clone());
439             em.detach(job);
440         }
441         LOGGER.debug("prefetched {} jobs", i);
442         return i;
443     }
444 
445     private void clearPreFetch() {
446         preFetch.clear();
447     }
448 
449     private boolean updateJob(MCRJob job) {
450         if (!running) {
451             return false;
452         }
453         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
454         em.merge(job);
455         return true;
456     }
457 
458     private boolean addJob(MCRJob job) {
459         if (!running) {
460             return false;
461         }
462         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
463         em.persist(job);
464         return true;
465     }
466 
467     /**
468      * every attached listener is informed that something happened to the state of the queue.
469      * Starts {@link MCRJobMaster} if <code>"MCR.QueuedJob.autostart"</code> is set <code>true</code>.
470      */
471     public synchronized void notifyListener() {
472         this.notifyAll();
473 
474         boolean autostart = MCRConfiguration2.getBoolean(CONFIG_PREFIX + "autostart").orElse(true);
475         autostart = MCRConfiguration2.getBoolean(CONFIG_PREFIX + configPrefixAdd + "autostart").orElse(autostart);
476 
477         if (autostart) {
478             MCRJobMaster.startMasterThread(action);
479         }
480     }
481 
482     /**
483      * removes specific job from queue no matter what its current status is.
484      *
485      * @param action - the action class
486      * @param params - parameters to get jobs
487      * @return the number of jobs deleted
488      */
489     public int remove(Class<? extends MCRJobAction> action, Map<String, String> params) {
490         if (!running) {
491             return 0;
492         }
493 
494         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
495 
496         StringBuilder qStr = new StringBuilder("FROM MCRJob job WHERE action = '" + action.getName() + "' ");
497         for (String paramKey : params.keySet()) {
498             qStr.append(" AND job.parameters['")
499                 .append(paramKey)
500                 .append("'] = '")
501                 .append(params.get(paramKey))
502                 .append('\'');
503         }
504 
505         Query query = em.createQuery(qStr.toString());
506 
507         @SuppressWarnings("unchecked")
508         Iterator<MCRJob> results = query.getResultList().iterator();
509         if (!results.hasNext()) {
510             return 0;
511         }
512 
513         MCRJob job = results.next();
514 
515         try {
516             em.remove(job);
517             em.detach(job);
518             return 1;
519         } finally {
520             clearPreFetch();
521         }
522     }
523 
524     /**
525      * Removes all jobs from queue of specified action.
526      *
527      * @param action - the action class
528      * @return the number of jobs deleted
529      */
530     public int remove(Class<? extends MCRJobAction> action) {
531         if (!running) {
532             return 0;
533         }
534 
535         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
536 
537         Query query = em.createQuery("FROM MCRJob job WHERE action = '" + action.getName() + "'");
538 
539         @SuppressWarnings("unchecked")
540         Iterator<MCRJob> results = query.getResultList().iterator();
541         if (!results.hasNext()) {
542             return 0;
543         }
544         try {
545             int delC = 0;
546             while (results.hasNext()) {
547                 MCRJob job = results.next();
548 
549                 em.remove(job);
550                 em.detach(job);
551                 delC++;
552             }
553             return delC;
554         } finally {
555             clearPreFetch();
556         }
557     }
558 
559     /**
560      * Shuts down {@link MCRStalledJobResetter} and does not alter any job anymore.
561      */
562     @Override
563     public void prepareClose() {
564         stalledJobScheduler.shutdownNow();
565         running = false;
566         try {
567             stalledJobScheduler.awaitTermination(60, TimeUnit.SECONDS);
568         } catch (InterruptedException e) {
569             LOGGER.info("Could not wait for 60 seconds...");
570             stalledJobScheduler.shutdownNow();
571         }
572     }
573 
574     /**
575      * does nothing
576      */
577     @Override
578     public void close() {
579         //nothing to be done in this phase
580     }
581 
582     /**
583      * @return "MCRJobQueue"
584      */
585     @Override
586     public String toString() {
587         return "MCRJobQueue";
588     }
589 
590     @Override
591     public int getPriority() {
592         return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY;
593     }
594 }