1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.iview2.services;
20
21 import java.util.AbstractQueue;
22 import java.util.Collections;
23 import java.util.Date;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.NoSuchElementException;
27 import java.util.Queue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ScheduledExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.logging.log4j.LogManager;
35 import org.apache.logging.log4j.Logger;
36 import org.mycore.backend.jpa.MCREntityManagerProvider;
37 import org.mycore.common.events.MCRShutdownHandler;
38 import org.mycore.common.events.MCRShutdownHandler.Closeable;
39
40 import jakarta.persistence.EntityManager;
41 import jakarta.persistence.NoResultException;
42 import jakarta.persistence.Query;
43 import jakarta.persistence.TypedQuery;
44
45 public class MCRTilingQueue extends AbstractQueue<MCRTileJob> implements Closeable {
46 private static MCRTilingQueue instance = new MCRTilingQueue();
47
48 private static Queue<MCRTileJob> preFetch;
49
50 private static ScheduledExecutorService StalledJobScheduler;
51
52 private static Logger LOGGER = LogManager.getLogger(MCRTilingQueue.class);
53
54 private final ReentrantLock pollLock;
55
56 private boolean running;
57
58 private MCRTilingQueue() {
59
60 int waitTime = Integer.parseInt(MCRIView2Tools.getIView2Property("TimeTillReset")) * 60;
61 StalledJobScheduler = Executors.newSingleThreadScheduledExecutor();
62 StalledJobScheduler.scheduleAtFixedRate(MCRStalledJobResetter.getInstance(), waitTime, waitTime,
63 TimeUnit.SECONDS);
64 preFetch = new ConcurrentLinkedQueue<>();
65 running = true;
66 pollLock = new ReentrantLock();
67 MCRShutdownHandler.getInstance().addCloseable(this);
68 }
69
70
71
72
73 public static MCRTilingQueue getInstance() {
74 if (!instance.running) {
75 return null;
76 }
77 return instance;
78 }
79
80
81
82
83 public MCRTileJob poll() {
84 if (!running) {
85 return null;
86 }
87 try {
88 pollLock.lock();
89 MCRTileJob job = getElement();
90 if (job != null) {
91 job.setStart(new Date(System.currentTimeMillis()));
92 job.setStatus(MCRJobState.PROCESSING);
93 if (!updateJob(job)) {
94 job = null;
95 }
96 }
97 return job;
98 } finally {
99 pollLock.unlock();
100 }
101 }
102
103
104
105
106
107
108 @Override
109 public MCRTileJob remove() throws NoSuchElementException {
110 if (!running) {
111 return null;
112 }
113 MCRTileJob job = poll();
114 if (job == null) {
115 throw new NoSuchElementException();
116 }
117 return job;
118 }
119
120
121
122
123 public MCRTileJob peek() {
124 if (!running) {
125 return null;
126 }
127 return getElement();
128 }
129
130
131
132
133
134
135 @Override
136 public MCRTileJob element() throws NoSuchElementException {
137 if (!running) {
138 return null;
139 }
140 MCRTileJob job = peek();
141 if (job == null) {
142 throw new NoSuchElementException();
143 }
144 return job;
145 }
146
147
148
149
150
151 public boolean offer(MCRTileJob job) {
152 MCRTileJob newJob = job;
153 if (!running) {
154 return false;
155 }
156 MCRTileJob oldJob = getJob(newJob.getDerivate(), newJob.getPath());
157 if (oldJob != null) {
158 newJob = oldJob;
159 } else {
160 newJob.setAdded(new Date());
161 }
162 newJob.setStatus(MCRJobState.NEW);
163 newJob.setStart(null);
164 if (addJob(newJob)) {
165 notifyListener();
166 return true;
167 } else {
168 return false;
169 }
170 }
171
172
173
174
175 @Override
176 public void clear() {
177 if (!running) {
178 return;
179 }
180 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
181 Query query = em.createQuery("DELETE FROM MCRTileJob");
182 query.executeUpdate();
183 }
184
185
186
187
188
189
190 @Override
191 public Iterator<MCRTileJob> iterator() {
192 if (!running) {
193 List<MCRTileJob> empty = Collections.emptyList();
194 return empty.iterator();
195 }
196 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
197 TypedQuery<MCRTileJob> query = em.createQuery("FROM MCRTileJob WHERE status='" + MCRJobState.NEW.toChar()
198 + "' ORDER BY added ASC", MCRTileJob.class);
199 List<MCRTileJob> result = query.getResultList();
200 return result.iterator();
201 }
202
203
204
205
206 @Override
207 public int size() {
208 if (!running) {
209 return 0;
210 }
211 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
212 TypedQuery<Number> query = em
213 .createQuery("SELECT count(*) FROM MCRTileJob WHERE status='" + MCRJobState.NEW.toChar()
214 + "'", Number.class);
215 return query.getSingleResult().intValue();
216 }
217
218
219
220
221 public MCRTileJob getElementOutOfOrder(String derivate, String path) throws NoSuchElementException {
222 if (!running) {
223 return null;
224 }
225 MCRTileJob job = getJob(derivate, path);
226 if (job == null) {
227 return null;
228 }
229 job.setStart(new Date(System.currentTimeMillis()));
230 job.setStatus(MCRJobState.PROCESSING);
231 if (!updateJob(job)) {
232 throw new NoSuchElementException();
233 }
234 return job;
235 }
236
237 private MCRTileJob getJob(String derivate, String path) {
238 if (!running) {
239 return null;
240 }
241 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
242 TypedQuery<MCRTileJob> query = em.createQuery("FROM MCRTileJob WHERE derivate= :derivate AND path = :path",
243 MCRTileJob.class);
244 query.setParameter("derivate", derivate);
245 query.setParameter("path", path);
246 try {
247 MCRTileJob job = query.getSingleResult();
248 clearPreFetch();
249 return job;
250 } catch (NoResultException e) {
251 return null;
252 }
253 }
254
255 private MCRTileJob getElement() {
256 if (!running) {
257 return null;
258 }
259 MCRTileJob job = getNextPrefetchedElement();
260 if (job != null) {
261 return job;
262 }
263 LOGGER.debug("No prefetched jobs available");
264 if (preFetch(100) == 0) {
265 return null;
266 }
267 return getNextPrefetchedElement();
268 }
269
270 private MCRTileJob getNextPrefetchedElement() {
271 MCRTileJob job = preFetch.poll();
272 LOGGER.debug("Fetched job: {}", job);
273 return job;
274 }
275
276 private int preFetch(int amount) {
277 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
278 TypedQuery<MCRTileJob> query = em.createQuery(
279 "FROM MCRTileJob WHERE status='" + MCRJobState.NEW.toChar() + "' ORDER BY added ASC", MCRTileJob.class)
280 .setMaxResults(amount);
281 Iterator<MCRTileJob> queryResult = query.getResultList().iterator();
282 int i = 0;
283 while (queryResult.hasNext()) {
284 i++;
285 MCRTileJob job = queryResult.next();
286 preFetch.add(job.clone());
287 em.detach(job);
288 }
289 LOGGER.debug("prefetched {} tile jobs", i);
290 return i;
291 }
292
293 private void clearPreFetch() {
294 preFetch.clear();
295 }
296
297 private boolean updateJob(MCRTileJob job) {
298 if (!running) {
299 return false;
300 }
301 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
302 em.merge(job);
303 return true;
304 }
305
306 private boolean addJob(MCRTileJob job) {
307 if (!running) {
308 return false;
309 }
310 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
311 em.persist(job);
312 return true;
313 }
314
315
316
317
318 public synchronized void notifyListener() {
319 this.notifyAll();
320 }
321
322
323
324
325
326
327
328 public int remove(String derivate, String path) {
329 if (!running) {
330 return 0;
331 }
332 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
333 Query query = em.createQuery("DELETE FROM " + MCRTileJob.class.getName()
334 + " WHERE derivate = :derivate AND path = :path");
335 query.setParameter("derivate", derivate);
336 query.setParameter("path", path);
337 try {
338 return query.executeUpdate();
339 } finally {
340 clearPreFetch();
341 }
342 }
343
344
345
346
347
348
349 public int remove(String derivate) {
350 if (!running) {
351 return 0;
352 }
353 EntityManager em = MCREntityManagerProvider.getCurrentEntityManager();
354 Query query = em
355 .createQuery("DELETE FROM " + MCRTileJob.class.getName() + " WHERE derivate = :derivate");
356 query.setParameter("derivate", derivate);
357 try {
358 return query.executeUpdate();
359 } finally {
360 clearPreFetch();
361 }
362 }
363
364
365
366
367 public void prepareClose() {
368 StalledJobScheduler.shutdownNow();
369 running = false;
370 try {
371 StalledJobScheduler.awaitTermination(60, TimeUnit.SECONDS);
372 } catch (InterruptedException e) {
373 LOGGER.info("Could not wait for 60 seconds...");
374 StalledJobScheduler.shutdownNow();
375 }
376 }
377
378
379
380
381 public void close() {
382
383 }
384
385
386
387
388 @Override
389 public String toString() {
390 return "MCRTilingQueue";
391 }
392
393 @Override
394 public int getPriority() {
395 return MCRShutdownHandler.Closeable.DEFAULT_PRIORITY;
396 }
397 }