09月20, 2018

erlang源码分析之gen_server

一、简介

众所周知,Erlang/OTP是一个具有高并发、低延时、高容错等等特性的平台,其三大Behaviour:

  • gen_server(客户端服务器)
  • gen_fsm(有限状态机)
  • gen_event(事件通知)

是其坚如磐石的系统的基石。 本章节将透过源码分析其中gen_server的实现原理并管中一窥Erlang的设计思想。

二、背景知识

在开始介绍gen_server实现原理之前,需要先介绍一下Erlang异步编程的思想与基础。 Erlang异步编程的基础就是Erlang进程和异步消息投递机制。

1、异步编程基础

  • Erlang进程:

    这里的Erlang进程并不是系统进程,而是通过ErlangVM实现创建的,一个Erlang进程大概占用300个字的内存空间,创建时间只有几微秒。ErlangVM会为每个核启动一个线程充当调度器,调度器分配名下进程队列中的所有进程抢占运行时间片来达到并发执行的目的。在ErlangVM中,所有的Erlang代码都是运行在Erlang进程里面的。派生一个进程很简单,通过spawn函数,指定一个新进程运行的函数即可。

    API如下:

    spawn(Fun) -> pid()

    spawn(Node, Fun) -> pid()

    spawn(Module, Function, Args) -> pid()

    以匿名函数为例:在Erlang shell中运行

      spawn(fun() -> 
          io:format(erlang:group_leader(),"hello world~n",[]),
      end).
    

    屏幕中打印出

    hello world
    <0.66.0>
    
  • 异步消息投递机制:

    光有并行运行的Erlang进程是不够的,Erlang通过异步消息来实现跨进程的同步与异步操作的。 Erlang为每一个进程都配备了一个信箱,并且提供了发送消息和接受消息的命令。

    发送消息的命令:

    Pid ! Msg.
    

    Pid 是目标进程,!是发送消息的命令符,Msg是消息体。

    发送消息是异步执行的,发送消息后,本进程立即执行后续代码,消息投递到目标进程时会插入目标进程的信箱队列中,直到目标进程读取消息。

    receive
      Msg ->
          do_something()
    after 
      Timeout ->
          do_finish()
    end
    

    以上代码就是目标进程处理消息的代码示例。receive 命令就是读取并接收消息的命令。这个receive命令有两层含义,按照信箱队列先进先出的原则读取一条消息;如果信箱是空的则等待信箱接收到消息为止,这是一个阻塞操作,会使进程主动交出运行时间片,直到信箱收到消息为止才会重新接受调度器分配的新时间片来继续运行,这个阻塞等待可以设置超时时间,超时后继续运行后续代码,不设置则一直等待收到消息为止。

2、异步编程模型

Erlang异步编程最常见的模型就是异步应答式模型,这个模型就是建立上前面介绍的基础上的。通过派生的进程,接收消息并作出应答,并且通过这个模型既可以实现异步的逻辑,也可以实现同步的逻辑。

见代码

%%demo模块
-module(demo_server).
-define(SERVER, demo_server).
-export([start_server/0, loop/1]).
-export([get/1, set/2, stop/0]).

start_server() ->
    Pid = spawn_link(?MODULE, loop, [[]]),
    erlang:register(?SERVER, Pid),
    ok.

loop(Data) ->
    receive
        {call, From, {get, Key}} ->
            Value = proplists:get_value(Key,Data),
            From ! {reply, ?SERVER, Value},
            loop(Data);
        {cast, {set, Key ,Value}} ->
            NewData = [{Key, Value} | proplists:delete(Key,Data)],
            loop(NewData);
        stop ->
            ok
    end.

get(Key) ->
    ?SERVER ! {call, self(), {get, Key}},
    receive
        {reply, ?SERVER, Value} ->
            Value
    after 
        5000 ->
            undefined
    end.

set(Key,Value) ->
    ?SERVER ! {cast, {set, Key, Value}},
    ok.

stop() ->
    ?SERVER ! stop.

测试执行

demo_server:start_server().
打印
ok
demo_server:get(a).
打印
undefined
demo_server:set(a,b).
打印
ok
demo_server:get(a).
打印
b
demo_server:stop().
打印
stop

在上述代码中,存在两个进程,一个是调用方进程,这里是Erlang shell的进程,一个是start_server函数派生出的进程,我们把调用方进程叫做客户端进程,响应方进程叫做服务端进程。

服务端进程执行的函数是loop函数,服务端进程启动就通过receive阻塞住,等待接收匹配的消息,除了stop消息外,其他消息都会执行尾递归调用loop函数,这就是一个极简单服务端模型。

其他的两个消息,我们加了两个标签,一个call,一个cast,这两个不同标签的消息,我们实现了该模型下的同步调用,和异步调用。异步调用比较简单,调用方只需要投递消息就不用管了,目标进程收到消息,与调用方进程并发运行,执行处理消息的流程;同步调用需要调用方进行配合,投递消息后需要receive阻塞住,等待目标进程收到消息,处理消息,并向调用方投递结果,调用方收到结果再继续执行后续逻辑。

有人问,为什么要用异步消息投递的方式来执行同步逻辑呢?我觉得这样做可以使服务端进程执行高效并且有序(因为信箱是有序的)。

三、阅读gen_server源码

通过前面的背景知识,我们了解了gen_server的设计模型,以及Erlang的异步编程思想,我们就能够把握gen_server的设计走向。

1、如何使用

在我们阅读分析代码之前,首先我们得需要知道如何使用gen_server,创建一个通用服务器程序

上代码

-module(mock_server).
-behaviour(gen_server).
-define(SERVER, mock_server).

-export([start_link/0]).
-export([get/1,set/2,stop/0]).
-export([init/1,handle_call/3,handle_cast/2,handle_info/2]).

start_link() ->
    gen_server:start_link({local,?SERVER}, ?MODULE, [], []).

init(Data) ->
    {ok, Data}.

get(Key) ->
    gen_server:call(?SERVER,{get, Key}).

set(Key,Value) ->
    gen_server:cast(?SERVER, {set, Key, Value}).

stop() ->
    gen_server:stop(?SERVER).

handle_call({get,Key}, _From, State) ->
    Value = proplists:get_value(Key,State),
    {reply, Value, State};
handle_call(_Request, _From, State) ->
    {reply,ok, State}.

handle_cast({set,Key, Value}, State) ->
    NewState = [{Key, Value} | proplists:delete(Key,State)],
    {noreply, NewState};
handle_cast(_Request, State) ->
    {noreply, State}.

handle_info(Info,State) ->
    io:format("other message:~p~n",[Info]),
    {noreply, State}.

这份示例我使用了最少的API,用了最简单的方式实现了上述demo_server同样的功能,但是他说通过gen_server 实现的,通用,标准、可靠。

首先我们看一下gen_server的使用方法


gen_server module            Callback module
-----------------            ---------------
gen_server:start
gen_server:start_link -----> Module:init/1

gen_server:stop       -----> Module:terminate/2

gen_server:call
gen_server:multi_call -----> Module:handle_call/3

gen_server:cast
gen_server:abcast     -----> Module:handle_cast/2

-                     -----> Module:handle_info/2

-                     -----> Module:terminate/2

-                     -----> Module:code_change/3 

实现一个gen_server的Behavior需要加上 -behaviour(gen_server). 标记,并实现gen_server的回调。

它的全部回调函数列表如下:

[{init,1},
 {handle_call,3},
 {handle_cast,2},
 {handle_info,2},
 {terminate,2},
 {code_change,3},
 {format_status,2}]

其中可选回调列表如下:

[{handle_info,2},
 {terminate,2},
 {code_change,3},
 {format_status,2}]

可选回调可以省略,根据需要添加。

本次用的API 有start_link/4,call/2,cast/2,stop/0。

按照使用顺序进行讲解:

start_link/0 函数调用gen_server:start_link(Name,Module,Args,Options)创建带注册名字的gen_server 进程实例,在启动时,会执行回调函数init/1,并返回{ok, State}.

get/1 函数调用同步调用函数gen_server:call(ServerRef,Request),触发回调函数handle_call(Request, From,State) ,处理消息,并返回 {reply,Reply, NewState},gen_server 实例进程会把Reply作为gen_server:call/2的结果返回给调用者进程。

set/2 函数调用异步调用函数gen_server:cast(ServerRef,Request),触发回调函数handle_cast(Reqeust, State),处理消息,并返回{noreply, NewState}。不会给调用者进程返回任何消息,而且调用者进程调用gen_server:cast/2的时候会直接返回ok给调用者进程。

stop/0 函数调用gen_server:stop(ServerRef) 函数停止gen_server进程,如果有定义回调terminate/2 则先执行它进行收尾工作。

以上便是一个简单的gen_server 用法说明。

2、源码分析

实际上,通过原始模型、使用方法的研究,我们已经分析出了源码阅读的思路了,基本上我们可以通过启动、loop循环、异步调用、同步调用等几个环节来阅读代码。

经过阅读代码,涉及的模块主要有gen_server、gen、proc_lib三个模块,额外辅助模块有sys模块。

1、启动流程

首先是启动流程图例:

alt

我们以gen_server:start_link(Name,Module,Args,Options)为例查看代码。

我们以Mod模块为起始(这里就是我们自己的mock_server模块),调用gen_sever:start_link(Name,Module,Args,Options)函数,该函数最后返回{ok,Pid}为结束。 主要工作的模块为gen_server、 gen、 proc_lib。

其中图例中黄色部分是调用者进程,从spawn开始的绿色部分都是新派生出来的gen_server进程实例。

和spawn并行的流程sync_wait(Pid,Timeout)是调用者进程在等待派生进程返回启动成功标志的流程,绿线部分即返回成功标志的消息。

图中派生进程初始化后会回调Mod模块的init函数。

派生进程最后的状态会停留在loop函数的receive命令处,等待调用者进程发来消息,处理完消息后继续尾递归loop循环。

2、loop循环

以派生的gen_server实例进程的视角我们来看一下它的loop循环过程见图示

alt

从图示中我们看到,循环起始位置是gen_server 实例进程阻塞在receive接收消息状态,它接收消息大体分为几类:系统消息(system)、cast消息('$gen_cast')、call消息('gen_call')、一般消息(无tag消息)以及终止进程消息(stop是正常终止,其他为异常终止)。

系统消息由sys模块处理,当接收到suspend消息时,循环进入suspend_loop,此时只能处理系统消息,接收到resume消息时会恢复到正常loop循环。(此处逻辑未在图示中展开)

cast、call、info 分别对应的消息分别会回调Mod:handle_cast、Mod:handle_call、Mod:handle_info,并根据回调返回的结果 {noreply,NewState}| {reply,Reply,NewState} |{stop,Reason,Reply,NewState} | {stop,Reason,NewState}进行处理

noreply 直接重新进入loop循环;

reply 直接向调用者进程返回relpy消息,然后重新进入循环;

stop 则执行terminate,终止进程

3、cast、call流程

下面我们从代码执行流程看一下gen_server:cast/2的流程

alt

从代码流程看一下gen_server:call/2的流程

alt

上述两个图例绿色为调用者进程,黄色为派生的gen_server进程。

如果光从上面的loop图例以及代码图例中还不清晰,那我们就再从调试角度分析具体消息。

4、代码调试
mock_server:start_link().
打印
{ok,<0.243.0>}
sys:statistics(mock_server,true).
打印
ok
sys:trace(mock_server,true).
打印
ok
mock_server:get(a).
打印
*DBG* mock_server got call {get,a} from <0.201.0>
*DBG* mock_server sent undefined to <0.201.0>, new state []
undefined
mock_server:set(a,b).
打印
*DBG* mock_server got cast {set,a,b}
*DBG* mock_server new state [{a,b}]
ok

现在我们不用封装好的函数,而是自己发送消息来做尝试

mock_server ! {'$gen_cast',{set,a,c}}.
打印
*DBG* mock_server got cast {set,a,c}
{'$gen_cast',{set,a,c}}
*DBG* mock_server new state [{a,c}]
mock_server ! {'$gen_call', {self(), erlang:monitor(process, mock_server)}, {get, a}}.
打印
*DBG* mock_server got call {get,a} from <0.201.0>
*DBG* mock_server sent c to <0.201.0>, new state [{a,c}]
{'$gen_call',{<0.201.0>,
              #Ref<0.4032721631.3431727106.215954>},
             {get,a}}

flush().
Shell got {#Ref<0.4032721631.3431727106.215954>,c}
ok

通过模拟发cast消息,我们将a的value设置成c了,通过模拟发call消息,当我们刷新shell信箱时,收到了返回的value c。

模拟系统消息

sys:get_status(mock_server).
打印
{status,<0.243.0>,
        {module,gen_server},
        [[{'$initial_call',{mock_server,init,1}},
          {'$ancestors',[<0.201.0>]}],
         running,<0.201.0>,
         [{trace,true},
          {statistics,{{{2018,9,21},{14,36,25}},{reductions,39},6,0}}],
         [{header,"Status for generic server mock_server"},
          {data,[{"Status",running},
                 {"Parent",<0.201.0>},
                 {"Logged events",[]}]},
          {data,[{"State",[{a,c}]}]}]]}

mock_server ! {system, {self(), erlang:monitor(process, mock_server)}, suspend}.
打印
{system,{<0.201.0>,#Ref<0.4032721631.3431727106.216072>},
        suspend}
flush().
打印
Shell got {#Ref<0.4032721631.3431727106.216072>,ok}
ok

上面用原始消息模拟suspend原始系统消息,此时的系统状态会变为suspended状态,并且仅处理系统消息(suspend_loop循环)

sys:get_status(mock_server).
打印
{status,<0.243.0>,
        {module,gen_server},
        [[{'$initial_call',{mock_server,init,1}},
          {'$ancestors',[<0.201.0>]}],
         suspended,<0.201.0>,
         [{trace,true},
          {statistics,{{{2018,9,21},{14,36,25}},{reductions,39},6,0}}],
         [{header,"Status for generic server mock_server"},
          {data,[{"Status",suspended},
                 {"Parent",<0.201.0>},
                 {"Logged events",[]}]},
          {data,[{"State",[{a,c}]}]}]]}

此时调用cast和call消息

mock_server:set(a,d).
打印
ok
mock_server:get(a).
等待5秒钟,因为超时而崩溃
** exception exit: {timeout,{gen_server,call,[mock_server,{get,a}]}}
     in function  gen_server:call/2 (gen_server.erl, line 206)

suspend命令是otp系统命令一般是给监督树和代码热更使用的,所以要谨慎使用。

四、总结

无论gen_server实现的多么复杂,都脱不开前面提到的原始模型,通过阅读源码,我们发现它对消息加了不同标签进行区分,其中系统消息增加了很多通用的辅助功能。 初始化与同步、异步、销毁都变得标准并且可定制化。

最后,希望这篇文档对于对erlang感兴趣的同学有些用处,感谢!

本文链接:https://www.opsdev.cn/post/erlang源码分析之gen_server.html

-- EOF --

Comments

评论加载中...

注:如果长时间无法加载,请针对 disq.us | disquscdn.com | disqus.com 启用代理。