Merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:~hudson-openstack/nova/trunk

Proposed by Ewan Mellor
Status: Superseded
Proposed branch: lp:~ewanmellor/nova/xenapi-concurrency-model
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 315 lines (+144/-31)
2 files modified
nova/utils.py (+8/-0)
nova/virt/xenapi.py (+136/-31)
To merge this branch: bzr merge lp:~ewanmellor/nova/xenapi-concurrency-model
Reviewer Review Type Date Requested Status
OZAWA Tsuyoshi (community) Approve
justinsb (community) Approve
termie (community) Needs Fixing
Jay Pipes (community) Approve
Rick Clark Pending
Review via email: mp+32939@code.launchpad.net

This proposal supersedes a proposal from 2010-08-15.

This proposal has been superseded by a proposal from 2010-08-19.

Description of the change

Rework virt.xenapi's concurrency model. There were many places where we were
inadvertently blocking the reactor thread. The rework puts all XenAPI calls
on background threads, so that they won't block the reactor thread.

Long-lived operations (VM start, reboot, etc) are invoked asynchronously
at the XenAPI level (Async.VM.start, etc). These return a XenAPI task. We
relinquish the background thread at this point, so as not to hold threads in
the pool for too long, and use reactor.callLater to poll the task.

This combination of techniques means that we don't block the reactor thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.

There is a FIXME in here: get_info does not conform to these new rules.
Changes are required in compute.service before we can make get_info
non-blocking.
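
In outline, the pattern looks like this. This is a minimal sketch for
illustration only, not code from the branch: session, task, and POLL_INTERVAL
stand in for the real XenAPI session object, task reference, and the
xenapi_task_poll_interval flag.

    from twisted.internet import defer, reactor
    from twisted.internet.threads import deferToThread

    POLL_INTERVAL = 0.5  # stand-in for FLAGS.xenapi_task_poll_interval

    def wait_for_task(session, task):
        """Return a Deferred that fires with the result of the given
        XenAPI task, polling until it leaves the 'pending' state."""
        d = defer.Deferred()

        def poll():
            # task.get_status is a blocking XML-RPC call, so run it on a
            # pool thread rather than on the reactor thread.
            status_d = deferToThread(session.xenapi.task.get_status, task)

            def check(status):
                if status == 'pending':
                    # Relinquish the pool thread and re-poll later, so a
                    # long-running task never pins a thread.
                    reactor.callLater(POLL_INTERVAL, poll)
                elif status == 'success':
                    deferToThread(session.xenapi.task.get_result,
                                  task).chainDeferred(d)
                else:
                    d.errback(Exception('task %s failed: %s' % (task, status)))

            status_d.addCallbacks(check, d.errback)

        reactor.callLater(0, poll)
        return d

The branch packages the deferToThread dispatch as a deferredToThread decorator
(moved into nova/utils.py during review) and implements the polling in
_wait_for_task/_poll_task; see the diff below.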

Revision history for this message
Jay Pipes (jaypipes) wrote : Posted in a previous version of this proposal

Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)

review: Approve
Revision history for this message
Ewan Mellor (ewanmellor) wrote : Posted in a previous version of this proposal

I thought that those two were a bit loud even for debug level -- that's
two messages every .5 seconds when polling a task (in the default
configuration).

Ewan.

On Mon, Aug 16, 2010 at 06:35:56PM +0100, Jay Pipes wrote:

> Review: Approve
> Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Revision history for this message
Jay Pipes (jaypipes) wrote : Posted in a previous version of this proposal

> I thought that those two were a bit loud even for debug level -- that's
> two messages every .5 seconds when polling a task (in the default
> configuration).

Hmm, I suppose that is a bit loud...but then again it's debugging information. Well, I approve regardless. I'd prefer to see the debug statements uncommented, but it's certainly no reason to hold up this excellent patch :)

-jay

Revision history for this message
OpenStack Infra (hudson-openstack) wrote : Posted in a previous version of this proposal

Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:

text conflict in nova/virt/xenapi.py

Revision history for this message
Ewan Mellor (ewanmellor) wrote : Posted in a previous version of this proposal

I've remerged this with trunk. The style cleanups that went in today caused
inevitable conflicts.

Ewan.

On Tue, Aug 17, 2010 at 10:33:45PM +0100, OpenStack Hudson wrote:

> Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:
>
> text conflict in nova/virt/xenapi.py
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Revision history for this message
Jay Pipes (jaypipes) :
review: Approve
Revision history for this message
termie (termie) wrote :

Code looks good to the degree that I understand what xenapi does.

Style fixes: only one line of whitespace between methods, please read HACKING

It also looks like you may have been a little overzealous in wrapping the description of xenapi_task_poll_interval.

If you think deferredToThread is globally useful you may consider putting it in utils.

Other than those small things code looks awesome, a welcome upgrade.

review: Needs Fixing
Revision history for this message
justinsb (justin-fathomdb) wrote :

Nice.

Hopefully all these deferred twists and turns will soon be a distant and painful memory, in the compute layer at least. If we have to spawn threads (even pooled threads) for each of these method calls, I can't see Twisted having any advantages here.

review: Approve
Revision history for this message
OZAWA Tsuyoshi (ozawa-tsuyoshi) :
review: Approve
231. By Ewan Mellor

Remove whitespace to match style guide.

232. By Ewan Mellor

Move deferredToThread into utils, as suggested by termie.

Revision history for this message
Ewan Mellor (ewanmellor) wrote :

I've removed the additional whitespace between the methods.

I've left the description for xenapi_task_poll_interval alone. Assuming that
I should wrap at spaces (not at equals signs for example) and that I should
keep the description lined up on the LHS with the open parenthesis, in
both lines the next break would be at column 82, and I presume that we're
working in 80 columns. They are big words at the end there, which is why
they look a bit odd.

I've put deferredToThread into utils, as suggested.

Cheers,

Ewan.

On Wed, Aug 18, 2010 at 05:13:46PM +0100, termie wrote:

> Review: Needs Fixing
> Code looks good to the degree that I understand what xenapi does.
>
> Style fixes: only one line of whitespace between methods, please read HACKING
>
> It also looks like you may have been a little overzealous in wrapping the description of xenapi_task_poll_interval.
>
> If you think deferredToThread is globally useful you may consider putting it in utils.
>
> Other than those small things code looks awesome, a welcome upgrade.
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32939
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Preview Diff

=== modified file 'nova/utils.py'
--- nova/utils.py	2010-08-16 12:16:21 +0000
+++ nova/utils.py	2010-08-19 14:16:14 +0000
@@ -29,6 +29,8 @@
 import socket
 import sys
 
+from twisted.internet.threads import deferToThread
+
 from nova import exception
 from nova import flags
 
@@ -142,3 +144,9 @@
 
 def parse_isotime(timestr):
     return datetime.datetime.strptime(timestr, TIME_FORMAT)
+
+
+def deferredToThread(f):
+    def g(*args, **kwargs):
+        return deferToThread(f, *args, **kwargs)
+    return g

=== modified file 'nova/virt/xenapi.py'
--- nova/virt/xenapi.py	2010-08-17 11:53:30 +0000
+++ nova/virt/xenapi.py	2010-08-19 14:16:14 +0000
@@ -16,17 +16,35 @@
 
 """
 A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
 """
 
 import logging
 import xmlrpclib
 
 from twisted.internet import defer
+from twisted.internet import reactor
 from twisted.internet import task
 
-from nova import exception
 from nova import flags
 from nova import process
+from nova import utils
 from nova.auth.manager import AuthManager
 from nova.compute import power_state
 from nova.virt import images
@@ -47,6 +65,11 @@
                     None,
                     'Password for connection to XenServer/Xen Cloud Platform.'
                     ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+                   0.5,
+                   'The interval used for polling of remote tasks '
+                   '(Async.VM.start, etc). Used only if '
+                   'connection_type=xenapi.')
 
 
 XENAPI_POWER_STATE = {
@@ -84,9 +107,8 @@
                 for vm in self._conn.xenapi.VM.get_all()]
 
     @defer.inlineCallbacks
-    @exception.wrap_exception
     def spawn(self, instance):
-        vm = yield self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is not None:
             raise Exception('Attempted to create non-unique name %s' %
                             instance.name)
@@ -105,21 +127,27 @@
 
         user = AuthManager().get_user(instance.datamodel['user_id'])
         project = AuthManager().get_project(instance.datamodel['project_id'])
-        vdi_uuid = yield self.fetch_image(
+        vdi_uuid = yield self._fetch_image(
             instance.datamodel['image_id'], user, project, True)
-        kernel = yield self.fetch_image(
+        kernel = yield self._fetch_image(
             instance.datamodel['kernel_id'], user, project, False)
-        ramdisk = yield self.fetch_image(
+        ramdisk = yield self._fetch_image(
             instance.datamodel['ramdisk_id'], user, project, False)
-        vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
+        vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
 
-        vm_ref = yield self.create_vm(instance, kernel, ramdisk)
-        yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+        vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+        yield self._create_vbd(vm_ref, vdi_ref, 0, True)
         if network_ref:
             yield self._create_vif(vm_ref, network_ref, mac_address)
-        yield self._conn.xenapi.VM.start(vm_ref, False, False)
+        logging.debug('Starting VM %s...', vm_ref)
+        yield self._call_xenapi('VM.start', vm_ref, False, False)
+        logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
 
-    def create_vm(self, instance, kernel, ramdisk):
+    @defer.inlineCallbacks
+    def _create_vm(self, instance, kernel, ramdisk):
+        """Create a VM record. Returns a Deferred that gives the new
+        VM reference."""
+
         mem = str(long(instance.datamodel['memory_kb']) * 1024)
         vcpus = str(instance.datamodel['vcpus'])
         rec = {
@@ -152,11 +180,15 @@
             'other_config': {},
             }
         logging.debug('Created VM %s...', instance.name)
-        vm_ref = self._conn.xenapi.VM.create(rec)
+        vm_ref = yield self._call_xenapi('VM.create', rec)
         logging.debug('Created VM %s as %s.', instance.name, vm_ref)
-        return vm_ref
+        defer.returnValue(vm_ref)
 
-    def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+    @defer.inlineCallbacks
+    def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+        """Create a VBD record. Returns a Deferred that gives the new
+        VBD reference."""
+
         vbd_rec = {}
         vbd_rec['VM'] = vm_ref
         vbd_rec['VDI'] = vdi_ref
@@ -171,12 +203,16 @@
         vbd_rec['qos_algorithm_params'] = {}
         vbd_rec['qos_supported_algorithms'] = []
         logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
-        vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+        vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
         logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
                       vdi_ref)
-        return vbd_ref
+        defer.returnValue(vbd_ref)
 
+    @defer.inlineCallbacks
     def _create_vif(self, vm_ref, network_ref, mac_address):
+        """Create a VIF record. Returns a Deferred that gives the new
+        VIF reference."""
+
         vif_rec = {}
         vif_rec['device'] = '0'
         vif_rec['network']= network_ref
@@ -188,25 +224,29 @@
         vif_rec['qos_algorithm_params'] = {}
         logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
                       network_ref)
-        vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+        vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
         logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
                       vm_ref, network_ref)
-        return vif_ref
+        defer.returnValue(vif_ref)
 
+    @defer.inlineCallbacks
     def _find_network_with_bridge(self, bridge):
         expr = 'field "bridge" = "%s"' % bridge
-        networks = self._conn.xenapi.network.get_all_records_where(expr)
+        networks = yield self._call_xenapi('network.get_all_records_where',
+                                           expr)
         if len(networks) == 1:
-            return networks.keys()[0]
+            defer.returnValue(networks.keys()[0])
         elif len(networks) > 1:
             raise Exception('Found non-unique network for bridge %s' % bridge)
         else:
             raise Exception('Found no network for bridge %s' % bridge)
 
-    def fetch_image(self, image, user, project, use_sr):
+    @defer.inlineCallbacks
+    def _fetch_image(self, image, user, project, use_sr):
         """use_sr: True to put the image as a VDI in an SR, False to place
         it on dom0's filesystem. The former is for VM disks, the latter for
-        its kernel and ramdisk (if external kernels are being used)."""
+        its kernel and ramdisk (if external kernels are being used).
+        Returns a Deferred that gives the new VDI UUID."""
 
         url = images.image_url(image)
         access = AuthManager().get_access_key(user, project)
@@ -218,22 +258,28 @@
             args['password'] = user.secret
         if use_sr:
             args['add_partition'] = 'true'
-        return self._call_plugin('objectstore', fn, args)
+        task = yield self._async_call_plugin('objectstore', fn, args)
+        uuid = yield self._wait_for_task(task)
+        defer.returnValue(uuid)
 
+    @defer.inlineCallbacks
     def reboot(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.clean_reboot(vm)
+        task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+        yield self._wait_for_task(task)
 
+    @defer.inlineCallbacks
     def destroy(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
        if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.destroy(vm)
+        task = yield self._call_xenapi('Async.VM.destroy', vm)
+        yield self._wait_for_task(task)
 
     def get_info(self, instance_id):
-        vm = self.lookup(instance_id)
+        vm = self._lookup_blocking(instance_id)
         if vm is None:
             raise Exception('instance not present %s' % instance_id)
         rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +289,11 @@
                 'num_cpu': rec['VCPUs_max'],
                 'cpu_time': 0}
 
-    def lookup(self, i):
+    @utils.deferredToThread
+    def _lookup(self, i):
+        return self._lookup_blocking(i)
+
+    def _lookup_blocking(self, i):
         vms = self._conn.xenapi.VM.get_by_name_label(i)
         n = len(vms)
         if n == 0:
@@ -253,9 +303,52 @@
         else:
             return vms[0]
 
-    def _call_plugin(self, plugin, fn, args):
+    def _wait_for_task(self, task):
+        """Return a Deferred that will give the result of the given task.
+        The task is polled until it completes."""
+        d = defer.Deferred()
+        reactor.callLater(0, self._poll_task, task, d)
+        return d
+
+    @utils.deferredToThread
+    def _poll_task(self, task, deferred):
+        """Poll the given XenAPI task, and fire the given Deferred if we
+        get a result."""
+        try:
+            #logging.debug('Polling task %s...', task)
+            status = self._conn.xenapi.task.get_status(task)
+            if status == 'pending':
+                reactor.callLater(FLAGS.xenapi_task_poll_interval,
+                                  self._poll_task, task, deferred)
+            elif status == 'success':
+                result = self._conn.xenapi.task.get_result(task)
+                logging.info('Task %s status: success. %s', task, result)
+                deferred.callback(_parse_xmlrpc_value(result))
+            else:
+                error_info = self._conn.xenapi.task.get_error_info(task)
+                logging.warn('Task %s status: %s. %s', task, status,
+                             error_info)
+                deferred.errback(XenAPI.Failure(error_info))
+            #logging.debug('Polling task %s done.', task)
+        except Exception, exn:
+            logging.warn(exn)
+            deferred.errback(exn)
+
+    @utils.deferredToThread
+    def _call_xenapi(self, method, *args):
+        """Call the specified XenAPI method on a background thread. Returns
+        a Deferred for the result."""
+        f = self._conn.xenapi
+        for m in method.split('.'):
+            f = f.__getattr__(m)
+        return f(*args)
+
+    @utils.deferredToThread
+    def _async_call_plugin(self, plugin, fn, args):
+        """Call Async.host.call_plugin on a background thread. Returns a
+        Deferred with the task reference."""
         return _unwrap_plugin_exceptions(
-            self._conn.xenapi.host.call_plugin,
+            self._conn.xenapi.Async.host.call_plugin,
             self._get_xenapi_host(), plugin, fn, args)
 
     def _get_xenapi_host(self):
@@ -281,3 +374,15 @@
     except xmlrpclib.ProtocolError, exn:
         logging.debug("Got exception: %s", exn)
         raise
+
+
+def _parse_xmlrpc_value(val):
+    """Parse the given value as if it were an XML-RPC value. This is
+    sometimes used as the format for the task.result field."""
+    if not val:
+        return val
+    x = xmlrpclib.loads(
+        '<?xml version="1.0"?><methodResponse><params><param>' +
+        val +
+        '</param></params></methodResponse>')
+    return x[0][0]
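
A note on the deferredToThread helper above: decorating a blocking method with
it makes each call run on the reactor's thread pool and return a Deferred
immediately. A minimal usage sketch, assuming the nova tree from this branch is
importable (the Session class and names here are illustrative, not part of the
branch):

    from nova import utils

    class Session(object):
        def __init__(self, conn):
            self._conn = conn

        @utils.deferredToThread
        def lookup(self, label):
            # Blocking XML-RPC call; safe here because the decorator runs
            # this body on a pool thread, not on the reactor thread.
            return self._conn.xenapi.VM.get_by_name_label(label)

    # The caller gets a Deferred at once and attaches callbacks:
    #     d = Session(conn).lookup('instance-00000001')
    #     d.addCallback(handle_vms)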