1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.iview2.services;
20
21 import java.lang.reflect.Constructor;
22 import java.util.Arrays;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.locks.ReentrantLock;
29 import java.util.stream.Collectors;
30
31 import javax.imageio.ImageIO;
32
33 import org.apache.logging.log4j.LogManager;
34 import org.apache.logging.log4j.Logger;
35 import org.mycore.backend.jpa.MCREntityManagerProvider;
36 import org.mycore.common.MCRException;
37 import org.mycore.common.MCRSession;
38 import org.mycore.common.MCRSessionMgr;
39 import org.mycore.common.MCRSystemUserInformation;
40 import org.mycore.common.config.MCRConfiguration2;
41 import org.mycore.common.events.MCRShutdownHandler;
42 import org.mycore.common.events.MCRShutdownHandler.Closeable;
43 import org.mycore.common.processing.MCRProcessableDefaultCollection;
44 import org.mycore.common.processing.MCRProcessableRegistry;
45 import org.mycore.util.concurrent.processing.MCRProcessableExecutor;
46 import org.mycore.util.concurrent.processing.MCRProcessableFactory;
47
48 import jakarta.persistence.EntityManager;
49 import jakarta.persistence.EntityTransaction;
50 import jakarta.persistence.PersistenceException;
51
52
53
54
55
56
57 public class MCRImageTiler implements Runnable, Closeable {
58 private static MCRImageTiler instance = null;
59
60 private static Logger LOGGER = LogManager.getLogger(MCRImageTiler.class);
61
62 private static final MCRTilingQueue TQ = MCRTilingQueue.getInstance();
63
64 private MCRProcessableExecutor tilingServe;
65
66 private volatile boolean running = true;
67
68 private ReentrantLock runLock;
69
70 private Constructor<? extends MCRTilingAction> tilingActionConstructor;
71
72 private volatile Thread waiter;
73
74 private MCRImageTiler() {
75 MCRShutdownHandler.getInstance().addCloseable(this);
76 runLock = new ReentrantLock();
77 try {
78 Class<? extends MCRTilingAction> tilingActionImpl = MCRConfiguration2.<MCRTilingAction>getClass(
79 MCRIView2Tools.CONFIG_PREFIX + "MCRTilingActionImpl").orElse(MCRTilingAction.class);
80 tilingActionConstructor = tilingActionImpl.getConstructor(MCRTileJob.class);
81 } catch (Exception e) {
82 LOGGER.error("Error while initializing", e);
83 throw new MCRException(e);
84 }
85 }
86
87
88
89
90 public static boolean isRunning() {
91 return instance != null;
92 }
93
94
95
96
97 public static MCRImageTiler getInstance() {
98 if (instance == null) {
99 instance = new MCRImageTiler();
100 }
101 return instance;
102 }
103
104
105
106
107
108 public void run() {
109 waiter = Thread.currentThread();
110 Thread.currentThread().setName("TileMaster");
111
112 MCRSessionMgr.unlock();
113 MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
114 mcrSession.setUserInformation(MCRSystemUserInformation.getSystemUserInstance());
115 boolean activated = MCRConfiguration2.getBoolean(MCRIView2Tools.CONFIG_PREFIX + "LocalTiler.activated")
116 .orElse(true) && MCRConfiguration2.getBoolean("MCR.Persistence.Database.Enable").orElse(true)
117 && MCREntityManagerProvider.getEntityManagerFactory() != null;
118 LOGGER.info("Local Tiling is {}", activated ? "activated" : "deactivated");
119 ImageIO.scanForPlugins();
120 LOGGER.info("Supported image file types for reading: {}", Arrays.toString(ImageIO.getReaderFormatNames()));
121
122 MCRProcessableDefaultCollection imageTilerCollection = new MCRProcessableDefaultCollection("Image Tiler");
123 MCRProcessableRegistry registry = MCRProcessableRegistry.getSingleInstance();
124 registry.register(imageTilerCollection);
125
126 if (activated) {
127 int tilingThreadCount = Integer.parseInt(MCRIView2Tools.getIView2Property("TilingThreads"));
128 ThreadFactory slaveFactory = new ThreadFactory() {
129 AtomicInteger tNum = new AtomicInteger();
130
131 ThreadGroup tg = new ThreadGroup("MCR slave tiling thread group");
132
133 public Thread newThread(Runnable r) {
134 return new Thread(tg, r, "TileSlave#" + tNum.incrementAndGet());
135 }
136 };
137 final AtomicInteger activeThreads = new AtomicInteger();
138 final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
139 ThreadPoolExecutor baseExecutor = new ThreadPoolExecutor(tilingThreadCount, tilingThreadCount, 1,
140 TimeUnit.DAYS, workQueue, slaveFactory) {
141
142 @Override
143 protected void afterExecute(Runnable r, Throwable t) {
144 super.afterExecute(r, t);
145 activeThreads.decrementAndGet();
146 }
147
148 @Override
149 protected void beforeExecute(Thread t, Runnable r) {
150 super.beforeExecute(t, r);
151 activeThreads.incrementAndGet();
152 }
153 };
154 this.tilingServe = MCRProcessableFactory.newPool(baseExecutor, imageTilerCollection);
155 imageTilerCollection.setProperty("running", running);
156 LOGGER.info("TilingMaster is started");
157 while (running) {
158 try {
159 while (activeThreads.get() < tilingThreadCount) {
160 runLock.lock();
161 try {
162 if (!running) {
163 break;
164 }
165 EntityTransaction transaction = null;
166 MCRTileJob job = null;
167 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
168 try {
169 transaction = em.getTransaction();
170 transaction.begin();
171 job = TQ.poll();
172 imageTilerCollection.setProperty("queue",
173 TQ.stream().map(MCRTileJob::getPath).collect(Collectors.toList()));
174 transaction.commit();
175 } catch (PersistenceException e) {
176 LOGGER.error("Error while getting next tiling job.", e);
177 if (transaction != null) {
178 try {
179 transaction.rollback();
180 } catch (RuntimeException re) {
181 LOGGER.warn("Could not rollback transaction.", re);
182 }
183 }
184 } finally {
185 em.close();
186 }
187 if (job != null && !tilingServe.getExecutor().isShutdown()) {
188 LOGGER.info("Creating:{}", job.getPath());
189 tilingServe.submit(getTilingAction(job));
190 } else {
191 try {
192 synchronized (TQ) {
193 if (running) {
194 LOGGER.debug("No Picture in TilingQueue going to sleep");
195
196
197 TQ.wait(60000);
198 }
199 }
200 } catch (InterruptedException e) {
201 LOGGER.error("Image Tiling thread was interrupted.", e);
202 }
203 }
204 } finally {
205 runLock.unlock();
206 }
207 }
208 if (activeThreads.get() < tilingThreadCount) {
209 try {
210 LOGGER.info("Waiting for a tiling job to finish");
211 Thread.sleep(1000);
212 } catch (InterruptedException e) {
213 if (running) {
214 LOGGER.error("Image Tiling thread was interrupted.", e);
215 }
216 }
217 }
218 } catch (Throwable e) {
219 LOGGER.error("Keep running while catching exceptions.", e);
220 }
221 }
222 imageTilerCollection.setProperty("running", false);
223 }
224 LOGGER.info("Tiling thread finished");
225 MCRSessionMgr.releaseCurrentSession();
226 waiter = null;
227 }
228
229 private MCRTilingAction getTilingAction(MCRTileJob job) {
230 try {
231 return tilingActionConstructor.newInstance(job);
232 } catch (Exception e) {
233 throw new MCRException(e);
234 }
235 }
236
237
238
239
240 public void prepareClose() {
241 LOGGER.info("Closing master image tiling thread");
242
243 running = false;
244
245 synchronized (TQ) {
246 LOGGER.debug("Wake up tiling queue");
247 TQ.notifyAll();
248 }
249 runLock.lock();
250 try {
251 if (tilingServe != null) {
252 LOGGER.debug("Shutdown tiling executor jobs.");
253 tilingServe.getExecutor().shutdown();
254 try {
255 LOGGER.debug("Await termination of tiling executor jobs.");
256 tilingServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
257 LOGGER.debug("All jobs finished.");
258 } catch (InterruptedException e) {
259 LOGGER.debug("Could not wait 60 seconds...", e);
260 }
261 }
262 } finally {
263 runLock.unlock();
264 }
265 }
266
267
268
269
270 public void close() {
271 if (tilingServe != null && !tilingServe.getExecutor().isShutdown()) {
272 LOGGER.info("We are in a hurry, closing tiling service right now");
273 tilingServe.getExecutor().shutdownNow();
274 try {
275 tilingServe.getExecutor().awaitTermination(60, TimeUnit.SECONDS);
276 } catch (InterruptedException e) {
277 LOGGER.debug("Could not wait 60 seconds...", e);
278 }
279 }
280 if (waiter != null && waiter.isAlive()) {
281
282 LOGGER.info("{} is still running.", waiter.getName());
283 Thread masterThread = waiter;
284 waiter = null;
285 masterThread.interrupt();
286 try {
287 masterThread.join();
288 LOGGER.info("{} has died.", masterThread.getName());
289 } catch (InterruptedException e) {
290 e.printStackTrace(System.err);
291 }
292 }
293 }
294
295 @Override
296 public int getPriority() {
297 return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY - 1;
298 }
299 }