View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.services.queuedjob;
20  
21  import java.lang.reflect.Constructor;
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.concurrent.ExecutionException;
26  
27  import jakarta.persistence.EntityManager;
28  import jakarta.persistence.EntityTransaction;
29  import org.apache.logging.log4j.LogManager;
30  import org.apache.logging.log4j.Logger;
31  import org.mycore.backend.jpa.MCREntityManagerProvider;
32  import org.mycore.common.MCRClassTools;
33  import org.mycore.common.MCRSession;
34  import org.mycore.common.MCRSessionMgr;
35  import org.mycore.common.MCRSystemUserInformation;
36  import org.mycore.common.config.MCRConfiguration2;
37  import org.mycore.common.processing.MCRAbstractProcessable;
38  import org.mycore.common.processing.MCRProcessableStatus;
39  
40  /**
41   * A slave thread of {@link MCRJobMaster}.
42   *
43   * This class execute the specified action for {@link MCRJob} and performs {@link MCRJobAction#rollback()}
44   * if an error occurs. 
45   *
46   * @author Ren\u00E9 Adler
47   *
48   */
49  public class MCRJobThread extends MCRAbstractProcessable implements Runnable {
50  
51      private static Logger LOGGER = LogManager.getLogger(MCRJobThread.class);
52  
53      protected final MCRJobQueue queue;
54  
55      protected MCRJob job = null;
56  
57      private List<MCRJobStatusListener> listeners;
58  
59      public MCRJobThread(MCRJob job) {
60          this.job = job;
61          setName(this.job.getId() + " - " + this.job.getAction().getSimpleName());
62          setStatus(MCRProcessableStatus.created);
63          this.queue = MCRJobQueue.getInstance(job.getAction());
64          job.getParameters().forEach((k, v) -> this.getProperties().put(k, v));
65  
66          listeners = new ArrayList<>();
67          MCRConfiguration2.getString("MCR.QueuedJob." + job.getAction().getSimpleName() + ".Listeners")
68              .ifPresent(classNames -> {
69                  for (String className : classNames.split(",")) {
70                      try {
71                          MCRJobStatusListener instance = (MCRJobStatusListener) MCRClassTools.forName(className)
72                              .getDeclaredConstructor().newInstance();
73                          listeners.add(instance);
74                      } catch (Exception e) {
75                          LOGGER.error("Could not load class {}", className, e);
76                      }
77                  }
78              });
79      }
80  
81      public void run() {
82          MCRSessionMgr.unlock();
83          MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
84          mcrSession.setUserInformation(MCRSystemUserInformation.getSystemUserInstance());
85          EntityManager em = MCREntityManagerProvider.getEntityManagerFactory().createEntityManager();
86          EntityTransaction transaction = em.getTransaction();
87          try {
88              Class<? extends MCRJobAction> actionClass = job.getAction();
89              Constructor<? extends MCRJobAction> actionConstructor = actionClass.getConstructor(MCRJob.class);
90              MCRJobAction action = actionConstructor.newInstance(job);
91  
92              transaction.begin();
93  
94              try {
95                  setStatus(MCRProcessableStatus.processing);
96                  job.setStart(new Date());
97                  listeners.forEach(l -> l.onProcessing(job));
98  
99                  action.execute();
100 
101                 job.setFinished(new Date());
102                 job.setStatus(MCRJobStatus.FINISHED);
103                 setStatus(MCRProcessableStatus.successful);
104                 listeners.forEach(l -> l.onSuccess(job));
105             } catch (ExecutionException ex) {
106                 LOGGER.error("Exception occured while try to start job. Perform rollback.", ex);
107                 setError(ex);
108                 action.rollback();
109                 listeners.forEach(l -> l.onError(job));
110             } catch (Exception ex) {
111                 LOGGER.error("Exception occured while try to start job.", ex);
112                 setError(ex);
113                 listeners.forEach(l -> l.onError(job));
114             }
115             em.merge(job);
116             transaction.commit();
117 
118             // notify the queue we have processed the job
119             synchronized (queue) {
120                 queue.notifyAll();
121             }
122         } catch (Exception e) {
123             LOGGER.error("Error while getting next job.", e);
124             if (transaction != null) {
125                 transaction.rollback();
126             }
127         } finally {
128             em.close();
129             MCRSessionMgr.releaseCurrentSession();
130             mcrSession.close();
131         }
132     }
133 
134 }