这段时间合作方送了一台打印机,因为是送的所以是基础款,联网功能都没有,刚好我有需要远程打印的需要,于是打算自己开发一款云打印盒(市场上最便宜也要百元),于是有了今天的这篇文章,选一款价格合适的开发版作为服务端,打印数据存储到KWDB数据库,通过远程指令驱动打印机完成打印操作,关于KWDB数据库的安装,前面写了一篇《香橙派Ai Pro安装部署KWDB数据库踩坑经验》的教程,本文不再重复安装过程。
后面我给这个云打印盒子起了个名字——橙印云盒,是一款偏向于居家使用的云打印盒子,可以使没有联网、不支持远程的打印机也能轻松互联,仅需将打印机USB端口连接云盒即可,可以轻松实现异地打印文件,实时监控打印机状态
功能 | |
---|---|
支持检测打印机状态,(例如:卡纸,点击打印机上的取消按钮,即可恢复正常 | |
支持所有惠普打印机 | |
可删除文件 | |
可上传文件 | |
可查询打印机型号 | |
支持多种文件打印格式(DOCX、TXT、PDF | |
支持内网、公网访问 |
橙印云盒
惠普打印机(HP Deskjet 2130 series) x 1 边缘计算盒子(例如:Orange Ai Pro 或 Purple Pi ) x 1
(开发版可以选择便宜的板子具备联网功能即可,驱动打印机对算力无要求,本次教程是因为KWDB是安装在香橙派开发版上)
USB数据线 x 1 WIFI天线(Wi-Fi 5双频2.4G和5G)x 1 电源线 x 2
HPLIP (HP Linux Imaging and Printing) 是 HP 为 Linux 系统提供的打印机驱动和工具套件。以下是使用 HPLIP 连接 HP 打印机的步骤:
检查是否安装
dpkg -l | grep hplip
没有安装的话,安装 HPLIP:
sudo apt update
sudo apt install hplip hplip-gui
通过 USB 线连接打印机和香橙派
2.运行 HP 设备管理器
hp-setup
这时候桌面会弹出可视化界面进行操作,选择连接类型(USB)
继续往下即可
安装后,查看已安装的打印机列表
lpstat -p
可以看到打印机已经和开发版连接上了
安装墨盒检测插件(可忽略)
这一步本来是想让开发版监控到打印机的墨盒状态的,但尝试了下代码无法获取或者是方法不兼容所有的打印机,所以放弃了,不过配之后可以在终端 通过命令查看到墨盒状态
hp-plugin -i
发现下载失败,需要手动下载插件
下载对应版本的插件(选择hplip-3.21.12-plugin.run)
通过SFTP将文件传到开发板的/home/print/目录
这里的/home/print目录 是我自己创建的,作为服务端脚本的工作目录
离线安装插件
cd /home/print
# 添加执行权限并安装
chmod +x hplip-3.21.12-plugin.run
./hplip-3.21.12-plugin.run
依然会弹出可视化界面引导安装
安装完成
查看墨盒状态:
hp-levels
可以看到显示一个墨盒,状态OK,以及墨量多少
好了前面的配置已经完成,成功连接上了打印机,我们现在进行测试基本的打印功能
# 确保pip已安装
sudo apt update
sudo apt install python3-pip # 如果pip不存在
# 安装模块(国内用户可换清华源)
pip install python-docx -i https://pypi.tuna.tsinghua.edu.cn/simple
# 安装 unoconv 和依赖
sudo apt install unoconv libreoffice-core
本地编辑一个,测试文档:1.docx
上传到开发版,并执行python程序
python3 tmp.py
可以看到已经将文件推送到打印机的队列中去了
看一下打印的效果
基础版代码
import cups
import dbus
import os
import subprocess
import tempfile
from docx import Document
def get_printer_status(printer_name="DeskJet_2130"):
try:
# 连接到CUPS服务器
conn = cups.Connection()
# 获取打印机状态
printers = conn.getPrinters()
if printer_name in printers:
printer_info = printers[printer_name]
status = printer_info.get('printer-state', '未知')
print(status)
status_message = printer_info.get('printer-state-message', '无状态信息')
# 状态代码解释
status_codes = {
3: '空闲(IDLE)',
4: '打印中(PRINTING)',
5: '已停止(STOPPED)'
}
status_text = status_codes.get(status, f"未知状态({status})")
print(f"打印机 '{printer_name}' 状态:")
print(f"- 状态: {status_text}")
print(f"- 状态信息: {status_message}")
# 安全地检查打印机是否接受作业
accepting_jobs = printer_info.get('printer-is-accepting-jobs', '未知')
print(f"- 是否接受作业: {'是' if accepting_jobs == True else '否' if accepting_jobs == False else '未知'}")
else:
print(f"未找到名为 '{printer_name}' 的打印机")
print("可用打印机列表:")
for printer in printers:
print(f"- {printer}")
except cups.IPPError as e:
print(f"连接CUPS服务器时出错: {e}")
except Exception as e:
print(f"发生错误: {e}")
def convert_docx_to_pdf(docx_path, output_pdf=None):
"""使用 unoconv 将 DOCX 转为 PDF"""
if not output_pdf:
output_pdf = os.path.splitext(docx_path)[0] + ".pdf"
try:
subprocess.run(["unoconv", "-f", "pdf", "-o", output_pdf, docx_path], check=True)
return output_pdf
except subprocess.CalledProcessError as e:
print(f"转换失败: {e}")
return None
def print_file(printer_name, file_path):
"""通过 CUPS 打印文件"""
try:
conn = cups.Connection()
job_id = conn.printFile(printer_name, file_path, "Python打印任务", {})
print(f"已提交打印任务 (ID: {job_id})")
return True
except cups.IPPError as e:
print(f"打印错误: {e}")
return False
def print_docx(printer_name, docx_path):
"""主函数:打印 DOCX 文件"""
# 1. 转换为 PDF
pdf_path = convert_docx_to_pdf(docx_path)
if not pdf_path or not os.path.exists(pdf_path):
return False
# 2. 打印 PDF
success = print_file(printer_name, pdf_path)
# 清理临时文件(可选)
# os.unlink(pdf_path)
return success
if __name__ == "__main__":
get_printer_status()
# 打印文件
printer = "DeskJet_2130" # 替换为你的打印机名
docx_file = "1.docx" # DOCX 文件路径
if print_docx(printer, docx_file):
print("打印任务已发送!")
else:
print("打印失败,请检查错误信息")
使用 KWDB 开发者中心连接 KWDB 数据库(详细操作可参考《香橙派Ai Pro安装部署KWDB数据库踩坑经验》
主机输入你的部署服务器的IP,外网服务器的话直接用公网IP即可,输入账号密码点击测试连接,连接成功即可进入数据库管理。
KWDB有两种数据库类型,分别是时序数据库和关系数据库,由于我们后面还有项目案例,根据项目的需求,确定方案用关系型数据库
创建数据库doc_list
这个步骤我们可以用 SQL命令进行创建
-- 1. 先创建被引用的主表(必须先执行)
CREATE TABLE doc_list.public.print_files (
id INT8 NOT NULL DEFAULT unique_rowid(),
filename VARCHAR(255) NOT NULL,
filepath STRING NOT NULL,
file_size INT8 NOT NULL,
file_type VARCHAR(20) NOT NULL,
upload_time TIMESTAMP NOT NULL,
print_status VARCHAR(20) NOT NULL DEFAULT 'pending',
print_count INT4 DEFAULT 0,
user_id INT8 NULL,
PRIMARY KEY (id), -- 明确声明主键
FAMILY "primary" (id, filename, filepath, file_size, file_type, upload_time, print_status, print_count, user_id)
);
CREATE TABLE doc_list.public.printers (
id INT8 NOT NULL DEFAULT unique_rowid(),
name VARCHAR(100) NOT NULL,
device_uri STRING NOT NULL,
make_model VARCHAR(150) NOT NULL,
location VARCHAR(100) NULL,
status VARCHAR(20) NOT NULL DEFAULT 'idle',
last_check TIMESTAMP NOT NULL,
is_active BOOL DEFAULT true,
PRIMARY KEY (id), -- 明确声明主键
FAMILY "primary" (id, name, device_uri, make_model, location, status, last_check, is_active)
);
-- 2. 再创建有关联关系的从表
CREATE TABLE doc_list.public.print_jobs (
id INT8 NOT NULL DEFAULT unique_rowid(),
file_id INT8 NOT NULL,
printer_id INT8 NOT NULL,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP NULL,
status VARCHAR(20) NOT NULL,
pages_printed INT4 DEFAULT 0,
total_pages INT4 NOT NULL,
user_id INT8 NULL,
PRIMARY KEY (id),
FAMILY "primary" (id, file_id, printer_id, start_time, end_time, status, pages_printed, total_pages, user_id),
CONSTRAINT fk_file FOREIGN KEY (file_id) REFERENCES doc_list.public.print_files(id),
CONSTRAINT fk_printer FOREIGN KEY (printer_id) REFERENCES doc_list.public.printers(id)
);
-- 3. 添加注释(可选)
COMMENT ON COLUMN doc_list.public.print_files.id IS '文件ID';
COMMENT ON COLUMN doc_list.public.printers.id IS '打印机ID';
COMMENT ON COLUMN doc_list.public.print_jobs.id IS '任务ID';
创建后可在数据库管理工具查看字段和数据
数据库创建好后,我们先写一个demo,去开发版运行看下能否连接上KWDB数据库
安装依赖
pip install psycopg2-binary -i https://pypi.tuna.tsinghua.edu.cn/simple
编写demo
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import psycopg2
def main():
try:
con = psycopg2.connect(database="defaultdb", user="orange", password="123456", host="192.168.1.139",port="26257")
print("Connected!")
con.set_session(autocommit=True)
cur = con.cursor()
except psycopg2.Error as e:
print(f"Failed to connect to Kaiwudb: {e}")
cur.close()
con.close()
return
if __name__ == "__main__":
main()
开发板内运行测试连接状态
cd /home/print/
python3 lianjie.py
可以看到成功连接上了KWDB数据库
现在已经实现基本功能了,但是我们需要连起来才能成为一个远程云盒,要能正常使用,我们还需要一个客户端用来操作上传,服务端用来接收指令,所以需要更新下打印盒子的代码,通过flask上传文件 暴露接口获取打印机信息
# 安装依赖(测试中开启跨域)
pip install flask -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install flask-cors -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install psycopg2-binary -i https://pypi.tuna.tsinghua.edu.cn/simple
<template>
<view class="container">
<!-- 顶部标题栏 -->
<view class="header">
<text class="title">云打印盒子</text>
<view class="printer-status" :class="printerStateClass">
{{ printerStatusText }}
</view>
</view>
<!-- 打印机信息卡片 -->
<!-- 打印机卡片部分修改为 -->
<view class="printer-card" v-if="printerStatus.printers">
<view class="printer-info">
<uni-icons type="printer" size="24" color="#2979FF"></uni-icons>
<view class="printer-details">
<text class="printer-name">
{{ Object.values(printerStatus.printers)[0]['printer-make-and-model'] }}
</text>
<text class="printer-uri">
设备: {{ formatDeviceUri(Object.values(printerStatus.printers)[0]['device-uri']) }}
</text>
</view>
</view>
<view class="printer-state">
<text>状态: {{ printerStatusText }}</text>
<text>队列: {{ status_print }}</text>
</view>
</view>
<!-- 文件上传区域 -->
<view class="upload-area">
<button class="upload-btn" @tap="chooseFile">
<uni-icons type="plus" size="20" color="#fff"></uni-icons>
<text>上传文件</text>
</button>
<text class="upload-tips">支持格式: DOCX, PDF, TXT</text>
</view>
<!-- 文件列表 -->
<view class="file-list">
<view class="section-title">
<text>待打印文件</text>
<view class="action-group">
<uni-icons
type="refresh"
size="18"
color="#666"
@tap="refreshFiles"
></uni-icons>
<text class="refresh-text">刷新</text>
</view>
</view>
<scroll-view scroll-y class="scroll-view">
<view
v-for="(file, index) in files"
:key="index"
class="file-item"
>
<view class="file-info">
<uni-icons :type="getFileIcon(file.filename)" size="24" color="#2979FF"></uni-icons>
<view class="file-details">
<text class="filename">{{ file.filename }}</text>
<text class="upload-time">{{ formatTime(file.upload_time) }}</text>
</view>
</view>
<view class="file-actions">
<button
class="print-btn"
@tap="printFile(file.filename)"
:disabled="file.status !== 'pending'"
>
{{ file.status === 'pending' ? '打印' : '打印中' }}
</button>
<button style="color: red;width: 120rpx;height: 60rpx;font-size: 25rpx;" @tap="confirmDelete(file.id)">
删除
</button>
</view>
</view>
<view v-if="files.length === 0" class="empty-tips">
<uni-icons type="info" size="24" color="#999"></uni-icons>
<text>暂无待打印文件</text>
</view>
</scroll-view>
</view>
<!-- 打印队列 -->
<view class="queue-section" v-if="queue.length > 0">
<view class="section-title">
<text>当前打印队列 ({{ queue.length }})</text>
</view>
<view class="queue-list">
<view
v-for="(job, index) in queue"
:key="index"
class="queue-item"
>
<view class="job-info">
<text class="job-id">#{{ job.id }}</text>
<text class="job-title">{{ job.title || '未命名任务' }}</text>
</view>
<text class="job-status">{{ formatJobStatus(job.state) }}</text>
</view>
</view>
</view>
</view>
</template>
<script>
export default {
data() {
return {
status_print:'',
printerStatus: {},
files: [],
queue: [],
lastUpdate: null,
API_BASE: 'http://192.168.1.139:5000/api'
}
},
computed: {
printerStatusText() {
if(!this.printerStatus.printers) return '未连接'
const printer = Object.values(this.printerStatus.printers)[0]
return {
3: '空闲',
4: '打印中',
5: '已停止'
}[printer['printer-state']] || '未知'
},
printerStateClass() {
const state = this.printerStatus['printer-state']
const classes = {
3: 'status-idle',
4: 'status-printing',
5: 'status-stopped'
}
return classes[state] || 'status-unknown'
}
},
onLoad() {
this.initData()
},
onPullDownRefresh() {
this.refreshFiles()
uni.stopPullDownRefresh()
},
methods: {
confirmDelete(fileId) {
uni.showModal({
title: '确认删除',
content: '确定要删除这个文件吗?',
success: (res) => {
if (res.confirm) {
this.deleteFile(fileId)
}
}
})
},
async deleteFile(fileId) {
uni.showLoading({ title: '删除中...' })
try {
const [err, res] = await uni.request({
url: `${this.API_BASE}/files/${fileId}`,
method: 'DELETE'
})
if (res.statusCode === 200) {
uni.showToast({ title: '删除成功' })
this.fetchFiles()
} else {
throw new Error(res.data.error || '删除失败')
}
} catch (e) {
} finally {
uni.hideLoading()
}
},
// 修改后的打印方法
async printFile(fileId) {
this.$set(this.printingStatus, fileId, 'pending')
try {
const [err, res] = await uni.request({
url: `${this.API_BASE}/print`,
method: 'POST',
data: { file_id: fileId },
header: { 'Content-Type': 'application/json' }
})
if (res.statusCode === 200) {
this.$set(this.printingStatus, fileId, 'printing')
this.monitorPrintJob(res.data.job_id, fileId)
} else {
throw new Error(res.data.error || '打印失败')
}
} catch (e) {
this.$set(this.printingStatus, fileId, 'error')
uni.showToast({ title: e.message, icon: 'none' })
}
},
initData() {
this.fetchPrinterStatus()
this.fetchFiles()
this.fetchPrintQueue()
// 定时刷新(每30秒)
this.refreshTimer = setInterval(() => {
this.fetchPrinterStatus()
this.fetchPrintQueue()
}, 30000)
},
fetchPrinterStatus() {
uni.request({
url: this.API_BASE + '/printer/status',
method: 'GET',
success: (res) => {
if(res.statusCode === 200) {
this.printerStatus = res.data
this.lastUpdate = new Date()
}
},
fail: (err) => {
console.error('获取打印机状态失败:', err)
}
})
},
fetchFiles() {
uni.showLoading({ title: '加载文件列表' })
uni.request({
url: this.API_BASE + '/files',
method: 'GET',
success: (res) => {
uni.hideLoading()
if(res.statusCode === 200) {
this.files = res.data.files || []
}
},
fail: (err) => {
uni.hideLoading()
console.error('加载文件列表失败:', err)
}
})
},
fetchPrintQueue() {
uni.request({
url: this.API_BASE + '/printer/queue',
method: 'GET',
success: (res) => {
if(res.statusCode === 200) {
uni.showToast({
title:'打印机队列正常'
})
this.status_print='正常'
this.queue = Object.values(res.data)
}
if(res.statusCode === 503) {
uni.showToast({
title:'打印机队列异常',
icon:'error'
})
this.status_print='卡纸异常'
}
},
fail: (err) => {
console.error('获取打印队列失败:', err)
}
})
},
chooseFile() {
uni.chooseFile({
count: 1,
extension: ['.docx', '.pdf', '.txt'],
success: (res) => {
this.uploadFile(res.tempFiles[0])
}
})
},
uploadFile(file) {
uni.showLoading({ title: '上传文件中...' })
uni.uploadFile({
url: this.API_BASE + '/upload',
filePath: file.path,
name: 'file',
formData: {
filename: file.name
},
success: (res) => {
if(res.statusCode === 200) {
uni.showToast({ title: '上传成功' })
this.fetchFiles()
} else {
uni.showToast({ title: '上传失败', icon: 'none' })
}
},
fail: (err) => {
uni.showToast({ title: '上传错误', icon: 'none' })
},
complete: () => {
uni.hideLoading()
}
})
},
refreshFiles() {
this.fetchPrinterStatus()
this.fetchFiles()
this.fetchPrintQueue()
},
formatTime(timestamp) {
if (!timestamp) return ''
const date = new Date(timestamp)
const padZero = num => (num < 10 ? '0' + num : num)
return `${date.getFullYear()}-${padZero(date.getMonth()+1)}-${padZero(date.getDate())} ${padZero(date.getHours())}:${padZero(date.getMinutes())}`
},
formatDeviceUri(uri) {
if (!uri) return '未知设备'
// 提取USB设备信息
const usbMatch = uri.match(/usb\/([^?]+)/)
if (usbMatch) return usbMatch[1].replace(/_/g, ' ')
return uri
},
getFileIcon(filename) {
const ext = filename.split('.').pop().toLowerCase()
const icons = {
pdf: 'pdf',
docx: 'word',
txt: 'text'
}
return icons[ext] || 'file'
},
formatJobStatus(state) {
const states = {
3: '等待中',
4: '打印中',
5: '已暂停',
6: '已取消',
7: '已完成'
}
return states[state] || '未知状态'
}
},
beforeUnmount() {
// 清除定时器
if (this.refreshTimer) {
clearInterval(this.refreshTimer)
}
}
}
</script>
<style>
/* 基础样式 */
page {
background-color: #f5f5f5;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen,
Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
color: #333;
}
.container {
padding: 20rpx;
max-width: 800px;
margin: 0 auto;
}
/* 头部样式 */
.header {
display: flex;
justify-content: space-between;
align-items: center;
padding: 20rpx 0;
margin-bottom: 20rpx;
border-bottom: 1rpx solid #eee;
}
.title {
font-size: 36rpx;
font-weight: bold;
}
.printer-status {
padding: 6rpx 20rpx;
border-radius: 40rpx;
font-size: 24rpx;
color: #fff;
}
.status-idle {
background-color: #52c41a;
}
.status-printing {
background-color: #1890ff;
}
.status-stopped {
background-color: #f5222d;
}
.status-unknown {
background-color: #fa8c16;
}
/* 打印机卡片 */
.printer-card {
background-color: #fff;
border-radius: 12rpx;
padding: 24rpx;
margin-bottom: 30rpx;
box-shadow: 0 2rpx 8rpx rgba(0, 0, 0, 0.05);
}
.printer-info {
display: flex;
align-items: flex-start;
margin-bottom: 16rpx;
}
.printer-details {
margin-left: 20rpx;
flex: 1;
}
.printer-name {
display: block;
font-size: 30rpx;
font-weight: 500;
margin-bottom: 6rpx;
}
.printer-location,
.printer-uri {
display: block;
font-size: 24rpx;
color: #666;
margin-top: 6rpx;
}
.printer-state {
display: flex;
justify-content: space-between;
font-size: 26rpx;
color: #666;
padding-top: 12rpx;
border-top: 1rpx dashed #eee;
}
.update-time {
font-size: 24rpx;
color: #999;
}
/* 上传区域 */
.upload-area {
margin-bottom: 30rpx;
text-align: center;
}
.upload-btn {
display: inline-flex;
align-items: center;
justify-content: center;
height: 80rpx;
padding: 0 40rpx;
background-color: #2979ff;
color: #fff;
border-radius: 40rpx;
font-size: 28rpx;
box-shadow: 0 4rpx 12rpx rgba(41, 121, 255, 0.3);
}
.upload-btn text {
margin-left: 10rpx;
}
.upload-tips {
display: block;
margin-top: 12rpx;
font-size: 24rpx;
color: #999;
}
/* 文件列表 */
.section-title {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 20rpx;
font-size: 30rpx;
font-weight: 500;
}
.action-group {
display: flex;
align-items: center;
}
.refresh-text {
font-size: 26rpx;
color: #666;
margin-left: 8rpx;
}
.scroll-view {
max-height: 60vh;
}
.file-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 24rpx;
margin-bottom: 20rpx;
background-color: #fff;
border-radius: 12rpx;
box-shadow: 0 2rpx 8rpx rgba(0, 0, 0, 0.05);
}
.file-info {
display: flex;
align-items: center;
flex: 1;
min-width: 0;
}
.file-details {
margin-left: 20rpx;
flex: 1;
min-width: 0;
}
.filename {
display: block;
font-size: 28rpx;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
margin-bottom: 6rpx;
}
.upload-time {
font-size: 24rpx;
color: #999;
}
.file-actions {
display: flex;
align-items: center;
margin-left: 20rpx;
}
.print-btn {
height: 56rpx;
line-height: 56rpx;
padding: 0 24rpx;
font-size: 26rpx;
color: #2979ff;
background-color: #e6f7ff;
border: 1rpx solid #91d5ff;
border-radius: 6rpx;
margin-right: 16rpx;
}
.print-btn[disabled] {
color: #bfbfbf;
background-color: #f5f5f5;
border-color: #d9d9d9;
}
.delete-btn {
width: 56rpx;
height: 56rpx;
display: flex;
align-items: center;
justify-content: center;
background-color: #fff2f0;
border: 1rpx solid #ffccc7;
border-radius: 6rpx;
}
.empty-tips {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
padding: 60rpx 0;
color: #999;
font-size: 28rpx;
}
.empty-tips text {
margin-top: 20rpx;
}
/* 打印队列 */
.queue-section {
margin-top: 40rpx;
}
.queue-list {
background-color: #fff;
border-radius: 12rpx;
padding: 0 20rpx;
box-shadow: 0 2rpx 8rpx rgba(0, 0, 0, 0.05);
}
.queue-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 24rpx 0;
border-bottom: 1rpx solid #f0f0f0;
}
.queue-item:last-child {
border-bottom: none;
}
.job-info {
display: flex;
align-items: center;
flex: 1;
min-width: 0;
}
.job-id {
font-size: 26rpx;
color: #1890ff;
margin-right: 20rpx;
font-family: monospace;
}
.job-title {
font-size: 26rpx;
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
flex: 1;
}
.job-status {
font-size: 24rpx;
color: #666;
margin-left: 20rpx;
}
</style>
写一个方法用于后面处理连接
# KWDB连接器(带错误重试)
# 在get_kwdb_connection()中添加更详细的错误处理
def get_kwdb_connection():
try:
conn = psycopg2.connect(**app.config['KWDB_CONFIG'])
conn.set_session(autocommit=True)
# 测试连接有效性
with conn.cursor() as cur:
cur.execute("SELECT 1")
if cur.fetchone()[0] != 1:
raise psycopg2.OperationalError("Connection test failed")
return conn
except psycopg2.Error as e:
logging.error(f"DB connection failed: {str(e)}")
raise ConnectionError("Database service unavailable") from e
用于客户端用户上传文件接收,将文件移动到开发版指定目录,并上传数据到数据库
# 文件上传接口(完全适配KWDB语法)
@app.route('/api/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file part'}), 400
file = request.files['file']
if not file or file.filename == '':
return jsonify({'error': 'No selected file'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'File type not allowed'}), 400
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
try:
# 先保存文件
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
file.save(filepath)
file_size = os.path.getsize(filepath)
file_type = filename.split('.')[-1].lower()
file_id = generate_file_id()
with get_kwdb_connection() as conn, conn.cursor() as cur:
# 使用更安全的ID生成方式(避免手动计算MAX+1)
cur.execute("""
INSERT INTO doc_list.public.print_files
(id,filename, filepath, file_size, file_type, upload_time)
VALUES (%s,%s, %s, %s, %s, %s)
RETURNING id::text
""", (file_id,filename, filepath, file_size, file_type, datetime.utcnow()))
file_id = cur.fetchone()[0]
# 打印文件
printer = "DeskJet_2130" # 替换为你的打印机名
docx_file = filename # DOCX 文件路径
if print_docx(printer, docx_file):
print("打印任务已发送!")
else:
print("打印失败,请检查错误信息")
return jsonify({
'id': file_id,
'filename': filename,
'size': file_size,
'message': 'File uploaded successfully'
}), 200
except psycopg2.Error as e:
logging.error(f"Database error: {e.pgerror}")
if os.path.exists(filepath):
os.unlink(filepath)
return jsonify({'error': 'Database operation failed'}), 500
except Exception as e:
logging.error(f"Upload failed: {str(e)}", exc_info=True)
return jsonify({'error': 'Upload processing failed'}), 500
用户点击打印按钮,后端接收到打印的ID去查找对应的文件,发送给打印机进行打印
# 打印接口(适配KWDB事务)
@app.route('/api/print', methods=['POST'])
def print_file():
data = request.json
if not data or 'file_id' not in data:
return jsonify({'error': 'Missing file ID'}), 400
try:
with get_kwdb_connection() as conn, conn.cursor() as cur:
# 获取文件信息
cur.execute("""
SELECT filepath, filename
FROM doc_list.public.print_files
WHERE id = %s AND status = 'pending'
FOR UPDATE NOWAIT
""", (data['file_id'],))
result = cur.fetchone()
if not result:
return jsonify({'error': 'File not available for printing'}), 404
filepath, filename = result
ext = filename.split('.')[-1].lower()
# 文件转换逻辑
if ext == 'docx':
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_pdf:
conv_result = subprocess.run(
['unoconv', '-f', 'pdf', '-o', tmp_pdf.name, filepath],
capture_output=True
)
if conv_result.returncode != 0:
raise RuntimeError(f"Conversion failed: {conv_result.stderr.decode()}")
print_filepath = tmp_pdf.name
else:
print_filepath = filepath
# 提交打印任务
conn = cups.Connection()
printer = conn.getDefault()
if not printer:
raise RuntimeError("No default printer set")
job_id = conn.printFile(
printer,
print_filepath,
f"PrintJob_{filename}",
{
'media': 'A4',
'fit-to-page': 'True',
'print-quality': '3'
}
)
# 更新数据库状态
cur.execute("""
UPDATE doc_list.public.print_files
SET status = 'printing',
print_job_id = %s,
print_time = %s
WHERE id = %s
""", (job_id, datetime.now(), data['file_id']))
return jsonify({
'job_id': job_id,
'message': 'Print job started'
}), 200
except Exception as e:
logging.error(f"Print failed: {str(e)}", exc_info=True)
return jsonify({'error': str(e)}), 500
客户端能够显示历史上传的文件,可进行对文件的操作
# 修改后的文件列表接口(修正SQL语法)
@app.route('/api/files', methods=['GET'])
def get_files():
try:
with get_kwdb_connection() as conn, conn.cursor() as cur:
# 安全查询:先获取表结构
cur.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'print_files'
""")
columns = [row[0] for row in cur.fetchall()]
if not columns:
return jsonify({'files': []})
# 动态构建查询
cur.execute(f"""
SELECT {', '.join(columns)}
FROM doc_list.public.print_files
ORDER BY upload_time DESC
""")
# 自动适配字段类型
files = []
for row in cur.fetchall():
file_data = {}
for i, col in enumerate(columns):
value = row[i]
if isinstance(value, datetime):
value = value.isoformat()
file_data[col] = value
files.append(file_data)
return jsonify({'files': files})
except ConnectionError:
return jsonify({'error': '数据库连接失败'}), 503
except Exception as e:
logging.error(f"查询失败: {str(e)}", exc_info=True)
return jsonify({'error': '数据查询异常'}), 500
在这里插入代码片