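In this guide, we take a deep dive into building a robust data pipeline that uses Kafka for data streaming, Spark for processing, Airflow for orchestration, Docker for containerization, and S3 for storage, with Python as the main scripting language.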
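To illustrate the process, we use the Random Name API (the code below calls https://randomuser.me/api/), a versatile tool that returns new random data every time it is called. It is a practical stand-in for the real-time data many businesses handle every day. Our first step is a Python script that fetches data from this API. To mimic the streaming nature of the data, we run this script periodically. The same script also serves as our bridge to Kafka, writing the fetched data directly to a Kafka topic.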
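The "poll an API on a timer and forward each result" idea can be sketched without any external library. The function poll_and_forward and its parameters are hypothetical names for this illustration only; in the project's script, the fetch step is an HTTP call to the API and the publish step is a Kafka write.

```python
import time
from typing import Any, Callable, List


def poll_and_forward(fetch: Callable[[], Any],
                     publish: Callable[[Any], None],
                     duration_s: float,
                     interval_s: float,
                     sleep: Callable[[float], None] = time.sleep) -> int:
    """Call fetch() once per interval for duration_s seconds, passing each
    result to publish(). Returns how many records were forwarded."""
    iterations = int(duration_s // interval_s)  # e.g. 120 // 10 -> 12 calls
    for _ in range(iterations):
        publish(fetch())
        sleep(interval_s)  # wait before the next poll to space out the records
    return iterations


# Demo with in-memory stand-ins: no network, no Kafka, no real waiting.
sent: List[Any] = []
ids = iter(range(100))
n = poll_and_forward(fetch=lambda: {"id": next(ids)},
                     publish=sent.append,
                     duration_s=120, interval_s=10,
                     sleep=lambda seconds: None)
print(n, sent[0], sent[-1])  # 12 {'id': 0} {'id': 11}
```

Injecting sleep keeps the loop testable; the real script simply calls time.sleep between messages.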
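As we go further, Airflow's Directed Acyclic Graphs (DAGs) play a key role. The Airflow DAG script orchestrates the process, making sure our Python script runs on schedule (in the code below, once a day, streaming for two minutes per run) and feeds data into the pipeline. Once the data lands in the Kafka topic, Spark Structured Streaming takes over: it consumes the data, processes it, and writes the transformed result to S3, ready for downstream analysis.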
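To make the hand-offs concrete, here is a toy, in-process model of the flow; it is not the real services. A Python list stands in for the Kafka topic (an append-only log), another list for the S3 bucket, and a dict for the checkpoint in which a streaming job remembers how far it has read. All names are hypothetical.

```python
import json

# Illustrative stand-ins only: a list plays the Kafka topic (an append-only
# log), another list plays the S3 bucket, and a dict plays the Spark
# checkpoint that remembers how far the consumer has read.
topic_log = []
bucket = []
checkpoint = {"offset": 0}


def produce(record: dict) -> None:
    """Producer side (the scheduled script): append a JSON-encoded message."""
    topic_log.append(json.dumps(record).encode("utf-8"))


def process_new_messages() -> int:
    """Consumer side (the streaming job): read from the saved offset, parse,
    write to the sink, then advance the offset so nothing is written twice."""
    start = checkpoint["offset"]
    new_messages = topic_log[start:]
    for raw in new_messages:
        bucket.append(json.loads(raw.decode("utf-8")))
    checkpoint["offset"] = start + len(new_messages)
    return len(new_messages)


produce({"name": "user-1"})
produce({"name": "user-2"})
first = process_new_messages()   # reads both messages
produce({"name": "user-3"})
second = process_new_messages()  # reads only the new message
print(first, second, len(bucket), checkpoint["offset"])  # 2 1 3 3
```

The producer and the consumer never call each other. They only share the log, which is why each side can be scheduled, restarted, or scaled on its own.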
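A key aspect of the project is its modular architecture. Thanks to Docker containers, each service, whether Kafka, Spark, or Airflow, runs in an isolated environment. This not only ensures smooth interoperability but also simplifies scaling and debugging.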
The entire setup for this project is hosted in a GitHub repository, so anyone can get started easily.
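A. Docker: Docker is the main tool we use to orchestrate and run the various services.
B. S3: AWS S3 is our data store of choice.
C. Set up the project by cloning the repository: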
git clone https://github.com/simardeep1792/Data-Engineering-Streaming-Project.git
cd Data-Engineering-Streaming-Project
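Deploy the services with docker-compose. The project directory contains a docker-compose.yml file that describes all the services. First create the external network docker_streaming that the file expects, then start the stack: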
docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
These commands start all the required services, such as Kafka, Spark, and Airflow, in Docker containers.
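When up -d returns, some containers may still be starting. A small standard-library check is to try a TCP connection to each host port published in docker-compose.yml. The helper names below are hypothetical, and an open port only shows that something is listening, not that the service is healthy.

```python
import socket

# Host ports published in docker-compose.yml for each UI / API.
SERVICE_PORTS = {
    "Airflow webserver": 8080,
    "Kafka UI": 8888,
    "Spark master UI": 8085,
    "Kafka Connect REST": 8083,
    "Schema Registry": 8081,
    "ZooKeeper": 2181,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its published port accepts connections."""
    return {name: port_open(host, port) for name, port in SERVICE_PORTS.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print(f"{name:<20} {'up' if ok else 'not reachable'}")
```

The Kafka brokers are not in the list because the compose file does not publish their ports on the host.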
docker-compose.yml
version: '3.7'
services:
  # Airflow PostgreSQL Database
  airflow_db:
    image: postgres:16.0
    environment:
      - POSTGRES_USER=${POSTGRES_USER}
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
      - POSTGRES_DB=${POSTGRES_DB}
    logging:
      options:
        max-size: 10m
        max-file: "3"
  # Apache Airflow Webserver
  airflow_webserver:
    # "airflow webserver" runs in the foreground, so initialize the DB and create
    # the admin user first; commands chained after it would never run.
    command: bash -c "airflow db init && airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin && airflow webserver"
    image: apache/airflow:latest
    restart: always
    depends_on:
      - airflow_db
    environment:
      - LOAD_EX=${LOAD_EX}
      - EXECUTOR=${EXECUTOR}
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_PASSWORD}@airflow_db:5432/${POSTGRES_DB}
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./requirements.txt:/opt/airflow/requirements.txt
    ports:
      - "8080:8080"
    healthcheck:
      # AIRFLOW_HOME is /opt/airflow in the apache/airflow image
      test: ["CMD-SHELL", "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
  # Zookeeper for Kafka
  kafka_zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ZOOKEEPER_CLIENT_PORT=${ZOOKEEPER_CLIENT_PORT}
      - ZOOKEEPER_SERVER_ID=${ZOOKEEPER_SERVER_ID}
      - ZOOKEEPER_SERVERS=kafka_zookeeper:2888:3888
    networks:
      - kafka_network
      - default
  # Kafka Broker Instances
  kafka_broker_1:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
  kafka_broker_2:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
  kafka_broker_3:
    extends:
      service: kafka_base
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka_broker_3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
  kafka_base:
    image: confluentinc/cp-kafka:latest
    environment:
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}
      - KAFKA_INTER_BROKER_LISTENER_NAME=${KAFKA_INTER_BROKER_LISTENER_NAME}
      - KAFKA_ZOOKEEPER_CONNECT=kafka_zookeeper:2181
      - KAFKA_LOG4J_LOGGERS=${KAFKA_LOG4J_LOGGERS}
      - KAFKA_AUTHORIZER_CLASS_NAME=${KAFKA_AUTHORIZER_CLASS_NAME}
      - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=${KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND}
    networks:
      - kafka_network
      - default
  # Kafka Connect
  kafka_connect:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - "8083:8083"
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=${CONNECT_BOOTSTRAP_SERVERS}
      - CONNECT_REST_PORT=${CONNECT_REST_PORT}
      - CONNECT_GROUP_ID=${CONNECT_GROUP_ID}
      - CONNECT_CONFIG_STORAGE_TOPIC=${CONNECT_CONFIG_STORAGE_TOPIC}
      - CONNECT_OFFSET_STORAGE_TOPIC=${CONNECT_OFFSET_STORAGE_TOPIC}
      - CONNECT_STATUS_STORAGE_TOPIC=${CONNECT_STATUS_STORAGE_TOPIC}
      - CONNECT_KEY_CONVERTER=${CONNECT_KEY_CONVERTER}
      - CONNECT_VALUE_CONVERTER=${CONNECT_VALUE_CONVERTER}
      - CONNECT_INTERNAL_KEY_CONVERTER=${CONNECT_INTERNAL_KEY_CONVERTER}
      - CONNECT_INTERNAL_VALUE_CONVERTER=${CONNECT_INTERNAL_VALUE_CONVERTER}
      - CONNECT_REST_ADVERTISED_HOST_NAME=${CONNECT_REST_ADVERTISED_HOST_NAME}
      - CONNECT_LOG4J_ROOT_LOGLEVEL=${CONNECT_LOG4J_ROOT_LOGLEVEL}
      - CONNECT_LOG4J_LOGGERS=${CONNECT_LOG4J_LOGGERS}
      - CONNECT_PLUGIN_PATH=${CONNECT_PLUGIN_PATH}
    networks:
      - kafka_network
      - default
  # Kafka Schema Registry
  kafka_schema_registry:
    image: confluentinc/cp-schema-registry:latest
    ports:
      - "8081:8081"
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=${SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS}
      - SCHEMA_REGISTRY_HOST_NAME=${SCHEMA_REGISTRY_HOST_NAME}
      - SCHEMA_REGISTRY_LISTENERS=${SCHEMA_REGISTRY_LISTENERS}
    networks:
      - kafka_network
      - default
  # Kafka User Interface
  kafka_ui:
    container_name: kafka-ui-1
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8888:8080
    depends_on:
      - kafka_broker_1
      - kafka_broker_2
      - kafka_broker_3
      - kafka_schema_registry
      - kafka_connect
    environment:
      - KAFKA_CLUSTERS_0_NAME=${KAFKA_CLUSTERS_0_NAME}
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=${KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS}
      - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=${KAFKA_CLUSTERS_0_SCHEMAREGISTRY}
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME}
      - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=${KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS}
      - DYNAMIC_CONFIG_ENABLED=${DYNAMIC_CONFIG_ENABLED}
    networks:
      - kafka_network
      - default
  # Apache Spark Master Node
  spark_master:
    image: bitnami/spark:3
    container_name: spark_master
    ports:
      - 8085:8080
    environment:
      - SPARK_UI_PORT=${SPARK_UI_PORT}
      - SPARK_MODE=${SPARK_MODE}
      - SPARK_RPC_AUTHENTICATION_ENABLED=${SPARK_RPC_AUTHENTICATION_ENABLED}
      - SPARK_RPC_ENCRYPTION_ENABLED=${SPARK_RPC_ENCRYPTION_ENABLED}
    volumes:
      - ./:/home
      - spark_data:/opt/bitnami/spark/data
    networks:
      - default
      - kafka_network
#volumes for data
volumes:
  spark_data:
#network for Kafka
networks:
  kafka_network:
    driver: bridge
  default:
    external:
      name: docker_streaming
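Most values in this file are ${VAR} references. Docker Compose fills them from the shell environment or from an .env file in the project directory, and a variable that is not set is replaced with an empty string (Compose prints a warning). The .env file is not shown in this article, so a quick pre-flight check can help. The helper names below are hypothetical, and parse_env_file is deliberately minimal: it does not implement quoting or variable expansion.

```python
import re
from typing import Dict, List

# Matches ${VAR} and ${VAR:-default}, the two forms used in docker-compose.yml.
VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def parse_env_file(text: str) -> Dict[str, str]:
    """Minimal KEY=VALUE reader; skips blank lines and '#' comments."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            env[key.strip()] = value.strip()
    return env


def missing_variables(compose_text: str, env: Dict[str, str]) -> List[str]:
    """Names referenced without a ':-' default that are not defined in env."""
    missing = set()
    for match in VAR_PATTERN.finditer(compose_text):
        name, default = match.group(1), match.group(2)
        if default is None and name not in env:
            missing.add(name)
    return sorted(missing)


snippet = """
      - POSTGRES_USER=${POSTGRES_USER}
      - KAFKA_ADVERTISED_LISTENERS=EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      - ZOOKEEPER_CLIENT_PORT=${ZOOKEEPER_CLIENT_PORT}
"""
env = parse_env_file("POSTGRES_USER=airflow\n# comment line\n")
print(missing_variables(snippet, env))  # ['ZOOKEEPER_CLIENT_PORT']
```

Against the real files, pass the text of docker-compose.yml and a dict that merges parse_env_file applied to .env with os.environ.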
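The heart of the project setup is the docker-compose.yml file. It orchestrates our services and ensures they start up and communicate smoothly. Here is a breakdown: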
1) Version
Declares Compose file format version '3.7'. Recent Docker Compose releases treat the version key as obsolete and ignore it.
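2) Services
The project comprises several services:
- airflow_db: the PostgreSQL 16 database that stores Airflow's metadata.
- airflow_webserver: the Airflow web UI on port 8080; it mounts ./dags and requirements.txt into the container.
- kafka_zookeeper: ZooKeeper, which coordinates the Kafka brokers (port 2181).
- kafka_broker_1, kafka_broker_2, kafka_broker_3: three Kafka brokers that extend the shared kafka_base definition and differ only in broker ID and listener ports.
- kafka_base: the common broker settings; note that Compose also treats it as a regular service and tries to start it.
- kafka_connect and kafka_schema_registry: Kafka Connect (port 8083) and Schema Registry (port 8081).
- kafka_ui: the provectuslabs Kafka UI, published on host port 8888.
- spark_master: a Bitnami Spark 3 node whose web UI is published on host port 8085; it mounts the project directory at /home.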
3) Volumes
A persistent named volume, spark_data, mounted at /opt/bitnami/spark/data, keeps Spark's data across container restarts.
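4) Networks
The services use two networks:
- kafka_network: a bridge network shared by the Kafka components and Spark.
- default: the external network docker_streaming, which must be created beforehand with docker network create docker_streaming.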
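The three broker definitions differ only in their ID and a port offset. The hypothetical helper below regenerates each broker's KAFKA_ADVERTISED_LISTENERS value from its ID, which makes the pattern explicit. Advertised listeners are the addresses a broker tells clients to use after the initial bootstrap, so they must be reachable from wherever the client runs. The INTERNAL addresses are the ones the producer and the Spark job use inside the Docker networks.

```python
def advertised_listeners(broker_id: int, host_ip: str = "127.0.0.1") -> str:
    """Rebuild the KAFKA_ADVERTISED_LISTENERS value used for broker broker_id.

    Broker n (1-based) advertises three listeners, each offset by n - 1:
      INTERNAL  kafka_broker_n:19092+(n-1)        (container-to-container)
      EXTERNAL  <host_ip>:9092+(n-1)              (clients on the Docker host)
      DOCKER    host.docker.internal:29092+(n-1)
    """
    offset = broker_id - 1
    return ",".join([
        f"INTERNAL://kafka_broker_{broker_id}:{19092 + offset}",
        f"EXTERNAL://{host_ip}:{9092 + offset}",
        f"DOCKER://host.docker.internal:{29092 + offset}",
    ])


def bootstrap_servers(n_brokers: int = 3) -> str:
    """Internal broker addresses, as used by the producer and the Spark job."""
    return ",".join(f"kafka_broker_{i}:{19092 + i - 1}" for i in range(1, n_brokers + 1))


print(advertised_listeners(2))
# INTERNAL://kafka_broker_2:19093,EXTERNAL://127.0.0.1:9093,DOCKER://host.docker.internal:29093
print(bootstrap_servers())
# kafka_broker_1:19092,kafka_broker_2:19093,kafka_broker_3:19094
```

In the compose file as shown, ports 9092 to 9094 are not published on the host, so the EXTERNAL listeners are only reachable if you add those port mappings.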
kafka_stream_dag.py
# Importing required modules
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from kafka_streaming_service import initiate_stream

# Configuration for the DAG's start date
DAG_START_DATE = datetime(2018, 12, 21, 12, 12)

# Default arguments for the DAG
DAG_DEFAULT_ARGS = {
    'owner': 'airflow',
    'start_date': DAG_START_DATE,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

# Creating the DAG with its configuration
with DAG(
    'name_stream_dag',  # Renamed for uniqueness
    default_args=DAG_DEFAULT_ARGS,
    schedule='0 1 * * *',  # every day at 01:00
    catchup=False,
    description='Stream random names to Kafka topic',
    max_active_runs=1
) as dag:
    # Defining the data streaming task using PythonOperator; inside the
    # "with DAG(...)" block the task is attached to the DAG automatically
    kafka_stream_task = PythonOperator(
        task_id='stream_to_kafka_task',
        python_callable=initiate_stream,
    )
This file defines an Airflow Directed Acyclic Graph (DAG) that streams data to a Kafka topic.
1) Imports
Imports the essential modules and functions, in particular Airflow's DAG and PythonOperator, and initiate_stream from kafka_streaming_service.
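2) Configuration
Sets the DAG start date (DAG_START_DATE) and the default arguments: owner airflow, one retry, and a 5-second delay before retrying.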
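3) DAG definition
Creates a new DAG named name_stream_dag, scheduled to run every day at 1:00 AM. With catchup=False, missed intervals are not backfilled, and max_active_runs=1 allows only one active run at a time.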
4) Task
A single task, kafka_stream_task, is defined with PythonOperator. It calls initiate_stream, which streams data to Kafka whenever the DAG runs.
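The cron expression '0 1 * * *' means "minute 0 of hour 1, every day". Two Airflow behaviors are worth keeping in mind. First, times are interpreted in the DAG's timezone (UTC unless configured otherwise). Second, a run is triggered at the end of the interval it covers, and with catchup=False only the most recent interval is scheduled instead of every interval since start_date. The plain-Python sketch below is not Airflow's scheduler; its function names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Tuple


def next_daily_trigger(now: datetime, hour: int = 1, minute: int = 0) -> datetime:
    """Next time matching the cron pattern 'minute hour * * *' strictly after now."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


def data_interval(trigger: datetime) -> Tuple[datetime, datetime]:
    """A daily run that fires at trigger covers the preceding day,
    [trigger - 1 day, trigger); its logical date is the interval start."""
    return trigger - timedelta(days=1), trigger


fire = next_daily_trigger(datetime(2024, 3, 10, 14, 30))
print(fire, data_interval(fire)[0])  # 2024-03-11 01:00:00 2024-03-10 01:00:00
```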
kafka_streaming_service.py
# Importing necessary libraries and modules
import hashlib
import json
import time

import requests
from confluent_kafka import Producer

# Constants and configuration
API_ENDPOINT = "https://randomuser.me/api/?results=1"
KAFKA_BOOTSTRAP_SERVERS = ['kafka_broker_1:19092', 'kafka_broker_2:19093', 'kafka_broker_3:19094']
KAFKA_TOPIC = "names_topic"
PAUSE_INTERVAL = 10       # seconds between API calls
STREAMING_DURATION = 120  # seconds per run: 120 // 10 = 12 messages


def retrieve_user_data(url=API_ENDPOINT) -> dict:
    """Fetches random user data from the provided API endpoint."""
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail the task on HTTP errors instead of parsing an error page
    return response.json()["results"][0]


def transform_user_data(data: dict) -> dict:
    """Formats the fetched user data for Kafka streaming."""
    return {
        "name": f"{data['name']['title']}. {data['name']['first']} {data['name']['last']}",
        "gender": data["gender"],
        "address": f"{data['location']['street']['number']}, {data['location']['street']['name']}",
        "city": data['location']['city'],
        "nation": data['location']['country'],
        "zip": encrypt_zip(data['location']['postcode']),
        "latitude": float(data['location']['coordinates']['latitude']),
        "longitude": float(data['location']['coordinates']['longitude']),
        "email": data["email"]
    }


def encrypt_zip(zip_code):
    """Hashes the zip code using MD5 and returns its integer representation.

    This is a one-way hash (pseudonymization), not encryption.
    """
    zip_str = str(zip_code)
    return int(hashlib.md5(zip_str.encode()).hexdigest(), 16)


def configure_kafka(servers=KAFKA_BOOTSTRAP_SERVERS):
    """Creates and returns a Kafka producer instance."""
    settings = {
        'bootstrap.servers': ','.join(servers),
        'client.id': 'producer_instance'
    }
    return Producer(settings)


def publish_to_kafka(producer, topic, data):
    """Sends data to a Kafka topic."""
    producer.produce(topic, value=json.dumps(data).encode('utf-8'), callback=delivery_status)
    producer.flush()  # block until this message is delivered (or fails)


def delivery_status(err, msg):
    """Reports the delivery status of the message to Kafka."""
    if err is not None:
        print('Message delivery failed:', err)
    else:
        print('Message delivered to', msg.topic(), '[Partition: {}]'.format(msg.partition()))


def initiate_stream():
    """Initiates the process to stream user data to Kafka."""
    kafka_producer = configure_kafka()
    for _ in range(STREAMING_DURATION // PAUSE_INTERVAL):
        raw_data = retrieve_user_data()
        kafka_formatted_data = transform_user_data(raw_data)
        publish_to_kafka(kafka_producer, KAFKA_TOPIC, kafka_formatted_data)
        time.sleep(PAUSE_INTERVAL)


if __name__ == "__main__":
    initiate_stream()
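1) Imports and configuration
Imports the essential libraries and sets constants such as the API endpoint, the Kafka bootstrap servers, the topic name, and the streaming interval settings.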
2) User data retrieval
retrieve_user_data fetches random user details from the given API endpoint.
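3) Data transformation
transform_user_data formats the raw user data for Kafka, while encrypt_zip hashes the postcode with MD5 so the raw value is never published (a one-way hash rather than encryption).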
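4) Kafka configuration and publishing
configure_kafka creates a confluent_kafka Producer for the three brokers; publish_to_kafka serializes each record to JSON, sends it to the topic, and flushes after every message; delivery_status reports whether each delivery succeeded.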
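5) Main streaming function
initiate_stream orchestrates the whole process, periodically retrieving, transforming, and publishing user data to Kafka. With the default settings it sends STREAMING_DURATION // PAUSE_INTERVAL = 12 messages, one every 10 seconds.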
6) Execution
When the script is run directly, it calls initiate_stream, which streams data for the duration set by STREAMING_DURATION.
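To check the message format without calling the API or a broker, you can run the transformation on a hand-written record. The SAMPLE dict below is hypothetical: its values are invented and it only mimics the shape of one randomuser.me result. The function bodies mirror transform_user_data and encrypt_zip from the script.

```python
import hashlib
import json

# Hypothetical sample shaped like one element of the randomuser.me "results"
# array; only the fields read by transform_user_data are included.
SAMPLE = {
    "gender": "female",
    "name": {"title": "Ms", "first": "Jane", "last": "Doe"},
    "location": {
        "street": {"number": 42, "name": "Main Street"},
        "city": "Springfield",
        "country": "United States",
        "postcode": 12345,
        "coordinates": {"latitude": "-12.3456", "longitude": "65.4321"},
    },
    "email": "jane.doe@example.com",
}


def encrypt_zip(zip_code):
    """Same hashing as the script: MD5 of the postcode as a 128-bit integer."""
    return int(hashlib.md5(str(zip_code).encode()).hexdigest(), 16)


def transform_user_data(data: dict) -> dict:
    loc = data["location"]
    return {
        "name": f"{data['name']['title']}. {data['name']['first']} {data['name']['last']}",
        "gender": data["gender"],
        "address": f"{loc['street']['number']}, {loc['street']['name']}",
        "city": loc["city"],
        "nation": loc["country"],
        "zip": encrypt_zip(loc["postcode"]),
        "latitude": float(loc["coordinates"]["latitude"]),
        "longitude": float(loc["coordinates"]["longitude"]),
        "email": data["email"],
    }


record = transform_user_data(SAMPLE)
payload = json.dumps(record).encode("utf-8")  # the bytes that would be produced to Kafka
print(record["name"], "|", record["address"])  # Ms. Jane Doe | 42, Main Street
```

Because the hashed zip is a 128-bit integer, a Spark schema that declares it as IntegerType or LongType cannot hold it.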
spark_processing.py
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Initialize logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")


def initialize_spark_session(app_name, access_key, secret_key):
    """
    Initialize the Spark Session with provided configurations.
    :param app_name: Name of the spark application.
    :param access_key: Access key for S3.
    :param secret_key: Secret key for S3.
    :return: Spark session object or None if there's an error.
    """
    try:
        spark = SparkSession \
            .builder \
            .appName(app_name) \
            .config("spark.hadoop.fs.s3a.access.key", access_key) \
            .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        logger.info('Spark session initialized successfully')
        return spark
    except Exception as e:
        logger.error(f"Spark session initialization failed. Error: {e}")
        return None


def get_streaming_dataframe(spark, brokers, topic):
    """
    Get a streaming dataframe from Kafka.
    :param spark: Initialized Spark session.
    :param brokers: Comma-separated list of Kafka brokers.
    :param topic: Kafka topic to subscribe to.
    :return: Dataframe object or None if there's an error.
    """
    try:
        # The Kafka source yields binary key/value columns plus metadata
        # (topic, partition, offset, timestamp); it has no "delimiter" option.
        df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("subscribe", topic) \
            .option("startingOffsets", "earliest") \
            .load()
        logger.info("Streaming dataframe fetched successfully")
        return df
    except Exception as e:
        logger.warning(f"Failed to fetch streaming dataframe. Error: {e}")
        return None


def transform_streaming_data(df):
    """
    Transform the initial dataframe to get the final structure.
    :param df: Initial dataframe with raw data.
    :return: Transformed dataframe.
    """
    # Field names must match the JSON keys written by kafka_streaming_service.py;
    # from_json returns null for any field it cannot find in the message.
    schema = StructType([
        StructField("name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("city", StringType(), False),
        StructField("nation", StringType(), False),
        # The hashed zip is a 128-bit integer, too large for IntegerType or LongType
        StructField("zip", StringType(), False),
        StructField("latitude", FloatType(), False),
        StructField("longitude", FloatType(), False),
        StructField("email", StringType(), False)
    ])
    transformed_df = df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col("value"), schema).alias("data")) \
        .select("data.*")
    return transformed_df


def initiate_streaming_to_bucket(df, path, checkpoint_location):
    """
    Start streaming the transformed data to the specified S3 bucket in parquet format.
    :param df: Transformed dataframe.
    :param path: S3 bucket path.
    :param checkpoint_location: Checkpoint location for streaming.
    :return: None
    """
    logger.info("Initiating streaming process...")
    stream_query = (df.writeStream
                    .format("parquet")
                    .outputMode("append")
                    .option("path", path)
                    .option("checkpointLocation", checkpoint_location)
                    .start())
    stream_query.awaitTermination()


def main():
    app_name = "SparkStructuredStreamingToS3"
    access_key = "ENTER_YOUR_ACCESS_KEY"
    secret_key = "ENTER_YOUR_SECRET_KEY"
    brokers = "kafka_broker_1:19092,kafka_broker_2:19093,kafka_broker_3:19094"
    topic = "names_topic"
    path = "BUCKET_PATH"  # replace with an s3a:// URI for the output data
    checkpoint_location = "CHECKPOINT_LOCATION"  # replace with an s3a:// URI or a local directory
    spark = initialize_spark_session(app_name, access_key, secret_key)
    if spark:
        df = get_streaming_dataframe(spark, brokers, topic)
        if df is not None:
            transformed_df = transform_streaming_data(df)
            initiate_streaming_to_bucket(transformed_df, path, checkpoint_location)


# Execute the main function if this script is run as the main module
if __name__ == '__main__':
    main()
1. Imports and logging initialization
Imports the required libraries and sets up logging for easier debugging and monitoring.
2. Spark session initialization
initialize_spark_session: creates a Spark session configured with the S3A credentials and filesystem implementation needed to write to S3.
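3. Data retrieval and transformation
get_streaming_dataframe: subscribes to the Kafka topic and returns a streaming DataFrame that starts from the earliest available offset.
transform_streaming_data: casts the Kafka message value to a string and parses the JSON into columns using a fixed schema.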
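4. Streaming to S3
initiate_streaming_to_bucket: streams the transformed data to the S3 bucket in Parquet format. It records the processed Kafka offsets in the checkpoint location, so a restarted query resumes where it left off instead of losing or duplicating data.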
5. Main execution
main orchestrates the whole process: it initializes the Spark session, reads data from Kafka, transforms it, and streams it to S3.
6. Script execution
When the script is run as the main module, it calls main and starts the streaming job.
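Spark's from_json matches schema fields to JSON keys by name, so every StructField name must equal a key the producer actually writes (name, gender, address, city, nation, zip, latitude, longitude, email). The hypothetical project function below is a simplified plain-Python model of that lookup, not Spark's implementation: a field whose name does not appear in the message comes back as None, which Spark represents as null.

```python
import json
from typing import Any, Dict, List


def project(raw_value: bytes, fields: List[str]) -> Dict[str, Any]:
    """Simplified model of from_json: look up each schema field by name in the
    parsed JSON message; absent fields become None (Spark's null)."""
    data = json.loads(raw_value.decode("utf-8"))
    return {field: data.get(field) for field in fields}


message = json.dumps({"name": "Ms. Jane Doe", "nation": "Ireland"}).encode("utf-8")
print(project(message, ["name", "nation"]))
# {'name': 'Ms. Jane Doe', 'nation': 'Ireland'}
print(project(message, ["full_name", "country"]))
# {'full_name': None, 'country': None}
```

A schema whose names drift from the producer's keys does not raise an error; it silently yields all-null columns, so an S3 output full of nulls is a hint to compare the two.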
1. Start the Kafka cluster and the rest of the stack with:
docker network create docker_streaming
docker-compose -f docker-compose.yml up -d
2. Create the Kafka topic names_topic, which both scripts use, in Kafka UI (http://localhost:8888/).
3. Create an Airflow user with admin rights:
docker-compose run airflow_webserver airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
4. Move kafka_stream_dag.py and kafka_streaming_service.py into the dags folder so the DAG can run. Then open a bash shell in Airflow with the provided script and install the required packages:
./airflow.sh bash
pip install -r ./requirements.txt
5. Make sure your DAG loads without errors:
airflow dags list
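6. Start the scheduler so the DAG can run. New DAGs are paused by default, so also unpause name_stream_dag, either in the Airflow UI (http://localhost:8080) or with airflow dags unpause name_stream_dag: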
airflow scheduler
7. Copy the Spark script into the Docker container:
docker cp spark_processing.py spark_master:/opt/bitnami/spark/
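8. Open a bash shell in the Spark container, go to the jars directory, and download the required JAR files. The versions in these file names must match the Spark, Scala, and Hadoop versions inside the container, so adjust them if your image differs: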
docker exec -it spark_master /bin/bash
cd jars
curl -O https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar
curl -O https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/3.3.0/spark-sql-kafka-0-10_2.13-3.3.0.jar
curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.0/hadoop-aws-3.2.0.jar
curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.375/aws-java-sdk-s3-1.11.375.jar
curl -O https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.8.0/commons-pool2-2.8.0.jar
cd ..
9. Submit the Spark job:
spark-submit \
--master local[2] \
--jars /opt/bitnami/spark/jars/kafka-clients-2.8.1.jar,\
/opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.13-3.3.0.jar,\
/opt/bitnami/spark/jars/hadoop-aws-3.2.0.jar,\
/opt/bitnami/spark/jars/aws-java-sdk-s3-1.11.375.jar,\
/opt/bitnami/spark/jars/commons-pool2-2.8.0.jar \
spark_processing.py
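Version mismatches are a common reason this submission fails with ClassNotFoundException or NoSuchMethodError. The connector's file name encodes the Scala binary version and the Spark version it was built for (spark-sql-kafka-0-10_2.13-3.3.0.jar means Scala 2.13, Spark 3.3.0), and both should match the Spark inside the container, which spark-submit --version reports. The helper below is a hypothetical convenience for comparing them; the version numbers in the demo are examples only.

```python
import re

# Maven file names are <artifactId>-<version>.jar; for the Kafka connector the
# artifactId carries the Scala binary version: spark-sql-kafka-0-10_<scala>.
CONNECTOR = re.compile(r"spark-sql-kafka-0-10_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$")


def connector_matches(jar_name: str, spark_version: str, scala_version: str) -> bool:
    """True if the connector jar targets exactly this Spark version and the
    same Scala binary version (scala_version may be '2.12.18' or '2.12')."""
    m = CONNECTOR.search(jar_name)
    if not m:
        raise ValueError(f"not a spark-sql-kafka-0-10 jar: {jar_name}")
    jar_scala, jar_spark = m.groups()
    scala_binary = ".".join(scala_version.split(".")[:2])
    return jar_scala == scala_binary and jar_spark == spark_version


jar = "/opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.13-3.3.0.jar"
print(connector_matches(jar, "3.3.0", "2.13.8"))   # True
print(connector_matches(jar, "3.5.1", "2.12.18"))  # False
```

An alternative to downloading jars by hand is spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_<scala>:<spark>, which also resolves the connector's transitive dependencies (kafka-clients, commons-pool2, spark-token-provider-kafka-0-10); the hand-picked --jars list above does not include the token provider.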
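10. Verify the data on S3
After these steps, check your S3 bucket to confirm that Parquet files have been written under the configured path.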
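Throughout this journey, we explored the intricacies of real-world data engineering, moving from raw, unprocessed data toward actionable insights. Starting from randomly generated user data, we used Kafka, Spark, and Airflow to manage, process, and automate the streaming of that data. Docker simplified deployment and kept environments consistent, while other tools such as S3 and Python played key roles.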
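This effort was not just about building a pipeline but about understanding how the tools work together. I encourage you to experiment further, adapt, and extend this pipeline to fit your own needs and uncover deeper insights. Dive in, explore, innovate!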
Original author: Simardeep Singh