1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.solr.index.handlers.document;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25
26 import org.apache.logging.log4j.LogManager;
27 import org.apache.logging.log4j.Logger;
28 import org.apache.solr.client.solrj.SolrClient;
29 import org.apache.solr.client.solrj.SolrServerException;
30 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
31 import org.apache.solr.client.solrj.request.UpdateRequest;
32 import org.apache.solr.client.solrj.response.UpdateResponse;
33 import org.apache.solr.common.SolrInputDocument;
34 import org.mycore.solr.MCRSolrClientFactory;
35 import org.mycore.solr.MCRSolrConstants;
36 import org.mycore.solr.index.MCRSolrIndexHandler;
37 import org.mycore.solr.index.handlers.MCRSolrAbstractIndexHandler;
38 import org.mycore.solr.index.statistic.MCRSolrIndexStatistic;
39 import org.mycore.solr.index.statistic.MCRSolrIndexStatisticCollector;
40
41
42
43
44
45 public class MCRSolrInputDocumentsHandler extends MCRSolrAbstractIndexHandler {
46 Collection<SolrInputDocument> documents;
47
48 List<MCRSolrIndexHandler> subHandlerList;
49
50 private static Logger LOGGER = LogManager.getLogger(MCRSolrInputDocumentsHandler.class);
51
52 public MCRSolrInputDocumentsHandler(Collection<SolrInputDocument> documents) {
53 this(documents, MCRSolrClientFactory.getMainSolrClient());
54 }
55
56 public MCRSolrInputDocumentsHandler(Collection<SolrInputDocument> documents, SolrClient solrClient) {
57 super(solrClient);
58 this.documents = documents;
59 }
60
61
62
63
64 @Override
65 public MCRSolrIndexStatistic getStatistic() {
66 return MCRSolrIndexStatisticCollector.DOCUMENTS;
67 }
68
69
70
71
72 @Override
73 public void index() throws IOException, SolrServerException {
74 if (documents == null || documents.isEmpty()) {
75 LOGGER.warn("No input documents to index.");
76 return;
77 }
78 int totalCount = documents.size();
79 LOGGER.info("Handling {} documents", totalCount);
80 SolrClient solrClient = getSolrClient();
81 if (solrClient instanceof ConcurrentUpdateSolrClient) {
82 LOGGER.info("Detected ConcurrentUpdateSolrClient. Split up batch update.");
83 splitDocuments();
84
85 documents.clear();
86 return;
87 }
88 UpdateResponse updateResponse;
89 try {
90 UpdateRequest updateRequest = getUpdateRequest(MCRSolrConstants.SOLR_UPDATE_PATH);
91 updateRequest.add(documents);
92 updateResponse = updateRequest.process(getSolrClient());
93 } catch (Throwable e) {
94 LOGGER.warn("Error while indexing document collection. Split and retry.");
95 splitDocuments();
96 return;
97 }
98 if (updateResponse.getStatus() != 0) {
99 LOGGER.error("Error while indexing document collection. Split and retry: {}", updateResponse.getResponse());
100 splitDocuments();
101 } else {
102 LOGGER.info("Sending {} documents was successful in {} ms.", totalCount, updateResponse.getElapsedTime());
103 }
104 }
105
106 private void splitDocuments() {
107 subHandlerList = new ArrayList<>(documents.size());
108 for (SolrInputDocument document : documents) {
109 MCRSolrInputDocumentHandler subHandler = new MCRSolrInputDocumentHandler(document, getSolrClient());
110 subHandler.setCommitWithin(getCommitWithin());
111 this.subHandlerList.add(subHandler);
112 }
113 }
114
115 @Override
116 public List<MCRSolrIndexHandler> getSubHandlers() {
117 return subHandlerList == null ? super.getSubHandlers() : subHandlerList;
118 }
119
120 @Override
121 public int getDocuments() {
122 return documents.size();
123 }
124
125 @Override
126 public String toString() {
127 return "index " + this.documents.size() + " mycore documents";
128 }
129
130 }