在条件语句中结束spark作业的推荐方式是什么?
我正在对我的数据进行验证,如果为false,我希望优雅地结束spark作业。
现在我有:
if (isValid(data)) {
sparkSession.sparkContext.stop()
}
但是,我得到以下错误:
Exception in thread "main" java.lang.IllegalStateException: SparkContext has been shutdown
然后它会显示一个堆栈跟踪。
sparkContext.stop()难道不是优雅地结束spark作业的正确方式吗?
我有一个接受spark DataFrame的函数,我想要获得DataFrames所在的Spark上下文。 原因是我想要获得SQLContext,这样我就可以运行一些SQL查询。 sql_Context = SQLContext(output_df.sparkContext())
sql_Context.registerDataFrameAsTable(output_df, "table1")
sql_Context.sql("select * from table1") # or some more complicated query 但是output_df.
我正在尝试运行简单的数据写入ElasticSearch示例。但是,我一直收到这样的错误:
EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only
我对星火和ElasticSearch的依赖:
scal
我想使用来测试我的作业,方法是使用一个名为localTest的新函数扩展FunSuite,该函数使用默认的SparkContext运行测试:
class SparkFunSuite extends FunSuite {
def localTest(name : String)(f : SparkContext => Unit) : Unit = {
val conf = new SparkConf().setAppName(name).setMaster("local")
val sc = new SparkContext(conf)
try
我有一个简单的火花作业,它从s3读取一个文件,取5个文件,然后用s3写回去。我看到的是,在s3中总是有额外的文件,在我的输出“目录”(称为output_$folder$ )旁边。
那是什么?我怎样才能阻止火花的产生?这是一些代码来说明我在做什么..。
x = spark.sparkContext.textFile("s3n://.../0000_part_00")
five = x.take(5)
five = spark.sparkContext.parallelize(five)
five.repartition(1).saveAsTextFile("s3n://p
当我运行pyspark.SparkContext('loc', 'pyspark_rec')时,出现了一个错误,说它无法解析master URL。作为spark编程的初学者,我不太确定这是什么意思。但就我的代码而言,我没有使用任何部署模块(YARN、Hadoop等),而是在独立模式下测试代码。因此,我认为将URL分配给'loc‘是可以的。但是谁能给我解释一下我该如何解决这个问题?谢谢。
以下是错误代码。
File "recommender.py", line 112, in spark_recommendations
sc = p
我想在spark中扩展SparkSession类。我复制了这里部分复制的原始SparkSession的构造函数:
class SparkSession private(
@transient val sparkContext: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient private val parentSessionState: Option[SessionState],
@transient private[sql] va
我想测试机器学习库Mllib form spark,并在eclipse中设置一个带有依赖项的maven项目。当我尝试编译示例时,我得到了以下异常:`ERROR SparkContext:初始化SparkContext时出错。
ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(
我无法理解withScope方法的功能(实际上,我不知道RDDOperationScope类的含义)
特别是,(body:=> T)在withScope方法的参数列表中的含义是什么:
private[spark] def withScope[T](
sc: SparkContext,
name: String,
allowNesting: Boolean,
ignoreParent: Boolean)(body: => T): T = {
// Save the old scope to restore it later
val scopeKey = SparkCo