首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将异步websocket.send()传递给stdout/stderr包装器类

将异步websocket.send()传递给stdout/stderr包装器类
EN

Stack Overflow用户
提问于 2017-02-18 17:58:33
回答 1查看 239关注 0票数 0

我有一个类函数,用于解缓冲stdout和stderr,如下所示:

代码语言:javascript
运行
复制
class Unbuffered:
    def __init__(self, stream):
        self.stream = stream
    def write(self, data):
        data = data.strip()
        if data.startswith("INFO: "):
            data = data[6:]
        if '[' in data:
            progress = re.compile(r"\[(\d+)/(\d+)\]")
            data = progress.match(data)
            total = data.group(2)
            current = data.group(1)
            data = '{0}/{1}'.format(current, total)
        if data.startswith("ERROR: "):
            data = data[7:]
        self.stream.write(data + '\n')
        self.stream.flush()
    def __getattr__(self, attr):
        return getattr(self.stream, attr)

当从websocket入站时,输出来自在ProcessPoolExecutor中运行的函数。

我希望输出输出在控制台中并发送到我的websocket客户端。我尝试过异步、取消缓冲并将websocket实例传递给它,但没有成功。

更新:run()和websocket handler()的基本要素如下所示:

代码语言:javascript
运行
复制
def run(url, path):
    logging.addLevelName(25, "INFO")
    fmt = logging.Formatter('%(levelname)s: %(message)s')
    #----
    output.progress_stream = Unbuffered(sys.stderr)
    stream = Unbuffered(sys.stdout)
    #----
    level = logging.INFO
    hdlr = logging.StreamHandler(stream)
    hdlr.setFormatter(fmt)
    log.addHandler(hdlr)
    log.setLevel(level)
    get_media(url, opt)

async def handler(websocket, path):
    while True:
        inbound = json.loads(await websocket.recv())
        if inbound is None:
            break
        url = inbound['url']
        if 'path' in inbound:
            path = inbound['path'].rstrip(os.path.sep) + os.path.sep
        else:
            path = os.path.expanduser("~") + os.path.sep
        # blah more code
        while inbound != None:
            await asyncio.sleep(.001)
            await loop.run_in_executor(None, run, url, path)

run()handler()Unbuffered是相互分离的。

EN

回答 1

Stack Overflow用户

发布于 2017-02-19 10:02:50

最好是重写get_media()以使用asyncio,而不是在不同的线程中运行它。否则,有一些选项可以在常规线程和协同线程之间进行通信,例如,使用套接字对:

代码语言:javascript
运行
复制
import asyncio
import socket
import threading
import time

import random


# threads stuff
def producer(n, writer):
    for i in range(10):
        # print("sending", i)
        writer.send("message #{}.{}\n".format(n, i).encode())
        time.sleep(random.uniform(0.1, 1))


def go(writer):
    threads = [threading.Thread(target=producer, args=(i + 1, writer,))
               for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    writer.send("bye\n".encode())


# asyncio coroutines
async def clock():
    for i in range(11):
        print("The time is", i)
        await asyncio.sleep(1)


async def main(reader):
    buffer = ""
    while True:
        buffer += (await loop.sock_recv(reader, 10000)).decode()
        # print(len(buffer))
        while "\n" in buffer:
            msg, _nl, buffer = buffer.partition("\n")
            print("Got", msg)
            if msg == "bye":
                return


reader, writer = socket.socketpair()
reader.setblocking(False)
threading.Thread(target=go, args=(writer,)).start()
# time.sleep(1.5) # socket is buffering
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([clock(), main(reader)]))
loop.close()

您还可以尝试这个第三方thread+asyncio兼容队列:贾纳斯

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42318714

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档