RPC is one of the more central parts of Flume development.
For the basics of Flume development, see: https://cloud.tencent.com/developer/article/1195082
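An implementation of the RpcClient interface encapsulates the RPC mechanism supported by Flume. A user application can simply call the Flume client SDK's append(Event) or appendBatch(List<Event>) to send data, without worrying about the details of the underlying message exchange. The Event argument can be supplied by implementing the Event interface directly, for example with the SimpleEvent class, or more conveniently through the static helper method withBody() of EventBuilder.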
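As of Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client must create this object with the host and port of the target Flume agent, and can then use the RpcClient to send data to the agent. The following example shows how to use the Flume client SDK API in an application: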
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
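The remote Flume agent needs an AvroSource (or a ThriftSource, if you use a Thrift client) listening on the port. Below is an example Flume agent configuration that waits for a connection from MyApp: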
a1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sources.r1.type = avro
# To use a Thrift source, set the following instead of the line above:
# a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with these properties:
client.type = default (for avro) or thrift (for thrift)
hosts = h1 # default client accepts only 1 host
# (additional hosts will be ignored)
hosts.h1 = host1.example.org:41414
# host and port must both be specified
# (neither has a default)
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)
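The `hosts` / `hosts.<alias>` indirection used above recurs in every client type, so it may help to see it spelled out. The following is a minimal sketch, using only the Java standard library, of how a list of aliases could be resolved to host/port pairs. `HostAliasResolver` is a hypothetical helper written for illustration; it is not part of the Flume SDK and does not claim to reproduce Flume's own parsing.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not a Flume class): resolves the aliases listed in
// "hosts" to the host:port values stored under "hosts.<alias>".
final class HostAliasResolver {

    static List<InetSocketAddress> resolve(Properties props) {
        String aliases = props.getProperty("hosts", "").trim();
        if (aliases.isEmpty()) {
            throw new IllegalArgumentException("'hosts' must list at least one alias");
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (String alias : aliases.split("\\s+")) {
            String hostPort = props.getProperty("hosts." + alias);
            if (hostPort == null) {
                throw new IllegalArgumentException("missing property hosts." + alias);
            }
            // Host and port must both be present; neither has a default.
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got " + hostPort);
            }
            String host = hostPort.substring(0, colon).trim();
            int port = Integer.parseInt(hostPort.substring(colon + 1).trim());
            // createUnresolved avoids a DNS lookup; this sketch only parses.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hosts", "h1 h2");
        props.setProperty("hosts.h1", "host1.example.org:41414");
        props.setProperty("hosts.h2", "host2.example.org:41414");
        for (InetSocketAddress address : resolve(props)) {
            System.out.println(address.getHostString() + ":" + address.getPort());
        }
    }
}
```

Keeping the alias list separate from the addresses means a host can be moved by changing a single `hosts.<alias>` value while the group membership in `hosts` stays untouched.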
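As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication. The client obtains a SecureThriftRpcClient through the getThriftInstance method of SecureRpcClientFactory. The Kerberos authentication module lives in the flume-ng-auth module, which must be on the classpath when SecureRpcClientFactory is used. The client principal and client keytab are passed in as parameters through the properties, and the server principal of the destination Thrift source must be provided in the same way. A data-generating application can use SecureRpcClientFactory as in the following example: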
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;

public class MyApp {
  public static void main(String[] args) {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();

    // Initialize client with the remote Flume agent's host, port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "client.example.org" + ":" + String.valueOf(41414));

    // Initialize client with the kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with a ThriftSource running in kerberos mode.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade {
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties) {
    // Setup the RPC connection
    this.properties = properties;
    // Create the SecureThriftRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}
The remote ThriftSource must be started in Kerberos mode. Below is an example Flume agent configuration that waits for a connection from MyApp:
a1.channels = c1
a1.sources = r1
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytab
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
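The FailoverRpcClient class wraps the default Avro RPC client to give the client failover handling. It takes a whitespace-separated list of <host>:<port> entries, each representing a Flume agent, which together form a failover group. The failover RPC client does not currently support Thrift. If communication with the currently selected host agent fails, the failover client automatically moves on to the next host in the list. For example: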
// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);
// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
FailoverRpcClient can be configured more flexibly with the following properties:
client.type = default_failover
hosts = h1 h2 h3 # at least one is required, but 2 or
# more makes better sense
hosts.h1 = host1.example.org:41414
hosts.h2 = host2.example.org:41414
hosts.h3 = host3.example.org:41414
max-attempts = 3 # Must be >=0 (default: number of hosts
# specified, 3 in this case). A '0'
# value doesn't make much sense because
# it will just cause an append call to
# immediately fail. A '1' value means
# that the failover client will try only
# once to send the Event, and if it
# fails then there will be no failover
# to a second client, so this value
# causes the failover client to
# degenerate into just a default client.
# It makes sense to set this value to at
# least the number of hosts that you
# specified.
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)
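To make the effect of max-attempts concrete, here is a simplified sketch of the failover loop in plain Java. It is an illustration only and not Flume's FailoverRpcClient code: `FailoverSketch`, `sendWithFailover`, and the `trySend` predicate (which stands in for a real RPC call and returns true on success) are hypothetical names, and the sketch assumes hosts are simply tried in list order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of failover with an attempt limit; not Flume code.
final class FailoverSketch {

    /**
     * Tries hosts in list order, wrapping around if needed, and makes at most
     * maxAttempts send attempts. Returns the host that accepted the event,
     * or null if every attempt failed (or maxAttempts is 0).
     */
    static String sendWithFailover(List<String> hosts, int maxAttempts, Predicate<String> trySend) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String host = hosts.get(attempt % hosts.size());
            if (trySend.test(host)) {
                return host; // delivered, stop trying
            }
            // otherwise fall through and try the next host in the list
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList(
            "host1.example.org:41414", "host2.example.org:41414", "host3.example.org:41414");
        // Simulate host1 being unreachable while the other hosts accept events.
        Predicate<String> host1IsDown = host -> !host.startsWith("host1");

        // With three attempts the second host takes over.
        System.out.println(sendWithFailover(hosts, 3, host1IsDown)); // prints host2.example.org:41414
        // With a single attempt there is no failover at all.
        System.out.println(sendWithFailover(hosts, 1, host1IsDown)); // prints null
    }
}
```

The second call shows why a value of 1 degenerates into a plain client: the only attempt goes to the failed host and nothing else is tried.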
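The Flume client SDK also supports an RpcClient that load-balances among multiple hosts. It takes a whitespace-separated list of <host>:<port> entries, each representing a Flume agent, which together form a load-balancing group. Two load-balancing strategies are supported: random and round-robin. You can also supply your own selection algorithm by writing a class that implements the LoadBalancingRpcClient$HostSelector interface; in that case, set the host-selector property to the fully qualified class name (FQCN) of your class. Like the failover client, the load-balancing RPC client does not currently support Thrift.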
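If backoff is enabled, the client temporarily blacklists hosts that fail, so that they are not selected as a failover host until a given timeout has passed. When the timeout elapses and the host is still unresponsive, this counts as a sequential failure, and the timeout is increased exponentially to avoid getting stuck in long waits on unresponsive hosts.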
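The maximum backoff time can be configured by setting maxBackoff (in milliseconds). The maxBackoff default is 30 seconds (specified in the OrderSelector class, which is the superclass of both load-balancing strategies). The backoff timeout grows exponentially with each sequential failure, up to the maximum possible backoff timeout, which is limited to 65536 seconds (about 18.2 hours). For example: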
// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");
// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");
// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);
props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff", "true"); // Disabled by default.
props.put("maxBackoff", "10000"); // Defaults 0, which effectively
// becomes 30000 ms
// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);
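The exponential backoff described earlier can be sketched as a small calculation. This is an illustration under stated assumptions, not Flume's implementation: the 1-second base and the doubling per consecutive failure are assumptions chosen so that the ceiling comes out at the documented 65536 seconds (1000 ms × 2^16), and `BackoffSketch` and `backoffMillis` are hypothetical names.

```java
// Hypothetical illustration of capped exponential backoff; not Flume code.
final class BackoffSketch {
    static final long BASE_MILLIS = 1000L;           // assumed base interval
    static final int MAX_EXPONENT = 16;              // 1000 ms * 2^16 = 65536 s ceiling
    static final long DEFAULT_MAX_BACKOFF = 30_000L; // used when maxBackoff is 0

    /**
     * Returns how long a host stays blacklisted after the given number of
     * consecutive failures, capped by maxBackoffMillis (0 means the default).
     */
    static long backoffMillis(int sequentialFailures, long maxBackoffMillis) {
        long cap = maxBackoffMillis <= 0 ? DEFAULT_MAX_BACKOFF : maxBackoffMillis;
        int exponent = Math.min(Math.max(sequentialFailures, 0), MAX_EXPONENT);
        long uncapped = BASE_MILLIS << exponent;      // doubles with every failure
        return Math.min(cap, uncapped);
    }

    public static void main(String[] args) {
        // With maxBackoff = 10000 the timeout grows 2000, 4000, 8000 and then
        // stays at the 10000 ms cap.
        for (int failures = 1; failures <= 5; failures++) {
            System.out.println(failures + " failure(s): " + backoffMillis(failures, 10_000L) + " ms");
        }
    }
}
```

Under these assumptions a low maxBackoff, such as the 10000 ms used in the example properties, is reached after only a few consecutive failures, so the cap rather than the exponent decides how quickly a recovered host is retried.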
LoadBalancingRpcClient can be configured more flexibly with the following properties:
client.type = default_loadbalance
hosts = h1 h2 h3 # At least 2 hosts are required
hosts.h1 = host1.example.org:41414
hosts.h2 = host2.example.org:41414
hosts.h3 = host3.example.org:41414
backoff = false # Specifies whether the client should
# back-off from (i.e. temporarily
# blacklist) a failed host
# (default: false).
maxBackoff = 0 # Max timeout in millis that a host will
# remain inactive due to a previous
# failure with that host (default: 0,
# which effectively becomes 30000)
host-selector = round_robin # The host selection strategy used
# when load-balancing among hosts
# (default: round_robin).
# Other values include "random"
# or the FQCN of a custom class
# that implements
# LoadBalancingRpcClient$HostSelector
batch-size = 100 # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)
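The difference between the two built-in host-selector strategies is easiest to see as the order in which hosts would be tried for each send. The sketch below is plain Java and purely illustrative: `HostOrderSketch` and its methods are hypothetical and do not implement the real LoadBalancingRpcClient$HostSelector interface, which a custom selector would have to implement against the Flume SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Hypothetical illustration of round-robin and random host ordering; not Flume code.
final class HostOrderSketch {
    private final List<String> hosts;
    private int next = 0; // index of the host that the next round-robin call starts with

    HostOrderSketch(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    /** Round robin: each call starts one host later than the previous call. */
    synchronized Iterator<String> roundRobinOrder() {
        List<String> order = new ArrayList<>(hosts);
        Collections.rotate(order, -next); // move the starting host to the front
        next = (next + 1) % hosts.size();
        return order.iterator();
    }

    /** Random: each call returns all hosts in a freshly shuffled order. */
    Iterator<String> randomOrder(Random rng) {
        List<String> order = new ArrayList<>(hosts);
        Collections.shuffle(order, rng);
        return order.iterator();
    }

    public static void main(String[] args) {
        HostOrderSketch sketch = new HostOrderSketch(Arrays.asList("h1", "h2", "h3"));
        // Three consecutive sends start with h1, h2 and h3 respectively.
        for (int send = 0; send < 3; send++) {
            System.out.println("send " + send + " starts with " + sketch.roundRobinOrder().next());
        }
    }
}
```

Returning a full ordering instead of a single host lets the caller fall back to the next entry when the first choice fails, which is how load balancing and failover can be combined in one pass.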
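Flume has an embedded agent API that lets users embed an agent in their own application. The agent is meant to be lightweight, so not all sources, sinks, and channels are allowed. Specifically, the source is a special embedded source, and events are sent to it through the put and putAll methods of the EmbeddedAgent object. Only the File Channel and Memory Channel are allowed as channels, and Avro Sink is the only supported sink. The embedded agent also supports interceptors.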
Note: the embedded agent depends on hadoop-core.jar.
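Configuring an embedded agent is similar to configuring a full agent. The following is an exhaustive list of configuration options:
Required properties are in bold.
Property name | Default | Description |
---|---|---|
source.type | embedded | The only available source is the embedded source. |
**channel.type** | - | Either memory or file |
channel.* | - | See the MemoryChannel and FileChannel user guides for the available options |
**sinks** | - | List of sink names |
**sink.type** | - | Must be avro |
sink.* | - | See the AvroSink user guide for the available options |
**processor.type** | - | Either failover or load_balance |
processor.* | - | See the failover and load_balance sink processor user guides for the available options |
source.interceptors | - | Space-separated list of interceptors |
source.interceptors.* | - | Configuration options for each interceptor named in source.interceptors |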
Usage example:
Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port", "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");
EmbeddedAgent agent = new EmbeddedAgent("myagent");
agent.configure(properties);
agent.start();
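// `event` is an org.apache.flume.Event created beforehand, e.g. with EventBuilder.withBody(...);
// Lists is Guava's com.google.common.collect.Lists
List<Event> events = Lists.newArrayList();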
events.add(event);
events.add(event);
events.add(event);
events.add(event);
agent.putAll(events);
...
agent.stop();
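Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com for removal.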