View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.oai;
20  
21  import java.time.Instant;
22  import java.util.Arrays;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Objects;
27  import java.util.Optional;
28  import java.util.Timer;
29  import java.util.TimerTask;
30  import java.util.concurrent.CompletableFuture;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import org.apache.logging.log4j.LogManager;
36  import org.apache.logging.log4j.Logger;
37  import org.mycore.common.MCRException;
38  import org.mycore.common.MCRSession;
39  import org.mycore.common.MCRSessionMgr;
40  import org.mycore.common.config.MCRConfiguration2;
41  import org.mycore.common.events.MCRShutdownHandler;
42  import org.mycore.oai.pmh.BadResumptionTokenException;
43  import org.mycore.oai.pmh.DefaultResumptionToken;
44  import org.mycore.oai.pmh.Header;
45  import org.mycore.oai.pmh.MetadataFormat;
46  import org.mycore.oai.pmh.OAIDataList;
47  import org.mycore.oai.pmh.Record;
48  import org.mycore.oai.set.MCRSet;
49  import org.mycore.util.concurrent.MCRTransactionableRunnable;
50  
51  /**
52   * Search manager of the mycore OAI-PMH implementation. Creates a new
53   * {@link MCROAISearcher} instance for each
54   * {@link #searchHeader(MetadataFormat, MCRSet, Instant, Instant)}
55   * and {@link #searchRecord(MetadataFormat, MCRSet, Instant, Instant)} call.
56   * The resumption token created by those methods can be reused for
57   * later calls to the same searcher. A searcher is dropped after an
58   * expiration time. The time increases for each query call.
59   *
60   * <p>Due to token based querying it is not possible to set a current
61   * position for the resumption token. Its always set to -1.</p>
62   *
63   * @author Matthias Eichner
64   */
65  public class MCROAISearchManager {
66  
67      protected static final Logger LOGGER = LogManager.getLogger(MCROAISearchManager.class);
68  
69      protected static final String TOKEN_DELIMITER = "@";
70  
71      protected static int MAX_AGE;
72  
73      protected Map<String, MCROAISearcher> resultMap;
74  
75      protected MCROAIIdentify identify;
76  
77      protected MCROAIObjectManager objManager;
78  
79      protected MCROAISetManager setManager;
80  
81      protected int partitionSize;
82  
83      private ExecutorService executorService;
84  
85      private boolean runListRecordsParallel;
86  
87      static {
88          String prefix = MCROAIAdapter.PREFIX + "ResumptionTokens.";
89          MAX_AGE = MCRConfiguration2.getInt(prefix + "MaxAge").orElse(30) * 60 * 1000;
90      }
91  
92      public MCROAISearchManager() {
93          this.resultMap = new ConcurrentHashMap<>();
94          TimerTask tt = new TimerTask() {
95              @Override
96              public void run() {
97                  for (Map.Entry<String, MCROAISearcher> entry : resultMap.entrySet()) {
98                      String searchId = entry.getKey();
99                      MCROAISearcher searcher = entry.getValue();
100                     if ((searcher != null) && searcher.isExpired()) {
101                         LOGGER.info("Removing expired resumption token {}", searchId);
102                         resultMap.remove(searchId);
103                     }
104                 }
105             }
106         };
107         new Timer().schedule(tt, new Date(System.currentTimeMillis() + MAX_AGE), MAX_AGE);
108         runListRecordsParallel = MCRConfiguration2
109             .getOrThrow(MCROAIAdapter.PREFIX + "RunListRecordsParallel", Boolean::parseBoolean);
110         if (runListRecordsParallel) {
111             executorService = Executors.newWorkStealingPool();
112             MCRShutdownHandler.getInstance().addCloseable(executorService::shutdownNow);
113         }
114     }
115 
116     public void init(MCROAIIdentify identify, MCROAIObjectManager objManager, MCROAISetManager setManager,
117         int partitionSize) {
118         this.identify = identify;
119         this.objManager = objManager;
120         this.setManager = setManager;
121         this.partitionSize = partitionSize;
122     }
123 
124     public Optional<Header> getHeader(String oaiId) {
125         MCROAISearcher searcher = getSearcher(this.identify, null, 1, setManager, objManager);
126         return searcher.getHeader(objManager.getMyCoReId(oaiId));
127     }
128 
129     public OAIDataList<Header> searchHeader(String resumptionToken) throws BadResumptionTokenException {
130         String searchId = getSearchId(resumptionToken);
131         String tokenCursor = getTokenCursor(resumptionToken);
132         MCROAISearcher searcher = this.resultMap.get(searchId);
133         if (searcher == null || tokenCursor == null || tokenCursor.length() <= 0) {
134             throw new BadResumptionTokenException(resumptionToken);
135         }
136         MCROAIResult result = searcher.query(tokenCursor);
137         return getHeaderList(searcher, result);
138     }
139 
140     public OAIDataList<Record> searchRecord(String resumptionToken) throws BadResumptionTokenException {
141         String searchId = getSearchId(resumptionToken);
142         String tokenCursor = getTokenCursor(resumptionToken);
143         MCROAISearcher searcher = this.resultMap.get(searchId);
144         if (searcher == null || tokenCursor == null || tokenCursor.length() <= 0) {
145             throw new BadResumptionTokenException(resumptionToken);
146         }
147         MCROAIResult result = searcher.query(tokenCursor);
148         return getRecordList(searcher, result);
149     }
150 
151     public OAIDataList<Header> searchHeader(MetadataFormat format, MCRSet set, Instant from, Instant until) {
152         MCROAISearcher searcher = getSearcher(this.identify, format, getPartitionSize(), setManager, objManager);
153         this.resultMap.put(searcher.getID(), searcher);
154         MCROAIResult result = searcher.query(set, from, until);
155         return getHeaderList(searcher, result);
156     }
157 
158     public OAIDataList<Record> searchRecord(MetadataFormat format, MCRSet set, Instant from, Instant until) {
159         MCROAISearcher searcher = getSearcher(this.identify, format, getPartitionSize(), setManager, objManager);
160         this.resultMap.put(searcher.getID(), searcher);
161         MCROAIResult result = searcher.query(set, from, until);
162         return getRecordList(searcher, result);
163     }
164 
165     protected OAIDataList<Record> getRecordList(MCROAISearcher searcher, MCROAIResult result) {
166         OAIDataList<Record> recordList = runListRecordsParallel ? getRecordListParallel(searcher, result)
167             : getRecordListSequential(searcher, result);
168         if (recordList.contains(null)) {
169             if (MCRConfiguration2.getBoolean("MCR.OAIDataProvider.FailOnErrorRecords").orElse(false)) {
170                 throw new MCRException(
171                     "An internal error occur. Some of the following records are invalid and cannot be processed."
172                         + " Please inform the system administrator. " + result.list());
173             }
174             recordList.removeIf(Objects::isNull);
175         }
176         this.setResumptionToken(recordList, searcher, result);
177         return recordList;
178     }
179 
180     private OAIDataList<Record> getRecordListSequential(MCROAISearcher searcher, MCROAIResult result) {
181         OAIDataList<Record> recordList = new OAIDataList<>();
182         result.list().forEach(header -> {
183             Record record = this.objManager.getRecord(header, searcher.getMetadataFormat());
184             recordList.add(record);
185         });
186         return recordList;
187     }
188 
189     private OAIDataList<Record> getRecordListParallel(MCROAISearcher searcher, MCROAIResult result) {
190         List<Header> headerList = result.list();
191         int listSize = headerList.size();
192         Record[] records = new Record[listSize];
193         @SuppressWarnings("rawtypes")
194         CompletableFuture[] futures = new CompletableFuture[listSize];
195         MetadataFormat metadataFormat = searcher.getMetadataFormat();
196         MCRSession mcrSession = MCRSessionMgr.getCurrentSession();
197         for (int i = 0; i < listSize; i++) {
198             Header header = headerList.get(i);
199             int resultIndex = i;
200             MCRTransactionableRunnable r = new MCRTransactionableRunnable(
201                 () -> records[resultIndex] = this.objManager.getRecord(header, metadataFormat), mcrSession);
202             CompletableFuture<Void> future = CompletableFuture.runAsync(r, executorService);
203             futures[i] = future;
204         }
205         CompletableFuture.allOf(futures).join();
206         OAIDataList<Record> recordList = new OAIDataList<>();
207         recordList.addAll(Arrays.asList(records));
208         return recordList;
209     }
210 
211     protected OAIDataList<Header> getHeaderList(MCROAISearcher searcher, MCROAIResult result) {
212         OAIDataList<Header> headerList = new OAIDataList<>();
213         headerList.addAll(result.list());
214         this.setResumptionToken(headerList, searcher, result);
215         return headerList;
216     }
217 
218     public String getSearchId(String token) throws BadResumptionTokenException {
219         try {
220             return token.split(TOKEN_DELIMITER)[0];
221         } catch (Exception exc) {
222             throw new BadResumptionTokenException(token);
223         }
224     }
225 
226     public String getTokenCursor(String token) throws BadResumptionTokenException {
227         try {
228             String[] tokenParts = token.split(TOKEN_DELIMITER);
229             return tokenParts[tokenParts.length - 1];
230         } catch (Exception exc) {
231             throw new BadResumptionTokenException(token);
232         }
233     }
234 
235     protected void setResumptionToken(OAIDataList<?> dataList, MCROAISearcher searcher, MCROAIResult result) {
236         result.nextCursor().map(cursor -> {
237             DefaultResumptionToken rsToken = new DefaultResumptionToken();
238             rsToken.setToken(searcher.getID() + TOKEN_DELIMITER + cursor);
239             rsToken.setCompleteListSize(result.getNumHits());
240             rsToken.setExpirationDate(searcher.getExpirationTime());
241             return rsToken;
242         }).ifPresent(dataList::setResumptionToken);
243     }
244 
245     public int getPartitionSize() {
246         return partitionSize;
247     }
248 
249     public static MCROAISearcher getSearcher(MCROAIIdentify identify, MetadataFormat format, int partitionSize,
250         MCROAISetManager setManager, MCROAIObjectManager objectManager) {
251         String className = identify.getConfigPrefix() + "Searcher";
252         MCROAISearcher searcher = MCRConfiguration2.<MCROAISearcher>getInstanceOf(className)
253             .orElseGet(MCROAICombinedSearcher::new);
254         searcher.init(identify, format, MAX_AGE, partitionSize, setManager, objectManager);
255         return searcher;
256     }
257 
258 }