网络请求偶尔失败就让整个应用崩溃?一个智能的重试机制让你的应用在不稳定环境中依然稳如磐石!
在现代Web应用中,网络请求无处不在:调用API获取数据、上传文件、发送表单、实时通信等。但网络环境往往不可预测:服务器临时过载、网络连接不稳定、CDN节点故障、第三方服务限流等问题时有发生。一个偶然的网络错误就可能让整个功能失效,影响用户体验。今天我们就来打造一个智能的异步重试机制,让应用在各种不稳定环境中都能稳定运行。
想象你在开发一个电商网站的支付功能:
用户点击支付按钮 → 调用支付接口
↓
网络超时/服务器繁忙 → 支付失败
↓
用户看到错误提示 → 用户体验糟糕
可能的结果:用户放弃购买、订单丢失、收入损失
如果有智能重试机制:
用户点击支付按钮 → 调用支付接口
↓
第一次失败 → 等待1秒后自动重试
第二次失败 → 等待2秒后自动重试
第三次成功 → 支付完成,用户无感知
在文件管理系统中上传大文件:
上传进度:[████████████████████████████████████████] 95%
↓
网络中断 → 上传失败 → 用户需要重新上传整个文件
vs
上传进度:[████████████████████████████████████████] 95%
↓
网络中断 → 自动重试 → 断点续传 → 上传完成
在微服务系统中,服务之间频繁调用:
用户服务 → 调用订单服务 → 调用库存服务 → 调用支付服务
任何一个环节的临时故障都可能导致整个流程失败
需要智能重试来提高系统的容错能力
// 原始的重试方式:固定间隔,没有策略
asyncfunction simpleRetry(fn, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
returnawait fn();
} catch (error) {
if (i === maxRetries - 1) throw error;
awaitnewPromise(resolve => setTimeout(resolve, 1000)); // 固定等待1秒
}
}
}
这种方式的问题:
// 问题:大量客户端同时重试,加重服务器负担
async function badRetry() {
// 100个客户端在同一时间重试
// 第1秒: 100个请求同时发出
// 第2秒: 100个请求同时重试
// 第3秒: 100个请求同时重试
// 服务器压力巨大!
}
// 问题:所有错误一视同仁
try {
const response = await fetch('/api/data');
if (!response.ok) {
thrownewError('Request failed');
}
} catch (error) {
// 不管是404(不存在)还是503(服务不可用)都重试
// 404重试是浪费,503才需要重试
}
// 问题:重试过程黑盒化
async function fetchData() {
try {
return await retryRequest();
} catch (error) {
// 用户不知道重试了几次、为什么失败
console.log('请求失败'); // 信息太少
}
}
现在让我们来实现一个功能完备的重试管理器:
class RetryManager {
constructor(options = {}) {
this.options = {
maxAttempts: 3, // 最大重试次数
baseDelay: 1000, // 基础延迟时间(毫秒)
maxDelay: 30000, // 最大延迟时间
backoffFactor: 2, // 退避因子(指数退避)
jitter: true, // 是否添加随机抖动
retryCondition: (error) =>true, // 重试条件判断函数
onRetry: (attempt, error, delay) => {}, // 重试回调
onSuccess: (result, attempts) => {}, // 成功回调
onFailure: (error, attempts) => {}, // 最终失败回调
timeout: 0, // 单次请求超时时间(0表示不限制)
abortSignal: null, // 取消信号
...options
};
// 统计信息
this.stats = {
totalAttempts: 0,
totalSuccesses: 0,
totalFailures: 0,
totalRetries: 0,
averageAttempts: 0
};
}
// 执行带重试的异步函数
async execute(asyncFunction, ...args) {
let lastError;
let attempts = 0;
const startTime = Date.now();
for (let attempt = 1; attempt <= this.options.maxAttempts; attempt++) {
attempts = attempt;
this.stats.totalAttempts++;
try {
// 检查是否被取消
this.checkAbortSignal();
// 执行函数(可能带超时)
const result = awaitthis.executeWithTimeout(asyncFunction, ...args);
// 成功统计
this.stats.totalSuccesses++;
this.updateAverageAttempts();
// 成功回调
this.options.onSuccess(result, attempts);
// 记录成功日志
if (attempt > 1) {
console.log(`✅ 重试成功: 第${attempt}次尝试成功,总耗时${Date.now() - startTime}ms`);
}
return result;
} catch (error) {
lastError = error;
// 检查是否应该重试此错误
if (!this.shouldRetry(error, attempt)) {
break;
}
// 如果不是最后一次尝试,则等待后重试
if (attempt < this.options.maxAttempts) {
const delay = this.calculateDelay(attempt);
// 重试回调
this.options.onRetry(attempt, error, delay);
console.log(`⚠️ 第${attempt}次尝试失败: ${error.message},${delay}ms后重试`);
// 等待指定时间
awaitthis.sleep(delay);
this.stats.totalRetries++;
}
}
}
// 所有重试都失败了
this.stats.totalFailures++;
this.updateAverageAttempts();
// 失败回调
this.options.onFailure(lastError, attempts);
console.error(`❌ 重试失败: ${attempts}次尝试后仍然失败,总耗时${Date.now() - startTime}ms`);
throw lastError;
}
// 计算延迟时间(指数退避 + 随机抖动)
calculateDelay(attempt) {
// 指数退避: baseDelay * backoffFactor^(attempt-1)
const exponentialDelay = this.options.baseDelay * Math.pow(this.options.backoffFactor, attempt - 1);
// 限制最大延迟
const cappedDelay = Math.min(exponentialDelay, this.options.maxDelay);
// 添加随机抖动,避免惊群效应
if (this.options.jitter) {
// 在延迟时间的±25%范围内添加随机抖动
const jitterRange = cappedDelay * 0.25;
const jitter = (Math.random() - 0.5) * 2 * jitterRange;
returnMath.max(0, Math.round(cappedDelay + jitter));
}
return cappedDelay;
}
// 判断是否应该重试
shouldRetry(error, attempt) {
// 检查是否被取消
if (this.isAborted()) {
returnfalse;
}
// 已经是最后一次尝试
if (attempt >= this.options.maxAttempts) {
returnfalse;
}
// 使用自定义重试条件判断
returnthis.options.retryCondition(error, attempt);
}
// 带超时的函数执行
async executeWithTimeout(asyncFunction, ...args) {
if (this.options.timeout <= 0) {
return asyncFunction(...args);
}
const timeoutPromise = newPromise((_, reject) => {
setTimeout(() => {
reject(newError(`操作超时: 超过${this.options.timeout}ms`));
}, this.options.timeout);
});
returnPromise.race([
asyncFunction(...args),
timeoutPromise
]);
}
// 检查取消信号
checkAbortSignal() {
if (this.options.abortSignal && this.options.abortSignal.aborted) {
thrownewError('操作已取消');
}
}
// 检查是否被取消
isAborted() {
returnthis.options.abortSignal && this.options.abortSignal.aborted;
}
// 等待指定时间
sleep(ms) {
returnnewPromise(resolve => setTimeout(resolve, ms));
}
// 更新平均尝试次数
updateAverageAttempts() {
const totalCompleted = this.stats.totalSuccesses + this.stats.totalFailures;
if (totalCompleted > 0) {
this.stats.averageAttempts = (this.stats.totalAttempts / totalCompleted).toFixed(2);
}
}
// 获取统计信息
getStats() {
return {
...this.stats,
successRate: this.stats.totalSuccesses + this.stats.totalFailures > 0 ?
(this.stats.totalSuccesses / (this.stats.totalSuccesses + this.stats.totalFailures) * 100).toFixed(2) + '%' :
'0%'
};
}
// 重置统计信息
resetStats() {
this.stats = {
totalAttempts: 0,
totalSuccesses: 0,
totalFailures: 0,
totalRetries: 0,
averageAttempts: 0
};
}
// 静态方法:HTTP请求重试
staticasync retryFetch(url, options = {}, retryOptions = {}) {
const retryManager = new RetryManager({
retryCondition: (error) => {
// 网络错误总是重试
if (error.name === 'TypeError' && error.message.includes('fetch')) {
returntrue;
}
// HTTP状态码判断
if (error.status) {
// 5xx服务器错误 - 重试
if (error.status >= 500) returntrue;
// 429限流错误 - 重试
if (error.status === 429) returntrue;
// 408请求超时 - 重试
if (error.status === 408) returntrue;
// 4xx客户端错误 - 不重试
if (error.status >= 400 && error.status < 500) returnfalse;
}
returntrue; // 其他情况默认重试
},
...retryOptions
});
return retryManager.execute(async () => {
const response = await fetch(url, options);
if (!response.ok) {
const error = newError(`HTTP ${response.status}: ${response.statusText}`);
error.status = response.status;
error.response = response;
throw error;
}
return response;
});
}
}
// 专门的数据库重试管理器
class DatabaseRetryManager extends RetryManager {
constructor(options = {}) {
super({
maxAttempts: 5,
baseDelay: 500,
maxDelay: 5000,
retryCondition: (error) => {
// 数据库相关的可重试错误
const retryableErrors = [
'ConnectionError',
'TimeoutError',
'DeadlockError',
'LockWaitTimeoutError',
'ConnectionLostError'
];
return retryableErrors.some(errorType =>
error.name.includes(errorType) || error.message.includes(errorType)
);
},
onRetry: (attempt, error, delay) => {
console.log(`🗃️ 数据库操作重试: 第${attempt}次失败(${error.message}),${delay}ms后重试`);
},
...options
});
}
}
// 专门的API重试管理器
class APIRetryManager extends RetryManager {
constructor(options = {}) {
super({
maxAttempts: 3,
baseDelay: 1000,
maxDelay: 10000,
retryCondition: (error) => {
// 不重试客户端错误(4xx),除了429限流
if (error.status >= 400 && error.status < 500 && error.status !== 429) {
returnfalse;
}
// 其他情况都重试
returntrue;
},
onRetry: (attempt, error, delay) => {
if (error.status === 429) {
console.log(`🚦 API限流重试: 第${attempt}次触发限流,${delay}ms后重试`);
} else {
console.log(`🌐 API请求重试: 第${attempt}次失败(${error.message}),${delay}ms后重试`);
}
},
...options
});
}
}
// 文件上传重试管理器
class UploadRetryManager extends RetryManager {
constructor(options = {}) {
super({
maxAttempts: 5,
baseDelay: 2000,
maxDelay: 30000,
timeout: 60000, // 60秒超时
retryCondition: (error) => {
// 网络错误和服务器错误都重试
if (error.name === 'TypeError') returntrue;
if (error.status >= 500) returntrue;
if (error.status === 408 || error.status === 429) returntrue;
// 超时错误重试
if (error.message.includes('超时') || error.message.includes('timeout')) {
returntrue;
}
returnfalse;
},
onRetry: (attempt, error, delay) => {
console.log(`📤 文件上传重试: 第${attempt}次失败,${delay}ms后重试上传`);
},
...options
});
}
}
让我们看看这个重试管理器的基本使用:
// 创建基础重试管理器
const retryManager = new RetryManager({
maxAttempts: 3,
baseDelay: 1000,
backoffFactor: 2,
jitter: true,
onRetry: (attempt, error, delay) => {
console.log(`重试中: 第${attempt}次失败,${delay}ms后重试`);
}
});
// 重试不可靠的网络请求
asyncfunction unreliableNetworkCall() {
// 模拟70%的失败率
if (Math.random() < 0.7) {
thrownewError('网络连接超时');
}
return { data: '请求成功的数据' };
}
try {
const result = await retryManager.execute(unreliableNetworkCall);
console.log('请求成功:', result);
} catch (error) {
console.error('所有重试失败:', error.message);
}
// 使用静态方法快速重试HTTP请求
asyncfunction fetchUserData(userId) {
try {
const response = await RetryManager.retryFetch(`/api/users/${userId}`, {
method: 'GET',
headers: { 'Authorization': 'Bearer token123' }
}, {
maxAttempts: 5,
baseDelay: 2000
});
returnawait response.json();
} catch (error) {
console.error('用户数据获取失败:', error.message);
throw error;
}
}
// 查看重试统计
console.log('重试统计:', retryManager.getStats());
class RobustHttpClient {
constructor(options = {}) {
this.baseURL = options.baseURL || '';
this.defaultHeaders = options.headers || {};
this.timeout = options.timeout || 30000;
// 为不同类型的请求配置不同的重试策略
this.retryManagers = {
// GET请求:读操作,可以多次重试
GET: new APIRetryManager({
maxAttempts: 5,
baseDelay: 1000,
maxDelay: 10000
}),
// POST请求:写操作,需要谨慎重试
POST: new APIRetryManager({
maxAttempts: 3,
baseDelay: 2000,
retryCondition: (error) => {
// 只重试网络错误和5xx服务器错误
if (error.name === 'TypeError') returntrue;
if (error.status >= 500) returntrue;
returnfalse;
}
}),
// PUT/DELETE请求:幂等操作,可以重试
PUT: new APIRetryManager({
maxAttempts: 4,
baseDelay: 1500
}),
DELETE: new APIRetryManager({
maxAttempts: 4,
baseDelay: 1500
})
};
// 请求拦截器
this.requestInterceptors = [];
this.responseInterceptors = [];
// 统计信息
this.stats = {
totalRequests: 0,
successfulRequests: 0,
failedRequests: 0,
totalRetries: 0
};
}
// 添加请求拦截器
addRequestInterceptor(interceptor) {
this.requestInterceptors.push(interceptor);
}
// 添加响应拦截器
addResponseInterceptor(interceptor) {
this.responseInterceptors.push(interceptor);
}
// 通用请求方法
async request(config) {
// 应用请求拦截器
let processedConfig = { ...config };
for (const interceptor ofthis.requestInterceptors) {
processedConfig = await interceptor(processedConfig);
}
// 构建完整的请求配置
const requestConfig = this.buildRequestConfig(processedConfig);
// 选择重试管理器
const retryManager = this.retryManagers[requestConfig.method.toUpperCase()] ||
this.retryManagers.GET;
this.stats.totalRequests++;
try {
const response = await retryManager.execute(async () => {
returnthis.executeRequest(requestConfig);
});
// 应用响应拦截器
let processedResponse = response;
for (const interceptor ofthis.responseInterceptors) {
processedResponse = await interceptor(processedResponse);
}
this.stats.successfulRequests++;
this.stats.totalRetries += (response._retryCount || 0);
return processedResponse;
} catch (error) {
this.stats.failedRequests++;
this.stats.totalRetries += (error._retryCount || 0);
throw error;
}
}
// 执行实际的HTTP请求
async executeRequest(config) {
const url = config.url.startsWith('http') ? config.url : this.baseURL + config.url;
// 添加取消信号支持
const controller = new AbortController();
if (config.timeout) {
setTimeout(() => controller.abort(), config.timeout);
}
const response = await fetch(url, {
method: config.method,
headers: config.headers,
body: config.body,
signal: controller.signal,
...config.fetchOptions
});
// 检查响应状态
if (!response.ok) {
const error = newError(`HTTP ${response.status}: ${response.statusText}`);
error.status = response.status;
error.response = response;
throw error;
}
// 解析响应数据
const contentType = response.headers.get('content-type');
let data;
if (contentType && contentType.includes('application/json')) {
data = await response.json();
} elseif (contentType && contentType.includes('text/')) {
data = await response.text();
} else {
data = await response.blob();
}
return {
data,
status: response.status,
statusText: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
config
};
}
// 构建请求配置
buildRequestConfig(config) {
return {
method: config.method || 'GET',
url: config.url,
headers: {
'Content-Type': 'application/json',
...this.defaultHeaders,
...config.headers
},
body: config.data ? JSON.stringify(config.data) : config.body,
timeout: config.timeout || this.timeout,
fetchOptions: config.fetchOptions || {}
};
}
// 便捷方法
asyncget(url, config = {}) {
returnthis.request({ method: 'GET', url, ...config });
}
async post(url, data, config = {}) {
returnthis.request({ method: 'POST', url, data, ...config });
}
async put(url, data, config = {}) {
returnthis.request({ method: 'PUT', url, data, ...config });
}
asyncdelete(url, config = {}) {
returnthis.request({ method: 'DELETE', url, ...config });
}
// 上传文件
async upload(url, file, options = {}) {
const uploadRetryManager = new UploadRetryManager({
onRetry: (attempt, error, delay) => {
if (options.onRetry) {
options.onRetry(attempt, error, delay);
}
}
});
return uploadRetryManager.execute(async () => {
const formData = new FormData();
formData.append(options.fieldName || 'file', file);
// 添加额外字段
if (options.fields) {
Object.entries(options.fields).forEach(([key, value]) => {
formData.append(key, value);
});
}
returnthis.executeRequest({
method: 'POST',
url,
body: formData,
headers: {
// 不设置Content-Type,让浏览器自动设置multipart/form-data
...this.defaultHeaders,
...options.headers
},
timeout: options.timeout || 60000
});
});
}
// 批量请求(并发控制)
async batchRequest(requests, options = {}) {
const { concurrency = 5, failFast = false } = options;
const results = [];
// 分批处理请求
for (let i = 0; i < requests.length; i += concurrency) {
const batch = requests.slice(i, i + concurrency);
const batchPromises = batch.map(async (requestConfig, index) => {
try {
const result = awaitthis.request(requestConfig);
return { index: i + index, success: true, data: result };
} catch (error) {
if (failFast) {
throw error;
}
return { index: i + index, success: false, error };
}
});
const batchResults = awaitPromise.all(batchPromises);
results.push(...batchResults);
}
return results.sort((a, b) => a.index - b.index);
}
// 获取统计信息
getStats() {
return {
...this.stats,
successRate: this.stats.totalRequests > 0 ?
(this.stats.successfulRequests / this.stats.totalRequests * 100).toFixed(2) + '%' :
'0%',
averageRetries: this.stats.totalRequests > 0 ?
(this.stats.totalRetries / this.stats.totalRequests).toFixed(2) :
'0'
};
}
// 健康检查
async healthCheck(endpoint = '/health') {
try {
const response = awaitthis.get(endpoint, { timeout: 5000 });
return {
healthy: true,
status: response.status,
data: response.data,
timestamp: newDate().toISOString()
};
} catch (error) {
return {
healthy: false,
error: error.message,
timestamp: newDate().toISOString()
};
}
}
}
// 使用示例
const httpClient = new RobustHttpClient({
baseURL: 'https://api.example.com',
headers: {
'Authorization': 'Bearer your-token-here'
},
timeout: 30000
});
// 添加请求拦截器(自动添加时间戳)
httpClient.addRequestInterceptor(async (config) => {
console.log(`🚀 发送请求: ${config.method} ${config.url}`);
config.headers['X-Request-Time'] = Date.now();
return config;
});
// 添加响应拦截器(记录响应时间)
httpClient.addResponseInterceptor(async (response) => {
const requestTime = response.config.headers['X-Request-Time'];
const responseTime = Date.now() - requestTime;
console.log(`✅ 请求完成: ${response.config.method} ${response.config.url} (${responseTime}ms)`);
return response;
});
// 使用示例
asyncfunction demonstrateHttpClient() {
try {
// GET请求
const userData = await httpClient.get('/users/123');
console.log('用户数据:', userData.data);
// POST请求
const newUser = await httpClient.post('/users', {
name: '张三',
email: 'zhangsan@example.com'
});
console.log('创建用户:', newUser.data);
// 文件上传
const fileInput = document.getElementById('file-input');
if (fileInput && fileInput.files[0]) {
const uploadResult = await httpClient.upload('/upload', fileInput.files[0], {
fields: { description: '用户头像' },
onRetry: (attempt, error, delay) => {
console.log(`上传重试: 第${attempt}次失败,${delay}ms后重试`);
}
});
console.log('上传成功:', uploadResult.data);
}
// 批量请求
const batchRequests = [
{ method: 'GET', url: '/users/1' },
{ method: 'GET', url: '/users/2' },
{ method: 'GET', url: '/users/3' }
];
const batchResults = await httpClient.batchRequest(batchRequests, {
concurrency: 2
});
console.log('批量请求结果:', batchResults);
// 健康检查
const health = await httpClient.healthCheck();
console.log('服务健康状态:', health);
// 查看统计信息
console.log('HTTP客户端统计:', httpClient.getStats());
} catch (error) {
console.error('请求失败:', error.message);
}
}
// 定期健康检查
setInterval(async () => {
const health = await httpClient.healthCheck();
if (!health.healthy) {
console.warn('⚠️ 服务不健康:', health.error);
}
}, 60000); // 每分钟检查一次
class DistributedTaskProcessor {
constructor(options = {}) {
this.options = {
maxConcurrency: options.maxConcurrency || 5,
taskTimeout: options.taskTimeout || 30000,
retryDelay: options.retryDelay || 1000,
maxRetries: options.maxRetries || 3,
...options
};
// 任务队列
this.taskQueue = [];
this.runningTasks = newMap();
this.completedTasks = [];
this.failedTasks = [];
// 重试管理器
this.retryManager = new RetryManager({
maxAttempts: this.options.maxRetries,
baseDelay: this.options.retryDelay,
timeout: this.options.taskTimeout,
onRetry: (attempt, error, delay) => {
console.log(`📋 任务重试: 第${attempt}次失败(${error.message}),${delay}ms后重试`);
}
});
// 任务统计
this.stats = {
totalTasks: 0,
completedTasks: 0,
failedTasks: 0,
averageExecutionTime: 0
};
this.isProcessing = false;
}
// 添加任务
addTask(task) {
const taskId = this.generateTaskId();
const taskWrapper = {
id: taskId,
task: task.fn,
data: task.data || {},
priority: task.priority || 0,
createdAt: newDate(),
retryCount: 0,
maxRetries: task.maxRetries || this.options.maxRetries,
timeout: task.timeout || this.options.taskTimeout,
onProgress: task.onProgress,
onComplete: task.onComplete,
onError: task.onError
};
this.taskQueue.push(taskWrapper);
this.stats.totalTasks++;
// 按优先级排序
this.taskQueue.sort((a, b) => b.priority - a.priority);
console.log(`➕ 任务已添加: ${taskId} (优先级: ${taskWrapper.priority})`);
// 如果没在处理,开始处理
if (!this.isProcessing) {
this.processQueue();
}
return taskId;
}
// 批量添加任务
addTasks(tasks) {
return tasks.map(task =>this.addTask(task));
}
// 处理任务队列
async processQueue() {
if (this.isProcessing) return;
this.isProcessing = true;
console.log('🔄 开始处理任务队列');
while (this.taskQueue.length > 0 || this.runningTasks.size > 0) {
// 启动新任务(在并发限制内)
while (this.taskQueue.length > 0 &&
this.runningTasks.size < this.options.maxConcurrency) {
const task = this.taskQueue.shift();
this.executeTask(task);
}
// 等待一小段时间再检查
awaitthis.sleep(100);
}
this.isProcessing = false;
console.log('✅ 任务队列处理完成');
// 输出最终统计
this.printStats();
}
// 执行单个任务
async executeTask(taskWrapper) {
const { id, task, data, timeout, onProgress, onComplete, onError } = taskWrapper;
this.runningTasks.set(id, taskWrapper);
console.log(`🚀 开始执行任务: ${id}`);
const startTime = Date.now();
try {
// 使用重试管理器执行任务
const result = awaitthis.retryManager.execute(async () => {
// 创建任务执行上下文
const taskContext = {
id,
data,
progress: (percent, message) => {
if (onProgress) {
onProgress(percent, message);
}
console.log(`📈 任务进度 ${id}: ${percent}% - ${message}`);
},
isCancelled: () =>false// 可以扩展为支持任务取消
};
returnawait task(taskContext);
});
// 任务完成
const executionTime = Date.now() - startTime;
this.onTaskComplete(taskWrapper, result, executionTime);
if (onComplete) {
onComplete(result);
}
} catch (error) {
// 任务失败
const executionTime = Date.now() - startTime;
this.onTaskFailed(taskWrapper, error, executionTime);
if (onError) {
onError(error);
}
} finally {
this.runningTasks.delete(id);
}
}
// 任务完成处理
onTaskComplete(taskWrapper, result, executionTime) {
const completedTask = {
...taskWrapper,
result,
executionTime,
completedAt: newDate(),
status: 'completed'
};
this.completedTasks.push(completedTask);
this.stats.completedTasks++;
// 更新平均执行时间
this.updateAverageExecutionTime(executionTime);
console.log(`✅ 任务完成: ${taskWrapper.id} (耗时: ${executionTime}ms)`);
}
// 任务失败处理
onTaskFailed(taskWrapper, error, executionTime) {
const failedTask = {
...taskWrapper,
error: error.message,
executionTime,
failedAt: newDate(),
status: 'failed'
};
this.failedTasks.push(failedTask);
this.stats.failedTasks++;
console.error(`❌ 任务失败: ${taskWrapper.id} (耗时: ${executionTime}ms) - ${error.message}`);
}
// 获取任务状态
getTaskStatus(taskId) {
// 检查正在运行的任务
if (this.runningTasks.has(taskId)) {
return {
status: 'running',
task: this.runningTasks.get(taskId)
};
}
// 检查已完成的任务
const completed = this.completedTasks.find(t => t.id === taskId);
if (completed) {
return { status: 'completed', task: completed };
}
// 检查失败的任务
const failed = this.failedTasks.find(t => t.id === taskId);
if (failed) {
return { status: 'failed', task: failed };
}
// 检查队列中的任务
const queued = this.taskQueue.find(t => t.id === taskId);
if (queued) {
return { status: 'queued', task: queued };
}
return { status: 'not_found' };
}
// 取消任务
cancelTask(taskId) {
// 从队列中移除
const queueIndex = this.taskQueue.findIndex(t => t.id === taskId);
if (queueIndex !== -1) {
this.taskQueue.splice(queueIndex, 1);
console.log(`🚫 任务已取消: ${taskId} (从队列中移除)`);
returntrue;
}
// 正在运行的任务标记为取消(需要任务本身支持检查取消状态)
if (this.runningTasks.has(taskId)) {
const task = this.runningTasks.get(taskId);
task.cancelled = true;
console.log(`🚫 任务已标记取消: ${taskId} (正在运行,等待任务检查取消状态)`);
returntrue;
}
returnfalse;
}
// 暂停任务处理
pause() {
this.isProcessing = false;
console.log('⏸️ 任务处理已暂停');
}
// 恢复任务处理
resume() {
if (!this.isProcessing) {
console.log('▶️ 任务处理已恢复');
this.processQueue();
}
}
// 获取统计信息
getStats() {
return {
...this.stats,
queuedTasks: this.taskQueue.length,
runningTasks: this.runningTasks.size,
successRate: this.stats.totalTasks > 0 ?
(this.stats.completedTasks / this.stats.totalTasks * 100).toFixed(2) + '%' :
'0%'
};
}
// 输出统计信息
printStats() {
const stats = this.getStats();
console.log('\n📊 任务处理统计:');
console.log(`总任务数: ${stats.totalTasks}`);
console.log(`已完成: ${stats.completedTasks}`);
console.log(`失败: ${stats.failedTasks}`);
console.log(`成功率: ${stats.successRate}`);
console.log(`平均执行时间: ${stats.averageExecutionTime}ms`);
}
// 工具方法
generateTaskId() {
return`task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
sleep(ms) {
returnnewPromise(resolve => setTimeout(resolve, ms));
}
updateAverageExecutionTime(executionTime) {
const totalCompleted = this.stats.completedTasks;
if (totalCompleted === 1) {
this.stats.averageExecutionTime = executionTime;
} else {
this.stats.averageExecutionTime = Math.round(
(this.stats.averageExecutionTime * (totalCompleted - 1) + executionTime) / totalCompleted
);
}
}
}
// 使用示例
const taskProcessor = new DistributedTaskProcessor({
maxConcurrency: 3,
taskTimeout: 10000,
maxRetries: 2
});
// 定义一些示例任务
const tasks = [
{
fn: async (context) => {
context.progress(0, '开始处理图片');
// 模拟图片处理
for (let i = 0; i <= 100; i += 10) {
context.progress(i, `处理进度 ${i}%`);
awaitnewPromise(resolve => setTimeout(resolve, 100));
// 模拟随机失败
if (i === 50 && Math.random() < 0.3) {
thrownewError('图片处理失败');
}
}
return { processed: true, size: '1024x768' };
},
data: { imageId: 'img_001', format: 'jpg' },
priority: 1,
onComplete: (result) => {
console.log('🖼️ 图片处理完成:', result);
},
onError: (error) => {
console.error('🖼️ 图片处理失败:', error.message);
}
},
{
fn: async (context) => {
context.progress(0, '开始数据同步');
// 模拟数据同步
const steps = ['连接数据库', '读取数据', '转换格式', '写入目标', '验证结果'];
for (let i = 0; i < steps.length; i++) {
context.progress((i / steps.length) * 100, steps[i]);
awaitnewPromise(resolve => setTimeout(resolve, 200));
// 模拟偶发错误
if (i === 2 && Math.random() < 0.2) {
thrownewError('数据转换失败');
}
}
return { syncedRecords: 1500, duration: '2.1s' };
},
data: { source: 'database_a', target: 'database_b' },
priority: 2,
onComplete: (result) => {
console.log('🔄 数据同步完成:', result);
}
},
{
fn: async (context) => {
context.progress(0, '开始文件上传');
// 模拟文件上传
for (let i = 0; i <= 100; i += 5) {
context.progress(i, `上传进度 ${i}%`);
awaitnewPromise(resolve => setTimeout(resolve, 50));
}
return { uploaded: true, url: 'https://cdn.example.com/file123.pdf' };
},
data: { fileName: 'report.pdf', size: '2.5MB' },
priority: 0,
onComplete: (result) => {
console.log('📤 文件上传完成:', result);
}
}
];
// 添加任务到处理器
console.log('添加任务到处理队列...');
const taskIds = taskProcessor.addTasks(tasks);
// 监控任务状态
setTimeout(() => {
taskIds.forEach(id => {
const status = taskProcessor.getTaskStatus(id);
console.log(`任务状态 ${id}: ${status.status}`);
});
}, 3000);
// 5秒后输出最终统计
setTimeout(() => {
console.log('最终统计信息:', taskProcessor.getStats());
}, 8000);
class AdaptiveRetryManager extends RetryManager {
constructor(options = {}) {
super(options);
this.successHistory = [];
this.errorPatterns = newMap();
}
// 自适应延迟计算
calculateDelay(attempt, error) {
// 基础指数退避
let delay = super.calculateDelay(attempt);
// 根据错误类型调整
if (error && error.status === 429) {
// 限流错误:增加延迟
delay *= 2;
} elseif (error && error.status >= 500) {
// 服务器错误:根据历史成功率调整
const recentSuccessRate = this.getRecentSuccessRate();
if (recentSuccessRate < 0.5) {
delay *= 1.5; // 成功率低时增加延迟
}
}
return delay;
}
// 获取最近的成功率
getRecentSuccessRate() {
const recentResults = this.successHistory.slice(-10);
if (recentResults.length === 0) return1;
const successes = recentResults.filter(result => result).length;
return successes / recentResults.length;
}
// 记录执行结果
recordResult(success, error = null) {
this.successHistory.push(success);
if (this.successHistory.length > 100) {
this.successHistory.shift();
}
if (!success && error) {
const pattern = this.getErrorPattern(error);
const count = this.errorPatterns.get(pattern) || 0;
this.errorPatterns.set(pattern, count + 1);
}
}
getErrorPattern(error) {
return`${error.name}:${error.status || 'unknown'}`;
}
}
class CircuitBreakerRetryManager extends RetryManager {
constructor(options = {}) {
super(options);
this.circuitBreaker = {
state: 'CLOSED', // CLOSED, OPEN, HALF_OPEN
failureCount: 0,
failureThreshold: options.failureThreshold || 5,
timeoutDuration: options.timeoutDuration || 60000,
nextAttempt: 0,
successCount: 0
};
}
async execute(asyncFunction, ...args) {
// 检查断路器状态
if (this.circuitBreaker.state === 'OPEN') {
if (Date.now() < this.circuitBreaker.nextAttempt) {
thrownewError('断路器开启状态,请稍后重试');
} else {
this.circuitBreaker.state = 'HALF_OPEN';
this.circuitBreaker.successCount = 0;
}
}
try {
const result = awaitsuper.execute(asyncFunction, ...args);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
onSuccess() {
this.circuitBreaker.failureCount = 0;
if (this.circuitBreaker.state === 'HALF_OPEN') {
this.circuitBreaker.successCount++;
if (this.circuitBreaker.successCount >= 2) {
this.circuitBreaker.state = 'CLOSED';
console.log('🔧 断路器已关闭,恢复正常');
}
}
}
onFailure() {
this.circuitBreaker.failureCount++;
if (this.circuitBreaker.state === 'HALF_OPEN') {
this.openCircuit();
} elseif (this.circuitBreaker.failureCount >= this.circuitBreaker.failureThreshold) {
this.openCircuit();
}
}
openCircuit() {
this.circuitBreaker.state = 'OPEN';
this.circuitBreaker.nextAttempt = Date.now() + this.circuitBreaker.timeoutDuration;
console.log(`⚡ 断路器已开启,${this.circuitBreaker.timeoutDuration / 1000}秒后重试`);
}
getCircuitState() {
return {
state: this.circuitBreaker.state,
failureCount: this.circuitBreaker.failureCount,
nextAttempt: this.circuitBreaker.nextAttempt
};
}
}
class ConcurrentRetryManager {
constructor(maxConcurrency = 5) {
this.maxConcurrency = maxConcurrency;
this.running = 0;
this.queue = [];
}
async execute(retryManager, asyncFunction, ...args) {
returnnewPromise((resolve, reject) => {
this.queue.push({
retryManager,
asyncFunction,
args,
resolve,
reject
});
this.processQueue();
});
}
async processQueue() {
if (this.running >= this.maxConcurrency || this.queue.length === 0) {
return;
}
this.running++;
const task = this.queue.shift();
try {
const result = await task.retryManager.execute(task.asyncFunction, ...task.args);
task.resolve(result);
} catch (error) {
task.reject(error);
} finally {
this.running--;
this.processQueue();
}
}
}
通过我们自制的异步重试机制,我们实现了:
核心优势:
强大功能:
实际应用场景:
高级特性:
这个重试机制不仅解决了网络不稳定环境下的可靠性问题,更重要的是提供了企业级应用所需的监控、统计和控制能力。无论是简单的API调用还是复杂的分布式任务处理,都能提供稳定可靠的重试保障。
掌握了这个工具,你的应用就能在各种不稳定的网络环境中稳如磐石,为用户提供始终可靠的服务体验!
《JavaScript原生实战手册》专栏持续更新中,下期预告:《表单验证引擎:构建企业级验证系统》