我想使用PySpark将我的MongoDB集合集成到Elasticsearch中。我有MongoDB的连接字符串,但我不知道如何构造代码结构或指定一些参数。有人能给我创建这个任务的代码示例吗?虽然我在这个领域仍然是新手,但我已经尝试阅读了一些文档,但我发现自己被一些参数卡住了,仍然不能清楚地了解这项任务。谢谢你帮我。
发布于 2021-11-10 11:32:21
在Spark中,你可以使用RDDs或DataFrames。RDDs不是结构化的,是较旧的Spark API。要使用RDDs,您需要使用MapPartition
、ForEachPartition
等。DataFrames是使用类似于ANSI SQL的Spark SQL函数来构造和启用的。
MongoDB支持每个文档的模式。Spark DataFrames要求每个DataFrame都有一个统一的模式。
如果您的集合对每个集合的所有文档都有一个统一的模式(取决于插入的数据是什么),您可以手动(通过代码)创建Spark模式定义(PySpark中的pyspark.sql.types.StructType
)。例如,查看StructType、DataType、StructDef、FieldType、ArrayDef、DictDef、StringType等的Spark文档。
如果您的Elasticsearch集合没有统一的模式,则需要使用RDDs,然后通过MapPartition
将其转换为统一的模式,以便将此统一模式用于MongoDB索引。
您可以使用Spark SQL通过pyspark使用:spark_session.read.format('mongo').option('uri', 'mongodb://uri.to.mongo.goes.here').schema(schema=spark_schema_goes_here)
从Mongo读取DataFrames,并使用df.write.format('mongo').option('uri', uri).mode(write_mode_eg_append).save()
进行类似的写入
我不记得Elastic语法了,但是,它是相似的,并且定义是每个索引的模式。尝试在MongoDB站点和Elasticsearch站点中搜索Spark以查找更多详细信息。
可以从Spark模式定义自动创建Elastic索引定义,我曾经这样做过,但是没有代码。
https://stackoverflow.com/questions/69912273
复制相似问题