首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【KWDB 创作者计划】橙印云盒 · 搭载 KWDB 极速存储

【KWDB 创作者计划】橙印云盒 · 搭载 KWDB 极速存储

作者头像
德宏大魔王
发布2025-06-02 10:01:19
发布2025-06-02 10:01:19
11000
代码可运行
举报
文章被收录于专栏:cloud stdiocloud stdio
运行总次数:0
代码可运行

1. 前言

这段时间合作方送了一台打印机,因为是送的所以是基础款,联网功能都没有,刚好我有需要远程打印的需要,于是打算自己开发一款云打印盒(市场上最便宜也要百元),于是有了今天的这篇文章,选一款价格合适的开发版作为服务端,打印数据存储到KWDB数据库,通过远程指令驱动打印机完成打印操作,关于KWDB数据库的安装,前面写了一篇《香橙派Ai Pro安装部署KWDB数据库踩坑经验》的教程,本文不再重复安装过程。

2. 项目介绍

后面我给这个云打印盒子起了个名字——橙印云盒,是一款偏向于居家使用的云打印盒子,可以使没有联网、不支持远程的打印机也能轻松互联,仅需将打印机USB端口连接云盒即可,可以轻松实现异地打印文件,实时监控打印机状态

2.1 目前支持

功能

支持检测打印机状态,(例如:卡纸,点击打印机上的取消按钮,即可恢复正常

支持所有惠普打印机

可删除文件

可上传文件

可查询打印机型号

支持多种文件打印格式(DOCX、TXT、PDF

支持内网、公网访问

2.2 图文演示

2.3 视频演示

橙印云盒

3. 准备工作

3.1 用到的工具

惠普打印机(HP Deskjet 2130 series) x 1 边缘计算盒子(例如:Orange Ai Pro 或 Purple Pi ) x 1

(开发版可以选择便宜的板子具备联网功能即可,驱动打印机对算力无要求,本次教程是因为KWDB是安装在香橙派开发版上)

USB数据线 x 1 WIFI天线(Wi-Fi 5双频2.4G和5G)x 1 电源线 x 2

3.2 使用 HPLIP 连接 HP 打印机

HPLIP (HP Linux Imaging and Printing) 是 HP 为 Linux 系统提供的打印机驱动和工具套件。以下是使用 HPLIP 连接 HP 打印机的步骤:

3.3 安装 HPLIP

检查是否安装

代码语言:javascript
代码运行次数:0
运行
复制
dpkg -l | grep hplip

没有安装的话,安装 HPLIP​:

代码语言:javascript
代码运行次数:0
运行
复制
sudo apt update
sudo apt install hplip hplip-gui

3.4 连接打印机

通过 USB 线连接打印机和香橙派

2.​运行 HP 设备管理器​

代码语言:javascript
代码运行次数:0
运行
复制
hp-setup

这时候桌面会弹出可视化界面进行操作,选择连接类型(USB)

继续往下即可

安装后,查看已安装的打印机列表

代码语言:javascript
代码运行次数:0
运行
复制
lpstat -p

可以看到打印机已经和开发版连接上了

安装墨盒检测插件(可忽略)

这一步本来是想让开发版监控到打印机的墨盒状态的,但尝试了下代码无法获取或者是方法不兼容所有的打印机,所以放弃了,不过配之后可以在终端 通过命令查看到墨盒状态

代码语言:javascript
代码运行次数:0
运行
复制
hp-plugin -i

发现下载失败,需要手动下载插件

下载对应版本的插件(选择hplip-3.21.12-plugin.run)

通过SFTP将文件传到开发板的/home/print/目录

这里的/home/print目录 是我自己创建的,作为服务端脚本的工作目录

离线安装插件

代码语言:javascript
代码运行次数:0
运行
复制
cd /home/print

# 添加执行权限并安装
chmod +x hplip-3.21.12-plugin.run



./hplip-3.21.12-plugin.run

依然会弹出可视化界面引导安装

安装完成

查看墨盒状态:

代码语言:javascript
代码运行次数:0
运行
复制
hp-levels

可以看到显示一个墨盒,状态OK,以及墨量多少

4. 打印文件

好了前面的配置已经完成,成功连接上了打印机,我们现在进行测试基本的打印功能

4.1 使用 python-docx + unoconv(推荐)​

代码语言:javascript
代码运行次数:0
运行
复制
# 确保pip已安装
sudo apt update
sudo apt install python3-pip  # 如果pip不存在

# 安装模块(国内用户可换清华源)
pip install python-docx -i https://pypi.tuna.tsinghua.edu.cn/simple


# 安装 unoconv 和依赖
sudo apt install unoconv libreoffice-core

本地编辑一个,测试文档:1.docx

上传到开发版,并执行python程序

代码语言:javascript
代码运行次数:0
运行
复制
python3 tmp.py

可以看到已经将文件推送到打印机的队列中去了

看一下打印的效果

基础版代码

代码语言:javascript
代码运行次数:0
运行
复制
import cups
import dbus
import os
import subprocess
import tempfile
from docx import Document





def get_printer_status(printer_name="DeskJet_2130"):
    try:
        # 连接到CUPS服务器
        conn = cups.Connection()

        # 获取打印机状态
        printers = conn.getPrinters()

        if printer_name in printers:
            printer_info = printers[printer_name]
            status = printer_info.get('printer-state', '未知')
            print(status)
            status_message = printer_info.get('printer-state-message', '无状态信息')

            # 状态代码解释
            status_codes = {
                3: '空闲(IDLE)',
                4: '打印中(PRINTING)',
                5: '已停止(STOPPED)'
            }

            status_text = status_codes.get(status, f"未知状态({status})")

            print(f"打印机 '{printer_name}' 状态:")
            print(f"- 状态: {status_text}")
            print(f"- 状态信息: {status_message}")

            # 安全地检查打印机是否接受作业
            accepting_jobs = printer_info.get('printer-is-accepting-jobs', '未知')
            print(f"- 是否接受作业: {'是' if accepting_jobs == True else '否' if accepting_jobs == False else '未知'}")
        else:
            print(f"未找到名为 '{printer_name}' 的打印机")
            print("可用打印机列表:")
            for printer in printers:
                print(f"- {printer}")

    except cups.IPPError as e:
        print(f"连接CUPS服务器时出错: {e}")
    except Exception as e:
        print(f"发生错误: {e}")




def convert_docx_to_pdf(docx_path, output_pdf=None):
    """使用 unoconv 将 DOCX 转为 PDF"""
    if not output_pdf:
        output_pdf = os.path.splitext(docx_path)[0] + ".pdf"

    try:
        subprocess.run(["unoconv", "-f", "pdf", "-o", output_pdf, docx_path], check=True)
        return output_pdf
    except subprocess.CalledProcessError as e:
        print(f"转换失败: {e}")
        return None


def print_file(printer_name, file_path):
    """通过 CUPS 打印文件"""
    try:
        conn = cups.Connection()
        job_id = conn.printFile(printer_name, file_path, "Python打印任务", {})
        print(f"已提交打印任务 (ID: {job_id})")
        return True
    except cups.IPPError as e:
        print(f"打印错误: {e}")
        return False


def print_docx(printer_name, docx_path):
    """主函数:打印 DOCX 文件"""
    # 1. 转换为 PDF
    pdf_path = convert_docx_to_pdf(docx_path)
    if not pdf_path or not os.path.exists(pdf_path):
        return False

    # 2. 打印 PDF
    success = print_file(printer_name, pdf_path)

    # 清理临时文件(可选)
    # os.unlink(pdf_path)
    return success


if __name__ == "__main__":
    get_printer_status()
    # 打印文件
    printer = "DeskJet_2130"  # 替换为你的打印机名
    docx_file = "1.docx"  # DOCX 文件路径

    if print_docx(printer, docx_file):
        print("打印任务已发送!")
    else:
        print("打印失败,请检查错误信息")

5. 创建KWDB数据库

使用 KWDB 开发者中心连接 KWDB 数据库(详细操作可参考《香橙派Ai Pro安装部署KWDB数据库踩坑经验》

5.1 新建连接

5.2 选择连接类型

5.3 输入账密完成连接

主机输入你的部署服务器的IP,外网服务器的话直接用公网IP即可,输入账号密码点击测试连接,连接成功即可进入数据库管理。

6. 数据库类型

KWDB有两种数据库类型,分别是时序数据库和关系数据库,由于我们后面还有项目案例,根据项目的需求,确定方案用关系型数据库

7. 数据库操作

创建数据库doc_list

7.1 新建数据表

这个步骤我们可以用 SQL命令进行创建

代码语言:javascript
代码运行次数:0
运行
复制
-- 1. 先创建被引用的主表(必须先执行)
CREATE TABLE doc_list.public.print_files (
    id INT8 NOT NULL DEFAULT unique_rowid(),
    filename VARCHAR(255) NOT NULL,
    filepath STRING NOT NULL,
    file_size INT8 NOT NULL,
    file_type VARCHAR(20) NOT NULL,
    upload_time TIMESTAMP NOT NULL,
    print_status VARCHAR(20) NOT NULL DEFAULT 'pending',
    print_count INT4 DEFAULT 0,
    user_id INT8 NULL,
    PRIMARY KEY (id),  -- 明确声明主键
    FAMILY "primary" (id, filename, filepath, file_size, file_type, upload_time, print_status, print_count, user_id)
);

CREATE TABLE doc_list.public.printers (
    id INT8 NOT NULL DEFAULT unique_rowid(),
    name VARCHAR(100) NOT NULL,
    device_uri STRING NOT NULL,
    make_model VARCHAR(150) NOT NULL,
    location VARCHAR(100) NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'idle',
    last_check TIMESTAMP NOT NULL,
    is_active BOOL DEFAULT true,
    PRIMARY KEY (id),  -- 明确声明主键
    FAMILY "primary" (id, name, device_uri, make_model, location, status, last_check, is_active)
);

-- 2. 再创建有关联关系的从表
CREATE TABLE doc_list.public.print_jobs (
    id INT8 NOT NULL DEFAULT unique_rowid(),
    file_id INT8 NOT NULL,
    printer_id INT8 NOT NULL,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP NULL,
    status VARCHAR(20) NOT NULL,
    pages_printed INT4 DEFAULT 0,
    total_pages INT4 NOT NULL,
    user_id INT8 NULL,
    PRIMARY KEY (id),
    FAMILY "primary" (id, file_id, printer_id, start_time, end_time, status, pages_printed, total_pages, user_id),
    CONSTRAINT fk_file FOREIGN KEY (file_id) REFERENCES doc_list.public.print_files(id),
    CONSTRAINT fk_printer FOREIGN KEY (printer_id) REFERENCES doc_list.public.printers(id)
);

-- 3. 添加注释(可选)
COMMENT ON COLUMN doc_list.public.print_files.id IS '文件ID';
COMMENT ON COLUMN doc_list.public.printers.id IS '打印机ID';
COMMENT ON COLUMN doc_list.public.print_jobs.id IS '任务ID';

创建后可在数据库管理工具查看字段和数据

7.2 数据库连接demo

数据库创建好后,我们先写一个demo,去开发版运行看下能否连接上KWDB数据库

安装依赖

代码语言:javascript
代码运行次数:0
运行
复制
pip install psycopg2-binary -i https://pypi.tuna.tsinghua.edu.cn/simple

编写demo

代码语言:javascript
代码运行次数:0
运行
复制
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import psycopg2


def main():
    try:
        con = psycopg2.connect(database="defaultdb", user="orange", password="123456", host="192.168.1.139",port="26257")
        print("Connected!")
        con.set_session(autocommit=True)
        cur = con.cursor()
    except psycopg2.Error as e:
        print(f"Failed to connect to Kaiwudb: {e}")

    


    cur.close()
    con.close()
    return


if __name__ == "__main__":
    main()

开发板内运行测试连接状态

代码语言:javascript
代码运行次数:0
运行
复制
cd /home/print/
python3 lianjie.py

可以看到成功连接上了KWDB数据库

8. 云盒升级

现在已经实现基本功能了,但是我们需要连起来才能成为一个远程云盒,要能正常使用,我们还需要一个客户端用来操作上传,服务端用来接收指令,所以需要更新下打印盒子的代码,通过flask上传文件 暴露接口获取打印机信息

代码语言:javascript
代码运行次数:0
运行
复制
# 安装依赖(测试中开启跨域)
pip install flask -i https://pypi.tuna.tsinghua.edu.cn/simple


pip install flask-cors -i https://pypi.tuna.tsinghua.edu.cn/simple

pip install psycopg2-binary -i https://pypi.tuna.tsinghua.edu.cn/simple

8.1 前端样式

代码语言:javascript
代码运行次数:0
运行
复制
<template>
  <view class="container">
    <!-- 顶部标题栏 -->
    <view class="header">
      <text class="title">云打印盒子</text>
      <view class="printer-status" :class="printerStateClass">
        {{ printerStatusText }}
      </view>
    </view>

    <!-- 打印机信息卡片 -->
   <!-- 打印机卡片部分修改为 -->
     <view class="printer-card" v-if="printerStatus.printers">
       <view class="printer-info">
         <uni-icons type="printer" size="24" color="#2979FF"></uni-icons>
         <view class="printer-details">
           <text class="printer-name">
             {{ Object.values(printerStatus.printers)[0]['printer-make-and-model'] }}
           </text>
           <text class="printer-uri">
             设备: {{ formatDeviceUri(Object.values(printerStatus.printers)[0]['device-uri']) }}
           </text>
         </view>
       </view>
       <view class="printer-state">
         <text>状态: {{ printerStatusText }}</text>
		  <text>队列: {{ status_print }}</text>
       </view>
     </view>

    <!-- 文件上传区域 -->
    <view class="upload-area">
      <button class="upload-btn" @tap="chooseFile">
        <uni-icons type="plus" size="20" color="#fff"></uni-icons>
        <text>上传文件</text>
      </button>
      <text class="upload-tips">支持格式: DOCX, PDF, TXT</text>
    </view>

    <!-- 文件列表 -->
    <view class="file-list">
      <view class="section-title">
        <text>待打印文件</text>
        <view class="action-group">
          <uni-icons 
            type="refresh" 
            size="18" 
            color="#666" 
            @tap="refreshFiles"
          ></uni-icons>
          <text class="refresh-text">刷新</text>
        </view>
      </view>

      <scroll-view scroll-y class="scroll-view">
        <view 
          v-for="(file, index) in files" 
          :key="index" 
          class="file-item"
        >
          <view class="file-info">
            <uni-icons :type="getFileIcon(file.filename)" size="24" color="#2979FF"></uni-icons>
            <view class="file-details">
              <text class="filename">{{ file.filename }}</text>
              <text class="upload-time">{{ formatTime(file.upload_time) }}</text>
            </view>
          </view>
          <view class="file-actions">
            <button 
              class="print-btn" 
              @tap="printFile(file.filename)"
              :disabled="file.status !== 'pending'"
            >
              {{ file.status === 'pending' ? '打印' : '打印中' }}
            </button>
			
            <button  style="color: red;width: 120rpx;height: 60rpx;font-size: 25rpx;" @tap="confirmDelete(file.id)">
             删除
            </button>
          </view>
        </view>
        
        <view v-if="files.length === 0" class="empty-tips">
          <uni-icons type="info" size="24" color="#999"></uni-icons>
          <text>暂无待打印文件</text>
        </view>
      </scroll-view>
    </view>

    <!-- 打印队列 -->
    <view class="queue-section" v-if="queue.length > 0">
      <view class="section-title">
        <text>当前打印队列 ({{ queue.length }})</text>
      </view>
      <view class="queue-list">
        <view 
          v-for="(job, index) in queue" 
          :key="index" 
          class="queue-item"
        >
          <view class="job-info">
            <text class="job-id">#{{ job.id }}</text>
            <text class="job-title">{{ job.title || '未命名任务' }}</text>
          </view>
          <text class="job-status">{{ formatJobStatus(job.state) }}</text>
        </view>
      </view>
    </view>
  </view>
</template>

<script>
export default {
  data() {
    return {
		status_print:'',
      printerStatus: {},
      files: [],
      queue: [],
      lastUpdate: null,
      API_BASE: 'http://192.168.1.139:5000/api'
    }
  },
  computed: {
     printerStatusText() {
          if(!this.printerStatus.printers) return '未连接'
          const printer = Object.values(this.printerStatus.printers)[0]
          return {
            3: '空闲',
            4: '打印中',
            5: '已停止'
          }[printer['printer-state']] || '未知'
        },
    printerStateClass() {
      const state = this.printerStatus['printer-state']
      const classes = {
        3: 'status-idle',
        4: 'status-printing',
        5: 'status-stopped'
      }
      return classes[state] || 'status-unknown'
    }
  },
  onLoad() {
    this.initData()
  },
  onPullDownRefresh() {
    this.refreshFiles()
    uni.stopPullDownRefresh()
  },
  methods: {
	    confirmDelete(fileId) {
	      uni.showModal({
	        title: '确认删除',
	        content: '确定要删除这个文件吗?',
	        success: (res) => {
	          if (res.confirm) {
	            this.deleteFile(fileId)
	          }
	        }
	      })
	    },
	    
	    async deleteFile(fileId) {
	      uni.showLoading({ title: '删除中...' })
	      try {
	        const [err, res] = await uni.request({
	          url: `${this.API_BASE}/files/${fileId}`,
	          method: 'DELETE'
	        })
	        
	        if (res.statusCode === 200) {
	          uni.showToast({ title: '删除成功' })
	          this.fetchFiles()
	        } else {
	          throw new Error(res.data.error || '删除失败')
	        }
	      } catch (e) {
	       
	      } finally {
	        uni.hideLoading()
	      }
	    },
	    
	    // 修改后的打印方法
	    async printFile(fileId) {
	      this.$set(this.printingStatus, fileId, 'pending')
	      
	      try {
	        const [err, res] = await uni.request({
	          url: `${this.API_BASE}/print`,
	          method: 'POST',
	          data: { file_id: fileId },
	          header: { 'Content-Type': 'application/json' }
	        })
	        
	        if (res.statusCode === 200) {
	          this.$set(this.printingStatus, fileId, 'printing')
	          this.monitorPrintJob(res.data.job_id, fileId)
	        } else {
	          throw new Error(res.data.error || '打印失败')
	        }
	      } catch (e) {
	        this.$set(this.printingStatus, fileId, 'error')
	        uni.showToast({ title: e.message, icon: 'none' })
	      }
	    },
    initData() {
      this.fetchPrinterStatus()
      this.fetchFiles()
      this.fetchPrintQueue()
      
      // 定时刷新(每30秒)
      this.refreshTimer = setInterval(() => {
        this.fetchPrinterStatus()
        this.fetchPrintQueue()
      }, 30000)
    },
    
    fetchPrinterStatus() {
      uni.request({
        url: this.API_BASE + '/printer/status',
        method: 'GET',
        success: (res) => {
          if(res.statusCode === 200) {
            this.printerStatus = res.data
            this.lastUpdate = new Date()
          }
        },
        fail: (err) => {
          console.error('获取打印机状态失败:', err)
        }
      })
    },
    
    fetchFiles() {
      uni.showLoading({ title: '加载文件列表' })
      uni.request({
        url: this.API_BASE + '/files',
        method: 'GET',
        success: (res) => {
          uni.hideLoading()
          if(res.statusCode === 200) {
            this.files = res.data.files || []
          }
        },
        fail: (err) => {
          uni.hideLoading()
          console.error('加载文件列表失败:', err)
        }
      })
    },
    
    fetchPrintQueue() {
      uni.request({
        url: this.API_BASE + '/printer/queue',
        method: 'GET',
        success: (res) => {
          if(res.statusCode === 200) {
			  uni.showToast({
			  	title:'打印机队列正常'
			  })
			this.status_print='正常'
            this.queue = Object.values(res.data)
          }
		  if(res.statusCode === 503) {
		  			  uni.showToast({
		  			  	title:'打印机队列异常',
						icon:'error'
		  			  })
					  
			this.status_print='卡纸异常'
		  }
        },
        fail: (err) => {
          console.error('获取打印队列失败:', err)
        }
      })
    },
    
    chooseFile() {
      uni.chooseFile({
        count: 1,
        extension: ['.docx', '.pdf', '.txt'],
        success: (res) => {
          this.uploadFile(res.tempFiles[0])
        }
      })
    },
    
    uploadFile(file) {
      uni.showLoading({ title: '上传文件中...' })
      uni.uploadFile({
        url: this.API_BASE + '/upload',
        filePath: file.path,
        name: 'file',
        formData: {
          filename: file.name
        },
        success: (res) => {
          if(res.statusCode === 200) {
            uni.showToast({ title: '上传成功' })
            this.fetchFiles()
          } else {
            uni.showToast({ title: '上传失败', icon: 'none' })
          }
        },
        fail: (err) => {
          uni.showToast({ title: '上传错误', icon: 'none' })
        },
        complete: () => {
          uni.hideLoading()
        }
      })
    },
    
	
    
    refreshFiles() {
      this.fetchPrinterStatus()
      this.fetchFiles()
      this.fetchPrintQueue()
    },
    
  
 
    
    formatTime(timestamp) {
      if (!timestamp) return ''
      const date = new Date(timestamp)
      const padZero = num => (num < 10 ? '0' + num : num)
      return `${date.getFullYear()}-${padZero(date.getMonth()+1)}-${padZero(date.getDate())} ${padZero(date.getHours())}:${padZero(date.getMinutes())}`
    },
    
    formatDeviceUri(uri) {
      if (!uri) return '未知设备'
      // 提取USB设备信息
      const usbMatch = uri.match(/usb\/([^?]+)/)
      if (usbMatch) return usbMatch[1].replace(/_/g, ' ')
      return uri
    },
    
    getFileIcon(filename) {
      const ext = filename.split('.').pop().toLowerCase()
      const icons = {
        pdf: 'pdf',
        docx: 'word',
        txt: 'text'
      }
      return icons[ext] || 'file'
    },
    
    formatJobStatus(state) {
      const states = {
        3: '等待中',
        4: '打印中',
        5: '已暂停',
        6: '已取消',
        7: '已完成'
      }
      return states[state] || '未知状态'
    }
  },
  
  beforeUnmount() {
    // 清除定时器
    if (this.refreshTimer) {
      clearInterval(this.refreshTimer)
    }
  }
}
</script>

<style>
/* 基础样式 */
page {
  background-color: #f5f5f5;
  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, 
               Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
  color: #333;
}

.container {
  padding: 20rpx;
  max-width: 800px;
  margin: 0 auto;
}

/* 头部样式 */
.header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 20rpx 0;
  margin-bottom: 20rpx;
  border-bottom: 1rpx solid #eee;
}

.title {
  font-size: 36rpx;
  font-weight: bold;
}

.printer-status {
  padding: 6rpx 20rpx;
  border-radius: 40rpx;
  font-size: 24rpx;
  color: #fff;
}

.status-idle {
  background-color: #52c41a;
}

.status-printing {
  background-color: #1890ff;
}

.status-stopped {
  background-color: #f5222d;
}

.status-unknown {
  background-color: #fa8c16;
}

/* 打印机卡片 */
.printer-card {
  background-color: #fff;
  border-radius: 12rpx;
  padding: 24rpx;
  margin-bottom: 30rpx;
  box-shadow: 0 2rpx 8rpx rgba(0, 0, 0, 0.05);
}

.printer-info {
  display: flex;
  align-items: flex-start;
  margin-bottom: 16rpx;
}

.printer-details {
  margin-left: 20rpx;
  flex: 1;
}

.printer-name {
  display: block;
  font-size: 30rpx;
  font-weight: 500;
  margin-bottom: 6rpx;
}

.printer-location,
.printer-uri {
  display: block;
  font-size: 24rpx;
  color: #666;
  margin-top: 6rpx;
}

.printer-state {
  display: flex;
  justify-content: space-between;
  font-size: 26rpx;
  color: #666;
  padding-top: 12rpx;
  border-top: 1rpx dashed #eee;
}

.update-time {
  font-size: 24rpx;
  color: #999;
}

/* 上传区域 */
.upload-area {
  margin-bottom: 30rpx;
  text-align: center;
}

.upload-btn {
  display: inline-flex;
  align-items: center;
  justify-content: center;
  height: 80rpx;
  padding: 0 40rpx;
  background-color: #2979ff;
  color: #fff;
  border-radius: 40rpx;
  font-size: 28rpx;
  box-shadow: 0 4rpx 12rpx rgba(41, 121, 255, 0.3);
}

.upload-btn text {
  margin-left: 10rpx;
}

.upload-tips {
  display: block;
  margin-top: 12rpx;
  font-size: 24rpx;
  color: #999;
}

/* 文件列表 */
.section-title {
  display: flex;
  justify-content: space-between;
  align-items: center;
  margin-bottom: 20rpx;
  font-size: 30rpx;
  font-weight: 500;
}

.action-group {
  display: flex;
  align-items: center;
}

.refresh-text {
  font-size: 26rpx;
  color: #666;
  margin-left: 8rpx;
}

.scroll-view {
  max-height: 60vh;
}

.file-item {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 24rpx;
  margin-bottom: 20rpx;
  background-color: #fff;
  border-radius: 12rpx;
  box-shadow: 0 2rpx 8rpx rgba(0, 0, 0, 0.05);
}

.file-info {
  display: flex;
  align-items: center;
  flex: 1;
  min-width: 0;
}

.file-details {
  margin-left: 20rpx;
  flex: 1;
  min-width: 0;
}

.filename {
  display: block;
  font-size: 28rpx;
  white-space: nowrap;
  overflow: hidden;
  text-overflow: ellipsis;
  margin-bottom: 6rpx;
}

.upload-time {
  font-size: 24rpx;
  color: #999;
}

.file-actions {
  display: flex;
  align-items: center;
  margin-left: 20rpx;
}

.print-btn {
  height: 56rpx;
  line-height: 56rpx;
  padding: 0 24rpx;
  font-size: 26rpx;
  color: #2979ff;
  background-color: #e6f7ff;
  border: 1rpx solid #91d5ff;
  border-radius: 6rpx;
  margin-right: 16rpx;
}

.print-btn[disabled] {
  color: #bfbfbf;
  background-color: #f5f5f5;
  border-color: #d9d9d9;
}

.delete-btn {
  width: 56rpx;
  height: 56rpx;
  display: flex;
  align-items: center;
  justify-content: center;
  background-color: #fff2f0;
  border: 1rpx solid #ffccc7;
  border-radius: 6rpx;
}

.empty-tips {
  display: flex;
  flex-direction: column;
  align-items: center;
  justify-content: center;
  padding: 60rpx 0;
  color: #999;
  font-size: 28rpx;
}

.empty-tips text {
  margin-top: 20rpx;
}

/* 打印队列 */
.queue-section {
  margin-top: 40rpx;
}

.queue-list {
  background-color: #fff;
  border-radius: 12rpx;
  padding: 0 20rpx;
  box-shadow: 0 2rpx 8rpx rgba(0, 0, 0, 0.05);
}

.queue-item {
  display: flex;
  justify-content: space-between;
  align-items: center;
  padding: 24rpx 0;
  border-bottom: 1rpx solid #f0f0f0;
}

.queue-item:last-child {
  border-bottom: none;
}

.job-info {
  display: flex;
  align-items: center;
  flex: 1;
  min-width: 0;
}

.job-id {
  font-size: 26rpx;
  color: #1890ff;
  margin-right: 20rpx;
  font-family: monospace;
}

.job-title {
  font-size: 26rpx;
  white-space: nowrap;
  overflow: hidden;
  text-overflow: ellipsis;
  flex: 1;
}

.job-status {
  font-size: 24rpx;
  color: #666;
  margin-left: 20rpx;
}
</style>

8.2 KWDB数据库连接

写一个方法用于后面处理连接

代码语言:javascript
代码运行次数:0
运行
复制
# KWDB连接器(带错误重试)
# 在get_kwdb_connection()中添加更详细的错误处理
def get_kwdb_connection():
    try:
        conn = psycopg2.connect(**app.config['KWDB_CONFIG'])
        conn.set_session(autocommit=True)

        # 测试连接有效性
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
            if cur.fetchone()[0] != 1:
                raise psycopg2.OperationalError("Connection test failed")

        return conn

    except psycopg2.Error as e:
        logging.error(f"DB connection failed: {str(e)}")
        raise ConnectionError("Database service unavailable") from e

8.3 文件上传接口

用于客户端用户上传文件接收,将文件移动到开发版指定目录,并上传数据到数据库

代码语言:javascript
代码运行次数:0
运行
复制
# 文件上传接口(完全适配KWDB语法)
@app.route('/api/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400

    file = request.files['file']
    if not file or file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not allowed'}), 400

    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)

    try:
        # 先保存文件
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        file.save(filepath)
        file_size = os.path.getsize(filepath)
        file_type = filename.split('.')[-1].lower()
        file_id = generate_file_id()

        with get_kwdb_connection() as conn, conn.cursor() as cur:
            # 使用更安全的ID生成方式(避免手动计算MAX+1)
            cur.execute("""
                INSERT INTO doc_list.public.print_files 
                (id,filename, filepath, file_size, file_type, upload_time)
                VALUES (%s,%s, %s, %s, %s, %s)
                RETURNING id::text
            """, (file_id,filename, filepath, file_size, file_type, datetime.utcnow()))

            file_id = cur.fetchone()[0]

            # 打印文件
            printer = "DeskJet_2130"  # 替换为你的打印机名
            docx_file = filename  # DOCX 文件路径

            if print_docx(printer, docx_file):
                print("打印任务已发送!")
            else:
                print("打印失败,请检查错误信息")



            return jsonify({
                'id': file_id,
                'filename': filename,
                'size': file_size,
                'message': 'File uploaded successfully'
            }), 200

    except psycopg2.Error as e:
        logging.error(f"Database error: {e.pgerror}")
        if os.path.exists(filepath):
            os.unlink(filepath)
        return jsonify({'error': 'Database operation failed'}), 500
    except Exception as e:
        logging.error(f"Upload failed: {str(e)}", exc_info=True)
        return jsonify({'error': 'Upload processing failed'}), 500

8.4 打印接口

用户点击打印按钮,后端接收到打印的ID去查找对应的文件,发送给打印机进行打印

代码语言:javascript
代码运行次数:0
运行
复制
# 打印接口(适配KWDB事务)

@app.route('/api/print', methods=['POST'])
def print_file():
    data = request.json
    if not data or 'file_id' not in data:
        return jsonify({'error': 'Missing file ID'}), 400

    try:
        with get_kwdb_connection() as conn, conn.cursor() as cur:
            # 获取文件信息
            cur.execute("""
                SELECT filepath, filename 
                FROM doc_list.public.print_files
                WHERE id = %s AND status = 'pending'
                FOR UPDATE NOWAIT
            """, (data['file_id'],))
            result = cur.fetchone()

            if not result:
                return jsonify({'error': 'File not available for printing'}), 404

            filepath, filename = result
            ext = filename.split('.')[-1].lower()

            # 文件转换逻辑
            if ext == 'docx':
                with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as tmp_pdf:
                    conv_result = subprocess.run(
                        ['unoconv', '-f', 'pdf', '-o', tmp_pdf.name, filepath],
                        capture_output=True
                    )
                    if conv_result.returncode != 0:
                        raise RuntimeError(f"Conversion failed: {conv_result.stderr.decode()}")
                    print_filepath = tmp_pdf.name
            else:
                print_filepath = filepath

            # 提交打印任务
            conn = cups.Connection()
            printer = conn.getDefault()
            if not printer:
                raise RuntimeError("No default printer set")

            job_id = conn.printFile(
                printer,
                print_filepath,
                f"PrintJob_{filename}",
                {
                    'media': 'A4',
                    'fit-to-page': 'True',
                    'print-quality': '3'
                }
            )

            # 更新数据库状态
            cur.execute("""
                UPDATE doc_list.public.print_files
                SET status = 'printing',
                    print_job_id = %s,
                    print_time = %s
                WHERE id = %s
            """, (job_id, datetime.now(), data['file_id']))

            return jsonify({
                'job_id': job_id,
                'message': 'Print job started'
            }), 200

    except Exception as e:
        logging.error(f"Print failed: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500

8.5 查看已上传文件

客户端能够显示历史上传的文件,可进行对文件的操作

代码语言:javascript
代码运行次数:0
运行
复制
# 修改后的文件列表接口(修正SQL语法)

@app.route('/api/files', methods=['GET'])
def get_files():
    try:
        with get_kwdb_connection() as conn, conn.cursor() as cur:
            # 安全查询:先获取表结构
            cur.execute("""
                SELECT column_name 
                FROM information_schema.columns 
                WHERE table_name = 'print_files'
            """)
            columns = [row[0] for row in cur.fetchall()]

            if not columns:
                return jsonify({'files': []})

            # 动态构建查询
            cur.execute(f"""
                SELECT {', '.join(columns)}
                FROM doc_list.public.print_files
                ORDER BY upload_time DESC
            """)

            # 自动适配字段类型
            files = []
            for row in cur.fetchall():
                file_data = {}
                for i, col in enumerate(columns):
                    value = row[i]
                    if isinstance(value, datetime):
                        value = value.isoformat()
                    file_data[col] = value
                files.append(file_data)

            return jsonify({'files': files})

    except ConnectionError:
        return jsonify({'error': '数据库连接失败'}), 503
    except Exception as e:
        logging.error(f"查询失败: {str(e)}", exc_info=True)
        return jsonify({'error': '数据查询异常'}), 500

8.6 服务端完整代码

代码语言:javascript
代码运行次数:0
运行
复制
在这里插入代码片
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 前言
  • 2. 项目介绍
    • 2.1 目前支持
    • 2.2 图文演示
    • 2.3 视频演示
  • 3. 准备工作
    • 3.1 用到的工具
    • 3.2 使用 HPLIP 连接 HP 打印机
    • 3.3 安装 HPLIP
    • 3.4 连接打印机
  • 4. 打印文件
    • 4.1 使用 python-docx + unoconv(推荐)​
  • 5. 创建KWDB数据库
    • 5.1 新建连接
    • 5.2 选择连接类型
    • 5.3 输入账密完成连接
  • 6. 数据库类型
  • 7. 数据库操作
    • 7.1 新建数据表
    • 7.2 数据库连接demo
  • 8. 云盒升级
    • 8.1 前端样式
    • 8.2 KWDB数据库连接
    • 8.3 文件上传接口
    • 8.4 打印接口
    • 8.5 查看已上传文件
    • 8.6 服务端完整代码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档