
目标读者:具备基础后端开发经验,希望深入理解架构落地细节的中高级开发者。

我们构建一个简易电商系统,包含用户注册、商品浏览和下单功能。
# app.py (Monolith)
from flask import Flask, request, jsonify
import sqlite3
app = Flask(__name__)
def get_db():
conn = sqlite3.connect('shop.db')
conn.row_factory = sqlite3.Row
return conn
@app.route('/users', methods=['POST'])
def create_user():
data = request.json
db = get_db()
db.execute("INSERT INTO users (name, email) VALUES (?, ?)",
(data['name'], data['email']))
db.commit()
return jsonify({"status": "created"}), 201
@app.route('/products')
def list_products():
db = get_db()
products = db.execute("SELECT * FROM products").fetchall()
return jsonify([dict(p) for p in products])
@app.route('/orders', methods=['POST'])
def create_order():
data = request.json
db = get_db()
# 原子性:下单时扣库存(简化版)
db.execute("UPDATE products SET stock = stock - 1 WHERE id = ?", (data['product_id'],))
db.execute("INSERT INTO orders (user_id, product_id) VALUES (?, ?)",
(data['user_id'], data['product_id']))
db.commit()
return jsonify({"status": "ordered"}), 201
if __name__ == '__main__':
app.run(debug=True)commit() 保证 ACID。python app.py 即可启动。create_order 逻辑变复杂(如加优惠券、风控),函数将膨胀。✅ 适合场景:内部工具、MVP 验证。
我们将上述单体拆分为两个服务:
product.proto)syntax = "proto3";
package product;
service ProductService {
rpc GetStock(GetStockRequest) returns (GetStockResponse);
rpc DeductStock(DeductStockRequest) returns (DeductStockResponse);
}
message GetStockRequest { int32 product_id = 1; }
message GetStockResponse { int32 stock = 1; }
message DeductStockRequest { int32 product_id = 1; }
message DeductStockResponse { bool success = 1; }📌 为什么用 gRPC? 相比 REST,gRPC 基于 HTTP/2 + Protobuf,性能更高、类型安全、支持双向流,适合内部服务通信。
# order_service.py
from fastapi import FastAPI, HTTPException
import grpc
from product_pb2 import DeductStockRequest
from product_pb2_grpc import ProductServiceStub
app = FastAPI()
@app.post("/orders")
def create_order(user_id: int, product_id: int):
# 1. 调用 Product Service 扣库存
with grpc.insecure_channel('product-service:50051') as channel:
stub = ProductServiceStub(channel)
resp = stub.DeductStock(DeductStockRequest(product_id=product_id))
if not resp.success:
raise HTTPException(status_code=400, detail="Insufficient stock")
# 2. 本地记录订单(简化,实际应异步或事务补偿)
# 此处省略数据库操作
return {"order_id": "123", "status": "created"}# product_service.py
import grpc
from concurrent import futures
from product_pb2 import DeductStockResponse
from product_pb2_grpc import ProductServiceServicer, add_ProductServiceServicer_to_server
import sqlite3
class ProductServicer(ProductServiceServicer):
def DeductStock(self, request, context):
conn = sqlite3.connect('product.db')
cur = conn.cursor()
cur.execute("SELECT stock FROM products WHERE id = ?", (request.product_id,))
row = cur.fetchone()
if not row or row[0] <= 0:
return DeductStockResponse(success=False)
cur.execute("UPDATE products SET stock = stock - 1 WHERE id = ?", (request.product_id,))
conn.commit()
return DeductStockResponse(success=True)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
add_ProductServiceServicer_to_server(ProductServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
server.wait_for_termination()维度 | 单体 | 微服务 |
|---|---|---|
部署单元 | 1 个 | 2+ 个独立进程 |
数据库 | 共享 | 每服务私有 DB(Product DB / Order DB) |
通信 | 函数调用 | gRPC(网络调用) |
事务 | 本地 ACID | 需 Saga 或事件最终一致 |
⚠️ 挑战:
create_order中若扣库存成功但本地写订单失败,数据不一致。需引入 Saga 模式 或 消息队列补偿。
# Dockerfile for order-service
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY order_service.py .
CMD ["uvicorn", "order_service:app", "--host", "0.0.0.0", "--port", "8000"]# templates/order-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: order-service
spec:
replicas: 3
selector:
matchLabels:
app: order-service
template:
metadata:
labels:
app: order-service
spec:
containers:
- name: order
image: my-registry/order-service:v1
ports:
- containerPort: 8000
env:
- name: PRODUCT_SERVICE_ADDR
value: "product-service:50051"
---
apiVersion: v1
kind: Service
metadata:
name: order-service
spec:
selector:
app: order-service
ports:
- protocol: TCP
port: 8000
targetPort: 8000启用自动 mTLS 和熔断:
# destination-rule.yaml
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: product-service
spec:
host: product-service
trafficPolicy:
connectionPool:
tcp: { maxConnections: 100 }
http: { http1MaxPendingRequests: 10 }
outlierDetection:
consecutive5xxErrors: 5
interval: 30s
baseEjectionTime: 30s✅ 收益:
# order_service_with_metrics.py
from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import start_http_server
# 初始化 OpenTelemetry
reader = PrometheusMetricReader()
metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = metrics.get_meter("order.meter")
# 创建计数器
order_counter = meter.create_counter(
"orders_created_total",
description="Total number of orders created"
)
@app.post("/orders")
def create_order(...):
# ... 扣库存逻辑 ...
order_counter.add(1, {"product_id": str(product_id)})
return {"status": "created"}部署后,访问 http://order-service:8000/metrics 可见:
# HELP orders_created_total Total number of orders created
# TYPE orders_created_total counter
orders_created_total{product_id="101"} 5.0Prometheus 抓取该端点,Grafana 展示实时订单趋势。
src/user/, src/order/)。从单体到云原生,每一步都伴随着复杂性的转移——从代码复杂性转向运维复杂性,再通过平台工程将其收敛。真正的工程智慧,在于:
用自动化对抗复杂性,用抽象隔离变化,用可观测性照亮黑盒。
作为开发者,掌握这些代码背后的架构思维,才能在技术浪潮中稳健前行。
附:完整示例代码仓库结构建议
ecommerce-arch-demo/
├── monolith/
│ └── app.py
├── microservices/
│ ├── user-service/
│ ├── order-service/
│ └── product-service/
├── k8s/
│ ├── order-deployment.yaml
│ └── istio-rules/
└── observability/
└── otel-config.yaml