我试图使用flink 1.15.1编写一个简单的表API S3流接收器(csv格式),并面临以下例外:
Caused by: org.apache.flink.util.SerializedThrowable: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream
在eclipse中尝试执行基本的Flink程序时,由于.print()被datastream_name.print()调用来打印我的数据,我得到了错误。
使用Java8
错误
Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" t
我曾计划用python statefun示例(链接:https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example)来交流嵌入式有状态函数 但是(总会有一个但是)我一直在努力构建python发行版。我想我的配置是正确的。 This was the error which I tried to described 我的pip版本:19.2.3 我的python版本:3.7.5 顺便说一句,python sdk安装程序说它; setup(
name
我读取了一个csv文件。我想使用一个长类型的列来翻滚。我使用UDF传输长类型到时间戳类型,但是无法工作错误消息:窗口只能在时间属性列上定义。
我试着调试。TimeIndicatorRelDataType不是时间戳,我不知道怎么转,为什么?
def isTimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
case ti: TimeIndicatorRelDataType => true
case _ => false
}
码
public static void mai
我有一个工作正常的嵌入式作业,我想部署更多的同地作业。这些新增的工作将收到来自旧工作的信息,并将其发送到kafka主题。
代码如下
@AutoService(StatefulFunctionModule.class)
public final class CoLocatedModule implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
FunctionProvide
我们感兴趣的是从连接到常规的Flink流应用程序,最好使用Table。这样做的想法是查阅Statefun在Flink中注册的表,是否有此可能,如何做才是正确的方法?
到目前为止,我的想法是在一些主函数中初始化我的表流,并注册一个有状态函数提供程序来连接到该表:
@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalC