1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.util.concurrent.processing;
20
21 import java.time.Instant;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Future;
26 import java.util.function.Supplier;
27
28 import org.mycore.common.MCRException;
29 import org.mycore.common.processing.MCRProcessable;
30 import org.mycore.common.processing.MCRProcessableStatus;
31 import org.mycore.common.processing.MCRProcessableTask;
32 import org.mycore.common.processing.MCRProgressable;
33 import org.mycore.util.concurrent.MCRDecorator;
34 import org.mycore.util.concurrent.MCRPrioritySupplier;
35
36
37
38
39
40
41
42
43
44
45 public class MCRProcessableSupplier<R> extends MCRProcessableTask<Callable<R>> implements Supplier<R> {
46
47 protected CompletableFuture<R> future;
48
49
50
51
52
53
54
55
56 public static <T> MCRProcessableSupplier<T> of(Callable<T> task, CompletableFuture<T> future) {
57 MCRProcessableSupplier<T> ps = new MCRProcessableSupplier<>(task);
58 ps.future = future;
59 return ps;
60 }
61
62
63
64
65
66
67
68
69
70
71 public static <T> MCRProcessableSupplier<T> of(Callable<T> task, ExecutorService executorService,
72 Integer priority) {
73 MCRProcessableSupplier<T> ps = new MCRProcessableSupplier<>(task);
74 ps.future = new MCRPrioritySupplier<>(ps, priority).runAsync(executorService);
75 return ps;
76 }
77
78
79
80
81
82
83 private MCRProcessableSupplier(Callable<R> task) {
84 super(task);
85 }
86
87
88
89
90
91 @Override
92 public R get() {
93 try {
94 this.setStatus(MCRProcessableStatus.processing);
95 this.startTime = Instant.now();
96 R result = getTask().call();
97 this.setStatus(MCRProcessableStatus.successful);
98 return result;
99 } catch (InterruptedException exc) {
100 this.error = exc;
101 this.setStatus(MCRProcessableStatus.canceled);
102 throw new MCRException(this.error);
103 } catch (Exception exc) {
104 this.error = exc;
105 this.setStatus(MCRProcessableStatus.failed);
106 throw new MCRException(this.error);
107 }
108 }
109
110
111
112
113
114
115 public CompletableFuture<R> getFuture() {
116 return future;
117 }
118
119
120
121
122
123
124
125 public boolean isFutureDone() {
126 return future.isDone();
127 }
128
129
130
131
132
133
134
135
136
137 public boolean cancel(boolean mayInterruptIfRunning) {
138 return future.cancel(mayInterruptIfRunning);
139 }
140
141
142
143
144
145
146
147
148 @Override
149 public Integer getProgress() {
150 if (task instanceof MCRProgressable) {
151 return ((MCRProgressable) task).getProgress();
152 }
153 return super.getProgress();
154 }
155
156
157
158
159
160
161 @Override
162 public String getProgressText() {
163 if (task instanceof MCRProgressable) {
164 return ((MCRProgressable) task).getProgressText();
165 }
166 return super.getProgressText();
167 }
168
169
170
171
172
173
174
175 @Override
176 public String getName() {
177 String name = super.getName();
178 if (name == null) {
179 return MCRDecorator.resolve(this.task).map(object -> {
180 if (object instanceof MCRProcessable) {
181 return ((MCRProcessable) object).getName();
182 }
183 return object.toString();
184 }).orElse(this.task.toString());
185 }
186 return name;
187 }
188
189 }