View Javadoc
1   /*
2    * This file is part of ***  M y C o R e  ***
3    * See http://www.mycore.de/ for details.
4    *
5    * MyCoRe is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * MyCoRe is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU General Public License for more details.
14   *
15   * You should have received a copy of the GNU General Public License
16   * along with MyCoRe.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package org.mycore.frontend.cli;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.io.UncheckedIOException;
25  import java.nio.charset.Charset;
26  import java.util.Optional;
27  import java.util.concurrent.CompletableFuture;
28  import java.util.stream.Collectors;
29  import java.util.stream.Stream;
30  
31  import org.apache.logging.log4j.LogManager;
32  import org.apache.logging.log4j.Logger;
33  import org.mycore.common.content.MCRByteContent;
34  import org.mycore.common.content.MCRContent;
35  
36  public class MCRExternalProcess {
37  
38      private static final Logger LOGGER = LogManager.getLogger();
39  
40      private final InputStream stdin;
41  
42      private String[] command;
43  
44      private int exitValue;
45  
46      private CompletableFuture<MCRByteContent> output;
47  
48      private CompletableFuture<MCRByteContent> errors;
49  
50      public MCRExternalProcess(String... command) {
51          this(null, command);
52      }
53  
54      public MCRExternalProcess(String command) {
55          this(command.split("\\s+"));
56      }
57  
58      public MCRExternalProcess(InputStream stdin, String... command) {
59          this.stdin = stdin;
60          this.command = command;
61      }
62  
63      public int run() throws IOException, InterruptedException {
64  
65          LOGGER.debug(() -> Stream.of(command)
66              .collect(Collectors.joining(" ")));
67  
68          ProcessBuilder pb = new ProcessBuilder(command);
69          Process p = pb.start();
70          CompletableFuture<Void> input = MCRStreamSucker.writeAllBytes(stdin, p);
71          output = MCRStreamSucker.readAllBytesAsync(p.getInputStream()).thenApply(MCRStreamSucker::toContent);
72          errors = MCRStreamSucker.readAllBytesAsync(p.getErrorStream()).thenApply(MCRStreamSucker::toContent);
73          try {
74              exitValue = p.waitFor();
75          } catch (InterruptedException e) {
76              p.destroy();
77              throw e;
78          }
79          CompletableFuture.allOf(input, output, errors)
80              .whenComplete((Void, ex) -> MCRStreamSucker.destroyProcess(p, ex));
81          return exitValue;
82      }
83  
84      public int getExitValue() {
85          return exitValue;
86      }
87  
88      public String getErrors() {
89          return new String(errors.join().asByteArray(), Charset.defaultCharset());
90      }
91  
92      public MCRContent getOutput() throws IOException {
93          return output.join();
94      }
95  
96      private static class MCRStreamSucker {
97  
98          private static final Logger LOGGER = LogManager.getLogger();
99  
100         private static CompletableFuture<byte[]> readAllBytesAsync(InputStream is) {
101             return CompletableFuture.supplyAsync(() -> readAllBytes(is));
102         }
103 
104         private static byte[] readAllBytes(InputStream is) throws UncheckedIOException {
105             try {
106                 return is.readAllBytes();
107             } catch (IOException e) {
108                 throw new UncheckedIOException(e);
109             }
110         }
111 
112         private static MCRByteContent toContent(byte[] data) {
113             return new MCRByteContent(data, System.currentTimeMillis());
114         }
115 
116         private static void destroyProcess(Process p, Throwable ex) {
117             if (ex != null) {
118                 LOGGER.warn("Error while sucking stdout or stderr streams.", ex);
119             }
120             LOGGER.debug("Destroy process {}", p.pid());
121             p.destroy();
122         }
123 
124         public static CompletableFuture<Void> writeAllBytes(InputStream stdin, Process p) throws UncheckedIOException {
125             return Optional.ofNullable(stdin)
126                 .map(in -> CompletableFuture.runAsync(() -> {
127                     try {
128                         try (OutputStream stdinPipe = p.getOutputStream()) {
129                             in.transferTo(stdinPipe);
130                         }
131                     } catch (IOException e) {
132                         throw new UncheckedIOException(e);
133                     }
134                 }))
135                 .orElseGet(CompletableFuture::allOf);
136         }
137     }
138 
139 }