之前 GeoTrellis 为方便用户将数据(GeoTiff 等遥感影像)导入到 backend (包含 Accumulo、File、Hadoop 等格式)中,编写了一个 ETL 类,该类的输入为用户配置好的 json 文件,其中包含数据源、数据类型、投影、瓦片类型、处理方式等等处理过程中需要用到的信息。
从 2.0 版开始,GeoTrellis 加入了流水线(pipeline)功能,用户可以使用 json 或者 AST 将数据的处理过程配置成处理的流水线过程,这样只需要执行此流水线,系统便会自动的将输入数据处理成最终结果。
本文简单分析 GeoTrellis 中的流水线实现方式,并探讨此技术在其他方面的应用。
其实在功能和性能上并没有任何的改进,只是将原来的 ETL 类代码变成了流水线中的一个个节点,这些节点的信息仍是原来 json 配置文件中的信息。那么做此改进到底有什么用呢?我们先来看一下前后两种方式同一种数据处理方式的代码。
object Etl {
val defaultModules = Array(s3.S3Module, hadoop.HadoopModule, file.FileModule, accumulo.AccumuloModule, cassandra.CassandraModule, hbase.HBaseModule)
type SaveAction[K, V, M] = (AttributeStore, Writer[LayerId, RDD[(K, V)] with Metadata[M]], LayerId, RDD[(K, V)] with Metadata[M]) => Unit
object SaveAction {
def DEFAULT[K, V, M] = {
(_: AttributeStore, writer: Writer[LayerId, RDD[(K, V)] with Metadata[M]], layerId: LayerId, rdd: RDD[(K, V)] with Metadata[M]) =>
writer.write(layerId, rdd)
}
}
def ingest[
I: Component[?, ProjectedExtent]: TypeTag: ? => TilerKeyMethods[I, K],
K: SpatialComponent: Boundable: TypeTag,
V <: CellGrid: TypeTag: RasterRegionReproject: Stitcher: (? => TileReprojectMethods[V]): (? => CropMethods[V]): (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V])
](
args: Seq[String], modules: Seq[TypedModule] = Etl.defaultModules
)(implicit sc: SparkContext) = {
implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]
EtlConf(args) foreach { conf =>
/* parse command line arguments */
val etl = Etl(conf, modules)
/* load source tiles using input module specified */
val sourceTiles = etl.load[I, V]
/* perform the reprojection and mosaicing step to fit tiles to LayoutScheme specified */
val (zoom, tiled) = etl.tile(sourceTiles)
/* save and optionally pyramid the mosaiced layer */
etl.save[K, V](LayerId(etl.input.name, zoom), tiled)
}
}
}
case class Etl(conf: EtlConf, @transient modules: Seq[TypedModule] = Etl.defaultModules) extends LazyLogging {
import Etl.SaveAction
val input = conf.input
val output = conf.output
def scheme: Either[LayoutScheme, LayoutDefinition] = {
if (output.layoutScheme.nonEmpty) {
val scheme = output.getLayoutScheme
logger.info(scheme.toString)
Left(scheme)
} else if (output.layoutExtent.nonEmpty) {
val layout = output.getLayoutDefinition
logger.info(layout.toString)
Right(layout)
} else
sys.error("Either layoutScheme or layoutExtent with cellSize/tileLayout must be provided")
}
@transient val combinedModule = modules reduce (_ union _)
/**
* Loads RDD of rasters using the input module specified in the arguments.
* This RDD will contain rasters as they are stored, possibly overlapping and not conforming to any tile layout.
*
* @tparam I Input key type
* @tparam V Input raster value type
*/
def load[I: Component[?, ProjectedExtent]: TypeTag, V <: CellGrid: TypeTag]()(implicit sc: SparkContext): RDD[(I, V)] = {
val plugin = {
val plugins = combinedModule.findSubclassOf[InputPlugin[I, V]]
if(plugins.isEmpty) sys.error(s"Unable to find input module for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'")
plugins.find(_.suitableFor(input.backend.`type`.name, input.format)).getOrElse(sys.error(s"Unable to find input module of type '${input.backend.`type`.name}' for format `${input.format} " +
s"for input key type '${typeTag[I].tpe.toString}' and tile type '${typeTag[V].tpe.toString}'"))
}
// clip in dest crs
input.clip.fold(plugin(conf))(extent => plugin(conf).filter { case (k, _) =>
val pe = k.getComponent[ProjectedExtent]
output.getCrs.fold(extent.contains(pe.extent))(crs => extent.contains(pe.extent.reproject(pe.crs, crs)))
})
}
/**
* Tiles RDD of arbitrary rasters to conform to a layout scheme or definition provided in the arguments.
* First metadata will be collected over input rasters to determine the overall extent, common crs, and resolution.
* This information will be used to select a LayoutDefinition if LayoutScheme is provided in the arguments.
*
* The tiling step will use this LayoutDefinition to cut input rasters into chunks that conform to the layout.
* If multiple rasters contribute to single target tile their values will be merged cell by cell.
*
* The timing of the reproject steps depends on the method chosen.
* BufferedReproject must be performed after the tiling step because it leans on SpatialComponent to identify neighboring
* tiles and sample their edge pixels. This method is the default and produces the best results.
*
* PerTileReproject method will be performed before the tiling step, on source tiles. When using this method the
* reproject logic does not have access to pixels past the tile boundary and will see them as NODATA.
* However, this approach does not require all source tiles to share a projection.
*
* @param rdd RDD of source rasters
* @param method Resampling method to be used when merging raster chunks in tiling step
*/
def tile[
I: Component[?, ProjectedExtent]: (? => TilerKeyMethods[I, K]),
V <: CellGrid: RasterRegionReproject: Stitcher: ClassTag: (? => TileMergeMethods[V]): (? => TilePrototypeMethods[V]):
(? => TileReprojectMethods[V]): (? => CropMethods[V]),
K: SpatialComponent: Boundable: ClassTag
](rdd: RDD[(I, V)], method: ResampleMethod = output.resampleMethod): (Int, RDD[(K, V)] with Metadata[TileLayerMetadata[K]]) = {
val targetCellType = output.cellType
val destCrs = output.getCrs.get
/** Tile layers form some resolution and adjust partition count based on resolution difference */
def resizingTileRDD(
rdd: RDD[(I, V)],
floatMD: TileLayerMetadata[K],
targetLayout: LayoutDefinition
): RDD[(K, V)] with Metadata[TileLayerMetadata[K]] = {
// rekey metadata to targetLayout
val newSpatialBounds = KeyBounds(targetLayout.mapTransform(floatMD.extent))
val tiledMD = floatMD.copy(
bounds = floatMD.bounds.setSpatialBounds(newSpatialBounds),
layout = targetLayout
)
// > 1 means we're upsampling during tiling process
val resolutionRatio = floatMD.layout.cellSize.resolution / targetLayout.cellSize.resolution
val tilerOptions = Tiler.Options(
resampleMethod = method,
partitioner = new HashPartitioner(
partitions = (math.pow(2, (resolutionRatio - 1) * 2) * rdd.partitions.length).toInt))
rdd.tileToLayout[K](tiledMD, tilerOptions)
}
output.reprojectMethod match {
case PerTileReproject =>
def reprojected(targetCellSize: Option[CellSize] = None) = {
val reprojectedRdd = rdd.reproject(destCrs, RasterReprojectOptions(method = method, targetCellSize = targetCellSize))
val floatMD = { // collecting floating metadata allows detecting upsampling
val (_, md) = reprojectedRdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
md.copy(cellType = targetCellType.getOrElse(md.cellType))
}
reprojectedRdd -> floatMD
}
scheme match {
case Left(scheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
val LayoutLevel(zoom, layoutDefinition) = scheme.levelForZoom(output.maxZoom.get)
val (reprojectedRdd, floatMD) = reprojected(Some(layoutDefinition.cellSize))
zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
case Left(scheme) => // True for both FloatinglayoutScheme and ZoomedlayoutScheme
val (reprojectedRdd, floatMD) = reprojected()
val LayoutLevel(zoom, layoutDefinition) = scheme.levelFor(floatMD.extent, floatMD.cellSize)
zoom -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
case Right(layoutDefinition) =>
val (reprojectedRdd, floatMD) = reprojected()
0 -> resizingTileRDD(reprojectedRdd, floatMD, layoutDefinition)
}
case BufferedReproject =>
// Buffered reproject requires that tiles are already in some layout so we can find the neighbors
val md = { // collecting floating metadata allows detecting upsampling
val (_, md) = rdd.collectMetadata(FloatingLayoutScheme(output.tileSize))
md.copy(cellType = targetCellType.getOrElse(md.cellType))
}
val tiled = ContextRDD(rdd.tileToLayout[K](md, method), md)
scheme match {
case Left(layoutScheme: ZoomedLayoutScheme) if output.maxZoom.isDefined =>
val LayoutLevel(zoom, layoutDefinition) = layoutScheme.levelForZoom(output.maxZoom.get)
(zoom, output.bufferSize match {
case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
case _ => tiled.reproject(destCrs, layoutDefinition, RasterReprojectOptions(method = method, targetCellSize = Some(layoutDefinition.cellSize)))._2
})
case Left(layoutScheme) =>
output.bufferSize match {
case Some(bs) => tiled.reproject(destCrs, layoutScheme, bs, method)
case _ => tiled.reproject(destCrs, layoutScheme, method)
}
case Right(layoutDefinition) =>
output.bufferSize match {
case Some(bs) => tiled.reproject(destCrs, layoutDefinition, bs, method)
case _ => tiled.reproject(destCrs, layoutDefinition, method)
}
}
}
}
/**
* Saves provided RDD to an output module specified by the ETL arguments.
* This step may perform two to one pyramiding until zoom level 1 is reached.
*
* @param id Layout ID to b
* @param rdd Tiled raster RDD with TileLayerMetadata
* @param saveAction Function to be called for saving. Defaults to writing the layer.
* This gives the caller an oppurtunity to modify the layer before writing,
* or to save additional attributes in the attributes store.
*
* @tparam K Key type with SpatialComponent corresponding LayoutDefinition
* @tparam V Tile raster with cells from single tile in LayoutDefinition
*/
def save[
K: SpatialComponent: TypeTag,
V <: CellGrid: TypeTag: ? => TileMergeMethods[V]: ? => TilePrototypeMethods[V]
](
id: LayerId,
rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]],
saveAction: SaveAction[K, V, TileLayerMetadata[K]] = SaveAction.DEFAULT[K, V, TileLayerMetadata[K]]
): Unit = {
implicit def classTagK = ClassTag(typeTag[K].mirror.runtimeClass(typeTag[K].tpe)).asInstanceOf[ClassTag[K]]
implicit def classTagV = ClassTag(typeTag[V].mirror.runtimeClass(typeTag[V].tpe)).asInstanceOf[ClassTag[V]]
val outputPlugin =
combinedModule
.findSubclassOf[OutputPlugin[K, V, TileLayerMetadata[K]]]
.find { _.suitableFor(output.backend.`type`.name) }
.getOrElse(sys.error(s"Unable to find output module of type '${output.backend.`type`.name}'"))
def savePyramid(zoom: Int, rdd: RDD[(K, V)] with Metadata[TileLayerMetadata[K]]): Unit = {
val currentId = id.copy(zoom = zoom)
outputPlugin(currentId, rdd, conf, saveAction)
scheme match {
case Left(s) =>
if (output.pyramid && zoom >= 1) {
val (nextLevel, nextRdd) = Pyramid.up(rdd, s, zoom, output.getPyramidOptions)
savePyramid(nextLevel, nextRdd)
}
case Right(_) =>
if (output.pyramid)
logger.error("Pyramiding only supported with layoutScheme, skipping pyramid step")
}
}
savePyramid(id.zoom, rdd)
logger.info("Done")
}
}
implicit val sc: SparkContext = ???
val scheme = Left[LayoutScheme, LayoutDefinition](FloatingLayoutScheme(512))
val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)
val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite
// typed way, as in the JSON example above
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] =
list
.node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval
// in some cases you may want just to evaluate the pipeline
// to add some flexibility we can do parsing and avaluation steps manually
// erasedNode function would parse JSON into an ErasedNode type that can be evaluated
val untypedAst: ErasedNode = list.erasedNode
// it would be an untyped result, just some evaluation
// but you still have a chance to catch and handle some types of exceptions
val untypedResult: Any =
Try {
untypedAst.unsafeEval
} match {
case Success(_) =>
case Failure(e) =>
}
// typed result
val typedResult: Option[Stream[(Int, TileLayerRDD[SpatialKey])]] =
Try {
untypedAst.eval
} match {
case Success(stream) => Some(stream)
case Failure(e) => None
}
从代码量我们就能看出来新的流水线方式明显减少了很多,其实正如前面所说,流水线就是将之前的操作封装成了一个个的操作节点,每种节点的代码已经写好,用户只需要将自己需要操作的节点串联起来,最终执行整个流水线即可。
认真的或者是熟悉 GeoTreliis 数据 ETL 的用户都知道,其实 ETL 无非是以单波段、多波段两种波段形式的栅格数据以及无时间数据和时间序列数据的两种时间格式的组合类型为输入及数据的存储位置(S3、Hadoop、File等),取出此数据并做投影转换、合并、生成金字塔等变换,最终将数据写入指定的 backend。
所以其 Pipeline 实现就是定义了对应的 ReadType、TreansformType、WriteType。我们看上面的例子
val jsonRead = JsonRead("s3://geotrellis-test/", `type` = ReadTypes.SpatialS3Type)
指定了 Read 部分,包含存放路径、存放位置(S3)、数据类型(Singleband)、时间格式(Spatial 无时间标记)。
val jsonTileToLayout = TileToLayout(`type` = TransformTypes.SpatialTileToLayoutType)
val jsonReproject = Reproject("EPSG:3857", scheme, `type` = TransformTypes.SpatialBufferedReprojectType)
val jsonPyramid = Pyramid(`type` = TransformTypes.SpatialPyramidType)
TileToLayout 将数据变成具有数据类型、空间布局等信息的 RDD,方便后续的瓦片切割等操作。
Reproject 对数据做投影变换。
Pyramid 将数据切割成金字塔。
val jsonWrite = JsonWrite("mask", "s3://geotrellis-test/pipeline/", PipelineKeyIndexMethod("zorder"), scheme, `type` = WriteTypes.SpatialType)
JsonWrite 指定数据的输出方式,包含索引方式、输出类型,并且系统自动根据给定的 uri 判断输出存储位置。
到此,已经指定好了上述的三种节点。
val list: List[PipelineExpr] = jsonRead ~ jsonTileToLayout ~ jsonReproject ~ jsonPyramid ~ jsonWrite
此句将上述的操作节点串联起来生成了一个 List。
val typedAst: Node[Stream[(Int, TileLayerRDD[SpatialKey])]] =
list
.node[Stream[(Int, TileLayerRDD[SpatialKey])]]
val result: Stream[(Int, TileLayerRDD[SpatialKey])] = typedAst.eval
上述两句生成对应的节点序列,最终执行 eval
函数,执行整个流水线得到最终结果。
就是这么简单的几句,完成了整个数据的处理流程,需要注意的是在串联最终流水线的时候,前一个数据的输出一定是后一个数据的输入类型,否则流水线便无法继续执行。
整个原理很类似最近很火的 TensorFlow、Keras 等神经网络框架,首先定义一个神经网络节点处理模型,其实就是数据处理模型,二者是一致的,只不过神经网络更关注数据的状态,比如维度、尺寸(节点数量)等等,而 GeoTrellis 关注的是数据处理 的方式。
关于 GeoTrellis 的流水线实现原理就介绍到这里,感兴趣的朋友可以查阅源码进行进一步分析。
认真学习了 GeoTrellis 的 Pipeline 技术 后,我发现很多东西都可以用这种方式来实现,比如刚刚讲到的神经网络。再比如我们可以将遥感数据的其他处理也封装成流水线,如不同的模型计算、匀光匀色、正射纠正等等。
凡是这种涉及到前后因果关联或是需要一步步进行操作的过程都可以封装成流水线,使得在后续处理的时候更加的规范化并且代码更精简,更方便使用。这也正是福特汽车为整个汽车工业带来的革命性巨变。
最近读计算机原理的相关书籍,也着重介绍了 CPU 指令工作的流水线技术,这些技术也可以用到数据处理中来,将数据处理流程按照指令来运行,这样比如对于涉及到大量内存操作或涉及到大量 CPU 操作的就可以错开,可以保持服务器的全负荷运行,必然能够加快处理速度。
本文介绍了 GeoTrellis 中的 Pipeline 实现原理,并简单分析了此技术对于我们处理其他技术的一些启发。
Geotrellis系列文章链接地址http://www.cnblogs.com/shoufengwei/p/5619419.html