Apache Kylin™是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay Inc. 开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。
image.png
image.png
基于Apache Kylin认证RESTFUL服务。支持的参数:
user : 用户名
password : 密码
ssl: true或false。 默认为false;如果为true,所有的服务调用都会使用https。
jdbc:kylin://<hostname>:<port>/<kylin_project_name>
如果“ssl”为true,“port”应该是Kylin server的HTTPS端口。 如果“port”未被指定,driver会使用默认的端口:HTTP 80,HTTPS 443。 必须指定“kylin_project_name”并且用户需要确保它在Kylin server上存在。
Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");
while (resultSet.next()) {
assertEquals("foo", resultSet.getString(1));
assertEquals("bar", resultSet.getString(2));
assertEquals("tool", resultSet.getString(3));
}
支持的PreparedStatement参数: setString setInt setShort setLong setFloat setDouble setBoolean setByte setDate setTime setTimestamp
Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
PreparedStatement state = conn.prepareStatement("select * from test_table where id=?");
state.setInt(1, 10);
ResultSet resultSet = state.executeQuery();
while (resultSet.next()) {
assertEquals("foo", resultSet.getString(1));
assertEquals("bar", resultSet.getString(2));
assertEquals("tool", resultSet.getString(3));
}
Kylin jdbc driver支持元数据列表方法: 通过sql模式过滤器(比如 %)列出catalog、schema、table和column。
Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");
ResultSet tables = conn.getMetaData().getTables(null, null, "dummy", null);
while (tables.next()) {
for (int i = 0; i < 10; i++) {
assertEquals("dummy", tables.getString(i + 1));
}
}
JDBC方式在开发使用中十分不便,而如果能封装为Spring 提供的DataSource方式,使用过程中就会便捷很多。
创建SqlProperties,封装jdbc连接的参数
/**
 * Holder for the parameters needed to open JDBC connections to Kylin.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. Pool size, maximum wait time and
 * driver class name start out with the defaults declared below.
 */
@Data
public class KylinSqlProperties {

    /** Default value of {@link #maxWaitTime}. */
    private static final long DEFAULT_MAX_WAIT_TIME = 10_000L;

    /** Default value of {@link #poolSize}. */
    private static final int DEFAULT_POOL_SIZE = 10;

    /** Default value of {@link #driverClassName}: the Kylin JDBC driver. */
    private static final String DEFAULT_DRIVER_CLASS_NAME = "org.apache.kylin.jdbc.Driver";

    /** User name. */
    private String userName;

    /** Password. */
    private String password;

    /** Whether encryption is used. */
    private boolean decrypt;

    /** Connection URL of the primary database. */
    private String connectionUrl;

    /** Maximum time to wait for a connection. */
    private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;

    /** Number of connections in the pool. */
    private int poolSize = DEFAULT_POOL_SIZE;

    /** Fully qualified class name of the JDBC driver to load. */
    private String driverClassName = DEFAULT_DRIVER_CLASS_NAME;
}
实现 DataSource 接口,创建连接池
@Slf4j
public class KylinDataSource implements DataSource {
private LinkedList<Connection> connectionPoolList = new LinkedList<>();
private long maxWaitTime;
public KylinDataSource(KylinSqlProperties sqlProperties) {
try {
this.maxWaitTime = sqlProperties.getMaxWaitTime();
Driver driverManager = (Driver) Class.forName(sqlProperties.getDriverClassName())
.newInstance();
Properties info = new Properties();
info.put("user", sqlProperties.getUserName());
info.put("password", sqlProperties.getPassword());
for (int i = 0; i < sqlProperties.getPoolSize(); i++) {
Connection connection = driverManager
.connect(sqlProperties.getConnectionUrl(), info);
connectionPoolList.add(ConnectionProxy.getProxy(connection, connectionPoolList));
}
log.info("PrestoDataSource has initialized {} size connection pool",
connectionPoolList.size());
} catch (Exception e) {
log.error("kylinDataSource initialize error, ex: ", e);
}
}
@Override
public Connection getConnection() throws SQLException {
synchronized (connectionPoolList) {
if (connectionPoolList.size() <= 0) {
try {
connectionPoolList.wait(maxWaitTime);
} catch (InterruptedException e) {
throw new SQLException("getConnection timeout..." + e.getMessage());
}
}
return connectionPoolList.removeFirst();
}
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
return getConnection();
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
throw new RuntimeException("Unsupport operation.");
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return DataSource.class.equals(iface);
}
@Override
public PrintWriter getLogWriter() throws SQLException {
throw new RuntimeException("Unsupport operation.");
}
@Override
public void setLogWriter(PrintWriter out) throws SQLException {
}
@Override
public void setLoginTimeout(int seconds) throws SQLException {
throw new RuntimeException("Unsupport operation.");
}
@Override
public int getLoginTimeout() throws SQLException {
return 0;
}
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
static class ConnectionProxy implements InvocationHandler {
private Object obj;
private LinkedList<Connection> pool;
private String DEFAULT_CLOSE_METHOD = "close";
private ConnectionProxy(Object obj, LinkedList<Connection> pool) {
this.obj = obj;
this.pool = pool;
}
public static Connection getProxy(Object o, LinkedList<Connection> pool) {
Object proxed = Proxy
.newProxyInstance(o.getClass().getClassLoader(), new Class[]{Connection.class},
new ConnectionProxy(o, pool));
return (Connection) proxed;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals(DEFAULT_CLOSE_METHOD)) {
synchronized (pool) {
pool.add((Connection) proxy);
pool.notify();
}
return null;
} else {
return method.invoke(obj, args);
}
}
}
创建JdbcPoolConfiguration类,注册template bean
/**
 * Registers a {@link JdbcTemplate} backed by a {@link KylinDataSource} as a singleton bean
 * named {@code kylinJdbcTemplateFactory}, with the alias {@code kylin}.
 *
 * <p>{@code @Configuration} is already meta-annotated with {@code @Component}, so no
 * additional stereotype annotation is needed.
 */
@Slf4j
@Configuration
public class KylinJdbcPoolConfiguration implements BeanFactoryPostProcessor, EnvironmentAware {

    private final static String prefixName = "kylin";

    /** Property that switches the {@code decrypt} flag; treated as {@code false} when absent. */
    private final static String DECRYPT_PROPERTY = prefixName + ".decrypt";

    /** Injected through {@link EnvironmentAware} before {@link #postProcessBeanFactory} runs. */
    private Environment environment;

    /**
     * Builds the connection properties and registers the template bean.
     *
     * @param beanFactory the bean factory to register the template in
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        KylinSqlProperties properties = new KylinSqlProperties();
        properties.setUserName("xxxxx");
        properties.setPassword("xxxx");
        properties.setConnectionUrl("xxxx");
        properties.setDecrypt(resolveDecrypt());
        createDataSourceBean(beanFactory, properties);
    }

    /**
     * Creates the pooled data source and registers a {@link JdbcTemplate} on top of it.
     *
     * @param beanFactory the bean factory to register the template in
     * @param sqlProperties connection settings for the pool
     */
    public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,
        KylinSqlProperties sqlProperties) {
        DataSource baseDataSource = new KylinDataSource(sqlProperties);
        register(beanFactory, new JdbcTemplate(baseDataSource), prefixName + "JdbcTemplateFactory", prefixName);
    }

    /** Registers {@code bean} under {@code name}; adds {@code alias} unless a singleton already uses it. */
    private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,
        String alias) {
        beanFactory.registerSingleton(name, bean);
        if (!beanFactory.containsSingleton(alias)) {
            beanFactory.registerAlias(name, alias);
        }
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    /**
     * Reads {@code kylin.decrypt} from the environment.
     *
     * <p>A {@code @Value} field cannot be used here: bean factory post-processors are
     * instantiated before the annotation-processing bean post-processors are registered, so
     * such a field would silently keep its initializer value.
     */
    private boolean resolveDecrypt() {
        if (environment == null) {
            return false;
        }
        return environment.getProperty(DECRYPT_PROPERTY, Boolean.class, Boolean.FALSE);
    }
}
RowMapper实现
/**
 * {@link RowMapper} that copies result-set columns into the writable bean properties of a
 * mapped class.
 *
 * <p>A column is matched to a property by, in order of registration:
 * <ul>
 *   <li>the value of a {@code @SerializedName} annotation on the property's backing field,
 *       when present (used verbatim as the lookup key);</li>
 *   <li>otherwise the lower-cased property name and its underscored form
 *       (for example {@code userName} and {@code user_name}).</li>
 * </ul>
 * Column names are lower-cased and stripped of spaces before lookup.
 *
 * @param <T> type of the objects produced per row
 */
public class CommonBeanPropertyRowMapper<T> implements RowMapper<T> {

    protected final Log logger = LogFactory.getLog(this.getClass());

    /** Class instantiated for every row. */
    private Class<T> mappedClass;

    /** When true, {@link #mapRow} fails unless every writable property was populated. */
    private boolean checkFullyPopulated = false;

    /** When true, a null column value for a property that cannot take it is skipped. */
    private boolean primitivesDefaultedForNullValue = false;

    private ConversionService conversionService = DefaultConversionService.getSharedInstance();

    /** Lookup key (see class comment) to property descriptor. */
    private Map<String, PropertyDescriptor> mappedFields;

    /** Names of all writable properties of the mapped class. */
    private Set<String> mappedProperties;

    /** Creates an unconfigured mapper; {@link #setMappedClass} must be called before use. */
    public CommonBeanPropertyRowMapper() {
    }

    /**
     * @param mappedClass class to instantiate per row
     */
    public CommonBeanPropertyRowMapper(Class<T> mappedClass) throws Exception {
        this.initialize(mappedClass);
    }

    /**
     * @param mappedClass class to instantiate per row
     * @param checkFullyPopulated whether every writable property must be populated
     */
    public CommonBeanPropertyRowMapper(Class<T> mappedClass, boolean checkFullyPopulated)
        throws Exception {
        this.initialize(mappedClass);
        this.checkFullyPopulated = checkFullyPopulated;
    }

    /**
     * Sets the mapped class once.
     *
     * @throws InvalidDataAccessApiUsageException if a different class was already set
     */
    public void setMappedClass(Class<T> mappedClass) throws Exception {
        if (this.mappedClass == null) {
            this.initialize(mappedClass);
        } else if (this.mappedClass != mappedClass) {
            throw new InvalidDataAccessApiUsageException(
                "The mapped class can not be reassigned to map to " + mappedClass
                    + " since it is already providing mapping for " + this.mappedClass);
        }
    }

    public final Class<T> getMappedClass() {
        return this.mappedClass;
    }

    public void setCheckFullyPopulated(boolean checkFullyPopulated) {
        this.checkFullyPopulated = checkFullyPopulated;
    }

    public boolean isCheckFullyPopulated() {
        return this.checkFullyPopulated;
    }

    public void setPrimitivesDefaultedForNullValue(boolean primitivesDefaultedForNullValue) {
        this.primitivesDefaultedForNullValue = primitivesDefaultedForNullValue;
    }

    public boolean isPrimitivesDefaultedForNullValue() {
        return this.primitivesDefaultedForNullValue;
    }

    public void setConversionService(ConversionService conversionService) {
        this.conversionService = conversionService;
    }

    public ConversionService getConversionService() {
        return this.conversionService;
    }

    /**
     * Builds the column-name lookup table for {@code mappedClass}.
     *
     * <p>The {@code throws Exception} clause is kept for source compatibility with existing
     * callers and overriders.
     *
     * @param mappedClass class whose writable properties are indexed
     */
    protected void initialize(Class<T> mappedClass) throws Exception {
        this.mappedClass = mappedClass;
        this.mappedFields = new HashMap<>();
        this.mappedProperties = new HashSet<>();
        for (PropertyDescriptor pd : BeanUtils.getPropertyDescriptors(mappedClass)) {
            if (pd.getWriteMethod() == null) {
                continue;
            }
            // The backing field may live in a superclass or not exist at all (setter-only
            // property); in both cases getDeclaredField on the mapped class itself would
            // throw NoSuchFieldException and abort the whole mapper.
            Field field = findField(mappedClass, pd.getName());
            SerializedName annotation =
                field != null ? field.getAnnotation(SerializedName.class) : null;
            if (annotation != null) {
                this.mappedFields.put(annotation.value(), pd);
            } else {
                String lowerCased = this.lowerCaseName(pd.getName());
                this.mappedFields.put(lowerCased, pd);
                String underscoredName = this.underscoreName(pd.getName());
                if (!lowerCased.equals(underscoredName)) {
                    this.mappedFields.put(underscoredName, pd);
                }
            }
            this.mappedProperties.add(pd.getName());
        }
    }

    /** Finds a declared field named {@code name} on {@code type} or a superclass; null if none. */
    private static Field findField(Class<?> type, String name) {
        for (Class<?> current = type; current != null && current != Object.class;
            current = current.getSuperclass()) {
            try {
                return current.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // Not declared at this level; continue with the superclass.
            }
        }
        return null;
    }

    /**
     * Converts a camel-case name to lower case with underscores, e.g. {@code userName} to
     * {@code user_name}. Returns an empty string for null or empty input.
     */
    protected String underscoreName(String name) {
        if (!StringUtils.hasLength(name)) {
            return "";
        }
        StringBuilder result = new StringBuilder();
        result.append(this.lowerCaseName(name.substring(0, 1)));
        for (int i = 1; i < name.length(); ++i) {
            String s = name.substring(i, i + 1);
            String slc = this.lowerCaseName(s);
            if (!s.equals(slc)) {
                // Upper-case character: emit "_" followed by its lower-case form.
                result.append("_").append(slc);
            } else {
                result.append(s);
            }
        }
        return result.toString();
    }

    /** Lower-cases with a fixed locale so the result does not depend on the default locale. */
    protected String lowerCaseName(String name) {
        return name.toLowerCase(Locale.US);
    }

    /**
     * Instantiates the mapped class and populates it from the current row.
     *
     * @param rs result set positioned on the row to map
     * @param rowNumber index of the row; debug logging is only emitted for row 0
     * @return the populated object
     * @throws DataRetrievalFailureException if a matched property turns out not to be writable
     * @throws InvalidDataAccessApiUsageException if full population is required but some
     *     writable property had no matching column
     */
    @Override
    public T mapRow(ResultSet rs, int rowNumber) throws SQLException {
        Assert.state(this.mappedClass != null, "Mapped class was not specified");
        T mappedObject = BeanUtils.instantiateClass(this.mappedClass);
        BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(mappedObject);
        this.initBeanWrapper(bw);
        ResultSetMetaData rsmd = rs.getMetaData();
        int columnCount = rsmd.getColumnCount();
        Set<String> populatedProperties = this.isCheckFullyPopulated() ? new HashSet<>() : null;
        for (int index = 1; index <= columnCount; ++index) {
            String column = JdbcUtils.lookupColumnName(rsmd, index);
            String field = this.lowerCaseName(column.replace(" ", ""));
            PropertyDescriptor pd = this.mappedFields.get(field);
            if (pd == null) {
                if (rowNumber == 0 && this.logger.isDebugEnabled()) {
                    this.logger.debug(
                        "No property found for column '" + column + "' mapped to field '" + field + "'");
                }
                continue;
            }
            try {
                Object value = this.getColumnValue(rs, index, pd);
                if (rowNumber == 0 && this.logger.isDebugEnabled()) {
                    this.logger.debug(
                        "Mapping column '" + column + "' to property '" + pd.getName() + "' of type '"
                            + ClassUtils.getQualifiedName(pd.getPropertyType()) + "'");
                }
                try {
                    bw.setPropertyValue(pd.getName(), value);
                } catch (TypeMismatchException ex) {
                    // Only a null value may be skipped, and only when explicitly allowed.
                    if (value != null || !this.primitivesDefaultedForNullValue) {
                        throw ex;
                    }
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(
                            "Intercepted TypeMismatchException for row " + rowNumber + " and column '"
                                + column + "' with null value when setting property '" + pd.getName()
                                + "' of type '" + ClassUtils.getQualifiedName(pd.getPropertyType())
                                + "' on object: " + mappedObject, ex);
                    }
                }
                if (populatedProperties != null) {
                    populatedProperties.add(pd.getName());
                }
            } catch (NotWritablePropertyException ex) {
                throw new DataRetrievalFailureException(
                    "Unable to map column '" + column + "' to property '" + pd.getName() + "'",
                    ex);
            }
        }
        if (populatedProperties != null && !populatedProperties.equals(this.mappedProperties)) {
            throw new InvalidDataAccessApiUsageException(
                "Given ResultSet does not contain all fields necessary to populate object of class ["
                    + this.mappedClass.getName() + "]: " + this.mappedProperties);
        }
        return mappedObject;
    }

    /** Applies the configured conversion service, if any, to the bean wrapper. */
    protected void initBeanWrapper(BeanWrapper bw) {
        ConversionService cs = this.getConversionService();
        if (cs != null) {
            bw.setConversionService(cs);
        }
    }

    /**
     * Reads the value of column {@code index} for property {@code pd}; subclasses may
     * override this to customize per-property conversion.
     */
    protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)
        throws SQLException {
        return JdbcUtils.getResultSetValue(rs, index, pd.getPropertyType());
    }

    /**
     * Creates a stock Spring {@code BeanPropertyRowMapper} for {@code mappedClass}.
     * Note that the returned mapper is Spring's implementation, not this class.
     */
    public static <T> org.springframework.jdbc.core.BeanPropertyRowMapper<T> newInstance(
        Class<T> mappedClass) {
        return new org.springframework.jdbc.core.BeanPropertyRowMapper<>(mappedClass);
    }
}
RowMapper子类
/**
 * Row mapper that lets {@link MapperPlugin}s convert individual column values before the
 * generic bean-property mapping of {@link CommonBeanPropertyRowMapper} is applied.
 *
 * <p>Plugins are consulted in registration order; the first one whose predicate accepts the
 * property produces the value. Properties accepted by no plugin use the superclass
 * conversion.
 *
 * @param <T> type of the objects produced per row
 */
public class RowMapper<T> extends CommonBeanPropertyRowMapper<T> {

    private final List<MapperPlugin> mapperPlugins;

    private RowMapper(Class<T> tClass, List<MapperPlugin> mapperPlugins) throws Exception {
        super(tClass);
        this.mapperPlugins = mapperPlugins;
    }

    /**
     * Returns the value produced by the first plugin that accepts {@code pd}, or the
     * superclass conversion when no plugin does.
     *
     * <p>A plain loop is used deliberately: a plugin may legitimately return {@code null}
     * (for a null column), which {@code Stream.findFirst()} rejects with a
     * NullPointerException, and the superclass read must only happen when it is needed.
     */
    @Override
    protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)
        throws SQLException {
        for (MapperPlugin mapperPlugin : mapperPlugins) {
            if (mapperPlugin.test(pd)) {
                return mapperPlugin.getColumnValue(rs.getObject(index), pd);
            }
        }
        return super.getColumnValue(rs, index, pd);
    }

    /**
     * Creates a mapper for {@code tClass} with the built-in plugins only.
     *
     * @throws IllegalStateException if the mapper cannot be initialized for {@code tClass}
     */
    public static <T> RowMapper<T> getDefault(Class<T> tClass) {
        return withBuiltInPlugins(RowMapper.<T>builder().tClass(tClass)).build();
    }

    /**
     * Creates a mapper for {@code tClass} in which the given plugins take precedence over
     * the built-in ones.
     *
     * @throws IllegalStateException if the mapper cannot be initialized for {@code tClass}
     */
    public static <T> RowMapper<T> withDefault(Class<T> tClass, MapperPlugin... mapperPlugins) {
        RhllorRowMapperBuilder<T> builder = RowMapper.<T>builder().tClass(tClass);
        for (final MapperPlugin mapperPlugin : mapperPlugins) {
            builder.mapperPlugins(mapperPlugin);
        }
        return withBuiltInPlugins(builder).build();
    }

    /** Appends the built-in plugins; JsonPlugin is registered last of the six. */
    private static <T> RhllorRowMapperBuilder<T> withBuiltInPlugins(
        RhllorRowMapperBuilder<T> builder) {
        return builder
            .mapperPlugins(JSONObjectPlugin)
            .mapperPlugins(ListPlugin)
            .mapperPlugins(SetPlugin)
            .mapperPlugins(MapPlugin)
            .mapperPlugins(EnumPlugin)
            .mapperPlugins(JsonPlugin);
    }

    public static <T> RowMapper.RhllorRowMapperBuilder<T> builder() {
        return new RowMapper.RhllorRowMapperBuilder<>();
    }

    /**
     * Builder collecting the mapped class and the ordered plugin list.
     *
     * @param <T> type of the objects the built mapper produces
     */
    public static class RhllorRowMapperBuilder<T> {

        private Class<T> tClass;
        private List<MapperPlugin> mapperPlugins;

        RhllorRowMapperBuilder() {
        }

        public RowMapper.RhllorRowMapperBuilder<T> tClass(Class<T> tClass) {
            this.tClass = tClass;
            return this;
        }

        /** Appends one plugin; plugins are consulted in the order they were added. */
        public RowMapper.RhllorRowMapperBuilder<T> mapperPlugins(MapperPlugin mapperPlugin) {
            if (this.mapperPlugins == null) {
                this.mapperPlugins = new ArrayList<>();
            }
            this.mapperPlugins.add(mapperPlugin);
            return this;
        }

        /**
         * Builds the mapper with an immutable snapshot of the registered plugins.
         *
         * @throws IllegalStateException if the mapper cannot be initialized; previously the
         *     failure was printed and {@code null} was returned, which only moved the error
         *     to the first use of the mapper
         */
        public RowMapper<T> build() {
            List<MapperPlugin> snapshot = this.mapperPlugins == null
                ? Collections.<MapperPlugin>emptyList()
                : Collections.unmodifiableList(new ArrayList<>(this.mapperPlugins));
            try {
                return new RowMapper<>(this.tClass, snapshot);
            } catch (RuntimeException ex) {
                throw ex;
            } catch (Exception ex) {
                throw new IllegalStateException(
                    "Cannot create RowMapper for " + this.tClass, ex);
            }
        }

        @Override
        public String toString() {
            return "RowMapper.RhllorRowMapperBuilder(tClass=" + this.tClass + ", mapperPlugins="
                + this.mapperPlugins + ")";
        }
    }
}
MapperPlugin
/**
 * Pairs a predicate over bean properties with a converter that turns a raw column value
 * into the value to set on a matching property.
 *
 * <p>Instances are created with {@code MapperPlugin.of(...).columnValue(...)}. The built-in
 * plugins below treat the column as JSON text, except {@link #EnumPlugin}, which resolves
 * enums through static factory methods on the enum type.
 */
public class MapperPlugin {

    private final Predicate<PropertyDescriptor> predicate;
    private final ColumnValue columnValue;

    private MapperPlugin(Predicate<PropertyDescriptor> predicate,
        ColumnValue columnValue) {
        this.predicate = predicate;
        this.columnValue = columnValue;
    }

    /** Whether this plugin handles the given property. */
    boolean test(PropertyDescriptor pd) {
        return predicate.test(pd);
    }

    /** Converts the raw column value {@code object} for property {@code pd}. */
    Object getColumnValue(Object object, PropertyDescriptor pd) {
        return columnValue.get(object, pd);
    }

    /** Starts a plugin that applies to properties accepted by {@code predicate}. */
    public static MapperPluginsBuilder of(Predicate<PropertyDescriptor> predicate) {
        return new MapperPluginsBuilder(predicate);
    }

    /** Starts a plugin that applies to properties rejected by {@code predicate}. */
    public static MapperPluginsBuilder ofNot(Predicate<PropertyDescriptor> predicate) {
        return of(predicate.negate());
    }

    /** Starts a plugin that applies to properties whose type is {@code clazz} or a subtype. */
    public static MapperPluginsBuilder of(Class<?> clazz) {
        return of(pd -> clazz.isAssignableFrom(pd.getPropertyType()));
    }

    /** Converter from a raw column value to a property value. */
    @FunctionalInterface
    public interface ColumnValue {
        Object get(Object object, PropertyDescriptor pd);
    }

    /** Second step of plugin creation: supplies the converter for an already chosen predicate. */
    public static class MapperPluginsBuilder {

        Predicate<PropertyDescriptor> predicate;

        public MapperPluginsBuilder(Predicate<PropertyDescriptor> predicate) {
            this.predicate = predicate;
        }

        public MapperPlugin columnValue(ColumnValue columnValue) {
            return new MapperPlugin(predicate, columnValue);
        }
    }

    /**
     * Catch-all for properties that are neither primitive, wrapper, String nor Date: parses
     * the column as JSON into the property type. A null column maps to null.
     */
    static final MapperPlugin JsonPlugin =
        MapperPlugin.ofNot(pd -> pd.getPropertyType().isPrimitive() ||
            Primitives.isWrapperType(pd.getPropertyType()) ||
            String.class.isAssignableFrom(pd.getPropertyType()) ||
            Date.class.isAssignableFrom(pd.getPropertyType()))
            .columnValue((object, pd) -> {
                if (object == null) {
                    return null;
                }
                return JSON.parseObject(toUtf8String(object), pd.getPropertyType());
            });

    /** Parses the column into a JSONObject; a null column or null parse result yields an empty one. */
    static final MapperPlugin JSONObjectPlugin =
        MapperPlugin.of(JSONObject.class)
            .columnValue((object, pd) -> {
                JSONObject parsed =
                    object == null ? null : JSONObject.parseObject(toUtf8String(object));
                return parsed != null ? parsed : new JSONObject();
            });

    /** Parses the column as a JSON array of the getter's element type; empty list when absent. */
    static final MapperPlugin ListPlugin =
        MapperPlugin.of(List.class)
            .columnValue((object, pd) -> {
                List<?> parsed =
                    object == null ? null : JSON.parseArray(toUtf8String(object), elementType(pd));
                return parsed != null ? parsed : new ArrayList<>();
            });

    /** Like {@link #ListPlugin}, but the elements are collected into a HashSet. */
    static final MapperPlugin SetPlugin =
        MapperPlugin.of(Set.class)
            .columnValue((object, pd) -> {
                List<?> parsed =
                    object == null ? null : JSON.parseArray(toUtf8String(object), elementType(pd));
                return parsed != null ? new HashSet<>(parsed) : new HashSet<>();
            });

    /** Parses the column as a JSON object into a Map; empty HashMap when absent. */
    static final MapperPlugin MapPlugin =
        MapperPlugin.of(Map.class)
            .columnValue((object, pd) -> {
                Map<?, ?> parsed = object == null
                    ? null : JSONObject.parseObject(toUtf8String(object), Map.class);
                return parsed != null ? parsed : new HashMap<>();
            });

    /**
     * Resolves enum properties reflectively: numeric columns through a static
     * {@code valueByIndex(int)} on the enum type, anything else through a static
     * {@code fromString(String)}. A null column maps to null.
     */
    static final MapperPlugin EnumPlugin =
        MapperPlugin.of(Enum.class)
            .columnValue((o, pd) -> {
                try {
                    if (o == null) {
                        return null;
                    }
                    if (o instanceof Number) {
                        Number number = (Number) o;
                        Method method = pd.getPropertyType()
                            .getMethod("valueByIndex", Integer.TYPE);
                        return method.invoke(null, number.intValue());
                    } else {
                        String val = o.toString();
                        Method method = pd.getPropertyType().getMethod("fromString", String.class);
                        return method.invoke(null, val);
                    }
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException(
                        "getColumnValue error, NoSuchMethod : valueByIndex or fromString", e);
                } catch (InvocationTargetException e) {
                    throw new RuntimeException(
                        "getColumnValue error, InvocationTargetException ", e);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(
                        "getColumnValue error, IllegalAccessException ", e);
                }
            });

    /**
     * Turns a raw column value into text: strings are returned as-is, byte arrays are decoded
     * as UTF-8, and any other type falls back to {@code toString()} instead of failing with a
     * ClassCastException.
     */
    private static String toUtf8String(Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8);
        }
        return value.toString();
    }

    /** Element type declared by the property's getter, see {@link #getCollectionGeneric}. */
    private static Class<?> elementType(PropertyDescriptor pd) {
        return getCollectionGeneric(pd.getReadMethod());
    }

    /**
     * Returns the first type argument of a collection-returning method as a class.
     *
     * <p>{@code Object.class} is returned when there is no method, the return type is not a
     * collection or not parameterized, or the argument is a wildcard or type variable. For a
     * nested parameterized argument such as {@code List<Map<String, Object>>} the raw type
     * ({@code Map}) is returned.
     */
    private static Class<?> getCollectionGeneric(Method method) {
        if (method == null || !Collection.class.isAssignableFrom(method.getReturnType())) {
            return Object.class;
        }
        Type returnType = method.getGenericReturnType();
        if (!(returnType instanceof ParameterizedType)) {
            return Object.class;
        }
        Type[] typeArguments = ((ParameterizedType) returnType).getActualTypeArguments();
        if (typeArguments.length == 0) {
            return Object.class;
        }
        Type first = typeArguments[0];
        if (first instanceof Class) {
            return (Class<?>) first;
        }
        if (first instanceof ParameterizedType) {
            Type raw = ((ParameterizedType) first).getRawType();
            if (raw instanceof Class) {
                return (Class<?>) raw;
            }
        }
        return Object.class;
    }
}
具体使用
@Component
@Log4j2
public class MetricDaoImpl {
@Resource(name = "kylinJdbcTemplateFactory")
private JdbcTemplate kylinJdbcTemplate;
public List<TotalModelMetricEntity> getDistinctIds() {
StringBuilder sqlBuilder = new StringBuilder()
.append(" select * ")
.append(" from LOG_DID_VIEW ")
.append(" ORDER BY DT ,total DESC limit 1000");
log.info(sqlBuilder);
return kylinJdbcTemplate.query(sqlBuilder.toString(), RowMapper.getDefault(TotalModelMetricEntity.class));
}
综上我们就完成了对Kylin JDBC的封装,同样的如Presto等其他支持JDBC的查询引擎封装方式类似。
欢迎关注 高广超的简书博客 与 收藏文章 ! 欢迎关注 头条号:互联网技术栈 !
个人介绍: 高广超:多年一线互联网研发与架构设计经验,擅长设计与落地高可用、高性能、可扩展的互联网架构。目前从事大数据相关研发与架构工作。 本文首发在 高广超的简书博客 转载请注明!