View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.iview2.services;
20  
21  import java.lang.reflect.Constructor;
22  import java.util.Arrays;
23  import java.util.concurrent.LinkedBlockingQueue;
24  import java.util.concurrent.ThreadFactory;
25  import java.util.concurrent.ThreadPoolExecutor;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.ReentrantLock;
29  import java.util.stream.Collectors;
30  
31  import javax.imageio.ImageIO;
32  
33  import org.apache.logging.log4j.LogManager;
34  import org.apache.logging.log4j.Logger;
35  import org.mycore.backend.jpa.MCREntityManagerProvider;
36  import org.mycore.common.MCRException;
37  import org.mycore.common.MCRSession;
38  import org.mycore.common.MCRSessionMgr;
39  import org.mycore.common.MCRSystemUserInformation;
40  import org.mycore.common.config.MCRConfiguration2;
41  import org.mycore.common.events.MCRShutdownHandler;
42  import org.mycore.common.events.MCRShutdownHandler.Closeable;
43  import org.mycore.common.processing.MCRProcessableDefaultCollection;
44  import org.mycore.common.processing.MCRProcessableRegistry;
45  import org.mycore.util.concurrent.processing.MCRProcessableExecutor;
46  import org.mycore.util.concurrent.processing.MCRProcessableFactory;
47  
48  import jakarta.persistence.EntityManager;
49  import jakarta.persistence.EntityTransaction;
50  import jakarta.persistence.PersistenceException;
51  
52  /**
53   * Master image tiler thread.
54   * 
55   * @author Thomas Scheffler (yagee)
56   */
57  public class MCRImageTiler implements Runnable, Closeable {
58      private static MCRImageTiler instance = null;
59  
60      private static Logger LOGGER = LogManager.getLogger(MCRImageTiler.class);
61  
62      private static final MCRTilingQueue TQ = MCRTilingQueue.getInstance();
63  
64      private MCRProcessableExecutor tilingServe;
65  
66      private volatile boolean running = true;
67  
68      private ReentrantLock runLock;
69  
70      private Constructor<? extends MCRTilingAction> tilingActionConstructor;
71  
72      private volatile Thread waiter;
73  
74      private MCRImageTiler() {
75          MCRShutdownHandler.getInstance().addCloseable(this);
76          runLock = new ReentrantLock();
77          try {
78              Class<? extends MCRTilingAction> tilingActionImpl = MCRConfiguration2.<MCRTilingAction>getClass(
79                  MCRIView2Tools.CONFIG_PREFIX + "MCRTilingActionImpl").orElse(MCRTilingAction.class);
80              tilingActionConstructor = tilingActionImpl.getConstructor(MCRTileJob.class);
81          } catch (Exception e) {
82              LOGGER.error("Error while initializing", e);
83              throw new MCRException(e);
84          }
85      }
86  
87      /**
88       * @return true if image tiler thread is running.
89       */
90      public static boolean isRunning() {
91          return instance != null;
92      }
93  
94      /**
95       * @return an instance of this class.
96       */
97      public static MCRImageTiler getInstance() {
98          if (instance == null) {
99              instance = new MCRImageTiler();
100         }
101         return instance;
102     }
103 
104     /**
105      * Starts local tiler threads ( {@link MCRTilingAction}) and gives {@link MCRTileJob} instances to them. Use
106      * property <code>MCR.Module-iview2.TilingThreads</code> to specify how many concurrent threads should be running.
107      */
108     public void run() {
109         waiter = Thread.currentThread();
110         Thread.currentThread().setName("TileMaster");
111         //get this MCRSession a speaking name
112         MCRSessionMgr.unlock();
113         MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
114         mcrSession.setUserInformation(MCRSystemUserInformation.getSystemUserInstance());
115         boolean activated = MCRConfiguration2.getBoolean(MCRIView2Tools.CONFIG_PREFIX + "LocalTiler.activated")
116             .orElse(true) && MCRConfiguration2.getBoolean("MCR.Persistence.Database.Enable").orElse(true)
117             && MCREntityManagerProvider.getEntityManagerFactory() != null;
118         LOGGER.info("Local Tiling is {}", activated ? "activated" : "deactivated");
119         ImageIO.scanForPlugins();
120         LOGGER.info("Supported image file types for reading: {}", Arrays.toString(ImageIO.getReaderFormatNames()));
121 
122         MCRProcessableDefaultCollection imageTilerCollection = new MCRProcessableDefaultCollection("Image Tiler");
123         MCRProcessableRegistry registry = MCRProcessableRegistry.getSingleInstance();
124         registry.register(imageTilerCollection);
125 
126         if (activated) {
127             int tilingThreadCount = Integer.parseInt(MCRIView2Tools.getIView2Property("TilingThreads"));
128             ThreadFactory slaveFactory = new ThreadFactory() {
129                 AtomicInteger tNum = new AtomicInteger();
130 
131                 ThreadGroup tg = new ThreadGroup("MCR slave tiling thread group");
132 
133                 public Thread newThread(Runnable r) {
134                     return new Thread(tg, r, "TileSlave#" + tNum.incrementAndGet());
135                 }
136             };
137             final AtomicInteger activeThreads = new AtomicInteger();
138             final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
139             ThreadPoolExecutor baseExecutor = new ThreadPoolExecutor(tilingThreadCount, tilingThreadCount, 1,
140                 TimeUnit.DAYS, workQueue, slaveFactory) {
141 
142                 @Override
143                 protected void afterExecute(Runnable r, Throwable t) {
144                     super.afterExecute(r, t);
145                     activeThreads.decrementAndGet();
146                 }
147 
148                 @Override
149                 protected void beforeExecute(Thread t, Runnable r) {
150                     super.beforeExecute(t, r);
151                     activeThreads.incrementAndGet();
152                 }
153             };
154             this.tilingServe = MCRProcessableFactory.newPool(baseExecutor, imageTilerCollection);
155             imageTilerCollection.setProperty("running", running);
156             LOGGER.info("TilingMaster is started");
157             while (running) {
158                 try {
159                     while (activeThreads.get() < tilingThreadCount) {
160                         runLock.lock();
161                         try {
162                             if (!running) {
163                                 break;
164                             }
165                             EntityTransaction transaction = null;
166                             MCRTileJob job = null;
167                             EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
168                             try {
169                                 transaction = em.getTransaction();
170                                 transaction.begin();
171                                 job = TQ.poll();
172                                 imageTilerCollection.setProperty("queue",
173                                     TQ.stream().map(MCRTileJob::getPath).collect(Collectors.toList()));
174                                 transaction.commit();
175                             } catch (PersistenceException e) {
176                                 LOGGER.error("Error while getting next tiling job.", e);
177                                 if (transaction != null) {
178                                     try {
179                                         transaction.rollback();
180                                     } catch (RuntimeException re) {
181                                         LOGGER.warn("Could not rollback transaction.", re);
182                                     }
183                                 }
184                             } finally {
185                                 em.close();
186                             }
187                             if (job != null && !tilingServe.getExecutor().isShutdown()) {
188                                 LOGGER.info("Creating:{}", job.getPath());
189                                 tilingServe.submit(getTilingAction(job));
190                             } else {
191                                 try {
192                                     synchronized (TQ) {
193                                         if (running) {
194                                             LOGGER.debug("No Picture in TilingQueue going to sleep");
195                                             //fixes a race conditioned deadlock situation
196                                             //do not wait longer than 60 sec. for a new MCRTileJob
197                                             TQ.wait(60000);
198                                         }
199                                     }
200                                 } catch (InterruptedException e) {
201                                     LOGGER.error("Image Tiling thread was interrupted.", e);
202                                 }
203                             }
204                         } finally {
205                             runLock.unlock();
206                         }
207                     } // while(tilingServe.getActiveCount() < tilingServe.getCorePoolSize())
208                     if (activeThreads.get() < tilingThreadCount) {
209                         try {
210                             LOGGER.info("Waiting for a tiling job to finish");
211                             Thread.sleep(1000);
212                         } catch (InterruptedException e) {
213                             if (running) {
214                                 LOGGER.error("Image Tiling thread was interrupted.", e);
215                             }
216                         }
217                     }
218                 } catch (Throwable e) {
219                     LOGGER.error("Keep running while catching exceptions.", e);
220                 }
221             } // while(running)
222             imageTilerCollection.setProperty("running", false);
223         }
224         LOGGER.info("Tiling thread finished");
225         MCRSessionMgr.releaseCurrentSession();
226         waiter = null;
227     }
228 
229     private MCRTilingAction getTilingAction(MCRTileJob job) {
230         try {
231             return tilingActionConstructor.newInstance(job);
232         } catch (Exception e) {
233             throw new MCRException(e);
234         }
235     }
236 
237     /**
238      * stops transmitting {@link MCRTileJob} to {@link MCRTilingAction} and prepares shutdown.
239      */
240     public void prepareClose() {
241         LOGGER.info("Closing master image tiling thread");
242         //signal master thread to stop now
243         running = false;
244         //Wake up, Neo!
245         synchronized (TQ) {
246             LOGGER.debug("Wake up tiling queue");
247             TQ.notifyAll();
248         }
249         runLock.lock();
250         try {
251             if (tilingServe != null) {
252                 LOGGER.debug("Shutdown tiling executor jobs.");
253                 tilingServe.getExecutor().shutdown();
254                 try {
255                     LOGGER.debug("Await termination of tiling executor jobs.");
256                     tilingServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
257                     LOGGER.debug("All jobs finished.");
258                 } catch (InterruptedException e) {
259                     LOGGER.debug("Could not wait 60 seconds...", e);
260                 }
261             }
262         } finally {
263             runLock.unlock();
264         }
265     }
266 
267     /**
268      * Shuts down this thread and every local tiling threads spawned by {@link #run()}.
269      */
270     public void close() {
271         if (tilingServe != null && !tilingServe.getExecutor().isShutdown()) {
272             LOGGER.info("We are in a hurry, closing tiling service right now");
273             tilingServe.getExecutor().shutdownNow();
274             try {
275                 tilingServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
276             } catch (InterruptedException e) {
277                 LOGGER.debug("Could not wait  60 seconds...", e);
278             }
279         }
280         if (waiter != null && waiter.isAlive()) {
281             //thread still running
282             LOGGER.info("{} is still running.", waiter.getName());
283             Thread masterThread = waiter;
284             waiter = null;
285             masterThread.interrupt();
286             try {
287                 masterThread.join();
288                 LOGGER.info("{} has died.", masterThread.getName());
289             } catch (InterruptedException e) {
290                 e.printStackTrace(System.err);
291             }
292         }
293     }
294 
295     @Override
296     public int getPriority() {
297         return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY - 1;
298     }
299 }