December 16, 2018

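Stress Testing a Custom TCP Protocol

Background

Mainstream load-testing tools support common communication protocols well, but they do not support private or less widely used ones. There are two ways to work around this: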

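  • Send raw data directly. This approach is flexible: once you know the protocol format, you can write the raw data by hand. The drawbacks are just as obvious: test scripts are tedious to write and error-prone, little code can be reused, and the scripts are hard to modify and maintain.
  • Write a dedicated plugin. With this approach the test scripts become much simpler to write and easier to read and maintain. The plugin is written once and reused by every script afterwards.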

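The load-testing tool used in this article is Tsung. It supports many protocols out of the box and is simple and flexible to configure, so writing a Tsung plugin is all we need to meet our requirements.

Protocol example

First, assume our protocol is named qproto and is defined as follows: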

  • Packet format
Field     packet_id   Protocol number   Body length   Body
Length    2 bytes     2 bytes           4 bytes       determined by "Body length"
  • Packet types and examples
Packet type     packet_id   Protocol number   Body length   Body
ping            3           0                 -             -
pong            4           1                 -             -
auth            1           2                 23            "u:test_user\np:test_pass"
auth_ack        1           3                 8             "result:0"
send_data       5           4                 12            "test_message"
send_data_ack   5           5                 -             -
disconnect      6           6                 -             -
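
To make the layout concrete, here is a minimal Erlang sketch that builds frames from the table using bit syntax. The module qproto_wire_sketch and its functions are hypothetical and exist only for illustration. Big-endian byte order is an assumption, since the table only gives field widths, and packets listed without a body are assumed to omit the length field.

```erlang
-module(qproto_wire_sketch).
-export([frame/2, frame/3, auth_example/0]).

%% Frame without a body: packet_id (2 bytes) + protocol number (2 bytes).
%% Assumption: packets without a body carry no length field.
frame(PacketId, ProtoNum) ->
    <<PacketId:16, ProtoNum:16>>.

%% Frame with a body: header + body length (4 bytes) + body.
frame(PacketId, ProtoNum, Body) when is_binary(Body) ->
    <<PacketId:16, ProtoNum:16, (byte_size(Body)):32, Body/binary>>.

%% The auth row of the table: packet_id 1, protocol number 2, 23-byte body.
auth_example() ->
    frame(1, 2, <<"u:test_user\np:test_pass">>).
```

Under these assumptions, qproto_wire_sketch:frame(3, 0) yields the 4-byte ping frame <<0,3,0,0>>, and auth_example() yields a 31-byte frame: a 4-byte header, a 4-byte length, and the 23-byte body.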

Preparation

Download and unpack the Tsung source code.

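Writing the plugin

Writing the plugin involves the following tasks:

Modify the DTD to add support for the new protocol

Go to the Tsung source directory and edit the tsung-1.0.dtd file.

  • Add qproto to the request element

After the change it looks like this: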

<!ELEMENT request ( match*, dyn_variable*, ( http | jabber | raw |
          pgsql | ldap | mysql |fs | shell | job | websocket | amqp | mqtt | qproto) )>
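  • Declare the qproto element and its attributes; this is what the XML test script is validated against. The attribute is named proto_num because that is the name the test script and the config parser use
<!ELEMENT qproto (#PCDATA) >
<!ATTLIST qproto
        proto_num CDATA #REQUIRED
        keepalive CDATA "10"
        body CDATA ""
        username CDATA ""
        password CDATA "">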
  • Add the new session type and option type. After the change they look like this:

Session type:

<!ELEMENT sessions (session+)>
<!ELEMENT session ( request | thinktime | transaction | setdynvars | for |
repeat | if | change_type | foreach | set_option | interaction | abort )*>
<!ATTLIST session
    name         CDATA #REQUIRED
    bidi         CDATA #IMPLIED
    persistent   (true | false) #IMPLIED
    probability   NMTOKEN #IMPLIED
    weight        NMTOKEN #IMPLIED
    type         (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav | ts_mysql | ts_fs | ts_shell | ts_job | ts_websocket | ts_amqp | ts_mqtt | ts_qproto) #REQUIRED>

Option type:

<!ELEMENT options (option*)>
<!ELEMENT option (user_agent*)>
<!ATTLIST option
    name     NMTOKEN #REQUIRED
    override (true | false) #IMPLIED
    random   (true | false) #IMPLIED
    id       NMTOKEN #IMPLIED
    min      NMTOKEN #IMPLIED
    max      NMTOKEN #IMPLIED
    type     (ts_http | ts_jabber | ts_pgsql | ts_amqp | ts_qproto) #IMPLIED
    value    CDATA #IMPLIED>

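Client session management for the load test

We need to keep some information about each simulated client during the test, such as the user name of its session.

Create src/tsung/ts_qproto.erl, which must implement the ts_plugin behaviour. The functions it must define are:

get_message/2

This function builds the request sent to the server. It returns the encoded request, which must be a binary(), together with the updated session.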

parse/2

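This function handles the server's response: it parses the data returned by the server, updates the statistics, and maintains session information such as the session user name and connection state.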

parse_bidi/2

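This function parses data pushed by the server. It returns a three-element tuple: the data to send back to the server (or nodata), the updated state, and an atom (think in this example) that tells Tsung how the client should proceed.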

parse_config/2

This function parses the test script.

add_dynparams/4

This function handles dynamic variables. In this example we use it for AUTH authentication and when sending data.
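
To illustrate what dynamic variable substitution does, here is a minimal sketch that replaces %%_name%% markers in a string with values from a list of pairs. It is not Tsung's ts_search:subst implementation. The module subst_sketch and the {Name, Value} list format are assumptions made for this example, and string:replace/4 requires OTP 20 or later.

```erlang
-module(subst_sketch).
-export([subst/2]).

%% Replace every "%%_name%%" marker in String with the value bound to
%% the atom name in Vars, a list of {Name, Value} pairs with string values.
%% Markers whose name is not in Vars are left untouched.
subst(String, Vars) ->
    lists:foldl(
      fun({Name, Value}, Acc) ->
              Marker = "%%_" ++ atom_to_list(Name) ++ "%%",
              %% string:replace/4 returns a list of chunks, so flatten it
              lists:flatten(string:replace(Acc, Marker, Value, all))
      end, String, Vars).
```

For example, subst_sketch:subst("u:%%_user%%", [{user, "alice"}]) returns "u:alice".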

In this example, we implement the functions above ourselves. The following ones are not needed for now and can keep their default behavior.

new_session/0

This function initializes the session information.

session_defaults/0

Default session parameters: persistent session and bidirectional communication.

decode_buffer/2

Decodes the buffer, for example by stripping packet metadata or decompressing.

dump/2

This function logs the packets that Tsung sends and receives.

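In addition, because we implement a heartbeat, the client must send ping to the server in a loop once it has received AUTH_ACK, that is, once authentication has succeeded.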
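
The heartbeat pattern can be sketched on its own as a process that re-arms a timer with erlang:send_after/3 on every tick. This is only an illustration: the module heartbeat_sketch is hypothetical, and it sends the atom ping to a target process as a stand-in for writing a ping frame to the socket.

```erlang
-module(heartbeat_sketch).
-export([start/2, loop/2]).

%% Start a linked process that sends `ping` to Target every IntervalMs
%% milliseconds until it receives `stop`.
start(Target, IntervalMs) ->
    Pid = spawn_link(?MODULE, loop, [Target, IntervalMs]),
    erlang:send_after(IntervalMs, Pid, tick),
    Pid.

loop(Target, IntervalMs) ->
    receive
        tick ->
            %% stand-in for sending an encoded ping frame over the socket
            Target ! ping,
            %% re-arm the timer for the next heartbeat
            erlang:send_after(IntervalMs, self(), tick),
            loop(Target, IntervalMs);
        stop ->
            ok
    end.
```

Calling heartbeat_sketch:start(self(), 10000) would deliver ping to the caller every 10 seconds, and sending stop to the returned pid ends the loop.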

  1. Create include/ts_qproto.hrl, which declares the client request and the session state
-record(qproto_request, {
  proto,
  keepalive = 10, % 10s
  body,
  username,
  password
}).


-record(qproto_session, {
  curr_id = 0,
  ack_buf = <<>>,
  ping_pid,
  keepalive,
  wait, % wait code
  status, % bind status
  username,
  product
}).
  2. Create src/tsung/ts_qproto.erl
%%%-------------------------------------------------------------------
%%% @author sngyai
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. Dec 2018 3:53 PM
%%%-------------------------------------------------------------------
-module(ts_qproto).
-author("sngyai").

-behavior(ts_plugin).

-include("ts_profile.hrl").
-include("ts_config.hrl").
-include("ts_qproto.hrl").
-include("qproto.hrl").

%% API
-export([add_dynparams/4,
  get_message/2,
  session_defaults/0,
  parse/2,
  dump/2,
  parse_bidi/2,
  parse_config/2,
  decode_buffer/2,
  new_session/0]).
-export([ping_loop/3]).


%% AUTH: substitute dynamic variables in the user name and password
add_dynparams(true, {DynVars, _S},
    Param = #qproto_request{proto = ?AUTH,
      username = UserName, password = Password}, _HostData) ->
  NewUserName = ts_search:subst(UserName, DynVars),
  NewPassword = ts_search:subst(Password, DynVars),
  Param#qproto_request{proto = ?AUTH,
    username = NewUserName,
    password = NewPassword};
%% SEND: substitute dynamic variables in the payload
add_dynparams(true, {DynVars, _S},
    Param = #qproto_request{proto = ?SEND, body = Payload, password = Password},
    _HostData) ->
  NewPassword = ts_search:subst(Password, DynVars),
  NewPayload = ts_search:subst(Payload, DynVars),
  Param#qproto_request{password = NewPassword, body = NewPayload};
%% no substitution requested, or nothing to substitute for this request
add_dynparams(_Bool, _DynData, Param, _HostData) ->
  Param#qproto_request{}.


session_defaults() ->
  {ok, true, true}.


decode_buffer(Buffer, #qproto_session{}) ->
  Buffer.

new_session() ->
  #qproto_session{}.

dump(A, B) ->
  ts_plugin:dump(A,B).

parse_config(Element, Conf) ->
  ts_config_qproto:parse_config(Element, Conf).

get_message(Req0 = #qproto_request{proto = ?AUTH, username = undefined}, StateRcv) ->
  Username = iolist_to_binary(["tsung-", ts_utils:randombinstr(10)]),
  Req1 = Req0#qproto_request{username = Username},
  get_message(Req1, StateRcv);
get_message(#qproto_request{proto = ?AUTH,
  keepalive = KeepAlive, username = Username, password = Password},
    #state_rcv{session = QProtoSession = #qproto_session{curr_id = Id} }) ->
  NewQProtoSession = QProtoSession#qproto_session{curr_id = Id + 1},
  MsgId = NewQProtoSession#qproto_session.curr_id,
  Body = iolist_to_binary(["u:", Username, "\np:", Password]),
  %% the qproto_packet record stores the payload in its data field
  Message = #qproto_packet{id = MsgId, proto = ?AUTH, data = Body},
  {qproto_frame:encode(Message),
    NewQProtoSession#qproto_session{wait = ?AUTH_ACK, keepalive = KeepAlive, username = Username}};
get_message(#qproto_request{proto = ?DISCONNECT},
    #state_rcv{session = QProtoSession = #qproto_session{curr_id = Id}}) ->
  NewQProtoSession = QProtoSession#qproto_session{curr_id = Id + 1},
  MsgId = NewQProtoSession#qproto_session.curr_id,
  Message = #qproto_packet{id = MsgId, proto = ?DISCONNECT},
  {qproto_frame:encode(Message),
    NewQProtoSession#qproto_session{status = disconnect, username = undefined}};
get_message(#qproto_request{proto = ?SEND, body = Body},
    #state_rcv{session = QProtoSession = #qproto_session{curr_id = Id}}) ->
  NewQProtoSession = QProtoSession#qproto_session{curr_id = Id + 1},
  MsgId = NewQProtoSession#qproto_session.curr_id,
  %% the qproto_packet record stores the payload in its data field
  Message = #qproto_packet{
    id = MsgId,
    proto = ?SEND,
    data = Body},
  ?DebugF("send qproto_msg: ~p~n", [Message]),
  Wait = ?SEND_ACK,
  ts_mon_cache:add({count, qproto_send}),
  {qproto_frame:encode(Message), NewQProtoSession#qproto_session{wait = Wait}}.


parse(closed, State) ->
  {State#state_rcv{ack_done = true, datasize=0}, [], true};
%% new response, compute data size (for stats)
parse(Data, State=#state_rcv{acc = [], datasize= 0}) ->
  parse(Data, State#state_rcv{datasize= size(Data)});

%% normal QProto message
parse(Data, State=#state_rcv{acc = [], session = QProtoSession, socket = Socket}) ->
  Wait = QProtoSession#qproto_session.wait,
  AckBuf = QProtoSession#qproto_session.ack_buf,
  case qproto_frame:decode(Data) of
    {QProtoMessage = #qproto_packet{proto = Wait}, Left} ->
      ?DebugF("receive qproto_msg: ~p ~p~n",
        [qproto_frame:command_for_type(Wait), QProtoMessage]),
      case Wait of
        ?AUTH_ACK -> ts_mon_cache:add({count, qproto_auth});
        ?SEND_ACK -> ts_mon_cache:add({count, qproto_server_send_acked});
        _ -> ok
      end,
      NewQProtoSession = case Wait of
                         ?AUTH_ACK ->
                           Proto = State#state_rcv.protocol,
                           KeepAlive = QProtoSession#qproto_session.keepalive,
                           PingPid = create_ping_proc(Proto, Socket, KeepAlive),
                           QProtoSession#qproto_session{ping_pid = PingPid};
                         _ -> QProtoSession
                       end,
      {State#state_rcv{ack_done = true, acc = Left,
        session = NewQProtoSession}, [], false};
    {QProtoMessage = #qproto_packet{id = MessageId, proto = Type}, Left} ->
      ?DebugF("receive qproto_msg, expecting: ~p, actual: ~p ~p~n",
        [qproto_frame:command_for_type(Wait),
          qproto_frame:command_for_type(Type), QProtoMessage]),
      NewQProtoSession = case Type =:= ?SEND of
                         true ->
                           Message = #qproto_packet{id = MessageId, proto = ?SEND_ACK},
                           EncodedData = qproto_frame:encode(Message),
                           ts_mon_cache:add({count, qproto_server_delivered}),
                           NewAckBuf =  <<AckBuf/binary, EncodedData/binary>>,
                           QProtoSession#qproto_session{ack_buf = NewAckBuf};
                         false -> QProtoSession
                       end,
      {State#state_rcv{ack_done = false, acc = Left,
        session = NewQProtoSession}, [], false};
    more ->
      ?DebugF("incomplete qproto frame: ~p~n", [Data]),
      {State#state_rcv{acc = Data}, [], false}
  end;
%% more data, add this to accumulator and parse, update datasize
parse(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) ->
  NewSize= DataSize + size(Data),
  parse(<< Acc/binary, Data/binary >>,
    State#state_rcv{acc = [], datasize = NewSize}).


parse_bidi(<<>>, State=#state_rcv{acc = [], session = QProtoSession}) ->
  AckBuf = QProtoSession#qproto_session.ack_buf,
  Ack = case AckBuf of
          <<>> -> nodata;
          _ -> AckBuf
        end,
  NewQProtoSession = QProtoSession#qproto_session{ack_buf = <<>>},
  ?DebugF("ack buf: ~p~n", [AckBuf]),
  {Ack, State#state_rcv{session = NewQProtoSession}, think};
parse_bidi(Data, State=#state_rcv{acc = [], session = QProtoSession}) ->
  AckBuf = QProtoSession#qproto_session.ack_buf,
  case qproto_frame:decode(Data) of
    {QProtoMessage = #qproto_packet{id = MessageId, proto = ?SEND}, Left} ->
      ?DebugF("receive bidi qproto_msg: ~p ~p~n",
        [qproto_frame:command_for_type(?SEND), QProtoMessage]),

      ts_mon_cache:add({count, qproto_server_delivered}),
      ts_mon_cache:add({count, qproto_send_acked}),
      Message = #qproto_packet{id = MessageId, proto = ?SEND_ACK},
      EncodedData = qproto_frame:encode(Message),
      NewAckBuf = <<AckBuf/binary, EncodedData/binary>>,
      NewQProtoSession = QProtoSession#qproto_session{ack_buf = NewAckBuf},
      parse_bidi(Left, State#state_rcv{session = NewQProtoSession});
    %% any other complete packet (for example PONG) is logged and skipped
    {QProtoMessage = #qproto_packet{proto = _Type}, Left} ->
      ?DebugF("receive bidi qproto_msg: ~p ~p~n",
        [qproto_frame:command_for_type(_Type), QProtoMessage]),
      parse_bidi(Left, State);
    more ->
      {nodata, State#state_rcv{acc = Data},think}
  end;
parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) ->
  NewSize = DataSize + size(Data),
  ?DebugF("parse qproto bidi data: ~p ~p~n", [Data, Acc]),
  parse_bidi(<<Acc/binary, Data/binary>>,
    State#state_rcv{acc = [], datasize = NewSize}).


%%%===================================================================
%%% Internal functions
%%%===================================================================
create_ping_proc(Proto, Socket, KeepAlive) ->
  PingPid = proc_lib:spawn_link(?MODULE, ping_loop, [Proto, Socket, KeepAlive]),
  erlang:send_after(KeepAlive * 1000, PingPid, ping),
  PingPid.

ping_loop(Proto, Socket, KeepAlive) ->
  receive
    ping ->
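      try
        %% id = 0 keeps the 16-bit packet_id field encodable for pings
        Message = #qproto_packet{id = 0, proto = ?PING},
        PingFrame = qproto_frame:encode(Message),
        ?DebugF("sent qproto pingreq: ~p~n", [PingFrame]),
        Proto:send(Socket, PingFrame, [])
      catch
        %% match every exception class, not only throw
        _Class:Error ->
          ?LOGF("Error sending qproto pingreq: ~p~n",[Error], ?ERR)
      end,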
      erlang:send_after(KeepAlive * 1000, self(), ping),
      ping_loop(Proto, Socket, KeepAlive);
    stop -> ok
  end.
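
The acc handling in parse/2 and parse_bidi/2 exists because TCP delivers a byte stream: one read may contain several packets, or only part of one. The idea can be sketched separately as a function that takes every complete frame out of a buffer and returns the leftover bytes, which the caller prepends to the next chunk. The module reassembly_sketch is hypothetical, and for simplicity it assumes every frame carries the 4-byte length field.

```erlang
-module(reassembly_sketch).
-export([take_frames/1]).

%% Split Buffer into complete length-prefixed frames.
%% Returns {Frames, Rest}: Frames in arrival order as {Id, Proto, Body}
%% tuples, Rest the trailing bytes that do not yet form a complete frame.
take_frames(Buffer) ->
    take_frames(Buffer, []).

%% A complete frame is available: consume it and continue with the rest.
take_frames(<<Id:16, Proto:16, Len:32, Body:Len/binary, Rest/binary>>, Acc) ->
    take_frames(Rest, [{Id, Proto, Body} | Acc]);
%% Header or body is incomplete: stop and hand the bytes back to the caller.
take_frames(Rest, Acc) ->
    {lists:reverse(Acc), Rest}.
```

A caller would keep Rest and call take_frames(<<Rest/binary, NewData/binary>>) when more data arrives, which mirrors what the last clause of parse/2 does with acc.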

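Protocol encoding and decoding

This step is essential for Tsung to understand the packets it sends and receives.

  1. Create include/qproto.hrl, which defines the protocol format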
-define(LOG(Msg), io:format("{~p:~p ~p}: ~p~n", [?MODULE, ?LINE, self(), Msg])).

-define(PING,     0). %% client ping to server
-define(PONG,     1). %% server send it back for client ping
-define(AUTH,     2). %% client bind to server
-define(AUTH_ACK,     3). %% ack for client bind to server
-define(SEND, 4).
-define(SEND_ACK, 5).
-define(DISCONNECT, 6).

-record(qproto_packet, {
  id,
  proto,
  prop = [],
  data = []
}).

  2. Create src/lib/qproto_frame.erl, which defines the encoding and decoding functions called by get_message/2, parse/2, and parse_bidi/2 in src/tsung/ts_qproto.erl
%%%-------------------------------------------------------------------
%%% @author sngyai
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. Dec 2018 4:11 PM
%%%-------------------------------------------------------------------
-module(qproto_frame).
-author("sngyai").

%% API
-include("qproto.hrl").

-export([encode/1, decode/1]).

-export([command_for_type/1]).

encode(#qproto_packet{
  id = Id,
  proto = Proto,
  data = Data
}) ->
  BinData = encode_data(Data),
  <<Id:16, Proto:16, BinData/binary>>.

encode_data([]) ->
  <<>>;
encode_data(Data) ->
  %% Data may be a string or a binary, so accept any iodata
  Bin = iolist_to_binary(Data),
  DataLen = byte_size(Bin),
  <<DataLen:32, Bin/binary>>.


decode(<<PacketId:16, Proto:16, Rest/binary>>) ->
  Packet = #qproto_packet{id = PacketId, proto = Proto},
  decode_frame(Proto, Packet, Rest);
decode(_Incomplete) ->
  %% fewer than 4 bytes: the header itself is not complete yet
  more.

%% Only packets that carry a body (auth, auth_ack, send) have a length field
decode_frame(Proto, Packet, Rest)
    when Proto =:= ?AUTH; Proto =:= ?AUTH_ACK; Proto =:= ?SEND ->
  %% decode_body/2 takes the binary first and the packet second
  decode_body(Rest, Packet);
decode_frame(_Proto, Packet, Rest) ->
  {Packet, Rest}.

decode_body(<<>>, Packet) ->
  {Packet#qproto_packet{data = []}, <<>>};
decode_body(<<0:32, Rest/binary>>, Packet) ->
  {Packet#qproto_packet{data = []}, Rest};
decode_body(<<DataLen:32, BinData:DataLen/binary, Rest/binary>>, Packet) ->
  Data = binary_to_list(BinData),
  {Packet#qproto_packet{data = Data}, Rest};
decode_body(_, _Packet) ->
  more.

command_for_type(Type) ->
  case Type of
    ?PING -> ping;
    ?PONG -> pong;
    ?AUTH -> auth;
    ?AUTH_ACK -> auth_ack;
    ?DISCONNECT -> disconnect;
    ?SEND -> send;
    ?SEND_ACK -> send_ack;
    _ -> unknown
  end.

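Parsing the test script configuration

We need to define which settings the test script supports and how they translate into Tsung's runtime behavior.

  1. Create src/tsung_controller/ts_config_qproto.erl. This module turns the test script into client session requests and is called by ts_qproto:parse_config/2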
%%%-------------------------------------------------------------------
%%% @author sngyai
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 10. Dec 2018 10:56 AM
%%%-------------------------------------------------------------------
-module(ts_config_qproto).
-author("sngyai").

-export([parse_config/2]).

-include("ts_profile.hrl").
-include("ts_config.hrl").
-include("ts_qproto.hrl").

-include("xmerl.hrl").



parse_config(Element = #xmlElement{name = dyn_variable}, Conf = #config{}) ->
  ts_config:parse(Element, Conf);
parse_config(Element = #xmlElement{name = qproto},
    Config = #config{curid = Id, session_tab = Tab,
      sessions = [CurS | _], dynvar = DynVar,
      subst = SubstFlag, match = MatchRegExp}) ->
  Proto = ts_config:getAttr(integer, Element#xmlElement.attributes, proto_num),
  KeepAlive = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes,
    keepalive, 10),
  Body = ts_config:getText(Element#xmlElement.content),
  UserName = ts_config:getAttr(string, Element#xmlElement.attributes,
    username, undefined),
  Password = ts_config:getAttr(string, Element#xmlElement.attributes,
    password, undefined),


  Timeout = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes,
    timeout, 1),


  Request =
    #qproto_request{
      proto = Proto,
      keepalive = KeepAlive,
      body = Body,
      username = UserName,
      password = Password
    },

  Msg =
    #ts_request{
      ack = parse,
      endpage = true,
      dynvar_specs  = DynVar,
      subst   = SubstFlag,
      match   = MatchRegExp,
      param   = Request
    },

  ts_config:mark_prev_req(Id-1, Tab, CurS),
  case Proto of
    waitForMessages ->
      ets:insert(Tab, {{CurS#session.id, Id},
        {thinktime, Timeout * 1000}});
    _ ->
      ets:insert(Tab, {{CurS#session.id, Id}, Msg })
  end,

  ?LOGF("request tab: ~p~n", [ets:match(Tab, '$1')], ?INFO),

  lists:foldl( fun(A, B)->ts_config:parse(A, B) end,
    Config#config{dynvar = []},
    Element#xmlElement.content);

%% Parsing other elements
parse_config(Element = #xmlElement{}, Conf = #config{}) ->
  ts_config:parse(Element,Conf);
%% Parsing non #xmlElement elements
parse_config(_, Conf = #config{}) ->
  Conf.
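
The parser above reads attributes and text from an xmerl element through Tsung's ts_config helpers. As a rough, standalone illustration of the same idea, the sketch below uses only OTP's xmerl to read an attribute with a default value and to collect the element text. The module xml_attr_sketch and its functions are hypothetical and are not part of Tsung.

```erlang
-module(xml_attr_sketch).
-export([attr/3, text/1, demo/0]).

-include_lib("xmerl/include/xmerl.hrl").

%% Return the value of attribute Name, or Default when it is absent.
attr(Name, #xmlElement{attributes = Attrs}, Default) ->
    case lists:keyfind(Name, #xmlAttribute.name, Attrs) of
        #xmlAttribute{value = Value} -> Value;
        false -> Default
    end.

%% Concatenate the text nodes directly under the element.
text(#xmlElement{content = Content}) ->
    lists:flatten([V || #xmlText{value = V} <- Content]).

%% Parse a small qproto element and read its settings.
demo() ->
    Xml = "<qproto proto_num=\"4\" keepalive=\"10\">hello</qproto>",
    {Element, _Rest} = xmerl_scan:string(Xml),
    {list_to_integer(attr(proto_num, Element, "0")),
     attr(username, Element, undefined),
     text(Element)}.
```

Here demo() reads proto_num as an integer, falls back to undefined for the missing username attribute, and returns the element text as the body.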

That completes the plugin. Finally, let's write a test script:

Create examples/qproto.xml.in with the following content:

<?xml version="1.0"?>
<!DOCTYPE tsung SYSTEM "@prefix@/share/@PACKAGE_NAME@/@DTD@">
<tsung loglevel="debug" dumptraffic="true" version="1.0">
    <clients>
        <client host="localhost" maxusers="100000" cpu="4" weight="8"/>
    </clients>

    <servers>
        <server host="localhost" port="6060" type="tcp" />
    </servers>

    <load>
        <arrivalphase phase="1" duration="300" unit="second">
            <users maxnumber="100000" arrivalrate="1000" unit="second"/>
        </arrivalphase>
    </load>

    <sessions>
        <session name="qproto_client" probability="100" type="ts_qproto">

            <setdynvars sourcetype="random_string" length="1024">
                <var name="randpayload" />
            </setdynvars>

            <!--AUTH-->
            <request>
                <qproto proto_num="2" keepalive="10"/>
            </request>

            <thinktime value="100"/>

            <!--SEND-->
            <for from="1" to="100" incr="1" var="counter">
                <request subst="true">
                    <qproto proto_num="4" body="%%_randpayload%%">%%_randpayload%%</qproto>
                </request>
                <thinktime value="1"/>
            </for>


            <thinktime value="100"/>

            <request>
                <qproto proto_num="6"/>
            </request>

        </session>
    </sessions>
</tsung>

Build and run

Go to the root of the Tsung source tree and run:

$ ./configure
$ make
$ make install

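The configure step creates a file named qproto.xml in the examples directory. Unlike qproto.xml.in, it contains the actual path of the tsung-1.0.dtd installed on the system. Once installation has finished, we can use it for testing directly.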

$ cd examples
$ tsung -f qproto.xml start

Those are the basic steps for writing a Tsung plugin for a custom TCP protocol.

Link to this article: https://www.opsdev.cn/post/tsung-plugin.html
