1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.services.queuedjob;
20
21 import java.lang.reflect.Constructor;
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.concurrent.ExecutionException;
26
27 import jakarta.persistence.EntityManager;
28 import jakarta.persistence.EntityTransaction;
29 import org.apache.logging.log4j.LogManager;
30 import org.apache.logging.log4j.Logger;
31 import org.mycore.backend.jpa.MCREntityManagerProvider;
32 import org.mycore.common.MCRClassTools;
33 import org.mycore.common.MCRSession;
34 import org.mycore.common.MCRSessionMgr;
35 import org.mycore.common.MCRSystemUserInformation;
36 import org.mycore.common.config.MCRConfiguration2;
37 import org.mycore.common.processing.MCRAbstractProcessable;
38 import org.mycore.common.processing.MCRProcessableStatus;
39
40
41
42
43
44
45
46
47
48
49 public class MCRJobThread extends MCRAbstractProcessable implements Runnable {
50
51 private static Logger LOGGER = LogManager.getLogger(MCRJobThread.class);
52
53 protected final MCRJobQueue queue;
54
55 protected MCRJob job = null;
56
57 private List<MCRJobStatusListener> listeners;
58
59 public MCRJobThread(MCRJob job) {
60 this.job = job;
61 setName(this.job.getId() + " - " + this.job.getAction().getSimpleName());
62 setStatus(MCRProcessableStatus.created);
63 this.queue = MCRJobQueue.getInstance(job.getAction());
64 job.getParameters().forEach((k, v) -> this.getProperties().put(k, v));
65
66 listeners = new ArrayList<>();
67 MCRConfiguration2.getString("MCR.QueuedJob." + job.getAction().getSimpleName() + ".Listeners")
68 .ifPresent(classNames -> {
69 for (String className : classNames.split(",")) {
70 try {
71 MCRJobStatusListener instance = (MCRJobStatusListener) MCRClassTools.forName(className)
72 .getDeclaredConstructor().newInstance();
73 listeners.add(instance);
74 } catch (Exception e) {
75 LOGGER.error("Could not load class {}", className, e);
76 }
77 }
78 });
79 }
80
81 public void run() {
82 MCRSessionMgr.unlock();
83 MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
84 mcrSession.setUserInformation(MCRSystemUserInformation.getSystemUserInstance());
85 EntityManager em = MCREntityManagerProvider.getEntityManagerFactory().createEntityManager();
86 EntityTransaction transaction = em.getTransaction();
87 try {
88 Class<? extends MCRJobAction> actionClass = job.getAction();
89 Constructor<? extends MCRJobAction> actionConstructor = actionClass.getConstructor(MCRJob.class);
90 MCRJobAction action = actionConstructor.newInstance(job);
91
92 transaction.begin();
93
94 try {
95 setStatus(MCRProcessableStatus.processing);
96 job.setStart(new Date());
97 listeners.forEach(l -> l.onProcessing(job));
98
99 action.execute();
100
101 job.setFinished(new Date());
102 job.setStatus(MCRJobStatus.FINISHED);
103 setStatus(MCRProcessableStatus.successful);
104 listeners.forEach(l -> l.onSuccess(job));
105 } catch (ExecutionException ex) {
106 LOGGER.error("Exception occured while try to start job. Perform rollback.", ex);
107 setError(ex);
108 action.rollback();
109 listeners.forEach(l -> l.onError(job));
110 } catch (Exception ex) {
111 LOGGER.error("Exception occured while try to start job.", ex);
112 setError(ex);
113 listeners.forEach(l -> l.onError(job));
114 }
115 em.merge(job);
116 transaction.commit();
117
118
119 synchronized (queue) {
120 queue.notifyAll();
121 }
122 } catch (Exception e) {
123 LOGGER.error("Error while getting next job.", e);
124 if (transaction != null) {
125 transaction.rollback();
126 }
127 } finally {
128 em.close();
129 MCRSessionMgr.releaseCurrentSession();
130 mcrSession.close();
131 }
132 }
133
134 }