View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.services.queuedjob;
20  
21  import java.util.Date;
22  import java.util.HashMap;
23  
24  import org.apache.logging.log4j.LogManager;
25  import org.apache.logging.log4j.Logger;
26  import org.mycore.backend.jpa.MCREntityManagerProvider;
27  import org.mycore.common.config.MCRConfiguration2;
28  
29  import jakarta.persistence.EntityManager;
30  import jakarta.persistence.EntityTransaction;
31  import jakarta.persistence.RollbackException;
32  import jakarta.persistence.TypedQuery;
33  
34  /**
35   * Resets jobs that took to long to perform action.
36   * Set property <code>MCR.QueuedJob.TimeTillReset</code> to alter grace period.
37   * 
38   * @author Ren\u00E9 Adler
39   */
40  public class MCRStalledJobResetter implements Runnable {
41  
42      private static HashMap<String, MCRStalledJobResetter> INSTANCES = new HashMap<>();
43  
44      private static Logger LOGGER = LogManager.getLogger(MCRStalledJobResetter.class);
45  
46      private int maxTimeDiff = MCRConfiguration2.getInt(MCRJobQueue.CONFIG_PREFIX + "TimeTillReset").orElse(10);
47  
48      private Class<? extends MCRJobAction> action = null;
49  
50      private MCRStalledJobResetter(Class<? extends MCRJobAction> action) {
51          if (action != null) {
52              this.action = action;
53              maxTimeDiff = MCRConfiguration2
54                  .getInt(MCRJobQueue.CONFIG_PREFIX + action.getSimpleName() + ".TimeTillReset").orElse(maxTimeDiff);
55          }
56      }
57  
58      public static MCRStalledJobResetter getInstance(Class<? extends MCRJobAction> action) {
59          String key = action != null && !MCRJobQueue.singleQueue ? action.getName() : "single";
60  
61          return INSTANCES.computeIfAbsent(key,
62              k -> new MCRStalledJobResetter(MCRJobQueue.singleQueue ? null : action));
63      }
64  
65      /**
66       * Resets jobs to {@link MCRJobStatus#NEW} that where in status {@link MCRJobStatus#PROCESSING} for to long time.
67       */
68      public void run() {
69          EntityManager em = MCREntityManagerProvider.getEntityManagerFactory().createEntityManager();
70          EntityTransaction transaction = em.getTransaction();
71  
72          LOGGER.info("MCRJob is Checked for dead Entries");
73          transaction.begin();
74  
75          StringBuilder sb = new StringBuilder("FROM MCRJob WHERE ");
76          if (action != null) {
77              sb.append("action='").append(action.getName()).append("' AND ");
78          }
79          sb.append(" status='" + MCRJobStatus.PROCESSING + "' ORDER BY id ASC");
80  
81          TypedQuery<MCRJob> query = em.createQuery(sb.toString(), MCRJob.class);
82  
83          long current = new Date(System.currentTimeMillis()).getTime() / 60000;
84  
85          boolean reset = query
86              .getResultList()
87              .stream()
88              .map(job -> {
89                  boolean ret = false;
90                  long start = job.getStart().getTime() / 60000;
91                  if (current - start >= maxTimeDiff) {
92                      LOGGER.debug("->Resetting too long in queue");
93  
94                      job.setStatus(MCRJobStatus.NEW);
95                      job.setStart(null);
96                      ret = true;
97                  } else {
98                      LOGGER.debug("->ok");
99                  }
100                 return ret;
101             })
102             .reduce(Boolean::logicalOr)
103             .orElse(false);
104         try {
105             transaction.commit();
106         } catch (RollbackException e) {
107             e.printStackTrace();
108             if (transaction != null) {
109                 transaction.rollback();
110                 reset = false;//No changes are applied, so no notification is needed as well
111             }
112         }
113         //Only notify Listeners on Queue if really something is set back
114         if (reset) {
115             synchronized (MCRJobQueue.getInstance(action)) {
116                 MCRJobQueue.getInstance(action).notifyListener();
117             }
118         }
119         em.close();
120         LOGGER.info("MCRJob checking is done");
121     }
122 }