CDN (Content Delivery Network) 即内容分发网络,是一种通过在全球范围内部署多个服务器节点,将内容缓存到离用户最近的位置,从而加速内容访问的技术。CDN 的核心目标是解决互联网中的 "最后一公里" 问题,减少用户与源服务器之间的物理距离,降低延迟。
一个典型的 CDN 网络包含以下核心组件:
源服务器 (Origin Server):存储原始内容的服务器,如网站的 Web 服务器、文件服务器等
边缘节点 (Edge Node):分布在全球各地的缓存服务器,直接面向用户提供内容服务
负载均衡系统 (Load Balancer):根据用户地理位置、网络状况等因素,将用户请求导向最优的边缘节点
内容分发系统 (Distribution System):负责将源服务器的内容同步到各个边缘节点
CDN 的基本工作流程如下:
下面模拟 CDN 边缘节点的工作原理:
import time
from typing import Dict, Optional
class CDNNode:
def __init__(self, origin_server: str):
# 源服务器地址
self.origin_server = origin_server
# 缓存存储,键为资源路径,值为(内容, 过期时间)
self.cache: Dict[str, (bytes, float)] = {}
# 缓存有效期(秒)
self.cache_ttl = 3600
def fetch_content(self, path: str) -> bytes:
"""获取资源内容,优先从缓存中获取,否则从源服务器获取"""
# 检查缓存
cached_content = self._check_cache(path)
if cached_content:
print(f"从缓存中获取资源: {path}")
return cached_content
# 从源服务器获取
print(f"从源服务器获取资源: {self.origin_server}{path}")
content = self._fetch_from_origin(path)
# 缓存内容
self._cache_content(path, content)
return content
def _check_cache(self, path: str) -> Optional[bytes]:
"""检查缓存中是否有有效的资源"""
if path in self.cache:
content, expire_time = self.cache[path]
if time.time() < expire_time:
return content
else:
# 缓存已过期,删除
del self.cache[path]
return None
def _fetch_from_origin(self, path: str) -> bytes:
"""从源服务器获取资源"""
# 这里应该是实际的HTTP请求,简化为模拟
# 实际应用中可以使用requests库
return f"这是来自源服务器的内容: {path}".encode()
def _cache_content(self, path: str, content: bytes):
"""缓存资源内容"""
expire_time = time.time() + self.cache_ttl
self.cache[path] = (content, expire_time)
print(f"已缓存资源: {path},有效期至: {time.ctime(expire_time)}")
# 使用示例
cdn_node = CDNNode("https://example.com")
# 第一次请求,从源服务器获取
content1 = cdn_node.fetch_content("/index.html")
print(content1.decode())
# 第二次请求,从缓存获取
content2 = cdn_node.fetch_content("/index.html")
print(content2.decode())
CDN 的主要优势包括:
加速内容访问:显著降低用户访问延迟,提升网站性能
减轻源服务器负载:大量请求由 CDN 边缘节点处理,减少源服务器压力
提高可用性:CDN 节点分布在多个地理位置,具有冗余性,可应对单点故障
增强安全性:CDN 可提供 DDoS 防护、Web 应用防火墙等安全功能
节省带宽成本:通过缓存和分发,减少源服务器的带宽消耗
CDN 的典型应用场景包括:
静态资源加速:如 HTML、CSS、JavaScript、图片、视频等
流媒体服务:如在线视频、直播等
大文件下载:如软件更新包、游戏安装文件等
电子商务网站:加速商品图片、详情页等内容的加载
企业应用:如企业官网、办公系统等
CDN 的内容分发技术主要包括以下几种:
主动推送 (Push):源服务器主动将内容推送到 CDN 边缘节点。适用于内容更新频率较低、重要性高的场景,如电商网站的商品图片。
被动拉取 (Pull):当边缘节点收到用户请求且本地没有缓存时,才从源服务器拉取内容。适用于内容更新频繁、访问模式不确定的场景,如新闻网站。
混合模式:结合主动推送和被动拉取的优点,对热门内容进行主动推送,对冷门内容采用被动拉取。
下面演示 CDN 内容分发的主动推送和被动拉取模式:
import threading
import time
from typing import Dict, Optional
class Content:
def __init__(self, path: str, data: bytes):
self.path = path
self.data = data
self.last_updated = time.time()
class OriginServer:
def __init__(self):
self.contents: Dict[str, Content] = {}
def add_content(self, path: str, data: bytes):
"""添加或更新内容"""
self.contents[path] = Content(path, data)
print(f"源服务器添加/更新内容: {path}")
def get_content(self, path: str) -> Optional[Content]:
"""获取内容"""
return self.contents.get(path)
class CDNNode:
def __init__(self, node_id: str, origin_server: OriginServer):
self.node_id = node_id
self.origin_server = origin_server
self.cache: Dict[str, Content] = {}
def push_content(self, content: Content):
"""主动推送内容到CDN节点"""
self.cache[content.path] = content
print(f"CDN节点 {self.node_id} 接收推送内容: {content.path}")
def pull_content(self, path: str) -> Optional[Content]:
"""被动拉取内容"""
if path in self.cache:
print(f"CDN节点 {self.node_id} 缓存命中: {path}")
return self.cache[path]
# 从源服务器拉取
content = self.origin_server.get_content(path)
if content:
self.cache[path] = content
print(f"CDN节点 {self.node_id} 拉取并缓存内容: {path}")
return content
print(f"CDN节点 {self.node_id} 未找到内容: {path}")
return None
# 使用示例
origin = OriginServer()
# 创建两个CDN节点
node1 = CDNNode("node1", origin)
node2 = CDNNode("node2", origin)
# 源服务器添加内容
origin.add_content("/index.html", b"<html>Hello, World!</html>")
# 主动推送模式:源服务器将内容推送到CDN节点
node1.push_content(origin.get_content("/index.html"))
# 被动拉取模式:CDN节点根据用户请求拉取内容
node2.pull_content("/index.html")
CDN 的负载均衡技术主要分为以下几个层次:
DNS 负载均衡:通过修改域名解析结果,将用户导向不同的 CDN 节点。这是最基础的负载均衡方式。
全局负载均衡 (GSLB):基于地理位置、网络状况、节点负载等因素,动态选择最优的 CDN 节点。
本地负载均衡:在单个 CDN 节点内部,对请求进行负载均衡,分配给不同的服务器处理。
下面演示基于地理位置的全局负载均衡:
from typing import List, Dict, Tuple
class GeoLocation:
def __init__(self, latitude: float, longitude: float):
self.latitude = latitude
self.longitude = longitude
class CDNNode:
def __init__(self, node_id: str, location: GeoLocation, capacity: int):
self.node_id = node_id
self.location = location
self.capacity = capacity # 节点容量
self.current_load = 0 # 当前负载
def get_load_percentage(self) -> float:
"""获取负载百分比"""
return (self.current_load / self.capacity) * 100
def can_handle_request(self) -> bool:
"""判断节点是否可以处理新请求"""
return self.current_load < self.capacity
def handle_request(self):
"""处理请求,增加负载"""
if self.can_handle_request():
self.current_load += 1
return True
return False
class GSLB:
def __init__(self, cdn_nodes: List[CDNNode]):
self.cdn_nodes = cdn_nodes
def calculate_distance(self, loc1: GeoLocation, loc2: GeoLocation) -> float:
"""计算两个地理位置之间的距离(简化版,实际应用中可能使用更复杂的算法)"""
# 这里使用简化的欧几里得距离,实际应用中应使用球面距离公式
dx = loc1.latitude - loc2.latitude
dy = loc1.longitude - loc2.longitude
return (dx**2 + dy**2) ** 0.5
def select_best_node(self, user_location: GeoLocation) -> Optional[CDNNode]:
"""选择最优CDN节点"""
# 按距离排序
sorted_nodes = sorted(
self.cdn_nodes,
key=lambda node: self.calculate_distance(node.location, user_location)
)
# 选择距离最近且有处理能力的节点
for node in sorted_nodes:
if node.can_handle_request():
return node
return None
# 使用示例
# 创建几个CDN节点,分布在不同地理位置
cdn_nodes = [
CDNNode("node1", GeoLocation(39.9042, 116.4074), 100), # 北京
CDNNode("node2", GeoLocation(31.2304, 121.4737), 100), # 上海
CDNNode("node3", GeoLocation(22.5431, 114.0579), 100), # 深圳
CDNNode("node4", GeoLocation(34.3416, 108.9398), 100), # 西安
]
# 创建GSLB实例
gslb = GSLB(cdn_nodes)
# 模拟一个来自广州的用户请求
user_location = GeoLocation(23.1291, 113.2644)
# 选择最优节点
best_node = gslb.select_best_node(user_location)
if best_node:
print(f"为用户选择的最优CDN节点: {best_node.node_id}")
best_node.handle_request()
else:
print("没有可用的CDN节点")
CDN 的缓存策略是影响性能的关键因素之一,主要包括以下几种:
基于时间的缓存:设置固定的缓存时间(TTL),到期后重新从源服务器获取内容。
基于内容的缓存:根据内容的变化情况来决定是否缓存。例如,使用 ETag 或 Last-Modified 头信息来判断内容是否更新。
启发式缓存:当没有明确的缓存指示时,CDN 可以根据内容类型、响应状态码等信息,自行决定缓存策略。
动态内容缓存:对于动态生成的内容,如用户个性化页面,可以采用部分缓存、缓存片段等策略。
下面演示基于时间和内容的缓存策略:
import time
from typing import Dict, Optional
class CacheEntry:
def __init__(self, content: bytes, ttl: int, etag: str = None, last_modified: str = None):
self.content = content
self.ttl = ttl # 缓存时间(秒)
self.etag = etag # 内容标签
self.last_modified = last_modified # 最后修改时间
self.created_at = time.time() # 创建时间
def is_expired(self) -> bool:
"""判断缓存是否过期"""
return time.time() - self.created_at > self.ttl
class CDNCache:
def __init__(self):
self.cache: Dict[str, CacheEntry] = {}
def get(self, key: str) -> Optional[CacheEntry]:
"""获取缓存项"""
entry = self.cache.get(key)
if entry and not entry.is_expired():
return entry
# 缓存不存在或已过期
if entry:
del self.cache[key]
return None
def set(self, key: str, content: bytes, ttl: int, etag: str = None, last_modified: str = None):
"""设置缓存项"""
self.cache[key] = CacheEntry(content, ttl, etag, last_modified)
def validate_with_origin(self, key: str, origin_client) -> bool:
"""与源服务器验证缓存是否有效"""
entry = self.get(key)
if not entry:
return False
# 模拟向源服务器发送验证请求
# 实际应用中应发送带If-None-Match或If-Modified-Since头的请求
is_valid = origin_client.check_content_validity(key, entry.etag, entry.last_modified)
if not is_valid:
# 缓存无效,删除
if key in self.cache:
del self.cache[key]
return is_valid
# 模拟源服务器客户端
class OriginServerClient:
def check_content_validity(self, key: str, etag: str, last_modified: str) -> bool:
"""检查内容是否有效(模拟)"""
# 实际应用中应发送HTTP请求并检查响应状态码
# 这里简化为随机返回,模拟50%的概率内容已更新
import random
return random.random() > 0.5
# 使用示例
cdn_cache = CDNCache()
origin_client = OriginServerClient()
# 设置缓存
cdn_cache.set(
key="/index.html",
content=b"<html>Hello, World!</html>",
ttl=3600,
etag="abc123",
last_modified="Mon, 15 May 2023 12:00:00 GMT"
)
# 获取缓存
entry = cdn_cache.get("/index.html")
if entry:
print("从缓存中获取内容")
else:
print("缓存未命中,需要从源服务器获取")
# 验证缓存有效性
is_valid = cdn_cache.validate_with_origin("/index.html", origin_client)
print(f"缓存有效性: {is_valid}")
我们将搭建一个简化版的 CDN 系统,包含以下组件:
源服务器:存储原始内容的服务器
CDN 边缘节点:分布在不同地理位置的缓存服务器
负载均衡器:根据用户地理位置选择最优节点
管理控制台:用于内容管理和配置
下面是系统的整体架构图:
+----------------+ +----------------+ +----------------+
| 用户浏览器 | | 本地DNS服务器 | | CDN负载均衡器 |
+----------------+ +----------------+ +----------------+
| | |
v v v
+----------------+ +----------------+ +----------------+
| 源服务器 |<---| CDN边缘节点1 |<---| CDN边缘节点2 |
+----------------+ +----------------+ +----------------+
^ ^ ^
| | |
+----------------------+----------------------+
首先,我们实现一个简单的源服务器,用于存储和提供原始内容:
import http.server
import socketserver
import json
from urllib.parse import urlparse, parse_qs
class Content:
def __init__(self, path: str, data: bytes, content_type: str = "text/plain"):
self.path = path
self.data = data
self.content_type = content_type
self.last_modified = int(time.time())
def update(self, new_data: bytes):
self.data = new_data
self.last_modified = int(time.time())
class OriginServer:
def __init__(self):
self.contents = {}
def add_content(self, path: str, data: bytes, content_type: str = "text/plain"):
self.contents[path] = Content(path, data, content_type)
print(f"添加内容: {path}")
def get_content(self, path: str):
return self.contents.get(path)
def update_content(self, path: str, new_data: bytes):
if path in self.contents:
self.contents[path].update(new_data)
print(f"更新内容: {path}")
return True
return False
# 创建源服务器实例
origin_server = OriginServer()
# 添加一些测试内容
origin_server.add_content("/index.html", b"<html><body>Hello, World!</body></html>", "text/html")
origin_server.add_content("/style.css", b"body { font-family: Arial; }", "text/css")
origin_server.add_content("/script.js", b"console.log('Hello from CDN!');", "application/javascript")
# 定义HTTP请求处理类
class OriginRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
path = urlparse(self.requestline.split()[1]).path
# 查找内容
content = origin_server.get_content(path)
if content:
# 设置响应头
self.send_response(200)
self.send_header("Content-Type", content.content_type)
self.send_header("Content-Length", len(content.data))
self.send_header("Last-Modified", str(content.last_modified))
self.end_headers()
# 发送响应内容
self.wfile.write(content.data)
else:
# 内容不存在
self.send_error(404, f"Content not found: {path}")
# 启动源服务器
if __name__ == "__main__":
PORT = 8000
with socketserver.TCPServer(("", PORT), OriginRequestHandler) as httpd:
print(f"源服务器运行在端口 {PORT}")
httpd.serve_forever()
import http.server
import socketserver
import requests
import time
import threading
from urllib.parse import urlparse
import json
class CacheEntry:
def __init__(self, content: bytes, headers: dict, ttl: int):
self.content = content
self.headers = headers
self.ttl = ttl
self.created_at = time.time()
def is_expired(self):
return time.time() - self.created_at > self.ttl
class CDNNode:
def __init__(self, node_id: str, origin_server: str, cache_size: int = 100):
self.node_id = node_id
self.origin_server = origin_server
self.cache = {} # 缓存字典,键为路径,值为CacheEntry
self.cache_size = cache_size # 缓存最大条目数
self.lock = threading.Lock() # 用于线程安全
def get_content(self, path: str):
"""从缓存获取内容,如果没有则从源服务器获取"""
with self.lock:
# 检查缓存
if path in self.cache:
entry = self.cache[path]
if not entry.is_expired():
print(f"[{self.node_id}] 缓存命中: {path}")
return entry.content, entry.headers
# 缓存已过期,删除
del self.cache[path]
# 缓存未命中,从源服务器获取
print(f"[{self.node_id}] 缓存未命中,从源服务器获取: {path}")
content, headers = self._fetch_from_origin(path)
# 缓存内容
ttl = self._get_ttl_from_headers(headers)
self._cache_content(path, content, headers, ttl)
return content, headers
def _fetch_from_origin(self, path: str):
"""从源服务器获取内容"""
url = f"{self.origin_server}{path}"
try:
response = requests.get(url)
if response.status_code == 200:
return response.content, response.headers
else:
print(f"[{self.node_id}] 从源服务器获取失败: {url}, 状态码: {response.status_code}")
return None, None
except Exception as e:
print(f"[{self.node_id}] 从源服务器获取异常: {e}")
return None, None
def _cache_content(self, path: str, content: bytes, headers: dict, ttl: int):
"""缓存内容"""
if content and headers:
# 如果缓存已满,移除最旧的条目
if len(self.cache) >= self.cache_size:
oldest_entry = min(self.cache.items(), key=lambda x: x[1].created_at)
del self.cache[oldest_entry[0]]
self.cache[path] = CacheEntry(content, headers, ttl)
print(f"[{self.node_id}] 缓存内容: {path}, TTL: {ttl}秒")
def _get_ttl_from_headers(self, headers: dict) -> int:
"""从响应头中提取TTL信息"""
# 检查Cache-Control头
if "Cache-Control" in headers:
cache_control = headers["Cache-Control"].lower()
if "no-cache" in cache_control or "no-store" in cache_control:
return 0
# 提取max-age
parts = cache_control.split(",")
for part in parts:
part = part.strip()
if part.startswith("max-age="):
try:
return int(part.split("=")[1])
except:
pass
# 默认TTL为3600秒
return 3600
# 定义HTTP请求处理类
class CDNRequestHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, request, client_address, server, cdn_node):
self.cdn_node = cdn_node
super().__init__(request, client_address, server)
def do_GET(self):
path = urlparse(self.path).path
# 获取内容
content, headers = self.cdn_node.get_content(path)
if content and headers:
# 设置响应头
self.send_response(200)
for key, value in headers.items():
if key.lower() not in ["transfer-encoding", "content-length"]:
self.send_header(key, value)
self.send_header("Content-Length", len(content))
self.end_headers()
# 发送响应内容
self.wfile.write(content)
else:
# 内容不存在
self.send_error(404, f"Content not found: {path}")
# 启动CDN节点
if __name__ == "__main__":
import sys
if len(sys.argv) != 4:
print("Usage: python cdn_node.py <node_id> <origin_server> <port>")
sys.exit(1)
node_id = sys.argv[1]
origin_server = sys.argv[2]
port = int(sys.argv[3])
cdn_node = CDNNode(node_id, origin_server)
# 创建HTTP服务器
class CustomHTTPServer(socketserver.TCPServer):
allow_reuse_address = True
with CustomHTTPServer(("", port), lambda *args, **kwargs: CDNRequestHandler(*args, **kwargs, cdn_node=cdn_node)) as httpd:
print(f"CDN节点 {node_id} 运行在端口 {port},源服务器: {origin_server}")
httpd.serve_forever()
接下来,实现一个简单的负载均衡器,根据用户地理位置选择最优节点:
from flask import Flask, request, jsonify
import requests
import json
import threading
import time
app = Flask(__name__)
# CDN节点列表
cdn_nodes = [
{
"id": "node1",
"location": "北京",
"ip": "192.168.1.101",
"port": 8080,
"load": 0,
"capacity": 100
},
{
"id": "node2",
"location": "上海",
"ip": "192.168.1.102",
"port": 8080,
"load": 0,
"capacity": 100
},
{
"id": "node3",
"location": "广州",
"ip": "192.168.1.103",
"port": 8080,
"load": 0,
"capacity": 100
}
]
# 模拟IP到地理位置的映射(实际应用中应使用IP地理位置数据库)
ip_to_location = {
"192.168.1.1": "北京",
"192.168.1.2": "上海",
"192.168.1.3": "广州",
"192.168.1.4": "深圳",
"192.168.1.5": "成都"
}
# 节点健康检查
def health_check():
while True:
for node in cdn_nodes:
try:
# 模拟检查节点健康状态
# 实际应用中应发送HTTP请求到节点的健康检查接口
response = requests.get(f"http://{node['ip']}:{node['port']}/health")
if response.status_code == 200:
node["status"] = "online"
else:
node["status"] = "offline"
except:
node["status"] = "offline"
# 每10秒检查一次
time.sleep(10)
# 启动健康检查线程
health_thread = threading.Thread(target=health_check)
health_thread.daemon = True
health_thread.start()
# 根据用户IP选择最优节点
def select_best_node(client_ip):
# 获取用户地理位置
client_location = ip_to_location.get(client_ip, "未知")
# 过滤掉离线节点
online_nodes = [node for node in cdn_nodes if node["status"] == "online"]
if not online_nodes:
return None
# 如果用户位置已知,优先选择同一地理位置的节点
same_location_nodes = [node for node in online_nodes if node["location"] == client_location]
if same_location_nodes:
# 选择负载最轻的节点
return min(same_location_nodes, key=lambda node: node["load"])
# 如果没有同一地理位置的节点,选择负载最轻的节点
return min(online_nodes, key=lambda node: node["load"])
# 负载均衡器API
@app.route('/')
def load_balance():
# 获取客户端IP
client_ip = request.remote_addr
# 选择最优节点
best_node = select_best_node(client_ip)
if best_node:
# 更新节点负载
best_node["load"] += 1
# 返回节点信息
return jsonify({
"status": "success",
"node": best_node,
"message": f"为客户端 {client_ip} 选择节点 {best_node['id']}"
})
else:
return jsonify({
"status": "error",
"message": "没有可用的CDN节点"
}), 500
# 节点负载减少API(模拟请求完成后调用)
@app.route('/release/<node_id>')
def release_load(node_id):
for node in cdn_nodes:
if node["id"] == node_id and node["load"] > 0:
node["load"] -= 1
return jsonify({
"status": "success",
"message": f"节点 {node_id} 负载减少"
})
return jsonify({
"status": "error",
"message": f"未找到节点 {node_id}"
}), 404
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8001)
最后,实现一个简单的管理控制台,用于管理内容和配置 CDN 节点:
from flask import Flask, request, jsonify, render_template
import requests
import json
import os
app = Flask(__name__)
# CDN配置
cdn_config = {
"origin_server": "http://localhost:8000",
"cdn_nodes": [
{
"id": "node1",
"ip": "localhost",
"port": 8081,
"location": "北京"
},
{
"id": "node2",
"ip": "localhost",
"port": 8082,
"location": "上海"
},
{
"id": "node3",
"ip": "localhost",
"port": 8083,
"location": "广州"
}
],
"load_balancer": {
"ip": "localhost",
"port": 8001
}
}
# 内容管理
@app.route('/')
def index():
return render_template('index.html', nodes=cdn_config["cdn_nodes"])
# 获取内容列表
@app.route('/api/content', methods=['GET'])
def get_content_list():
# 从源服务器获取内容列表
try:
response = requests.get(f"{cdn_config['origin_server']}/api/content")
if response.status_code == 200:
return jsonify(response.json())
else:
return jsonify({"error": "无法获取内容列表"}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
# 推送内容到CDN节点
@app.route('/api/push_content', methods=['POST'])
def push_content():
path = request.form.get('path')
node_id = request.form.get('node_id')
if not path or not node_id:
return jsonify({"error": "缺少参数"}), 400
# 找到目标节点
target_node = next((node for node in cdn_config["cdn_nodes"] if node["id"] == node_id), None)
if not target_node:
return jsonify({"error": "节点不存在"}), 404
# 从源服务器获取内容
try:
content_response = requests.get(f"{cdn_config['origin_server']}{path}")
if content_response.status_code != 200:
return jsonify({"error": f"无法从源服务器获取内容: {path}"}), 500
# 推送到CDN节点
push_url = f"http://{target_node['ip']}:{target_node['port']}/api/push"
push_response = requests.post(push_url, data={
"path": path,
"content": content_response.content,
"headers": json.dumps(dict(content_response.headers))
})
if push_response.status_code == 200:
return jsonify({"message": f"内容 {path} 已推送到节点 {node_id}"})
else:
return jsonify({"error": f"推送失败: {push_response.text}"}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
# 预热所有节点
@app.route('/api/warm_all', methods=['POST'])
def warm_all_nodes():
path = request.form.get('path')
if not path:
return jsonify({"error": "缺少参数"}), 400
results = []
for node in cdn_config["cdn_nodes"]:
try:
# 访问节点上的内容,触发缓存
response = requests.get(f"http://{node['ip']}:{node['port']}{path}")
status = "成功" if response.status_code == 200 else "失败"
results.append({
"node_id": node["id"],
"status": status,
"message": response.text if status == "失败" else ""
})
except Exception as e:
results.append({
"node_id": node["id"],
"status": "失败",
"message": str(e)
})
return jsonify({"results": results})
# 刷新CDN缓存
@app.route('/api/purge', methods=['POST'])
def purge_cache():
path = request.form.get('path')
node_id = request.form.get('node_id')
if not path:
return jsonify({"error": "缺少路径参数"}), 400
if node_id:
# 刷新指定节点
target_node = next((node for node in cdn_config["cdn_nodes"] if node["id"] == node_id), None)
if not target_node:
return jsonify({"error": "节点不存在"}), 404
try:
purge_url = f"http://{target_node['ip']}:{target_node['port']}/api/purge?path={path}"
purge_response = requests.post(purge_url)
if purge_response.status_code == 200:
return jsonify({"message": f"节点 {node_id} 上的 {path} 缓存已刷新"})
else:
return jsonify({"error": f"刷新失败: {purge_response.text}"}), 500
except Exception as e:
return jsonify({"error": str(e)}), 500
else:
# 刷新所有节点
results = []
for node in cdn_config["cdn_nodes"]:
try:
purge_url = f"http://{node['ip']}:{node['port']}/api/purge?path={path}"
purge_response = requests.post(purge_url)
status = "成功" if purge_response.status_code == 200 else "失败"
results.append({
"node_id": node["id"],
"status": status,
"message": purge_response.text if status == "失败" else ""
})
except Exception as e:
results.append({
"node_id": node["id"],
"status": "失败",
"message": str(e)
})
return jsonify({"results": results})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8002)
在 Flask 应用的根目录下创建templates
文件夹,并在其中创建index.html
文件,用于展示和操作 CDN 管理功能。
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>CDN管理控制台</title>
<style>
body {
font-family: Arial, sans-serif;
}
form {
margin-bottom: 15px;
}
label {
display: block;
margin-bottom: 5px;
}
input, select, button {
margin-right: 10px;
padding: 5px;
}
table {
border-collapse: collapse;
width: 100%;
}
table, th, td {
border: 1px solid #ccc;
}
th, td {
padding: 8px;
text-align: left;
}
.success {
color: green;
}
.error {
color: red;
}
</style>
</head>
<body>
<h1>CDN管理控制台</h1>
<!-- 推送内容表单 -->
<form id="pushForm">
<label for="path">内容路径:</label>
<input type="text" id="path" name="path" required>
<label for="node_id">目标节点:</label>
<select id="node_id" name="node_id">
{% for node in nodes %}
<option value="{{ node.id }}">{{ node.id }} - {{ node.location }}</option>
{% endfor %}
</select>
<button type="submit">推送到CDN节点</button>
<div id="pushResult"></div>
</form>
<!-- 预热所有节点表单 -->
<form id="warmForm">
<label for="warmPath">内容路径:</label>
<input type="text" id="warmPath" name="warmPath" required>
<button type="submit">预热所有节点</button>
<div id="warmResult"></div>
</form>
<!-- 刷新缓存表单 -->
<form id="purgeForm">
<label for="purgePath">内容路径:</label>
<input type="text" id="purgePath" name="purgePath" required>
<label for="purgeNode">目标节点:</label>
<select id="purgeNode" name="purgeNode">
<option value="">所有节点</option>
{% for node in nodes %}
<option value="{{ node.id }}">{{ node.id }} - {{ node.location }}</option>
{% endfor %}
</select>
<button type="submit">刷新缓存</button>
<div id="purgeResult"></div>
</form>
<script>
document.getElementById('pushForm').addEventListener('submit', function(event) {
event.preventDefault();
const path = document.getElementById('path').value;
const node_id = document.getElementById('node_id').value;
fetch('/api/push_content', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: `path=${path}&node_id=${node_id}`
})
.then(response => response.json())
.then(data => {
const resultDiv = document.getElementById('pushResult');
if (data.status === "success") {
resultDiv.innerHTML = `<p class="success">${data.message}</p>`;
} else {
resultDiv.innerHTML = `<p class="error">${data.error}</p>`;
}
})
.catch(error => {
const resultDiv = document.getElementById('pushResult');
resultDiv.innerHTML = `<p class="error">请求失败: ${error.message}</p>`;
});
});
document.getElementById('warmForm').addEventListener('submit', function(event) {
event.preventDefault();
const path = document.getElementById('warmPath').value;
fetch('/api/warm_all', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: `path=${path}`
})
.then(response => response.json())
.then(data => {
const resultDiv = document.getElementById('warmResult');
resultDiv.innerHTML = '';
data.results.forEach(result => {
const statusClass = result.status === "成功" ? "success" : "error";
resultDiv.innerHTML += `<p>${result.node_id}: ${result.status} - ${result.message}</p>`;
});
})
.catch(error => {
const resultDiv = document.getElementById('warmResult');
resultDiv.innerHTML = `<p class="error">请求失败: ${error.message}</p>`;
});
});
document.getElementById('purgeForm').addEventListener('submit', function(event) {
event.preventDefault();
const path = document.getElementById('purgePath').value;
const node_id = document.getElementById('purgeNode').value;
const formData = new FormData();
formData.append('path', path);
if (node_id) {
formData.append('node_id', node_id);
}
fetch('/api/purge', {
method: 'POST',
body: formData
})
.then(response => response.json())
.then(data => {
const resultDiv = document.getElementById('purgeResult');
resultDiv.innerHTML = '';
if (data.results) {
data.results.forEach(result => {
const statusClass = result.status === "成功" ? "success" : "error";
resultDiv.innerHTML += `<p>${result.node_id}: ${result.status} - ${result.message}</p>`;
});
} else if (data.message) {
resultDiv.innerHTML = `<p class="success">${data.message}</p>`;
} else if (data.error) {
resultDiv.innerHTML = `<p class="error">${data.error}</p>`;
}
})
.catch(error => {
const resultDiv = document.getElementById('purgeResult');
resultDiv.innerHTML = `<p class="error">请求失败: ${error.message}</p>`;
});
});
</script>
</body>
</html>
4.1 系统启动与测试
启动源服务器:运行OriginServer
代码,启动源服务器,默认监听8000
端口。
启动 CDN 节点:运行CDNNode
代码,启动多个 CDN 节点,如node1
监听8081
端口、node2
监听8082
端口等。
启动负载均衡器:运行负载均衡器代码,默认监听8001
端口。
启动管理控制台:运行管理控制台代码,默认监听8002
端口。
在浏览器中访问http://localhost:8002
,进入管理控制台,尝试推送内容、预热节点、刷新缓存等操作,检查各功能是否正常。
4.2 系统优化方向
性能优化:
为缓存添加 LRU(最近最少使用)算法,确保缓存始终保留热门内容。对频繁访问的内容使用多级缓存策略,如浏览器缓存、CDN 节点缓存、源服务器缓存。
安全性增强:
对 CDN 节点与源服务器之间的通信进行加密,防止内容被窃取或篡改。在负载均衡器和 CDN 节点中添加防火墙规则,抵御常见的网络攻击。
扩展性提升:
采用分布式存储系统,解决单个节点缓存容量限制问题。引入自动化部署和监控工具,方便 CDN 节点的扩展和管理。
通过以上步骤,我们完成了一个从理论到实践的 CDN 系统搭建,涵盖了 CDN 的核心概念、关键技术以及完整的代码实现。
CDN(内容分发网络)通过全球分布式节点缓存内容,使用户就近获取资源,有效提升访问速度、减轻源服务器负载,在互联网应用中扮演着重要角色。
在基础理论层面,CDN 包含源服务器、边缘节点、负载均衡系统等组件,基于 DNS 解析、内容分发与缓存策略实现工作流程,具备加速访问、削峰填谷等优势,广泛应用于静态资源、流媒体等场景。关键技术上,内容分发涉及主动推送、被动拉取等模式;负载均衡涵盖 DNS、全局和本地负载均衡;缓存策略包括基于时间、内容等多种方式,共同保障 CDN 高效运行。
实战方面,我们构建了简易 CDN 系统:源服务器负责存储原始内容;CDN 边缘节点实现缓存与内容提供;负载均衡器依据用户地理位置选择节点;管理控制台支持内容管理与配置,并搭配前端界面方便操作。系统可完成内容推送、节点预热、缓存刷新等功能。