在线上安全测试的过程中,会使用 Nmap 进行端口扫描,为了提升端口扫描的效率,扫描策略通常是检测常用端口是否处于开放状态,并在父类中使用名为 all_open_ports
的属性来记录这些开放的端口。
在后续的测试过程中,需要检查所涉及的端口是否包含在 all_open_ports
中。如果不存在,就需要进一步对这些端口进行开放检测。如果端口的检测结果是开放的,测试将继续进行并将这些端口记录到 all_open_ports
中,以便在下次遇到相同端口时无需重复检测。
然而,由于安全测试是多线程进行的,某些情况下可以将 all_open_ports
理解为共享变量,这导致当两个不同的测试环境同时进行安全测试时,数据相互污染,从而影响最终测试结果的准确性。
为了解决这个问题,需要重新设计变量 all_open_ports
的存储和访问方式,以确保在多线程环境下数据的独立性和一致性,接下来由博主为各位读者进行仔细讲解。
本文代码点击此处跳转,博文中的所有代码全部收集在博主的 GitHub 仓库中。
先创建一个父类 Parent
,定义一个类属性 all_open_ports
用来记录已经开放的端口,并创建一个方法 check_port()
来模拟端口检测,代码如下所示:
class Parent:
all_open_ports = set()
def __init__(self, args):
self.all_open_ports.update(args.get("open_ports", []))
def check_port(self, port):
# 忽略端口扫描...
if port not in self.all_open_ports:
self.all_open_ports.add(port)
print(f"{port} in all_open_ports, {self.all_open_ports}")
pass
再创建一个子类 Child
继承父类,构造 scan()
方法来模拟安全测试过程,代码如下所示:
import threading
from parent import Parent
class Child(Parent):
def __init__(self, args):
super().__init__(args)
self.port = args.get("port")
def scan(self):
print(threading.current_thread().name, self.all_open_ports)
self.check_port(self.port)
pass
最后创建一个测试用例,实例化两个 Child
对象,并以多线程的方式运行对象方法 scan()
来进行场景复现,代码如下所示:
def test_thread():
c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
t1 = threading.Thread(target=c1.scan, name="Child_1")
t1.start()
t1.join()
c2 = Child({"port": 5001, "open_ports": [80, 3306, 5000]})
t2 = threading.Thread(target=c2.scan, name="Child_2")
t2.start()
t2.join()
print("All tasks have finished!")
运行结果:
造成上述问题的根本原因就是在多线程中 all_open_ports
可被当成共享变量使用,致使数据相互污染,从而影响最终测试结果的准确性。
因为 all_open_ports
是在父类中定义的一个类属性,这意味着它是类 Parent
的一部分,它被所有派生类(子类)所共享。通过这种方式,父类的所有子类都可以访问和更新 all_open_ports
属性。
每当子类的实例创建时,如果传递了 open_ports
参数,那么这些端口将被添加到 all_open_ports
集合中,并且在父类中的 check_port
方法中,判断给定端口 port
是否存在于 all_open_ports
集合中,如果不存在,则将端口添加到集合中。这样,所有子类实例都可以共享和更新这个属性。
现在我们修改部分代码,在打印时输出 all_open_ports
的地址来判断是否使用了同一变量,代码如下所示:
def scan(self):
print(threading.current_thread().name, self.all_open_ports, "id:", id(self.all_open_ports))
self.check_port(self.port)
pass
运行结果:
那么有什么方法能解决当前的问题呢?
all_open_ports
;contextvar
;thread.local
;重新初始化 all_open_ports
的方法是最快捷的,但是会有一个问题,重新初始化 all_open_ports
会使得每个 Child
对象都有自己独立的 all_open_ports
集合,而不会共享相同的集合,这会发生重复检测端口的情况,也就违背了一开始的设计初衷。
创建一个测试用例来观察一下当前的 all_open_ports
集合使用情况,代码如下所示:
def test_init_set():
c1 = Child({"port": 3001, "open_ports": [22, 3000, 3306]})
c2 = Child({"port": 3002, "open_ports": [80, 443, 3306]})
print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
c1.scan()
print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
c2.scan()
print("c1:", c1.all_open_ports, "c2:", c2.all_open_ports)
运行结果:
根据运行结果可以发现, all_open_ports
集合在当前情况下可以被看做是共享变量,哪怕在不同的线程中,个 Child
对象都能共享 all_open_ports
集合。
这时候,修改父类 Parent
中的 __init__
代码,使得 all_open_ports
集合在 __init__
时重新初始化,代码如下所示:
def __init__(self, args):
self.all_open_ports = set()
self.all_open_ports.update(args.get("open_ports", []))
运行结果:
根据运行结果可以发现,c1
和 c2
中的 all_open_ports
是完全独立的集合,c1
向 all_open_ports
集合中的增加操作不会影响到 c2
,这虽然避免了数据污染,但是会导致在 c1
检测过的端口还需要在 c2
重新进行检测,这与我们一开始设计 all_open_ports
集合来提升效率的想法背道而驰了。
contextvars
是 Python 3.7 引入的一个模块,用于提供上下文变量的功能。它是线程安全的,允许在异步编程和多线程环境中共享上下文相关的数据,而不会出现数据污染的问题,但是在较旧的 Python 版本中无法使用。
contextvars
模块提供了 ContextVar
类,它是一个上下文变量的容器。每个 ContextVar
对象都可以存储一个值,并且在不同的上下文中可以访问和修改这个值。上下文可以是线程、协程或其他异步任务。
不过需要注意的是,由于上下文变量的值可以在不同的上下文中共享,可能会导致代码中的隐式依赖。这可能增加代码的复杂性和维护成本。
二更:有被自己蠢到,实际是可行的。
先分析一下上一次为什么不行。
因为上一次一直在操作一个对象,所以本质上是在操作 set
而不是 ContextVar
,可以在 check_port()
方法里添加打印 all_open_ports
的代码,代码如下所示:
def check_port(self, port):
all_open_ports_ = self.all_open_ports.get()
print(print_prefix(), id(all_open_ports_))
...
运行结果:
结果打印出来的 id
也证实了我们的说法,因此,接下来我们要更改代码,使其操作的是 ContextVar
对象,代码如下所示:
class ParentContext:
all_open_ports = contextvars.ContextVar("all_open_ports", default=set())
def __init__(self, args):
open_ports = set(args.get("open_ports", []))
self.all_open_ports.set(open_ports | self.all_open_ports.get())
def check_port(self, port):
all_open_ports_ = self.all_open_ports.get()
print(print_prefix(), id(all_open_ports_))
if port not in all_open_ports_:
all_open_ports_.add(port)
self.all_open_ports.set(all_open_ports_)
print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")
运行结果:
一更:先说结论,好像不行,不知道是不是思路有问题,希望各位大神指点一下!
运行结果:
代码如下所示:
class ParentContext:
all_open_ports = contextvars.ContextVar("all_open_ports", default=set())
def __init__(self, args):
open_ports = self.all_open_ports.get()
open_ports.update(args.get("open_ports", []))
self.all_open_ports.set(open_ports)
def check_port(self, port):
all_open_ports_ = self.all_open_ports.get()
if port not in all_open_ports_:
all_open_ports_.add(port)
self.all_open_ports.set(all_open_ports_)
print(f"{print_prefix()} Port {port} is added to all_open_ports, {self.all_open_ports.get()}")
class ChildContext(ParentContext):
def __init__(self, args):
super().__init__(args)
self.port = args.get("port")
def scan(self, port=None):
self.check_port(port or self.port)
pass
def test_contextvars(open_ports, port):
c1 = ChildContext({"port": port, "open_ports": open_ports})
c1.scan()
if __name__ == '__main__':
t1 = threading.Thread(target=test_contextvars, name="Child_1", args=([80, 3306, 5000], 5001,))
t2 = threading.Thread(target=test_contextvars, name="Child_2", args=([22, 3306, 6000], 6001,))
t1.start()
t2.start()
t1.join()
t2.join()
threading.local()
是 Python 标准库中的一个类,它提供了一种在多线程环境下创建线程本地存储的机制。它允许每个线程都有自己独立的变量副本,这些变量在不同线程之间是相互隔离的,不会相互干扰。
当多个线程同时执行时,它们可以访问和修改各自的线程本地变量,而不会影响其他线程的变量。这对于需要在线程之间共享数据,但又需要保持数据独立性的情况非常有用。
接下来,我们创建父类 ParentLocal
,并使用 threading.local()
来存储集合 all_open_ports
,代码如下所示:
class ParentLocal:
local = threading.local()
def __init__(self, args):
self.local.all_open_ports = getattr(self.local, "all_open_ports", set())
self.local.all_open_ports.update(args.get("open_ports", []))
def check_port(self, port):
if port not in self.local.all_open_ports:
self.local.all_open_ports.add(port)
print(f"{self.print_prefix()} Port {port} is added to all_open_ports, {self.local.all_open_ports}")
def print_prefix(self):
return f"[{time.strftime('%H:%M:%S', time.localtime())} {threading.current_thread().name}]"
在上述代码中,ParentLocal
类定义了初始化方法 __init__
,通过 getattr()
函数来获取 self.local.all_open_ports
的值。如果 self.local.all_open_ports
不存在,则使用 set()
创建一个空的集合,并将其赋值给 self.local.all_open_ports
。然后,我们使用 update()
方法将 args.get("open_ports", [])
中的端口添加到 self.local.all_open_ports
中。
通过使用 ParentLocal
类,我们可以在多线程环境中创建多个实例,并且每个实例都有自己独立的 all_open_ports
变量。这样,不同线程的实例之间的数据不会相互干扰。
而 Child
类与之前基本保持不变,代码如下所示:
class ChildLocal(ParentLocal):
def __init__(self, args):
super().__init__(args)
self.port = args.get("port")
def scan(self, port=None):
self.check_port(port or self.port)
pass
在上述代码中,ChildLocal
类是继承自 ParentLocal
类的子类,通过继承关系它可以访问父类的 self.local.all_open_ports
集合。这使得 ChildLocal
实例可以在同一线程下共享数据,同时不会受到其他线程中的 ChildLocal
实例的影响。
编写测试代码如下所示:
def tset_local(open_ports, port):
c1 = ChildLocal({"port": port, "open_ports": open_ports})
c1.scan()
args = {"port": generate_random_numbers(1)[0], "open_ports": generate_random_numbers(3)}
print(threading.current_thread().name, args)
c2 = ChildLocal(args)
c2.scan()
time.sleep(3)
c1.scan(random.randint(8000, 9999))
if __name__ == '__main__':
t1 = threading.Thread(target=tset_local, name="Child_1", args=([80, 3306, 5000], 5001,))
t2 = threading.Thread(target=tset_local, name="Child_2", args=([22, 3306, 6000], 6001,))
t1.start()
t2.start()
t1.join()
t2.join()
运行结果:
如我们所料,Child1
和 Child2
线程中的 ChildLocal
实例相互之间共享 all_open_ports
集合的数据,但是不同线程之间的 ChildLocal
实例不能相互共享数据。
需要注意的是,threading.local()
对象在不同的线程中具有相同的 id
值,这是因为它们实际上是同一个对象的不同实例。每个线程都有自己独立的 threading.local()
对象,但它们共享相同的类定义。
当在不同的线程中创建 threading.local()
对象时,每个线程都会创建一个新的实例,但这些实例的类定义是相同的。因此,它们的 id
值是相同的。
幸好我们及时发现了这个问题,并没有造成安全事故。现在我们将这次经历分享出来,希望能给其他开发团队带来启发,共同提高系统的安全性和稳定性。
以上就是 记一次线上安全测试中误用父类属性导致数据污染的解决方案 的所有内容了,希望本篇博文对大家有所帮助!欢迎大家持续关注我的博客,一起分享学习和成长的乐趣!✨
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。