首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

ETS

本章是Mix和OTP指南的一部分,它取决于本指南的前几章。有关更多信息,请阅读简介指南或查看边栏中的章节索引。

每当我们需要查找存储桶时,我们都需要向注册表发送消息。如果我们的注册表被多个进程同时访问,注册表可能会成为瓶颈!

在本章中,我们将学习ETS(Erlang Term Storage)以及如何将其用作缓存机制。

警告!不要过早使用ETS作为缓存!记录并分析您的应用程序性能并确定哪些部分是瓶颈,以便知道您是否应该缓存以及应该缓存什么。一旦你确定了需求,本章仅仅是ETS如何使用的一个例子。

ETS作为缓存

ETS允许我们将任何Elixir术语存储在内存表中。使用ETS表格通过Erlang的:ets模块完成

代码语言:javascript
复制
iex> table = :ets.new(:buckets_registry, [:set, :protected])
8207
iex> :ets.insert(table, {"foo", self()})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]

创建ETS表时,需要两个参数:表名和一组选项。从可用选项中,我们传递了表类型及其访问规则。我们选择了:set类型,这意味着密钥不能被复制。我们还设置了表的访问权限:protected,这意味着只有创建该表的进程才能写入该表,但所有进程都可以读取该进程。这些实际上是默认值,所以我们将从现在开始跳过它们。

ETS表也可以命名,允许我们通过给定名称访问它们:

代码语言:javascript
复制
iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self()})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]

让我们改变KV.Registry使用ETS表格。第一个变化是修改我们的注册表以要求名称参数,我们将用它来命名ETS表和注册表进程本身。ETS名称和进程名称存储在不同的位置,所以不存在冲突的可能性。

打开lib/kv/registry.ex,让我们改变它的实现。我们在源代码中添加了评论以突出显示我们所做的更改:

代码语言:javascript
复制
defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry with the given options.

  `:name` is always required.
  """
  def start_link(opts) do
    # 1. Pass the name to GenServer's init
    server = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, server, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    # 2. Lookup is now done directly in ETS, without accessing the server
    case :ets.lookup(server, name) do
      [{^name, pid}] -> {:ok, pid}
      [] -> :error
    end
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server callbacks

  def init(table) do
    # 3. We have replaced the names map by the ETS table
    names = :ets.new(table, [:named_table, read_concurrency: true])
    refs  = %{}
    {:ok, {names, refs}}
  end

  # 4. The previous handle_call callback for lookup was removed

  def handle_cast({:create, name}, {names, refs}) do
    # 5. Read and write to the ETS table instead of the map
    case lookup(names, name) do
      {:ok, _pid} ->
        {:noreply, {names, refs}}
      :error ->
        {:ok, pid} = KV.BucketSupervisor.start_bucket()
        ref = Process.monitor(pid)
        refs = Map.put(refs, ref, name)
        :ets.insert(names, {name, pid})
        {:noreply, {names, refs}}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
    # 6. Delete from the ETS table instead of the map
    {name, refs} = Map.pop(refs, ref)
    :ets.delete(names, name)
    {:noreply, {names, refs}}
  end

  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

请注意,在我们的更改KV.Registry.lookup/2向服务器发送请求之前,现在它直接从ETS表中读取,该表在所有进程之间共享。这是我们正在实施的缓存机制背后的主要思想。

为了使缓存机制起作用,创建的ETS表需要具有访问权限:protected(缺省值),因此所有客户端都可以从中读取数据,而只有KV.Registry进程写入该数据。我们还设置了read_concurrency: true何时启动表格,针对并发读取操作的常见场景优化表格。

我们上面执行的更改已打破我们的测试,因为注册表:name在启动时需要选项。此外,某些注册表操作(例如lookup/2要求将名称作为参数提供)而不是PID,因此我们可以执行ETS表查找。让我们改变设置功能test/kv/registry_test.exs来解决这两个问题:

代码语言:javascript
复制
  setup context do
    {:ok, _} = start_supervised({KV.Registry, name: context.test})
    %{registry: context.test}
  end

一旦我们改变了setup,一些测试将会继续失败。您甚至可能会注意到测试在运行之间通过和失败不一致。例如,“spawns buckets”测试:

代码语言:javascript
复制
test "spawns buckets", %{registry: registry} do
  assert KV.Registry.lookup(registry, "shopping") == :error

  KV.Registry.create(registry, "shopping")
  assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

  KV.Bucket.put(bucket, "milk", 1)
  assert KV.Bucket.get(bucket, "milk") == 1
end

可能会失败:

代码语言:javascript
复制
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")

如果我们刚刚在上一行创建了存储桶,那么这条线如何失败?

这些失败发生的原因是,为了教学目的,我们犯了两个错误:

  1. 我们正在过早地优化(通过添加这个缓存层)

2. 我们正在使用cast/2(虽然我们应该使用call/2

比赛条件?

在Elixir中开发并不会让你的代码免受竞争条件的影响。然而,Elixir的抽象概念在默认情况下没有共享,因此更容易发现竞争条件的根本原因。

我们测试中发生的事情是,在操作和我们可以在ETS表中观察到这种变化的时间之间存在延迟。这是我们期待的事情:

  1. 我们调用 KV.Registry.create(registry, "shopping")

2. 注册表创建存储桶并更新缓存表

3. 我们从表格中获取信息 KV.Registry.lookup(registry, "shopping")

4. 上面的命令返回 {:ok,bucket}

但是,由于KV.Registry.create/2是一个强制操作,所以在我们真正写入表之前,命令将会返回!换句话说,这发生了:

  1. 我们调用 KV.Registry.create(registry, "shopping")

2. 我们从表格中获取信息 KV.Registry.lookup(registry, "shopping")

3. 上面的命令返回 :error

4. 注册表创建存储桶并更新缓存表

为了解决这个问题,我们需要KV.Registry.create/2使用call/2而不是使用同步cast/2。这将保证客户只有在对表格进行更改后才能继续。让我们改变函数和它的回调,如下所示:

代码语言:javascript
复制
def create(server, name) do
  GenServer.call(server, {:create, name})
end

def handle_call({:create, name}, _from, {names, refs}) do
  case lookup(names, name) do
    {:ok, pid} ->
      {:reply, pid, {names, refs}}
    :error ->
      {:ok, pid} = KV.BucketSupervisor.start_bucket()
      ref = Process.monitor(pid)
      refs = Map.put(refs, ref, name)
      :ets.insert(names, {name, pid})
      {:reply, pid, {names, refs}}
  end
end

我们将回调从更改handle_cast/2handle_call/3并更改为使用创建的桶的pid进行回复。一般来说,Elixir开发人员更喜欢使用,call/2而不是cast/2因为它也提供了背压 - 你会阻止,直到你得到答复。cast/2在不必要时使用也可以被认为是不成熟的优化。

让我们再次运行测试。这一次,我们会通过--trace选项:

代码语言:javascript
复制
$ mix test --trace

--trace当您的测试死锁或存在竞争条件时,该选项非常有用,因为它同步运行所有测试(async: true无效)并显示有关每个测试的详细信息。这次我们应该下降到一两次间歇性的失败:

代码语言:javascript
复制
  1) test removes buckets on exit (KV.RegistryTest)
     test/kv/registry_test.exs:19
     Assertion with == failed
     code: KV.Registry.lookup(registry, "shopping") == :error
     lhs:  {:ok, #PID<0.109.0>}
     rhs:  :error
     stacktrace:
       test/kv/registry_test.exs:23

根据失败消息,我们期望该表不再存在于表格中,但它仍然存在!这个问题与我们刚刚解决的问题相反:以前在创建存储桶和更新表的命令之间存在延迟,现在存储桶过程死亡和从表中删除表项之间存在延迟。

不幸的是,这次我们不能简单地将handle_info/2负责清洗ETS表的操作改为同步操作。相反,我们需要找到一种方法来保证注册管理机构处理:DOWN在存储桶崩溃时发送的通知。

一个简单的方法是向注册中心发送一个同步请求:因为消息是按顺序处理的,如果注册中心回复Agent.stop调用后发送的请求,这意味着:DOWN消息已被处理。Agent.stop在两次测试中,通过创建一个“伪造”存储桶来实现这一点,这是一个同步请求:

代码语言:javascript
复制
  test "removes buckets on exit", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
    Agent.stop(bucket)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus")
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

  test "removes bucket on crash", %{registry: registry} do
    KV.Registry.create(registry, "shopping")
    {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

    # Stop the bucket with non-normal reason
    Agent.stop(bucket, :shutdown)

    # Do a call to ensure the registry processed the DOWN message
    _ = KV.Registry.create(registry, "bogus")
    assert KV.Registry.lookup(registry, "shopping") == :error
  end

我们的测试现在应该(总是)通过!

这就结束了我们的优化章节。我们使用ETS作为缓存机制,可以从任何进程读取数据,但写入操作仍然通过单个进程序列化。更重要的是,我们还了解到,一旦数据可以异步读取,我们需要知道它可能引入的竞争条件。

在实践中,如果您发现自己处于需要动态流程的流程注册表的位置,则应该使用作为Elixir一部分提供Registry模块。它提供的功能类似于我们使用GenServer +构建的功能,:ets同时还能够同时执行写入和读取操作。即使在有40个内核的机器上,它也可以扩展到所有内核

接下来,让我们讨论外部和内部依赖关系以及Mix如何帮助我们管理大型代码库。

扫码关注腾讯云开发者

领取腾讯云代金券