Elixir's actor model and lightweight processes make it a natural fit for highly concurrent work. The crawler below combines that with HTTPoison and Floki to fetch pages, extract links, and parse data across many tasks at once, which makes it a reasonable starting point for large-scale data collection.
Here is an example of a high-concurrency crawler implemented in Elixir.
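Before the full implementation, it helps to see the primitive it builds on. The sketch below (placeholder URLs, simplified error handling) uses Task.async_stream/3, which runs the given function for each element in its own lightweight process, capped by :max_concurrency, and streams the results back:

# Placeholder URLs; the real crawler takes these from the caller.
urls = ["https://example.com", "https://example.org"]

urls
|> Task.async_stream(&HTTPoison.get/1,
  max_concurrency: 10,
  timeout: 15_000,
  # report slow tasks as {:exit, :timeout} instead of crashing the caller
  on_timeout: :kill_task
)
|> Enum.each(fn
  {:ok, {:ok, %HTTPoison.Response{status_code: code}}} -> IO.puts("fetched, status #{code}")
  {:ok, {:error, %HTTPoison.Error{reason: reason}}} -> IO.puts("request failed: #{inspect(reason)}")
  {:exit, reason} -> IO.puts("task gave up: #{inspect(reason)}")
end)

The full crawler below builds the same fan-out on Task.Supervisor.async_stream_nolink/4, so a crash in one task is reported as a result rather than taking down the caller.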
defmodule ConcurrentCrawler do
@moduledoc """
A high-concurrency web crawler.
Built on Elixir's concurrency primitives plus the HTTPoison and Floki libraries.
"""
use Task
require Logger
@user_agent "Mozilla/5.0 (compatible; ConcurrentCrawler/1.0; +http://example.com/bot.html)"
@default_concurrency 50
@request_timeout 10_000
def start_link(urls, concurrency \\ @default_concurrency) when is_list(urls) do
Task.start_link(fn -> crawl(urls, concurrency) end)
end
@doc """
Main entry point: crawls the given URLs with bounded concurrency.
"""
def crawl(urls, concurrency) do
# Start a task supervisor that bounds how many crawl tasks run at once
{:ok, supervisor} = Task.Supervisor.start_link(max_children: concurrency)
# Normalize, validate, and de-duplicate the URLs lazily
url_stream =
urls
|> Stream.map(&String.trim/1)
|> Stream.filter(&valid_url?/1)
|> Stream.uniq()
# Fan the work out under the supervisor. The supervisor must be the first
# argument to async_stream_nolink/4, so the URL stream is passed explicitly
# rather than piped in.
supervisor
|> Task.Supervisor.async_stream_nolink(
url_stream,
&process_url/1,
max_concurrency: concurrency,
timeout: @request_timeout * 2,
# Kill tasks that exceed the timeout instead of exiting the caller
on_timeout: :kill_task
)
|> Stream.run()
end
defp process_url(url) do
Logger.info("Processing: #{url}")
case fetch_url(url) do
{:ok, %{status_code: 200, body: body}} ->
# Extract structured data and outgoing links from the page
data = extract_data(body, url)
links = extract_links(body, url)
# Persist the data here; this example only logs it, but a real application could write it to a database (see the ResultStore sketch later on)
Logger.info("Extracted #{length(links)} links from #{url}")
Logger.debug("Data: #{inspect(data, limit: 3)}")
# Optionally keep crawling the newly discovered links, e.g.
# crawl(links, concurrency) -- note this needs extra loop control
# (a depth limit and a visited set); see the CrawlerLoop sketch after this module
{:ok, url, data, links}
{:ok, response} ->
Logger.warning("Failed to fetch #{url}: Status #{response.status_code}")
{:error, url, response.status_code}
{:error, reason} ->
Logger.error("Error fetching #{url}: #{inspect(reason)}")
{:error, url, reason}
end
end
defp fetch_url(url) do
headers = [
{"User-Agent", @user_agent},
{"Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"}
]
HTTPoison.get(url, headers,
timeout: @request_timeout,
recv_timeout: @request_timeout,
follow_redirect: true
)
end
defp extract_links(html, base_url) do
case Floki.parse_document(html) do
{:ok, document} ->
document
|> Floki.find("a")
|> Floki.attribute("href")
|> Enum.map(&absolute_url(&1, base_url))
|> Enum.filter(&valid_url?/1)
|> Enum.uniq()
_ ->
[]
end
end
defp extract_data(html, url) do
# Extract whatever fields you need here,
# e.g. the title, meta description, or body text
case Floki.parse_document(html) do
{:ok, document} ->
title =
document
|> Floki.find("title")
|> Floki.text()
|> String.trim()
description =
document
|> Floki.find("meta[name=\"description\"]")
|> Floki.attribute("content")
|> List.first()
|> Kernel.||("")
%{
url: url,
title: title,
description: description,
crawled_at: DateTime.utc_now()
}
_ ->
%{url: url, title: "", description: "", crawled_at: DateTime.utc_now()}
end
end
defp absolute_url(url, base) when is_binary(url) do
# URI.merge/2 resolves relative references ("page.html", "/about",
# "../index.html") against the page they were found on, and returns
# already-absolute URLs unchanged
base
|> URI.merge(url)
|> to_string()
end
defp valid_url?(url) do
case URI.parse(url) do
%URI{scheme: scheme, host: host} when scheme in ["http", "https"] and not is_nil(host) ->
true
_ ->
false
end
end
end
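The commented-out crawl(links, concurrency) call in process_url hints at following newly discovered links. Below is a hedged sketch of one way to do that: breadth-first, with a depth limit and a visited set. CrawlerLoop and crawl_depth/3 are hypothetical additions, not part of the module above, and the sketch assumes process_url/1 were made public (def instead of defp).

defmodule CrawlerLoop do
  @moduledoc """
  Hypothetical sketch: breadth-first crawl down to a fixed depth,
  tracking visited URLs so no page is fetched twice.
  """

  # Example: CrawlerLoop.crawl_depth(["https://example.com"], 2)
  def crawl_depth(urls, max_depth, concurrency \\ 10) do
    do_crawl(urls, MapSet.new(), max_depth, concurrency)
  end

  # Depth exhausted: stop recursing.
  defp do_crawl(_urls, _visited, 0, _concurrency), do: :ok

  defp do_crawl(urls, visited, depth, concurrency) do
    # Skip anything already crawled at a shallower depth.
    fresh = Enum.reject(urls, &MapSet.member?(visited, &1))

    # Fan out the current level; assumes ConcurrentCrawler.process_url/1 is public.
    results =
      fresh
      |> Task.async_stream(&ConcurrentCrawler.process_url/1,
        max_concurrency: concurrency,
        timeout: 30_000,
        on_timeout: :kill_task
      )
      |> Enum.to_list()

    # Gather the links returned by successful tasks and go one level deeper.
    next_links =
      for {:ok, {:ok, _url, _data, links}} <- results, link <- links, uniq: true, do: link

    do_crawl(next_links, Enum.into(fresh, visited), depth - 1, concurrency)
  end
end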
# Application configuration and startup
defmodule Crawler.Application do
use Application
def start(_type, _args) do
# Start a dedicated hackney connection pool for the crawler.
# Note: HTTPoison requests only use this pool when they are called
# with the `hackney: [pool: :crawler_pool]` option.
:hackney_pool.start_pool(:crawler_pool, [
timeout: 15_000,
max_connections: 100
])
children = [
{Task.Supervisor, name: Crawler.TaskSupervisor},
# With `use Task`, this child spec calls ConcurrentCrawler.start_link/1 with the URL list
{ConcurrentCrawler, ["https://example.com/page1", "https://example.com/page2"]}
]
opts = [strategy: :one_for_one, name: Crawler.Supervisor]
Supervisor.start_link(children, opts)
end
end

Usage notes
1. Add the dependencies to mix.exs:
defp deps do
[
{:httpoison, "~> 1.8"},
{:floki, "~> 0.32.0"},
{:hackney, "~> 1.17"}
]
end

Then fetch them with mix deps.get.

2. Run the crawler:
# Run it directly (for example from IEx)
urls = ["https://example.com", "https://example.org"]
ConcurrentCrawler.crawl(urls, 20) # 20 is the maximum concurrency
# Or start it as part of a supervision tree
{:ok, pid} = ConcurrentCrawler.start_link(urls, 20)

This implementation leans on Elixir's concurrency features to work through large batches of pages efficiently while remaining maintainable and easy to extend.
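As noted in process_url, the extracted data is only logged. A hedged sketch of one way to hold on to it: a small in-memory buffer process (ResultStore is a hypothetical addition, not part of the code above) that process_url could hand its extract_data/2 maps to, and that a real application would periodically flush to a database.

defmodule ResultStore do
  @moduledoc """
  Hypothetical sketch: an Agent that buffers crawl results in memory.
  process_url/1 could call ResultStore.put/1 instead of only logging.
  """
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  # Prepend one result map (as built by extract_data/2), newest first.
  def put(result) when is_map(result) do
    Agent.update(__MODULE__, fn acc -> [result | acc] end)
  end

  # Read everything collected so far.
  def all do
    Agent.get(__MODULE__, & &1)
  end
end

If used, it could be added to the children list in Crawler.Application next to the Task.Supervisor.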
In short, this crawler shows what Elixir's concurrency model can do. The code is cleanly structured and easy to extend: depending on your needs you can add proxy support, deduplication, or distributed storage to build a more robust crawling system; a proxy variant is sketched below.
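For example, proxy support could be a small sibling of fetch_url/1 inside ConcurrentCrawler. This is a hedged sketch with a placeholder proxy address and credentials, relying on HTTPoison's :proxy and :proxy_auth options (which it forwards to hackney):

# Sketch only: meant to live inside ConcurrentCrawler next to fetch_url/1,
# which is why it reuses the @user_agent and @request_timeout attributes.
defp fetch_url_via_proxy(url) do
  headers = [{"User-Agent", @user_agent}]

  HTTPoison.get(url, headers,
    timeout: @request_timeout,
    recv_timeout: @request_timeout,
    follow_redirect: true,
    # Placeholder proxy endpoint and credentials
    proxy: "http://proxy.example.com:8080",
    proxy_auth: {"proxy_user", "proxy_password"}
  )
end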