02月22, 2017

neutron中metadata相关代码分析(下)

上篇我们分析了metadata获取port的流程,主要集中在走认证这条路上,这篇补充一下metadata通过rpc来获取post的代码逻辑。

分析过程

首先先把系统默认配置的memorycache关闭。

这里还是直接改配置文件,cache_url = 空,然后看log确认 cache=false。

class MetadataProxyHandler(object):
    def __init__(self, conf):
        self.conf = conf
        self.auth_info = {}
        if self.conf.cache_url:
            self._cache = cache.get_cache(self.conf.cache_url)
        else:
            self._cache = False
        LOG.debug("wbp cache:%s", str(self._cache))
        self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)
        self.context = context.get_admin_context_without_session()
        # Use RPC by default
        self.use_rpc = True

这里cache获取不到就会走rpc,然后我们看核心代码_get_ports_from_server函数。

def _get_ports_from_server(self, router_id=None, ip_address=None,
                           networks=None):
    LOG.debug("wbp3")
    """Either get ports from server by RPC or fallback to neutron client"""
    filters = self._get_port_filters(router_id, ip_address, networks)
    LOG.debug("wbp self.context: %s, filters:%s", str(self.context), str(filters))
    if self.use_rpc:
        try:
            return self.plugin_rpc.get_ports(self.context, filters)
        except (oslo_messaging.MessagingException, AttributeError):
            # TODO(obondarev): remove fallback once RPC is proven
            # to work fine with metadata agent (K or L release at most)
            LOG.warning(_LW('Server does not support metadata RPC, '
                            'fallback to using neutron client'))
            self.use_rpc = False
    #LOG.debug("wbp ip_address: %s, networks:%s, use_rpc:%s, filters:%s", ip_address, str(networks), self.use_rpc, str(filters))
    #LOG.debug("wbp4")
    LOG.debug("wbp5 ip_address: %s, networks:%s, use_rpc:%s, filters:%s", ip_address, str(networks), self.use_rpc, str(filters))
    return self._get_ports_using_client(filters)

因为默认代码是开启rpc的,self.use_rpc=true。

这里就会进去我们的rpc核心代码

try:
    return self.plugin_rpc.get_ports(self.context, filters)
except (oslo_messaging.MessagingException, AttributeError):
    # TODO(obondarev): remove fallback once RPC is proven
    # to work fine with metadata agent (K or L release at most)
    LOG.warning(_LW('Server does not support metadata RPC, '
                    'fallback to using neutron client'))
    self.use_rpc = False

然后我们细看一下self.plugin_rpc.get_ports(self.context, filters)发生了什么

class MetadataProxyHandler(object):
    def __init__(self, conf):
        self.conf = conf
        self.auth_info = {}
        if self.conf.cache_url:
            self._cache = cache.get_cache(self.conf.cache_url)
        else:
            self._cache = False
        LOG.debug("wbp cache:%s", str(self._cache))
        self.plugin_rpc = MetadataPluginAPI(topics.PLUGIN)

所以相当于在调用 MetadataPluginAPI类的get_ports方法 , topics.PLUGIN = 'q-plugin'

self.plugin_rpc.get_ports(self.context, filters) =  MetadataPluginAPI(q-plugin).get_ports(self.context, filters)

我们看看传参,context 就是neutron.centext的ContextBase类的一个实例,filters就是根据ip和网络id组成的一个dict。 alt

现在我们看看MetadataPluginAPI类:

class MetadataPluginAPI(object):
    def __init__(self, topic):
        target = oslo_messaging.Target(
            topic=topic,
            namespace=n_const.RPC_NAMESPACE_METADATA,
            version='1.0')
        self.client = n_rpc.get_client(target)
    def get_ports(self, context, filters):
        cctxt = self.client.prepare()
        return cctxt.call(context, 'get_ports', filters=filters)

因为rpc 调用分 client端和server端。

这个类是metdata rpc的client端,提供回调返回的server端是在neutron.api.rpc.handlers.metadata_rpc.MetadataRpcCallback 现在我们的 MetadataPluginAPI(q-plugin)实例化的时候:

topic=q-plugin,
n_const.RPC_NAMESPACE_METADATA = None
version='1.0'
oslo_messaging.Target(
            topic=topic,
            namespace=n_const.RPC_NAMESPACE_METADATA,
            version='1.0')

这里是在注册消息队列,topic是q-plugin,namespace = none,version=‘1.0’,说明我们这里的rpc请求都会打到q-plugin这个queue中,这里queue中分两个角色,client和server,我们注册的是client,所以只需要提供topic,其他参数都是可选的。

如果注册server的话,就需要提供至少topic和server两个必须参数了。

在neutron/common/rpc.py中

def get_client(target, version_cap=None, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return oslo_messaging.RPCClient(TRANSPORT,
                                    target,
                                    version_cap=version_cap,
                                    serializer=serializer
                                    )

oslo_messaging.RPCClient返回一个rpcclient,传参target ={}:

class TestClient(object):
    def __init__(self, transport):
       target = messaging.Target(topic='testtopic', version='2.0')
       self._client = messaging.RPCClient(transport, target)

    def test(self, ctxt, arg):
       cctxt = self._client.prepare(version='2.5')
       return cctxt.call(ctxt, 'test', arg=arg)

这是一个最基本的testclient写法,MetadataPluginAPI中只是把rpc需要触发的函数换成get_ports,传参就是filters,最后使用call同步去等返回值。 所以我们这里需要看一下server端怎么去响应我们的get_ports。

neutron.api.rpc.handlers.metadata_rpc.MetadataRpcCallback是我们处理metadata rpc请求的server端,回调这些请求到响应的neutron plugin。 agent和plugin通过rpc 走mq 通信,这里需要在controller上看看``metadataRpcCallback```产生的log。

在/usr/lib/python2.7/site-packages/neutron/api/rpc/handlers/metadata_rpc.py中

from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class MetadataRpcCallback(object):
    target = oslo_messaging.Target(version='1.0',
   namespace=constants.RPC_NAMESPACE_METADATA)

    @property
   def plugin(self):
        LOG.debug("wbp22test not have _plugin")
        if not hasattr(self, '_plugin'):
            self._plugin = manager.NeutronManager.get_plugin()
            LOG.debug("wbp22test not have _plugin")
            LOG.debug("wbp22test get_plugin is %s"%self._plugin)
        return self._plugin

   def get_ports(self, context, filters):
        LOG.debug("wbp22 MetadataRpcCallback context: %s, filters:%s", str(context), str(filters))
        return self.plugin.get_ports(context, filters=filters)

这里我们rpc调用MetadataRpcCallback类的get_ports,因为def plugin(self)用了property装饰,所以self.plugin 直接可以get _plugin。

这里MetadataRpcCallback类在初始化的时候没有申明_plugin,所以会使用 manager.NeutronManager.get_plugin()来获取plugin, 这里获取的plugin是neutron.plugins.ml2.plugin.Ml2Plugin,最后return self.plugin.get_ports(context, filters=filters)

相当于执行了neutron.plugins.ml2.plugin.Ml2Plugin.get_ports(),所以我们继续深入ml2plugin的get_ports()方法

从此处可以定位到/usr/lib/python2.7/site-packages/neutron/plugins/ml2/plugin.py中的ml2plugin类。

ml2plugin类继承自db_base_plugin_v2.NeutronDbPluginV2和一些组合,该类的get_ports方法也是继承自db_base_plugin_v2.NeutronDbPluginV2。

class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
                dvr_mac_db.DVRDbMixin,
                external_net_db.External_net_db_mixin,
                sg_db_rpc.SecurityGroupServerRpcMixin,
                agentschedulers_db.DhcpAgentSchedulerDbMixin,
                addr_pair_db.AllowedAddressPairsMixin,
                vlantransparent_db.Vlantransparent_db_mixin,
                extradhcpopt_db.ExtraDhcpOptMixin,
                netmtu_db.Netmtu_db_mixin):

所以我们来看看NeutronDbPluginV2类中的get_ports方法,前面rpc给的传参context和filters

def get_ports(self, context, filters=None, fields=None,
              sorts=None, limit=None, marker=None,
              page_reverse=False):
    LOG.debug("wbpgetports context:%s, filters:%s", context, filters)
    marker_obj = self._get_marker_obj(context, 'port', limit, marker)
    query = self._get_ports_query(context, filters=filters,
                                  sorts=sorts, limit=limit,
                                  marker_obj=marker_obj,
                                  page_reverse=page_reverse)
    items = [self._make_port_dict(c, fields) for c in query]
    if limit and page_reverse:
        items.reverse()
    LOG.debug("wbpgetports items:%s ", str(items))
    return items

该方法的核心就是去db中查找port,过滤条件就是我们的传参filters,最后过滤出来符合的port。返回给rpc client。

整体rpc 获取port过程就ok了。

alt

具体是在那张表中读取,怎么读取的,我们可以再细看一下实现方式

marker_obj = self._get_marker_obj(context, 'port', limit, marker)

该方法继承自CommonDbMixin类:/usr/lib/python2.7/site-packages/neutron//db/CommonDbMixin.py

def _get_marker_obj(self, context, resource, limit, marker):
        if limit and marker:
            return getattr(self, '_get_%s' % resource)(context, marker)
        return None

我们的get_ports中默认传参limit=None, marker=None,所以_get_marker_obj返回一个none

然后传参context 和filter,其余参数均为none或false ,调用_get_ports_query函数去db中过滤。

query = self._get_ports_query(context, filters=filters,
           sorts=sorts, limit=limit,
           marker_obj=marker_obj,
           page_reverse=page_reverse)

该函数封装了非常多的操作,其实执行起来的效果就是通过db中的ports 外联 ml2_port_bindings 和ipallocations表来通过ip地址和网络di来查出该port对应的instance id和租户id,具体生成的select *** from ** where **如下图: alt

class Port(model_base.BASEV2, HasId, HasTenant):
"""Represents a port on a Neutron v2 network."""
    name = sa.Column(sa.String(attr.NAME_MAX_LEN))
    network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
    nullable=False)
    fixed_ips = orm.relationship(IPAllocation, backref='ports', lazy='joined')
    mac_address = sa.Column(sa.String(32), nullable=False)
    admin_state_up = sa.Column(sa.Boolean(), nullable=False)
    status = sa.Column(sa.String(16), nullable=False)
    device_id = sa.Column(sa.String(attr.DEVICE_ID_MAX_LEN), nullable=False)
    device_owner = sa.Column(sa.String(attr.DEVICE_OWNER_MAX_LEN),
    nullable=False)

这是port 在models_v2中对应的 类,其中字段device id,如果该port是bound到vm上,device id就是对应的 instanceid。

整体的表查询就是先从IPAllocation中通过ip地址和network id查出来该ip对应的port id,然后查询ports表,通过id查出该port id对应的tenant_id 和 device_id,这里的device_id前面说了就是instance id,最后将instance id和tanant id一起作为返回值返回给rpc client。

总结

metadata 通过rpc获取port,核心就是在network节点的metadata rpc client端发起rpc call请求,走mq,然后controller端的metadata rpc server端的get_ports函数,去db中查找相应的port信息,作为返回值。

本文链接:https://www.opsdev.cn/post/metadata-code-analyze-two.html

-- EOF --

Comments

评论加载中...

注:如果长时间无法加载,请针对 disq.us | disquscdn.com | disqus.com 启用代理。