1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.mycore.frontend.cli;
20
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.io.UncheckedIOException;
25 import java.nio.charset.Charset;
26 import java.util.Optional;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.stream.Collectors;
29 import java.util.stream.Stream;
30
31 import org.apache.logging.log4j.LogManager;
32 import org.apache.logging.log4j.Logger;
33 import org.mycore.common.content.MCRByteContent;
34 import org.mycore.common.content.MCRContent;
35
36 public class MCRExternalProcess {
37
38 private static final Logger LOGGER = LogManager.getLogger();
39
40 private final InputStream stdin;
41
42 private String[] command;
43
44 private int exitValue;
45
46 private CompletableFuture<MCRByteContent> output;
47
48 private CompletableFuture<MCRByteContent> errors;
49
50 public MCRExternalProcess(String... command) {
51 this(null, command);
52 }
53
54 public MCRExternalProcess(String command) {
55 this(command.split("\\s+"));
56 }
57
58 public MCRExternalProcess(InputStream stdin, String... command) {
59 this.stdin = stdin;
60 this.command = command;
61 }
62
63 public int run() throws IOException, InterruptedException {
64
65 LOGGER.debug(() -> Stream.of(command)
66 .collect(Collectors.joining(" ")));
67
68 ProcessBuilder pb = new ProcessBuilder(command);
69 Process p = pb.start();
70 CompletableFuture<Void> input = MCRStreamSucker.writeAllBytes(stdin, p);
71 output = MCRStreamSucker.readAllBytesAsync(p.getInputStream()).thenApply(MCRStreamSucker::toContent);
72 errors = MCRStreamSucker.readAllBytesAsync(p.getErrorStream()).thenApply(MCRStreamSucker::toContent);
73 try {
74 exitValue = p.waitFor();
75 } catch (InterruptedException e) {
76 p.destroy();
77 throw e;
78 }
79 CompletableFuture.allOf(input, output, errors)
80 .whenComplete((Void, ex) -> MCRStreamSucker.destroyProcess(p, ex));
81 return exitValue;
82 }
83
84 public int getExitValue() {
85 return exitValue;
86 }
87
88 public String getErrors() {
89 return new String(errors.join().asByteArray(), Charset.defaultCharset());
90 }
91
92 public MCRContent getOutput() throws IOException {
93 return output.join();
94 }
95
96 private static class MCRStreamSucker {
97
98 private static final Logger LOGGER = LogManager.getLogger();
99
100 private static CompletableFuture<byte[]> readAllBytesAsync(InputStream is) {
101 return CompletableFuture.supplyAsync(() -> readAllBytes(is));
102 }
103
104 private static byte[] readAllBytes(InputStream is) throws UncheckedIOException {
105 try {
106 return is.readAllBytes();
107 } catch (IOException e) {
108 throw new UncheckedIOException(e);
109 }
110 }
111
112 private static MCRByteContent toContent(byte[] data) {
113 return new MCRByteContent(data, System.currentTimeMillis());
114 }
115
116 private static void destroyProcess(Process p, Throwable ex) {
117 if (ex != null) {
118 LOGGER.warn("Error while sucking stdout or stderr streams.", ex);
119 }
120 LOGGER.debug("Destroy process {}", p.pid());
121 p.destroy();
122 }
123
124 public static CompletableFuture<Void> writeAllBytes(InputStream stdin, Process p) throws UncheckedIOException {
125 return Optional.ofNullable(stdin)
126 .map(in -> CompletableFuture.runAsync(() -> {
127 try {
128 try (OutputStream stdinPipe = p.getOutputStream()) {
129 in.transferTo(stdinPipe);
130 }
131 } catch (IOException e) {
132 throw new UncheckedIOException(e);
133 }
134 }))
135 .orElseGet(CompletableFuture::allOf);
136 }
137 }
138
139 }