使用c语言写一个函数实现两个数交换,很简单写下以下代码:
void swap(int *a, int *b)
{
*a ^= *b;
*b ^= *a;
*a ^= *b;
}
只有三行代码,且没有引入中间变量,使用位运算,效率高!
但首先必须保证a, b 不是空指针,否则就会出现段错误。
于是代码进一步改成:
void swap(int *a, int *b)
{
if (a == NULL || b == NULL)
return;
*a ^= *b;
*b ^= *a;
*a ^= *b;
}
似乎这样就完美了?
咋一看,真没有什么问题了,不信你可以测试,无论正数,负数,还是0,基本都不会出错了。。
那么请看以下代码:
static int count = 0;
void permutation(int *a, int from, int to)
{
if (from == to) {
cout << ++count << ":";
for (int i = 0; i <= to; ++i)
cout << a[i] << " ";
cout << endl;
return;
}
for (int i = from; i <= to; ++i) {
swap(&a[from], &a[i]);
permutation(a, from + 1, to);
swap(&a[from], &a[i]);
}
}
以上代码是求一个数组全排列的递归方法,算法应该没有错?
可是输出大量0!!为什么呢???
答案在下面:
原因在于当swap(int *a, int *b)传的是同一个地址时,就会等于0
即若 a == b
*a ^= *b,此时 *a == 0, *b == 0
最后必然是 *a == 0, *b == 0,
for (int i = from; i <= to; ++i) {
swap(&a[from], &a[i]);
permutation(a, from + 1, to);
swap(&a[from], &a[i]);
}
上面代码没有考虑当from == i的情况,即传递的是同一个地址。
所以正确的swap函数,应该是:
void swap(int* a, int* b)
{
if (a == NULL || b == NULL || a == b)
return;
*a ^= *b;
*b ^= *a;
*a ^= *b;
}
什么是随机数?通俗说法就是随机产生的一个数,这个数预先不能计算出来的,并且所有可能出现的数字,概率应该是均匀的。因此随机数应该满足至少以下两点:
如何产生随机数是一个具有挑战的问题,一般使用随机硬件产生,比如骰子、电子元件噪声、核裂变等。
在计算机编程中,我们经常调用随机数产生器函数,但我们必须清楚的一点是,一般直接调用软件的随机数产生器函数,产生的数字并不是严格的随机数,而是通过一定的算法计算出来的(不满足随机数的不可计算性),我们称它为伪随机数!由于它具有类似随机的统计特征,在不是很严格的情况,使用软件方式产生伪随机相比硬件实现方式,成本更低并且操作简单、效率也更高!
那一般伪随机数如何产生呢? 一般是通过一个随机种子(比如当前系统时间值),通某个算法(一般是位运算),不断迭代产生下一个数。比如c语言中的stdlib中rand_r函数(用的glibc):
/* This algorithm is mentioned in the ISO C standard, here extended
for 32 bits. */
int
rand_r (unsigned int *seed)
{
unsigned int next = *seed;
int result;
next *= 1103515245;
next += 12345;
result = (unsigned int) (next / 65536) % 2048;
next *= 1103515245;
next += 12345;
result <<= 10;
result ^= (unsigned int) (next / 65536) % 1024;
next *= 1103515245;
next += 12345;
result <<= 10;
result ^= (unsigned int) (next / 65536) % 1024;
*seed = next;
return result;
}
而java中的Random类产生方法next()为:
protected int next(int bits) {
long oldseed, nextseed;
AtomicLong seed = this.seed;
do {
oldseed = seed.get();
nextseed = (oldseed * multiplier + addend) & mask;
} while (!seed.compareAndSet(oldseed, nextseed));
return (int)(nextseed >>> (48 - bits));
}
java中还有一个更精确的伪随机产生器java.security.SecurityRandom, 它继承自Random类,可以指定算法名称,next方法为:
final protected int next(int numBits) {
int numBytes = (numBits+7)/8;
byte b[] = new byte[numBytes];
int next = 0;
nextBytes(b);
for (int i = 0; i < numBytes; i++) {
next = (next << 8) + (b[i] & 0xFF);
}
return next >>> (numBytes*8 - numBits);
}
当然这个类不仅仅是重写了next方法,在种子设置等都进行了重写。
最近有一道题:已知一个rand7函数,能够产生1~7的随机数,求一个函数,使其能够产生1~10的随机数。
显然调用一次不可能满足,必须多次调用!利用乘法原理,调用rand7() * rand7()可以产生1~49的随机数,我们可以把结果模10(即取个位数)得到0~9的数,再加1,即产生1~10的数。但我们还需要保证概率的机会均等性。显然1~49中,共有49个数,个位为0出现的次数要少1,不满足概率均等,如果直接这样计算,2~10出现的概率要比1出现的概率大!我们可以丢掉一些数字,比如不要大于40的数字,出现大于40,就重新产生。
int rand10() {
int ans;
do {
int i = rand7();
int j = rand7();
ans = i * j;
} while(ans > 40);
return ans % 10 + 1;
}
随机数的用途就不用多说了,比如取样,产生随机密码等。下面则着重说说其中一个应用--洗牌算法。
我们可能接触比较多的一种情况是需要把一个无序的列表排序成一个有序列表。洗牌算法(shuffle)则是一个相反的过程,即把一个有序的列表(当然无序也无所谓)变成一个无序的列表。
这个新列表必须是随机的,即原来的某个数在新列表的位置具有随机性!
我们假设有1~100共100个无重复数字。
很容易想到一种方案是:
从第一张牌开始,利用随机函数生成器产生1~100的随机数,比如产生88,则看第88个位置有没有占用,如果没有占用则把当前牌放到第88位置,如果已经占用,则重新产生随机数,直到找到有空位置!
首先必须承认这个方法是可以实现洗牌算法的。关键在于效率,首先空间复杂度是O(n),时间复杂度也是O(n),关键是越到后面越难找到空位置,大量时间浪费在求随机数和找空位置的。
第二中方案:
从第一张牌开始,设当前位置牌为第i张,利用随机函数生成器产生1~100的随机数,比如产生88,则交换第i张牌和第88张牌。
这样满足了空间是O(1)的原地操作,时间复杂度是O(n)。但是否能够保证每个牌的位置具有机会均等性呢?
首先一个常识是:n张牌,利用随机数产生N种情况,则必须满足N能够整除n,这样就能给予每个牌以N/n的机会(或者说权值),如果N不能整除n,必然机会不均等,即有些牌分配的机会多,有些少。
我们知道100的全排列有100的阶乘种情况,而调用100次随机函数,共可以产生100^100种情况,而n^n 必然不能整除n!,具体证明不在这里叙述。
那我们可以利用第二种方法改进,每次不是产生1~100的随机数,而是1~i的数字,则共有n!中情况,即N=n!,显然满足条件,且时间为O(n),空间为O(1).这也就是Fisher-Yates_shuffle算法,大多数库都使用的这种方法。
我们看看java中Collections实现:
public static void shuffle(List<?> list, Random rnd) {
int size = list.size();
if (size < SHUFFLE_THRESHOLD || list instanceof RandomAccess) {
for (int i=size; i>1; i--)
swap(list, i-1, rnd.nextInt(i));
} else {
Object arr[] = list.toArray();
// Shuffle array
for (int i=size; i>1; i--)
swap(arr, i-1, rnd.nextInt(i));
// Dump array back into list
// instead of using a raw type here, it's possible to capture
// the wildcard but it will require a call to a supplementary
// private method
ListIterator it = list.listIterator();
for (int i=0; i<arr.length; i++) {
it.next();
it.set(arr[i]);
}
}
}
除了首先判断能否随机访问,剩下的就是以上算法的实现了。
STL中实现:
// random_shuffle
template <class _RandomAccessIter>
inline void random_shuffle(_RandomAccessIter __first,
_RandomAccessIter __last) {
__STL_REQUIRES(_RandomAccessIter, _Mutable_RandomAccessIterator);
if (__first == __last) return;
for (_RandomAccessIter __i = __first + 1; __i != __last; ++__i)
iter_swap(__i, __first + __random_number((__i - __first) + 1));
}
template <class _RandomAccessIter, class _RandomNumberGenerator>
void random_shuffle(_RandomAccessIter __first, _RandomAccessIter __last,
_RandomNumberGenerator& __rand) {
__STL_REQUIRES(_RandomAccessIter, _Mutable_RandomAccessIterator);
if (__first == __last) return;
for (_RandomAccessIter __i = __first + 1; __i != __last; ++__i)
iter_swap(__i, __first + __rand((__i - __first) + 1));
}
如何测试洗牌算法具有随机性呢?其实很简单,调用洗牌算法N次,牌数为n,统计每个数字出现在某个位置的出现次数,构成一个矩阵n * n,如果这个矩阵的值都在N/n左右,则洗牌算法好。比如有100个数字,统计一万次,则每个数字在某个位置的出现次数应该在100左右。
洗牌算法的应用也很广,比如三国杀游戏、斗地主游戏等等。讲一个最常见的场景,就是播放器的随机播放。有些播放器的随机播放,是每次产生一个随机数来选择播放的歌曲,这样就有可能还没有听完所有的歌前,又听到已经听过的歌。另一种就是利用洗牌算法,把待播放的歌曲列表shuffle。如何判断使用的是哪一种方案呢? 很简单,如果点上一首还能回去,则利用的是洗牌算法,如果点上一首又是另外一首歌,则说明使用的是随机产生方法。比如上一首是3,现在是18,点上一首,如果是3说明采用的洗牌算法,如果不是3,则说明不是洗牌算法(存在误判,多试几次就可以了)。
顺便提一下网上的一些抽奖活动,尤其是转盘,是不是真正的随机?答案留给看客!
上篇文章讲了nova-scheduler:openstack之nova-scheduler 。为了简单,只叙述了随机调度算法,而没有细讲filter调度算法。filter调度算法原理并不难,先层层过滤掉一些不满足条件的宿主机,然后对剩余的宿主机进行weight评分排序,多个weight得到的分数进行累加,分数较高的(注意不一定是最高,原因下面讲)作为侯选宿主机。具体算法描述可以查看官方文档:http://docs.openstack.org/trunk/config-reference/content/section_compute-scheduler.html 下面从源码逐步分析算法的运行过程。
首先看看schedule_run_instance方法:
def schedule_run_instance(self, context, request_spec,
admin_password, injected_files,
requested_networks, is_first_time,
filter_properties, legacy_bdm_in_spec):
"""This method is called from nova.compute.api to provision
an instance. We first create a build plan (a list of WeightedHosts)
and then provision.
Returns a list of the instances created.
"""
payload = dict(request_spec=request_spec)
self.notifier.info(context, 'scheduler.run_instance.start', payload)
instance_uuids = request_spec.get('instance_uuids') # 获取uuids,可有多个
LOG.info(_("Attempting to build %(num_instances)d instance(s) "
"uuids: %(instance_uuids)s"),
{'num_instances': len(instance_uuids),
'instance_uuids': instance_uuids})
LOG.debug(_("Request Spec: %s") % request_spec)
# 返回主机列表
weighed_hosts = self._schedule(context, request_spec,
filter_properties, instance_uuids)
# NOTE: Pop instance_uuids as individual creates do not need the
# set of uuids. Do not pop before here as the upper exception
# handler fo NoValidHost needs the uuid to set error state
instance_uuids = request_spec.pop('instance_uuids') # 弹出uuids,不再需要
# NOTE(comstud): Make sure we do not pass this through. It
# contains an instance of RpcContext that cannot be serialized.
filter_properties.pop('context', None)
for num, instance_uuid in enumerate(instance_uuids):
request_spec['instance_properties']['launch_index'] = num
try:
try:
weighed_host = weighed_hosts.pop(0) # 弹出第一个主机
LOG.info(_("Choosing host %(weighed_host)s "
"for instance %(instance_uuid)s"),
{'weighed_host': weighed_host,
'instance_uuid': instance_uuid})
except IndexError:
raise exception.NoValidHost(reason="")
self._provision_resource(context, weighed_host,
request_spec,
filter_properties,
requested_networks,
injected_files, admin_password,
is_first_time,
instance_uuid=instance_uuid,
legacy_bdm_in_spec=legacy_bdm_in_spec)
except Exception as ex:
# NOTE(vish): we don't reraise the exception here to make sure
# that all instances in the request get set to
# error properly
driver.handle_schedule_error(context, ex, instance_uuid,
request_spec)
# scrub retry host list in case we're scheduling multiple
# instances:
retry = filter_properties.get('retry', {})
retry['hosts'] = []
self.notifier.info(context, 'scheduler.run_instance.end', payload)
该方法在进行一些参数处理后,首先调用_schedule方法,该方法返回宿主机列表,然后对每个待启动云主机调用_provision_resource方法,并把对应的目标宿主机传入该方法。_provision_resource方法的任务是更新数据库和调用nova-compute的rpcapi指定目标宿主机启动云主机。核心方法是_schedule方法,
def _schedule(self, context, request_spec, filter_properties,
instance_uuids=None):
"""Returns a list of hosts that meet the required specs,
ordered by their fitness.
"""
elevated = context.elevated()
instance_properties = request_spec['instance_properties']
instance_type = request_spec.get("instance_type", None) # get flavor
# Get the group
update_group_hosts = False
scheduler_hints = filter_properties.get('scheduler_hints') or {}
group = scheduler_hints.get('group', None)
# --hint group SERVER_GROUP, 如果有group,则更新到数据库中
if group:
group_hosts = self.group_hosts(elevated, group)
update_group_hosts = True
if 'group_hosts' not in filter_properties:
filter_properties.update({'group_hosts': []})
configured_hosts = filter_properties['group_hosts']
filter_properties['group_hosts'] = configured_hosts + group_hosts
config_options = self._get_configuration_options()
# check retry policy. Rather ugly use of instance_uuids[0]...
# but if we've exceeded max retries... then we really only
# have a single instance.
properties = instance_properties.copy()
if instance_uuids:
properties['uuid'] = instance_uuids[0]
self._populate_retry(filter_properties, properties) # 如果超出最多尝试次数,抛出NoValidHost异常
filter_properties.update({'context': context,
'request_spec': request_spec,
'config_options': config_options,
'instance_type': instance_type})
self.populate_filter_properties(request_spec, # 把一些数据填入filter_properties中,比如project_id, os_type等
filter_properties)
# Find our local list of acceptable hosts by repeatedly
# filtering and weighing our options. Each time we choose a
# host, we virtually consume resources on it so subsequent
# selections can adjust accordingly.
# Note: remember, we are using an iterator here. So only
# traverse this list once. This can bite you if the hosts
# are being scanned in a filter or weighing function.
hosts = self.host_manager.get_all_host_states(elevated) # 获取所有主机列表,host_manager从父类init方法获取,根据CONF获取,默认为nova.scheduler.host_manager.HostManager,直接读取数据库
selected_hosts = []
if instance_uuids:
num_instances = len(instance_uuids)
else:
num_instances = request_spec.get('num_instances', 1)
# 注意range和xrange区别,range返回一个list,而xrange返回一个生成器
for num in xrange(num_instances):
# Filter local hosts based on requirements ...
hosts = self.host_manager.get_filtered_hosts(hosts,
filter_properties, index=num)
if not hosts:
# Can't get any more locally.
break
LOG.debug(_("Filtered %(hosts)s"), {'hosts': hosts})
weighed_hosts = self.host_manager.get_weighed_hosts(hosts, # 获取weight值,并按大到小排序
filter_properties)
LOG.debug(_("Weighed %(hosts)s"), {'hosts': weighed_hosts})
scheduler_host_subset_size = CONF.scheduler_host_subset_size # 截取集合到指定大小。
if scheduler_host_subset_size > len(weighed_hosts):
scheduler_host_subset_size = len(weighed_hosts)
if scheduler_host_subset_size < 1:
scheduler_host_subset_size = 1
chosen_host = random.choice(
weighed_hosts[0:scheduler_host_subset_size]) # 从截取的集合中随机选择一个作为目标宿主机,而不是一定是最大的。
selected_hosts.append(chosen_host)
# Now consume the resources so the filter/weights
# will change for the next instance.
chosen_host.obj.consume_from_instance(instance_properties) # 更新值,为下一个主机调度做准备
if update_group_hosts is True:
filter_properties['group_hosts'].append(chosen_host.obj.host)
return selected_hosts
该方法的两个核心方法是host_manager.get_filtered_hosts和host_manager.get_weighed_hosts方法,分别对应算法的过滤和计算权值两个过程。注意在计算权值后返回的是一个排好序的主机列表,但并不是选择其中一个最大值的作为目标宿主机,而是通过配置指定从topN中随机选择一个,比如设置scheduler_host_subset_size为5,过滤后返回的主机个数为10,则从top5中随机返回其中一个,这就是前面讲的为什么不是分值最高,而是较高。host_manager为可配置的,默nova.scheduler.host_manager.HostManager,
HostManagerd get_filtered_hosts主要调用两个方法:_choose_host_filters和filter_handler.get_filtered_objects,前者通过过滤器类名返回对应的类列表(相当于java中根据类名,比如"Apple",找到对应的类,比如a.b.Apple.class,或者getClass("Apple"),过滤器类名通过nova.conf的scheduler_default_filters配置,默认为RetryFilter','AvailabilityZoneFilter','RamFilter','ComputeFilter','ComputeCapabilitiesFilter','ImagePropertiesFilter'。然后把类列表传递给filter_handler.get_filtered_objects方法,filte_handle是filters.HostFilterHandler,而HostFilterHandler继承自nova.filters.BaseFilterHandler,其实现为:
class BaseFilterHandler(loadables.BaseLoader):
"""Base class to handle loading filter classes.
This class should be subclassed where one needs to use filters.
"""
def get_filtered_objects(self, filter_classes, objs,
filter_properties, index=0):
list_objs = list(objs)
LOG.debug(_("Starting with %d host(s)"), len(list_objs))
for filter_cls in filter_classes:
cls_name = filter_cls.__name__
filter = filter_cls()
if filter.run_filter_for_index(index):
objs = filter.filter_all(list_objs,
filter_properties)
if objs is None:
LOG.debug(_("Filter %(cls_name)s says to stop filtering"),
{'cls_name': cls_name})
return
list_objs = list(objs)
LOG.debug(_("Filter %(cls_name)s returned "
"%(obj_len)d host(s)"),
{'cls_name': cls_name, 'obj_len': len(list_objs)})
if len(list_objs) == 0:
break
return list_objs
可见他会遍历所有的过滤类,实例化,并且调用它的filter_all方法,最后返回所有未被拦截的对象。下面我们看看过滤类:
我们上编文章说了,过滤器是可插除的,如果我们要自定义自己的过滤器只需要继承BaseHostFilter(在nova.schedule.filters.__init__.py中定义)并实现host_passes方法即可,如下代码:
class BaseHostFilter(filters.BaseFilter):
"""Base class for host filters."""
def _filter_one(self, obj, filter_properties):
"""Return True if the object passes the filter, otherwise False."""
return self.host_passes(obj, filter_properties)
def host_passes(self, host_state, filter_properties):
"""Return True if the HostState passes the filter, otherwise False.
Override this in a subclass.
"""
raise NotImplementedError()
可见BaseHostFilter继承filters.BaseFilter,代码:
class BaseFilter(object):
"""Base class for all filter classes."""
def _filter_one(self, obj, filter_properties):
"""Return True if it passes the filter, False otherwise.
Override this in a subclass.
"""
return True
def filter_all(self, filter_obj_list, filter_properties):
"""Yield objects that pass the filter.
Can be overriden in a subclass, if you need to base filtering
decisions on all objects. Otherwise, one can just override
_filter_one() to filter a single object.
"""
for obj in filter_obj_list:
if self._filter_one(obj, filter_properties):
yield obj
# Set to true in a subclass if a filter only needs to be run once
# for each request rather than for each instance
run_filter_once_per_request = False
def run_filter_for_index(self, index):
"""Return True if the filter needs to be run for the "index-th"
instance in a request. Only need to override this if a filter
needs anything other than "first only" or "all" behaviour.
"""
if self.run_filter_once_per_request and index > 0:
return False
else:
return True
我们只需要关注两个方法_filter_one和filter_all,_filter_one传入过滤对象和过滤参数,返回bool类型,通过返回True,拦截返回False,而filer_all是传入一个对象集合,通过调用_filter_one产生一个通过过滤器的元素生成器。因此我们只需要重写_filter_one即可,而BaseHostFilter的_filter_one调用host_passes,因此只需要重写host_passes方法。
filterHandle正是调用了filter类的filter_all方法。
filter过程到此结束,下面看看weight过程,回到_schedule方法,调用了host_manager.get_weighed_hosts,而host_manager调用了weight_handler.get_weighed_objects方法,weight_handle是HostWeightHandler实例,该类继承自nova.weights.BaseWeightHandler,其实现为:
class BaseWeightHandler(loadables.BaseLoader):
object_class = WeighedObject
def get_weighed_objects(self, weigher_classes, obj_list,
weighing_properties):
"""Return a sorted (highest score first) list of WeighedObjects."""
if not obj_list:
return []
weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
for weigher_cls in weigher_classes:
weigher = weigher_cls()
weigher.weigh_objects(weighed_objs, weighing_properties)
return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)
和过滤过程类似,也是遍历所有的weighed类,调用它的weigh_objects方法,得到一个weight值,再和之前的值累加。weight_objects方法会调用_weight_object和_weight_multiplier方法,前者对应分值,后者对应权值,二者的乘积就是最后的分值。因此weighed类必须实现_weigh_objects和_weight_multiplier方法,最后再通过weight值排序返回。如果要自定义weight类,只需继承BaseHostWeigher,重写 _weigh_object和_weight_multiplier方法,得到的值就是他们的乘积。
nova-scheduler的功能是负责从多宿主机中调度最适合的宿主机生成云主机。即传入需要启动的云主机列表,nova-scheduler根据云主机的数量、参数等进行调度,选择合适的物理机(hypervisor,宿主机,即运行nova-compute的节点)启动这些云主机。在H版本中实现的调度算法有两个,即过滤(filter)调度算法和随机调度算法(chance)。目前的默认调度算法是 filter-scheduler,即过滤调度器,其思想是先进行一些条件过滤一些宿主机,比如要求可用内存大于2GB,小于2GB的直接过滤,过滤器可以串联多个,即层层过滤。然后对过滤后的宿主机进行权值计算,权值计算就是根据宿主机的状态进行评分(weight),最后根据评分(weight)排序,评分最高的为最佳候选宿主机,评分也是可以串联的,即层层评分。注意openstack的设计原则是可扩展,意味着调度算法、过滤函数、评分函数都是可插除的,用户可以自定义自己的调度器,过滤器,评分方法,而只需在配置文件中配置即可,无需修改核心代码。实现的过滤器很多,而评分函数目前只有内存评分,即根据内存使用量进行评分。
注意:启动云主机时宿主机的位置并不是完全由scheduler控制,用户可以指定availability-zone,aggregate以及通过设置--hint来控制宿主机在某个集合中(本质还是过滤,即通过用户设定的条件进行过滤)。
下面从入口manager开始看看nova-scheduler如何工作的,部分代码:
def run_instance(self, context, request_spec, admin_password,
injected_files, requested_networks, is_first_time,
filter_properties, legacy_bdm_in_spec=True):
"""Tries to call schedule_run_instance on the driver.
Sets instance vm_state to ERROR on exceptions
"""
instance_uuids = request_spec['instance_uuids']
with compute_utils.EventReporter(context, conductor_api.LocalAPI(),
'schedule', *instance_uuids):
try:
return self.driver.schedule_run_instance(context,
request_spec, admin_password, injected_files,
requested_networks, is_first_time, filter_properties,
legacy_bdm_in_spec)
except exception.NoValidHost as ex:
# don't re-raise
self._set_vm_state_and_notify('run_instance',
{'vm_state': vm_states.ERROR,
'task_state': None},
context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
self._set_vm_state_and_notify('run_instance',
{'vm_state': vm_states.ERROR,
'task_state': None},
context, ex, request_spec)
方法先获取需要创建云主机的uuid,然后直接调用driver的schedule_run_instance,这个driver即调度器,所有的调度器必须继承自driver.Schduler, 并且实现三个抽象方法:schedule_run_instance,select_destinations,select_hosts。driver是由配置文件配置,默认为nova.scheduler.filter_scheduler.FilterScheduler,如下:
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='nova.scheduler.filter_scheduler.FilterScheduler',
help='Default driver to use for the scheduler')
CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
def __init__(self, scheduler_driver=None, *args, **kwargs):
if not scheduler_driver:
scheduler_driver = CONF.scheduler_driver
self.driver = importutils.import_object(scheduler_driver)
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
super(SchedulerManager, self).__init__(service_name='scheduler',
*args, **kwargs)
### 省略其他代码
由此可见入口函数直接调用driver(调度器)的scheduler_run_instance方法,为了简单起见,下面以chance调度器为例,看看它如何工作。首先查看chance调度器的scheduler_run_instance代码:
def schedule_run_instance(self, context, request_spec,
admin_password, injected_files,
requested_networks, is_first_time,
filter_properties, legacy_bdm_in_spec):
"""Create and run an instance or instances."""
instance_uuids = request_spec.get('instance_uuids')
for num, instance_uuid in enumerate(instance_uuids):
request_spec['instance_properties']['launch_index'] = num
try:
host = self._schedule(context, CONF.compute_topic,
request_spec, filter_properties)
updated_instance = driver.instance_update_db(context,
instance_uuid)
self.compute_rpcapi.run_instance(context,
instance=updated_instance, host=host,
requested_networks=requested_networks,
injected_files=injected_files,
admin_password=admin_password,
is_first_time=is_first_time,
request_spec=request_spec,
filter_properties=filter_properties,
legacy_bdm_in_spec=legacy_bdm_in_spec)
except Exception as ex:
# NOTE(vish): we don't reraise the exception here to make sure
# that all instances in the request get set to
# error properly
driver.handle_schedule_error(context, ex, instance_uuid,
request_spec)
该方法首先获取所有需要启动的云主机列表uuid,对每个待启动云主机调用_schedule方法,该方法返回一个宿主机,更新数据库,然后调用nova-compute的远程方法api(rpcapi)调用run_instance方法,在选择的宿主机中启动该云主机,nova-scheduler任务完成。下面看看_schedule实现:
def _schedule(self, context, topic, request_spec, filter_properties):
"""Picks a host that is up at random."""
elevated = context.elevated()
hosts = self.hosts_up(elevated, topic) # 父类Schduler方法,返回所有nova-compute状态为up的主机列表。
if not hosts:
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
hosts = self._filter_hosts(request_spec, hosts, filter_properties) # 过滤一些主机黑名单列表。
if not hosts:
msg = _("Could not find another compute")
raise exception.NoValidHost(reason=msg)
return random.choice(hosts) # 随机返回其中一个主机
该方法非常简单,首先获取所有服务状态为up的宿主机列表,然后过滤一些黑名单,最后调用random.choice方法随机从中返回一个宿主机。
文中只对简单chance算法进行了简单叙述,而filter算法由于比较复杂,后面以另一篇文章进行叙述。
python中调用a.xx,内部就是a.__getattr__(xx)或者getattr(a, xx),而a.xx(),其中xx实现了__call__()方法,即调用了getattr(a, xx)()。
但python的灵活之处在于可以重写__getattr__方法,通过这个方式可以包装一个类,使其中一个类看起来具有另一个类的方法,非常像继承获取的方法(其实应该是通过组合方法获取,但调用时更方便)。看一下代码:
class Test:
def __init__(self):
self.id = 5
def get(self):
print("getting ...")
def update(self):
print("updating ...")
def delete(self):
print("deleting ...")
class Wapper:
def __init__(self, backend = None):
self.backend = backend
def __getattr__(self, key):
return getattr(self.backend, key)
if __name__ == "__main__":
test = Test()
wapper = Wapper(backend = test)
wapper.get()
print(wapper.id)
Wapper类并没有id属性,也没有get、update、delete方法,但Wapper的实例可以直接获取id属性和调用get、delete、update方法,就像是自己的属性一样。这在openstack中db中应用,nova.db.api部分代码:
from nova.openstack.common.db import api as db_api
_BACKEND_MAPPING = {'sqlalchemy': 'nova.db.sqlalchemy.api'}
IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING)
def compute_node_get_all(context, no_date_fields=False):
"""Get all computeNodes.
:param context: The security context
:param no_date_fields: If set to True, excludes 'created_at', 'updated_at',
'deteled_at' and 'deleted' fields from the output,
thus significantly reducing its size.
Set to False by default
:returns: List of dictionaries each containing compute node properties,
including corresponding service and stats
"""
return IMPL.compute_node_get_all(context, no_date_fields)
可以看出这里的实现都是通过调用IMPL中的方法直接返回的,而IMPL是nova.openstack.common.db.DBAPI实例,我们查看其代码:
class DBAPI(object):
def __init__(self, backend_mapping=None):
if backend_mapping is None:
backend_mapping = {}
self.__backend = None
self.__backend_mapping = backend_mapping
@lockutils.synchronized('dbapi_backend', 'nova-')
def __get_backend(self):
"""Get the actual backend. May be a module or an instance of
a class. Doesn't matter to us. We do this synchronized as it's
possible multiple greenthreads started very quickly trying to do
DB calls and eventlet can switch threads before self.__backend gets
assigned.
"""
if self.__backend:
# Another thread assigned it
return self.__backend
backend_name = CONF.database.backend
self.__use_tpool = CONF.database.use_tpool
if self.__use_tpool:
from eventlet import tpool
self.__tpool = tpool
# Import the untranslated name if we don't have a
# mapping.
backend_path = self.__backend_mapping.get(backend_name,
backend_name)
backend_mod = importutils.import_module(backend_path)
self.__backend = backend_mod.get_backend()
return self.__backend
def __getattr__(self, key):
backend = self.__backend or self.__get_backend()
attr = getattr(backend, key)
if not self.__use_tpool or not hasattr(attr, '__call__'):
return attr
def tpool_wrapper(*args, **kwargs):
return self.__tpool.execute(attr, *args, **kwargs)
functools.update_wrapper(tpool_wrapper, attr)
return tpool_wrapper
我们期望的是DBAPI一堆实现方法,可我们咋一看只有一个核心方法__getattr__,而且它直接继承object,那它怎么会有那么多方法呢?其中的奥秘就在于__getattr_方法,返回的实质是backend中的方法,回到nova.db.api代码,我们发现backend就是nova.db.sqlalchemy.api,因此调用DBAPI方法,实质就是调用的nova.db.sqlchemy.api方法。下面是nova.db.sqlalchemy.api部分代码:
@require_admin_context
def compute_node_get_all(context, no_date_fields):
# NOTE(msdubov): Using lower-level 'select' queries and joining the tables
# manually here allows to gain 3x speed-up and to have 5x
# less network load / memory usage compared to the sqla ORM.
engine = get_engine()
# Retrieve ComputeNode, Service, Stat.
compute_node = models.ComputeNode.__table__
service = models.Service.__table__
stat = models.ComputeNodeStat.__table__
with engine.begin() as conn:
redundant_columns = set(['deleted_at', 'created_at', 'updated_at',
'deleted']) if no_date_fields else set([])
def filter_columns(table):
return [c for c in table.c if c.name not in redundant_columns]
compute_node_query = select(filter_columns(compute_node)).\
where(compute_node.c.deleted == 0).\
order_by(compute_node.c.service_id)
compute_node_rows = conn.execute(compute_node_query).fetchall()
service_query = select(filter_columns(service)).\
where((service.c.deleted == 0) &
(service.c.binary == 'nova-compute')).\
order_by(service.c.id)
service_rows = conn.execute(service_query).fetchall()
stat_query = select(filter_columns(stat)).\
where(stat.c.deleted == 0).\
order_by(stat.c.compute_node_id)
stat_rows = conn.execute(stat_query).fetchall()
# NOTE(msdubov): Transferring sqla.RowProxy objects to dicts.
stats = [dict(proxy.items()) for proxy in stat_rows]
# Join ComputeNode & Service manually.
services = {}
for proxy in service_rows:
services[proxy['id']] = dict(proxy.items())
compute_nodes = []
for proxy in compute_node_rows:
node = dict(proxy.items())
node['service'] = services.get(proxy['service_id'])
compute_nodes.append(node)
# Join ComputeNode & ComputeNodeStat manually.
# NOTE(msdubov): ComputeNode and ComputeNodeStat map 1-to-Many.
# Running time is (asymptotically) optimal due to the use
# of iterators (itertools.groupby() for ComputeNodeStat and
# iter() for ComputeNode) - we handle each record only once.
compute_nodes.sort(key=lambda node: node['id'])
compute_nodes_iter = iter(compute_nodes)
for nid, nsts in itertools.groupby(stats, lambda s: s['compute_node_id']):
for node in compute_nodes_iter:
if node['id'] == nid:
node['stats'] = list(nsts)
break
else:
node['stats'] = []
return compute_nodes
真正的实现在nova.db.sqlchemy.api中。
要想知道nova的工作过程,首先就要掌握它的入口,即novaclient!命令nova和horizon都调用了novaclient。
github地址:https://github.com/openstack/python-novaclient
novaclient的功能很简单,即解析参数,构造url并发送请求,处理结果。比如运行nova --debug list,首先需要解析出选项参数--debug,另外还要获取环境变量参数和默认参数,然后解析子命令list,通过子命令获取相对应的回调函数,list对应为novaclient.v1_1.shell.do_list。
下面详细看看它的工作原理,首先看看命令nova到底是什么?
which nova | xargs -I{} file {}
# 返回/usr/bin/nova: a /usr/bin/python script, ASCII text executable
可见命令nova只是一个python程序,让我们打开它
#!/usr/bin/python
# PBR Generated from 'console_scripts'
import sys
from novaclient.shell import main
if __name__ == "__main__":
sys.exit(main())
命令nova调用了novaclient.shell的main函数,从这里开始进入了novaclient,现在让我们开始novaclient吧!
首先看看novaclient.shell的main函数:
def main():
"""入口函数"""
try:
OpenStackComputeShell().main(map(strutils.safe_decode, sys.argv[1:]))
except Exception as e:
logger.debug(e, exc_info=1)
print("ERROR: %s" % strutils.safe_encode(six.text_type(e)),
file=sys.stderr)
sys.exit(1)
发现它又调用了OpenstackComputeShell()的main函数。这个main函数才是真正的入口函数,以下是前半部分代码:
def main(self, argv):
# Parse args once to find version and debug settings
parser = self.get_base_parser() # 添加选项,比如--user, --password等
(options, args) = parser.parse_known_args(argv)
self.setup_debugging(options.debug) # 如果options中有--debug,则设置logger的level为DEBUG,并输出到标准输出流
# Discover available auth plugins
novaclient.auth_plugin.discover_auth_systems()
# build available subcommands based on version
self.extensions = self._discover_extensions(
options.os_compute_api_version)
self._run_extension_hooks('__pre_parse_args__')
# NOTE(dtroyer): Hackery to handle --endpoint_type due to argparse
# thinking usage-list --end is ambiguous; but it
# works fine with only --endpoint-type present
# Go figure.
if '--endpoint_type' in argv:
spot = argv.index('--endpoint_type')
argv[spot] = '--endpoint-type'
# 根据版本解析子命令
subcommand_parser = self.get_subcommand_parser(
options.os_compute_api_version)
self.parser = subcommand_parser
# 如果--help,则打印help信息,并退出
if options.help or not argv:
subcommand_parser.print_help()
return 0
args = subcommand_parser.parse_args(argv) #解析命令行参数 argv=['list']
#print("args = %s" % args)
self._run_extension_hooks('__post_parse_args__', args)
# Short-circuit and deal with help right away.
# nova help xxxx 命令
if args.func == self.do_help:
self.do_help(args)
return 0
# nova bash-completion
elif args.func == self.do_bash_completion:
self.do_bash_completion(args)
return 0
parser是 NovaClientArgumentParser类型,该类型继承自argparse.ArgumentParser,argparse是python中的参数解析库。
get_base_parser方法即添加选项参数,诸如--debug, --timing,--os-username 等等,并会读取环境变量和设置默认值,下面是部分代码:
# Global arguments
parser.add_argument('-h', '--help',
action='store_true',
help=argparse.SUPPRESS,
)
parser.add_argument('--version',
action='version',
version=novaclient.__version__)
parser.add_argument('--debug',
default=False,
action='store_true',
help="Print debugging output")
parser.add_argument('--no-cache',
default=not utils.bool_from_str(
utils.env('OS_NO_CACHE', default='true')),
action='store_false',
dest='os_cache',
help=argparse.SUPPRESS)
parser.add_argument('--no_cache',
action='store_false',
dest='os_cache',
help=argparse.SUPPRESS)
parser.add_argument('--os-cache',
default=utils.env('OS_CACHE', default=False),
action='store_true',
help="Use the auth token cache.")
parser.add_argument('--timings',
default=False,
action='store_true',
help="Print call timing info")
parser.add_argument('--timeout',
default=600,
metavar='<seconds>',
type=positive_non_zero_float,
help="Set HTTP call timeout (in seconds)")
用过argparse库的一定不会陌生了。
回到main函数,接下来会设置debug,即如果有--debug选项,则设置logger的level为DEBUG并传入到标准输出流。
(options, args) = parser.parse_known_args(argv)返回解析结果,即options保存所有的选项参数,args保存位置参数,比如nova --debug list, options.debug等于True,args为['list']。
下一个函数get_subcommand_parser是一个核心方法,用于处理子命令比如list, flavor-list, boot等,以下是代码:
def get_subcommand_parser(self, version):
parser = self.get_base_parser()
self.subcommands = {}
subparsers = parser.add_subparsers(metavar='<subcommand>')
try:
actions_module = {
'1.1': shell_v1_1,
'2': shell_v1_1,
'3': shell_v3,
}[version]
except KeyError:
actions_module = shell_v1_1 #默认是1.1版本
self._find_actions(subparsers, actions_module)
self._find_actions(subparsers, self)
for extension in self.extensions:
self._find_actions(subparsers, extension.module)
self._add_bash_completion_subparser(subparsers)
return parser
这个方法是根据版本(默认是1.1)寻找可用的方法,我们假设使用shell_v1_1模块,它导入自from novaclient.v1_1 import shell as shell_v1_1,然后调用_find_actions方法。注意:这个方法传入的是一个模块,python中所有东西都是对象,模块也不例外,不过这里我们姑且认为它传入了一个类,类似与java的XXXClass.class类型,以下是代码:
def _find_actions(self, subparsers, actions_module):
# actions_module = shell_v1.1
for attr in (a for a in dir(actions_module) if a.startswith('do_')): # attr = do_flavor_list
# I prefer to be hypen-separated instead of underscores.
command = attr[3:].replace('_', '-') # do_flavor_list -> flavor-list
callback = getattr(actions_module, attr)
desc = callback.__doc__ or ''
action_help = desc.strip()
arguments = getattr(callback, 'arguments', [])
subparser = subparsers.add_parser(command,
help=action_help,
description=desc,
add_help=False,
formatter_class=OpenStackHelpFormatter
)
subparser.add_argument('-h', '--help',
action='help',
help=argparse.SUPPRESS,
)
self.subcommands[command] = subparser
for (args, kwargs) in arguments:
subparser.add_argument(*args, **kwargs)
subparser.set_defaults(func=callback)
可见这个方法是利用反射机制获取所有以do_开头的方法,这个do_XXX_XXX,XXX-XXX就是命令名,而do_XXX_XXX就是回调函数,把函数作为变量赋值给callback,是函数式编程的经典用法。最后把callback传入set_defaults方法。
至此我们知道nova list其实调用了novaclient.v1_1.shell.do_list()方法,而nova flavor-list调用了novaclient.v1_1.shell.do_flavor_list()方法,下面以nova --debug flavor-list为例继续深入。
我们看novaclient.v1_1.shell源码,发现好多do_XXX方法,但它本身并不做什么工作,而是调用cs去做,cs是什么现在不管。下面是do_flavor_list方法:
def do_flavor_list(cs, args):
"""Print a list of available 'flavors' (sizes of servers)."""
if args.all:
flavors = cs.flavors.list(is_public=None)
else:
flavors = cs.flavors.list()
_print_flavor_list(flavors, args.extra_specs)
现在我们不知道cs是什么东西,那我们继续回到main函数,main函数中间其余代码均是在各种参数检查,我们忽略不管,直接跳到main函数结尾
def main(self, argv):
# Parse args once to find version and debug settings
parser = self.get_base_parser() # 添加选项,比如--user, --password等
(options, args) = parser.parse_known_args(argv)
self.setup_debugging(options.debug) # 如果options中有--debug,则设置logger的level为DEBUG,并输出到标准输出流
# Discover available auth plugins
novaclient.auth_plugin.discover_auth_systems()
# build available subcommands based on version
self.extensions = self._discover_extensions(
options.os_compute_api_version)
self._run_extension_hooks('__pre_parse_args__')
# NOTE(dtroyer): Hackery to handle --endpoint_type due to argparse
# thinking usage-list --end is ambiguous; but it
# works fine with only --endpoint-type present
# Go figure.
if '--endpoint_type' in argv:
spot = argv.index('--endpoint_type')
argv[spot] = '--endpoint-type'
# 根据版本解析子命令
subcommand_parser = self.get_subcommand_parser(
options.os_compute_api_version)
self.parser = subcommand_parser
# 如果--help,则打印help信息,并退出
if options.help or not argv:
subcommand_parser.print_help()
return 0
args = subcommand_parser.parse_args(argv) #解析命令行参数 argv=['list']
#print("args = %s" % args)
self._run_extension_hooks('__post_parse_args__', args)
# Short-circuit and deal with help right away.
# nova help xxxx 命令
if args.func == self.do_help:
self.do_help(args)
return 0
# nova bash-completion
elif args.func == self.do_bash_completion:
self.do_bash_completion(args)
return 0
# 这里省略大量代码
self.cs = client.Client(options.os_compute_api_version, os_username,
os_password, os_tenant_name, tenant_id=os_tenant_id,
auth_url=os_auth_url, insecure=insecure,
region_name=os_region_name, endpoint_type=endpoint_type,
extensions=self.extensions, service_type=service_type,
service_name=service_name, auth_system=os_auth_system,
auth_plugin=auth_plugin,
volume_service_name=volume_service_name,
timings=args.timings, bypass_url=bypass_url,
os_cache=os_cache, http_log_debug=options.debug,
cacert=cacert, timeout=timeout)
# 这里省略大量代码
args.func(self.cs, args) # 此时func等于do_flavor_list
if args.timings: #如果有--timing选项,则打印请求时间
self._dump_timings(self.cs.get_timings())
可见cs是调用client.Client方法返回的,我们查看其代码client.py:
def get_client_class(version):
version_map = {
'1.1': 'novaclient.v1_1.client.Client',
'2': 'novaclient.v1_1.client.Client',
'3': 'novaclient.v3.client.Client',
}
try:
client_path = version_map[str(version)]
except (KeyError, ValueError):
msg = "Invalid client version '%s'. must be one of: %s" % (
(version, ', '.join(version_map.keys())))
raise exceptions.UnsupportedVersion(msg)
return utils.import_class(client_path)
def Client(version, *args, **kwargs):
client_class = get_client_class(version)
return client_class(*args, **kwargs)
不难看出cs即根据版本选择的Client类型,这里我们用的是novaclient.v1_1.client.Client。这个模块可以认为是功能模块的注册类,比如flavors操作模块为flavors.py,为了让他生效,必须注册,即在Client中设置self.flavors=flavors.FlavorManager(self):
self.projectid = project_id
self.tenant_id = tenant_id
self.flavors = flavors.FlavorManager(self)
self.flavor_access = flavor_access.FlavorAccessManager(self)
self.images = images.ImageManager(self)
self.limits = limits.LimitsManager(self)
self.servers = servers.ServerManager(self)
从do_flavor_list方法中cs.flavor.list()即调用了flavors.FlavorManager().list方法。从这里我们可以看出openstack的设计原则,即支持自由灵活的可扩展性,如果需要添加新功能,几乎不需要修改太多代码,只要修改Client注册即可。
我们查看flavors.py中的list方法:
def list(self, detailed=True, is_public=True):
"""
Get a list of all flavors.
:rtype: list of :class:`Flavor`.
"""
qparams = {}
# is_public is ternary - None means give all flavors.
# By default Nova assumes True and gives admins public flavors
# and flavors from their own projects only.
if not is_public:
qparams['is_public'] = is_public
query_string = "?%s" % urlutils.urlencode(qparams) if qparams else ""
detail = ""
if detailed:
detail = "/detail"
return self._list("/flavors%s%s" % (detail, query_string), "flavors")
很明显这里是把list命令组装url请求,然后调用_list方法,由于FlavorManager继承自base.ManagerWithFind,而base.ManagerWithFind继承自Manager,_list方法在Manager中定义。
def _list(self, url, response_key, obj_class=None, body=None):
if body:
_resp, body = self.api.client.post(url, body=body)
else:
_resp, body = self.api.client.get(url)
if obj_class is None:
obj_class = self.resource_class
data = body[response_key]
# NOTE(ja): keystone returns values as list as {'values': [ ... ]}
# unlike other services which just return the list...
if isinstance(data, dict):
try:
data = data['values']
except KeyError:
pass
with self.completion_cache('human_id', obj_class, mode="w"):
with self.completion_cache('uuid', obj_class, mode="w"):
return [obj_class(self, res, loaded=True)
for res in data if res]
有源码中看出主要发送url请求即self.api.client.post(url, body=dody)或者self.api.client.get(url),具体根据是否有body,即是否数据选择GET或者POST请求。然后处理返回的数据。self.api在这里其实就是novaclient.v1_1.client.Client,只是前面用cs,这里用api。
我们回到novaclient.v1_1.client.Client的方法中,我们发现除了注册一系列功能外,还有一个比较特殊的,
self.client = client.HTTPClient(username,
password,
projectid=project_id,
tenant_id=tenant_id,
auth_url=auth_url,
insecure=insecure,
timeout=timeout,
auth_system=auth_system,
auth_plugin=auth_plugin,
proxy_token=proxy_token,
proxy_tenant_id=proxy_tenant_id,
region_name=region_name,
endpoint_type=endpoint_type,
service_type=service_type,
service_name=service_name,
volume_service_name=volume_service_name,
timings=timings,
bypass_url=bypass_url,
os_cache=self.os_cache,
http_log_debug=http_log_debug,
cacert=cacert)
这个self.client是client.HTTPClient类型,真正负责发送url请求的类,部分代码为:
def _cs_request(self, url, method, **kwargs):
if not self.management_url:
self.authenticate()
# Perform the request once. If we get a 401 back then it
# might be because the auth token expired, so try to
# re-authenticate and try again. If it still fails, bail.
try:
kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token
if self.projectid:
kwargs['headers']['X-Auth-Project-Id'] = self.projectid
resp, body = self._time_request(self.management_url + url, method,
**kwargs)
return resp, body
except exceptions.Unauthorized as e:
try:
# frist discard auth token, to avoid the possibly expired
# token being re-used in the re-authentication attempt
self.unauthenticate()
self.authenticate()
kwargs['headers']['X-Auth-Token'] = self.auth_token
resp, body = self._time_request(self.management_url + url,
method, **kwargs)
return resp, body
except exceptions.Unauthorized:
raise e
def get(self, url, **kwargs):
return self._cs_request(url, 'GET', **kwargs)
最后会调用http.request方法发送请求,这里使用了python库Requests: HTTP for Humans,这个库比httplib2更好,查看地址:http://docs.python-requests.org/en/latest/。接收请求的工作就由nova-api负责了,这里不再深入。
接下来我们简单增加一个没用的功能test,首先在novaclient/v1_1下touch test.py,使用vim增加以下代码:
"""
Test interface.
"""
from novaclient import base
class Test(base.Resource):
def test(self):
print("This is a test")
class TestManager(base.Manager):
def test(self):
print("This is a test")
然后我们需要在client中注册,编辑novaclient/v1_1/client.py文件,增加self.test = test.TestManager(self)
然后在shell下增加入口函数,注册新功能,
def do_test(cs, _args): """ do test. """ cs.test.test()
运行nova test, nova help test查看效果。