Merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:~hudson-openstack/nova/trunk

Proposed by Ewan Mellor
Status: Superseded
Proposed branch: lp:~ewanmellor/nova/xenapi-concurrency-model
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 315 lines (+144/-31)
2 files modified
nova/utils.py (+8/-0)
nova/virt/xenapi.py (+136/-31)
To merge this branch: bzr merge lp:~ewanmellor/nova/xenapi-concurrency-model
Reviewer Review Type Date Requested Status
OZAWA Tsuyoshi (community) Approve
justinsb (community) Approve
termie (community) Needs Fixing
Jay Pipes (community) Approve
Rick Clark Pending
Review via email: mp+32939@code.launchpad.net

This proposal supersedes a proposal from 2010-08-15.

This proposal has been superseded by a proposal from 2010-08-19.

Description of the change

Rework virt.xenapi's concurrency model. There were many places where we were
inadvertently blocking the reactor thread. The reworking puts all calls to
XenAPI on background threads, so that they won't block the reactor thread.
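The background-thread technique can be sketched with stdlib primitives (a minimal sketch, assuming a `concurrent.futures` pool in place of Twisted's deferToThread, which returns a Deferred rather than a Future; `deferred_to_thread` and `slow_xenapi_call` are illustrative names, not from the branch):

```python
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)

def deferred_to_thread(f):
    """Run the wrapped call on a pool thread so the caller (the reactor
    thread, in the branch) is never blocked on the remote call."""
    def g(*args, **kwargs):
        return _pool.submit(f, *args, **kwargs)
    return g

@deferred_to_thread
def slow_xenapi_call(x):
    # Stands in for a remote XenAPI call that may hang.
    return x * 2

assert slow_xenapi_call(21).result() == 42
```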

Long-lived operations (VM start, reboot, etc) are invoked asynchronously
at the XenAPI level (Async.VM.start, etc). These return a XenAPI task. We
relinquish the background thread at this point, so as not to hold threads in
the pool for too long, and use reactor.callLater to poll the task.
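The polling technique can be sketched like this (a minimal sketch, assuming `sched.scheduler` in place of reactor.callLater and a `FakeTask` stand-in for a XenAPI task handle; no thread waits on the task, because each poll merely re-schedules itself until the task leaves the 'pending' state):

```python
import sched
import time

POLL_INTERVAL = 0.01   # the branch's flag default is 0.5 seconds
scheduler = sched.scheduler(time.time, time.sleep)
results = []

class FakeTask:
    """Stand-in for a XenAPI task handle returned by Async.VM.start etc."""
    def __init__(self, polls_until_done):
        self.remaining = polls_until_done
    def get_status(self):
        self.remaining -= 1
        return 'pending' if self.remaining > 0 else 'success'
    def get_result(self):
        return 'OpaqueRef:result'

def poll_task(task):
    status = task.get_status()
    if status == 'pending':
        # Re-schedule ourselves instead of blocking a thread on the task.
        scheduler.enter(POLL_INTERVAL, 0, poll_task, (task,))
    else:
        results.append(task.get_result())

scheduler.enter(0, 0, poll_task, (FakeTask(3),))
scheduler.run()
assert results == ['OpaqueRef:result']
```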

This combination of techniques means that we don't block the reactor thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.

There is a FIXME in here: get_info does not conform to these new rules.
Changes are required in compute.service before we can make get_info
non-blocking.

Revision history for this message
Jay Pipes (jaypipes) wrote : Posted in a previous version of this proposal

Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)

review: Approve
Revision history for this message
Ewan Mellor (ewanmellor) wrote : Posted in a previous version of this proposal

I thought that those two were a bit loud even for debug level -- that's
two messages every .5 seconds when polling a task (in the default
configuration).

Ewan.

On Mon, Aug 16, 2010 at 06:35:56PM +0100, Jay Pipes wrote:

> Review: Approve
> Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Revision history for this message
Jay Pipes (jaypipes) wrote : Posted in a previous version of this proposal

> I thought that those two were a bit loud even for debug level -- that's
> two messages every .5 seconds when polling a task (in the default
> configuration).

Hmm, I suppose that is a bit loud...but then again it's debugging information. Well, I approve regardless. I'd prefer to see the debug statements uncommented, but it's certainly no reason to hold up this excellent patch :)

-jay

Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Posted in a previous version of this proposal

Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:

text conflict in nova/virt/xenapi.py

Revision history for this message
Ewan Mellor (ewanmellor) wrote : Posted in a previous version of this proposal

I've remerged this with trunk. The style cleanups that went in today caused
inevitable conflicts.

Ewan.

On Tue, Aug 17, 2010 at 10:33:45PM +0100, OpenStack Hudson wrote:

> Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:
>
> text conflict in nova/virt/xenapi.py
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Revision history for this message
Jay Pipes (jaypipes) :
review: Approve
Revision history for this message
termie (termie) wrote :

Code looks good to the degree that I understand what xenapi does.

Style fixes: only one line of whitespace between methods, please read HACKING

It also looks like you may have been a little overzealous in wrapping the description xenapi_poll_task_interval.

If you think deferredToThread is globally useful you may consider putting it in utils.

Other than those small things code looks awesome, a welcome upgrade.

review: Needs Fixing
Revision history for this message
justinsb (justin-fathomdb) wrote :

Nice.

Hopefully all these deferred twists and turns will soon be a distant and painful memory, in the compute layer at least. If we have to spawn threads (even pooled threads) for each of these method calls, I can't see Twisted having any advantages here.

review: Approve
Revision history for this message
OZAWA Tsuyoshi (ozawa-tsuyoshi) :
review: Approve
231. By Ewan Mellor

Remove whitespace to match style guide.

232. By Ewan Mellor

Move deferredToThread into utils, as suggested by termie.

Revision history for this message
Ewan Mellor (ewanmellor) wrote :

I've removed the additional whitespace between the methods.

I've left the description for xenapi_poll_task_interval alone. Assuming that
I should wrap at spaces (not at equals signs for example) and that I should
keep the description lined up on the LHS with the open parenthesis, in
both lines the next break would be at column 82, and I presume that we're
working in 80 columns. They are big words at the end there, which is why
they look a bit odd.

I've put deferredToThread into utils, as suggested.

Cheers,

Ewan.

On Wed, Aug 18, 2010 at 05:13:46PM +0100, termie wrote:

> Review: Needs Fixing
> Code looks good to the degree that I understand what xenapi does.
>
> Style fixes: only one line of whitespace between methods, please read HACKING
>
> It also looks like you may have been a little overzealous in wrapping the description xenapi_poll_task_interval.
>
> If you think deferredToThread is globally useful you may consider putting it in utils.
>
> Other than those small things code looks awesome, a welcome upgrade.
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32939
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Unmerged revisions

Preview Diff

=== modified file 'nova/utils.py'
--- nova/utils.py 2010-08-16 12:16:21 +0000
+++ nova/utils.py 2010-08-19 14:16:14 +0000
@@ -29,6 +29,8 @@
 import socket
 import sys
 
+from twisted.internet.threads import deferToThread
+
 from nova import exception
 from nova import flags
 
@@ -142,3 +144,9 @@
 
 def parse_isotime(timestr):
     return datetime.datetime.strptime(timestr, TIME_FORMAT)
+
+
+def deferredToThread(f):
+    def g(*args, **kwargs):
+        return deferToThread(f, *args, **kwargs)
+    return g
=== modified file 'nova/virt/xenapi.py'
--- nova/virt/xenapi.py 2010-08-17 11:53:30 +0000
+++ nova/virt/xenapi.py 2010-08-19 14:16:14 +0000
@@ -16,17 +16,35 @@
 
 """
 A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread).  They are remote calls, and so may hang for the usual
+reasons.  They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc).  These return a task, which can then be
+polled for completion.  Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
 """
 
 import logging
 import xmlrpclib
 
 from twisted.internet import defer
+from twisted.internet import reactor
 from twisted.internet import task
 
-from nova import exception
 from nova import flags
 from nova import process
+from nova import utils
 from nova.auth.manager import AuthManager
 from nova.compute import power_state
 from nova.virt import images
@@ -47,6 +65,11 @@
                     None,
                     'Password for connection to XenServer/Xen Cloud Platform.'
                     ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+                   0.5,
+                   'The interval used for polling of remote tasks '
+                   '(Async.VM.start, etc). Used only if '
+                   'connection_type=xenapi.')
 
 
 XENAPI_POWER_STATE = {
@@ -84,9 +107,8 @@
                 for vm in self._conn.xenapi.VM.get_all()]
 
     @defer.inlineCallbacks
-    @exception.wrap_exception
     def spawn(self, instance):
-        vm = yield self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is not None:
             raise Exception('Attempted to create non-unique name %s' %
                             instance.name)
@@ -105,21 +127,27 @@
 
         user = AuthManager().get_user(instance.datamodel['user_id'])
         project = AuthManager().get_project(instance.datamodel['project_id'])
-        vdi_uuid = yield self.fetch_image(
+        vdi_uuid = yield self._fetch_image(
             instance.datamodel['image_id'], user, project, True)
-        kernel = yield self.fetch_image(
+        kernel = yield self._fetch_image(
             instance.datamodel['kernel_id'], user, project, False)
-        ramdisk = yield self.fetch_image(
+        ramdisk = yield self._fetch_image(
             instance.datamodel['ramdisk_id'], user, project, False)
-        vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
+        vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
 
-        vm_ref = yield self.create_vm(instance, kernel, ramdisk)
-        yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+        vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+        yield self._create_vbd(vm_ref, vdi_ref, 0, True)
         if network_ref:
             yield self._create_vif(vm_ref, network_ref, mac_address)
-        yield self._conn.xenapi.VM.start(vm_ref, False, False)
+        logging.debug('Starting VM %s...', vm_ref)
+        yield self._call_xenapi('VM.start', vm_ref, False, False)
+        logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
 
-    def create_vm(self, instance, kernel, ramdisk):
+    @defer.inlineCallbacks
+    def _create_vm(self, instance, kernel, ramdisk):
+        """Create a VM record.  Returns a Deferred that gives the new
+        VM reference."""
+
         mem = str(long(instance.datamodel['memory_kb']) * 1024)
         vcpus = str(instance.datamodel['vcpus'])
         rec = {
@@ -152,11 +180,15 @@
             'other_config': {},
             }
         logging.debug('Created VM %s...', instance.name)
-        vm_ref = self._conn.xenapi.VM.create(rec)
+        vm_ref = yield self._call_xenapi('VM.create', rec)
         logging.debug('Created VM %s as %s.', instance.name, vm_ref)
-        return vm_ref
+        defer.returnValue(vm_ref)
 
-    def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+    @defer.inlineCallbacks
+    def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+        """Create a VBD record.  Returns a Deferred that gives the new
+        VBD reference."""
+
         vbd_rec = {}
         vbd_rec['VM'] = vm_ref
         vbd_rec['VDI'] = vdi_ref
@@ -171,12 +203,16 @@
         vbd_rec['qos_algorithm_params'] = {}
         vbd_rec['qos_supported_algorithms'] = []
         logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
-        vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+        vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
         logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
                       vdi_ref)
-        return vbd_ref
+        defer.returnValue(vbd_ref)
 
+    @defer.inlineCallbacks
     def _create_vif(self, vm_ref, network_ref, mac_address):
+        """Create a VIF record.  Returns a Deferred that gives the new
+        VIF reference."""
+
         vif_rec = {}
         vif_rec['device'] = '0'
         vif_rec['network']= network_ref
@@ -188,25 +224,29 @@
         vif_rec['qos_algorithm_params'] = {}
         logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
                       network_ref)
-        vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+        vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
         logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
                       vm_ref, network_ref)
-        return vif_ref
+        defer.returnValue(vif_ref)
 
+    @defer.inlineCallbacks
     def _find_network_with_bridge(self, bridge):
         expr = 'field "bridge" = "%s"' % bridge
-        networks = self._conn.xenapi.network.get_all_records_where(expr)
+        networks = yield self._call_xenapi('network.get_all_records_where',
+                                           expr)
         if len(networks) == 1:
-            return networks.keys()[0]
+            defer.returnValue(networks.keys()[0])
         elif len(networks) > 1:
             raise Exception('Found non-unique network for bridge %s' % bridge)
         else:
             raise Exception('Found no network for bridge %s' % bridge)
 
-    def fetch_image(self, image, user, project, use_sr):
+    @defer.inlineCallbacks
+    def _fetch_image(self, image, user, project, use_sr):
         """use_sr: True to put the image as a VDI in an SR, False to place
         it on dom0's filesystem.  The former is for VM disks, the latter for
-        its kernel and ramdisk (if external kernels are being used)."""
+        its kernel and ramdisk (if external kernels are being used).
+        Returns a Deferred that gives the new VDI UUID."""
 
         url = images.image_url(image)
         access = AuthManager().get_access_key(user, project)
@@ -218,22 +258,28 @@
             args['password'] = user.secret
         if use_sr:
             args['add_partition'] = 'true'
-        return self._call_plugin('objectstore', fn, args)
+        task = yield self._async_call_plugin('objectstore', fn, args)
+        uuid = yield self._wait_for_task(task)
+        defer.returnValue(uuid)
 
+    @defer.inlineCallbacks
     def reboot(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.clean_reboot(vm)
+        task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+        yield self._wait_for_task(task)
 
+    @defer.inlineCallbacks
     def destroy(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.destroy(vm)
+        task = yield self._call_xenapi('Async.VM.destroy', vm)
+        yield self._wait_for_task(task)
 
     def get_info(self, instance_id):
-        vm = self.lookup(instance_id)
+        vm = self._lookup_blocking(instance_id)
         if vm is None:
             raise Exception('instance not present %s' % instance_id)
         rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +289,11 @@
                 'num_cpu': rec['VCPUs_max'],
                 'cpu_time': 0}
 
-    def lookup(self, i):
+    @utils.deferredToThread
+    def _lookup(self, i):
+        return self._lookup_blocking(i)
+
+    def _lookup_blocking(self, i):
         vms = self._conn.xenapi.VM.get_by_name_label(i)
         n = len(vms)
         if n == 0:
@@ -253,9 +303,52 @@
         else:
             return vms[0]
 
-    def _call_plugin(self, plugin, fn, args):
+    def _wait_for_task(self, task):
+        """Return a Deferred that will give the result of the given task.
+        The task is polled until it completes."""
+        d = defer.Deferred()
+        reactor.callLater(0, self._poll_task, task, d)
+        return d
+
+    @utils.deferredToThread
+    def _poll_task(self, task, deferred):
+        """Poll the given XenAPI task, and fire the given Deferred if we
+        get a result."""
+        try:
+            #logging.debug('Polling task %s...', task)
+            status = self._conn.xenapi.task.get_status(task)
+            if status == 'pending':
+                reactor.callLater(FLAGS.xenapi_task_poll_interval,
+                                  self._poll_task, task, deferred)
+            elif status == 'success':
+                result = self._conn.xenapi.task.get_result(task)
+                logging.info('Task %s status: success.  %s', task, result)
+                deferred.callback(_parse_xmlrpc_value(result))
+            else:
+                error_info = self._conn.xenapi.task.get_error_info(task)
+                logging.warn('Task %s status: %s.  %s', task, status,
+                             error_info)
+                deferred.errback(XenAPI.Failure(error_info))
+            #logging.debug('Polling task %s done.', task)
+        except Exception, exn:
+            logging.warn(exn)
+            deferred.errback(exn)
+
+    @utils.deferredToThread
+    def _call_xenapi(self, method, *args):
+        """Call the specified XenAPI method on a background thread.  Returns
+        a Deferred for the result."""
+        f = self._conn.xenapi
+        for m in method.split('.'):
+            f = f.__getattr__(m)
+        return f(*args)
+
+    @utils.deferredToThread
+    def _async_call_plugin(self, plugin, fn, args):
+        """Call Async.host.call_plugin on a background thread.  Returns a
+        Deferred with the task reference."""
         return _unwrap_plugin_exceptions(
-            self._conn.xenapi.host.call_plugin,
+            self._conn.xenapi.Async.host.call_plugin,
             self._get_xenapi_host(), plugin, fn, args)
 
     def _get_xenapi_host(self):
@@ -281,3 +374,15 @@
         except xmlrpclib.ProtocolError, exn:
             logging.debug("Got exception: %s", exn)
             raise
+
+
+def _parse_xmlrpc_value(val):
+    """Parse the given value as if it were an XML-RPC value.  This is
+    sometimes used as the format for the task.result field."""
+    if not val:
+        return val
+    x = xmlrpclib.loads(
+        '<?xml version="1.0"?><methodResponse><params><param>' +
+        val +
+        '</param></params></methodResponse>')
+    return x[0][0]
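
For readers on Python 3, the _parse_xmlrpc_value helper at the end of the diff can be sketched as follows (xmlrpc.client is the Python 3 name for the xmlrpclib module used in the branch; the wrapper string is taken from the diff, while the function name here is unprefixed for illustration):

```python
import xmlrpc.client

def parse_xmlrpc_value(val):
    """Parse the given value as if it were an XML-RPC value.

    XenAPI sometimes stores an XML-RPC-encoded <value> in a task's result
    field, so we wrap it in a dummy methodResponse and let the XML-RPC
    library decode it."""
    if not val:
        return val
    params, _method = xmlrpc.client.loads(
        '<?xml version="1.0"?><methodResponse><params><param>'
        + val +
        '</param></params></methodResponse>')
    return params[0]
```

For example, parse_xmlrpc_value('<value><string>OpaqueRef:NULL</string></value>') decodes to the plain string 'OpaqueRef:NULL', while an empty result is passed through unchanged.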