首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Elixir轻量级进程如何成就百万级并发爬虫?

Elixir轻量级进程如何成就百万级并发爬虫?

原创
作者头像
华科云商小徐
发布2025-09-04 11:38:16
发布2025-09-04 11:38:16
2050
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫

Elixir凭借其Actor模型与轻量级进程,天然支持高并发。本爬虫利用此优势,结合HTTPoison与Floki,高效实现多任务网页抓取、链接提取与数据解析,是处理大规模数据采集的理想方案。

下面是一个使用 Elixir 实现的高并发爬虫示例。Elixir 的 Actor 模型和轻量级进程使其非常适合构建高并发的网络应用。

代码语言:javascript
复制
defmodule ConcurrentCrawler do
  @moduledoc """
  高并发网页爬虫实现
  使用 Elixir 的并发特性和 HTTPoison 库
  """
​
  use Task
  require Logger
​
  @user_agent "Mozilla/5.0 (compatible; ConcurrentCrawler/1.0; +http://example.com/bot.html)"
  @default_concurrency 50
  @request_timeout 10_000
​
  def start_link(urls, concurrency \\ @default_concurrency) when is_list(urls) do
    Task.start_link(fn -> crawl(urls, concurrency) end)
  end
​
  @doc """
  主爬取函数
  """
  def crawl(urls, concurrency) do
    # 创建任务监督器
    {:ok, supervisor} = Task.Supervisor.start_link(max_children: concurrency)
​
    # 创建流处理管道
    urls
    |> Stream.map(&String.trim/1)
    |> Stream.filter(&valid_url?/1)
    |> Stream.uniq()
    |> Task.Supervisor.async_stream_nolink(
      supervisor,
      &process_url/1,
      max_concurrency: concurrency,
      timeout: @request_timeout * 2
    )
    |> Stream.run()
  end
​
  defp process_url(url) do
    Logger.info("Processing: #{url}")
​
    case fetch_url(url) do
      {:ok, %{status_code: 200, body: body}} ->
        # 提取数据并解析链接
        data = extract_data(body, url)
        links = extract_links(body, url)
        
        # 存储数据(这里只是打印,实际应用中可存入数据库)
        Logger.info("Extracted #{length(links)} links from #{url}")
        Logger.debug("Data: #{inspect(data, limit: 3)}")
        
        # 可以选择继续爬取新发现的链接
        # crawl(links, concurrency) # 注意:这需要额外的循环控制逻辑
        
        {:ok, url, data, links}
​
      {:ok, response} ->
        Logger.warning("Failed to fetch #{url}: Status #{response.status_code}")
        {:error, url, response.status_code}
​
      {:error, reason} ->
        Logger.error("Error fetching #{url}: #{inspect(reason)}")
        {:error, url, reason}
    end
  end
​
  defp fetch_url(url) do
    headers = [
      {"User-Agent", @user_agent},
      {"Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"}
    ]
​
    HTTPoison.get(url, headers,
      timeout: @request_timeout,
      recv_timeout: @request_timeout,
      follow_redirect: true
    )
  end
​
  defp extract_links(html, base_url) do
    case Floki.parse_document(html) do
      {:ok, document} ->
        document
        |> Floki.find("a")
        |> Floki.attribute("href")
        |> Enum.map(&absolute_url(&1, base_url))
        |> Enum.filter(&valid_url?/1)
        |> Enum.uniq()
​
      _ ->
        []
    end
  end
​
  defp extract_data(html, url) do
    # 这里可以根据需要提取特定数据
    # 例如标题、元描述、正文内容等
    
    case Floki.parse_document(html) do
      {:ok, document} ->
        title =
          document
          |> Floki.find("title")
          |> Floki.text()
          |> String.trim()
​
        description =
          document
          |> Floki.find("meta[name=\"description\"]")
          |> Floki.attribute("content")
          |> List.first()
          |> Kernel.||("")
​
        %{
          url: url,
          title: title,
          description: description,
          crawled_at: DateTime.utc_now()
        }
​
      _ ->
        %{url: url, title: "", description: "", crawled_at: DateTime.utc_now()}
    end
  end
​
  defp absolute_url(url, base) when is_binary(url) do
    case URI.parse(url) do
      %URI{scheme: scheme} when scheme in ["http", "https"] ->
        url
​
      _ ->
        base_uri = URI.parse(base)
        absolute_path = Path.expand(url, base_uri.path || "/")
        URI.merge(base, absolute_path) |> to_string()
    end
  end
​
  defp valid_url?(url) do
    case URI.parse(url) do
      %URI{scheme: scheme, host: host} when scheme in ["http", "https"] and not is_nil(host) ->
        true
      _ ->
        false
    end
  end
end
​
# 配置和启动应用
defmodule Crawler.Application do
  use Application
​
  def start(_type, _args) do
    # 配置HTTPoison
    :hackney_pool.start_pool(:crawler_pool, [
      timeout: 15_000,
      max_connections: 100
    ])
​
    children = [
      {Task.Supervisor, name: Crawler.TaskSupervisor},
      {ConcurrentCrawler, [["https://example.com/page1", "https://example.com/page2"]]}
    ]
​
    opts = [strategy: :one_for_one, name: Crawler.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

具体说明

1、添加依赖到 mix.exs:

代码语言:javascript
复制
defp deps do
  [
    {:httpoison, "~> 1.8"},
    {:floki, "~> 0.32.0"},
    {:hackney, "~> 1.17"}
  ]
end

2、运行爬虫:

代码语言:javascript
复制
# 直接运行
urls = ["https://example.com", "https://example.org"]
ConcurrentCrawler.crawl(urls, 20) # 20是并发数
​
# 或者作为监督树的一部分启动
{:ok, pid} = ConcurrentCrawler.start_link(urls, 20)

这个实现利用了 Elixir 的并发特性,能够高效地处理大量网页抓取任务,同时保持良好的可维护性和扩展性。

总之,该爬虫充分展现了Elixir在并发处理上的强大能力。代码结构清晰,易于扩展,您可根据实际需求添加代理、去重或分布式存储等功能,以构建健壮的爬虫系统。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档