# 多进程,
import os
import time
from multiprocessing import Process
# 启动时必须在 if __name__ 判断下,windows 必须,其他 无限制
# =================================================
# def func(args):
# print("子进程:",os.getpid())
# print("子进程的父进程:",os.getppid())
# time.sleep(10)
# print("子进程结束")
# if __name__ =="__main__":
# p = Process(target=func,args=(1,)) # 注册 并传入元祖 元祖有一个参数要加逗号
# # p是进程对象
# p.start() # 开启子进程
# print("主进程:", os.getpid())
# print("主进程的父进程:",os.getppid()) # cmd 或者是 pycharm
# 生命周期
# 主进程长:自己执行完结束
# 子进程长:等待子进程结束
# =================================================
# 多进程中的方法
# join
# def fun(arg1,arg2):
# print('*'*arg1)
# # time.sleep(5)
# print('*'*arg2)
# if __name__ == "__main__":
# p = Process(target=fun,args=(10,20))
# p.start()
# # p.join() # 感知子进程结束
# # time.sleep(1)
# print("all is stop")
#
# print("最后的语句")
# os.walk(r"目录") # 返回 文件夹中文件名字
# =================================================
# def fun():
# print("xxx")
# if __name__ == "__main__":
# for i in range(10):
# p = Process(target=fun)
# p.start()
# p.join() # 停止for循环 进程结束后继续
# print("for")
# print("主进程")
# =================================================
# 第二种方法
# class MyProcess(Process):
# def __init__(self,args):
# super().__init__() # 若要传递参数,需要调用父类init
#
# def run(self):
# print("子进程",self.__dict__)
# print(self.pid)
# if __name__ == "__main__":
# print("主进程:",os.getpid())
# p1 = MyProcess()
# p1.start()
# =================================================
# 进程之间数据是隔离,命名空间不通
# def fun():
# global n
# n= 0
# print("pid:%s" %os.getpid(),n)
# if __name__ == "__main__":
# n=100
# p = Process(target=fun)
# p.start()
# p.join()
# print(n) # -->100
# =================================================
# 多进程tcp连接
# import socket
# # 客户端
# sk = socket.socket()
# sk.connect(("127.0.0.1",8080))
# sk.send('N好'.encode("utf8"))
# msg = sk.recv(1024).decode("utf8")
# print(msg)
# sk.close()
#
# # 服务端
# def server(conn):
# ret= "你好".encode("utf8")
# conn.send(ret)
# msg = conn.recv(1024).decode("utf8")
# print(msg)
# conn.close()
#
# sk = socket.socket()
# sk.bind(("127.0.0.1",8080))
# sk.listen()
# if __name__ == "__main__":
# while True:
# conn, addr = sk.accept()
# p = Process(target=server,args=(conn,))
# p.start()
# =================================================
# 守护进程
# 默认情况 父进程 等待子进程结束
# p.daemon = True 在start前,设置为守护进程,守护进程随父进程(代码执行完毕)结束
# 若父进程在等待 子进程(非守护进程时) ,若父进程代码完毕,守护进程应该结束
# p.is_alive() 判断进程是否存活
# p.terminate() 终止进程
# =================================================
# 锁
# 未加锁实例:
# 火车票
import json
import time
from multiprocessing import Process
# def show(i):
# with open('ticket') as f:
# dic = json.load(f)
# print('余票: %s'%dic['ticket'])
def buy_ticket(i):
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0 :
dic['ticket'] -= 1
print('\033[32m%s买到票了\033[0m'%i)
else:
print('\033[31m%s没买到票\033[0m'%i)
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
if __name__ == '__main__':
# for i in range(10):
# p = Process(target=show,args=(i,))
# p.start()
for i in range(10):
p = Process(target=buy_ticket, args=(i))
p.start()
# =================================================
# 锁
# 加锁实例
# 火车票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock
# def show(i):
# with open('ticket') as f:
# dic = json.load(f)
# print('余票: %s'%dic['ticket'])
def buy_ticket(i,lock):
lock.acquire() #拿钥匙进门
with open('ticket') as f:
dic = json.load(f)
time.sleep(0.1)
if dic['ticket'] > 0 :
dic['ticket'] -= 1
print('\033[32m%s买到票了\033[0m'%i)
else:
print('\033[31m%s没买到票\033[0m'%i)
time.sleep(0.1)
with open('ticket','w') as f:
json.dump(dic,f)
lock.release() # 还钥匙
if __name__ == '__main__':
# for i in range(10):
# p = Process(target=show,args=(i,))
# p.start()
lock = Lock()
for i in range(10):
p = Process(target=buy_ticket, args=(i,lock))
p.start()
# =================================================
# =================================================
# =================================================
# 多进程中的组件
# 一个资源 同一时间 被n个人访问
import time
import random
from multiprocessing import Process,Event
# ==============================
# 未用信号量
# def ktv(i):
# print('%s走进ktv'%i)
# time.sleep(random.randint(1,5))
# print('%s走出ktv'%i)
# if __name__ == '__main__' :
# for i in range(20):
# p = Process(target=ktv,args=(i))
# p.start()
# ==============================
from multiprocessing import Semaphore
# sem = Semaphore(4)
# sem.acquire()
# print('拿到第一把钥匙')
# sem.acquire()
# print('拿到第二把钥匙')
# sem.acquire()
# print('拿到第三把钥匙')
# sem.acquire()
# print('拿到第四把钥匙')
# sem.acquire()
# print('拿到第五把钥匙')
# def ktv(i,sem):
# sem.acquire() #获取钥匙
# print('%s走进ktv'%i)
# time.sleep(random.randint(1,5))
# print('%s走出ktv'%i)
# sem.release() # 释放钥匙
#
#
# if __name__ == '__main__' :
# sem = Semaphore(4)
# for i in range(20):
# p = Process(target=ktv,args=(i,sem))
# p.start()
# ==============================
# 事件
# 信号是控制进程阻塞与否
# 事件创建后,默认是阻塞状态
# e = Event() # 创建事件
# e.is_set() # False 默认阻塞
# print("xx") # 可打印 e.set() 设置为True e.clear() 设置为False
# e.wait()
# print("xx") # 阻塞
# 遇到wait()会判断is_set() 为False 阻塞
# ==============================
# 红绿灯事件
def cars(e,i):
if not e.is_set():
print("car%i在等待" % i)
e.wait()
print("car%i通过" % i)
def light(e):
while True:
if e.is_set():
e.clear()
print("\033[31m红灯\033[0m")
else:
e.set()
print("\033[32m绿灯\033[0m")
time.sleep(2)
if __name__ == "__main__":
e = Event()
p =Process(target=light,args=(e,))
p.start()
for i in range(1,21):
car = Process(target=cars,args=(e,i))
car.start()
time.sleep(random.random())
# ==============================
# ==============================
# ==============================
# ==============================
# def test(e):
# e.set()
# print("xxx")
# if __name__=="__main__":
# e = Event()
# print(e.is_set())
# p = Process(target=test,args=(e,))
# p.start()
# e.wait()
# print("注")
# IPC 内部进程通信,不能使用普通queue
from multiprocessing import Queue,Process
# ===============================
# q = Queue(5) # 队列大小
# q.put(1)
# q.put(1)
# q.full() # 若队列满了,阻塞等待
# q.get()
# q.empty() # 若为空,阻塞等待有数据 后取值
# q.get()
# q.get_nowait() # 用于跳过等待,需要用try
# ===============================
# def produce(q):
# q.put('hello')
# def consume(q):
# print(q.get())
# if __name__ =="__main__":
# q = Queue()
# p = Process(target=produce,args=(q,))
# p.start()
# p2 = Process(target=consume, args=(q,))
# p2.start()
# ===============================
# 生产者消费者模型
# 若生产者,生产有数量,消费者,不停消费,最后消费进程会处于等待状态
# 可在主进程后边join生产进程,消费进程判断为空,但不准确
# 需要在队列put(None) 子进程判断,由于数据之间不能共享,需要put 消费数量的None
# ===============================
from multiprocessing import JoinableQueue
# consume :
# ....
# q.task_done()
# produce :
# ...
# q.join()
# ===============================
# 循环通知,致使进程结束
# JoinableQueue
# 生产者生产,不停不停消费,若q为空 一直等待,
# 生产者完毕后,会join等待消费值消费完毕,因为是同一个q,一个生产者完毕后,其他还没有完毕q会处于,他会处于阻塞
# 等待 消费者 全部消费完毕,q.join()会感知,因此 生产进程会结束,主进程最后join生产进程,生产结束
# 主进程就结束,身为守护进程的子进程也结束
# import time
# import random
# from multiprocessing import Process,JoinableQueue
# def consumer(q,name):
# while True:
# food = q.get()
# print('\033[31m%s消费了%s\033[0m' % (name,food))
# time.sleep(random.randint(1,3))
# q.task_done() # count - 1
#
# def producer(name,food,q):
# for i in range(4):
# time.sleep(random.randint(1,3))
# f = '%s生产了%s%s'%(name,food,i)
# print(f)
# q.put(f)
# q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕
#
# if __name__ == '__main__':
# q = JoinableQueue(20)
# p1 = Process(target=producer,args=('Egon','包子',q))
# p2 = Process(target=producer, args=('wusir','泔水', q))
# c1 = Process(target=consumer, args=(q,'alex'))
# c2 = Process(target=consumer, args=(q,'jinboss'))
# p1.start()
# p2.start()
# c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
# c2.daemon = True
# c1.start()
# c2.start()
# p1.join()
# p2.join() # 感知一个进程的结束
# 在消费者这一端:
# 每次获取一个数据
# 处理一个数据
# 发送一个记号 : 标志一个数据被处理成功
# 在生产者这一端:
# 每一次生产一个数据,
# 且每一次生产的数据都放在队列中
# 在队列中刻上一个记号
# 当生产者全部生产完毕之后,
# join信号 : 已经停止生产数据了
# 且要等待之前被刻上的记号都被消费完
# 当数据都被处理完时,join阻塞结束
# consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
# ===============================
# 管道 双向通信工具
from multiprocessing import Pipe
# conn,conn2 = Pipe()
# conn.send("123456") # 不用字节
# print(conn2.recv()) # 不用指定大小
def fun(conn):
conn.send("hello")
if __name__=="__main__":
conn1,conn2 = Pipe()
Process(target=fun,args=(conn1,)).start()
print(conn1.recv())
# 管道返回2个连接
# conn1, conn2
# P 发送 接受
# p2 接受 发送
# 若只发数据,可关闭一端,若取数据的时候,对面已关闭,则报错,看根据此 终止程序
from multiprocessing import Pipe,Process
# def func(conn1,conn2):
# conn2.close()
# while True:
# try :
# msg = conn1.recv()
# print(msg)
# except EOFError:
# conn1.close()
# break
#
# if __name__ == '__main__':
# conn1, conn2 = Pipe()
# Process(target=func,args = (conn1,conn2)).start()
# conn1.close()
# for i in range(20):
# conn2.send('吃了么')
# conn2.close()
# ===============================
# from multiprocessing import Lock,Pipe,Process
# def producer(con,pro,name,food):
# con.close()
# for i in range(100):
# f = '%s生产%s%s'%(name,food,i)
# print(f)
# pro.send(f)
# pro.send(None)
# pro.send(None)
# pro.send(None)
# pro.close()
#
# def consumer(con,pro,name,lock):
# pro.close()
# while True:
# lock.acquire() # 不安全主要是recv的时候,因此两端加锁即可
# food = con.recv()
# lock.release()
# if food is None:
# con.close()
# break
# print('%s吃了%s' % (name, food))
# if __name__ == '__main__':
# con,pro = Pipe()
# lock= Lock()
# p = Process(target=producer,args=(con,pro,'egon','泔水'))
# c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
# c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
# c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
# c1.start()
# c2.start()
# c3.start()
# p.start()
# con.close()
# pro.close()
# from multiprocessing import Process,Pipe,Lock
#
# def consumer(produce, consume,name,lock):
# produce.close()
# while True:
# lock.acquire()
# baozi=consume.recv()
# lock.release()
# if baozi:
# print('%s 收到包子:%s' %(name,baozi))
# else:
# consume.close()
# break
#
# def producer(produce, consume,n):
# consume.close()
# for i in range(n):
# produce.send(i)
# produce.send(None)
# produce.send(None)
# produce.close()
#
# if __name__ == '__main__':
# produce,consume=Pipe()
# lock = Lock()
# c1=Process(target=consumer,args=(produce,consume,'c1',lock))
# c2=Process(target=consumer,args=(produce,consume,'c2',lock))
# p1=Process(target=producer,args=(produce,consume,30))
# c1.start()
# c2.start()
# p1.start()
# produce.close()
# consume.close()
# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象
# 队列 进程之间数据安全的
# 管道 + 锁
from multiprocessing import Manager,Process,Lock
# 进程之间不能传递数据,通过方法的参数可以传过去,但修改后,无反应,不知道原因
def main(dict,lock):
lock.acquire()
dict['count']-=1
lock.release()
if __name__ == "__main__":
m = Manager()
l = Lock()
dict = m.dict({"count":100})
p_list=[]
for i in range(50):
p = Process(target=main,args=(dict,l))
p.start()
p_list.append(p)
for i in p_list:i.join()
print(dict['count'])
# 进程池
# 上一个例子中50个进程很慢
# 寄存器 堆栈 文件
# 操作系统调度,cup切换
# 高级线程池有数量限定 ,有最低,任务变多的时候 逐步加到最高限制
import os,time
from multiprocessing import Pool,Manager
# ==================================
# def func2(i):
# print(os.getpid(),os.getppid())
# i+1
# def func(list):
# list[1]['set'].add(os.getpid())
# print(len(list[1]['set']))
# # 一般超过5个使用pool
# if __name__ == "__main__":
# pid = Manager()
# dict1 = pid.dict({"set":set()})
# pool = Pool(5)
# # 执行不同的任务,map 自带join方法
# #pool.map(func2,range(100))
# pool.map(func, [[i,dict1]for i in range(100)])
# print(len(dict1['set']))
# """
# 等于
# for i in range(100):
# p = Process(target=func,args=(i,))
# p.start()
# """
# ==================================
# def fun(n):
# print("start fun%s" %n,os.getpid())
# time.sleep(1)
# print("end fun%s" % n, os.getpid())
# if __name__ == "__main__":
# p = Pool() # 默认cup核心数量
# for i in range(10):
# #p.apply(fun,args=(i,)) # 同步提交的
# p.apply_async(fun,args=(i,)) # 异步提交,真的异步,因此需要join
# p.close() # 不再接受新的任务
# p.join() # 感知进程池中任务结束 保持 主进程 与子进程同步
# ==================================
# import socket
# from multiprocessing import Pool
#
# def func(conn):
# conn.send(b'hello')
# print(conn.recv(1024).decode('utf-8'))
# conn.close()
#
# if __name__ == '__main__':
# p = Pool(5)
# sk = socket.socket()
# sk.bind(('127.0.0.1',8080))
# sk.listen()
# while True:
# conn, addr = sk.accept()
# p.apply_async(func,args=(conn,))
# sk.close()
# import socket
#
# sk = socket.socket()
# sk.connect(('127.0.0.1', 8080))
#
# ret = sk.recv(1024).decode('utf-8')
# print(ret)
# msg = input('>>>').encode('utf-8')
# sk.send(msg)
# sk.close()
# ==================================
# 进程池的返回值
# p = Pool()
# p.map(funcname,iterable) 默认异步的执行任务,且自带close和join
# p.apply 同步调用的
# p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join
# from multiprocessing import Pool
# def func(i):
# return i*i
#
# if __name__ == '__main__':
# p = Pool(5)
# for i in range(10):
# res = p.apply(func,args=(i,)) # apply的结果就是func的返回值
# print(res) --> 直接就是返回值
# ==================================
# import time
# from multiprocessing import Pool
# def func(i):
# time.sleep(0.5)
# return i*i
#
# if __name__ == '__main__':
# p = Pool(5)
# res_l = []
# for i in range(10):
# res = p.apply_async(func,args=(i,)) # apply的结果就是func的返回值
# res_l.append(res)
# 若在for 中直接获取res.get()会在成阻塞,程序变同步执行
# for res in res_l:print(res.get())# 等着 func的计算结果
# 调用res.get时返回
# ==================================
# import time
# from multiprocessing import Pool
# def func(i):
# time.sleep(0.5)
# return i*i
#
# if __name__ == '__main__':
# p = Pool(5)
# ret = p.map(func,range(100))
# print(ret) # -> 直接返回全部,列表返回
# 自带join,close 最后一起返回
# ====================================
# 回调函数 , 回调的函数在主进程调用
# 对于子进程中再起子进程问题,还不知道
# 每个进程的回调函数 交给主进程顺序执行
import os
from multiprocessing import Pool,Process
def func2(nn):
print('in func2',os.getpid())
print(nn)
def func3(n):
print('in func3', os.getpid())
return n*n
def func1(n):
print('in func1',os.getpid())
p = Pool(5)
p.apply_async(func3, args=(10,), callback=func2)
p.close()
p.join()
return n*n
if __name__ == '__main__':
print('主进程 :',os.getpid())
p = Pool(5)
p.apply_async(func1,args=(10,),callback=func2)
p.close()
p.join()
# ===================================================
import requests
from urllib.request import urlopen
from multiprocessing import Pool
# 200 网页正常的返回
# 404 网页找不到
# 502 504
# 场景:callback 耗时段,远小于网络延时,此时使用,在主进程运行,
def get(url):
response = requests.get(url)
if response.status_code == 200:
return url, response.content.decode('utf-8')
def get_urllib(url):
ret = urlopen(url)
return ret.read().decode('utf-8')
def call_back(args):
url, content = args
print(url, len(content))
if __name__ == '__main__':
url_lst = [
'https://www.cnblogs.com/',
'http://www.baidu.com',
'https://www.sogou.com/',
'http://www.sohu.com/',
]
p = Pool(5)
for url in url_lst:
p.apply_async(get, args=(url,), callback=call_back) # callback 中的参数为 get函数的返回值
p.close()
p.join()
# 同一进程的线程间的数据共享的,共享的 共享的
# 可通过直接访问全局变量 global,还需要进程同步
# 创建,切换,撤销 相比进程 消耗小,轻量级
# 进程:资源分配单位,每个进程 至少一个线程
# 线程:cup调度单位
# thread 基本模块,避免使用,可能与threading 冲突
# threading thread的高级版本
# Queue 多线程之间共享数据的数据结构
# 与进程类似,好多方法相同
import time
from threading import Thread
import threading
# def func(n):
# time.sleep(1)
# print(n)
# t = Thread(target=func,args=(12,))
# t.daemon = True # 成为"守护线程"
# t.start()
# print("主线程") # 默认情况等待子线程结束
# ===================================
# class MyThread(Thread):
# def __init__(self,name):
# super().__init__()
# self.name = name
# def run(self):
# # time.sleep(1)
# print(self.name)
# MyThread("段志方").start()
# ================================
# GIL 锁的是线程,同一时间 只有一个线程 ,cpython解释器的问题,jpython 就不会
# 对于io密集型 没什么区别,只要io时会切换即可
# 但对于多核cup python 同时只能运行一个cup ,其他语言的会运行多个,因此...
# 即不能通过物理核心数增加速度,不能实现(并行)
# ============================================
# 多线程socket 可以input
# import socket
# from threading import Thread
# def chat(conn):
# conn.send(b'hello')
# msg = conn.recv(1024).decode('utf-8')
# print(msg)
# conn.close()
# sk = socket.socket()
# sk.bind(('127.0.0.1',8080))
# sk.listen()
# while True:
# conn,addr = sk.accept()
# Thread(target=chat,args = (conn,)).start()
# sk.close()
#
# import socket
# sk = socket.socket()
# sk.connect(('127.0.0.1',8080))
# msg = sk.recv(1024)
# print(msg)
# inp = input('>>> ').encode('utf-8')
# sk.send(inp)
# sk.close()
# =========================
# print(threading.current_thread()) # 当前线程
# print(threading.active_count()) # 全部线程,包括主线程
# print(threading.enumerate()) # 列表返回全部线程对象
# ==========================================
# 守护线程
# import time
# from threading import Thread
# def func1():
# while True:
# print('*'*10)
# time.sleep(1)
# def func2():
# print('in func2')
# time.sleep(5)
#
# t = Thread(target=func1,)
# t.daemon = True
# t.start()
# t2 = Thread(target=func2,)
# t2.start()
# t2.join()
# print('主线程')
# (守护进程)随着(主进程代码)的执行结束而结束
# 守护(线程)会在主线程结束之后等待(其他非守护子线程)的结束才结束
# 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 回收子进程的资源
# import time
# from multiprocessing import Process
# def func():
# time.sleep(5)
#
# if __name__ == '__main__':
# Process(target=func).start()
# =========================================
# 线程锁 ,与gil无关
import time
from threading import Lock,Thread
# Lock 互斥锁
# def func(lock):
# global n
# lock.acquire()
# temp = n
# time.sleep(0.2)
# n = temp - 1
# lock.release()
#
# n = 10
# t_lst = []
# lock = Lock()
# for i in range(10):
# t = Thread(target=func,args=(lock,))
# t.start()
# t_lst.append(t)
# for t in t_lst: t.join()
# print(n)
# 科学家吃面 还会死锁
# noodle_lock = Lock()
# fork_lock = Lock()
# def eat1(name):
# noodle_lock.acquire()
# print('%s拿到面条啦'%name)
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# print('%s吃面'%name)
# fork_lock.release()
# noodle_lock.release()
#
# def eat2(name):
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# time.sleep(1)
# noodle_lock.acquire()
# print('%s拿到面条啦'%name)
# print('%s吃面'%name)
# noodle_lock.release()
# fork_lock.release()
#
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
# ===============================================
from threading import RLock # 递归锁
fork_lock = noodle_lock = RLock()
# 一个钥匙串上的两把钥匙,同一个lock 在一个线程中可又多次acquire
# 传给其他线程时 不能被acquire
# def eat1(name):
# print(name)
# noodle_lock.acquire() # 一把钥匙
# print('%s拿到面条啦'%name)
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# print('%s吃面'%name)
# fork_lock.release()
# noodle_lock.release()
#
# def eat2(name):
# print(name)
# fork_lock.acquire()
# print('%s拿到叉子了'%name)
# time.sleep(1)
# noodle_lock.acquire()
# print('%s拿到面条啦'%name)
# print('%s吃面'%name)
# noodle_lock.release()
# fork_lock.release()
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
# =================================================
import time
from threading import Semaphore,Thread
# ====================================
# def func(sem,a,b):
# sem.acquire()
# time.sleep(1)
# print(a+b)
# sem.release()
# sem = Semaphore(4)
# for i in range(10):
# t = Thread(target=func,args=(sem,i,i+5))
# t.start()
# ====================================
# 事件被创建的时候
# False状态
# wait() 阻塞
# True状态
# wait() 非阻塞
# clear 设置状态为False
# set 设置状态为True
# 数据库 - 文件夹
# 文件夹里有好多excel表格
# 1.能够更方便的对数据进行增删改查
# 2.安全访问的机制
# 起两个线程
# 第一个线程 : 连接数据库
# 等待一个信号 告诉我我们之间的网络是通的
# 连接数据库
# 第二个线程 : 检测与数据库之间的网络是否连通
# time.sleep(0,2) 2
# 将事件的状态设置为True
# import time
# import random
# from threading import Thread,Event
# def connect_db(e):
# count = 0
# while count < 3:
# e.wait(0.5) # 状态为False的时候,我只等待1s就结束
# if e.is_set() == True:
# print('连接数据库')
# break
# else:
# count += 1
# print('第%s次连接失败'%count)
# else:
# raise TimeoutError('数据库连接超时')
# def check_web(e):
# time.sleep(random.randint(0,3))
# e.set()
# e = Event()
# t1 = Thread(target=connect_db,args=(e,))
# t2 = Thread(target=check_web,args=(e,))
# t1.start()
# t2.start()
# ====================================
# 条件 复杂的锁
# 条件
from threading import Condition
# 条件
# 锁
# acquire release
# 一个条件被创建之初 默认有一个(False)状态
# False状态 会影响wait一直处于等待状态
# notify(int数据类型) 造钥匙
# from threading import Thread,Condition
# def func(con,i):
# con.acquire()
# con.wait() # 等钥匙
# print('在第%s个循环里'%i)
# con.release()
# con = Condition()
# for i in range(10):
# Thread(target=func,args = (con,i)).start()
# while True:
# num = int(input('>>>'))
# con.acquire()
# con.notify(num) # 造钥匙
# con.release()
# ====================================
#定时器
# import time
# from threading import Timer
# def func():
# print('时间同步') #1-3
# while True:
# t = Timer(5,func).start() # 非阻塞的 ,异步的 ,会把所有的5s在一起
# time.sleep(5) # 睡5s 每5s进行意思时间同步
# ====================================
# 加锁 麻烦 所以使用队列
#线程通信
# queue
# import queue #直接导入普通queue 是线程安全的
# q = queue.Queue() # 队列 先进先出
# q.put()
# q.get()
# q.put_nowait()
# q.get_nowait()
# q = queue.LifoQueue() # 栈 先进后出
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get())
# q = queue.PriorityQueue() # 优先级队列
# q.put((1,'a'))
# q.put((10,'b'))
# q.put((30,'c'))
# q.put((1,'d'))
# q.put((1,'f'))
# print(q.get())
# 元祖中的元素按顺序比较,数字越小优先级大,祖父按照ascii越小优先级越大
# ====================================
# 线程池
import time
# 以前没有线程池
from concurrent.futures import ThreadPoolExecutor
# ProcessPoolExecutor 该模块下还有一个进程池,与multi 功能相同
# submit(fn,*args,**kwargs) 异步提交任务
# map(fun,*iterables,timeout=None,chunksize - 1) 循环的submit
# shutdown(wait=True) # 等于原来的 close join 合并
# result(time=None) 取得结果
# add_done_callback(fn) 回调函数
def func(n):
time.sleep(2)
print(n)
return n*n
def call_back(m):
print('结果是 %s'%m.result())
# 若使用进程池 只换ThreadPoolExecutor->ProcessPoolExecutor
tpool = ThreadPoolExecutor(max_workers=5) # 默认 不要超过cpu个数*5
for i in range(20):
tpool.submit(func,i).add_done_callback(call_back)
tpool.shutdown()
# tpool.map(func,range(20)) # 拿不到返回值
# t_lst = []
# for i in range(20):
# t = tpool.submit(func,i)
# t_lst.append(t)
# tpool.shutdown() # close+join #
# print('主线程')
# for t in t_lst:print('***',t.result()) # 拿返回值
# 进程 多个进程,操作系统负责
# 线程 不能同一时间多个cup 其他语言可以,但不影响高io
# 开启线程 创建线程 寄存器 堆栈
# 关闭一个线程
# 协程
# 本质是一个线程
# 能够在多个任务间切换,不需要寄存器,堆栈切换
# 任务之间切换时间开销 远小于线程
# 计算任务之间切换消耗也很大,一般都是遇到io的时候切换
# 进程(cup数+1)+线程(cup数*5)+协程(500) = 50000
# 适合爬虫
# 实现并发的手段
# import time
# 实现在 con,pro之间来回切换
# def consumer():
# while True:
# x = yield
# time.sleep(1)
# print('处理了数据 :',x)
#
# def producer():
# c = consumer()
# next(c)
# for i in range(10):
# time.sleep(1)
# print('生产了数据 :',i)
# c.send(i)
#
# producer()
# =============================================
# 真正的协程模块就是使用greenlet完成的切换
from greenlet import greenlet
# def eat():
# print('eating start')
# g2.switch()
# print('eating end')
# g2.switch()
#
# def play():
# print('playing start')
# g1.switch()
# print('playing end')
# g1 = greenlet(eat) # 必须先有g1 ,g2 函数中才能使用g
# g2 = greenlet(play) # 不会自动切换
# g1.switch()
# ======================================
# 不能感知time.sleep(1)
# 可以感知gevent.sleep(1),在第一行引入 如下from...
# 后边的time 都会经过特殊处理,time.sleep() 就可以被识别
# from gevent import monkey;monkey.patch_all()
# import time
# import gevent
# import threading
# def eat():
# DummyThread-1 虚拟的线程
# print(threading.current_thread().getName())
# print(threading.current_thread())
# print('eating start')
# time.sleep(1)
# print('eating end')
#
# def play():
# DummyThread-2 虚拟的线程
# print(threading.current_thread().getName())
# print(threading.current_thread())
# print('playing start')
# time.sleep(1)
# print('playing end')
#
# g1 = gevent.spawn(eat) # 注册进入,会自动切换,不是操作系统调度
# g2 = gevent.spawn(play) # gevent 负责协程的调度 通过封装的greenlet switch
# g1.join() gevent 是完全异步的 join等待协程结束
# g2.join()
# 进程和线程的任务切换右操作系统完成
# 协程任务之间的切换由程序(代码)完成,只有遇到协程模块能识别的IO操作,(时间片等不识别)的时候,程序才会进行任务切换,实现并发的效果
# ========================================
# 同步 和 异步
# from gevent import monkey;monkey.patch_all() # 放最前面
# import time
# import gevent
# def task(n):
# time.sleep(1)
# print(n)
# def sync(): # 同步
# for i in range(10):
# task(i)
# def async(): # 异步
# g_lst = []
# for i in range(10):
# g = gevent.spawn(task,i)
# g_lst.append(g)
# gevent.joinall(g_lst) #两种方法都可
# for g in g_lst:g.join()
# ======================================
# 协程 : 能够在一个线程中实现并发效果的概念
# 能够规避一些任务中的IO操作
# 在任务的执行过程中,检测到IO就切换到其他任务
# 多线程 被弱化了
# 协程 在一个线程上 提高CPU 的利用率
# 协程相比于多线程的优势 切换的效率更快
# ==========================================
# 爬虫的例子
# 请求过程中的IO等待
# from gevent import monkey;monkey.patch_all()
# import gevent
# from urllib.request import urlopen # 内置的模块
# urlopen html时有个格式的 reguests 无格式
# def get_url(url):
# response = urlopen(url)
# content = response.read().decode('utf-8')
# return len(content)
#
# g1 = gevent.spawn(get_url,'http://www.baidu.com')
# g2 = gevent.spawn(get_url,'http://www.sogou.com')
# g3 = gevent.spawn(get_url,'http://www.taobao.com')
# g4 = gevent.spawn(get_url,'http://www.hao123.com')
# g5 = gevent.spawn(get_url,'http://www.cnblogs.com')
# gevent.joinall([g1,g2,g3,g4,g5])
# print(g1.value)
# print(g2.value)
# print(g3.value)
# print(g4.value)
# print(g5.value)
# ret = get_url('http://www.baidu.com')
# print(ret)
# ======================================
from gevent import monkey;monkey.patch_all()
import socket
import gevent
def talk(conn):
conn.send(b'hello')
print(conn.recv(1024).decode('utf-8'))
conn.close()
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn,addr = sk.accept()
gevent.spawn(talk,conn)
sk.close()
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
print(sk.recv(1024))
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()
# 同步 提交一个任务之后要等待这个任务执行完毕
# 异步 只管提交任务,不等待这个任务执行完毕就可以做其他事情
# 阻塞 recv recvfrom accept
# 非阻塞
# 阻塞 线程 运行状态 --> 阻塞状态 --> 就绪
# 非阻塞
# IO多路复用
# select机制 Windows linux 都是操作系统轮询每一个被监听的项,看是否有读操作
# poll机制 linux 它可以监听的对象比select机制可以监听的数量多
# 随着监听项的增多,导致效率降低
# epoll机制 linux 更高级,绑定回调函数,
# =================================
# 以前的都是阻塞io
# =================================
# 非阻塞io实例
# import socket
# sk = socket.socket()
# sk.bind(('127.0.0.1',9000))
# sk.setblocking(False) # 设置不阻塞
# sk.listen()
# conn_l = []
# del_conn = []
# while True:
# try:
# conn,addr = sk.accept() #不阻塞,但是没人连我会报错
# print('建立连接了:',addr)
# conn_l.append(conn)
# except BlockingIOError:
# for con in conn_l:
# try:
# msg = con.recv(1024) # 非阻塞,如果没有数据就报错
# if msg == b'': # 若客户端关闭 会发送空消息
# del_conn.append(con)
# continue
# print(msg)
# con.send(b'byebye')
# except BlockingIOError:pass
# for con in del_conn:
# con.close()
# conn_l.remove(con)
# del_conn.clear()
# # while True : 10000 500 501
#
# import time
# import socket
# import threading
# def func():
# sk = socket.socket()
# sk.connect(('127.0.0.1',9000))
# sk.send(b'hello')
# time.sleep(1)
# print(sk.recv(1024))
# sk.close()
#
# for i in range(2):
# threading.Thread(target=func).start()
# =================================
# io 多路复用, 监听列表的循环 变为有操作系统执行
import select
import socket
sk = socket.socket()
sk.bind(('127.0.0.1',8000))
sk.setblocking(False)
sk.listen()
read_lst = [sk] # 监听列表
while True: # [sk,conn]
# 等待读列表,写列表,修改列表 都必传
# 返回元祖中3个列表,对应三个list,一般只用第一个
# r_lst里面就是sk对象
r_lst,w_lst,x_lst = select.select(read_lst,[],[])
for i in r_lst:
if i is sk:
conn,addr = i.accept()
read_lst.append(conn)
else:
ret = i.recv(1024)
if ret == b'':
i.close()
read_lst.remove(i)
continue
print(ret)
i.send(b'goodbye!')
import time
import socket
import threading
def func():
sk = socket.socket()
sk.connect(('127.0.0.1', 8000))
sk.send(b'hello')
time.sleep(3)
print(sk.recv(1024))
sk.close()
for i in range(20):
threading.Thread(target=func).start()
# =================================
import selectors # 选择合适的多路复用机制
from socket import *
def accept(sk,mask):
conn,addr=sk.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
sk=socket()
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk.bind(('127.0.0.1',8088))
sk.listen(5)
sk.setblocking(False) #设置socket的接口为非阻塞
sel = selectors.DefaultSelector() # 自动选择一个适合我的IO多路复用的机制
sel.register(sk,selectors.EVENT_READ,accept)
#相当于网select的读列表里append了一个sk对象,并且绑定了一个回调函数accept
# 说白了就是 如果有人请求连接sk,就调用accrpt方法
while True:
events=sel.select() #检测所有的sk,conn,是否有完成wait data阶段
for sel_obj,mask in events: # [conn]
callback=sel_obj.data #callback=read
callback(sel_obj.fileobj,mask) #read(conn,1)
import pymysql
# 连接
conn = pymysql.connect(
host="106.15.39.74",
port=3306,
database="test",
user="root",
password="dzf123,.",
charset="utf8" # 没有"-" 没有
)
cursor = conn.cursor()
sql = "select*from student"
name = "dzf"
password = "123456"
sql = "select * from student where name = %s and password = %s"
ret = cursor.execute(sql,[name,password])
# 自己拼接需要加引号,使用防注入sql不用加引号,参数不能少,多
#print(cursor.lastrowid) # 获取刚插入数据的id 应该就是主键 自增的那个,与名字无关
print(ret) # 返回受影响行数
ret = cursor.fetchall() # 元祖 大元组里边小元祖
print(ret,"a")
ret = cursor.fetchone() # 取一条数据
print(ret,"a")
ret = cursor.fetchone()
print(ret,"a")
# 直接返回一条元素,格式是 小元祖,或只有list中的一个小字典,外边没有元祖或list
# 若连续fetchone() 第一次第一条,第二次第二条,一次向下取
# 若取完后 再次 fetchone() 取不到
# -->(('dzf','1234'),('dzf','1234'))
# 在执行语句前 修改cursor格式
cursor.fetchmany(3) # 在cursor位置接下取3条,大元组中小元祖
# 移动光标
cursor.scroll(1,mode="absolute") # 绝对移动 移到1位置,从2开始 ,
cursor.scroll(1,mode="relative") # 相对移动 原来在3 位置,从4 开始读,现在 移动到4 从5开始读
# 向上移可以使用负的
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 指定为字典格式
"""
[
{'id':1,'name':'dzf'},
{'id':1,'name':'dzf'},
]
"""
cursor.close()
conn.close()
# ====================================
# 插入数据还是用cursor.execute(),注意提交后conn.commit()
# 若多语句,可能错误,conn.rollback()
# sql2 = "insert into student (name,password) values(%s, %s)"
# ret = cursor.execute(sql2,['123','123'])
# conn.commit()
# 或insert into student (name,password) values(%(name)s, %(pwd)s)
# 下边传入字典excute(sql,{"name":xxx..})
# ====================================
# 批量执行
data = (['12','12'],['23','32'],['32','23']) # 格式必须固定
cursor.executemany(sql,data) # 内部的for循环
# try 防止异常,要回滚, 会取消以前正确的插入语句
# =================================
# 删除,同理,也要提交
# ================================
# 修改 记得提交