首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将大量CSV文件转换为拼花文件

将大量CSV文件转换为拼花文件
EN

Stack Overflow用户
提问于 2020-06-04 02:11:19
回答 2查看 2.6K关注 0票数 0

我有大量的CSV文件,需要转换为拼花文件,使用火种。一个CSV就是一个Parquet。

输入: csv文件:

代码语言:javascript
复制
000.csv
001.csv
002.csv
...

输出: qarquet文件:

代码语言:javascript
复制
000.parquet
001.parquet
002.parquet
...

我目前的解决办法是:

代码语言:javascript
复制
for each_csv in same_folder:
   df = spark.read.csv(each_csv, header = True)
   df.write.parquet(output_folder)

for循环很昂贵。有什么办法可以利用火花做批处理吗?例如:

spark.read.csv(same_folder/).write.parquet(output_folder/)

根据水银的答案,这是我的PySpark版本:

代码语言:javascript
复制
spark = SparkSession.builder.master("local[*]").appName("csv_to_parquet").getOrCreate()

# Read csv files into a single data frame and add a column of input file names: 
baseDf = spark.read.csv("input_folder/*.csv").withColumn("input_file_name", input_file_name())

# Convert file names into a list: 
filePathInfo = baseDf.select("input_file_name").distinct().collect() 
filePathInfo_array = list(map(lambda row: row.input_file_name, filePathInfo))

# Write to parquet:  
map(lambda csvFileName: baseDf.filter(col("input_file_name").endsWith(csvFileName)).write.mode('overwrite').parquet(f'output_folder/{csvFileName}'), filePathInfo_array)
EN

回答 2

Stack Overflow用户

发布于 2020-06-07 07:01:58

您可以按照以下步骤来避免星火中的多个文件加载,

  1. 使用源csv文件夹
  2. input_file_name加载数据,其中记录源文件名
  3. 将文件名收集到列表
  4. 中,迭代文件名列表

G 210

在文件名列表循环中,

  1. 通过文件名
  2. 将数据过滤到相应的文件

scala中的Sudo工作代码

代码语言:javascript
复制
import java.nio.file.Paths

import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object ReadWriteToRespCsv {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local[*]").getOrCreate;

    val baseDf = spark.read.csv("src/main/resources/same_folder/*.csv")
      //Add a column `input_file_name` which records source file name
      .withColumn("input_file_name",input_file_name())

    //Collect the file names into a List
    val filePathInfo = baseDf.select("input_file_name").distinct()
      .map(row=>Paths.get(row.getString(0)).getFileName.toString)(Encoders.STRING).collect()

    //Iterate for file name list
    filePathInfo.foreach(csvFileName => {
      baseDf
        //Filter dataframe by file name
        .filter(col("input_file_name").endsWith(csvFileName) )
        .write
        .mode(SaveMode.Overwrite)
        //Write to respective file
        .parquet(s"src/main/resources/output_folder/${csvFileName}")
    })
  }

}
票数 1
EN

Stack Overflow用户

发布于 2020-06-04 02:52:54

您可以使用全局模式来选择文件,也可以提供文件列表。

如果我在文件夹/tmp/file1_csv/file1.csv/tmp/file2_csv/file2.csv中有两个文件,我可以使用以下方法

代码语言:javascript
复制
spark.read.option("header", "true").csv("/tmp/file*_csv/*.csv")

或者,如果您有奇怪的路径,也可以使用重载版本的csv方法。

代码语言:javascript
复制
val paths = "/dir1/,/dir2/,/dir3/"
val df = spark.read.option("header", "true").csv(paths.split(","): _*)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62185753

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档