
WideTableMultiDimSQLParser parsing notes: array intersection, union, and difference in ClickHouse / Hive

Author: 一个会写诗的程序员
Published 2022-03-23 07:53:39

WideTableMultiDimSQLParser parsing notes

1. ClickHouse array intersection, union, and difference

-- Intersection t[1] ∩ t[2]: arrayIntersect(t[1], t[2])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                               t
         ) t;

-- Union t[1] ∪ t[2]: arrayConcat(t[1], t[2]); arrayConcat keeps duplicates, which arrayDistinct removes
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;

-- Difference t[1] - t[2]: arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
         select arrayIntersect(t[3], arrayMap(x->multiIf(x not in arrayIntersect(t[1], t[2]), x, NULL), t[1])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                                                                             t
         ) t;

-- Union, written without the NULL filter
select length(arrayDistinct(t.res))
from (
         select arrayConcat(t[3], arrayConcat(t[1], t[2])) as res,
                array(
                            (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
                            (select groupUniqArray(UserID) from hits_v1 where Age > 18),
                            (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
                    )                                         t
         ) t;
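To make the role of arrayDistinct and the NULL filter concrete, here is a small Kotlin analogue of the three queries above using plain lists. It only illustrates the semantics and is not ClickHouse code; the function names and sample IDs are made up for the example.

```kotlin
// Plain-list analogue of the ClickHouse queries above (illustrative names and data).

// Intersection: like arrayIntersect(a, b)
fun intersectIds(a: List<Long>, b: List<Long>): List<Long> =
    a.filter { it in b }.distinct()

// Union: like arrayDistinct(arrayConcat(a, b)); concatenation alone keeps duplicates
fun unionIds(a: List<Long>, b: List<Long>): List<Long> =
    (a + b).distinct()

// Difference: like arrayMap(x -> multiIf(x not in arrayIntersect(a, b), x, NULL), a)
// followed by arrayFilter(x -> x is not null, ...); removed elements first become nulls
fun exceptIds(a: List<Long>, b: List<Long>): List<Long> {
    val common = intersectIds(a, b)
    val withNulls: List<Long?> = a.map { if (it !in common) it else null }
    return withNulls.filterNotNull().distinct()
}

fun main() {
    val sexIsOne = listOf(1L, 2L, 3L, 4L)   // stands in for t[1]
    val ageOver18 = listOf(3L, 4L, 5L)      // stands in for t[2]
    println(intersectIds(sexIsOne, ageOver18)) // [3, 4]
    println(unionIds(sexIsOne, ageOver18))     // [1, 2, 3, 4, 5]
    println(exceptIds(sexIsOne, ageOver18))    // [1, 2]
}
```

The difference case is the only one that produces NULL placeholders, which is why the count queries wrap the result in arrayFilter before arrayDistinct.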

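ClickHouse output (the generated set expression, followed by the array lines). The shared helper in the source code below hard-codes Hive's collect_set, which is why it appears here; the ClickHouse queries above use groupUniqArray instead: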

(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
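One point worth checking in the generated difference expression above: every x is taken from t[1], so `x not in arrayIntersect(t[1], t[2], ...)` removes only the elements that appear in all of the other operands. With three operands A, B, C the result is therefore A − (B ∩ C), not the chained difference A − B − C. The Kotlin sketch below shows the two readings side by side on made-up sets; which one is intended depends on how EXCEPT is defined for more than two operands, which the source does not state. With exactly two operands both readings agree.

```kotlin
// Two readings of a three-operand EXCEPT, on illustrative data.

// What the generated ClickHouse expression computes:
// keep x from `first` unless x is in the intersection of ALL operands.
fun exceptAsGenerated(first: Set<Int>, others: List<Set<Int>>): Set<Int> {
    val inAll = others.fold(first) { acc, s -> acc intersect s }
    return first - inAll
}

// Chained difference: remove everything that appears in ANY other operand.
fun exceptChained(first: Set<Int>, others: List<Set<Int>>): Set<Int> =
    others.fold(first) { acc, s -> acc - s }

fun main() {
    val a = setOf(1, 2, 3, 4)
    val b = setOf(2, 3)
    val c = setOf(3, 4)
    println(exceptAsGenerated(a, listOf(b, c))) // [1, 2, 4]
    println(exceptChained(a, listOf(b, c)))     // [1]
}
```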

2. Hive array intersection, union, and difference

select
    array_intersect(array(1, 2), array(2, 3)) i,
    array_union(array(1, 2), array(2, 3)) u,
    array_except(array(1, 2), array(2, 3)) e;

Hive output (the generated set expression, followed by the array lines):

(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))  
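The generated Hive expression above passes three arguments to array_except and array_intersect. As far as I know, the Spark SQL versions of these functions (and the ones added in Hive 4) take exactly two arrays, so an n-ary node may need to be folded into nested binary calls before the SQL will run. The sketch below shows one way to do that. `Node`, `Leaf`, `Op` and `render` are hypothetical names standing in for KunLunExpression and genWhereClause, and treating EXCEPT as a left-to-right chained difference is an assumption, not something the source specifies.

```kotlin
// Hypothetical, simplified expression tree (not the KunLunExpression class from the article).
sealed class Node
data class Leaf(val index: Int) : Node()                        // refers to t[index]
data class Op(val fn: String, val children: List<Node>) : Node()

// Render an n-ary node as left-nested binary calls:
// Op("array_except", [a, b, c]) -> array_except(array_except(a, b), c)
fun render(node: Node): String = when (node) {
    is Leaf -> "t[${node.index}]"
    is Op -> {
        require(node.children.isNotEmpty()) { "operator ${node.fn} needs at least one operand" }
        node.children
            .map { render(it) }
            .reduce { left, right -> "${node.fn}($left, $right)" }
    }
}

fun main() {
    val tree = Op(
        "array_except",
        listOf(Leaf(1), Leaf(2), Op("array_intersect", listOf(Leaf(4), Leaf(5), Leaf(6))))
    )
    println(render(tree))
    // array_except(array_except(t[1], t[2]), array_intersect(array_intersect(t[4], t[5]), t[6]))
}
```

Intersection and union are associative, so folding them is safe in either direction; difference is not, which is why the fold direction for EXCEPT has to be an explicit decision.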

Appendix: source code

data class TagIdx(var kexprId: Int, var tagCode: String, var tagOptionCode: String, var conditionExpr: String, var index: Int)

fun isLeafNode(e: KunLunExpression) = CollectionUtils.isEmpty(e.subExpression)

fun tagOptionConditions(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): List<TagIdx> {
    val tagIdxList = mutableListOf<TagIdx>()
    // Recursively parse the rule expression and flatten it into a list of filter conditions
    val kexpr: KunLunExpression = requestDTO.expression
    parseTagIdx(kexpr, tagIdxList, tableMappingMap)
    // Set each index field to its position in the list + 1
    tagIdxList.forEachIndexed { index, tagIdx ->
        tagIdx.index = index + 1
    }
    return tagIdxList
}


fun parseTagIdx(kexpr: KunLunExpression, tagIdxList: MutableList<TagIdx>, tableMappingMap: Map<String, List<KTableMapping>>) {
    val fieldCondition = kexpr.fieldCondition
    if (null != fieldCondition) {

        val dimFilter = StringBuilder()
        // Dimension filter: each tag's TableCode has its own dimensions. The actual filtering is done on FieldCode, so the tag's dimension conditions are added to fieldCondition here
        val dimConditionList = kexpr.fieldCondition.dimConditionList

        if (CollectionUtils.isEmpty(dimConditionList)) {
            dimFilter.append(" 1=1 ")
        } else {
            val lastIndex = dimConditionList.size - 1
            dimConditionList.forEachIndexed { index, dimField ->

                val dimTagCode = dimField.tableCode
                val dimFieldCode = dimField.fieldCode
                val dimKTableMapping = tableMappingMap[dimTagCode]!![0]
                val dimPhysicalField = dimKTableMapping.fields.first { it.srcField.columnCode == dimFieldCode }.dstField
                val dimPhysicalcolumnCode = dimPhysicalField.columnCode
                val dimFieldValueType = dimPhysicalField.fieldType
                val v = parseFieldValue(dimField, dimFieldValueType)
                val singleValue = v.get(0)?.sqlCondition

                if (index != lastIndex) {
                    dimFilter.append(" $dimPhysicalcolumnCode = $singleValue and ")
                } else {
                    dimFilter.append(" $dimPhysicalcolumnCode = $singleValue ")
                }
            }
        }

        val tagCode = fieldCondition.tableCode
        val fieldCode = fieldCondition.fieldCode
        val KTableMapping = tableMappingMap[tagCode]!![0]

        val physicalField = KTableMapping.fields.first { it.srcField.columnCode == fieldCode }.dstField
        val physicalcolumnCode = physicalField.columnCode
        val fieldValueType = physicalField.fieldType
        val targetFieldCode = KTableMapping.targetField.columnCode
        val dbName = KTableMapping.physicDBName
        val tableName = KTableMapping.getkTableCode()
        val filterConditionClause = genFilterConditionClause(fieldCondition, physicalcolumnCode, fieldValueType)

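        // NOTE: collect_set is the Hive aggregate; the ClickHouse examples use groupUniqArray,
        // so the aggregate function should be chosen per target dialect.
        val line = "select collect_set($targetFieldCode) from $dbName.$tableName where ( $dimFilter ) and ( $filterConditionClause )"
        val tagIdx = TagIdx(kexprId = kexpr.tfId, tagCode = tagCode, tagOptionCode = fieldCode, conditionExpr = line, index = -1) // index defaults to -1 for now
        tagIdxList.add(tagIdx)
    }
    // Recurse into sub-expressions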
    kexpr.subExpression?.forEach {
        parseTagIdx(it, tagIdxList, tableMappingMap)
    }
}

fun genFilterConditionClause(fieldCondition: FieldCondition, physicalField: String, fieldValueType: KFieldValueType): String {
    val fv = parseFieldValue(fieldCondition, fieldValueType)
    if (CollectionUtils.isEmpty(fv)) {
        throw IllegalArgumentException("fieldCondition must have fieldValue!")
    }
    val size = fv.size
    // Multiple values: (1,2,3,4)
    val listValue = StringBuilder()
    listValue.append("(")
    fv.forEachIndexed { index, fieldValue ->
        if (index == size - 1)
            listValue.append(fieldValue?.sqlCondition)
        else
            listValue.append(fieldValue?.sqlCondition).append(",")
    }
    listValue.append(")")
    // Single value
    val singleValue = fv.get(0)?.sqlCondition
    val singleValueNoQuote = fv.get(0)?.qlCondition

    var conditionExpr = ""
    conditionExpr = when (fieldCondition.operator) {
        ArithmeticOperatorEnum.LIKE -> "  like '%${singleValueNoQuote}%' "
        ArithmeticOperatorEnum.EQUAL -> "    = ${singleValue} "
        ArithmeticOperatorEnum.GREATER_EQUAL_THAN -> "    >= ${singleValue} "
        ArithmeticOperatorEnum.LESS_THAN -> "    < ${singleValue} "
        ArithmeticOperatorEnum.LESS_EQUAL_THAN -> "    <= ${singleValue} "
        ArithmeticOperatorEnum.GREATER_THAN -> "    > ${singleValue} "
        ArithmeticOperatorEnum.BETWEEN -> "    between ${fv.get(0)?.sqlCondition} and ${fv.get(1)?.sqlCondition} "
        ArithmeticOperatorEnum.IN -> "    in ${listValue} "
        ArithmeticOperatorEnum.NOT_IN -> "    not in ${listValue} "

        else -> throw IllegalStateException("${fieldCondition.operator} not supported yet")
    }

    return " $physicalField $conditionExpr "
}

/**
 * Parse the fieldValue values
 */
fun parseFieldValue(fieldCondition: FieldCondition, fieldValueType: KFieldValueType): List<FieldValue<*>?> {
    val values = fieldCondition.values
    if (values == null || values.isEmpty()) {
        ExceptionHelper.bizError("illegal value size,values length must greater than 0.")
    }

    // Feature value type
    lateinit var clazz: Class<out FieldValue<*>>
    when (fieldValueType) {
        KFieldValueType.STRING -> clazz = StringFieldValue::class.java
        KFieldValueType.LONG -> clazz = LongFieldValue::class.java
        KFieldValueType.DOUBLE -> clazz = DoubleFieldValue::class.java
        else -> ExceptionHelper.bizError("$fieldValueType fieldValueType not supported!")
    }
    return FieldValue.create(clazz, *values.toTypedArray())
}





/**
 * Recursively traverse the KunLun expression and collect tagCode / objectSet.
 */
fun recurExtractTagCodeAndObjectSet(expression: KunLunExpression, tagBaseFieldList: MutableList<TagBaseField>, objectSetList: MutableList<String>) {

    // No sub-expressions: recursion ends here
    if (isLeafNode(expression)) {
        val fieldCondition = expression.fieldCondition

        // Add an object set (audience segment)
        if (StringUtils.isNotEmpty(fieldCondition.objectSetId)) {
            objectSetList.add(fieldCondition.objectSetId)
        } else {
            // Add a tag
            val tagBaseField = TagBaseField()
            tagBaseField.tableCode = fieldCondition.tableCode
            tagBaseField.fieldCode = fieldCondition.fieldCode
            tagBaseFieldList.add(tagBaseField)
        }
        return
    }

    // Recurse into child nodes
    for (subExpression in expression.subExpression) {
        recurExtractTagCodeAndObjectSet(subExpression, tagBaseFieldList, objectSetList)
    }
}

@Service
class CommonParseUtils {


    fun getTableMappingMap(tenant: Tenant, requestDTO: SQLQueryReqDTO): Map<String, List<KTableMapping>> {
        // Tags & audience segments
        val tagBaseFieldList: MutableList<TagBaseField> = mutableListOf()
        val objectSetList: MutableList<String> = mutableListOf()
        recurExtractTagCodeAndObjectSet(requestDTO.getExpression(), tagBaseFieldList, objectSetList)
        // META
        val tableMappingList: List<KTableMapping> = getTagCodeTableMapping(tenant.id, tagBaseFieldList, requestDTO.getDriverType())
        return tableMappingList.groupBy { it.tableCode }
    }

    /**
     * Get the physical-table mappings for all tags in the KunLun expression.
     */
    fun getTagCodeTableMapping(tenantId: Long, tagBaseFieldList: List<TagBaseField>, driverType: DriverType): List<KTableMapping> {
        if (CollectionUtils.isEmpty(tagBaseFieldList)) {
            return emptyList()
        }

        // Fetch the mappings
        // TODO metadata: kTableMappings
        val kTableMappings: List<KTableMapping> = ArrayList()

        val tagCodeTableMapping = kTableMappings.stream().collect(Collectors.toMap({ obj: KTableMapping -> obj.tableCode }, Function.identity()))

        // check
        for (tagBaseField in tagBaseFieldList) {
            val kTableMapping = tagCodeTableMapping[tagBaseField.tableCode] ?: throw ExceptionHelper.bizError(String.format("tag code [%s] is non-exists", tagBaseField.tableCode))
            val fields = kTableMapping.fields
            // anyMatch (not noneMatch): true when the tag option exists in the mapping
            val existsTagOption = fields.stream().anyMatch { kFieldMapping: KFieldMapping -> kFieldMapping.srcField.columnCode == tagBaseField.fieldCode }
            if (!existsTagOption) {
                throw ExceptionHelper.bizError(String.format("tag option [%s] is non-exists", tagBaseField.fieldCode))
            }
        }
        return kTableMappings
    }

}
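The flattening step in tagOptionConditions / parseTagIdx can be pictured with a reduced model. The sketch below is not the article's code: `Expr`, `Indexed`, `collectConditions` and `flatten` are hypothetical stand-ins that keep only the fields needed here. It walks the tree depth-first, collects every node that carries a condition, and then assigns 1-based indexes, which is what makes t[1], t[2], ... line up with the order of the generated subqueries. It also suggests why a non-leaf node that carries its own condition still takes up an index (index 3 in the sample output) even though the set expression never refers to it.

```kotlin
// Hypothetical reduced model of the expression tree (names are illustrative).
data class Expr(
    val id: Int,
    val condition: String? = null,          // stands in for fieldCondition
    val children: List<Expr> = emptyList()  // stands in for subExpression
)

data class Indexed(val exprId: Int, val condition: String, val index: Int)

// Depth-first, pre-order: a node's own condition comes before its children's.
fun collectConditions(expr: Expr, out: MutableList<Pair<Int, String>>) {
    expr.condition?.let { out.add(expr.id to it) }
    expr.children.forEach { collectConditions(it, out) }
}

// Assign 1-based indexes in traversal order.
fun flatten(root: Expr): List<Indexed> {
    val collected = mutableListOf<Pair<Int, String>>()
    collectConditions(root, collected)
    return collected.mapIndexed { i, (id, cond) -> Indexed(id, cond, i + 1) }
}

fun main() {
    // Same shape as the article's test data: root EXCEPT(e1, e2, e3), where e3 has
    // its own condition (f3) and three children (f4, f5, f6).
    val root = Expr(
        id = 0,
        children = listOf(
            Expr(1, "f1 = '1'"),
            Expr(2, "f2 = '22'"),
            Expr(3, "f3 = 333", listOf(Expr(4, "f4 = '4'"), Expr(5, "f5 = 5"), Expr(6, "f6 = 6")))
        )
    )
    flatten(root).forEach { println("t[${it.index}] <- ${it.condition}") }
}
```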
/**
 * ClickHouse SQL parser for wide-table, multi-dimension tags
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimCHSQLParser {
    val log = LoggerFactory.getLogger(WideTableMultiDimCHSQLParser::class.java)

    @Resource
    lateinit var commonParseUtils: CommonParseUtils

    /**
     * Count-estimation SQL for wide-table, multi-dimension tags
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunExpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }


    /**
     * Audience-selection SQL for wide-table, multi-dimension tags
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        
        val csvFile = ""
        // Parse KunLunExpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }


    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }


    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node
            return ""
        }

        val w = StringBuffer()
        val size = subExpression.size
        val logic = kunLunExpression.logic

        w.append("(")

        if (logic == LogicOperatorEnum.AND) {
            w.append("arrayIntersect(")
        } else if (logic == LogicOperatorEnum.OR) {
            w.append("arrayConcat(")
        } else if (logic == LogicOperatorEnum.EXCEPT) {
            w.append("arrayMap(x->multiIf(x not in arrayIntersect(")
        } else {
            throw IllegalArgumentException("logic $logic not supported!")
        }

        var firstTagIdx: Int = 1
        subExpression.forEachIndexed { index, e ->
            if (isLeafNode(e)) {
                // Leaf node: refer to its array by index
                val targetTagIdx = exprMap[e.tfId]?.get(0)
                val tagIdx = targetTagIdx!!.index

                // Remember the first operand; the difference (EXCEPT) case needs it
                if (index == 0) {
                    firstTagIdx = tagIdx
                }
                w.append("t[$tagIdx]")
            } else {
                // Non-leaf node: recurse
                w.append(genWhereClause(exprMap, e))
            }
            // Separate operands with a comma, whether they are leaves or nested expressions
            if (index != size - 1) {
                w.append(",")
            }
        }

        if (logic == LogicOperatorEnum.AND || logic == LogicOperatorEnum.OR) {
            w.append("))")
        } else if (logic == LogicOperatorEnum.EXCEPT) {
            w.append("), x, NULL), t[$firstTagIdx]))")
        }

        return w.toString()
    }


    /**
     * Generate arrayLines (no trailing comma on the last line)
    (select groupUniqArray(UserID) from db.hits_v1 where Sex = 1),
    (select groupUniqArray(UserID) from db.hits_v1 where Age > 18),
    (select groupUniqArray(UserID) from db.hits_v1 where RequestNum > 0)
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val size = tagIdxs.size
        val arrayLines = StringBuffer()

        tagIdxs.forEachIndexed { index, tagIdx ->
            if (index != size - 1) {
                arrayLines.append("(${tagIdx.conditionExpr}), \n")
            } else {
                arrayLines.append("(${tagIdx.conditionExpr})  \n")
            }
        }
        return arrayLines.toString()
    }


    /**
    select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
    from (
    select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
    array(
    (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
    (select groupUniqArray(UserID) from hits_v1 where Age > 18),
    (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
    ) t
    ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select length(arrayDistinct(arrayFilter(x->x is not null, t.res))) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""


    /**
    select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
    from (
    select arrayIntersect(t[3], arrayIntersect(t[1], t[2])) as res,
    array(
    (select groupUniqArray(UserID) from hits_v1 where Sex = 1),
    (select groupUniqArray(UserID) from hits_v1 where Age > 18),
    (select groupUniqArray(UserID) from hits_v1 where RequestNum > 0)
    ) t
    ) t
    INTO OUTFILE 'tos:///xxx' FORMAT CSV
    settings distributed_perfect_shard=1,max_execution_time = 600
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select arrayJoin(arrayDistinct(arrayFilter(x->x is not null, t.res)))
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
INTO OUTFILE 'tos:///xxx' FORMAT CSV
settings distributed_perfect_shard=1,max_execution_time = 600
"""


}


/**
tagIdxList=[{"conditionExpr":"select groupUniqArray(user_id) from db1.table1 where (  cate_id = '1001'  ) and (  f1     = '1'   )","index":1,"kexprId":684563482,"tagCode":"t1","tagOptionCode":"f1"},{"conditionExpr":"select groupUniqArray(user_id) from db2.table2 where (  cate_id = '1002'  ) and (  f2     = '22'   )","index":2,"kexprId":684642314,"tagCode":"t2","tagOptionCode":"f2"},{"conditionExpr":"select groupUniqArray(user_id) from db2.table2 where (  shop_id = '798322'  ) and (  f3     = 333   )","index":3,"kexprId":568144263,"tagCode":"t2","tagOptionCode":"f3"},{"conditionExpr":"select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1004'  ) and (  f4     = '4'   )","index":4,"kexprId":684626037,"tagCode":"t3","tagOptionCode":"f4"},{"conditionExpr":"select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1005'  ) and (  f5     = 5   )","index":5,"kexprId":684627036,"tagCode":"t3","tagOptionCode":"f5"},{"conditionExpr":"select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1006'  ) and (  f6     = 6   )","index":6,"kexprId":684628027,"tagCode":"t3","tagOptionCode":"f6"}]
(arrayMap(x->multiIf(x not in arrayIntersect(t[1],t[2],(arrayIntersect(t[4],t[5],t[6]))), x, NULL), t[1]))
(select groupUniqArray(user_id) from db1.table1 where (  cate_id = '1001'  ) and (  f1     = '1'   )),
(select groupUniqArray(user_id) from db2.table2 where (  cate_id = '1002'  ) and (  f2     = '22'   )),
(select groupUniqArray(user_id) from db2.table2 where (  shop_id = '798322'  ) and (  f3     = 333   )),
(select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1004'  ) and (  f4     = '4'   )),
(select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1005'  ) and (  f5     = 5   )),
(select groupUniqArray(user_id) from db3.table3 where (  cate_id = '1006'  ) and (  f6     = 6   ))
 */
fun main() {
    val requestDTO = SQLQueryReqDTO()
    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()
    val expression = KunLunExpression()
    expression.logic = LogicOperatorEnum.EXCEPT
    val subExpressionList = arrayListOf<KunLunExpression>()
    val e1 = KunLunExpression()
    val e2 = KunLunExpression()
    val e3 = KunLunExpression()

    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
    )

    e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
    e3.logic = LogicOperatorEnum.AND

    val e3SubExpressionList = arrayListOf<KunLunExpression>()
    val e31 = KunLunExpression()
    val e32 = KunLunExpression()
    val e33 = KunLunExpression()
    e3SubExpressionList.add(e31)
    e3SubExpressionList.add(e32)
    e3SubExpressionList.add(e33)
    e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    e3.subExpression = e3SubExpressionList

    subExpressionList.add(e1)
    subExpressionList.add(e2)
    subExpressionList.add(e3)
    expression.subExpression = subExpressionList
    requestDTO.expression = expression

    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // KFieldMapping(KField srcField, KField dstField)

    tableMappingMap["t1"] = listOf(KTableMapping(
        "t1",
        "table1",
        "db1",
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, "db1", "table1", "user_id"),
        listOf(
            KFieldMapping(
                KField("f1", "", KFieldValueType.STRING, ""), // srcField
                KField("f1", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
                KField("cate_id", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
                KField("shop_id", "", KFieldValueType.LONG, "") // dstField
            ),
        )
    ))

    tableMappingMap["t2"] = listOf(KTableMapping(
        "t2",
        "table2",
        "db2",
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, "db2", "table2", "user_id"),
        listOf(
            KFieldMapping(
                KField("f2", "", KFieldValueType.STRING, ""), // srcField
                KField("f2", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("f3", "", KFieldValueType.LONG, ""), // srcField
                KField("f3", "", KFieldValueType.LONG, "") // dstField
            ),
            KFieldMapping(
                KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
                KField("cate_id", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
                KField("shop_id", "", KFieldValueType.LONG, "") // dstField
            ),
        )
    ))

    tableMappingMap["t3"] = listOf(KTableMapping(
        "t3",
        "table3",
        "db3",
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, "db3", "table3", "user_id"),
        listOf(
            KFieldMapping(
                KField("f4", "", KFieldValueType.STRING, ""), // srcField
                KField("f4", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("f5", "", KFieldValueType.LONG, ""), // srcField
                KField("f5", "", KFieldValueType.LONG, "") // dstField
            ),
            KFieldMapping(
                KField("f6", "", KFieldValueType.LONG, ""), // srcField
                KField("f6", "", KFieldValueType.LONG, "") // dstField
            ),
            KFieldMapping(
                KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
                KField("cate_id", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
                KField("shop_id", "", KFieldValueType.LONG, "") // dstField
            ),
        )
    ))

    val parser = WideTableMultiDimCHSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

    println(expr)
    println(arrayLines)
}









/**
 * Hive SQL parser for wide-table, multi-dimension tags
 * @author chenguangjian.jk
 * @date 2022-03-09 02:28:48
 */
@Service
class WideTableMultiDimHiveSQLParser {

    val log = LoggerFactory.getLogger(WideTableMultiDimHiveSQLParser::class.java)
    @Resource
    lateinit var commonParseUtils: CommonParseUtils
    /**
     * Count-estimation SQL for wide-table, multi-dimension tags
     */
    fun parseCount(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
        // Parse KunLunExpression
        return WIDE_TABLE_COUNT_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap)
        )
    }


    /**
     * Audience-selection SQL for wide-table, multi-dimension tags
     */
    fun parseCircle(tenant: Tenant, requestDTO: SQLQueryReqDTO): String {
        val tableMappingMap = commonParseUtils.getTableMappingMap(tenant, requestDTO)
       
        val csvFile = ""
        // Parse KunLunExpression
        return WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
            expr = expr(requestDTO, tableMappingMap),
            arrayLines = arrayLines(requestDTO, tableMappingMap),
            csvFile = csvFile,
        )
    }


    fun expr(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val exprMap = tagIdxs.groupBy { it.kexprId }
        return genWhereClause(exprMap, requestDTO.expression)
    }


    private fun genWhereClause(exprMap: Map<Int, List<TagIdx>>, kunLunExpression: KunLunExpression): String {
        val subExpression = kunLunExpression.subExpression
        if (CollectionUtils.isEmpty(subExpression)) { // leaf node
            return ""
        }

        val w = StringBuffer()
        val size = subExpression.size
        val logic = kunLunExpression.logic

        w.append("(")

        if (logic == LogicOperatorEnum.AND) {
            w.append("array_intersect(")
        } else if (logic == LogicOperatorEnum.OR) {
            w.append("array_union(")
        } else if (logic == LogicOperatorEnum.EXCEPT) {
            w.append("array_except(")
        } else {
            throw IllegalArgumentException("logic $logic not supported!")
        }

        var firstTagIdx: Int = 1
        subExpression.forEachIndexed { index, e ->
            if (isLeafNode(e)) {
                // Leaf node: refer to its array by index
                val targetTagIdx = exprMap[e.tfId]?.get(0)
                val tagIdx = targetTagIdx!!.index

                // Remember the first operand (used when computing a difference)
                if (index == 0) {
                    firstTagIdx = tagIdx
                }
                w.append("t[$tagIdx]")
            } else {
                // Non-leaf node: recurse
                w.append(genWhereClause(exprMap, e))
            }
            // Separate operands with a comma, whether they are leaves or nested expressions
            if (index != size - 1) {
                w.append(",")
            }
        }

        w.append("))")
        return w.toString()
    }


    /**
     * Generate arrayLines (no trailing comma on the last line)
    (select groupUniqArray(UserID) from db.hits_v1 where Sex = 1),
    (select groupUniqArray(UserID) from db.hits_v1 where Age > 18),
    (select groupUniqArray(UserID) from db.hits_v1 where RequestNum > 0)
     */
    fun arrayLines(requestDTO: SQLQueryReqDTO, tableMappingMap: Map<String, List<KTableMapping>>): String {
        val tagIdxs: List<TagIdx> = tagOptionConditions(requestDTO, tableMappingMap)
        val size = tagIdxs.size
        val arrayLines = StringBuffer()

        tagIdxs.forEachIndexed { index, tagIdx ->
            if (index != size - 1) {
                arrayLines.append("(${tagIdx.conditionExpr}), \n")
            } else {
                arrayLines.append("(${tagIdx.conditionExpr})  \n")
            }
        }
        return arrayLines.toString()
    }



    /**
    select  size(t.res) as cnt
    from (
    select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
    array(
    (select collect_set(UserID) from hits_v1 where Sex = 1),
    (select collect_set(UserID) from hits_v1 where Age > 18),
    (select collect_set(UserID) from hits_v1 where RequestNum > 0)
    ) t
    ) t
     */
    private fun WIDE_TABLE_COUNT_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
    ) = """
select size(t.res) as cnt
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""


    /**
    select explode(t.res) as ids
    from (
    select array_intersect(t[3], array_intersect(t[1], t[2])) as res,
    array(
    (select collect_set(UserID) from hits_v1 where Sex = 1),
    (select collect_set(UserID) from hits_v1 where Age > 18),
    (select collect_set(UserID) from hits_v1 where RequestNum > 0)
    ) t
    ) t
     */
    private fun WIDE_TABLE_CIRCLE_SQL_TEMPLATE(
        expr: String,
        arrayLines: String,
        csvFile: String,
    ) = """
select explode(t.res) as ids
from (
    select $expr as res,
    array(
    $arrayLines
    ) t
) t
"""






}


/**
WideTableMultiDimCHSQLParser - tagIdxList=[{"conditionExpr":"select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )","index":1,"kexprId":-316732738,"tagCode":"t1","tagOptionCode":"f1"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )","index":2,"kexprId":-316653905,"tagCode":"t2","tagOptionCode":"f2"},{"conditionExpr":"select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )","index":3,"kexprId":-315132611,"tagCode":"t2","tagOptionCode":"f3"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )","index":4,"kexprId":127438862,"tagCode":"t3","tagOptionCode":"f4"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )","index":5,"kexprId":127439854,"tagCode":"t3","tagOptionCode":"f5"},{"conditionExpr":"select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   )","index":6,"kexprId":-316668196,"tagCode":"t3","tagOptionCode":"f6"}]
(array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6]))))
(select collect_set(user_id) from db1.table1 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f1     = '1'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f2     = '22'   )),
(select collect_set(user_id) from db2.table2 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f3     = 333   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f4     = '4'   )),
(select collect_set(user_id) from db3.table3 where (  1=1  ) and (  f5     = 5   )),
(select collect_set(user_id) from db3.table3 where (  cate_id = '10001' and  shop_id = 798322  ) and (  f6     = 6   ))
 */
fun main() {
    val requestDTO = SQLQueryReqDTO()
    val tableMappingMap: HashMap<String, List<KTableMapping>> = hashMapOf()
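    // Root node: EXCEPT over its sub-expressions (rendered as array_except in the sample output above).
    val expression = KunLunExpression()
    expression.logic = LogicOperatorEnum.EXCEPT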
    val subExpressionList = arrayListOf<KunLunExpression>()
    val e1 = KunLunExpression()
    val e2 = KunLunExpression()
    val e3 = KunLunExpression()

    val dimList = listOf(
        FieldCondition("", "t1", "cate_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("10001")),
        FieldCondition("", "t1", "shop_id", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("798322"))
    )

    e1.fieldCondition = FieldCondition("", "t1", "f1", dimList, ArithmeticOperatorEnum.EQUAL, listOf("1"))
    e2.fieldCondition = FieldCondition("", "t2", "f2", dimList, ArithmeticOperatorEnum.EQUAL, listOf("22"))
    e3.fieldCondition = FieldCondition("", "t2", "f3", dimList, ArithmeticOperatorEnum.EQUAL, listOf("333"))
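    // e3 is a composite node: its children e31..e33 are combined with AND
    // (rendered as array_intersect(t[4],t[5],t[6]) in the sample output above).
    e3.logic = LogicOperatorEnum.AND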

    val e3SubExpressionList = arrayListOf<KunLunExpression>()
    val e31 = KunLunExpression()
    val e32 = KunLunExpression()
    val e33 = KunLunExpression()
    e3SubExpressionList.add(e31)
    e3SubExpressionList.add(e32)
    e3SubExpressionList.add(e33)
    e31.fieldCondition = FieldCondition("", "t3", "f4", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("4"))
    e32.fieldCondition = FieldCondition("", "t3", "f5", emptyList(), ArithmeticOperatorEnum.EQUAL, listOf("5"))
    e33.fieldCondition = FieldCondition("", "t3", "f6", dimList, ArithmeticOperatorEnum.EQUAL, listOf("6"))
    e3.subExpression = e3SubExpressionList

    subExpressionList.add(e1)
    subExpressionList.add(e2)
    subExpressionList.add(e3)
    expression.subExpression = subExpressionList
    requestDTO.expression = expression

    // KTableMapping(boolean rowMapping, String tableCode, String kTableCode, String physicDBName, KField targetField, KSource source, List<KFieldMapping> fields)
    // KField(String columnCode, String fieldCode, KFieldValueType fieldType, String description)
    // KSource(Long tagSrcTaskId, String tagSrcDb, String tagSrcTable, String tagSrcTableJoinField)
    // KFieldMapping(KField srcField, KField dstField)

    tableMappingMap["t1"] = listOf(KTableMapping(
        "t1",
        "table1",
        "db1",
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, "db1", "table1", "user_id"),
        listOf(
            KFieldMapping(
                KField("f1", "", KFieldValueType.STRING, ""), // srcField
                KField("f1", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("cate_id", "", KFieldValueType.STRING, ""), // srcField
                KField("cate_id", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("shop_id", "", KFieldValueType.LONG, ""), // srcField
                KField("shop_id", "", KFieldValueType.LONG, "") // dstField
            ),
        )
    ))

    tableMappingMap["t2"] = listOf(KTableMapping(
        "t2",
        "table2",
        "db2",
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, "db2", "table2", "user_id"),
        listOf(
            KFieldMapping(
                KField("f2", "", KFieldValueType.STRING, ""), // srcField
                KField("f2", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("f3", "", KFieldValueType.LONG, ""), // srcField
                KField("f3", "", KFieldValueType.LONG, "") // dstField
            ),
        )
    ))

    tableMappingMap["t3"] = listOf(KTableMapping(
        "t3",
        "table3",
        "db3",
        KField("user_id", "", KFieldValueType.STRING, ""),
        KSource(0, "db3", "table3", "user_id"),
        listOf(
            KFieldMapping(
                KField("f4", "", KFieldValueType.STRING, ""), // srcField
                KField("f4", "", KFieldValueType.STRING, "") // dstField
            ),
            KFieldMapping(
                KField("f5", "", KFieldValueType.LONG, ""), // srcField
                KField("f5", "", KFieldValueType.LONG, "") // dstField
            ),
            KFieldMapping(
                KField("f6", "", KFieldValueType.LONG, ""), // srcField
                KField("f6", "", KFieldValueType.LONG, "") // dstField
            ),
        )
    ))

    // Use a lowerCamelCase name so the variable does not shadow the class name.
    val parser = WideTableMultiDimHiveSQLParser()
    val expr = parser.expr(requestDTO, tableMappingMap)
    val arrayLines = parser.arrayLines(requestDTO, tableMappingMap)

    println(expr)
    println(arrayLines)
}
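
The listing above prints `expr` and `arrayLines` separately. As a rough illustration of how the two pieces fit together, here is a minimal, self-contained sketch. `Node`, `Leaf`, `Group`, `render` and `countSql` are hypothetical names invented for this example; they are simplified stand-ins, not the article's `KunLunExpression` or parser API. The sketch only mirrors the shape of the sample output (without the outermost parentheses) and does not check whether the target engine accepts these functions with that many arguments.

```kotlin
// Hypothetical, simplified stand-ins for the expression tree (not the article's classes).
sealed class Node
data class Leaf(val index: Int) : Node()                             // one tag condition, i.e. t[index]
data class Group(val op: String, val children: List<Node>) : Node() // e.g. array_except / array_intersect

// Render a node as a nested array-function expression over t[1], t[2], ...
// Nested groups are wrapped in parentheses, as in the sample output.
fun render(node: Node): String = when (node) {
    is Leaf -> "t[${node.index}]"
    is Group -> node.children.joinToString(",", prefix = "${node.op}(", postfix = ")") { child ->
        if (child is Group) "(${render(child)})" else render(child)
    }
}

// Same shape as the count template: size() over the combined array.
fun countSql(expr: String, arrayLines: List<String>): String = """
select size(t.res) as cnt
from (
    select $expr as res,
    array(
    ${arrayLines.joinToString(",\n    ")}
    ) t
) t
""".trim()

fun main() {
    val tree = Group(
        "array_except",
        listOf(Leaf(1), Leaf(2), Group("array_intersect", listOf(Leaf(4), Leaf(5), Leaf(6))))
    )
    val expr = render(tree)
    println(expr) // array_except(t[1],t[2],(array_intersect(t[4],t[5],t[6])))

    // Illustrative sub-queries only; the real ones come from arrayLines().
    val lines = listOf(
        "(select collect_set(user_id) from db1.table1 where f1 = '1')",
        "(select collect_set(user_id) from db2.table2 where f2 = '22')"
    )
    println(countSql(expr, lines))
}
```

Keeping the tree rendering separate from the SQL template means the same `expr` can be dropped into either the count template or the explode template without being rebuilt.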
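This article is part of the Tencent Cloud self-media syndication program and is shared from the author's personal site/blog.
Originally published: 2022.03.09. In case of infringement, contact cloudcommunity@tencent.com for removal.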