Six Approaches to Dimension Table Joins in Flink Table & SQL

Author: 大数据真好玩 · Published 2021-11-16 · Column: 暴走大数据

As Flink Table & SQL has matured, doing dimension table joins in Flink SQL has become the go-to choice in many scenarios.

Building on earlier write-ups, this post summarizes the ways to implement dimension table joins in Flink Table & SQL, as well as in the DataStream API:

  • Periodically loading dimension data
  • Distributed Cache
  • Async I/O
  • Broadcast State
  • UDTF + LATERAL TABLE syntax
  • LookupableTableSource

Periodically Loading Dimension Data

Approach

  • Implement a RichFlatMapFunction and, in open(), start a timer thread that periodically reads the dimension data and loads it into memory.
  • Perform the dimension lookup in flatMap().

Code example

package com.bigdata.flink.dimJoin;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;

/**
 * Periodically loads dimension data into memory and joins it in flatMap().
 */
@Slf4j
public class DimRichFlatMapFunction extends RichFlatMapFunction<UserBrowseLog, Tuple2<UserBrowseLog, UserInfo>> {

    private final String url;
    private final String user;
    private final String passwd;
    private final Integer reloadInterval;

    private Connection connection;
    private final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    // In-memory dimension cache, refreshed by the timer thread started in open().
    private final HashMap<String, UserInfo> dimInfo = new HashMap<>();

    public DimRichFlatMapFunction(String url, String user, String passwd, Integer reloadInterval) {
        this.url = url;
        this.user = user;
        this.passwd = passwd;
        this.reloadInterval = reloadInterval;
    }

    /**
     * Opens the JDBC connection and schedules the periodic reload of dimension data.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(JDBC_DRIVER);

        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    if (connection == null || connection.isClosed()) {
                        log.warn("No connection. Trying to reconnect...");
                        connection = DriverManager.getConnection(url, user, passwd);
                    }
                    String sql = "select uid,name,age,address from t_user_info";
                    PreparedStatement preparedStatement = connection.prepareStatement(sql);
                    ResultSet resultSet = preparedStatement.executeQuery();
                    while (resultSet.next()) {
                        UserInfo userInfo = new UserInfo();
                        userInfo.setUid(resultSet.getString("uid"));
                        userInfo.setName(resultSet.getString("name"));
                        userInfo.setAge(resultSet.getInt("age"));
                        userInfo.setAddress(resultSet.getString("address"));

                        dimInfo.put(userInfo.getUid(), userInfo);
                    }
                } catch (SQLException e) {
                    log.error("Get dimension data exception...", e);
                }
            }
        };

        Timer timer = new Timer();
        timer.scheduleAtFixedRate(timerTask, 0, reloadInterval * 1000);

    }

    /**
     * Closes the connection.
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Joins each record against the in-memory dimension cache.
     * @param value
     * @param out
     * @throws Exception
     */
    @Override
    public void flatMap(UserBrowseLog value, Collector<Tuple2<UserBrowseLog, UserInfo>> out) throws Exception {
        String userID = value.getUserID();
        if (dimInfo.containsKey(userID)) {
            UserInfo dim = dimInfo.get(userID);
            out.collect(new Tuple2<>(value, dim));
        }
    }
}

  • Note
  1. The dimension data is held in memory, so this only works for small dimension tables.
  2. The data is reloaded on a timer, so this only suits dimension tables that change infrequently.
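
For completeness, a minimal sketch of wiring this function into a DataStream job. The UserBrowseLogSource, the JDBC URL, the credentials, and the 60-second reload interval below are placeholders, not part of the original example:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical source emitting UserBrowseLog events (e.g. a Kafka consumer plus deserialization).
DataStream<UserBrowseLog> browseStream = env.addSource(new UserBrowseLogSource());

// Join against the MySQL dimension table, reloading it every 60 seconds.
browseStream
        .flatMap(new DimRichFlatMapFunction(
                "jdbc:mysql://localhost:3306/bigdata", "user", "passwd", 60))
        .print();

env.execute("PeriodicDimJoinJob");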

Distributed Cache

Approach

  1. Register a local or HDFS file with env.registerCachedFile(cachedFilePath, cachedFileName).
  2. When the job starts, Flink automatically distributes the file to each TaskManager's local file system.
  3. Implement a RichFlatMapFunction and, in open(), retrieve and parse the cached file via the RuntimeContext.
  4. The parsed data sits in memory, so the dimension lookup can be done in flatMap().

Code example

package com.bigdata.flink.dimJoin;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.io.File;
import java.util.HashMap;
import java.util.List;

/**
 * Dimension join via Flink's Distributed Cache.
 */
@Slf4j
public class DistributedCacheJoinDim {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the cached file, e.g. file:///some/path or hdfs://host:port/and/path
        String cachedFilePath = "./user_info.txt";
        String cachedFileName = "user_info";
        env.registerCachedFile(cachedFilePath, cachedFileName);

        // Add the real-time stream
        DataStreamSource<Tuple2<String, String>> stream = env.fromElements(
                Tuple2.of("1", "click"),
                Tuple2.of("2", "click"),
                Tuple2.of("3", "browse"));

        // Dimension join
        SingleOutputStreamOperator<String> dimedStream = stream.flatMap(new RichFlatMapFunction<Tuple2<String, String>, String>() {

            HashMap<String, Integer> dimInfo = new HashMap<>();

            // Read and parse the cached file
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                File cachedFile = getRuntimeContext().getDistributedCache().getFile(cachedFileName);
                List<String> lines = FileUtils.readLines(cachedFile);
                for (String line : lines) {
                    String[] split = line.split(",");
                    dimInfo.put(split[0], Integer.valueOf(split[1]));
                }
            }

            // Dimension lookup
            @Override
            public void flatMap(Tuple2<String, String> value, Collector<String> out) throws Exception {
                if (dimInfo.containsKey(value.f0)) {
                    Integer age = dimInfo.get(value.f0);
                    out.collect(value.f0 + "," + value.f1 + "," + age);
                }
            }
        });

        dimedStream.print();

        env.execute();
    }
}

  • Note
  1. The data is held in memory, so only small dimension tables are supported.
  2. The file is loaded once at startup; if the dimension table changes, the job has to be restarted.
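
As an illustration only (not part of the original post), the cached user_info file is assumed here to be a simple CSV of userID,age per line, matching the split(",") parsing above, e.g.:

1,18
2,25
3,30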

Async I/O

Approach

  1. The dimension data lives in an external store such as Elasticsearch, Redis, or HBase.
  2. Dimension data is looked up via asynchronous I/O.
  3. Combine this with a local cache such as Guava Cache to reduce the number of requests to the external store.

Code example


package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.*;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * Joins a stream with a dimension table using Async I/O.
 */
public class FlinkAsyncIO {
    public static void main(String[] args) throws Exception{

        /** Parse command-line arguments */
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String kafkaBootstrapServers = parameterTool.get("kafka.bootstrap.servers");
        String kafkaGroupID = parameterTool.get("kafka.group.id");
        String kafkaAutoOffsetReset= parameterTool.get("kafka.auto.offset.reset");
        String kafkaTopic = parameterTool.get("kafka.topic");
        int kafkaParallelism =parameterTool.getInt("kafka.parallelism");

        String esHost= parameterTool.get("es.host");
        Integer esPort= parameterTool.getInt("es.port");
        String esUser = parameterTool.get("es.user");
        String esPassword = parameterTool.get("es.password");
        String esIndex = parameterTool.get("es.index");
        String esType = parameterTool.get("es.type");


        /** Flink DataStream execution environment */
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT,8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

        /** Add the Kafka source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
        kafkaProperties.put("group.id",kafkaGroupID);
        kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        SingleOutputStreamOperator<String> source = env.addSource(kafkaConsumer).name("KafkaSource").setParallelism(kafkaParallelism);

        // Transform the raw JSON into a Tuple4
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source.map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
            Tuple4<String, String, String, Integer> output = new Tuple4<>();
            try {
                JSONObject obj = JSON.parseObject(value);
                output.f0 = obj.getString("userID");
                output.f1 = obj.getString("eventTime");
                output.f2 = obj.getString("eventType");
                output.f3 = obj.getInteger("productID");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return output;
        }).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){}).name("Map: ExtractTransform");

        // Filter out malformed records
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap.filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value.f3 != null).name("Filter: FilterExceptionData");

        // Timeout: by default a timed-out async I/O request raises an exception and fails/restarts the job;
        //          override AsyncFunction#timeout to handle timeouts yourself.
        // Capacity: the maximum number of concurrent in-flight requests.
        /** Join the stream with the dimension table via Async I/O */
        SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> result = AsyncDataStream.orderedWait(sourceFilter, new ElasticsearchAsyncFunction(esHost,esPort,esUser,esPassword,esIndex,esType), 500, TimeUnit.MILLISECONDS, 10).name("Join: JoinWithDim");

        /** Print the result */
        result.print().name("PrintToConsole");
        env.execute();
    }
}

The ElasticsearchAsyncFunction used above:

package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * Custom ElasticsearchAsyncFunction that looks up dimension data in Elasticsearch.
 */
public class ElasticsearchAsyncFunction extends RichAsyncFunction<Tuple4<String, String, String, Integer>, Tuple5<String, String, String, Integer,Integer>> {


    private String host;

    private Integer port;

    private String user;

    private String password;

    private String index;

    private String type;

    public ElasticsearchAsyncFunction(String host, Integer port, String user, String password, String index, String type) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.index = index;
        this.type = type;
    }

    private RestHighLevelClient restHighLevelClient;

    private Cache<String,Integer> cache;

    /**
     * Establishes the connection to Elasticsearch.
     * @param parameters
     */
    @Override
    public void open(Configuration parameters){

        //ES Client
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        restHighLevelClient = new RestHighLevelClient(
                RestClient
                        .builder(new HttpHost(host, port))
                        .setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));

        // Initialize the local cache
        cache=CacheBuilder.newBuilder().maximumSize(2).expireAfterAccess(5, TimeUnit.MINUTES).build();
    }

    /**
     * Closes the connection.
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        restHighLevelClient.close();
    }


    /**
     * Asynchronous invocation: try the local cache first, then Elasticsearch.
     * @param input
     * @param resultFuture
     */
    @Override
    public void asyncInvoke(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {

        // 1. Try the local cache first
        Integer cachedValue = cache.getIfPresent(input.f0);
        if(cachedValue !=null){
            System.out.println("从缓存中获取到维度数据: key="+input.f0+",value="+cachedValue);
            resultFuture.complete(Collections.singleton(new Tuple5<>(input.f0,input.f1,input.f2,input.f3,cachedValue)));

        // 2. Cache miss: query the external store
        }else {
            searchFromES(input,resultFuture);
        }
    }

    /**
     * Queries Elasticsearch when the cache has no entry for the key.
     * @param input
     * @param resultFuture
     */
    private void searchFromES(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture){

        // 1. Build the output record
        Tuple5<String, String, String, Integer, Integer> output = new Tuple5<>();
        output.f0=input.f0;
        output.f1=input.f1;
        output.f2=input.f2;
        output.f3=input.f3;

        // 2. The key to look up
        String dimKey = input.f0;

        // 3. Build an IDs query
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(index);
        searchRequest.types(type);
        searchRequest.source(SearchSourceBuilder.searchSource().query(QueryBuilders.idsQuery().addIds(dimKey)));

        // 4. Query with the asynchronous client
        restHighLevelClient.searchAsync(searchRequest, new ActionListener<SearchResponse>() {

            // Handle a successful response
            @Override
            public void onResponse(SearchResponse searchResponse) {
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                if(searchHits.length >0 ){
                    JSONObject obj = JSON.parseObject(searchHits[0].getSourceAsString());
                    Integer dimValue=obj.getInteger("age");
                    output.f4=dimValue;
                    cache.put(dimKey,dimValue);
                    System.out.println("将维度数据放入缓存: key="+dimKey+",value="+dimValue);
                }

                resultFuture.complete(Collections.singleton(output));
            }

            // Handle a failed response
            @Override
            public void onFailure(Exception e) {
                output.f4=null;
                resultFuture.complete(Collections.singleton(output));
            }
        });

    }

    // Handle timeouts by retrying the lookup against Elasticsearch
    @Override
    public void timeout(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
        searchFromES(input,resultFuture);
    }
}

  • Note
  1. This approach is not limited by memory, so it can handle fairly large dimension data sets.
  2. It requires an external store.
  3. Access to the external store should still be kept to a minimum (hence the local cache).
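
The example uses AsyncDataStream.orderedWait, which preserves the input order at the cost of some extra latency and buffering. If downstream operators do not care about ordering, unorderedWait usually gives better throughput; a purely illustrative variant of the call above, with the same parameters:

// Results may be emitted as soon as they complete, possibly out of order.
SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> result =
        AsyncDataStream.unorderedWait(
                sourceFilter,
                new ElasticsearchAsyncFunction(esHost, esPort, esUser, esPassword, esIndex, esType),
                500, TimeUnit.MILLISECONDS, 10)
        .name("Join: JoinWithDim");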

Broadcast State

Approach

  1. Send the dimension data to Kafka as stream S1; the fact data is stream S2.
  2. Define a state descriptor, i.e. a MapStateDescriptor, e.g. descriptor.
  3. Broadcast S1 with that descriptor, e.g. S1.broadcast(descriptor), producing a BroadcastStream B1.
  4. Connect the fact stream S2 with the broadcast stream B1, producing a BroadcastConnectedStream BC.
  5. On BC, implement the join logic in a KeyedBroadcastProcessFunction/BroadcastProcessFunction, as shown in the sketch below.
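
Condensed to its skeleton, the wiring of steps 2-5 looks roughly like this; s1 and s2 stand for the config (dimension) and event streams, and the types mirror the full example that follows:

// (2) State descriptor shared by the broadcast side and the process function.
MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> descriptor = new MapStateDescriptor<>(
        "config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

// (3) Broadcast the dimension/config stream S1.
BroadcastStream<HashMap<String, Tuple2<String, Integer>>> b1 = s1.broadcast(descriptor);

// (4) Connect the fact stream S2 with the broadcast stream B1.
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> bc =
        s2.connect(b1);

// (5) Implement the join in a BroadcastProcessFunction (CustomBroadcastProcessFunction below).
SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> joined =
        bc.process(new CustomBroadcastProcessFunction());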

Code example

package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Uses Broadcast State to dynamically update configuration, filtering events in real time and enriching them with extra fields.
 */
@Slf4j
public class TestBroadcastState {
    public static void main(String[] args) throws Exception{

        // 1. Parse command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

        // Checkpoint configuration
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

        // Event (fact) stream configuration
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

        // Config (dimension) stream configuration
        String fromMysqlHost = parameterTool.getRequired("fromMysql.host");
        int fromMysqlPort = parameterTool.getInt("fromMysql.port");
        String fromMysqlDB = parameterTool.getRequired("fromMysql.db");
        String fromMysqlUser = parameterTool.getRequired("fromMysql.user");
        String fromMysqlPasswd = parameterTool.getRequired("fromMysql.passwd");
        int fromMysqlSecondInterval = parameterTool.getInt("fromMysql.secondInterval");

        // 2. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the state backend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
        // Configure checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 3. Kafka event stream
        // Reads events from Kafka.
        // Each record means a user browsed or clicked a product at some point in time, e.g.
        // {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
        kafkaProperties.put("group.id",fromKafkaGroupID);

        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setStartFromLatest();
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id-kafka-source");

        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> eventStream = kafkaSource.process(new ProcessFunction<String, Tuple4<String, String, String, Integer>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple4<String, String, String, Integer>> out){
                try {
                    JSONObject obj = JSON.parseObject(value);
                    String userID = obj.getString("userID");
                    String eventTime = obj.getString("eventTime");
                    String eventType = obj.getString("eventType");
                    int productID = obj.getIntValue("productID");
                    out.collect(new Tuple4<>(userID, eventTime, eventType, productID));
                }catch (Exception ex){
                    log.warn("异常数据:{}",value,ex);
                }
            }
        });

        // 4. MySQL config stream
        // A custom MySQL source periodically reads the configuration, which is then broadcast.
        // Data: userID, userName, userAge
        DataStreamSource<HashMap<String, Tuple2<String, Integer>>> configStream = env.addSource(new MysqlSource(fromMysqlHost, fromMysqlPort, fromMysqlDB, fromMysqlUser, fromMysqlPasswd, fromMysqlSecondInterval));

        /*
          (1) First create the MapStateDescriptor.
          A MapStateDescriptor defines the state name and the key and value types.
          Here the key type is Void and the value type is Map<String, Tuple2<String, Integer>>.
         */
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        /*
          (2) Broadcast the config stream to obtain a BroadcastStream.
         */
        BroadcastStream<HashMap<String, Tuple2<String, Integer>>> broadcastConfigStream = configStream.broadcast(configDescriptor);

        // 5. Connect the event stream with the broadcast config stream, producing a BroadcastConnectedStream
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> connectedStream = eventStream.connect(broadcastConfigStream);

        // 6. Apply process() to the BroadcastConnectedStream and handle events according to the config (rules)
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> resultStream = connectedStream.process(new CustomBroadcastProcessFunction());

        // 7. Print the result
        resultStream.print();

        // 8. Build the JobGraph and start execution
        env.execute();

    }

    /**
     * Custom BroadcastProcessFunction.
     * An event is processed only if its user ID appears in the config; the user's base info is then appended to the event.
     * Tuple4<String, String, String, Integer>: type of the first (event) stream
     * HashMap<String, Tuple2<String, Integer>>: type of the second (config) stream
     * Tuple6<String, String, String, Integer, String, Integer>: output type
     */
    static class CustomBroadcastProcessFunction extends BroadcastProcessFunction<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>{

        /** Define the MapStateDescriptor */
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        /**
         * Reads the broadcast state and uses it to process elements of the event stream.
         * @param value an element of the event stream
         * @param ctx the context
         * @param out emits zero or more records
         * @throws Exception
         */
        @Override
        public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {

            // User ID from the event
            String userID = value.f0;

            // Read the broadcast state
            ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
            Map<String, Tuple2<String, Integer>> broadcastStateUserInfo = broadcastState.get(null);

            // If the user exists in the config, append userName and userAge to the event;
            // otherwise drop the event.
            Tuple2<String, Integer> userInfo = broadcastStateUserInfo.get(userID);
            if(userInfo!=null){
                out.collect(new Tuple6<>(value.f0,value.f1,value.f2,value.f3,userInfo.f0,userInfo.f1));
            }

        }

        /**
         * Processes each element of the broadcast (config) stream and updates the broadcast state.
         * @param value an element of the broadcast stream
         * @param ctx the context
         * @param out emits zero or more records
         * @throws Exception
         */
        @Override
        public void processBroadcastElement(HashMap<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {

            // Get the broadcast state
            BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);

            // Clear the previous state
            broadcastState.clear();

            // Store the latest config
            broadcastState.put(null,value);

        }
    }



    /**
     * Custom MySQL source that reads the configuration every secondInterval seconds.
     */
    static class MysqlSource extends RichSourceFunction<HashMap<String, Tuple2<String, Integer>>> {

        private String host;
        private Integer port;
        private String db;
        private String user;
        private String passwd;
        private Integer secondInterval;

        private volatile boolean isRunning = true;

        private Connection connection;
        private PreparedStatement preparedStatement;

        MysqlSource(String host, Integer port, String db, String user, String passwd,Integer secondInterval) {
            this.host = host;
            this.port = port;
            this.db = db;
            this.user = user;
            this.passwd = passwd;
            this.secondInterval = secondInterval;
        }

        /**
         * Opens the JDBC connection when the source starts.
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName("com.mysql.jdbc.Driver");
            connection= DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/"+db+"?useUnicode=true&characterEncoding=UTF-8", user, passwd);
            String sql="select userID,userName,userAge from user_info";
            preparedStatement=connection.prepareStatement(sql);
        }

        /**
         * Closes the connection and releases resources when the source shuts down.
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            super.close();

            if(connection!=null){
                connection.close();
            }

            if(preparedStatement !=null){
                preparedStatement.close();
            }
        }

        /**
         * run() repeatedly queries MySQL and emits the full configuration map.
         * @param ctx
         */
        @Override
        public void run(SourceContext<HashMap<String, Tuple2<String, Integer>>> ctx) {
            try {
                while (isRunning){
                    HashMap<String, Tuple2<String, Integer>> output = new HashMap<>();
                    ResultSet resultSet = preparedStatement.executeQuery();
                    while (resultSet.next()){
                        String userID = resultSet.getString("userID");
                        String userName = resultSet.getString("userName");
                        int userAge = resultSet.getInt("userAge");
                        output.put(userID,new Tuple2<>(userName,userAge));
                    }
                    ctx.collect(output);
                    // Sleep between queries
                    Thread.sleep(1000*secondInterval);
                }
            }catch (Exception ex){
                log.error("从Mysql获取配置异常...",ex);
            }


        }
        /**
         * Called when the source is cancelled.
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

  • Note
  1. Changes to the dimension data have to be turned into a stream, e.g. published to Kafka (or polled periodically, as the MySQL source in this example does).
  2. Dimension changes are picked up in (near) real time.
  3. The dimension data is held in memory, so the supported data volume is relatively small.
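
If the dimension changes are already published to Kafka, as described in step 1 of the approach, the MysqlSource above could be replaced by a Kafka consumer plus a map. A rough sketch under that assumption; the topic name and JSON layout are made up for illustration, and TypeHint comes from org.apache.flink.api.common.typeinfo:

// Assumed dimension-change messages: {"userID":"user_1","userName":"name1","userAge":10}
FlinkKafkaConsumer010<String> dimConsumer =
        new FlinkKafkaConsumer010<>("dim_user_info", new SimpleStringSchema(), kafkaProperties);

DataStream<HashMap<String, Tuple2<String, Integer>>> configStream = env
        .addSource(dimConsumer)
        .map(value -> {
            JSONObject obj = JSON.parseObject(value);
            HashMap<String, Tuple2<String, Integer>> map = new HashMap<>();
            map.put(obj.getString("userID"),
                    new Tuple2<>(obj.getString("userName"), obj.getIntValue("userAge")));
            return map;
        })
        .returns(new TypeHint<HashMap<String, Tuple2<String, Integer>>>() {});

// Note: with per-user updates like this, processBroadcastElement should merge the new entries into the
// existing broadcast state instead of clearing it, unlike the full-snapshot logic in the example above.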

UDTF + LATERAL TABLE Syntax

Approach

  1. Assuming you are using Flink SQL: first define a UDTF by extending the TableFunction abstract class and implementing open(), close(), and eval().
  2. Register the TableFunction.
  3. In SQL, use the LATERAL TABLE syntax to join against the result of the UDTF.

Code example: Flink Table API (temporal table function)

package com.bigdata.flink.tableSqlTemporalTable;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.ProductInfo;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;


/**
 * Summary:
 *  Temporal table join via a TemporalTableFunction and LATERAL TABLE.
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception{

        args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlTemporalTable/application.properties"};

        // 1. Parse command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

        //browse log
        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        //product history info
        String productInfoTopic = parameterTool.getRequired("productHistoryInfoTopic");
        String productInfoGroupID = parameterTool.getRequired("productHistoryInfoGroupID");

        // 2. Set up the environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 3. Register the Kafka source
        // Note: to make the relationship between Beijing time and timestamps explicit, UserBrowseLog carries an extra field eventTimeTimestamp holding the epoch timestamp of eventTime.
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction())
                .assignTimestampsAndWatermarks(new BrowseTimestampExtractor(Time.seconds(0)));

        tableEnv.registerDataStream("browse",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice,browseRowtime.rowtime");
        //tableEnv.toAppendStream(tableEnv.scan("browse"),Row.class).print();

        // 4. Register the temporal table
        // Note: likewise, ProductInfo carries an extra field updatedAtTimestamp holding the epoch timestamp of updatedAt.
        Properties productInfoProperties = new Properties();
        productInfoProperties.put("bootstrap.servers",kafkaBootstrapServers);
        productInfoProperties.put("group.id",productInfoGroupID);
        DataStream<ProductInfo> productInfoStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(productInfoTopic, new SimpleStringSchema(), productInfoProperties))
                .process(new ProductInfoProcessFunction())
                .assignTimestampsAndWatermarks(new ProductInfoTimestampExtractor(Time.seconds(0)));

        tableEnv.registerDataStream("productInfo",productInfoStream, "productID,productName,productCategory,updatedAt,updatedAtTimestamp,productInfoRowtime.rowtime");
        // Set the temporal table's time attribute and primary key
        TemporalTableFunction productInfo = tableEnv.scan("productInfo").createTemporalTableFunction("productInfoRowtime", "productID");
        // Register the TableFunction
        tableEnv.registerFunction("productInfoFunc",productInfo);
        //tableEnv.toAppendStream(tableEnv.scan("productInfo"),Row.class).print();

        // 5. Run the SQL
        String sql = ""
                + "SELECT "
                + "browse.userID, "
                + "browse.eventTime, "
                + "browse.eventTimeTimestamp, "
                + "browse.eventType, "
                + "browse.productID, "
                + "browse.productPrice, "
                + "productInfo.productID, "
                + "productInfo.productName, "
                + "productInfo.productCategory, "
                + "productInfo.updatedAt, "
                + "productInfo.updatedAtTimestamp "
                + "FROM "
                + " browse, "
                + " LATERAL TABLE (productInfoFunc(browse.browseRowtime)) as productInfo "
                + "WHERE "
                + " browse.productID=productInfo.productID";

        Table table = tableEnv.sqlQuery(sql);
        tableEnv.toAppendStream(table,Row.class).print();

        // 6. Execute
        tableEnv.execute(Test.class.getSimpleName());


    }


    /**
     * Parses raw Kafka records into UserBrowseLog objects.
     */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {

                UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);

                // Add a long timestamp field:
                // eventTime is Beijing time in yyyy-MM-dd HH:mm:ss format
                DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                // Convert to a millisecond timestamp
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setEventTimeTimestamp(eventTimeTimestamp);

                out.collect(log);
            }catch (Exception ex){
                log.error("解析Kafka数据异常...",ex);
            }
        }
    }

    /**
     * Extracts timestamps and generates watermarks.
     */
    static class BrowseTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog> {

        BrowseTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(UserBrowseLog element) {
            return element.getEventTimeTimestamp();
        }
    }





    /**
     * Parses raw Kafka records into ProductInfo objects.
     */
    static class ProductInfoProcessFunction extends ProcessFunction<String, ProductInfo> {
        @Override
        public void processElement(String value, Context ctx, Collector<ProductInfo> out) throws Exception {
            try {

                ProductInfo log = JSON.parseObject(value, ProductInfo.class);

                // Add a long timestamp field:
                // updatedAt is Beijing time in yyyy-MM-dd HH:mm:ss format
                DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getUpdatedAt(), format).atOffset(ZoneOffset.of("+08:00"));
                // Convert to a millisecond timestamp
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setUpdatedAtTimestamp(eventTimeTimestamp);

                out.collect(log);
            }catch (Exception ex){
                log.error("解析Kafka数据异常...",ex);
            }
        }
    }

    /**
     * Extracts timestamps and generates watermarks.
     */
    static class ProductInfoTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<ProductInfo> {

        ProductInfoTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        @Override
        public long extractTimestamp(ProductInfo element) {
            return element.getUpdatedAtTimestamp();
        }
    }

}

Code example: Flink SQL

Define the UDTF:

package com.bigdata.flink.dimJoin;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import redis.clients.jedis.Jedis;

/**
 * UDTF
 */
public class UDTFRedis extends TableFunction<Row> {

    private Jedis jedis;

    /**
     * Opens the Redis connection.
     * @param context
     * @throws Exception
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        jedis = new Jedis("localhost", 6379);
        jedis.select(0);
    }

    /**
     * Closes the connection.
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * Looks up the dimension data in Redis.
     * @param key
     */
    public void eval(String key) {
        String value = jedis.get(key);
        if (value != null) {
            String[] valueSplit = value.split(",");
            Row row = new Row(2);
            row.setField(0, valueSplit[0]);
            row.setField(1, Integer.valueOf(valueSplit[1]));
            collector.collect(row);
        }
    }

    /**
     * Defines the result type; the returned fields are userName and userAge, hence String and Int.
     * @return
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(Types.STRING, Types.INT);
    }
}

Joining the Kafka stream with the Redis dimension table:

package com.bigdata.flink.dimJoin;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author: Wang Pei
 * Summary:
 * Kafka Join Redis-Dim
 */
public class KafkaJoinRedisDimWithUDTF {
    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Source DDL
        // Kafka data: {"userID":"user_1","eventType":"click","eventTime":"2015-01-01 00:00:00"}
        String sourceDDL = ""
                + "create table source_kafka "
                + "( "
                + "    userID String, "
                + "    eventType String, "
                + "    eventTime String "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.10', "
                + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                + "    'connector.topic' = 'test_1', "
                + "    'connector.properties.group.id' = 'c1_test_1', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
        tableEnv.sqlUpdate(sourceDDL);
        tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();

        // UDTF DDL
        // Data in Redis: key = userID, value = userName,userAge
        // 127.0.0.1:6379> get user_1
        // "name1,10"
        String udtfDDL = ""
                + "CREATE TEMPORARY FUNCTION "
                + "  IF NOT EXISTS UDTFRedis "
                + "  AS 'com.bigdata.flink.dimJoin.UDTFRedis'";
        tableEnv.sqlUpdate(udtfDDL);

        // Query
        // Left Join
        String execSQL = ""
                + "select "
                + " source_kafka.*,dim.* "
                + "from source_kafka "
                + "LEFT JOIN LATERAL TABLE(UDTFRedis(userID)) as dim (userName,userAge) ON TRUE";
        Table table = tableEnv.sqlQuery(execSQL);
        tableEnv.toAppendStream(table, Row.class).print();

        tableEnv.execute(KafkaJoinRedisDimWithUDTF.class.getSimpleName());
    }
}

  • Note
  1. Requires defining a UDTF and using the LATERAL TABLE syntax.
  2. Not very generic: it is hard to write a single UDTF that covers every scenario of fetching dimension data from Redis.
  3. Relies on an external store, so dimension changes can be picked up promptly.

LookupableTableSource

Approach

The table source implements the LookupableTableSource interface.

In Flink SQL the registered lookup table can be joined directly; with the Flink Table API, the LookupFunction has to be registered explicitly.

Under the hood, the dimension data is still fetched through a TableFunction.
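
For reference, a rough sketch of what implementing the interface looks like in the Flink version used here (around 1.9/1.10). RedisRowLookupFunction is a hypothetical TableFunction<Row> that queries Redis and emits rows matching the schema; it is not defined in this post:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class RedisLookupableTableSource implements LookupableTableSource<Row> {

    private final TableSchema schema = TableSchema.builder()
            .field("userID", DataTypes.STRING())
            .field("userName", DataTypes.STRING())
            .field("userAge", DataTypes.INT())
            .build();

    @Override
    public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
        // Synchronous lookup: a TableFunction whose eval(key) queries Redis and collects full rows.
        return new RedisRowLookupFunction(lookupKeys); // hypothetical fetcher
    }

    @Override
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] lookupKeys) {
        // Only required when isAsyncEnabled() returns true.
        throw new UnsupportedOperationException("Async lookup is not implemented in this sketch");
    }

    @Override
    public boolean isAsyncEnabled() {
        return false;
    }

    @Override
    public DataType getProducedDataType() {
        return schema.toRowDataType();
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }
}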

Code example: Flink Table API

package com.bigdata.flink.tableSqlLookableTableSource;

import com.alibaba.fastjson.JSON;
import com.bigdata.flink.beans.table.UserBrowseLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.addons.hbase.HBaseTableSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.hadoop.conf.Configuration;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 *  Lookup Table Source
 */
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception{

        args=new String[]{"--application","flink/src/main/java/com/bigdata/flink/tableSqlLookableTableSource/application.properties"};

        // 1. Parse command-line arguments
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("application"));

        String kafkaBootstrapServers = parameterTool.getRequired("kafkaBootstrapServers");
        String browseTopic = parameterTool.getRequired("browseTopic");
        String browseTopicGroupID = parameterTool.getRequired("browseTopicGroupID");

        String hbaseZookeeperQuorum = parameterTool.getRequired("hbaseZookeeperQuorum");
        String hbaseZnode = parameterTool.getRequired("hbaseZnode");
        String hbaseTable = parameterTool.getRequired("hbaseTable");

        String mysqlDBUrl = parameterTool.getRequired("mysqlDBUrl");
        String mysqlUser = parameterTool.getRequired("mysqlUser");
        String mysqlPwd = parameterTool.getRequired("mysqlPwd");
        String mysqlTable = parameterTool.getRequired("mysqlTable");

        // 2. Set up the environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // 3. Register the Kafka source
        // Hand-made test data: a user clicks a product at some point in time, together with the product price, e.g.
        // {"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_1", "productPrice": 20}
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream=streamEnv
                .addSource(new FlinkKafkaConsumer010<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());
        tableEnv.registerDataStream("kafka",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice");
        //tableEnv.toAppendStream(tableEnv.scan("kafka"),Row.class).print();

        // 4. Register the HBase source (lookup table source)
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
        conf.set("zookeeper.znode.parent",hbaseZnode);
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, hbaseTable);
        hBaseTableSource.setRowKey("uid",String.class);
        hBaseTableSource.addColumn("f1","name",String.class);
        hBaseTableSource.addColumn("f1","age",Integer.class);
        tableEnv.registerTableSource("hbase",hBaseTableSource);
        // Register the TableFunction
        tableEnv.registerFunction("hbaseLookup", hBaseTableSource.getLookupFunction(new String[]{"uid"}));

        // 5. Register the MySQL source (lookup table source)
        String[] mysqlFieldNames={"pid","productName","productCategory","updatedAt"};
        DataType[] mysqlFieldTypes={DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()};
        TableSchema mysqlTableSchema = TableSchema.builder().fields(mysqlFieldNames, mysqlFieldTypes).build();
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("com.mysql.jdbc.Driver")
                .setDBUrl(mysqlDBUrl)
                .setUsername(mysqlUser)
                .setPassword(mysqlPwd)
                .setTableName(mysqlTable)
                .build();

        JDBCLookupOptions jdbcLookupOptions = JDBCLookupOptions.builder()
                .setCacheExpireMs(10 * 1000) // cache TTL
                .setCacheMaxSize(10) // max number of cached rows
                .setMaxRetryTimes(3) // max number of retries
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setLookupOptions(jdbcLookupOptions)
                .setSchema(mysqlTableSchema)
                .build();
        tableEnv.registerTableSource("mysql",jdbcTableSource);
        // Register the TableFunction
        tableEnv.registerFunction("mysqlLookup",jdbcTableSource.getLookupFunction(new String[]{"pid"}));


        // 6. Query:
        // fetch user base info from HBase by userID
        // fetch product base info from MySQL by productID
        String sql = ""
                + "SELECT "
                + "       userID, "
                + "       eventTime, "
                + "       eventType, "
                + "       productID, "
                + "       productPrice, "
                + "       f1.name AS userName, "
                + "       f1.age AS userAge, "
                + "       productName, "
                + "       productCategory "
                + "FROM "
                + "     kafka, "
                + "     LATERAL TABLE(hbaseLookup(userID)), "
                + "     LATERAL TABLE (mysqlLookup(productID))";

        tableEnv.toAppendStream(tableEnv.sqlQuery(sql),Row.class).print();

        // 7. Execute
        tableEnv.execute(Test.class.getSimpleName());
    }

    /**
     * Parses raw Kafka records into UserBrowseLog objects.
     */
    static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {

                UserBrowseLog log = JSON.parseObject(value, UserBrowseLog.class);

                // Add a long timestamp field:
                // eventTime is Beijing time in yyyy-MM-dd HH:mm:ss format
                DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                OffsetDateTime eventTime = LocalDateTime.parse(log.getEventTime(), format).atOffset(ZoneOffset.of("+08:00"));
                // Convert to a millisecond timestamp
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                log.setEventTimeTimestamp(eventTimeTimestamp);

                out.collect(log);
            }catch (Exception ex){
                log.error("解析Kafka数据异常...",ex);
            }
        }
    }

}

Code example: Flink SQL

package com.bigdata.flink.dimJoin;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Author: Wang Pei
 * Summary:
 *  Kafka Join Mysql-Dim
 */
public class KafkaJoinMysqlDim {
    public static void main(String[] args) throws Exception {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Source DDL
        // Kafka data: {"userID":"user_1","eventType":"click","eventTime":"2015-01-01 00:00:00"}
        String sourceDDL = ""
                + "create table source_kafka "
                + "( "
                + "    userID STRING, "
                + "    eventType STRING, "
                + "    eventTime STRING, "
                + "    proctime AS PROCTIME() "
                + ") with ( "
                + "    'connector.type' = 'kafka', "
                + "    'connector.version' = '0.10', "
                + "    'connector.properties.bootstrap.servers' = 'kafka01:9092', "
                + "    'connector.properties.zookeeper.connect' = 'kafka01:2181', "
                + "    'connector.topic' = 'test_1', "
                + "    'connector.properties.group.id' = 'c1_test_1', "
                + "    'connector.startup-mode' = 'latest-offset', "
                + "    'format.type' = 'json' "
                + ")";
        tableEnv.sqlUpdate(sourceDDL);
        //tableEnv.toAppendStream(tableEnv.from("source_kafka"), Row.class).print();

        // Dimension table DDL
        // MySQL dimension data:
        // mysql> select * from t_user_info limit 1;
        // +--------+----------+---------+
        // | userID | userName | userAge |
        // +--------+----------+---------+
        // | user_1 | name1    |      10 |
        // +--------+----------+---------+
        String dimDDL = ""
                + "CREATE TABLE dim_mysql ( "
                + "    userID STRING, "
                + "    userName STRING, "
                + "    userAge INT "
                + ") WITH ( "
                + "    'connector.type' = 'jdbc', "
                + "    'connector.url' = 'jdbc:mysql://localhost:3306/bigdata', "
                + "    'connector.table' = 't_user_info', "
                + "    'connector.driver' = 'com.mysql.jdbc.Driver', "
                + "    'connector.username' = '****', "
                + "    'connector.password' = '******' "
                + ")";
        tableEnv.sqlUpdate(dimDDL);

        // Query
        // Left Join
        String execSQL = ""
                + "SELECT "
                + "  kafka.*,mysql.userName,mysql.userAge "
                + "FROM "
                + "  source_kafka as kafka"
                + "  LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF kafka.proctime AS mysql "
                + "  ON kafka.userID = mysql.userID";
        Table table = tableEnv.sqlQuery(execSQL);
        tableEnv.toAppendStream(table, Row.class).print();

        tableEnv.execute(KafkaJoinMysqlDim.class.getSimpleName());

    }
}

  • Note
  1. Requires a table source that implements the LookupableTableSource interface.
  2. Fairly generic.
  3. Relies on an external store, so dimension changes can be picked up promptly.
  4. At the time of writing, only the Blink planner supports this.

Hi, I'm 王知无, an original author in the big data field. I have worked on backend architecture, data middleware, data platforms and architecture, and productionizing algorithms.
