前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Python在Neo4j中创建图数据库

使用Python在Neo4j中创建图数据库

作者头像
磐创AI
发布2021-05-07 16:49:16
5.4K0
发布2021-05-07 16:49:16
举报
文章被收录于专栏:磐创AI技术团队的专栏


磐创AI分享

作者 | CJ Sullivan

编译 | VK 来源 | Towards Data Science

图数据库的一个最常见的问题是如何将数据存入数据库。在上一篇文章中,我展示了如何使用通过Docker设置的Neo4j浏览器UI以几种不同的方式之一实现这一点。

在这篇文章中,我将展示如何使用Python生成的数据来填充数据库。我还将向你展示如何使用Neo4j沙箱,这样就可以使用不同的Neo4j数据库设置。

你可以在这里找到一个谷歌Colab笔记本:https://colab.research.google.com/drive/1J9__HotNoINHpucoipLH-4qWc48GALAk?usp=sharing

里面有这篇文章的代码。(那本笔记本上有如何连接Colab和Kaggle的说明,可以让你更快地下载数据。)

必要的工具
  1. Neo4j Python驱动程序(撰写本文时为4.2版)
  2. jupiter notebook/Lab或谷歌Colab(可选)
  3. pandas

使用Python清理数据

现在我们可以开始用Python做一些数据处理了。

为了写这篇文章,我们将使用在Kaggle上找到的arXiv数据集,其中包含超过170万篇STEM学术论文。(在写这篇文章的时候,已经是第18版了。)你可以将数据下载到本地机器

https://www.kaggle.com/Cornell-University/arxiv

现在进入你的笔记本,我们可以开始查看我们的数据。我通过以下方式加载数据:

代码语言:javascript
复制
file = "./arxiv-metadata-oai-snapshot.json"

metadata  = []

lines = 100000    # 100k 测试

with open(file, 'r') as f:

    for line in tqdm(f):
        metadata.append(json.loads(line))
        lines -= 1
        if lines == 0: break

df = pd.DataFrame(metadata)

(你不必使用tqdm,但我发现在知道文件大小超过179万个条目时检查进度很有帮助。)

你可以通过df看到。我们的数据结构为:

代码语言:javascript
复制
id                object
submitter         object
authors           object
title             object
comments          object
journal-ref       object
doi               object
report-no         object
categories        object
license           object
abstract          object
versions          object
update_date       object
authors_parsed    object

假设我们想用这个数据框构建一个图,我们想知道哪些作者发表了哪些论文,以及这些论文与哪些类别相关联。

然后,我们希望有三种不同的节点类型与之对应:作者、论文和类别。

每个节点类型都有一两个属性。对于作家来说,有作者的名字。论文可以有ID和标题。最后,类别有自己的名称。我们也有一些关系:作者作者,论文和论文。

因此,我们的目标是拥有以下数据模型(用arrows.app绘制):

有一些列对我们很有用。例如,我打算保留id,这样我们就可以使用它作为每个论文的唯一索引。之后,我想要得到每个作者的个人列表。此外,authors_parsed列为我们提供了一个更清晰的所有作者列表。当然,我们将保留标题栏作为论文的主要属性。最后,我想保留categories列。

下一步是稍微清理一下我们的数据,这样数据帧的每行有一个作者,每行有一个类别。例如,我们看到authors_parsed列给出了一个列表,其中每个条目在名称后面都有一个多余的逗号。

如果我们简单地将其导入到数据库中,我们将得到author节点,如(显示一个小示例):

代码语言:javascript
复制
╒════════════════════════════════════╕
│"n"                                 │
╞════════════════════════════════════╡
│{"name":["Balázs","C.",""]}         │
├────────────────────────────────────┤
│{"name":["Berger","E. L.",""]}      │
├────────────────────────────────────┤
│{"name":["Nadolsky","P. M.",""]}    │
├────────────────────────────────────┤
│{"name":["Yuan","C. -P.",""]}       │
├────────────────────────────────────┤
│{"name":["Streinu","Ileana",""]}    │
└────────────────────────────────────┘

由于这不是一件令人愉快的事情(并且会导致查询不是最优雅的),我们需要稍微清理一下。我们还看到categories列可以有一个单独的类别,也可以有几个不采用传统列表格式的类别(如本示例的最后一行所示):

代码语言:javascript
复制
╒═══════════════════════════════════╕
│"c"                                │
╞═══════════════════════════════════╡
│{"category":"hep-ph"}              │
├───────────────────────────────────┤
│{"category":"math.CO cs.CG"}       │
├───────────────────────────────────┤
│{"category":"physics.gen-ph"}      │
├───────────────────────────────────┤
│{"category":"math.CO"}             │
├───────────────────────────────────┤
│{"category":"math.CA math.FA"}     │
└───────────────────────────────────┘

我们可以在Cypher中这样做,但为了这篇文章的目的,我们将在Python中做清理,以便说明

让我们创建两个帮助函数来清理这两列:

代码语言:javascript
复制
def get_author_list(line):
    # 清除author dataframe列,在行中创建作者列表。
    return [e[1] + ' ' + e[0] for e in line]


def get_category_list(line):
    # 清除“category”列,在该行中创建类别列表。
    return list(line.split(" "))


df['cleaned_authors_list'] = df['authors_parsed'].map(get_author_list)
df['category_list'] = df['categories'].map(get_category_list)
df = df.drop(['submitter', 'authors', 
             'comments', 'journal-ref', 
             'doi', 'report-no', 'license', 
             'versions', 'update_date', 
             'abstract', 'authors_parsed', 
             'categories'], axis=1)

我们得到的数据帧现在看起来像:

现在我们有东西可以用了!

创建一个Neo4j沙箱

Neo4j沙箱可以对Neo4j免费鼓捣。你可以启动一个实例,该实例将持续3天并开始工作!

出于本文的目的,当你进入沙箱时,你将创建一个基本的、空白的沙箱,像这样:

正如你在创建窗口中看到的那样,还有许多其他有用的沙箱,但是我们将选择这个选项,因为我们将用我们自己的数据填充数据库。休息几分钟,等待运行完成。一旦完成,你将得到你的连接信息,如下所示:

这个窗口有一些你需要的东西。首先,你将注意到Bolt URL,并完成其端口号。

要通过Python建立连接,你将需要这个。接下来,你还需要密码(在本例中为“difficulties-pushup-gap”)。这将需要验证到此实例中。我要指出的是,3天后当这个实例被删除时,这些信息就不再有效了。

连接到Neo4j并填充数据库

现在,我们需要在本地机器(或任何有Python代码的地方)和沙箱数据库之间建立连接。这就需要用到BOLT URL和密码。

我已经创建了一个helper类来做这一点:

代码语言:javascript
复制
class Neo4jConnection:

    def __init__(self, uri, user, pwd):
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            print("Failed to create the driver:", e)

    def close(self):
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try: 
            session = self.__driver.session(database=db) if db is not None else self.__driver.session() 
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally: 
            if session is not None:
                session.close()
        return response


conn = Neo4jConnection(uri="bolt://52.87.205.91:7687", 
                       user="neo4j",              
                       pwd="difficulties-pushup-gaps")

现在我们可以开始填充数据库了。我们首先在数据库中创建一些约束,以确保节点不重复,同时建立一些索引:

代码语言:javascript
复制
conn.query('CREATE CONSTRAINT papers IF NOT EXISTS ON (p:Paper)     ASSERT p.id IS UNIQUE')
conn.query('CREATE CONSTRAINT authors IF NOT EXISTS ON (a:Author) ASSERT a.name IS UNIQUE')
conn.query('CREATE CONSTRAINT categories IF NOT EXISTS ON (c:Category) ASSERT c.category IS UNIQUE')

现在我们创建三个函数来为category和author节点创建数据框,我们将使用它们分别填充到数据库中:

代码语言:javascript
复制
def add_categories(categories):
    # 向Neo4j图中添加类别节点。
    query = '''
            UNWIND $rows AS row
            MERGE (c:Category {category: row.category})
            RETURN count(*) as total
            '''
    return conn.query(query, parameters = {'rows':categories.to_dict('records')})


def add_authors(rows, batch_size=10000):
    # #以批处理作业的形式将作者节点添加到Neo4j图中。
    query = '''
            UNWIND $rows AS row
            MERGE (:Author {name: row.author})
            RETURN count(*) as total
            '''
    return insert_data(query, rows, batch_size)


def insert_data(query, rows, batch_size = 10000):
    # 以批处理方式更新Neo4j数据库。

    total = 0
    batch = 0
    start = time.time()
    result = None

    while batch * batch_size < len(rows):

        res = conn.query(query, 
                         parameters= {
                         'rows': rows[batch*batch_sizebatch+1)*batch_size].to_dict('records')})
        total += res[0]['total']
        batch += 1
        result = {"total":total, 
                  "batches":batch, 
                  "time":time.time()-start}
        print(result)

    return result

这些函数将每一列放入变量$rows中,这些列是列表格式的。

UNWIND命令获取列表中的每个实体并将其添加到数据库中。在此之后,我们使用一个辅助函数以批处理模式更新数据库,当你处理超过50k的上传时,它会很有帮助。

加载这些节点后,我们将添加论文节点以及与所有关系:

代码语言:javascript
复制
def add_papers(rows, batch_size=5000):
   # 添加论文节点 , (:Author)--(:Paper) , 
   # (:Paper)--(:Category) 关系

   query = '''
   UNWIND $rows as row
   MERGE (p:Paper {id:row.id}) ON CREATE SET p.title = row.title

   // connect categories
   WITH row, p
   UNWIND row.category_list AS category_name
   MATCH (c:Category {category: category_name})
   MERGE (p)-[:IN_CATEGORY]->(c)

   // connect authors
   WITH distinct row, p // reduce cardinality
   UNWIND row.cleaned_authors_list AS author
   MATCH (a:Author {name: author})
   MERGE (a)-[:AUTHORED]->(p)
   RETURN count(distinct p) as total
   '''

   return insert_data(query, rows, batch_size)

因此,与category和author节点类似,我们创建了每一篇论文,然后通过数据帧中每一行的:authorated或:IN_CATEGORY关系将其连接起来。

请注意,在这个函数中有更多的数据在管道中移动,因此它可能有助于减少批处理大小,以防止超时错误。

同样,在这个步骤中,我们可能会在完整的数据帧上使用类似于explosion的方法,为每个列表的每个元素获取一行,并以这种方式将整个数据帧载入到数据库中。这是可行的,这正是我们将在下面对少量数据所做的。

然而,对于更大的数据集,将数据加载到Neo4j并不是一种非常有效的方法。因为Neo4j是一个事务性数据库,我们创建一个数据库,数据帧的每一行就执行一条语句,这会非常缓慢。它也可能超出可用内存。沙箱实例有大约500 MB的堆内存和500 MB的页面缓存。因此,这进一步推动了以批处理方式更新数据库。

执行所有这些函数来填充图,我们有:

代码语言:javascript
复制
categories = pd.DataFrame(df[['category_list']])
categories.rename(columns={'category_list':'category'},
                  inplace=True)
categories = categories.explode('category') \
                       .drop_duplicates(subset=['category'])

authors = pd.DataFrame(df[['cleaned_authors_list']])
authors.rename(columns={'cleaned_authors_list':'author'},
               inplace=True)
authors=authors.explode('author').drop_duplicates(subset=['author'])

add_categories(categories)
add_authors(authors)
add_papers(df)

太棒了!我们现在有一个填充数据库!下面是该图的子样本,通过该命令运行得到:MATCH (a:Author)-[:AUTHORED]->(p:Paper)-[:IN_CATEGORY]->(c:Category) RETURN a, p, c LIMIT 300

让我们确保它有我们想要的东西……

查询数据库以获得一些答案

这里有一个提示:当你有了一个已填充的数据库时,你应该让Neo4j处理尽可能多的计算,然后再将答案带回Python(如果你甚至需要的话)。

在本例中,假设我们想计算每个类别的相关度,并返回前20个类别的类别。显然,我们可以在Python中完成这个简单的工作,但让我们在Neo4j中完成它。

在某些时候,你可能需要进行更复杂的计算(例如节点中心性、路径查找或社区检测),这些都可以并且应该在将结果下载回Python之前在Neo4j中完成。

为了在Cypher中做到这一点,我们可以使用许多方法,但这里有一个快速有效的方法:

代码语言:javascript
复制
query_string = '''
MATCH (c:Category) 
RETURN c.category_name, SIZE(()-[:IN_CATEGORY]->(c)) AS inDegree 
ORDER BY inDegree DESC LIMIT 20
'''
top_cat_df = pd.DataFrame([dict(_) for _ in conn.query(query_string)])
top_cat_df.head(20)

这应该返回:

上述数据子集的入度分布如下:

因此,这表明数据库已经填充,以及我们如何获得结果。无论如何,另一种方法可以得到相同的结果返回的列表形式是:

代码语言:javascript
复制
result = conn.query(query_string)
for record in result:
    print(record['c.category'], record['inDegree'])

总结

我们已经展示了如何从Python连接到Neo4j沙箱,并在满足要求的情况下上传数据。

就像编码中的其他事情一样,有很多不同的方法可以实现这一点,我们鼓励感兴趣的用户主要使用Cypher而不是Python来探索上面的演示。

通过使用Neo4j Python连接器,可以很容易地在Python和Neo4j数据库之间来回切换,就像其他数据库一样。这将为数据科学和机器学习带来各种令人兴奋的可能性,比如自动节点分类、链接预测和节点聚类。

感谢阅读!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 磐创AI 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 必要的工具
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档