平台开发经常需要使用 shell 脚本调度大数据的组件,在使用 springBoot 开发项目时也是如此,为了保证子 shell 的执行时间可控,需要设置超时时间,如果 shell 无法在给定时间内返回,需要进行相关容错处理。
say_hello.sh
#!/bin/zsh
set -eu
echo "hello ----"
#2*3
for (( i = 0; i < $1; i++ )); do
echo $i 111
sleep 1
done
echo "end *************"
package org.bridge.xjq.bridge;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws IOException, InterruptedException {
// one: 使用 processBuilder 构建 process 调度 shell 脚本
URL url = Main3.class.getResource("/say_hello.sh");
ProcessBuilder processBuilder = new ProcessBuilder("sh", url.getPath(), 3);
processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
// two: 设置超时时间, 等待执行结束或者超时
System.out.println("等待执行完毕或超时 ...");
boolean over = process.waitFor(5, TimeUnit.SECONDS);
System.out.println("进程正常结束了么:" + over);
// three: 判断是否运行完毕了, 如果没有, 就手动 kill 掉子进程
if (over) {
System.out.println("finished in shell ");
} else {
System.out.println("准备 stop 掉子进程");
// 如果不 sleep 直接 destory, 会造成读取线程发生 IO 异常
process.destroy();
}
// four: 读取子进程输出结果
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder stringBuilder = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
stringBuilder.append(line);
}
// five: 等待进程结束(process.destory 内部是一个 native 方法, 不会马上 kill 掉,需要等待一段时间子进程真正被 killed)
int loop = 0;
while (process.isAlive() && loop < 10) {
loop++;
Thread.sleep(1);
System.out.println("loop: " + loop + ", still alive");
}
System.out.println("进程信息" + process.isAlive() + process.exitValue());
System.out.println(stringBuilder.toString());
}
}
日志如图所示:
等待执行完毕或超时 ...
进程正常结束了么:true
finished in shell
hello ----
0 111
1 111
2 111
end *************
进程是否存活:false;进程返回值:0
shell 输出结果:hello ----0 1111 1112 111end *************
在取出结果时会抛出异常 Exception in thread "main" java.io.IOException: Stream closed
等待执行完毕或超时 ...
进程正常结束了么:false
准备 stop 掉子进程
Exception in thread "main" java.io.IOException: Stream closed
at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:170)
at java.io.BufferedInputStream.read(BufferedInputStream.java:336)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.bridge.xjq.bridge.Main3.main(Main3.java:37)
当检测到子进程超时依然没有结束时,我们会主动 destroy 掉子进程,destory 子进程的同时也会将 InputStream 流关闭,导致子进程计算结果无法获取,为了避免这种情形,可以考虑使用一个单独线程获取结果。
public class Main3 {
public static void main(String[] args) throws IOException, InterruptedException {
// one: 使用 processBuilder 构建 process 调度 shell 脚本
URL url = Main3.class.getResource("/say_hello.sh");
ProcessBuilder processBuilder = new ProcessBuilder("sh", url.getPath(), "8");
processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
// 1.5 在进程开始执行后, 立即轮询取出结果到 stringBuilder 中, 为了避免阻塞主逻辑(发现超时时,kill 掉子进程),使用独立线程取数
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder stringBuilder = new StringBuilder();
// 填充 shell 脚本内容
new Thread(new Runnable() {
@Override
public void run() {
// 将数据导出来, 这里也会一直等待子进程输出数据
String line;
while (true) {
try {
if ((line = reader.readLine()) == null) {
System.out.println(Thread.currentThread() + ", 取数结束了...");
break;
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
stringBuilder.append(line);
System.out.println(Thread.currentThread() + "result:" + line);
}
System.out.println(Thread.currentThread() + "flush to stringBuilder over");
}
}).start();
// two: 设置超时时间, 等待执行结束或者超时
System.out.println("等待执行完毕或超时 ...");
boolean over = process.waitFor(5, TimeUnit.SECONDS);
System.out.println("进程正常结束了么:" + over);
// three: 判断是否运行完毕了, 如果没有, 就手动 kill 掉子进程
if (over) {
System.out.println("finished in shell ");
} else {
System.out.println("准备 stop 掉子进程");
// 如果不 sleep 直接 destory, 会造成读取线程发生 IO 异常
process.destroy();
}
/*
// four: 读取子进程输出结果
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder stringBuilder = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line);
stringBuilder.append(line);
}
*/
// five: 等待进程结束(process.destory 内部是一个 native 方法, 不会马上 kill 掉,需要等待一段时间子进程真正被 killed)
int loop = 0;
while (process.isAlive() && loop < 10) {
loop++;
Thread.sleep(1);
System.out.println("loop: " + loop + ", still alive");
}
System.out.println("进程是否存活:" + process.isAlive() + ";进程返回值:"+ process.exitValue());
System.out.println("shell 输出结果:" + stringBuilder.toString());
}
}
等待执行完毕或超时 ...
Thread[Thread-0,5,main]result:hello ----
Thread[Thread-0,5,main]result:0 111
Thread[Thread-0,5,main]result:1 111
Thread[Thread-0,5,main]result:2 111
Thread[Thread-0,5,main]result:end *************
Thread[Thread-0,5,main], 取数结束了...
Thread[Thread-0,5,main]flush to stringBuilder over
进程正常结束了么:true
finished in shell
进程是否存活:false;进程返回值:0
shell 输出结果:hello ----0 1111 1112 111end *************
等待执行完毕或超时 ...
Thread[Thread-0,5,main]result:hello ----
Thread[Thread-0,5,main]result:0 111
Thread[Thread-0,5,main]result:1 111
Thread[Thread-0,5,main]result:2 111
Thread[Thread-0,5,main]result:3 111
Thread[Thread-0,5,main]result:4 111
进程正常结束了么:false
准备 stop 掉子进程
Thread[Thread-0,5,main], 取数结束了...
Thread[Thread-0,5,main]flush to stringBuilder over
loop: 1, still alive
进程是否存活:false;进程返回值:143
shell 输出结果:hello ----0 1111 1112 1113 1114 111
该方法会阻塞当前线程,直到子进程执行完毕或者达到了超时时间,返回值反应子进程是否正常运行完毕
底层使用的一个 JNI(java native interface)方法,可能是异步执行的,并不会同步 kill 掉进程返回,因此需要等待一小段时间,等待子进程被终结
private void destroy(boolean force) {
switch (platform) {
case LINUX:
case BSD:
case AIX:
// There is a risk that pid will be recycled, causing us to
// kill the wrong process! So we only terminate processes
// that appear to still be running. Even with this check,
// there is an unavoidable race condition here, but the window
// is very small, and OSes try hard to not recycle pids too
// soon, so this is quite safe.
synchronized (this) {
if (!hasExited)
destroyProcess(pid, force);
}
try { stdin.close(); } catch (IOException ignored) {}
try { stdout.close(); } catch (IOException ignored) {}
try { stderr.close(); } catch (IOException ignored) {}
break;
...
private static native void destroyProcess(int pid, boolean force);
不用的 InputStream 有不同的 close 实现,对于 process.getInputStream() 返回的类是 ProcessPipeInputStream
,其 close 会补充结束符后,再关闭掉 inputStream,从而避免 reader.readline 读数时抛异常 Stream closed
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。