在使用Python开发多线程任务队列时,经常会遇到各种错误,例如循环导入、对象访问方式错误、变量作用域问题等。本文基于实际开发案例,分析三个典型错误,并提供详细的解决方案。涉及的场景包括:
queue.Queue + threading.Thread)错误信息:
ImportError: cannot import name 'start_processing' from partially initialized module 'task.national_match_task' (most likely due to a circular import)原因:
app.py 导入了 national_match_task.py 的 start_processingnational_match_task.py 又导入了 app.py 的 app在函数内部导入依赖,而不是在模块顶部:
# national_match_task.py
def get_failed_records():
from app import app # 延迟导入
with app.app_context():
records = db.session.query(CustomerOrder).filter(...).all()
return records让 start_processing 接收 app 参数,而不是直接导入:
# national_match_task.py
def start_processing(app): # 接收app参数
# 使用app而不是直接导入
# app.py
from task.national_match_task import start_processing
start_processing(app) # 传入app实例flask.current_appfrom flask import current_app as app # 替代直接导入错误信息:
TypeError: 'CustomerOrder' object is not subscriptable原因:
match_nationwide_numbers() 函数期望接收字典,但传入的是SQLAlchemy模型对象item['prefix'] 访问属性,但SQLAlchemy对象应该用 item.prefix# match_phone_number.py
def match_nationwide_numbers(item, cookie, logger):
if not (item.prefix and item.suffix): # 使用 . 访问属性
logger.warning("缺少必要的前缀或后缀信息")
return {"匹配状态": "失败: 缺少前缀或后缀"}
# 其他逻辑...# national_match_task.py
def worker():
item = queue.get()
item_dict = {
'prefix': item.prefix,
'suffix': item.suffix,
'tracking_number': item.tracking_number,
}
result = match_nationwide_numbers(item_dict, item.cookie, logger)max_retries = 3
retry_count = getattr(item, '_retry_count', 0)
if retry_count < max_retries:
item._retry_count = retry_count + 1
queue.put(item) # 重新放回队列错误信息:
UnboundLocalError: cannot access local variable 'item' where it is not associated with a value原因:
item 变量在 try 块外未初始化queue.get() 抛出异常时,item 未被赋值,但 finally 仍尝试访问它itemdef worker():
item = None # 初始化
try:
item = queue.get(timeout=1)
# 处理逻辑...
except queue.Empty:
continue
finally:
if item is not None: # 确保变量已赋值
queue.task_done()finally:
if 'item' in locals() and item is not None:
queue.task_done()def worker():
while True:
process_next_item()
def process_next_item():
item = queue.get(timeout=1)
try:
# 处理逻辑...
finally:
queue.task_done(). 访问属性,而不是 []UnboundLocalErrorfinally 确保资源释放national_match_task.pyimport threading
import queue
import time
from flask import current_app as app
from models import CustomerOrder
def worker():
item = None # 初始化
try:
item = queue.get(timeout=1)
if item is None:
return
logger.info(f"处理记录: {item.tracking_number}")
result = match_nationwide_numbers({
'prefix': item.prefix,
'suffix': item.suffix,
}, item.cookie, logger)
update_record(item.id, result["匹配状态"], result.get("手机号"))
except queue.Empty:
return
except Exception as e:
logger.error(f"处理失败: {e}")
if item and getattr(item, '_retry_count', 0) < 3:
item._retry_count += 1
queue.put(item)
finally:
if item is not None:
queue.task_done()
def start_processing(app):
for _ in range(5):
threading.Thread(target=worker, daemon=True).start()多线程任务队列在Python中非常实用,但也容易遇到各种边界情况。通过合理设计代码结构、初始化变量、正确处理对象访问方式,可以大幅减少错误发生。希望本文能帮助你更稳健地开发Python多线程应用!
进一步阅读:
🚀 Happy Coding!