openstack之filter-scheduler

int32位 posted @ Oct 10, 2014 03:01:20 PM in openstack , 4455 阅读
转载请注明:http://krystism.is-programmer.com/若有错误,请多多指正,谢谢!

上篇文章讲了nova-scheduler:openstack之nova-scheduler 。为了简单,只叙述了随机调度算法,而没有细讲filter调度算法。filter调度算法原理并不难,先层层过滤掉一些不满足条件的宿主机,然后对剩余的宿主机进行weight评分排序,多个weight得到的分数进行累加,分数较高的(注意不一定是最高,原因下面讲)作为侯选宿主机。具体算法描述可以查看官方文档:http://docs.openstack.org/trunk/config-reference/content/section_compute-scheduler.html 下面从源码逐步分析算法的运行过程。

首先看看schedule_run_instance方法:

 def schedule_run_instance(self, context, request_spec,
                              admin_password, injected_files,
                              requested_networks, is_first_time,
                              filter_properties, legacy_bdm_in_spec):
        """This method is called from nova.compute.api to provision
        an instance.  We first create a build plan (a list of WeightedHosts)
        and then provision.

        Returns a list of the instances created.
        """
        payload = dict(request_spec=request_spec)
        self.notifier.info(context, 'scheduler.run_instance.start', payload)

        instance_uuids = request_spec.get('instance_uuids') # 获取uuids,可有多个
        LOG.info(_("Attempting to build %(num_instances)d instance(s) "
                    "uuids: %(instance_uuids)s"),
                  {'num_instances': len(instance_uuids),
                   'instance_uuids': instance_uuids})
        LOG.debug(_("Request Spec: %s") % request_spec)

	# 返回主机列表
        weighed_hosts = self._schedule(context, request_spec,
                                       filter_properties, instance_uuids)

        # NOTE: Pop instance_uuids as individual creates do not need the
        # set of uuids. Do not pop before here as the upper exception
        # handler fo NoValidHost needs the uuid to set error state
        instance_uuids = request_spec.pop('instance_uuids') # 弹出uuids,不再需要

        # NOTE(comstud): Make sure we do not pass this through.  It
        # contains an instance of RpcContext that cannot be serialized.
        filter_properties.pop('context', None)

        for num, instance_uuid in enumerate(instance_uuids):
            request_spec['instance_properties']['launch_index'] = num

            try:
                try:
                    weighed_host = weighed_hosts.pop(0) # 弹出第一个主机
                    LOG.info(_("Choosing host %(weighed_host)s "
                                "for instance %(instance_uuid)s"),
                              {'weighed_host': weighed_host,
                               'instance_uuid': instance_uuid})
                except IndexError:
                    raise exception.NoValidHost(reason="")

                self._provision_resource(context, weighed_host,
                                         request_spec,
                                         filter_properties,
                                         requested_networks,
                                         injected_files, admin_password,
                                         is_first_time,
                                         instance_uuid=instance_uuid,
                                         legacy_bdm_in_spec=legacy_bdm_in_spec)
            except Exception as ex:
                # NOTE(vish): we don't reraise the exception here to make sure
                #             that all instances in the request get set to
                #             error properly
                driver.handle_schedule_error(context, ex, instance_uuid,
                                             request_spec)
            # scrub retry host list in case we're scheduling multiple
            # instances:
            retry = filter_properties.get('retry', {})
            retry['hosts'] = []

        self.notifier.info(context, 'scheduler.run_instance.end', payload)

该方法在进行一些参数处理后,首先调用_schedule方法,该方法返回宿主机列表,然后对每个待启动云主机调用_provision_resource方法,并把对应的目标宿主机传入该方法。_provision_resource方法的任务是更新数据库和调用nova-compute的rpcapi指定目标宿主机启动云主机。核心方法是_schedule方法,

def _schedule(self, context, request_spec, filter_properties,
                  instance_uuids=None):
        """Returns a list of hosts that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()
        instance_properties = request_spec['instance_properties']
        instance_type = request_spec.get("instance_type", None) # get flavor

        # Get the group
        update_group_hosts = False
        scheduler_hints = filter_properties.get('scheduler_hints') or {}
        group = scheduler_hints.get('group', None)
	# --hint group SERVER_GROUP, 如果有group,则更新到数据库中
        if group:
            group_hosts = self.group_hosts(elevated, group)
            update_group_hosts = True
            if 'group_hosts' not in filter_properties:
                filter_properties.update({'group_hosts': []})
            configured_hosts = filter_properties['group_hosts']
            filter_properties['group_hosts'] = configured_hosts + group_hosts

        config_options = self._get_configuration_options()

        # check retry policy.  Rather ugly use of instance_uuids[0]...
        # but if we've exceeded max retries... then we really only
        # have a single instance.
        properties = instance_properties.copy()
        if instance_uuids:
            properties['uuid'] = instance_uuids[0]
	self._populate_retry(filter_properties, properties) # 如果超出最多尝试次数,抛出NoValidHost异常

        filter_properties.update({'context': context,
                                  'request_spec': request_spec,
                                  'config_options': config_options,
                                  'instance_type': instance_type})

        self.populate_filter_properties(request_spec, # 把一些数据填入filter_properties中,比如project_id, os_type等
                                        filter_properties)

        # Find our local list of acceptable hosts by repeatedly
        # filtering and weighing our options. Each time we choose a
        # host, we virtually consume resources on it so subsequent
        # selections can adjust accordingly.

        # Note: remember, we are using an iterator here. So only
        # traverse this list once. This can bite you if the hosts
        # are being scanned in a filter or weighing function.
        hosts = self.host_manager.get_all_host_states(elevated) # 获取所有主机列表,host_manager从父类init方法获取,根据CONF获取,默认为nova.scheduler.host_manager.HostManager,直接读取数据库

        selected_hosts = []
        if instance_uuids:
            num_instances = len(instance_uuids)
        else:
            num_instances = request_spec.get('num_instances', 1)
	# 注意range和xrange区别,range返回一个list,而xrange返回一个生成器
        for num in xrange(num_instances):
            # Filter local hosts based on requirements ...
            hosts = self.host_manager.get_filtered_hosts(hosts,
                    filter_properties, index=num)
            if not hosts:
                # Can't get any more locally.
                break

            LOG.debug(_("Filtered %(hosts)s"), {'hosts': hosts})

            weighed_hosts = self.host_manager.get_weighed_hosts(hosts, # 获取weight值,并按大到小排序
                    filter_properties)

            LOG.debug(_("Weighed %(hosts)s"), {'hosts': weighed_hosts})

            scheduler_host_subset_size = CONF.scheduler_host_subset_size # 截取集合到指定大小。
            if scheduler_host_subset_size > len(weighed_hosts):
                scheduler_host_subset_size = len(weighed_hosts)
            if scheduler_host_subset_size < 1:
                scheduler_host_subset_size = 1

            chosen_host = random.choice(
                weighed_hosts[0:scheduler_host_subset_size]) # 从截取的集合中随机选择一个作为目标宿主机,而不是一定是最大的。
            selected_hosts.append(chosen_host)

            # Now consume the resources so the filter/weights
            # will change for the next instance.
            chosen_host.obj.consume_from_instance(instance_properties) # 更新值,为下一个主机调度做准备
            if update_group_hosts is True:
                filter_properties['group_hosts'].append(chosen_host.obj.host)
        return selected_hosts

该方法的两个核心方法是host_manager.get_filtered_hosts和host_manager.get_weighed_hosts方法,分别对应算法的过滤和计算权值两个过程。注意在计算权值后返回的是一个排好序的主机列表,但并不是选择其中一个最大值的作为目标宿主机,而是通过配置指定从topN中随机选择一个,比如设置scheduler_host_subset_size为5,过滤后返回的主机个数为10,则从top5中随机返回其中一个,这就是前面讲的为什么不是分值最高,而是较高。host_manager为可配置的,默nova.scheduler.host_manager.HostManager,

HostManagerd get_filtered_hosts主要调用两个方法:_choose_host_filters和filter_handler.get_filtered_objects,前者通过过滤器类名返回对应的类列表(相当于java中根据类名,比如"Apple",找到对应的类,比如a.b.Apple.class,或者getClass("Apple"),过滤器类名通过nova.conf的scheduler_default_filters配置,默认为RetryFilter','AvailabilityZoneFilter','RamFilter','ComputeFilter','ComputeCapabilitiesFilter','ImagePropertiesFilter'。然后把类列表传递给filter_handler.get_filtered_objects方法,filte_handle是filters.HostFilterHandler,而HostFilterHandler继承自nova.filters.BaseFilterHandler,其实现为:

class BaseFilterHandler(loadables.BaseLoader):
    """Base class to handle loading filter classes.

    This class should be subclassed where one needs to use filters.
    """

    def get_filtered_objects(self, filter_classes, objs,
            filter_properties, index=0):
        list_objs = list(objs)
        LOG.debug(_("Starting with %d host(s)"), len(list_objs))
        for filter_cls in filter_classes:
            cls_name = filter_cls.__name__
            filter = filter_cls()

            if filter.run_filter_for_index(index):
                objs = filter.filter_all(list_objs,
                                               filter_properties)
                if objs is None:
                    LOG.debug(_("Filter %(cls_name)s says to stop filtering"),
                          {'cls_name': cls_name})
                    return
                list_objs = list(objs)
                LOG.debug(_("Filter %(cls_name)s returned "
                            "%(obj_len)d host(s)"),
                          {'cls_name': cls_name, 'obj_len': len(list_objs)})
                if len(list_objs) == 0:
                    break
        return list_objs

可见他会遍历所有的过滤类,实例化,并且调用它的filter_all方法,最后返回所有未被拦截的对象。下面我们看看过滤类:

我们上编文章说了,过滤器是可插除的,如果我们要自定义自己的过滤器只需要继承BaseHostFilter(在nova.schedule.filters.__init__.py中定义)并实现host_passes方法即可,如下代码:

class BaseHostFilter(filters.BaseFilter):
    """Base class for host filters."""
    def _filter_one(self, obj, filter_properties):
        """Return True if the object passes the filter, otherwise False."""
        return self.host_passes(obj, filter_properties)

    def host_passes(self, host_state, filter_properties):
        """Return True if the HostState passes the filter, otherwise False.
        Override this in a subclass.
        """
        raise NotImplementedError()

可见BaseHostFilter继承filters.BaseFilter,代码:

class BaseFilter(object):
    """Base class for all filter classes."""
    def _filter_one(self, obj, filter_properties):
        """Return True if it passes the filter, False otherwise.
        Override this in a subclass.
        """
        return True

    def filter_all(self, filter_obj_list, filter_properties):
        """Yield objects that pass the filter.

        Can be overriden in a subclass, if you need to base filtering
        decisions on all objects.  Otherwise, one can just override
        _filter_one() to filter a single object.
        """
        for obj in filter_obj_list:
            if self._filter_one(obj, filter_properties):
                yield obj

    # Set to true in a subclass if a filter only needs to be run once
    # for each request rather than for each instance
    run_filter_once_per_request = False

    def run_filter_for_index(self, index):
        """Return True if the filter needs to be run for the "index-th"
        instance in a request.  Only need to override this if a filter
        needs anything other than "first only" or "all" behaviour.
        """
        if self.run_filter_once_per_request and index > 0:
            return False
        else:
            return True

我们只需要关注两个方法_filter_one和filter_all,_filter_one传入过滤对象和过滤参数,返回bool类型,通过返回True,拦截返回False,而filer_all是传入一个对象集合,通过调用_filter_one产生一个通过过滤器的元素生成器。因此我们只需要重写_filter_one即可,而BaseHostFilter的_filter_one调用host_passes,因此只需要重写host_passes方法。

filterHandle正是调用了filter类的filter_all方法。

filter过程到此结束,下面看看weight过程,回到_schedule方法,调用了host_manager.get_weighed_hosts,而host_manager调用了weight_handler.get_weighed_objects方法,weight_handle是HostWeightHandler实例,该类继承自nova.weights.BaseWeightHandler,其实现为:

class BaseWeightHandler(loadables.BaseLoader):
    object_class = WeighedObject

    def get_weighed_objects(self, weigher_classes, obj_list,
            weighing_properties):
        """Return a sorted (highest score first) list of WeighedObjects."""

        if not obj_list:
            return []

        weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
        for weigher_cls in weigher_classes:
            weigher = weigher_cls()
            weigher.weigh_objects(weighed_objs, weighing_properties)

        return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)

和过滤过程类似,也是遍历所有的weighed类,调用它的weigh_objects方法,得到一个weight值,再和之前的值累加。weight_objects方法会调用_weight_object和_weight_multiplier方法,前者对应分值,后者对应权值,二者的乘积就是最后的分值。因此weighed类必须实现_weigh_objects和_weight_multiplier方法,最后再通过weight值排序返回。如果要自定义weight类,只需继承BaseHostWeigher,重写 _weigh_object和_weight_multiplier方法,得到的值就是他们的乘积。

 

转载请注明:http://krystism.is-programmer.com/若有错误,请多多指正,谢谢!
  • 无匹配
  • 无匹配
lion 说:
2015年3月12日 13:55

楼主你好,我想直接调用 filter_scheduler.FilterScheduler._schedule() 来获取selected_hosts列表
应该怎么做?不清楚Class FilterScheduler 初始化需要传什么参数,能写个例子吗


登录 *


loading captcha image...
(输入验证码)
or Ctrl+Enter