1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.oai;
20
21 import java.time.Instant;
22 import java.util.Arrays;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Optional;
28 import java.util.Timer;
29 import java.util.TimerTask;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34
35 import org.apache.logging.log4j.LogManager;
36 import org.apache.logging.log4j.Logger;
37 import org.mycore.common.MCRException;
38 import org.mycore.common.MCRSession;
39 import org.mycore.common.MCRSessionMgr;
40 import org.mycore.common.config.MCRConfiguration2;
41 import org.mycore.common.events.MCRShutdownHandler;
42 import org.mycore.oai.pmh.BadResumptionTokenException;
43 import org.mycore.oai.pmh.DefaultResumptionToken;
44 import org.mycore.oai.pmh.Header;
45 import org.mycore.oai.pmh.MetadataFormat;
46 import org.mycore.oai.pmh.OAIDataList;
47 import org.mycore.oai.pmh.Record;
48 import org.mycore.oai.set.MCRSet;
49 import org.mycore.util.concurrent.MCRTransactionableRunnable;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class MCROAISearchManager {
66
67 protected static final Logger LOGGER = LogManager.getLogger(MCROAISearchManager.class);
68
69 protected static final String TOKEN_DELIMITER = "@";
70
71 protected static int MAX_AGE;
72
73 protected Map<String, MCROAISearcher> resultMap;
74
75 protected MCROAIIdentify identify;
76
77 protected MCROAIObjectManager objManager;
78
79 protected MCROAISetManager setManager;
80
81 protected int partitionSize;
82
83 private ExecutorService executorService;
84
85 private boolean runListRecordsParallel;
86
87 static {
88 String prefix = MCROAIAdapter.PREFIX + "ResumptionTokens.";
89 MAX_AGE = MCRConfiguration2.getInt(prefix + "MaxAge").orElse(30) * 60 * 1000;
90 }
91
92 public MCROAISearchManager() {
93 this.resultMap = new ConcurrentHashMap<>();
94 TimerTask tt = new TimerTask() {
95 @Override
96 public void run() {
97 for (Map.Entry<String, MCROAISearcher> entry : resultMap.entrySet()) {
98 String searchId = entry.getKey();
99 MCROAISearcher searcher = entry.getValue();
100 if ((searcher != null) && searcher.isExpired()) {
101 LOGGER.info("Removing expired resumption token {}", searchId);
102 resultMap.remove(searchId);
103 }
104 }
105 }
106 };
107 new Timer().schedule(tt, new Date(System.currentTimeMillis() + MAX_AGE), MAX_AGE);
108 runListRecordsParallel = MCRConfiguration2
109 .getOrThrow(MCROAIAdapter.PREFIX + "RunListRecordsParallel", Boolean::parseBoolean);
110 if (runListRecordsParallel) {
111 executorService = Executors.newWorkStealingPool();
112 MCRShutdownHandler.getInstance().addCloseable(executorService::shutdownNow);
113 }
114 }
115
116 public void init(MCROAIIdentify identify, MCROAIObjectManager objManager, MCROAISetManager setManager,
117 int partitionSize) {
118 this.identify = identify;
119 this.objManager = objManager;
120 this.setManager = setManager;
121 this.partitionSize = partitionSize;
122 }
123
124 public Optional<Header> getHeader(String oaiId) {
125 MCROAISearcher searcher = getSearcher(this.identify, null, 1, setManager, objManager);
126 return searcher.getHeader(objManager.getMyCoReId(oaiId));
127 }
128
129 public OAIDataList<Header> searchHeader(String resumptionToken) throws BadResumptionTokenException {
130 String searchId = getSearchId(resumptionToken);
131 String tokenCursor = getTokenCursor(resumptionToken);
132 MCROAISearcher searcher = this.resultMap.get(searchId);
133 if (searcher == null || tokenCursor == null || tokenCursor.length() <= 0) {
134 throw new BadResumptionTokenException(resumptionToken);
135 }
136 MCROAIResult result = searcher.query(tokenCursor);
137 return getHeaderList(searcher, result);
138 }
139
140 public OAIDataList<Record> searchRecord(String resumptionToken) throws BadResumptionTokenException {
141 String searchId = getSearchId(resumptionToken);
142 String tokenCursor = getTokenCursor(resumptionToken);
143 MCROAISearcher searcher = this.resultMap.get(searchId);
144 if (searcher == null || tokenCursor == null || tokenCursor.length() <= 0) {
145 throw new BadResumptionTokenException(resumptionToken);
146 }
147 MCROAIResult result = searcher.query(tokenCursor);
148 return getRecordList(searcher, result);
149 }
150
151 public OAIDataList<Header> searchHeader(MetadataFormat format, MCRSet set, Instant from, Instant until) {
152 MCROAISearcher searcher = getSearcher(this.identify, format, getPartitionSize(), setManager, objManager);
153 this.resultMap.put(searcher.getID(), searcher);
154 MCROAIResult result = searcher.query(set, from, until);
155 return getHeaderList(searcher, result);
156 }
157
158 public OAIDataList<Record> searchRecord(MetadataFormat format, MCRSet set, Instant from, Instant until) {
159 MCROAISearcher searcher = getSearcher(this.identify, format, getPartitionSize(), setManager, objManager);
160 this.resultMap.put(searcher.getID(), searcher);
161 MCROAIResult result = searcher.query(set, from, until);
162 return getRecordList(searcher, result);
163 }
164
165 protected OAIDataList<Record> getRecordList(MCROAISearcher searcher, MCROAIResult result) {
166 OAIDataList<Record> recordList = runListRecordsParallel ? getRecordListParallel(searcher, result)
167 : getRecordListSequential(searcher, result);
168 if (recordList.contains(null)) {
169 if (MCRConfiguration2.getBoolean("MCR.OAIDataProvider.FailOnErrorRecords").orElse(false)) {
170 throw new MCRException(
171 "An internal error occur. Some of the following records are invalid and cannot be processed."
172 + " Please inform the system administrator. " + result.list());
173 }
174 recordList.removeIf(Objects::isNull);
175 }
176 this.setResumptionToken(recordList, searcher, result);
177 return recordList;
178 }
179
180 private OAIDataList<Record> getRecordListSequential(MCROAISearcher searcher, MCROAIResult result) {
181 OAIDataList<Record> recordList = new OAIDataList<>();
182 result.list().forEach(header -> {
183 Record record = this.objManager.getRecord(header, searcher.getMetadataFormat());
184 recordList.add(record);
185 });
186 return recordList;
187 }
188
189 private OAIDataList<Record> getRecordListParallel(MCROAISearcher searcher, MCROAIResult result) {
190 List<Header> headerList = result.list();
191 int listSize = headerList.size();
192 Record[] records = new Record[listSize];
193 @SuppressWarnings("rawtypes")
194 CompletableFuture[] futures = new CompletableFuture[listSize];
195 MetadataFormat metadataFormat = searcher.getMetadataFormat();
196 MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
197 for (int i = 0; i < listSize; i++) {
198 Header header = headerList.get(i);
199 int resultIndex = i;
200 MCRTransactionableRunnable r = new MCRTransactionableRunnable(
201 () -> records[resultIndex] = this.objManager.getRecord(header, metadataFormat), mcrSession);
202 CompletableFuture<Void> future = CompletableFuture.runAsync(r, executorService);
203 futures[i] = future;
204 }
205 CompletableFuture.allOf(futures).join();
206 OAIDataList<Record> recordList = new OAIDataList<>();
207 recordList.addAll(Arrays.asList(records));
208 return recordList;
209 }
210
211 protected OAIDataList<Header> getHeaderList(MCROAISearcher searcher, MCROAIResult result) {
212 OAIDataList<Header> headerList = new OAIDataList<>();
213 headerList.addAll(result.list());
214 this.setResumptionToken(headerList, searcher, result);
215 return headerList;
216 }
217
218 public String getSearchId(String token) throws BadResumptionTokenException {
219 try {
220 return token.split(TOKEN_DELIMITER)[0];
221 } catch (Exception exc) {
222 throw new BadResumptionTokenException(token);
223 }
224 }
225
226 public String getTokenCursor(String token) throws BadResumptionTokenException {
227 try {
228 String[] tokenParts = token.split(TOKEN_DELIMITER);
229 return tokenParts[tokenParts.length - 1];
230 } catch (Exception exc) {
231 throw new BadResumptionTokenException(token);
232 }
233 }
234
235 protected void setResumptionToken(OAIDataList<?> dataList, MCROAISearcher searcher, MCROAIResult result) {
236 result.nextCursor().map(cursor -> {
237 DefaultResumptionToken rsToken = new DefaultResumptionToken();
238 rsToken.setToken(searcher.getID() + TOKEN_DELIMITER + cursor);
239 rsToken.setCompleteListSize(result.getNumHits());
240 rsToken.setExpirationDate(searcher.getExpirationTime());
241 return rsToken;
242 }).ifPresent(dataList::setResumptionToken);
243 }
244
245 public int getPartitionSize() {
246 return partitionSize;
247 }
248
249 public static MCROAISearcher getSearcher(MCROAIIdentify identify, MetadataFormat format, int partitionSize,
250 MCROAISetManager setManager, MCROAIObjectManager objectManager) {
251 String className = identify.getConfigPrefix() + "Searcher";
252 MCROAISearcher searcher = MCRConfiguration2.<MCROAISearcher>getInstanceOf(className)
253 .orElseGet(MCROAICombinedSearcher::new);
254 searcher.init(identify, format, MAX_AGE, partitionSize, setManager, objectManager);
255 return searcher;
256 }
257
258 }