
Optimizing the parallel computation of networkx node shortest paths with Ray's multiprocess scheduling

857技术社区 · Published 2023-05-23 · Column: 857-Bigdata
This tutorial involves the multiprocessing framework Ray and the graph-computation framework NetworkX.

The native NetworkX implementation of node betweenness centrality can only drive a single core to 100% CPU utilization. With a few lines of changes to its source code, we can reach 100% utilization across all cores. Let's walk through how that multi-core utilization is achieved.
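
Ray claims every local CPU core by default when it starts, which is what makes the multi-core utilization below possible. A minimal setup sketch (the printed resource counts vary by machine):

import ray

ray.init()                        # by default Ray registers all local CPU cores
print(ray.available_resources())  # e.g. {'CPU': 8.0, ...}

With Ray initialized, the driver below builds the graph and kicks off the computation: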

# Excerpt -- imports and helper definitions appear in the full listing below.
def f_one():
    # relies on the module-level graph G built below
    degree_centrality_result = betweenness_centrality(G)
    out = []
    for i, j in degree_centrality_result.items():
        out.append({"user_id": i, "weight": j})

    pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")


# Load the causal-relation dataset extracted from 2020 corporate annual reports.
bracket_name = json.load(open("2020_extract_result_center.json", "r"))
graph = {}
for i in tqdm(list(bracket_name)):
    if (i[0], i[4]) not in graph:
        graph[(i[0], i[4])] = 1
    else:
        graph[(i[0], i[4])] += 1
print(len(graph))
graph_relation = []
for graph_id, graph_count in graph.items():
    if graph_count > 1:
        graph_relation.append(graph_id)
print(len(graph_relation))
graph_relation = graph_relation[:10000]  # cap the edge list at 10,000 edges
G = nx.Graph()
G.add_edges_from(graph_relation)  # add the deduplicated edges in bulk
ray_G = ray.put(G)  # stage the graph in Ray's shared object store
f_one()

Observant readers will have noticed the ray.put call in the code above: it stages the undirected graph G in Ray's object store in preparation for multi-process work.

Next, let's define betweenness centrality. An undirected graph contains a huge number of nodes, and every node has a shortest path to each node it can reach. Under betweenness centrality, the more often a node appears on the shortest paths of the whole graph, the more important it is considered. A quick search shows that this metric has important applications in maritime and flight-routing scenarios; the whitepaper for Baidu's open-source HugeGraph database notes that betweenness can serve as a key anti-money-laundering algorithm, and that community detection can be used to uncover fraud rings.
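
For intuition, NetworkX's built-in implementation makes the definition concrete: on a five-node path graph, the middle node lies on the most shortest paths and therefore scores highest.

import networkx as nx

P = nx.path_graph(5)  # nodes 0-1-2-3-4 in a line
print(nx.betweenness_centrality(P))
# {0: 0.0, 1: 0.5, 2: 0.667, 3: 0.5, 4: 0.0} -- the middle node wins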

So how does the source code implement the betweenness centrality metric? Let's walk through it step by step.

def betweenness_centrality(
        G, k=None, normalized=True, weight=None, endpoints=False, seed=None
):
    betweenness = dict.fromkeys(G, 0.0)  # b[v] = 0 for v in G
    if k is None:
        nodes = G
    else:
        nodes = seed.sample(list(G.nodes()), k)
    ray_betweenness = ray.put(betweenness)
    # fan out: one Ray task per source node
    ray_betweenness = [shortest_path_basic.remote(s, ray_betweenness) for s in nodes]
    # block until every per-source partial result is back
    betweenness = ray.get(ray_betweenness)
    betweenness_compute = {}
    for betweenness_one in betweenness:
        for i, j in betweenness_one.items():
            if i in betweenness_compute:
                betweenness_compute[i] += j
            else:
                betweenness_compute[i] = j

    betweenness = _rescale(
        betweenness_compute,
        len(G),
        normalized=normalized,
        directed=G.is_directed(),
        k=k,
        endpoints=endpoints,
    )
    return betweenness

Here, k is the number of sampled source nodes: the fewer nodes sampled, the faster the computation, since fewer shortest-path traversals have to be started.
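
Note that this modified version uses seed directly, without the seed-handling decorator that stock NetworkX applies, so sampling requires passing a random.Random instance explicitly. A usage sketch (the k value and seed are arbitrary):

import random

rng = random.Random(42)
# score only 100 sampled source nodes for a faster, approximate result
approx_result = betweenness_centrality(G, k=100, seed=rng)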

First, we define a betweenness dictionary to store, for every node, the number of shortest paths it lies on.

Second, we iterate over all source nodes so that each one contributes the shortest paths it generates.

Third, we merge the per-source shortest-path results back into a single dictionary (see the sketch after these steps).

Fourth, _rescale normalizes the aggregated totals: for a normalized undirected graph the factor is 1 / ((n - 1)(n - 2)), multiplied by n / k when only k source nodes were sampled.
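
The merge in the third step can be written more compactly with collections.Counter, whose update method sums values key by key; a sketch equivalent to the loop inside betweenness_centrality above:

from collections import Counter

merged = Counter()
for partial in betweenness:  # each partial is one task's {node: score} dict
    merged.update(partial)   # adds values instead of replacing them
betweenness_compute = dict(merged)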

Now for the main event: how we modified the code that finds the shortest paths from the current source node.

@ray.remote
def shortest_path_basic(s, ray_betweenness=None):
    G = ray.get(ray_G)  # fetch the shared graph from Ray's object store
    betweenness = dict.fromkeys(G, 0.0)  # b[v] = 0 for v in G

    # single-source shortest paths from s
    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
    # accumulate this source's contribution to every node's score
    betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)
    del G  # free the worker's reference to the large graph
    return betweenness

First, we pull the graph out of Ray's shared object store.

Second, we build a dictionary keyed by every node in the graph.

Third, given the graph G and the source node s, we compute all the shortest paths covered from s.

Fourth, we accumulate the result into the betweenness dictionary.

Fifth, to save memory, we delete the worker's reference to the memory-hungry graph object G.

Sixth, we return the accumulated result.
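
The first step relies on Ray's shared object store: ray.put stores the graph once, and every worker fetches it by reference rather than receiving its own pickled copy per task. A toy sketch of that pattern (unrelated to the graph above):

import ray

ray.init(ignore_reinit_error=True)

data_ref = ray.put(list(range(1_000_000)))  # stored once, shared by all workers

@ray.remote
def head():
    data = ray.get(data_ref)  # the worker resolves the module-level reference
    return data[:3]

print(ray.get(head.remote()))  # [0, 1, 2]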

With the per-source shortest-path results in hand, we can now compute each node's weight.

The complete code follows. If you're interested, give this single-machine, multi-process implementation of betweenness node centrality a try.

import json
import networkx as nx
import pandas as pd
from collections import deque

from tqdm import tqdm


# helpers for betweenness centrality


def _single_source_shortest_path_basic(G, s):
    S = []
    P = {}
    for v in G:
        P[v] = []
    sigma = dict.fromkeys(G, 0.0)  # sigma[v]=0 for v in G
    D = {}
    sigma[s] = 1.0
    D[s] = 0
    Q = deque([s])
    while Q:  # use BFS to find shortest paths
        v = Q.popleft()
        S.append(v)
        Dv = D[v]
        sigmav = sigma[v]
        for w in G[v]:
            if w not in D:
                Q.append(w)
                D[w] = Dv + 1
            if D[w] == Dv + 1:  # this is a shortest path, count paths
                sigma[w] += sigmav
                P[w].append(v)  # predecessors
    return S, P, sigma, D


def _accumulate_basic(betweenness, S, P, sigma, s):
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            delta[v] += sigma[v] * coeff
        if w != s:
            betweenness[w] += delta[w]
    return betweenness, delta


def _accumulate_endpoints(betweenness, S, P, sigma, s):
    betweenness[s] += len(S) - 1
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            delta[v] += sigma[v] * coeff
        if w != s:
            betweenness[w] += delta[w] + 1
    return betweenness, delta


def _accumulate_edges(betweenness, S, P, sigma, s):
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            c = sigma[v] * coeff
            if (v, w) not in betweenness:
                betweenness[(w, v)] += c
            else:
                betweenness[(v, w)] += c
            delta[v] += c
        if w != s:
            betweenness[w] += delta[w]
    return betweenness


def _rescale(betweenness, n, normalized, directed=False, k=None, endpoints=False):
    if normalized:
        if endpoints:
            if n < 2:
                scale = None  # no normalization
            else:
                # Scale factor should include endpoint nodes
                scale = 1 / (n * (n - 1))
        elif n <= 2:
            scale = None  # no normalization b=0 for all nodes
        else:
            scale = 1 / ((n - 1) * (n - 2))
    else:  # rescale by 2 for undirected graphs
        if not directed:
            scale = 0.5
        else:
            scale = None
    if scale is not None:
        if k is not None:
            scale = scale * n / k
        for v in betweenness:
            betweenness[v] *= scale
    return betweenness


def _rescale_e(betweenness, n, normalized, directed=False, k=None):
    if normalized:
        if n <= 1:
            scale = None  # no normalization b=0 for all nodes
        else:
            scale = 1 / (n * (n - 1))
    else:  # rescale by 2 for undirected graphs
        if not directed:
            scale = 0.5
        else:
            scale = None
    if scale is not None:
        if k is not None:
            scale = scale * n / k
        for v in betweenness:
            betweenness[v] *= scale
    return betweenness


import ray

ray.init()


@ray.remote
def shortest_path_basic(s, ray_betweenness=None):
    G = ray.get(ray_G)
    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G

    # single source shortest paths
    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
    # accumulation
    betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)
    del G
    return betweenness

def betweenness_centrality(
        G, k=None, normalized=True, weight=None, endpoints=False, seed=None
):
    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G
    if k is None:
        nodes = G
    else:
        nodes = seed.sample(list(G.nodes()), k)
    ray_betweenness = ray.put(betweenness)
    ray_betweenness = [shortest_path_basic.remote(s, ray_betweenness) for s in nodes]
    betweenness = ray.get(ray_betweenness)
#     print(betweenness[:3])
    # rescaling
#     json.dump(betweenness, open("betweenness.json", "w", encoding="utf-8"), ensure_ascii=False)
    betweenness_compute = {}
    for betweenness_one in betweenness:
        for i, j in betweenness_one.items():
            if i in betweenness_compute:
                betweenness_compute[i] += j
            else:
                betweenness_compute[i] = j

    betweenness = _rescale(
        betweenness_compute,
        len(G),
        normalized=normalized,
        directed=G.is_directed(),
        k=k,
        endpoints=endpoints,
    )
    return betweenness


# Alternative sharded driver (not invoked below): build and score one graph per data shard.
@ray.remote
def f(index):
    bracket_name = ray.get(bracket_name_ray)

    graph = {}

    print(index)
    for i in tqdm(list(bracket_name)):
        if (i[0], i[2]) not in graph:
            graph[(i[0], i[2])] = 1
        else:
            graph[(i[0], i[2])] += 1

    print(len(graph))
    graph_relation = []
    for graph_id, graph_count in graph.items():
        if graph_count > 1:
            graph_relation.append(graph_id)

    G = nx.Graph()

    G.add_edges_from(graph_relation)  # add the deduplicated edges in bulk
    degree_centrality_result = betweenness_centrality(G)
    out = []
    for i, j in degree_centrality_result.items():
        out.append({"user_id": i, "weight": j})

    pd.DataFrame(out).to_csv("betweenness_centrality_result" + str(index) + ".csv")


def f_one():
    degree_centrality_result = betweenness_centrality(G)
    out = []
    for i, j in degree_centrality_result.items():
        out.append({"user_id": i, "weight": j})

    pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")


# Load the causal-relation dataset extracted from 2020 corporate annual reports.
bracket_name = json.load(open("2020_extract_result_center.json", "r"))
bracket_name_ray = ray.put(bracket_name)

# futures = [f.remote(i) for i in range(len(bracket_name) // 3000)]
# print(ray.get(futures))
bracket_name = ray.get(bracket_name_ray)
graph = {}
for i in tqdm(list(bracket_name)):
    if (i[0], i[4]) not in graph:
        graph[(i[0], i[4])] = 1
    else:
        graph[(i[0], i[4])] += 1

print(len(graph))
graph_relation = []
for graph_id, graph_count in graph.items():
    if graph_count > 1:
        graph_relation.append(graph_id)
print(len(graph_relation))
graph_relation = graph_relation[:10000]
G = nx.Graph()

G.add_edges_from(graph_relation)  # add the deduplicated edges in bulk
ray_G = ray.put(G)

f_one()
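
One design note on the listing above: shortest_path_basic reads the module-level ray_G, so the graph has to be placed in the object store before the tasks are launched. An alternative (a sketch, not the author's code) passes the reference as a task argument, which Ray resolves to the graph automatically:

@ray.remote
def shortest_path_basic_arg(s, G):
    # Ray de-references ObjectRef arguments, so G arrives as the nx.Graph itself
    betweenness = dict.fromkeys(G, 0.0)
    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
    betweenness, _ = _accumulate_basic(betweenness, S, P, sigma, s)
    return betweenness

futures = [shortest_path_basic_arg.remote(s, ray_G) for s in G]
partial_results = ray.get(futures)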
Originally published 2023-03-08; shared from the 857Hub WeChat official account.
