要将上传到容器中单独文件夹的Blob读取到流分析作业中,通常涉及以下几个基础概念和技术步骤:
from azure.storage.blob import BlobServiceClient
from azure.streamanalytics import StreamAnalyticsClient
# 配置Blob存储连接字符串
blob_connection_string = "DefaultEndpointsProtocol=https;AccountName=<your-account-name>;AccountKey=<your-account-key>;EndpointSuffix=core.windows.net"
# 配置流分析作业连接字符串
stream_analytics_connection_string = "Endpoint=https://<your-stream-analytics-job>.streaming.azure.com;SharedAccessSignature=<your-sas-token>"
# 创建Blob服务客户端
blob_service_client = BlobServiceClient.from_connection_string(blob_connection_string)
# 创建流分析客户端
stream_analytics_client = StreamAnalyticsClient.from_connection_string(stream_analytics_connection_string)
# 定义输入源
input_source = {
"name": "BlobInput",
"type": "Microsoft.Storage/Blobs",
"properties": {
"storageAccounts": [
{
"accountName": "<your-account-name>",
"accountKey": "<your-account-key>"
}
],
"container": "<your-container-name>",
"pathPattern": "<your-folder-name>/{date}/{time}/",
"dateFormat": "yyyy/MM/dd",
"timeFormat": "HH"
}
}
# 添加输入源到流分析作业
stream_analytics_client.inputs.create_or_update("<your-job-name>", input_source)
# 定义输出目标
output_target = {
"name": "OutputTarget",
"type": "Microsoft.ServiceBus/Queues",
"properties": {
"serviceBusNamespace": "<your-service-bus-namespace>",
"sharedAccessPolicyName": "<your-policy-name>",
"sharedAccessPolicyKey": "<your-policy-key>",
"queueName": "<your-queue-name>"
}
}
# 添加输出目标到流分析作业
stream_analytics_client.outputs.create_or_update("<your-job-name>", output_target)
# 启动流分析作业
stream_analytics_client.jobs.start("<your-job-name>")
pathPattern
可能没有正确匹配到Blob文件。pathPattern
是否正确,并确保Blob文件的路径符合预期格式。通过以上步骤和方法,你应该能够成功地将上传到容器中单独文件夹的Blob读取到流分析作业中。
领取专属 10元无门槛券
手把手带您无忧上云