
Flume 1.8.0 Development: RPC


RPC is a core part of Flume application development.

For Flume development basics, see: https://cloud.tencent.com/developer/article/1195082

RPC client interface

An implementation of Flume's RpcClient interface encapsulates the RPC mechanisms supported by Flume. A user application simply calls the Flume SDK client's append(Event) or appendBatch(List<Event>) method to send data, without worrying about the details of the underlying message exchange. The required Event argument can be provided by implementing the Event interface directly, by using a convenience class such as SimpleEvent, or by using EventBuilder's static helper method withBody().

RPC clients - Avro and Thrift

As of Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. The client needs to be constructed with the host and port of the destination Flume agent, and the RpcClient is then used to send data to that agent. The following example shows how to use the Flume Client SDK API in an application.

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }

}
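
As described above, RpcClient also exposes appendBatch(List<Event>) for sending several events in one call, and an Event can be built either with EventBuilder or with a convenience class such as SimpleEvent. The minimal sketch below illustrates both; the class and method names (BatchSendExample, sendBatch) are purely illustrative, and the RpcClient is assumed to have been created as in the example above.

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.SimpleEvent;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class BatchSendExample {
  // Sends all messages in a single appendBatch() call.
  // The RpcClient is assumed to be already initialized, e.g. via
  // RpcClientFactory.getDefaultInstance(hostname, port).
  public static void sendBatch(RpcClient client, List<String> messages)
      throws EventDeliveryException {
    List<Event> batch = new ArrayList<Event>();
    for (String msg : messages) {
      // Option 1: the EventBuilder helper
      batch.add(EventBuilder.withBody(msg, Charset.forName("UTF-8")));
    }

    // Option 2: build an event manually with SimpleEvent
    SimpleEvent manual = new SimpleEvent();
    manual.setBody("one more event".getBytes(Charset.forName("UTF-8")));
    batch.add(manual);

    // One RPC call for the whole list
    client.appendBatch(batch);
  }
}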

The remote Flume agent needs an AvroSource (or a ThriftSource if you are using a Thrift client) listening on the corresponding port. Below is an example Flume agent configuration that is waiting for a connection from MyApp:

a1.channels = c1 
a1.sources = r1 
a1.sinks = k1
a1.channels.c1.type = memory
a1.sources.r1.channels = c1 
a1.sources.r1.type = avro

# For using a thrift source, set the following instead of the above line:
# a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sinks.k1.channel = c1 
a1.sinks.k1.type = logger

For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with the following properties:

client.type = default (for avro) or thrift (for thrift)
hosts = h1     # default client accepts only 1 host
                # (additional hosts will be ignored)
hosts.h1 = host1.example.org:41414
                # host and port must both be specified
                # (neither has a default)
batch-size = 100        # Must be >=1 (default: 100)
connect-timeout = 20000 # Must be >=1000 (default: 20000)
request-timeout = 20000 # Must be >=1000 (default: 20000)
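
These properties can also be supplied programmatically as a java.util.Properties object and handed to RpcClientFactory.getInstance(Properties). A minimal sketch, assuming an Avro source at host1.example.org:41414 (the host name, port, and class name PropertiesClientExample are only examples):

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import java.util.Properties;

public class PropertiesClientExample {
  public static RpcClient createClient() {
    // Same keys as the property list above
    Properties props = new Properties();
    props.setProperty("client.type", "default");   // use "thrift" for a Thrift source
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "host1.example.org:41414");
    props.setProperty("batch-size", "100");
    props.setProperty("connect-timeout", "20000");
    props.setProperty("request-timeout", "20000");

    // Builds the client described by the properties
    return RpcClientFactory.getInstance(props);
  }
}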

Secure RPC client - Thrift

As of Flume 1.6.0, the Thrift source and sink support Kerberos-based authentication. The client needs to use the getThriftInstance method of SecureRpcClientFactory to obtain a SecureThriftRpcClient. When SecureRpcClientFactory is used, the Kerberos authentication module, which lives in the flume-ng-auth module, must be on the classpath. The client principal and client keytab are passed in as parameters through the properties, and the server principal of the destination Thrift source must be provided as well. The following example shows how to use SecureRpcClientFactory in an application:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;

public class MyApp {
  public static void main(String[] args) {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();
    // Initialize client with the remote Flume agent's host, port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));

    // Initialize client with the kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with a ThriftSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade {
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties) {
    // Setup the RPC connection
    this.properties = properties;
    // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

The remote ThriftSource must be started in Kerberos mode. Below is an example Flume agent configuration that is waiting for a connection from MyApp:

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytab


a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

Failover client

This class wraps the default Avro RPC client and gives clients failover capability. It takes a whitespace-separated list of <host>:<port> entries, each representing a Flume agent, which together form a failover group. The failover RPC client currently does not support Thrift. If communication with the currently selected host agent fails, the failover client automatically moves on to the next host in the list. For example:

// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the FailoverRpcClient can be configured with the following properties:

client.type = default_failover

hosts = h1 h2 h3                     # at least one is required, but 2 or
                                     # more makes better sense

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

max-attempts = 3                     # Must be >=0 (default: number of hosts
                                     # specified, 3 in this case). A '0'
                                     # value doesn't make much sense because
                                     # it will just cause an append call to
                                     # immediately fail. A '1' value means
                                     # that the failover client will try only
                                     # once to send the Event, and if it
                                     # fails then there will be no failover
                                     # to a second client, so this value
                                     # causes the failover client to
                                     # degenerate into just a default client.
                                     # It makes sense to set this value to at
                                     # least the number of hosts that you
                                     # specified.

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

Load-balancing RPC client

The Flume Client SDK also supports load-balancing RPC. It takes a whitespace-separated list of <host>:<port> entries, each representing a Flume agent, which together form a load-balancing group. Both random and round-robin selection strategies are supported. You can also implement the LoadBalancingRpcClient$HostSelector interface in your own class to supply a custom selection algorithm; in that case, set the host-selector property to the FQCN (fully qualified class name) of your class. The load-balancing RPC client currently does not support Thrift either.

If backoff is enabled, the client temporarily blacklists hosts that fail, excluding them from being selected as failover hosts for a given timeout. When the timeout expires, if the host is still unresponsive, this is treated as a sequential failure and the timeout grows exponentially, so the client avoids getting stuck in long waits on unresponsive hosts.

The maximum backoff time can be configured by setting maxBackoff (in milliseconds). The default maxBackoff is 30 seconds (specified in the OrderSelector class, which is the superclass of both load-balancing strategies). The backoff timeout grows exponentially with consecutive failures, up to a maximum possible backoff timeout capped at 65536 seconds (about 18.2 hours).
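
As a rough, purely illustrative sketch of how an exponentially growing backoff gets capped (the 2-second starting value and the doubling factor are assumptions made for this sketch, not Flume's actual constants; see the OrderSelector class for the real logic):

public class BackoffIllustration {
  public static void main(String[] args) {
    long maxBackoffMs = 30000;  // effective default when maxBackoff is 0
    long backoffMs = 2000;      // assumed starting value, for illustration only

    // Each consecutive failure grows the blacklist window, up to the cap
    for (int failure = 1; failure <= 8; failure++) {
      System.out.println("failure " + failure + ": host blacklisted for " + backoffMs + " ms");
      backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
    }
  }
}

The example below then enables backoff through the backoff and maxBackoff client properties: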

// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
//                                            // selection
props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively
                                  // becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the LoadBalancingRpcClient can be configured with the following properties:

client.type = default_loadbalance

hosts = h1 h2 h3                     # At least 2 hosts are required

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

backoff = false                      # Specifies whether the client should
                                     # back-off from (i.e. temporarily
                                     # blacklist) a failed host
                                     # (default: false).

maxBackoff = 0                       # Max timeout in millis that a failed
                                     # host will remain inactive due to a
                                     # previous failure with that host
                                     # (default: 0, which effectively
                                     # becomes 30000)

host-selector = round_robin          # The host selection strategy used
                                     # when load-balancing among hosts
                                     # (default: round_robin).
                                     # Other values include "random"
                                     # or the FQCN of a custom class
                                     # that implements
                                     # LoadBalancingRpcClient$HostSelector

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

Embedded agent

Flume has an embedded agent API that allows users to embed an agent in their applications. The agent is meant to be lightweight, so not every source, sink, and channel is available. Specifically, the source used is a special embedded source, and events are sent to it via the put and putAll methods of the EmbeddedAgent object. Avro is the only supported sink, and only the File and Memory channels are allowed. Interceptors are also supported by the embedded agent.

Note: the embedded agent depends on hadoop-core.jar.

Configuration of an embedded agent is similar to configuration of a full agent. The following is an exhaustive list of configuration options:

Required properties are marked with "(required)".

Property Name              Default    Description
source.type                embedded   The only available source is the embedded source
channel.type (required)    -          Either memory or file
channel.* (required)       -          See the MemoryChannel or FileChannel user guide for channel options
sinks (required)           -          List of sink names
sink.type (required)       -          Must be avro
sink.* (required)          -          See the AvroSink user guide for sink options
processor.type             -          Either failover or load_balance
processor.*                -          See the failover or load_balance sink processor user guide
source.interceptors        -          Space-separated list of interceptors
source.interceptors.*      -          Configuration for each interceptor named in source.interceptors

Usage example:

Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port",  "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");

EmbeddedAgent agent = new EmbeddedAgent("myagent");

agent.configure(properties);
agent.start();

List<Event> events = Lists.newArrayList();

// "event" stands for an org.apache.flume.Event built elsewhere,
// e.g. via EventBuilder.withBody(...)
events.add(event);
events.add(event);
events.add(event);
events.add(event);

agent.putAll(events);

...

agent.stop();

