ExecSource是很多人接触flume时第一个使用的source,这里我简单的分析下这个source的使用与实现。
FlumeUserGuide:https://flume.apache.org/FlumeUserGuide.html
这里我简单翻译和总结下:
Exec source在启动时运行Unix命令,并且期望它会不断的在标准输出中产生数据。 (stderr默认会被丢弃).如果进程因为某些原因退出,Exce Source也将退出并且不会再产生数据。
详细配置说明如下: 加粗的是必配项:
注意:ExecSource无法感知数据的丢失
,比如channel满的时候数据发送失败。为了更健壮的数据可靠性,推荐:Spooling Directory Source、Taildir Source或者通过flume的SDK直接实现。
如此一个简单的配置,我们就能实时地将/var/log/secure下的日志发送到flume的channel里面。
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
简单说下shell和command,shell主要是区分Bash 或 Powershell。command作为一个参数传递给shell执行,这样command就可以使用shell的一些特性,如wildcards, back ticks, pipes, loops, conditionals,如果shell没有指定,一般会使用默认值‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’。
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable{
xxx;
}
主要函数/类有:
代码比较少,功能比较简单:
功能同样比较简单
获取excel里面的那些参数
static class ExecRunnable implements Runnable
核心部分是一个
do{
xxx;
}while(restart);
所以stop时需要将restart置为false,防止do重做。
do内容主要包括:
if (shell != null) {
String[] commandArgs = formulateShellCommand(shell, command);
process = Runtime.getRuntime().exec(commandArgs);
} else {
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
}
reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
StderrReader stderrReader = new StderrReader(new BufferedReader(
new InputStreamReader(process.getErrorStream(), charset)), logStderr);
xxxx;
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
if (!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occurred when processing event batch", e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
while ((line = reader.readLine()) != null) {
sourceCounter.incrementEventReceivedCount();
synchronized (eventList) {
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if (eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}
synchronized (eventList) {
if (!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}
这个通用的小技巧,几乎所有的批处理代码里面都有这类写法:
1. 没到最后一行,达到bufferCount或者到timeout一批处理列表里面的数据
2. 读完最后一行(字节流没数据了),执行批处理
static class StderrReader extends Thread
private void flushEventBatch(List<Event> eventList) {
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
private static String[] formulateShellCommand(String shell, String command) {
String[] shellArgs = shell.split("\\s+");
String[] result = new String[shellArgs.length + 1];
System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
result[shellArgs.length] = command;
return result;
}
catch (Exception e) {
logger.error("Failed while running command: " + command, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
private boolean timeout() {
return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
}
“了解ExecSource的使用” 放在最后是因为ExecSource确实不太适合用在生产环境
参考文章:https://blog.csdn.net/qianshangding0708/article/details/49736019
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。