前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 实现一个自定义上下文管理器

python 实现一个自定义上下文管理器

作者头像
用户2458545
发布2022-09-07 11:41:41
3500
发布2022-09-07 11:41:41
举报
文章被收录于专栏:阿牛的牙

1、什么是上下文管理器?

上下文管理器(context manager)是Python2.5开始支持的一种语法,用于规定某个对象的使用范围。一旦进入或者离开该使用范围,会有特殊操作被调用 (比如为对象分配或者释放内存)。它的语法形式是,with...as...    使用关键字 withas; 上下文管理器是指在一段代码执行之前执行一段代码作预处理工作;执行之后再执行一段代码,用于一些清理工作。比如打开文件进行读写,读写完之后需要将文件关闭。又比如在数据库操作中,操作之前需要连接数据库,操作之后需要关闭数据库。在上下文管理协议中,有两个方法__enter____exit__,分别实现上述两个功能;

 links: 

https://www.geeksforgeeks.org/context-manager-in-python/

http://book.pythontips.com/en/latest/context_managers.html

https://www.python.org/dev/peps/pep-0343/ 官方文档

2、实现一个自定义上下文

任何定义了__enter__()和__exit__()方法的对象都可以用于上下文管理器。

话不多说,上代码 :  demo1

代码语言:javascript
复制
# -*- coding: utf-8 -*-
"""
@author: sato
@file: context.py
@date:  2019/9/12 18:25
"""
import os
import subprocess
import datetime


class CustomFile(object):
    """自定义一个上下文 文件类"""
    def __init__(self, file_name, file_model, *args, **kwargs):
        self.file_name = file_name
        self.file_model = file_model

    def __enter__(self, *args, **kwargs):
        self.my_print("before open file, do something!")
        self.f = open(self.file_name, self.file_model)
        return self.f

    def __exit__(self, *args, **kwargs):
        """
        __exit__方法有三个参数:exc_type, exc_val, exc_tb。如果代码块BLOCK发生异常并退出,那么分别对应异常的type、value 和 traceback。否则三个参数全为None。
        __exit__方法的返回值可以为True或者False。如果为True,那么表示异常被忽视,相当于进行了try-except操作;如果为False,则该异常会被重新raise。
        """
        self.my_print(args)
        self.my_print("before close , do something!")
        self.f.close()

    @classmethod
    def my_print(cls, *args):
        print('当前时间:{}'.format(datetime.datetime.now()), *args)


temp_file = './temp_file'
if not os.path.exists(temp_file):
    subprocess.getoutput('echo "hello world" >> {}'.format(temp_file))

CustomFile.my_print('start')
with CustomFile(temp_file, 'r') as file:
    data = file.read()
    CustomFile.my_print(data)
CustomFile.my_print('end')

输出:

代码语言:javascript
复制
/Users/sato/.virtualenvs/review/bin/python3.6 /Users/sato/Desktop/review/context.py
当前时间:2019-09-12 19:08:04.933938 start
当前时间:2019-09-12 19:08:04.933993 before open file, do something!
当前时间:2019-09-12 19:08:04.934072 hello world

当前时间:2019-09-12 19:08:04.934091 (None, None, None)
当前时间:2019-09-12 19:08:04.934103 before close , do something!
当前时间:2019-09-12 19:08:04.934123 end

Process finished with exit code 0

demo2:

代码语言:javascript
复制
from contextlib import contextmanager

temp_file = './temp_file'
if not os.path.exists(temp_file):
    subprocess.getoutput('echo "hello world" >> {}'.format(temp_file))


def demo2():
    from contextlib import contextmanager
    @contextmanager
    # 该装饰器将一个函数中yield语句之前的代码当做__enter__方法执行,yield语句之后的代码当做__exit__方法执行。
    # 同时yield返回值赋值给as后的变量。
    def custom_file(_temp_file, model):
        # __enter__   start
        f = open(_temp_file, model)
        try:
            yield f
        # __enter__  end
        # __exit__  start
        finally:
            f.close()
    return custom_file


with demo2()(temp_file,  'r') as file:   # 我也不知道为什么要这么写 
    data = file.read()
    CustomFile.my_print(data)

装饰器contextmanager 源码:

代码语言:javascript
复制
def contextmanager(func):
    """@contextmanager decorator.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    This makes this:

        with some_generator(<arguments>) as <variable>:
            <body>

    equivalent to this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    """
    @wraps(func)
    def helper(*args, **kwds):
        return _GeneratorContextManager(func, args, kwds)
    return helper

_GeneratorContextManager 源码:

代码语言:javascript
复制
class _GeneratorContextManager(ContextDecorator, AbstractContextManager):
    """Helper for @contextmanager decorator."""

    def __init__(self, func, args, kwds): # 被装饰的函数 func
        self.gen = func(*args, **kwds)
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        doc = getattr(func, "__doc__", None)
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

    def _recreate_cm(self):
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):  #  也是实现了__enter__方法的
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):  # 也是实现了__exit__方法的
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                #
                if sys.exc_info()[1] is value:
                    return False
                raise
            raise RuntimeError("generator didn't stop after throw()")

如果我重写装饰器和content类代码如下:

代码语言:javascript
复制
# -*- coding: utf-8 -*-
"""
@author: sato
@file: context.py
@date:  2019/9/12 18:25
"""
import os
import subprocess
import datetime
import sys
from contextlib import ContextDecorator, AbstractContextManager
from functools import wraps

temp_file = './temp_file'
if not os.path.exists(temp_file):
    subprocess.getoutput('echo "hello world" >> {}'.format(temp_file))


class CustomFile(object):
    """自定义一个上下文 文件类"""
    def __init__(self, file_name, file_model, *args, **kwargs):
        self.file_name = file_name
        self.file_model = file_model

    def __enter__(self, *args, **kwargs):
        self.my_print("before open file, do something!")
        self.f = open(self.file_name, self.file_model)
        return self.f

    def __exit__(self, *args, **kwargs):
        """
        __exit__方法有三个参数:exc_type, exc_val, exc_tb。如果代码块BLOCK发生异常并退出,那么分别对应异常的type、value 和 traceback。否则三个参数全为None。
        __exit__方法的返回值可以为True或者False。如果为True,那么表示异常被忽视,相当于进行了try-except操作;如果为False,则该异常会被重新raise。
        """
        self.my_print(args)
        self.my_print("before close , do something!")
        self.f.close()

    @classmethod
    def my_print(cls, *args):
        print('当前时间:{}'.format(datetime.datetime.now()), *args)


def demo1(_temp_file):
    CustomFile.my_print('start')
    with CustomFile(_temp_file, 'r') as file:
        data = file.read()
        CustomFile.my_print(data)
    CustomFile.my_print('end')


def demo2():
    from contextlib import contextmanager
    @contextmanager
    # 该装饰器将一个函数中yield语句之前的代码当做__enter__方法执行,yield语句之后的代码当做__exit__方法执行。
    # 同时yield返回值赋值给as后的变量。
    def custom_file(_temp_file, model):
        # __enter__   start
        f = open(_temp_file, model)
        try:
            yield f
        # __enter__  end
        # __exit__  start
        finally:
            f.close()
    return custom_file


class GeneratorContextManager(ContextDecorator, AbstractContextManager):
    """Helper for @contextmanager decorator."""

    def __init__(self, func, args, kwds):
        self.gen = func(*args, **kwds)
        self.func, self.args, self.kwds = func, args, kwds
        # Issue 19330: ensure context manager instances have good docstrings
        doc = getattr(func, "__doc__", None)
        if doc is None:
            doc = type(self).__doc__
        self.__doc__ = doc
        # Unfortunately, this still doesn't provide good help output when
        # inspecting the created context manager instances, since pydoc
        # currently bypasses the instance docstring and shows the docstring
        # for the class instead.
        # See http://bugs.python.org/issue19404 for more details.

    def _recreate_cm(self):
        # _GCM instances are one-shot context managers, so the
        # CM must be recreated each time a decorated function is
        # called
        return self.__class__(self.func, self.args, self.kwds)

    def __enter__(self):
        CustomFile.my_print('before open do something')
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, type, value, traceback):
        CustomFile.my_print('before close do something')
        if type is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = type()
            try:
                self.gen.throw(type, value, traceback)
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    return False
                # Likewise, avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479).
                if type is StopIteration and exc.__cause__ is value:
                    return False
                raise
            except:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                #
                if sys.exc_info()[1] is value:
                    return False
                raise
            raise RuntimeError("generator didn't stop after throw()")


def my_contextmanager(func):
    """@contextmanager decorator.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    This makes this:

        with some_generator(<arguments>) as <variable>:
            <body>

    equivalent to this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>

    """
    @wraps(func)
    def helper(*args, **kwds):
        return GeneratorContextManager(func, args, kwds)
    return helper


@my_contextmanager
def custom_file(_temp_file, model):
    # __enter__   start
    f = open(_temp_file, model)
    try:
        yield f
    # __enter__  end
    # __exit__  start
    finally:
        f.close()
    return custom_file


CustomFile.my_print('start')
with custom_file(temp_file, 'r') as f:
    data = f.read()
    CustomFile.my_print(data)
CustomFile.my_print('end')

输出:

代码语言:javascript
复制
/Users/sato/.virtualenvs/review/bin/python3.6 /Users/sato/Desktop/review/context.py
当前时间:2019-09-12 19:37:17.099099 start
当前时间:2019-09-12 19:37:17.099160 before open do something
当前时间:2019-09-12 19:37:17.099236 hello world

当前时间:2019-09-12 19:37:17.099254 before close do something
当前时间:2019-09-12 19:37:17.099279 end

Process finished with exit code 0

可以看到执行顺序同样是: 先enter 后exit

如果继续看 AbstractContextManager 会发现 :

代码语言:javascript
复制
class AbstractContextManager(abc.ABC):

    """An abstract base class for context managers."""

    def __enter__(self):
        """Return `self` upon entering the runtime context."""
        return self

    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Raise any exception triggered within the runtime context."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        if cls is AbstractContextManager:
            return _collections_abc._check_methods(C, "__enter__", "__exit__")
        return NotImplemented
代码语言:javascript
复制
AbstractContextManager 同样有__enter__ __exit__ 方法

这就说明:

This PEP adds a new statement "with" to the Python language to make it possible to factor out standard uses of try/finally statements.

In this PEP, context managers provide __enter__() and __exit__() methods that are invoked on entry to and exit from the body of the with statement.

https://www.python.org/dev/peps/pep-0343/#specification-the-with-statement

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019年9月12日 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、什么是上下文管理器?
  • 2、实现一个自定义上下文
  • 输出:
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档