首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在磁盘上将spark DataFrame保存为csv?

如何在磁盘上将spark DataFrame保存为csv?
EN

Stack Overflow用户
提问于 2015-10-16 23:39:36
回答 4查看 158.2K关注 0票数 30

例如,结果如下:

代码语言:javascript
复制
df.filter("project = 'en'").select("title","count").groupBy("title").sum()

将返回一个数组。

如何在磁盘上将spark DataFrame保存为csv文件?

EN

回答 4

Stack Overflow用户

发布于 2015-10-16 23:47:13

Apache Spark不支持磁盘上的本机CSV输出。

不过,您有四种可用的解决方案:

  1. 您可以将数据帧转换为RDD:

定义行(r:convertToReadableString)=?df.rdd.map{ convertToReadableString }.saveAsTextFile(文件路径)

这将创建一个文件夹文件路径。在文件路径下,你会找到分区文件(例如part-000*)

如果我想要将所有分区附加到一个大的CSV中,我通常会做的是

cat文件路径/部分*> mycsvfile.csv

有些人会使用coalesce(1,false)从RDD创建一个分区。这通常是一种糟糕的实践,因为它可能会将您正在收集的所有数据拉到它上面,从而使驱动程序不堪重负。

请注意,df.rdd将返回RDD[Row].

  • With spark <2,您可以使用databricks spark-csv library

代码语言:javascript
复制
- Spark 1.4+:

df.write.format("com.databricks.spark.csv").save(filepath)

- Spark 1.3:

Df.save(文件路径,"com.databricks.spark.csv")

带有Spark2.x

  1. 不需要spark-csv包,因为它包含在Spark中。

df.write.format("csv").save(filepath)

  • You可以转换为本地熊猫数据帧并使用to_csv方法(仅限PySpark)。

注意: Spark解决方案1、2和3将生成由底层Hadoop API生成的格式文件(part-*),当您调用save时,该API将调用该API。每个分区将有一个part-文件。

票数 42
EN

Stack Overflow用户

发布于 2019-08-18 01:25:47

将数据帧作为csv写入磁盘与从csv读取数据帧类似。如果你想要你的结果作为一个文件,你可以使用coalesce。

代码语言:javascript
复制
df.coalesce(1)
      .write
      .option("header","true")
      .option("sep",",")
      .mode("overwrite")
      .csv("output/path")

如果您结果是一个数组,您应该使用特定于语言的解决方案,而不是spark dataframe api。因为所有这些类型的结果都返回驱动程序。

票数 20
EN

Stack Overflow用户

发布于 2016-08-12 16:28:01

我也遇到过类似的问题。当我在客户机模式下连接到集群时,我需要在驱动程序上写下csv文件。

我想重用与Apache Spark相同的CSV解析代码,以避免潜在的错误。

我检查了spark-csv代码,发现在com.databricks.spark.csv.CsvSchemaRDD中负责将数据帧转换为原始csv RDD[String]的代码。

遗憾的是,它是用sc.textFile和相关方法的末尾硬编码的。

我复制粘贴了该代码,并使用sc.textFile删除了最后一行,并直接返回了RDD。

我的代码:

代码语言:javascript
复制
/*
  This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
  Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
  But in last lines of that method it's hardcoded against writing as text file -
  for our case we need RDD.
 */
object DataframeToRawCsvRDD {

  val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat

  def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
           (implicit ctx: ExecutionContext): RDD[String] = {
    val delimiter = parameters.getOrElse("delimiter", ",")
    val delimiterChar = if (delimiter.length == 1) {
      delimiter.charAt(0)
    } else {
      throw new Exception("Delimiter cannot be more than one character.")
    }

    val escape = parameters.getOrElse("escape", null)
    val escapeChar: Character = if (escape == null) {
      null
    } else if (escape.length == 1) {
      escape.charAt(0)
    } else {
      throw new Exception("Escape character cannot be more than one character.")
    }

    val quote = parameters.getOrElse("quote", "\"")
    val quoteChar: Character = if (quote == null) {
      null
    } else if (quote.length == 1) {
      quote.charAt(0)
    } else {
      throw new Exception("Quotation cannot be more than one character.")
    }

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
    val quoteMode: QuoteMode = if (quoteModeString == null) {
      null
    } else {
      QuoteMode.valueOf(quoteModeString.toUpperCase)
    }

    val nullValue = parameters.getOrElse("nullValue", "null")

    val csvFormat = defaultCsvFormat
      .withDelimiter(delimiterChar)
      .withQuote(quoteChar)
      .withEscape(escapeChar)
      .withQuoteMode(quoteMode)
      .withSkipHeaderRecord(false)
      .withNullString(nullValue)

    val generateHeader = parameters.getOrElse("header", "false").toBoolean
    val headerRdd = if (generateHeader) {
      ctx.sparkContext.parallelize(Seq(
        csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
      ))
    } else {
      ctx.sparkContext.emptyRDD[String]
    }

    val rowsRdd = dataFrame.rdd.map(row => {
      csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
    })

    headerRdd union rowsRdd
  }

}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33174443

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档