Merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:~hudson-openstack/nova/trunk

Proposed by Ewan Mellor
Status: Superseded
Proposed branch: lp:~ewanmellor/nova/xenapi-concurrency-model
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 326 lines (+163/-37)
1 file modified
nova/virt/xenapi.py (+163/-37)
To merge this branch: bzr merge lp:~ewanmellor/nova/xenapi-concurrency-model
Reviewer: Jay Pipes (community), status: Approve
Review via email: mp+32722@code.launchpad.net

This proposal has been superseded by a proposal from 2010-08-17.

Description of the change

Rework virt.xenapi's concurrency model. There were many places where we were
inadvertently blocking the reactor thread. The reworking puts all XenAPI calls
on background threads, so that they cannot block the reactor thread.

Long-lived operations (VM start, reboot, etc.) are invoked asynchronously at
the XenAPI level (Async.VM.start, etc.). These return a XenAPI task. We
relinquish the background thread at that point, so as not to hold threads in
the pool for too long, and use reactor.callLater to poll the task instead.

This combination of techniques means that we don't block the reactor thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.

There is a FIXME in here: get_info does not conform to these new rules.
Changes are required in compute.service before we can make get_info
non-blocking.
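The model described above can be sketched with stdlib primitives (the branch itself uses Twisted's deferToThread and reactor.callLater; the deferred_to_thread and poll_task names below are illustrative, not part of the patch):

```python
import concurrent.futures
import threading

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def deferred_to_thread(f):
    """Dispatch the wrapped blocking call to a pool thread, returning a
    Future -- the stdlib analogue of the branch's deferredToThread."""
    def g(*args, **kwargs):
        return _pool.submit(f, *args, **kwargs)
    return g


def poll_task(get_status, on_done, interval=0.5):
    """Poll a remote task on a timer instead of parking a pool thread on
    it, mirroring the reactor.callLater polling loop."""
    def check():
        status = get_status()
        if status == 'pending':
            # Re-arm the timer; no thread is held between polls.
            threading.Timer(interval, check).start()
        else:
            on_done(status)
    check()
```

Each poll is a short-lived call, so no thread stays occupied for the lifetime of a long-running VM operation.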

Revision history for this message
Jay Pipes (jaypipes) wrote :

Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)

review: Approve
Revision history for this message
Ewan Mellor (ewanmellor) wrote :

I thought that those two were a bit loud even for debug level -- that's
two messages every .5 seconds when polling a task (in the default
configuration).

Ewan.

On Mon, Aug 16, 2010 at 06:35:56PM +0100, Jay Pipes wrote:

> Review: Approve
> Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Revision history for this message
Jay Pipes (jaypipes) wrote :

> I thought that those two were a bit loud even for debug level -- that's
> two messages every .5 seconds when polling a task (in the default
> configuration).

Hmm, I suppose that is a bit loud...but then again it's debugging information. Well, I approve regardless. I'd prefer to see the debug statements uncommented, but it's certainly no reason to hold up this excellent patch :)

-jay

Revision history for this message
OpenStack Infra (hudson-openstack) wrote :

Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:

text conflict in nova/virt/xenapi.py

230. By Ewan Mellor

Merge with trunk, in particular merging with the style cleanup that caused
conflicts with this branch.

Revision history for this message
Ewan Mellor (ewanmellor) wrote :

I've remerged this with trunk. The style cleanups that went in today caused
inevitable conflicts.

Ewan.

On Tue, Aug 17, 2010 at 10:33:45PM +0100, OpenStack Hudson wrote:

> Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:
>
> text conflict in nova/virt/xenapi.py
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

231. By Ewan Mellor

Remove whitespace to match style guide.

232. By Ewan Mellor

Move deferredToThread into utils, as suggested by termie.

Unmerged revisions

Preview Diff

=== modified file 'nova/virt/xenapi.py'
--- nova/virt/xenapi.py 2010-08-17 11:53:30 +0000
+++ nova/virt/xenapi.py 2010-08-17 21:57:41 +0000
@@ -16,15 +16,33 @@
 
 """
 A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, or the decorator
+deferredToThread).  They are remote calls, and so may hang for the usual
+reasons.  They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc).  These return a task, which can then be
+polled for completion.  Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
 """
 
 import logging
 import xmlrpclib
 
 from twisted.internet import defer
+from twisted.internet import reactor
 from twisted.internet import task
+from twisted.internet.threads import deferToThread
 
-from nova import exception
 from nova import flags
 from nova import process
 from nova.auth.manager import AuthManager
@@ -47,6 +65,11 @@
                     None,
                     'Password for connection to XenServer/Xen Cloud Platform.'
                     ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+                   0.5,
+                   'The interval used for polling of remote tasks '
+                   '(Async.VM.start, etc).  Used only if '
+                   'connection_type=xenapi.')
 
 
 XENAPI_POWER_STATE = {
@@ -74,6 +97,12 @@
     return XenAPIConnection(url, username, password)
 
 
+def deferredToThread(f):
+    def g(*args, **kwargs):
+        return deferToThread(f, *args, **kwargs)
+    return g
+
+
 class XenAPIConnection(object):
     def __init__(self, url, user, pw):
         self._conn = XenAPI.Session(url)
@@ -84,9 +113,8 @@
                 for vm in self._conn.xenapi.VM.get_all()]
 
     @defer.inlineCallbacks
-    @exception.wrap_exception
     def spawn(self, instance):
-        vm = yield self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is not None:
             raise Exception('Attempted to create non-unique name %s' %
                             instance.name)
@@ -105,21 +133,28 @@
 
         user = AuthManager().get_user(instance.datamodel['user_id'])
         project = AuthManager().get_project(instance.datamodel['project_id'])
-        vdi_uuid = yield self.fetch_image(
+        vdi_uuid = yield self._fetch_image(
            instance.datamodel['image_id'], user, project, True)
-        kernel = yield self.fetch_image(
+        kernel = yield self._fetch_image(
            instance.datamodel['kernel_id'], user, project, False)
-        ramdisk = yield self.fetch_image(
+        ramdisk = yield self._fetch_image(
            instance.datamodel['ramdisk_id'], user, project, False)
-        vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
+        vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
 
-        vm_ref = yield self.create_vm(instance, kernel, ramdisk)
-        yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+        vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+        yield self._create_vbd(vm_ref, vdi_ref, 0, True)
         if network_ref:
             yield self._create_vif(vm_ref, network_ref, mac_address)
-        yield self._conn.xenapi.VM.start(vm_ref, False, False)
-
-    def create_vm(self, instance, kernel, ramdisk):
+        logging.debug('Starting VM %s...', vm_ref)
+        yield self._call_xenapi('VM.start', vm_ref, False, False)
+        logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
+
+
+    @defer.inlineCallbacks
+    def _create_vm(self, instance, kernel, ramdisk):
+        """Create a VM record.  Returns a Deferred that gives the new
+        VM reference."""
+
         mem = str(long(instance.datamodel['memory_kb']) * 1024)
         vcpus = str(instance.datamodel['vcpus'])
         rec = {
@@ -152,11 +187,16 @@
             'other_config': {},
             }
         logging.debug('Created VM %s...', instance.name)
-        vm_ref = self._conn.xenapi.VM.create(rec)
+        vm_ref = yield self._call_xenapi('VM.create', rec)
         logging.debug('Created VM %s as %s.', instance.name, vm_ref)
-        return vm_ref
+        defer.returnValue(vm_ref)
 
-    def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+
+    @defer.inlineCallbacks
+    def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+        """Create a VBD record.  Returns a Deferred that gives the new
+        VBD reference."""
+
         vbd_rec = {}
         vbd_rec['VM'] = vm_ref
         vbd_rec['VDI'] = vdi_ref
@@ -171,12 +211,17 @@
         vbd_rec['qos_algorithm_params'] = {}
         vbd_rec['qos_supported_algorithms'] = []
         logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
-        vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+        vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
         logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
                       vdi_ref)
-        return vbd_ref
+        defer.returnValue(vbd_ref)
 
+
+    @defer.inlineCallbacks
     def _create_vif(self, vm_ref, network_ref, mac_address):
+        """Create a VIF record.  Returns a Deferred that gives the new
+        VIF reference."""
+
         vif_rec = {}
         vif_rec['device'] = '0'
         vif_rec['network']= network_ref
@@ -188,25 +233,31 @@
         vif_rec['qos_algorithm_params'] = {}
         logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
                       network_ref)
-        vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+        vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
         logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
                       vm_ref, network_ref)
-        return vif_ref
+        defer.returnValue(vif_ref)
 
+
+    @defer.inlineCallbacks
     def _find_network_with_bridge(self, bridge):
         expr = 'field "bridge" = "%s"' % bridge
-        networks = self._conn.xenapi.network.get_all_records_where(expr)
+        networks = yield self._call_xenapi('network.get_all_records_where',
+                                           expr)
         if len(networks) == 1:
-            return networks.keys()[0]
+            defer.returnValue(networks.keys()[0])
         elif len(networks) > 1:
             raise Exception('Found non-unique network for bridge %s' % bridge)
         else:
             raise Exception('Found no network for bridge %s' % bridge)
 
-    def fetch_image(self, image, user, project, use_sr):
+
+    @defer.inlineCallbacks
+    def _fetch_image(self, image, user, project, use_sr):
         """use_sr: True to put the image as a VDI in an SR, False to place
         it on dom0's filesystem.  The former is for VM disks, the latter for
-        its kernel and ramdisk (if external kernels are being used)."""
+        its kernel and ramdisk (if external kernels are being used).
+        Returns a Deferred that gives the new VDI UUID."""
 
         url = images.image_url(image)
         access = AuthManager().get_access_key(user, project)
@@ -218,22 +269,31 @@
             args['password'] = user.secret
         if use_sr:
             args['add_partition'] = 'true'
-        return self._call_plugin('objectstore', fn, args)
-
+        task = yield self._async_call_plugin('objectstore', fn, args)
+        uuid = yield self._wait_for_task(task)
+        defer.returnValue(uuid)
+
+
+    @defer.inlineCallbacks
     def reboot(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.clean_reboot(vm)
-
+        task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+        yield self._wait_for_task(task)
+
+
+    @defer.inlineCallbacks
     def destroy(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.destroy(vm)
+        task = yield self._call_xenapi('Async.VM.destroy', vm)
+        yield self._wait_for_task(task)
+
 
     def get_info(self, instance_id):
-        vm = self.lookup(instance_id)
+        vm = self._lookup_blocking(instance_id)
         if vm is None:
             raise Exception('instance not present %s' % instance_id)
         rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +303,13 @@
                 'num_cpu': rec['VCPUs_max'],
                 'cpu_time': 0}
 
-    def lookup(self, i):
+
+    @deferredToThread
+    def _lookup(self, i):
+        return self._lookup_blocking(i)
+
+
+    def _lookup_blocking(self, i):
         vms = self._conn.xenapi.VM.get_by_name_label(i)
         n = len(vms)
         if n == 0:
@@ -253,11 +319,59 @@
         else:
             return vms[0]
 
-    def _call_plugin(self, plugin, fn, args):
+
+    def _wait_for_task(self, task):
+        """Return a Deferred that will give the result of the given task.
+        The task is polled until it completes."""
+        d = defer.Deferred()
+        reactor.callLater(0, self._poll_task, task, d)
+        return d
+
+
+    @deferredToThread
+    def _poll_task(self, task, deferred):
+        """Poll the given XenAPI task, and fire the given Deferred if we
+        get a result."""
+        try:
+            #logging.debug('Polling task %s...', task)
+            status = self._conn.xenapi.task.get_status(task)
+            if status == 'pending':
+                reactor.callLater(FLAGS.xenapi_task_poll_interval,
+                                  self._poll_task, task, deferred)
+            elif status == 'success':
+                result = self._conn.xenapi.task.get_result(task)
+                logging.info('Task %s status: success.  %s', task, result)
+                deferred.callback(_parse_xmlrpc_value(result))
+            else:
+                error_info = self._conn.xenapi.task.get_error_info(task)
+                logging.warn('Task %s status: %s.  %s', task, status,
+                             error_info)
+                deferred.errback(XenAPI.Failure(error_info))
+            #logging.debug('Polling task %s done.', task)
+        except Exception, exn:
+            logging.warn(exn)
+            deferred.errback(exn)
+
+
+    @deferredToThread
+    def _call_xenapi(self, method, *args):
+        """Call the specified XenAPI method on a background thread.  Returns
+        a Deferred for the result."""
+        f = self._conn.xenapi
+        for m in method.split('.'):
+            f = f.__getattr__(m)
+        return f(*args)
+
+
+    @deferredToThread
+    def _async_call_plugin(self, plugin, fn, args):
+        """Call Async.host.call_plugin on a background thread.  Returns a
+        Deferred with the task reference."""
        return _unwrap_plugin_exceptions(
-            self._conn.xenapi.host.call_plugin,
+            self._conn.xenapi.Async.host.call_plugin,
            self._get_xenapi_host(), plugin, fn, args)
 
+
     def _get_xenapi_host(self):
         return self._conn.xenapi.session.get_this_host(self._conn.handle)
 
@@ -281,3 +395,15 @@
     except xmlrpclib.ProtocolError, exn:
         logging.debug("Got exception: %s", exn)
         raise
+
+
+def _parse_xmlrpc_value(val):
+    """Parse the given value as if it were an XML-RPC value.  This is
+    sometimes used as the format for the task.result field."""
+    if not val:
+        return val
+    x = xmlrpclib.loads(
+        '<?xml version="1.0"?><methodResponse><params><param>' +
+        val +
+        '</param></params></methodResponse>')
+    return x[0][0]
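For reference, the _parse_xmlrpc_value helper at the end of the diff works by wrapping a bare XML-RPC <value> element in a methodResponse envelope so the stdlib parser can decode it. A Python 3 rendering (xmlrpc.client in place of the Python 2 xmlrpclib used by the patch) behaves like this:

```python
import xmlrpc.client


def parse_xmlrpc_value(val):
    """Python 3 rendering of the diff's _parse_xmlrpc_value: wrap the bare
    value in a methodResponse envelope and let the stdlib decode it."""
    if not val:
        return val
    # loads() returns (params, methodname); the single param is our value.
    params, _method = xmlrpc.client.loads(
        '<?xml version="1.0"?><methodResponse><params><param>'
        + val +
        '</param></params></methodResponse>')
    return params[0]
```

So a task.result holding '<value><string>OpaqueRef:abc</string></value>' decodes to the plain string 'OpaqueRef:abc', and an empty result is passed through unchanged.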