file = sc.textFile(inputFile)
#创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(Line):
globle blankLines #访问全局变量
if (line == ""):
blankLines += 1
return line.split("")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines:%d" % blankLines.value
Scala代码使用广播变量查询国家
//查询RDD contactCounts中的呼号的对应位置。将呼号前缀
//读取为国家代码进行查询
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map{case (sign,count) =>
val country = lookupInArray(sign,signPrefixes.value)
(country,count)
}.reduceByKey((x,y) => x+y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.text")