
在数字阅读产业高速发展的背景下,海量小说数据成为内容分析、用户画像构建、版权监测等业务的核心资产。传统单机爬虫面对百万级甚至亿级小说资源时,存在采集效率低、任务调度难、数据处理能力弱等问题。Apache Spark 作为分布式计算框架,凭借其内存计算、弹性分布式数据集(RDD)和分布式任务调度能力,成为构建海量小说数据采集系统的理想选择。本文将从系统架构、核心模块设计、技术实现等维度,详解基于 Spark 的小说数据爬虫系统构建过程。
针对小说数据采集的特殊性(多源站点、内容分散、反爬机制多样、数据量大),Spark 爬虫系统需实现以下目标:
Spark 爬虫系统采用分层设计,从上至下分为:
Spark 的核心优势在于将爬取任务拆分为多个 Partition,分发至不同 Executor 节点并行执行。系统首先构建小说站点的 URL 种子池,通过 Spark 的parallelize方法将 URL 列表转化为 RDD,每个 Partition 对应一批待爬取的 URL,由不同节点处理。
retry算子和 Checkpoint 机制,对失败任务标记并重新调度。小说站点常见反爬手段包括 IP 封禁、User-Agent 检测、Cookie 验证、动态页面渲染,系统针对性设计解决方案:
爬取的小说数据包含 HTML 源码、冗余广告、乱码等问题,需通过 Spark 分布式计算完成清洗:
scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object NovelCrawler {
def main(args: Array[String]): Unit = {
// Spark配置:设置应用名、集群模式
val conf = new SparkConf()
.setAppName("NovelCrawler_Spark")
.setMaster("yarn") // 集群模式使用yarn,本地测试用local[*]
.set("spark.executor.memory", "4g") // 每个Executor内存
.set("spark.cores.max", "100") // 最大使用核数
.set("spark.task.maxFailures", "3") // 任务最大失败次数
val sc = new SparkContext(conf)
sc.setCheckpointDir("hdfs://cluster/checkpoint/novel_crawler") // 容错检查点
// 加载URL种子池(从HDFS读取待爬取的小说URL列表)
val urlRDD: RDD[String] = sc.textFile("hdfs://cluster/novel/seed_urls.txt")
// 分布式爬取
val novelDataRDD: RDD[NovelInfo] = urlRDD.mapPartitions(urls => {
// 每个Partition初始化爬虫客户端(避免重复创建连接)
val crawler = new NovelCrawlerClient()
urls.map(url => crawler.crawl(url))
}).filter(_ != null) // 过滤爬取失败的数据
// 数据清洗
val cleanedNovelRDD: RDD[NovelInfo] = novelDataRDD.map(novel => {
// 清洗章节内容:剔除广告、乱码
val cleanContent = NovelCleaner.cleanContent(novel.content)
// 统一字符编码
val normalizedTitle = new String(novel.title.getBytes("ISO-8859-1"), "UTF-8")
NovelInfo(novel.id, normalizedTitle, novel.author, cleanContent, novel.updateTime)
})
// 数据落地:写入HBase
cleanedNovelRDD.foreachPartition(novels => {
val hbaseClient = new HBaseClient("novel_table") // 初始化HBase客户端
novels.foreach(novel => {
hbaseClient.put(
rowKey = novel.id,
family = "content",
columns = Map(
"title" -> novel.title,
"author" -> novel.author,
"content" -> novel.content,
"update_time" -> novel.updateTime.toString
)
)
})
hbaseClient.close()
})
sc.stop()
}
}步骤 2:爬虫客户端实现(NovelCrawlerClient)scala
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.jsoup.Jsoup
import org.jsoup.nodes.Document
import org.openqa.selenium.WebDriver
import org.openqa.selenium.chrome.ChromeDriver
import java.util.concurrent.TimeUnit
// 小说信息样例类
case class NovelInfo(
id: String, // 小说ID(URL哈希生成)
title: String, // 标题
author: String, // 作者
content: String, // 章节内容
updateTime: Long // 更新时间戳
)
class NovelCrawlerClient {
// 初始化HTTP客户端
private val httpClient: CloseableHttpClient = HttpClients.createDefault()
// 初始化Selenium WebDriver(处理动态页面)
private val driver: WebDriver = {
System.setProperty("webdriver.chrome.driver", "/opt/chromedriver") // 集群节点ChromeDriver路径
val d = new ChromeDriver()
d.manage().timeouts().pageLoadTimeout(30, TimeUnit.SECONDS)
d
}
// 核心爬取方法
def crawl(url: String): NovelInfo = {
try {
// 1. 发送请求(带代理、请求头)
val request = new HttpGet(url)
request.addHeader("User-Agent", getRandomUserAgent())
request.addHeader("Referer", "https://www.novel-site.com")
// 设置代理IP(从代理池获取)
val proxy = ProxyPool.getRandomProxy()
if (proxy != null) {
val httpHost = new HttpHost(proxy.ip, proxy.port)
request.setConfig(RequestConfig.custom().setProxy(httpHost).build())
}
// 2. 响应处理
val response = httpClient.execute(request)
val statusCode = response.getStatusLine().getStatusCode()
if (statusCode == 200) {
// 3. 解析页面(区分静态/动态页面)
val document: Document = if (isDynamicPage(url)) {
// 动态页面使用Selenium渲染
driver.get(url)
Jsoup.parse(driver.getPageSource())
} else {
// 静态页面直接解析
val entity = response.getEntity()
Jsoup.parse(EntityUtils.toString(entity), "UTF-8")
}
// 4. 提取核心信息
val title = document.select("h1.novel-title").text()
val author = document.select("span.author").text()
val content = document.select("div.chapter-content").text()
val updateTime = document.select("span.update-time").attr("data-time").toLong
val id = MD5Util.md5(url) // 生成唯一ID
NovelInfo(id, title, author, content, updateTime)
} else {
// 非200状态码标记失败
null
}
} catch {
case e: Exception =>
e.printStackTrace()
null
}
}
// 随机生成User-Agent
private def getRandomUserAgent(): String = {
val userAgents = Array(
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Firefox/114.0"
)
userAgents(scala.util.Random.nextInt(userAgents.length))
}
// 判断是否为动态页面(根据URL特征)
private def isDynamicPage(url: String): Boolean = {
url.contains("dynamic_chapter") || url.contains("js_render")
}
// 关闭资源
def close(): Unit = {
httpClient.close()
driver.quit()
}
}步骤 3:数据清洗工具类(NovelCleaner)scala
import java.util.regex.Pattern
object NovelCleaner {
// 清洗章节内容:剔除广告、无关链接、特殊字符
def cleanContent(content: String): String = {
if (content == null || content.isEmpty) return ""
// 正则剔除广告文本
val adPattern = Pattern.compile("【.*?广告.*?】|点击阅读.*?|VIP章节.*?|\\s{4,}")
val adMatcher = adPattern.matcher(content)
var cleanContent = adMatcher.replaceAll("")
// 剔除特殊字符和乱码
cleanContent = cleanContent.replaceAll("[^\\u4e00-\\u9fa5a-zA-Z0-9,。!?;:、\\s]", "")
// 去除多余空格和换行
cleanContent = cleanContent.replaceAll("\\n+", "\n").replaceAll("\\s+", " ")
cleanContent
}
}scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.util.Bytes
class HBaseClient(tableName: String) {
private val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node1,node2,node3") // Zookeeper地址
private val connection: Connection = ConnectionFactory.createConnection(conf)
private val table: Table = connection.getTable(TableName.valueOf(tableName))
// 写入数据到HBase
def put(rowKey: String, family: String, columns: Map[String, String]): Unit = {
val put = new Put(Bytes.toBytes(rowKey))
columns.foreach { case (col, value) =>
put.addColumn(
Bytes.toBytes(family),
Bytes.toBytes(col),
Bytes.toBytes(value)
)
}
table.put(put)
}
// 关闭连接
def close(): Unit = {
table.close()
connection.close()
}
}spark.sql.shuffle.partitions参数(默认 200),根据数据量设置合理的分区数,避免小文件过多;spark.task.maxFailures设置任务最大失败次数,失败任务自动重新调度;基于 Spark 的海量小说数据爬虫系统,通过分布式计算解决了传统单机爬虫的效率瓶颈,同时结合反爬突破、数据清洗、分布式存储等能力,实现了海量小说数据的高效、稳定采集。该系统可适配不同小说站点的特征,通过灵活的参数调优和模块扩展,满足数字阅读行业对大规模数据采集的需求。在实际应用中,需结合站点反爬策略的变化持续优化爬取逻辑,并通过监控体系保障系统的稳定性,最终为小说内容分析、版权保护等业务提供可靠的数据支撑。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。