I'm trying to connect Flink 1.14.4 with HBase 2.2.14. I added the HBase connector jar flink-sql-connector-hbase-2.2-1.15.2.jar, which is the latest release of that jar for the 2.2.x line.
But I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.hTable'.
Table options are:
'connector'='hbase-2.2'
'table-name'='test'
'zookeeper.quorum'='127.0.0.1:2181'
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:184)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:388)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:872)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:742)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getPhysicalRowDataType()Lorg/apache/flink/table/types/DataType;
at org.apache.flink.connector.hbase2.HBase2DynamicTableFactory.createDynamicTableSink(HBase2DynamicTableFactory.java:95)
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:181)
... 28 more
I defined my HBase table:
sink_ddl = """
    CREATE TABLE hTable (
        datemin STRING,
        family2 ROW<datemax STRING>,
        family3 ROW<channel_title STRING, channel_id STRING>,
        PRIMARY KEY (datemin) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-2.2',
        'table-name' = 'test',
        'zookeeper.quorum' = '127.0.0.1:2181'
    )
"""
Then I created a view to select the data and insert it into hTable:
table_env.create_temporary_view('table_api_table', table)
table_env.execute_sql("""
    INSERT INTO hTable
    SELECT
        datemin,
        ROW(datemax),
        ROW(channel_title, channel_id)
    FROM table_api_table
""").wait()
I saw that Flink 1.14 doesn't support HBase.
So do I have to change my HBase version?
Posted on 2022-10-25 16:20:10
It finally works!! I solved the problem by doing the following:
I edited hbase-env.sh:
# Extra Java CLASSPATH elements. Optional.
export HBASE_CLASSPATH=/home/hadoop/hbase/conf
I edited hbase-site.xml and added the following property:
<property>
  <name>hbase.defaults.for.version.skip</name>
  <value>true</value>
</property>
Then I edited the connector jar: I actually unpacked the jar and edited the hbase-default.xml inside it:
<property>
  <name>hbase.defaults.for.version.skip</name>
  <value>true</value>
  <description>Set to true to skip the 'hbase.defaults.for.version' check.
    Setting this to true can be useful in contexts other than
    the other side of a maven generation; i.e. running in an
    IDE. You'll want to set this boolean to true to avoid
    seeing the RuntimeException complaint: "hbase-default.xml file
    seems to be for and old version of HBase (\${hbase.version}), this
    version is X.X.X-SNAPSHOT"</description>
</property>
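If you'd rather script that jar edit than do it by hand, here is a hypothetical sketch that rewrites the archive with hbase.defaults.for.version.skip flipped to true (the paths and the regex are assumptions; patching a shipped jar is a workaround, not standard practice):

import re
import zipfile

SRC = "flink-sql-connector-hbase-2.2_2.11-1.14.4.jar"          # hypothetical input path
DST = "flink-sql-connector-hbase-2.2_2.11-1.14.4-patched.jar"  # hypothetical output path

# Zip entries can't be edited in place, so copy every entry into a new
# archive and patch hbase-default.xml on the way through.
with zipfile.ZipFile(SRC) as zin, zipfile.ZipFile(DST, "w", zipfile.ZIP_DEFLATED) as zout:
    for item in zin.infolist():
        data = zin.read(item.filename)
        if item.filename == "hbase-default.xml":
            text = re.sub(
                r"(<name>hbase\.defaults\.for\.version\.skip</name>\s*<value>)false(</value>)",
                r"\g<1>true\g<2>",
                data.decode("utf-8"))
            data = text.encode("utf-8")
        zout.writestr(item, data)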
Finally, I moved the jar into the Flink folder (this is better than:
table_env.get_config().get_configuration().set_string("pipeline.jars","file:///home/hadoop/hbase/conf/flink-sql-connector-hbase-2.2_2.11-1.14.4.jar")
)
Posted on 2022-10-24 09:16:52
You can't mix and match jars from different Flink versions.
If you're using Flink 1.14, you need to use the JARs provided via https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hbase/
The screenshot you're referring to mentions that HBase is supported, so I don't understand why you say that Flink 1.14 doesn't support HBase.
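The NoSuchMethodError above is exactly this kind of mismatch: getPhysicalRowDataType() is called by the 1.15.2 connector but doesn't exist in the 1.14.4 runtime. A quick way to check which Flink version PyFlink is actually running, as a minimal sketch (it assumes the standard pyflink.version module):

from pyflink.version import __version__ as flink_version

# The connector jar's Flink version must match this runtime version.
print(flink_version)  # e.g. '1.14.4' -> use a 1.14.x connector jar, not 1.15.2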
https://stackoverflow.com/questions/74156056