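I'm trying to programmatically find out the status of a Dataflow job while it runs. Along with that, I would like to poll a batch/streaming job and trigger the next job once it finishes. Is there a Java/Scala API that can be used to accomplish this? I have tried com.google.api.services.dataflow.Dataflow$Projects$Locations$Jobs and was able to retrieve JobMetrics, but I'm trying to get more information, such as the job's input and output metrics, to determine whether a streaming job has processed all of its data.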
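Edit based on follow-up questions. These are the signals I'm trying to get from a Dataflow job while it is running:
Whether the job is running, completed, or failed. Based on that, I can decide to wait, to trigger the next job, or not to continue to the next job, respectively. When checking a streaming job manually, we conclude that the job is "finished" when we see throughput drop to zero and stay unchanged for a few minutes. I'd like to do the same thing when automating this.
Is there anything that can help me with this?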
Posted on 2022-01-14 05:37:49
Sorry, I don't know Java, but I can point you to the Dataflow API endpoints that return the details you need. My examples were done by sending HTTP requests to the Dataflow API with curl.
- Using the [Jobs endpoint](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs), you can submit a request and it should return the `currentState`. See [currentState](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#jobstate) to view all states available for the job. These include `JOB_STATE_DONE`, `JOB_STATE_RUNNING`, `JOB_STATE_FAILED`, etc.
- This can be used for both batch and streaming.
- In Java, if I'm not mistaken, you can use the [`Job` class and its `getCurrentState` method](https://developers.google.com/resources/api-libraries/documentation/dataflow/v1b3/java/latest/com/google/api/services/dataflow/model/Job.html#getCurrentState--).
- Example: 
- You mentioned that you already retrieved JobMetrics.
- The [JobMetrics endpoint](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics) returns a list of [MetricUpdate objects](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/MetricUpdate). Each object carries a structured name that identifies the source that generated the metric. The metrics shown under "Steps" in the UI can be found here.
- For example, if you have a `ReadFromPubSub` step, you can find the number of messages that it read. See the screenshot below for what the object looks like and how it appears in the UI.
- You just need to filter the data properly to see the details you need.
- In Java it should be available from the [`JobMetrics` class and its `getMetrics` method](https://developers.google.com/resources/api-libraries/documentation/dataflow/v1b3/java/latest/com/google/api/services/dataflow/model/JobMetrics.html#getMetrics--).
- Example (`scalar` contains the value of **"Elements Added"**; the values are not the same because the job is continuously streaming):
- Data freshness cannot be pulled using the Dataflow API.
- That data is produced by Cloud Monitoring, so if you need it you have to use the Monitoring API. See the [sample code](https://cloud.google.com/monitoring/docs/samples/monitoring-read-timeseries-simple#monitoring_read_timeseries_simple-java) for how to use the API.
- To view the created metric go to GCP Console > Monitoring > Dashboards > GCP > Dataflow
- You should see a list of the Dataflow jobs and their metrics.
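I can't give you a tested client-library snippet, but the decision logic itself can be sketched in plain Java. This is only a minimal sketch under some assumptions: `DataflowJobGate`, `Action`, `decide`, and `looksIdle` are hypothetical names, not part of any Google library. It assumes you pass in the `currentState` string from the Jobs endpoint and, for streaming jobs, a series of samples of a counter such as "Elements Added" taken at a fixed polling interval. Treating `JOB_STATE_CANCELLED` as a stop signal, and an unchanged counter as "finished", are my assumptions based on the manual check described in the question.

```java
import java.util.Arrays;
import java.util.List;

public class DataflowJobGate {

    /** What the orchestrating code should do next (hypothetical enum). */
    public enum Action { WAIT, TRIGGER_NEXT, ABORT }

    /**
     * Maps a Dataflow currentState string (for example the value returned by
     * Job.getCurrentState()) to an action. Any state not listed explicitly,
     * including JOB_STATE_RUNNING, means "keep waiting".
     */
    public static Action decide(String currentState) {
        if (currentState == null) {
            return Action.WAIT;
        }
        switch (currentState) {
            case "JOB_STATE_DONE":
                return Action.TRIGGER_NEXT;
            case "JOB_STATE_FAILED":
            case "JOB_STATE_CANCELLED": // assumption: a cancelled job should not trigger the next one
                return Action.ABORT;
            default:
                return Action.WAIT;
        }
    }

    /**
     * Heuristic for streaming jobs: returns true when the counter has not
     * changed over the last requiredFlatIntervals polling intervals, i.e. the
     * last (requiredFlatIntervals + 1) samples are all equal.
     */
    public static boolean looksIdle(List<Long> samples, int requiredFlatIntervals) {
        if (requiredFlatIntervals < 1 || samples.size() < requiredFlatIntervals + 1) {
            return false; // not enough history to decide
        }
        long last = samples.get(samples.size() - 1);
        for (int i = samples.size() - 1 - requiredFlatIntervals; i < samples.size(); i++) {
            if (samples.get(i) != last) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Made-up sample values, purely for illustration.
        List<Long> elementsAdded = Arrays.asList(10L, 25L, 25L, 25L);
        System.out.println(decide("JOB_STATE_RUNNING"));
        System.out.println(looksIdle(elementsAdded, 2));
    }
}
```

With a one-minute polling interval, `looksIdle(samples, 5)` would correspond to roughly five minutes without new elements. The interval and threshold are choices you would need to tune for your own pipeline.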
I hope this information points you in the right direction and that you are able to implement it in Java.
https://stackoverflow.com/questions/70690096