[elixir! #0061] The master key to high-load, high-concurrency problems ---- queues

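High-load, high-concurrency problems do not only come up in interviews; they are common in everyday life too. For example, when you go out to eat at a busy shopping mall on the weekend, you often have to take a number and wait in line at the restaurant door. The situation can be summarized as a mismatch between "demand" and "resources": the excess demand cannot be satisfied, so there has to be a suitable mechanism that lets those demands either wait or be cancelled.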

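Let's simulate such a scenario in elixir: a server has 3 seats in total, each customer who comes in takes one seat, and once all seats are taken the server can no longer provide service.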

defmodule M5 do
  use GenServer

  @seats 3
  @wait_timeout 1000

  def start() do
    GenServer.start(__MODULE__, :ok)
  end

  def enter(pid) do
    GenServer.call(pid, :enter, @wait_timeout)
  end

  def leave(pid) do
    GenServer.cast(pid, :leave)
  end

  def init(_) do
    {:ok, @seats}
  end

  def handle_call(:enter, {_pid, _ref}, seats) do
    IO.puts("got enter request")

    if seats > 0 do
      {:reply, :ok, print(seats - 1)}
    else
      # No free seat: return without replying, so the caller blocks until its call times out.
      {:noreply, print(seats)}
    end
  end

  def handle_cast(:leave, seats) do
    IO.puts("free seats: #{seats}")
    {:noreply, print(seats + 1)}
  end

  defp print(seats) do
    IO.puts("free seats: #{seats}")
    seats
  end
end

Next, define a function that simulates customers requesting to enter the server at the same time. If a customer gets no response, it goes BOOM!

  def concurrent_enter(pid, n, t) do
    for _ <- 1..n do
      spawn(fn ->
        try do
          enter(pid)
          :timer.sleep(t)
          leave(pid)
        catch
          _, _ ->
            IO.puts("BOOM!")
        end
      end)
    end
  end

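As long as no more than 3 customers arrive at the same time, everything works fine. In practice, though, we know that will not hold: sooner or later more than 3 customers will show up at once.
There are only 3 seats, so we can never serve more than 3 customers at the same time. The good news is that each customer has 1 second of patience, so as long as a seat frees up before a customer loses patience, we do not have to lose that customer.
In theory, then, as long as some customer leaves within 1 second, a new customer can come in. Let's see whether that is really the case. Set the number of simultaneous customers to 4 and each customer's dining time to 500 milliseconds: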

iex(8)> concurrent_enter s, 4, 500
got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
free seats: 0
free seats: 0
free seats: 1
free seats: 1
free seats: 2
free seats: 2
free seats: 3
BOOM!

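BOOM! Why does this happen? Notice that when the 4th customer requested entry there was no free seat, so the server returned :noreply. When a seat later became free, nobody notified him, so he never learned that a seat was available and his call simply timed out.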
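
To see this failure mode in isolation, here is a minimal sketch of a server that never replies. The module name SilentServer and the 100 ms timeout are made up for this illustration; only GenServer.call/3 and the :noreply return value come from the code above.

```elixir
defmodule SilentServer do
  use GenServer

  def init(:ok), do: {:ok, nil}

  # Never replies: the caller is left waiting until its own timeout fires.
  def handle_call(:enter, _from, state), do: {:noreply, state}
end

{:ok, pid} = GenServer.start(SilentServer, :ok)

# GenServer.call/3 exits the caller on timeout, so the exit has to be caught.
result =
  try do
    GenServer.call(pid, :enter, 100)
  catch
    :exit, reason -> reason
  end

IO.inspect(result)
```

The caught exit reason is a {:timeout, ...} tuple describing the call that was never answered, which is what the BOOM! branch of concurrent_enter swallows.
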
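A simple solution is to use a queue. Waiting customers line up in the queue, and every time a customer leaves, the server checks the waiting queue. Only the following changes to our code are needed: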

  def init(_) do
    {:ok, %{seats: @seats, queue: :queue.new()}}
  end

  def handle_call(:enter, {_pid, _ref} = from, %{seats: seats} = state) do
    IO.puts("got enter request")

    if seats > 0 do
      {:reply, :ok, do_enter(state)}
    else
      handle_overload(from, state)
    end
  end

  defp do_enter(%{seats: seats} = state) do
    %{state | seats: print(seats - 1)}
  end

  def handle_overload(from, %{queue: queue} = state) do
    {:noreply, %{state | queue: :queue.in(from, queue)}}
  end

  def handle_cast(:leave, %{seats: seats} = state) do
    IO.puts("free seats: #{seats}")

    {:noreply,
     state
     |> do_leave()
     |> check_queue()}
  end

  defp do_leave(state) do
    %{state | seats: print(state.seats + 1)}
  end

  defp check_queue(%{queue: queue} = state) do
    case :queue.out(queue) do
      {:empty, _queue} ->
        state

      {{:value, from}, queue} ->
        GenServer.reply(from, :ok)

        %{state | queue: queue}
        |> do_enter()
    end
  end
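
The code above relies on Erlang's :queue module behaving as a first-in, first-out queue. A small sketch of the three calls it uses (the atoms :alice and :bob are placeholders for the from tuples stored by the server):

```elixir
queue = :queue.new()
queue = :queue.in(:alice, queue)
queue = :queue.in(:bob, queue)

# :queue.out/1 returns the oldest element together with the remaining queue.
{{:value, first}, queue} = :queue.out(queue)
{{:value, second}, queue} = :queue.out(queue)

# Once the queue is drained, :queue.out/1 returns :empty instead of a value.
{:empty, _queue} = :queue.out(queue)

IO.inspect({first, second})
```

Because :queue is an immutable data structure, every call returns a new queue, which is why check_queue has to put the returned queue back into the server state.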

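Now we can try something more exciting: 6 customers requesting entry at the same time. With 500 ms meals and 1 second of patience, the second group of 3 is seated at around 500 ms, while a third group would have to wait until around 1000 ms, exactly when their calls time out. So 6 is the highest load we can theoretically handle: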

iex(21)> concurrent_enter s, 6, 500
got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
got enter request
got enter request
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 1
free seats: 2
free seats: 2
free seats: 3

Perfect! Notice that whenever a seat frees up, it is immediately taken by a customer from the waiting queue.
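
The "6 customers" limit can be checked with a bit of arithmetic. The sketch below assumes an idealized model in which every customer arrives at time 0 and a freed seat is handed over instantly; the module and function names are invented for this example.

```elixir
defmodule Capacity do
  # Largest burst that can be fully served in the idealized model.
  def max_burst(seats, meal_ms, patience_ms) do
    # Wave k is seated at k * meal_ms and must be seated before patience_ms,
    # so the number of waves is patience_ms / meal_ms rounded up.
    waves = div(patience_ms + meal_ms - 1, meal_ms)
    seats * waves
  end
end

IO.inspect(Capacity.max_burst(3, 500, 1000))
```

Real runs also depend on scheduling and message latency, so treat the result as an upper bound rather than a guarantee.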


Think it all ends happily here? No. Let's simulate 6 customers arriving at the same time, each dining for 1100 milliseconds:

iex(25)> concurrent_enter s, 6, 1100
got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
got enter request
got enter request
BOOM!  
BOOM!    
BOOM!     
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0

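As expected, the last 3 customers did not get into the server before the timeout. However, the server does not know that they have lost patience, and still tells them to come in once a seat frees up. These customers become dreaded zombie customers: they never leave the server, so the number of free seats stays at 0.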

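We could cap each customer's maximum dining time, but zombie customers would still take up a lot of our time. A better approach is to require customers to include their maximum patience (wait_timeout) when they send the enter request, and then compute the timestamp at which the customer loses patience (the deadline). If a customer in the waiting queue has already lost patience when a seat frees up, the server can simply skip him.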

The modified code looks like this:

  def enter(pid, wait_timeout) do
    GenServer.call(pid, {:enter, wait_timeout}, wait_timeout)
  end

  ...

  def handle_call({:enter, timeout}, {_pid, _ref} = from, %{seats: seats} = state) do
    IO.puts("got enter request")

    if seats > 0 do
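      # do_enter/2 already replies through GenServer.reply/2, so return :noreply here to avoid sending a second reply.
      {:noreply, do_enter(state, from)}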
    else
      handle_overload({from, timeout}, state)
    end
  end

  defp do_enter(%{requests: requests} = state, from) do
    case requests do
      %{^from => %{deadline: deadline}} ->
        state = %{state | requests: Map.delete(requests, from)}

        if past_deadline?(deadline) do
          state
          |> check_queue()
        else
          handle_enter(state, from)
        end

      _ ->
        handle_enter(state, from)
    end
  end

  defp past_deadline?(deadline) do
    :os.system_time(:millisecond) > deadline
  end

  defp handle_enter(%{seats: seats} = state, from) do
    GenServer.reply(from, :ok)
    %{state | seats: print(seats - 1)}
  end

  def handle_overload({from, timeout}, %{queue: queue, requests: requests} = state) do
    request_info = %{deadline: :os.system_time(:millisecond) + timeout}

    {:noreply,
     %{state | queue: :queue.in(from, queue), requests: Map.put(requests, from, request_info)}}
  end

  ...

  def concurrent_enter(pid, n, wait_timeout) do
    for _ <- 1..n do
      spawn(fn ->
        try do
          enter(pid, wait_timeout)
          :timer.sleep(1000)
          leave(pid)
        catch
          err, msg ->
            IO.puts("BOOM!" <> inspect({err, msg}))
        end
      end)
    end
  end

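To simplify the problem, we fix each customer's dining time at 1000 milliseconds and change the third argument of concurrent_enter to the customer's patience (wait_timeout). We can then construct the following situation: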

    # 6 customers arrive, each with 500 milliseconds of patience
    concurrent_enter(pid, 6, 500)
    # 100 milliseconds later
    :timer.sleep(100)
    # 2 customers arrive, each with 2000 milliseconds of patience
    concurrent_enter(pid, 2, 2000)

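The simulation shows that zombie customers are recognized immediately and skipped, with no impact at all on serving the normal customers: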

got enter request
free seats: 2
got enter request
free seats: 1
got enter request
free seats: 0
got enter request
got enter request
got enter request
got enter request
got enter request
BOOM!{:exit, {:timeout, {GenServer, :call, [#PID<0.515.0>, {:enter, 500}, 500]}}}
BOOM!{:exit, {:timeout, {GenServer, :call, [#PID<0.515.0>, {:enter, 500}, 500]}}}
BOOM!{:exit, {:timeout, {GenServer, :call, [#PID<0.515.0>, {:enter, 500}, 500]}}}
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 0
free seats: 0
free seats: 1
free seats: 1
free seats: 2
free seats: 2
free seats: 3

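At this point we have a relatively smart resource server: it replies to requests in the waiting queue as soon as a resource becomes free, and it skips requests that have already timed out.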
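
Since the listings above elide some parts, here is a compact, self-contained sketch of the same idea. It is not the article's exact code. The module name SeatServer and the free_seats/1 helper are invented for the demo. The deadline is stored next to the caller in the queue instead of in a separate requests map. It also assumes System.monotonic_time/1 in place of :os.system_time/1, so that wall-clock adjustments cannot affect deadlines.

```elixir
defmodule SeatServer do
  use GenServer

  # Client API (illustrative names, modelled on enter/leave above)
  def start(seats), do: GenServer.start(__MODULE__, seats)
  def enter(pid, wait_timeout), do: GenServer.call(pid, {:enter, wait_timeout}, wait_timeout)
  def leave(pid), do: GenServer.cast(pid, :leave)
  def free_seats(pid), do: GenServer.call(pid, :free_seats)

  @impl true
  def init(seats), do: {:ok, %{seats: seats, queue: :queue.new()}}

  @impl true
  def handle_call({:enter, _timeout}, _from, %{seats: seats} = state) when seats > 0 do
    {:reply, :ok, %{state | seats: seats - 1}}
  end

  def handle_call({:enter, timeout}, from, %{queue: queue} = state) do
    # No free seat: remember the caller with its deadline and reply later.
    deadline = now() + timeout
    {:noreply, %{state | queue: :queue.in({from, deadline}, queue)}}
  end

  def handle_call(:free_seats, _from, state), do: {:reply, state.seats, state}

  @impl true
  def handle_cast(:leave, state) do
    {:noreply, seat_next(%{state | seats: state.seats + 1})}
  end

  # Give the freed seat to the first queued caller whose deadline has not passed.
  defp seat_next(%{queue: queue, seats: seats} = state) do
    case :queue.out(queue) do
      {:empty, _} ->
        state

      {{:value, {from, deadline}}, rest} ->
        if now() > deadline do
          # Caller already gave up: drop it and try the next one.
          seat_next(%{state | queue: rest})
        else
          GenServer.reply(from, :ok)
          %{state | queue: rest, seats: seats - 1}
        end
    end
  end

  defp now, do: System.monotonic_time(:millisecond)
end

{:ok, pid} = SeatServer.start(1)
:ok = SeatServer.enter(pid, 100)

impatient =
  Task.async(fn ->
    try do
      SeatServer.enter(pid, 50)
    catch
      :exit, _ -> :gave_up
    end
  end)

patient = Task.async(fn -> SeatServer.enter(pid, 2000) end)

impatient_result = Task.await(impatient)
# Step clear of the deadline before freeing the seat.
Process.sleep(20)
SeatServer.leave(pid)
patient_result = Task.await(patient)

IO.inspect({impatient_result, patient_result, SeatServer.free_seats(pid)})
```

The client's call timeout and the server's deadline are measured in different processes, so a reply can still race with a timeout right at the deadline. The short sleep in the demo only keeps it away from that window.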

Technical summary

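  • For limited resources, use a queue to buffer incoming requests first
  • Use checkout (enter in this article) and checkin (leave in this article) to acquire and return resources
  • A non-blocking server can be implemented with a simple noreply call
  • Notify the client promptly when a free resource becomes available
  • By recording each request's deadline, skip requests that are already dead when processing the queue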
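
One practical detail of the checkout/checkin pattern is making sure the checkin always happens, even if the work in between fails. The sketch below is one possible way to do that with try/after. The Checkout module and with_resource/3 are hypothetical, and an Agent holding a counter stands in for the seat server so the example stays self-contained.

```elixir
defmodule Checkout do
  # `checkout` and `checkin` are zero-arity functions standing in for
  # the article's enter/leave calls.
  def with_resource(checkout, checkin, work) do
    :ok = checkout.()

    try do
      work.()
    after
      # Runs whether work.() returns normally or raises.
      checkin.()
    end
  end
end

# Stand-in for the seat server: an Agent that only counts free seats.
{:ok, seats} = Agent.start(fn -> 3 end)
checkout = fn -> Agent.update(seats, &(&1 - 1)) end
checkin = fn -> Agent.update(seats, &(&1 + 1)) end

result =
  try do
    Checkout.with_resource(checkout, checkin, fn -> raise "customer fainted" end)
  rescue
    e in RuntimeError -> e.message
  end

IO.inspect({result, Agent.get(seats, & &1)})
```

Even though the work raised, the seat count is back at 3, so a crashing customer does not leak a seat.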