View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.iview2.services;
20  
21  import java.util.AbstractQueue;
22  import java.util.Collections;
23  import java.util.Date;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.NoSuchElementException;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.logging.log4j.LogManager;
35  import org.apache.logging.log4j.Logger;
36  import org.mycore.backend.jpa.MCREntityManagerProvider;
37  import org.mycore.common.events.MCRShutdownHandler;
38  import org.mycore.common.events.MCRShutdownHandler.Closeable;
39  
40  import jakarta.persistence.EntityManager;
41  import jakarta.persistence.NoResultException;
42  import jakarta.persistence.Query;
43  import jakarta.persistence.TypedQuery;
44  
45  public class MCRTilingQueue extends AbstractQueue<MCRTileJob> implements Closeable {
46      private static MCRTilingQueue instance = new MCRTilingQueue();
47  
48      private static Queue<MCRTileJob> preFetch;
49  
50      private static ScheduledExecutorService StalledJobScheduler;
51  
52      private static Logger LOGGER = LogManager.getLogger(MCRTilingQueue.class);
53  
54      private final ReentrantLock pollLock;
55  
56      private boolean running;
57  
58      private MCRTilingQueue() {
59          // periodische Ausführung von runProcess
60          int waitTime = Integer.parseInt(MCRIView2Tools.getIView2Property("TimeTillReset")) * 60;
61          StalledJobScheduler = Executors.newSingleThreadScheduledExecutor();
62          StalledJobScheduler.scheduleAtFixedRate(MCRStalledJobResetter.getInstance(), waitTime, waitTime,
63              TimeUnit.SECONDS);
64          preFetch = new ConcurrentLinkedQueue<>();
65          running = true;
66          pollLock = new ReentrantLock();
67          MCRShutdownHandler.getInstance().addCloseable(this);
68      }
69  
70      /**
71       * @return singleton instance of this class
72       */
73      public static MCRTilingQueue getInstance() {
74          if (!instance.running) {
75              return null;
76          }
77          return instance;
78      }
79  
80      /**
81       * @return next available tile job instance
82       */
83      public MCRTileJob poll() {
84          if (!running) {
85              return null;
86          }
87          try {
88              pollLock.lock();
89              MCRTileJob job = getElement();
90              if (job != null) {
91                  job.setStart(new Date(System.currentTimeMillis()));
92                  job.setStatus(MCRJobState.PROCESSING);
93                  if (!updateJob(job)) {
94                      job = null;
95                  }
96              }
97              return job;
98          } finally {
99              pollLock.unlock();
100         }
101     }
102 
103     /**
104      * removes next job.
105      * same as {@link #poll()} but never returns null
106      * @throws NoSuchElementException if {@link #poll()} would return null
107      */
108     @Override
109     public MCRTileJob remove() throws NoSuchElementException {
110         if (!running) {
111             return null;
112         }
113         MCRTileJob job = poll();
114         if (job == null) {
115             throw new NoSuchElementException();
116         }
117         return job;
118     }
119 
120     /**
121      * get next job without modifying it state to {@link MCRJobState#PROCESSING} 
122      */
123     public MCRTileJob peek() {
124         if (!running) {
125             return null;
126         }
127         return getElement();
128     }
129 
130     /**
131      * removes next job.
132      * same as {@link #peek()} but never returns null
133      * @throws NoSuchElementException if {@link #peek()} would return null
134      */
135     @Override
136     public MCRTileJob element() throws NoSuchElementException {
137         if (!running) {
138             return null;
139         }
140         MCRTileJob job = peek();
141         if (job == null) {
142             throw new NoSuchElementException();
143         }
144         return job;
145     }
146 
147     /**
148      * adds <code>job</code> to queue.
149      * alters date added to current time and status of job to {@link MCRJobState#NEW}
150      */
151     public boolean offer(MCRTileJob job) {
152         MCRTileJob newJob = job;
153         if (!running) {
154             return false;
155         }
156         MCRTileJob oldJob = getJob(newJob.getDerivate(), newJob.getPath());
157         if (oldJob != null) {
158             newJob = oldJob;
159         } else {
160             newJob.setAdded(new Date());
161         }
162         newJob.setStatus(MCRJobState.NEW);
163         newJob.setStart(null);
164         if (addJob(newJob)) {
165             notifyListener();
166             return true;
167         } else {
168             return false;
169         }
170     }
171 
172     /**
173      * Deletes all tile jobs no matter what the current state is.
174      */
175     @Override
176     public void clear() {
177         if (!running) {
178             return;
179         }
180         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
181         Query query = em.createQuery("DELETE FROM MCRTileJob");
182         query.executeUpdate();
183     }
184 
185     /**
186      * iterates of jobs of status {@link MCRJobState#NEW}
187      * 
188      * does not change the status.
189      */
190     @Override
191     public Iterator<MCRTileJob> iterator() {
192         if (!running) {
193             List<MCRTileJob> empty = Collections.emptyList();
194             return empty.iterator();
195         }
196         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
197         TypedQuery<MCRTileJob> query = em.createQuery("FROM MCRTileJob WHERE status='" + MCRJobState.NEW.toChar()
198             + "' ORDER BY added ASC", MCRTileJob.class);
199         List<MCRTileJob> result = query.getResultList();
200         return result.iterator();
201     }
202 
203     /**
204      * returns the current size of this queue
205      */
206     @Override
207     public int size() {
208         if (!running) {
209             return 0;
210         }
211         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
212         TypedQuery<Number> query = em
213             .createQuery("SELECT count(*) FROM MCRTileJob WHERE status='" + MCRJobState.NEW.toChar()
214                 + "'", Number.class);
215         return query.getSingleResult().intValue();
216     }
217 
218     /**
219      * get the specific job and alters it status to {@link MCRJobState#PROCESSING}
220      */
221     public MCRTileJob getElementOutOfOrder(String derivate, String path) throws NoSuchElementException {
222         if (!running) {
223             return null;
224         }
225         MCRTileJob job = getJob(derivate, path);
226         if (job == null) {
227             return null;
228         }
229         job.setStart(new Date(System.currentTimeMillis()));
230         job.setStatus(MCRJobState.PROCESSING);
231         if (!updateJob(job)) {
232             throw new NoSuchElementException();
233         }
234         return job;
235     }
236 
237     private MCRTileJob getJob(String derivate, String path) {
238         if (!running) {
239             return null;
240         }
241         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
242         TypedQuery<MCRTileJob> query = em.createQuery("FROM MCRTileJob WHERE  derivate= :derivate AND path = :path",
243             MCRTileJob.class);
244         query.setParameter("derivate", derivate);
245         query.setParameter("path", path);
246         try {
247             MCRTileJob job = query.getSingleResult();
248             clearPreFetch();
249             return job;
250         } catch (NoResultException e) {
251             return null;
252         }
253     }
254 
255     private MCRTileJob getElement() {
256         if (!running) {
257             return null;
258         }
259         MCRTileJob job = getNextPrefetchedElement();
260         if (job != null) {
261             return job;
262         }
263         LOGGER.debug("No prefetched jobs available");
264         if (preFetch(100) == 0) {
265             return null;
266         }
267         return getNextPrefetchedElement();
268     }
269 
270     private MCRTileJob getNextPrefetchedElement() {
271         MCRTileJob job = preFetch.poll();
272         LOGGER.debug("Fetched job: {}", job);
273         return job;
274     }
275 
276     private int preFetch(int amount) {
277         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
278         TypedQuery<MCRTileJob> query = em.createQuery(
279             "FROM MCRTileJob WHERE status='" + MCRJobState.NEW.toChar() + "' ORDER BY added ASC", MCRTileJob.class)
280             .setMaxResults(amount);
281         Iterator<MCRTileJob> queryResult = query.getResultList().iterator();
282         int i = 0;
283         while (queryResult.hasNext()) {
284             i++;
285             MCRTileJob job = queryResult.next();
286             preFetch.add(job.clone());
287             em.detach(job);
288         }
289         LOGGER.debug("prefetched {} tile jobs", i);
290         return i;
291     }
292 
293     private void clearPreFetch() {
294         preFetch.clear();
295     }
296 
297     private boolean updateJob(MCRTileJob job) {
298         if (!running) {
299             return false;
300         }
301         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
302         em.merge(job);
303         return true;
304     }
305 
306     private boolean addJob(MCRTileJob job) {
307         if (!running) {
308             return false;
309         }
310         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
311         em.persist(job);
312         return true;
313     }
314 
315     /**
316      * every attached listener is informed that something happened to the state of the queue.
317      */
318     public synchronized void notifyListener() {
319         this.notifyAll();
320     }
321 
322     /**
323      * removes specific job from queue no matter what its current status is.
324      * @param derivate ID of derivate
325      * @param path absolute image path
326      * @return the number of jobs deleted
327      */
328     public int remove(String derivate, String path) {
329         if (!running) {
330             return 0;
331         }
332         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
333         Query query = em.createQuery("DELETE FROM " + MCRTileJob.class.getName()
334             + " WHERE derivate = :derivate AND path = :path");
335         query.setParameter("derivate", derivate);
336         query.setParameter("path", path);
337         try {
338             return query.executeUpdate();
339         } finally {
340             clearPreFetch();
341         }
342     }
343 
344     /**
345      * removes all jobs from queue for that <code>derivate</code> its current status is.
346      * @param derivate ID of derivate
347      * @return the number of jobs deleted
348      */
349     public int remove(String derivate) {
350         if (!running) {
351             return 0;
352         }
353         EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
354         Query query = em
355             .createQuery("DELETE FROM " + MCRTileJob.class.getName() + " WHERE derivate = :derivate");
356         query.setParameter("derivate", derivate);
357         try {
358             return query.executeUpdate();
359         } finally {
360             clearPreFetch();
361         }
362     }
363 
364     /**
365      * Shuts down {@link MCRStalledJobResetter} and does not alter any job anymore.
366      */
367     public void prepareClose() {
368         StalledJobScheduler.shutdownNow();
369         running = false;
370         try {
371             StalledJobScheduler.awaitTermination(60, TimeUnit.SECONDS);
372         } catch (InterruptedException e) {
373             LOGGER.info("Could not wait for 60 seconds...");
374             StalledJobScheduler.shutdownNow();
375         }
376     }
377 
378     /**
379      * does nothing
380      */
381     public void close() {
382         //nothing to be done in this phase
383     }
384 
385     /**
386      * @return "MCRTilingQueue"
387      */
388     @Override
389     public String toString() {
390         return "MCRTilingQueue";
391     }
392 
393     @Override
394     public int getPriority() {
395         return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY;
396     }
397 }