View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.services.queuedjob;
20  
21  import java.lang.reflect.Constructor;
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.ThreadFactory;
26  import java.util.concurrent.ThreadPoolExecutor;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicInteger;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.logging.log4j.LogManager;
32  import org.apache.logging.log4j.Logger;
33  import org.mycore.backend.jpa.MCREntityManagerProvider;
34  import org.mycore.common.MCRSession;
35  import org.mycore.common.MCRSessionMgr;
36  import org.mycore.common.MCRSystemUserInformation;
37  import org.mycore.common.config.MCRConfiguration2;
38  import org.mycore.common.events.MCRShutdownHandler;
39  import org.mycore.common.events.MCRShutdownHandler.Closeable;
40  import org.mycore.common.processing.MCRProcessableCollection;
41  import org.mycore.common.processing.MCRProcessableDefaultCollection;
42  import org.mycore.common.processing.MCRProcessableRegistry;
43  import org.mycore.util.concurrent.processing.MCRProcessableExecutor;
44  import org.mycore.util.concurrent.processing.MCRProcessableFactory;
45  
46  import jakarta.persistence.EntityManager;
47  import jakarta.persistence.EntityTransaction;
48  import jakarta.persistence.PersistenceException;
49  import jakarta.persistence.RollbackException;
50  
51  /**
52   * The master of all {@link MCRJobThread}s threads.
53   * 
54   * @author Ren\u00E9 Adler
55   */
56  public class MCRJobMaster implements Runnable, Closeable {
57  
58      private static Map<String, MCRJobMaster> INSTANCES = new HashMap<>();
59  
60      private static Logger LOGGER = LogManager.getLogger(MCRJobMaster.class);
61  
62      private final MCRJobQueue jobQueue;
63  
64      private Class<? extends MCRJobAction> action;
65  
66      private MCRProcessableExecutor jobServe;
67  
68      private MCRProcessableDefaultCollection processableCollection;
69  
70      private volatile boolean running = true;
71  
72      private ReentrantLock runLock;
73  
74      private MCRJobMaster(Class<? extends MCRJobAction> action) {
75          MCRShutdownHandler.getInstance().addCloseable(this);
76          this.action = action;
77          runLock = new ReentrantLock();
78          jobQueue = MCRJobQueue.getInstance(action);
79  
80          MCRProcessableRegistry registry = MCRProcessableRegistry.getSingleInstance();
81          processableCollection = new MCRProcessableDefaultCollection(getName());
82          registry.register(processableCollection);
83      }
84  
85      /**
86       * Returns an singleton instance of this class.
87       * 
88       * @param action the {@link MCRJobAction} or <code>null</code>
89       * @return the instance of this class
90       */
91      public static MCRJobMaster getInstance(Class<? extends MCRJobAction> action) {
92          String key = action != null && !MCRJobQueue.singleQueue ? action.getName() : "single";
93          MCRJobMaster master = INSTANCES.computeIfAbsent(key,
94              k -> new MCRJobMaster(MCRJobQueue.singleQueue ? null : action));
95  
96          if (!master.running) {
97              return null;
98          }
99          return master;
100     }
101 
102     /**
103      * Return if {@link MCRJobMaster} is running.
104      * 
105      * @return if is running
106      */
107     public static boolean isRunning(Class<? extends MCRJobAction> action) {
108         String key = action != null && !MCRJobQueue.singleQueue ? action.getName() : "single";
109         MCRJobMaster master = INSTANCES.get(key);
110 
111         return master != null && master.running;
112     }
113 
114     /**
115      * Starts the local {@link MCRJobMaster}.
116      * Can be auto started if <code>"MCR.QueuedJob.{?MCRJobAction?.}autostart"</code> 
117      * is set to <code>true</code>.
118      */
119     public static void startMasterThread(Class<? extends MCRJobAction> action) {
120         if (!isRunning(action)) {
121             LOGGER.info("Starting job master thread{}\".", action == null ? "" : " for action \"" + action.getName());
122             final Thread master = new Thread(getInstance(action));
123             master.start();
124         }
125     }
126 
127     /**
128      * Starts local threads ({@link MCRJobThread}) and gives {@link MCRJob} instances to them.
129      * Use property <code>"MCR.QueuedJob.JobThreads"</code> to specify how many concurrent threads should be running.
130      * <code>"MCR.QueuedJob.activated"</code> can be used activate or deactivate general {@link MCRJob} running. 
131      */
132     @Override
133     public void run() {
134         Thread.currentThread().setName(getName());
135         //get this MCRSession a speaking name
136         MCRSessionMgr.unlock();
137         MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
138         mcrSession.setUserInformation(MCRSystemUserInformation.getSystemUserInstance());
139 
140         boolean activated = MCRConfiguration2.getBoolean(MCRJobQueue.CONFIG_PREFIX + "activated").orElse(true);
141         activated = activated
142             && MCRConfiguration2.getBoolean(MCRJobQueue.CONFIG_PREFIX + jobQueue.configPrefixAdd + "activated")
143                 .orElse(true);
144 
145         LOGGER.info("JobQueue{} is {}", MCRJobQueue.singleQueue ? "" : " for \"" + action.getName() + "\"",
146             activated ? "activated" : "deactivated");
147         if (activated) {
148             running = true;
149             int jobThreadCount = MCRConfiguration2.getInt(MCRJobQueue.CONFIG_PREFIX + "JobThreads").orElse(2);
150             jobThreadCount = MCRConfiguration2
151                 .getInt(MCRJobQueue.CONFIG_PREFIX + jobQueue.configPrefixAdd + "JobThreads").orElse(jobThreadCount);
152 
153             ThreadFactory slaveFactory = new ThreadFactory() {
154                 AtomicInteger tNum = new AtomicInteger();
155 
156                 ThreadGroup tg = new ThreadGroup("MCRJob slave job thread group");
157 
158                 public Thread newThread(Runnable r) {
159                     return new Thread(tg, r, getPreLabel() + "Slave#" + tNum.incrementAndGet());
160                 }
161             };
162             final AtomicInteger activeThreads = new AtomicInteger();
163             final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
164             ThreadPoolExecutor executor = new ThreadPoolExecutor(jobThreadCount, jobThreadCount, 1, TimeUnit.DAYS,
165                 workQueue,
166                 slaveFactory) {
167 
168                 @Override
169                 protected void afterExecute(Runnable r, Throwable t) {
170                     super.afterExecute(r, t);
171                     activeThreads.decrementAndGet();
172                 }
173 
174                 @Override
175                 protected void beforeExecute(Thread t, Runnable r) {
176                     super.beforeExecute(t, r);
177                     activeThreads.incrementAndGet();
178                 }
179             };
180 
181             jobServe = MCRProcessableFactory.newPool(executor, processableCollection);
182             processableCollection.setProperty("running", running);
183 
184             LOGGER.info("JobMaster{} with {} thread(s) is started",
185                 MCRJobQueue.singleQueue ? "" : " for \"" + action.getName() + "\"", jobThreadCount);
186             while (running) {
187                 try {
188                     while (activeThreads.get() < jobThreadCount) {
189                         runLock.lock();
190                         try {
191                             if (!running) {
192                                 break;
193                             }
194 
195                             EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
196                             EntityTransaction transaction = em.getTransaction();
197 
198                             MCRJob job = null;
199                             MCRJobAction action = null;
200                             try {
201                                 transaction.begin();
202 
203                                 job = jobQueue.poll();
204                                 processableCollection.setProperty("queue size", jobQueue.size());
205 
206                                 if (job != null) {
207                                     action = toMCRJobAction(job.getAction());
208 
209                                     if (action != null && !action.isActivated()) {
210                                         job.setStatus(MCRJobStatus.NEW);
211                                         job.setStart(null);
212                                     }
213                                 }
214 
215                                 transaction.commit();
216                             } catch (RollbackException e) {
217                                 LOGGER.error("Error while getting next job.", e);
218                                 if (transaction != null) {
219                                     try {
220                                         transaction.rollback();
221                                     } catch (RuntimeException re) {
222                                         LOGGER.warn("Could not rollback transaction.", re);
223                                     }
224                                 }
225                             } finally {
226                                 em.close();
227                             }
228                             if (job != null && action != null && action.isActivated()
229                                 && !jobServe.getExecutor().isShutdown()) {
230                                 LOGGER.info("Creating:{}", job);
231                                 jobServe.submit(new MCRJobThread(job));
232                             } else {
233                                 try {
234                                     synchronized (jobQueue) {
235                                         if (running) {
236                                             LOGGER.debug("No job in queue going to sleep");
237                                             //fixes a race conditioned deadlock situation
238                                             //do not wait longer than 60 sec. for a new MCRJob
239                                             jobQueue.wait(60000);
240                                         }
241                                     }
242                                 } catch (InterruptedException e) {
243                                     LOGGER.error("Job thread was interrupted.", e);
244                                 }
245                             }
246                         } finally {
247                             runLock.unlock();
248                         }
249                     } // while(activeThreads.get() < jobThreadCount)
250                     if (activeThreads.get() < jobThreadCount) {
251                         try {
252                             LOGGER.info("Waiting for a job to finish");
253                             Thread.sleep(1000);
254                         } catch (InterruptedException e) {
255                             LOGGER.error("Job thread was interrupted.", e);
256                         }
257                     }
258                 } catch (PersistenceException e) {
259                     LOGGER.warn("We have an database error, sleep and run later.", e);
260                     try {
261                         Thread.sleep(60000);
262                     } catch (InterruptedException ie) {
263                         LOGGER.error("Waiting for database was interrupted.", ie);
264                     }
265                 } catch (Throwable e) {
266                     LOGGER.error("Keep running while catching exceptions.", e);
267                 }
268             } // while(running)
269             processableCollection.setProperty("running", running);
270         }
271         LOGGER.info("{} thread finished", getName());
272         MCRSessionMgr.releaseCurrentSession();
273     }
274 
275     /**
276      * stops transmitting {@link MCRJob} to {@link MCRJobThread} and prepares shutdown.
277      */
278     public void prepareClose() {
279         LOGGER.info("Closing master thread");
280         //signal master thread to stop now
281         running = false;
282         //Wake up, Neo!
283         synchronized (jobQueue) {
284             LOGGER.debug("Wake up queue");
285             jobQueue.notifyAll();
286         }
287         runLock.lock();
288         try {
289             if (jobServe != null) {
290                 LOGGER.debug("Shutdown executor jobs.");
291                 jobServe.getExecutor().shutdown();
292                 try {
293                     LOGGER.debug("Await termination of executor jobs.");
294                     jobServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
295                     LOGGER.debug("All jobs finished.");
296                 } catch (InterruptedException e) {
297                     LOGGER.debug("Could not wait 60 seconds...", e);
298                 }
299             }
300         } finally {
301             runLock.unlock();
302         }
303     }
304 
305     /**
306      * Shuts down this thread and every local threads spawned by {@link #run()}.
307      */
308     public void close() {
309         if (jobServe != null && !jobServe.getExecutor().isShutdown()) {
310             LOGGER.info("We are in a hurry, closing service right now");
311             jobServe.getExecutor().shutdownNow();
312             try {
313                 jobServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
314             } catch (InterruptedException e) {
315                 LOGGER.debug("Could not wait  60 seconds...", e);
316             }
317         }
318     }
319 
320     @Override
321     public int getPriority() {
322         return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY - 1;
323     }
324 
325     protected String getPreLabel() {
326         return (MCRJobQueue.singleQueue ? "Job" : action.getSimpleName());
327     }
328 
329     /**
330      * Returns the name of this job master.
331      * 
332      * @return
333      */
334     public String getName() {
335         return getPreLabel() + " Master";
336     }
337 
338     /**
339      * Returns the processable collection assigned to this job master.
340      * 
341      * @return the processable collection
342      */
343     public MCRProcessableCollection getProcessableCollection() {
344         return processableCollection;
345     }
346 
347     private static MCRJobAction toMCRJobAction(Class<? extends MCRJobAction> actionClass) {
348         try {
349             Constructor<? extends MCRJobAction> actionConstructor = actionClass.getConstructor();
350 
351             return actionConstructor.newInstance();
352         } catch (Exception e) {
353             LOGGER.error(e.getMessage(), e);
354         }
355 
356         return null;
357     }
358 }