001 /*
002 *
003 * $Revision: 15590 $ $Date: 2009-07-23 13:10:57 +0200 (Thu, 23 Jul 2009) $
004 *
005 * This file is part of *** M y C o R e ***
006 * See http://www.mycore.de/ for details.
007 *
008 * This program is free software; you can use it, redistribute it
009 * and / or modify it under the terms of the GNU General Public License
010 * (GPL) as published by the Free Software Foundation; either version 2
011 * of the License or (at your option) any later version.
012 *
013 * This program is distributed in the hope that it will be useful, but
014 * WITHOUT ANY WARRANTY; without even the implied warranty of
015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016 * GNU General Public License for more details.
017 *
018 * You should have received a copy of the GNU General Public License
019 * along with this program, in a file called gpl.txt or license.txt.
020 * If not, write to the Free Software Foundation Inc.,
021 * 59 Temple Place - Suite 330, Boston, MA 02111-1307 USA
022 */
023
024 package org.mycore.backend.lucene;
025
026 import java.io.BufferedReader;
027 import java.io.File;
028 import java.io.IOException;
029 import java.text.DecimalFormat;
030 import java.util.ArrayList;
031 import java.util.Collection;
032 import java.util.Collections;
033 import java.util.GregorianCalendar;
034 import java.util.Iterator;
035 import java.util.List;
036 import java.util.Vector;
037 import java.util.concurrent.BlockingQueue;
038 import java.util.concurrent.Executors;
039 import java.util.concurrent.LinkedBlockingQueue;
040 import java.util.concurrent.RejectedExecutionException;
041 import java.util.concurrent.ScheduledExecutorService;
042 import java.util.concurrent.ScheduledFuture;
043 import java.util.concurrent.ThreadPoolExecutor;
044 import java.util.concurrent.TimeUnit;
045 import java.util.concurrent.locks.Lock;
046 import java.util.concurrent.locks.ReadWriteLock;
047 import java.util.concurrent.locks.ReentrantReadWriteLock;
048
049 import org.apache.log4j.Logger;
050 import org.apache.lucene.analysis.Analyzer;
051 import org.apache.lucene.analysis.PerFieldAnalyzerWrapper;
052 import org.apache.lucene.analysis.SimpleAnalyzer;
053 import org.apache.lucene.analysis.de.GermanAnalyzer;
054 import org.apache.lucene.document.Document;
055 import org.apache.lucene.document.Field;
056 import org.apache.lucene.index.CorruptIndexException;
057 import org.apache.lucene.index.IndexReader;
058 import org.apache.lucene.index.IndexWriter;
059 import org.apache.lucene.index.Term;
060 import org.apache.lucene.index.IndexWriter.MaxFieldLength;
061 import org.apache.lucene.search.IndexSearcher;
062 import org.apache.lucene.search.Query;
063 import org.apache.lucene.search.ScoreDoc;
064 import org.apache.lucene.search.Sort;
065 import org.apache.lucene.search.SortField;
066 import org.apache.lucene.search.TermQuery;
067 import org.apache.lucene.search.TopDocs;
068 import org.apache.lucene.search.TopFieldDocCollector;
069 import org.apache.lucene.search.TopFieldDocs;
070 import org.apache.lucene.store.Directory;
071 import org.apache.lucene.store.FSDirectory;
072 import org.apache.lucene.store.RAMDirectory;
073 import org.jdom.Element;
074 import org.mycore.common.MCRConfiguration;
075 import org.mycore.common.MCRConfigurationException;
076 import org.mycore.common.MCRException;
077 import org.mycore.common.MCRNormalizer;
078 import org.mycore.common.events.MCRShutdownHandler;
079 import org.mycore.datamodel.ifs.MCRFile;
080 import org.mycore.parsers.bool.MCRCondition;
081 import org.mycore.services.fieldquery.MCRFieldDef;
082 import org.mycore.services.fieldquery.MCRFieldValue;
083 import org.mycore.services.fieldquery.MCRHit;
084 import org.mycore.services.fieldquery.MCRResults;
085 import org.mycore.services.fieldquery.MCRSearcher;
086 import org.mycore.services.fieldquery.MCRSortBy;
087 import org.mycore.services.plugins.TextFilterPluginManager;
088
089 /**
090 * This class builds indexes from mycore meta data.
091 *
092 * @author Harald Richter
093 * @author Thomas Scheffler (yagee)
094 */
095 public class MCRLuceneSearcher extends MCRSearcher implements MCRShutdownHandler.Closeable {
096 private static final String SORTABLE_SUFFIX = ".sortable";
097
098 /** The logger */
099 private final static Logger LOGGER = Logger.getLogger(MCRLuceneSearcher.class);
100
101 static int INT_BEFORE = 10;
102
103 static int DEC_BEFORE = 10;
104
105 static int DEC_AFTER = 4;
106
107 private static TextFilterPluginManager PLUGIN_MANAGER = null;
108
109 static Analyzer analyzer = new PerFieldAnalyzerWrapper(new GermanAnalyzer());
110
111 File IndexDir;
112
113 private IndexWriteExecutor modifyExecutor;
114
115 private boolean useRamDir = false;
116
117 private RAMDirectory ramDir = null;
118
119 private IndexWriter writerRamDir;
120
121 private int ramDirEntries = 0;
122
123 private IndexReader indexReader = null;
124
125 private IndexSearcher indexSearcher = null;
126
127 private Vector<MCRFieldDef> addableFields = new Vector<MCRFieldDef>();
128
129 public void init(String ID) {
130 super.init(ID);
131
132 MCRConfiguration config = MCRConfiguration.instance();
133 IndexDir = new File(config.getString(prefix + "IndexDir"));
134 LOGGER.info(prefix + "indexDir: " + IndexDir);
135 if (!IndexDir.exists())
136 IndexDir.mkdirs();
137 if (!IndexDir.isDirectory()) {
138 String msg = IndexDir + " is not a directory!";
139 throw new MCRConfigurationException(msg);
140 }
141 if (!IndexDir.canWrite()) {
142 String msg = IndexDir + " is not writeable!";
143 throw new MCRConfigurationException(msg);
144 }
145
146 // is index directory initialized, .....?
147 try {
148 IndexWriter writer = MCRLuceneTools.getLuceneWriter(config.getString(prefix + "IndexDir"), true);
149 writer.close();
150 } catch (IOException e) {
151 LOGGER.error(e.getClass().getName() + ": " + e.getMessage());
152 LOGGER.error(MCRException.getStackTraceAsString(e));
153 } catch (Exception e) {
154 LOGGER.error(e.getClass().getName() + ": " + e.getMessage());
155 LOGGER.error(MCRException.getStackTraceAsString(e));
156 }
157
158 deleteLuceneLockFile();
159
160 long writeLockTimeout = config.getLong("MCR.Lucene.writeLockTimeout", 5000);
161 LOGGER.debug("Property MCR.Lucene.writeLockTimeout: " + writeLockTimeout);
162 IndexWriter.setDefaultWriteLockTimeout(writeLockTimeout);
163
164 try {
165 modifyExecutor = new IndexWriteExecutor(new LinkedBlockingQueue<Runnable>(), IndexDir);
166 } catch (Exception e) {
167 throw new MCRException("Cannot start IndexWriter thread.", e);
168 }
169 // should work like GermanAnalyzer without stemming and removing of stopwords
170 SimpleAnalyzer simpleAnalyzer = new SimpleAnalyzer();
171 List<MCRFieldDef> fds = MCRFieldDef.getFieldDefs(getIndex());
172 for (MCRFieldDef fd : fds) {
173 if ("name".equals(fd.getDataType())) {
174 ((PerFieldAnalyzerWrapper) analyzer).addAnalyzer(fd.getName(), simpleAnalyzer);
175 }
176 if (fd.isAddable())
177 addableFields.add(fd);
178 }
179 MCRShutdownHandler.getInstance().addCloseable(this);
180 }
181
182 private void deleteLuceneLockFile() {
183 GregorianCalendar cal = new GregorianCalendar();
184
185 File file = new File(IndexDir, "write.lock");
186
187 if (file.exists()) {
188 long l = (cal.getTimeInMillis() - file.lastModified()) / 1000; // age of file in seconds
189 if (l > 100) {
190 LOGGER.info("Delete lucene lock file " + file.getAbsolutePath() + " Age " + l);
191 file.delete();
192 }
193 }
194 }
195
196 public static String handleNumber(String content, String type, long add) {
197 int before, after;
198 int dez;
199 long l;
200 try {
201 if ("decimal".equals(type)) {
202 before = DEC_BEFORE;
203 after = DEC_AFTER;
204 dez = before + after;
205 double d = Double.parseDouble(content);
206 d = d * Math.pow(10, after) + Math.pow(10, dez);
207 l = (long) d;
208 } else {
209 before = INT_BEFORE;
210 dez = before;
211 if (content.indexOf('.') > 0)
212 content = content.substring(content.lastIndexOf('.') + 1);
213 l = Long.parseLong(content);
214 l = l + (long) (Math.pow(10, dez) + 0.1);
215 }
216 long m = l + add;
217 String n = "0000000000000000000";
218 String h = Long.toString(m);
219 return n.substring(0, dez + 1 - h.length()) + h;
220 } catch (Exception all) {
221 LOGGER.info("MCRLuceneSearcher can't format this Number, ignore this content: " + content);
222 return "0";
223 }
224 }
225
226 public void removeFromIndex(String entryID) {
227 LOGGER.info("MCRLuceneSearcher removing indexed data of " + entryID);
228
229 try {
230 deleteLuceneDocument("mcrid", entryID);
231 } catch (Exception e) {
232 LOGGER.warn(e.getMessage());
233 }
234 }
235
236 /**
237 * Delete all documents in Lucene with id
238 *
239 * @param fieldname
240 * string name of lucene field with stored id
241 * @param id
242 * string document id
243 * @param indexDir *
244 * the directory where index is stored
245 *
246 */
247 public void deleteLuceneDocument(String fieldname, String id) throws Exception {
248 Term deleteTerm = new Term(fieldname, id);
249 IndexWriterAction modifyAction = IndexWriterAction.removeAction(modifyExecutor, deleteTerm);
250 modifyIndex(modifyAction);
251 }
252
253 /**
254 * As opposed to {@link MCRSearcher} the returned MCRResult is read only.
255 * @see MCRSearcher#search(MCRCondition, int, List, boolean)
256 */
257 public MCRResults search(MCRCondition condition, int maxResults, List<MCRSortBy> sortBy, boolean addSortData) {
258 try {
259 List<Element> f = new ArrayList<Element>();
260 f.add(condition.toXML());
261
262 boolean reqf = true;
263 // required flag Term with AND (true) or OR (false) combined
264 Query luceneQuery = MCRBuildLuceneQuery.buildLuceneQuery(null, reqf, f, analyzer);
265 LOGGER.debug("Lucene Query: " + luceneQuery.toString());
266 return getLuceneHits(luceneQuery, maxResults, sortBy, addSortData);
267 } catch (Exception e) {
268 LOGGER.error("Exception in MCRLuceneSearcher", e);
269 return new MCRResults();
270 }
271 }
272
273 /**
274 * method does lucene query
275 *
276 * @return result set
277 */
278 private MCRResults getLuceneHits(Query luceneQuery, int maxResults, List<MCRSortBy> sortBy, boolean addSortData) throws Exception {
279 if (maxResults <= 0)
280 maxResults = 1000000;
281
282 long start = System.currentTimeMillis();
283 if (indexReader == null && indexSearcher == null) {
284 //Lucene 2.4.0 has problems with initializing IndexReader with File|String
285 //see https://issues.apache.org/jira/browse/LUCENE-1430
286 FSDirectory indexDir = FSDirectory.getDirectory(IndexDir.getAbsolutePath());
287 indexReader = IndexReader.open(indexDir);
288 indexSearcher = new IndexSearcher(indexReader);
289 } else {
290 if (!indexReader.isCurrent()) {
291 IndexReader newReader = indexReader.reopen();
292 if (newReader != indexReader) {
293 LOGGER.info("new Searcher for index: " + ID);
294 indexReader.close();
295 indexSearcher.close();
296 indexReader = newReader;
297 indexSearcher = new IndexSearcher(indexReader);
298 }
299 }
300 }
301 if (indexReader.maxDoc() == 0) {
302 //lucene index is empty
303 LOGGER.warn("Searching on empty index " + super.index);
304 return new MCRResults();
305 }
306 final Sort sortFields = buildSortFields(sortBy);
307 TopFieldDocCollector collector = new TopFieldDocCollector(indexReader, sortFields, maxResults);
308 indexSearcher.search(luceneQuery, collector);
309 //Lucene 2.4.1 has a bug: be sure to call collector.topDocs() just once
310 //see http://issues.apache.org/jira/browse/LUCENE-942
311 TopFieldDocs topFieldDocs = (TopFieldDocs) collector.topDocs();
312 LOGGER.info("Number of Objects found: " + topFieldDocs.scoreDocs.length + " Time for Search: "
313 + (System.currentTimeMillis() - start));
314 return new MCRLuceneResults(indexSearcher, topFieldDocs, addableFields);
315 }
316
317 private Sort buildSortFields(List<MCRSortBy> sortBy) {
318 ArrayList<SortField> sortList = new ArrayList<SortField>(sortBy.size());
319 for (MCRSortBy sortByElement : sortBy) {
320 SortField sortField;
321 if (sortByElement.getField().getName().equals("score"))
322 sortField = SortField.FIELD_SCORE;
323 else {
324 String name = sortByElement.getField().getName();
325 if (isTokenized(sortByElement.getField())) {
326 name += SORTABLE_SUFFIX;
327 }
328 sortField = new SortField(name, sortByElement.getSortOrder() == MCRSortBy.DESCENDING);
329 }
330 sortList.add(sortField);
331 }
332 if (LOGGER.isDebugEnabled()) {
333 for (SortField sortField : sortList) {
334 String name = (SortField.FIELD_SCORE == sortField ? "score" : sortField.getField());
335 LOGGER.debug("Sort by: " + name + (sortField.getReverse() ? " descending" : " accending"));
336 }
337 }
338 return new Sort(sortList.toArray(new SortField[0]));
339 }
340
341 /**
342 * @param sortBy
343 * @param doc
344 * lucene document to get sortdata from
345 * @param hit
346 * sortdata are added
347 * @param score
348 * of hit
349 */
350 private void addSortDataToHit(List<MCRSortBy> sortBy, org.apache.lucene.document.Document doc, MCRHit hit, String score) {
351 for (int j = 0; j < sortBy.size(); j++) {
352 MCRSortBy sb = sortBy.get(j);
353 MCRFieldDef fds = sb.getField();
354 if (null != fds) {
355 String field = fds.getName();
356 if ("score".equals(field)) {
357 if (null != score) {
358 MCRFieldDef fd = MCRFieldDef.getDef(field);
359 MCRFieldValue fv = new MCRFieldValue(fd, score);
360 hit.addSortData(fv);
361 }
362 } else {
363 if (isTokenized(fds)) {
364 field += SORTABLE_SUFFIX;
365 }
366 String values[] = doc.getValues(field);
367 for (int i = 0; i < values.length; i++) {
368 MCRFieldValue fv = new MCRFieldValue(fds, values[i]);
369 hit.addSortData(fv);
370 }
371 }
372 }
373 }
374 }
375
376 public void addToIndex(String entryID, String returnID, List<MCRFieldValue> fields) {
377 LOGGER.info("MCRLuceneSearcher indexing data of " + entryID);
378
379 if ((fields == null) || (fields.size() == 0)) {
380 return;
381 }
382
383 try {
384 Document doc = buildLuceneDocument(fields);
385 doc.add(new Field("mcrid", entryID, Field.Store.YES, Field.Index.NOT_ANALYZED));
386 doc.add(new Field("returnid", returnID, Field.Store.YES, Field.Index.NOT_ANALYZED));
387 LOGGER.debug("lucene document build " + entryID);
388 addDocumentToLucene(doc, analyzer);
389 } catch (Exception e) {
390 LOGGER.error(e.getClass().getName() + ": " + e.getMessage());
391 LOGGER.error(MCRException.getStackTraceAsString(e));
392 }
393 }
394
395 /**
396 * Adds document to Lucene
397 *
398 * @param doc
399 * lucene document to add to index
400 *
401 */
402 private void addDocumentToLucene(Document doc, Analyzer analyzer) throws Exception {
403 if (useRamDir) {
404 writerRamDir.addDocument(doc, analyzer);
405 ramDirEntries++;
406 if (ramDirEntries > 5000) {
407 writerRamDir.close();
408 IndexWriterAction modifyAction = IndexWriterAction.addRamDir(modifyExecutor, ramDir);
409 modifyIndex(modifyAction);
410 ramDir = new RAMDirectory();
411 writerRamDir = new IndexWriter(ramDir, analyzer, true, MaxFieldLength.LIMITED);
412 ramDirEntries = 0;
413 }
414 } else {
415 IndexWriterAction modifyAction = IndexWriterAction.addAction(modifyExecutor, doc, analyzer);
416 modifyIndex(modifyAction);
417 }
418 }
419
420 private void modifyIndex(IndexWriterAction modifyAction) {
421 modifyExecutor.submit(modifyAction);
422 }
423
424 /**
425 * Build lucene document from transformed xml list
426 *
427 * @param fields
428 * corresponding to lucene fields
429 *
430 * @return The lucene document
431 *
432 */
433 public static Document buildLuceneDocument(List<MCRFieldValue> fields) throws Exception {
434 Document doc = new Document();
435
436 for (int i = 0; i < fields.size(); i++) {
437 MCRFieldValue field = (MCRFieldValue) (fields.get(i));
438 String name = field.getField().getName();
439 String type = field.getField().getDataType();
440 String content = field.getValue();
441 MCRFile mcrfile = field.getFile();
442
443 if (null != mcrfile) {
444 if (PLUGIN_MANAGER == null) {
445 PLUGIN_MANAGER = TextFilterPluginManager.getInstance();
446 }
447 if (PLUGIN_MANAGER.isSupported(mcrfile.getContentType())) {
448 LOGGER.debug("####### Index MCRFile: " + mcrfile.getPath());
449
450 BufferedReader in = new BufferedReader(PLUGIN_MANAGER.transform(mcrfile.getContentType(), mcrfile
451 .getContentAsInputStream()));
452 String s;
453 StringBuffer text = new StringBuffer();
454 while ((s = in.readLine()) != null) {
455 text.append(s).append(" ");
456 }
457
458 s = text.toString();
459 s = MCRNormalizer.normalizeString(s);
460
461 doc.add(new Field(name, s, Field.Store.NO, Field.Index.ANALYZED));
462 }
463 } else {
464 if ("date".equals(type) || "time".equals(type) || "timestamp".equals(type)) {
465 type = "identifier";
466 } else if ("boolean".equals(type)) {
467 content = "true".equals(content) ? "1" : "0";
468 type = "identifier";
469 } else if ("decimal".equals(type)) {
470 content = handleNumber(content, "decimal", 0);
471 type = "identifier";
472 } else if ("integer".equals(type)) {
473 content = handleNumber(content, "integer", 0);
474 type = "identifier";
475 }
476
477 if (type.equals("identifier")) {
478 doc.add(new Field(name, content, Field.Store.YES, Field.Index.NOT_ANALYZED));
479 }
480 if(type.equals("index")){
481 doc.add(new Field(name, MCRBuildLuceneQuery.convertToGermanIndexString(content), Field.Store.YES, Field.Index.NOT_ANALYZED));
482 }
483
484 if (type.equals("Text") || type.equals("name") || (type.equals("text") && field.getField().isSortable())) {
485 doc.add(new Field(name, content, Field.Store.YES, Field.Index.ANALYZED));
486 if (field.getField().isSortable())
487 doc.add(new Field(name + SORTABLE_SUFFIX, content, Field.Store.YES, Field.Index.NOT_ANALYZED));
488 } else if (type.equals("text")) {
489 doc.add(new Field(name, content, Field.Store.NO, Field.Index.ANALYZED));
490 }
491 }
492 }
493
494 return doc;
495 }
496
497 private boolean isTokenized(MCRFieldDef fieldDef) {
498 String type = fieldDef.getDataType();
499 if (type.equals("Text") || type.equals("name") || type.equals("text"))
500 return true;
501 return false;
502 }
503
504 public void addSortData(Iterator<MCRHit> hits, List<MCRSortBy> sortBy) {
505 try {
506 while (hits.hasNext()) {
507 MCRHit hit = hits.next();
508 String id = hit.getID();
509 Term te1 = new Term("mcrid", id);
510
511 TermQuery qu = new TermQuery(te1);
512
513 TopDocs hitl = indexSearcher.search(qu, 1);
514 if (hitl.totalHits > 0) {
515 org.apache.lucene.document.Document doc = indexSearcher.doc(hitl.scoreDocs[0].doc);
516 addSortDataToHit(sortBy, doc, hit, null);
517 }
518 }
519 } catch (IOException e) {
520 LOGGER.error("Exception in MCRLuceneSearcher (addSortData)", e);
521 }
522 }
523
524 public void clearIndex() {
525 try {
526 IndexWriter writer = new IndexWriter(IndexDir, analyzer, true, MaxFieldLength.LIMITED);
527 writer.close();
528 } catch (IOException e) {
529 LOGGER.error(e.getClass().getName() + ": " + e.getMessage());
530 LOGGER.error(MCRException.getStackTraceAsString(e));
531 }
532 }
533
534 public void clearIndex(String fieldname, String value) {
535 try {
536 deleteLuceneDocument(fieldname, value);
537 } catch (Exception e) {
538 LOGGER.error(e.getClass().getName() + ": " + e.getMessage());
539 LOGGER.error(MCRException.getStackTraceAsString(e));
540 }
541 }
542
543 public void notifySearcher(String mode) {
544 LOGGER.info("mode: " + mode);
545
546 handleRamDir();
547
548 useRamDir = false;
549
550 if ("rebuild".equals(mode) || "insert".equals(mode)) {
551 try {
552 ramDir = new RAMDirectory();
553 writerRamDir = new IndexWriter(ramDir, analyzer, true, MaxFieldLength.LIMITED);
554 ramDirEntries = 0;
555 useRamDir = true;
556 } catch (Exception e) {
557 }
558 } else if ("optimize".equals(mode)) {
559 IndexWriterAction modifyAction = IndexWriterAction.optimizeAction(modifyExecutor);
560 modifyIndex(modifyAction);
561 } else if (!"finish".equals(mode))
562 LOGGER.error("invalid mode " + mode);
563 }
564
565 private void handleRamDir() {
566 if (useRamDir) {
567 try {
568 writerRamDir.close();
569 } catch (IOException e) {
570 LOGGER.error(e.getClass().getName() + ": " + e.getMessage());
571 LOGGER.error(MCRException.getStackTraceAsString(e));
572 }
573 if (ramDirEntries > 0) {
574 IndexWriterAction modifyAction = IndexWriterAction.addRamDir(modifyExecutor, ramDir);
575 modifyIndex(modifyAction);
576 }
577 }
578 }
579
580 public void close() {
581 try {
582 if (null != indexReader)
583 indexReader.close();
584 if (null != indexSearcher)
585 indexSearcher.close();
586 } catch (IOException e1) {
587 LOGGER.warn("Error while closing indexreader " + toString(), e1);
588 }
589 handleRamDir();
590 LOGGER.info("Closing " + toString() + "...");
591 modifyExecutor.shutdown();
592 try {
593 modifyExecutor.awaitTermination(60 * 60, TimeUnit.SECONDS);
594 } catch (InterruptedException e) {
595 LOGGER.warn("Error while closing " + toString(), e);
596 }
597 LOGGER.info("Processed " + modifyExecutor.getCompletedTaskCount() + " modification requests.");
598 }
599
600 public String toString() {
601 return getClass().getSimpleName() + ":" + ID;
602 }
603
604 private static class IndexWriteExecutor extends ThreadPoolExecutor {
605 boolean modifierClosed, firstJob, closeModifierEarly;
606
607 private IndexWriter indexWriter;
608
609 private File indexDir;
610
611 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
612
613 private final DelayedIndexWriterCloser delayedCloser = new DelayedIndexWriterCloser(this);
614
615 private ScheduledFuture<?> delayedFuture;
616
617 private int maxIndexWriteActions;
618
619 private ReadWriteLock IndexCloserLock = new ReentrantReadWriteLock(true);
620
621 private ThreadLocal<Lock> writeAccess = new ThreadLocal<Lock>() {
622
623 @Override
624 protected Lock initialValue() {
625 return IndexCloserLock.readLock();
626 }
627 };
628
629 public IndexWriteExecutor(BlockingQueue<Runnable> workQueue, File indexDir) {
630 // single thread mode
631 super(1, 1, 0, TimeUnit.SECONDS, workQueue);
632 this.indexDir = indexDir;
633 modifierClosed = true;
634 firstJob = true;
635 closeModifierEarly = MCRConfiguration.instance().getBoolean("MCR.Lucene.closeModifierEarly", false);
636 maxIndexWriteActions = MCRConfiguration.instance().getInt("MCR.Lucene.maxIndexWriteActions", 500);
637 }
638
639 @Override
640 protected void afterExecute(Runnable r, Throwable t) {
641 super.afterExecute(r, t);
642 //allow to close the IndexWriter
643 writeAccess.get().unlock();
644 if (firstJob)
645 firstJob = false;
646 if (closeModifierEarly || this.getCompletedTaskCount() % maxIndexWriteActions == 0)
647 closeIndexWriter();
648 else {
649 if (delayedFuture != null && !delayedFuture.isDone()) {
650 cancelDelayedIndexCloser();
651 }
652 try {
653 delayedFuture = scheduler.schedule(delayedCloser, 2, TimeUnit.SECONDS);
654 } catch (RejectedExecutionException e) {
655 LOGGER.warn("Cannot schedule delayed IndexWriter closer. Closing IndexWriter now.");
656 closeIndexWriter();
657 }
658 }
659 }
660
661 @Override
662 protected void beforeExecute(Thread t, Runnable r) {
663 //do not close IndexWriter while IndexWriterActions is processed
664 writeAccess.get().lock();
665 cancelDelayedIndexCloser();
666 if (modifierClosed)
667 openIndexWriter();
668 super.beforeExecute(t, r);
669 }
670
671 private void cancelDelayedIndexCloser() {
672 if (delayedFuture != null && !delayedFuture.isDone()) {
673 delayedFuture.cancel(false);
674 }
675 }
676
677 @Override
678 public void shutdown() {
679 cancelDelayedIndexCloser();
680 closeIndexWriter();
681 scheduler.shutdown();
682 try {
683 scheduler.awaitTermination(60 * 60, TimeUnit.SECONDS);
684 } catch (InterruptedException e) {
685 LOGGER.warn("Error while closing DelayedIndexWriterCloser", e);
686 }
687 super.shutdown();
688 }
689
690 private synchronized void openIndexWriter() {
691 try {
692 LOGGER.debug("Opening Lucene index for writing.");
693 if (indexWriter == null)
694 indexWriter = getLuceneWriter(indexDir, firstJob);
695 } catch (Exception e) {
696 LOGGER.warn("Error while reopening IndexWriter.", e);
697 } finally {
698 modifierClosed = false;
699 }
700 }
701
702 private synchronized void closeIndexWriter() {
703 //TODO: check if indexWriter.commit() is sufficient here
704 Lock writerLock = IndexCloserLock.writeLock();
705 try {
706 //do not allow IndexWriterAction being processed while closing IndexWriter
707 writerLock.lock();
708 if (indexWriter != null) {
709 LOGGER.debug("Writing Lucene index changes to disk.");
710 indexWriter.close();
711 }
712 } catch (IOException e) {
713 LOGGER.warn("Error while closing IndexWriter.", e);
714 } catch (IllegalStateException e) {
715 LOGGER.debug("IndexWriter was allready closed.");
716 } finally {
717 modifierClosed = true;
718 indexWriter = null;
719 writerLock.unlock();
720 }
721 }
722
723 private static IndexWriter getLuceneWriter(File indexDir, boolean first) throws Exception {
724 IndexWriter modifier;
725 Analyzer analyzer = new GermanAnalyzer();
726 boolean create = false;
727 // check if indexDir is empty before creating a new index
728 if (first && (indexDir.list().length == 0)) {
729 LOGGER.info("No Entries in Directory, initialize: " + indexDir);
730 create = true;
731 }
732 modifier = new IndexWriter(indexDir, analyzer, create, MaxFieldLength.LIMITED);
733 modifier.setMergeFactor(200);
734 modifier.setMaxBufferedDocs(2000);
735 return modifier;
736 }
737
738 public IndexWriter getIndexWriter() {
739 return indexWriter;
740 }
741
742 @Override
743 protected void finalize() {
744 closeIndexWriter();
745 super.finalize();
746 }
747
748 }
749
750 private static class IndexWriterAction implements Runnable {
751 private IndexWriteExecutor executor;
752
753 private Document doc;
754
755 private Analyzer analyzer;
756
757 private boolean add = false;
758
759 private boolean delete = false;
760
761 private boolean optimize = false;
762
763 private Term deleteTerm;
764
765 private RAMDirectory ramDir;
766
767 private IndexWriterAction(IndexWriteExecutor executor) {
768 this.executor = executor;
769 }
770
771 public static IndexWriterAction addAction(IndexWriteExecutor executor, Document doc, Analyzer analyzer) {
772 IndexWriterAction e = new IndexWriterAction(executor);
773 e.doc = doc;
774 e.analyzer = analyzer;
775 e.add = true;
776 return e;
777 }
778
779 public static IndexWriterAction removeAction(IndexWriteExecutor executor, Term deleteTerm) {
780 IndexWriterAction e = new IndexWriterAction(executor);
781 e.delete = true;
782 e.deleteTerm = deleteTerm;
783 return e;
784 }
785
786 public static IndexWriterAction optimizeAction(IndexWriteExecutor executor) {
787 IndexWriterAction e = new IndexWriterAction(executor);
788 e.optimize = true;
789 return e;
790 }
791
792 public static IndexWriterAction addRamDir(IndexWriteExecutor executor, RAMDirectory ramDir) {
793 IndexWriterAction e = new IndexWriterAction(executor);
794 e.ramDir = ramDir;
795 return e;
796 }
797
798 public void run() {
799 try {
800 if (delete) {
801 deleteDocument();
802 } else if (add) {
803 addDocument();
804 } else if (optimize) {
805 optimizeIndex();
806 } else
807 addDirectory();
808 } catch (Exception e) {
809 LOGGER.error("Error while writing Lucene Index ", e);
810 }
811 }
812
813 private void addDocument() throws IOException {
814 LOGGER.debug("add Document:" + toString());
815 executor.getIndexWriter().addDocument(doc, analyzer);
816 LOGGER.debug("adding done.");
817 }
818
819 private void deleteDocument() throws IOException {
820 LOGGER.debug("delete Document:" + toString());
821 executor.getIndexWriter().deleteDocuments(deleteTerm);
822 }
823
824 private void optimizeIndex() throws IOException {
825 LOGGER.info("optimize Index:" + toString());
826 executor.getIndexWriter().optimize();
827 LOGGER.info("Optimizing done.");
828 }
829
830 private void addDirectory() throws IOException {
831 LOGGER.info("add Directory");
832 executor.getIndexWriter().addIndexesNoOptimize(new Directory[] { ramDir });
833 LOGGER.info("Adding done.");
834 }
835
836 public String toString() {
837 if (doc != null)
838 return doc.toString();
839 if (deleteTerm != null)
840 return deleteTerm.toString();
841 return "empty IndexWriterAction";
842 }
843 }
844
845 private static class DelayedIndexWriterCloser implements Runnable {
846 private IndexWriteExecutor executor;
847
848 private DelayedIndexWriterCloser(IndexWriteExecutor executor) {
849 this.executor = executor;
850 }
851
852 public void run() {
853 if (!executor.modifierClosed && executor.getQueue().isEmpty()) {
854 executor.closeIndexWriter();
855 }
856 }
857
858 }
859
860 /**
861 * This class is a special Lucene version of MCRResults
862 * It is read only but fast on large result set as it is filled lazy.
863 * @author Thomas Scheffler (yagee)
864 */
865 private static class MCRLuceneResults extends MCRResults {
866
867 private TopFieldDocs topDocs;
868
869 private IndexSearcher indexSearcher;
870
871 private Collection<MCRFieldDef> addableFields;
872
873 private static final DecimalFormat df = new DecimalFormat("0.00000000000");
874
875 private boolean loadComplete = false;
876
877 public MCRLuceneResults(IndexSearcher indexSearcher, TopFieldDocs topDocs, Collection<MCRFieldDef> addableFields) {
878 super();
879 this.indexSearcher = indexSearcher;
880 this.topDocs = topDocs;
881 this.addableFields = addableFields;
882 topDocs.totalHits = topDocs.scoreDocs.length;
883 super.hits = new ArrayList<MCRHit>(topDocs.totalHits);
884 super.hits.addAll(Collections.nCopies(topDocs.totalHits, (MCRHit) null));
885 setSorted(true);
886 }
887
888 @Override
889 public boolean isReadonly() {
890 return true;
891 }
892
893 @Override
894 public void addHit(MCRHit hit) {
895 throw new UnsupportedOperationException("MCRResults are read only");
896 }
897
898 @Override
899 protected int merge(org.jdom.Document doc, String hostAlias) {
900 throw new UnsupportedOperationException("MCRResults are read only");
901 }
902
903 @Override
904 protected MCRHit getHit(String key) {
905 if (!loadComplete) {
906 for (int i = 0; i < getNumHits(); i++)
907 inititializeTopDoc(i);
908 loadComplete = true;
909 }
910 return super.getHit(key);
911 }
912
913 @Override
914 public MCRHit getHit(int i) {
915 if (i < 0 || i > topDocs.totalHits) {
916 return null;
917 }
918 MCRHit hit = super.getHit(i);
919 if (hit == null) {
920 inititializeTopDoc(i);
921 hit = super.getHit(i);
922 }
923 return hit;
924 }
925
926 private void inititializeTopDoc(int i) {
927 //initialize
928 MCRHit hit;
929 try {
930 hit = getMCRHit(topDocs.scoreDocs[i]);
931 } catch (Exception e) {
932 if (topDocs.scoreDocs.length <= i) {
933 throw new MCRException("TopDocs is not initialized.", e);
934 }
935 throw new MCRException("Error while fetching Lucene document: " + topDocs.scoreDocs[i].doc, e);
936 }
937 super.hits.set(i, hit);
938 MCRHit oldHit = super.map.get(hit.getKey());
939 if (oldHit != null)
940 oldHit.merge(hit);
941 else
942 super.map.put(hit.getKey(), hit);
943 }
944
945 private MCRHit getMCRHit(ScoreDoc scoreDoc) throws CorruptIndexException, IOException {
946 org.apache.lucene.document.Document doc = indexSearcher.doc(scoreDoc.doc);
947
948 String id = doc.get("returnid");
949 MCRHit hit = new MCRHit(id);
950
951 for (MCRFieldDef fd : addableFields) {
952 String[] values = doc.getValues(fd.getName());
953 for (String value : values) {
954 MCRFieldValue fv = new MCRFieldValue(fd, value);
955 hit.addMetaData(fv);
956 }
957 }
958
959 String score = df.format(scoreDoc.score);
960 addSortDataToHit(doc, hit, score, topDocs.fields);
961 return hit;
962 }
963
964 private static void addSortDataToHit(org.apache.lucene.document.Document doc, MCRHit hit, String score, SortField[] sortFields) {
965 for (SortField sortField : sortFields) {
966 if (SortField.FIELD_SCORE == sortField || sortField.getField() == null) {
967 if (score != null)
968 hit.addSortData(new MCRFieldValue(MCRFieldDef.getDef("score"), score));
969 } else {
970 String fieldName = sortField.getField();
971 if (fieldName.endsWith(SORTABLE_SUFFIX))
972 fieldName = fieldName.substring(0, fieldName.length() - SORTABLE_SUFFIX.length());
973
974 String values[] = doc.getValues(fieldName);
975 for (int i = 0; i < values.length; i++) {
976 MCRFieldValue fv = new MCRFieldValue(MCRFieldDef.getDef(fieldName), values[i]);
977 hit.addSortData(fv);
978 }
979 }
980 }
981 }
982
983 @Override
984 public int getNumHits() {
985 return topDocs.totalHits;
986 }
987
988 @Override
989 public void cutResults(int maxResults) {
990 while ((hits.size() > maxResults) && (maxResults > 0)) {
991 MCRHit hit = hits.remove(hits.size() - 1);
992 topDocs.totalHits--;
993 if (hit != null)
994 map.remove(hit.getKey());
995 }
996 }
997
998 @Override
999 public Iterator<MCRHit> iterator() {
1000 return new Iterator<MCRHit>() {
1001 int i = 0;
1002
1003 public boolean hasNext() {
1004 return i < topDocs.totalHits;
1005 }
1006
1007 public MCRHit next() {
1008 MCRHit hit = getHit(i);
1009 i++;
1010 return hit;
1011 }
1012
1013 public void remove() {
1014 throw new UnsupportedOperationException("MCRResults are read only");
1015 }
1016
1017 };
1018 }
1019 }
1020 }