Merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:~hudson-openstack/nova/trunk

Proposed by Ewan Mellor
Status: Superseded
Proposed branch: lp:~ewanmellor/nova/xenapi-concurrency-model
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 326 lines (+163/-37)
1 file modified
nova/virt/xenapi.py (+163/-37)
To merge this branch: bzr merge lp:~ewanmellor/nova/xenapi-concurrency-model
Reviewer: Jay Pipes (community) -- Approve
Review via email: mp+32722@code.launchpad.net

This proposal has been superseded by a proposal from 2010-08-17.

Description of the change

Rework virt.xenapi's concurrency model. There were many places where we were
inadvertently blocking the reactor thread. The rework moves all XenAPI calls
onto background threads, so that they won't block the reactor thread.
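
As an illustration of this technique, here is a minimal sketch of the
thread-offloading pattern, condensed from the diff below (it assumes
self._conn is a logged-in XenAPI.Session):

    from twisted.internet.threads import deferToThread

    def deferredToThread(f):
        """Decorator: run the wrapped blocking call on a worker thread and
        return a Deferred for its result, keeping the reactor thread free."""
        def g(*args, **kwargs):
            return deferToThread(f, *args, **kwargs)
        return g

    class XenAPIConnection(object):
        def __init__(self, session):
            self._conn = session  # a logged-in XenAPI.Session

        @deferredToThread
        def _call_xenapi(self, method, *args):
            """Look up e.g. 'VM.get_record' on the session and invoke it;
            the XML-RPC round trip happens off the reactor thread."""
            f = self._conn.xenapi
            for m in method.split('.'):
                f = getattr(f, m)
            return f(*args)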

Long-lived operations (VM start, reboot, etc.) are invoked asynchronously
at the XenAPI level (Async.VM.start, etc.). These return a XenAPI task. We
relinquish the background thread at this point, so as not to hold threads in
the pool for too long, and use reactor.callLater to poll the task until it
completes.
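
The polling half looks roughly like this, again condensed from the diff below
(FLAGS.xenapi_task_poll_interval defaults to 0.5 seconds; XenAPI.Failure and
_parse_xmlrpc_value are defined elsewhere in nova/virt/xenapi.py):

    import logging

    from twisted.internet import defer, reactor

    class XenAPIConnection(object):
        # deferredToThread and _call_xenapi as in the sketch above

        def _wait_for_task(self, task):
            """Return a Deferred that fires with the given task's result."""
            d = defer.Deferred()
            reactor.callLater(0, self._poll_task, task, d)
            return d

        @deferredToThread
        def _poll_task(self, task, deferred):
            """Check the task once on a worker thread; reschedule another
            poll if it is still pending, otherwise fire the Deferred."""
            try:
                status = self._conn.xenapi.task.get_status(task)
                if status == 'pending':
                    reactor.callLater(FLAGS.xenapi_task_poll_interval,
                                      self._poll_task, task, deferred)
                elif status == 'success':
                    result = self._conn.xenapi.task.get_result(task)
                    deferred.callback(_parse_xmlrpc_value(result))
                else:
                    error_info = self._conn.xenapi.task.get_error_info(task)
                    deferred.errback(XenAPI.Failure(error_info))
            except Exception, exn:
                logging.warn(exn)
                deferred.errback(exn)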

This combination of techniques means that we don't block the reactor thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.

There is a FIXME in here: get_info does not conform to these new rules.
Changes are required in compute.service before we can make get_info
non-blocking.

Revision history for this message
Jay Pipes (jaypipes) wrote :

Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)

review: Approve
Revision history for this message
Ewan Mellor (ewanmellor) wrote :

I thought that those two were a bit loud even for debug level -- that's
two messages every .5 seconds when polling a task (in the default
configuration).

Ewan.

On Mon, Aug 16, 2010 at 06:35:56PM +0100, Jay Pipes wrote:

> Review: Approve
> Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

Revision history for this message
Jay Pipes (jaypipes) wrote :

> I thought that those two were a bit loud even for debug level -- that's
> two messages every .5 seconds when polling a task (in the default
> configuration).

Hmm, I suppose that is a bit loud...but then again it's debugging information. Well, I approve regardless. I'd prefer to see the debug statements uncommented, but it's certainly no reason to hold up this excellent patch :)

-jay

Revision history for this message
OpenStack Infra (hudson-openstack) wrote :

Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:

text conflict in nova/virt/xenapi.py

230. By Ewan Mellor

Merge with trunk, in particular merging with the style cleanup that caused
conflicts with this branch.

Revision history for this message
Ewan Mellor (ewanmellor) wrote :

I've remerged this with trunk. The style cleanups that went in today caused
inevitable conflicts.

Ewan.

On Tue, Aug 17, 2010 at 10:33:45PM +0100, OpenStack Hudson wrote:

> Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:
>
> text conflict in nova/virt/xenapi.py
> --
> https://code.launchpad.net/~ewanmellor/nova/xenapi-concurrency-model/+merge/32722
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.

231. By Ewan Mellor

Remove whitespace to match style guide.

232. By Ewan Mellor

Move deferredToThread into utils, as suggested by termie.
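
For illustration, a caller would look roughly like this once the decorator is
shared (a sketch only; the Example class and its sleep are made up, and the
exact import path is an assumption based on this revision message):

    import time

    from nova import utils

    class Example(object):
        @utils.deferredToThread
        def slow_call(self):
            # The blocking work (here just a sleep) runs on a worker
            # thread; the caller immediately gets back a Deferred.
            time.sleep(2)
            return 'done'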


Preview Diff

1=== modified file 'nova/virt/xenapi.py'
2--- nova/virt/xenapi.py 2010-08-17 11:53:30 +0000
3+++ nova/virt/xenapi.py 2010-08-17 21:57:41 +0000
4@@ -16,15 +16,33 @@
5
6 """
7 A connection to XenServer or Xen Cloud Platform.
8+
9+The concurrency model for this class is as follows:
10+
11+All XenAPI calls are on a thread (using t.i.t.deferToThread, or the decorator
12+deferredToThread). They are remote calls, and so may hang for the usual
13+reasons. They should not be allowed to block the reactor thread.
14+
15+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
16+(using XenAPI.VM.async_start etc). These return a task, which can then be
17+polled for completion. Polling is handled using reactor.callLater.
18+
19+This combination of techniques means that we don't block the reactor thread at
20+all, and at the same time we don't hold lots of threads waiting for
21+long-running operations.
22+
23+FIXME: get_info currently doesn't conform to these rules, and will block the
24+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
25 """
26
27 import logging
28 import xmlrpclib
29
30 from twisted.internet import defer
31+from twisted.internet import reactor
32 from twisted.internet import task
33+from twisted.internet.threads import deferToThread
34
35-from nova import exception
36 from nova import flags
37 from nova import process
38 from nova.auth.manager import AuthManager
39@@ -47,6 +65,11 @@
40 None,
41 'Password for connection to XenServer/Xen Cloud Platform.'
42 ' Used only if connection_type=xenapi.')
43+flags.DEFINE_float('xenapi_task_poll_interval',
44+ 0.5,
45+ 'The interval used for polling of remote tasks '
46+ '(Async.VM.start, etc). Used only if '
47+ 'connection_type=xenapi.')
48
49
50 XENAPI_POWER_STATE = {
51@@ -74,6 +97,12 @@
52 return XenAPIConnection(url, username, password)
53
54
55+def deferredToThread(f):
56+ def g(*args, **kwargs):
57+ return deferToThread(f, *args, **kwargs)
58+ return g
59+
60+
61 class XenAPIConnection(object):
62 def __init__(self, url, user, pw):
63 self._conn = XenAPI.Session(url)
64@@ -84,9 +113,8 @@
65 for vm in self._conn.xenapi.VM.get_all()]
66
67 @defer.inlineCallbacks
68- @exception.wrap_exception
69 def spawn(self, instance):
70- vm = yield self.lookup(instance.name)
71+ vm = yield self._lookup(instance.name)
72 if vm is not None:
73 raise Exception('Attempted to create non-unique name %s' %
74 instance.name)
75@@ -105,21 +133,28 @@
76
77 user = AuthManager().get_user(instance.datamodel['user_id'])
78 project = AuthManager().get_project(instance.datamodel['project_id'])
79- vdi_uuid = yield self.fetch_image(
80+ vdi_uuid = yield self._fetch_image(
81 instance.datamodel['image_id'], user, project, True)
82- kernel = yield self.fetch_image(
83+ kernel = yield self._fetch_image(
84 instance.datamodel['kernel_id'], user, project, False)
85- ramdisk = yield self.fetch_image(
86+ ramdisk = yield self._fetch_image(
87 instance.datamodel['ramdisk_id'], user, project, False)
88- vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
89+ vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
90
91- vm_ref = yield self.create_vm(instance, kernel, ramdisk)
92- yield self.create_vbd(vm_ref, vdi_ref, 0, True)
93+ vm_ref = yield self._create_vm(instance, kernel, ramdisk)
94+ yield self._create_vbd(vm_ref, vdi_ref, 0, True)
95 if network_ref:
96 yield self._create_vif(vm_ref, network_ref, mac_address)
97- yield self._conn.xenapi.VM.start(vm_ref, False, False)
98-
99- def create_vm(self, instance, kernel, ramdisk):
100+ logging.debug('Starting VM %s...', vm_ref)
101+ yield self._call_xenapi('VM.start', vm_ref, False, False)
102+ logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
103+
104+
105+ @defer.inlineCallbacks
106+ def _create_vm(self, instance, kernel, ramdisk):
107+ """Create a VM record. Returns a Deferred that gives the new
108+ VM reference."""
109+
110 mem = str(long(instance.datamodel['memory_kb']) * 1024)
111 vcpus = str(instance.datamodel['vcpus'])
112 rec = {
113@@ -152,11 +187,16 @@
114 'other_config': {},
115 }
116 logging.debug('Created VM %s...', instance.name)
117- vm_ref = self._conn.xenapi.VM.create(rec)
118+ vm_ref = yield self._call_xenapi('VM.create', rec)
119 logging.debug('Created VM %s as %s.', instance.name, vm_ref)
120- return vm_ref
121-
122- def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
123+ defer.returnValue(vm_ref)
124+
125+
126+ @defer.inlineCallbacks
127+ def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
128+ """Create a VBD record. Returns a Deferred that gives the new
129+ VBD reference."""
130+
131 vbd_rec = {}
132 vbd_rec['VM'] = vm_ref
133 vbd_rec['VDI'] = vdi_ref
134@@ -171,12 +211,17 @@
135 vbd_rec['qos_algorithm_params'] = {}
136 vbd_rec['qos_supported_algorithms'] = []
137 logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
138- vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
139+ vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
140 logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
141 vdi_ref)
142- return vbd_ref
143-
144+ defer.returnValue(vbd_ref)
145+
146+
147+ @defer.inlineCallbacks
148 def _create_vif(self, vm_ref, network_ref, mac_address):
149+ """Create a VIF record. Returns a Deferred that gives the new
150+ VIF reference."""
151+
152 vif_rec = {}
153 vif_rec['device'] = '0'
154 vif_rec['network']= network_ref
155@@ -188,25 +233,31 @@
156 vif_rec['qos_algorithm_params'] = {}
157 logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
158 network_ref)
159- vif_ref = self._conn.xenapi.VIF.create(vif_rec)
160+ vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
161 logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
162 vm_ref, network_ref)
163- return vif_ref
164-
165+ defer.returnValue(vif_ref)
166+
167+
168+ @defer.inlineCallbacks
169 def _find_network_with_bridge(self, bridge):
170 expr = 'field "bridge" = "%s"' % bridge
171- networks = self._conn.xenapi.network.get_all_records_where(expr)
172+ networks = yield self._call_xenapi('network.get_all_records_where',
173+ expr)
174 if len(networks) == 1:
175- return networks.keys()[0]
176+ defer.returnValue(networks.keys()[0])
177 elif len(networks) > 1:
178 raise Exception('Found non-unique network for bridge %s' % bridge)
179 else:
180 raise Exception('Found no network for bridge %s' % bridge)
181
182- def fetch_image(self, image, user, project, use_sr):
183+
184+ @defer.inlineCallbacks
185+ def _fetch_image(self, image, user, project, use_sr):
186 """use_sr: True to put the image as a VDI in an SR, False to place
187 it on dom0's filesystem. The former is for VM disks, the latter for
188- its kernel and ramdisk (if external kernels are being used)."""
189+ its kernel and ramdisk (if external kernels are being used).
190+ Returns a Deferred that gives the new VDI UUID."""
191
192 url = images.image_url(image)
193 access = AuthManager().get_access_key(user, project)
194@@ -218,22 +269,31 @@
195 args['password'] = user.secret
196 if use_sr:
197 args['add_partition'] = 'true'
198- return self._call_plugin('objectstore', fn, args)
199-
200+ task = yield self._async_call_plugin('objectstore', fn, args)
201+ uuid = yield self._wait_for_task(task)
202+ defer.returnValue(uuid)
203+
204+
205+ @defer.inlineCallbacks
206 def reboot(self, instance):
207- vm = self.lookup(instance.name)
208+ vm = yield self._lookup(instance.name)
209 if vm is None:
210 raise Exception('instance not present %s' % instance.name)
211- yield self._conn.xenapi.VM.clean_reboot(vm)
212-
213+ task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
214+ yield self._wait_for_task(task)
215+
216+
217+ @defer.inlineCallbacks
218 def destroy(self, instance):
219- vm = self.lookup(instance.name)
220+ vm = yield self._lookup(instance.name)
221 if vm is None:
222 raise Exception('instance not present %s' % instance.name)
223- yield self._conn.xenapi.VM.destroy(vm)
224+ task = yield self._call_xenapi('Async.VM.destroy', vm)
225+ yield self._wait_for_task(task)
226+
227
228 def get_info(self, instance_id):
229- vm = self.lookup(instance_id)
230+ vm = self._lookup_blocking(instance_id)
231 if vm is None:
232 raise Exception('instance not present %s' % instance_id)
233 rec = self._conn.xenapi.VM.get_record(vm)
234@@ -243,7 +303,13 @@
235 'num_cpu': rec['VCPUs_max'],
236 'cpu_time': 0}
237
238- def lookup(self, i):
239+
240+ @deferredToThread
241+ def _lookup(self, i):
242+ return self._lookup_blocking(i)
243+
244+
245+ def _lookup_blocking(self, i):
246 vms = self._conn.xenapi.VM.get_by_name_label(i)
247 n = len(vms)
248 if n == 0:
249@@ -253,11 +319,59 @@
250 else:
251 return vms[0]
252
253- def _call_plugin(self, plugin, fn, args):
254+
255+ def _wait_for_task(self, task):
256+ """Return a Deferred that will give the result of the given task.
257+ The task is polled until it completes."""
258+ d = defer.Deferred()
259+ reactor.callLater(0, self._poll_task, task, d)
260+ return d
261+
262+
263+ @deferredToThread
264+ def _poll_task(self, task, deferred):
265+ """Poll the given XenAPI task, and fire the given Deferred if we
266+ get a result."""
267+ try:
268+ #logging.debug('Polling task %s...', task)
269+ status = self._conn.xenapi.task.get_status(task)
270+ if status == 'pending':
271+ reactor.callLater(FLAGS.xenapi_task_poll_interval,
272+ self._poll_task, task, deferred)
273+ elif status == 'success':
274+ result = self._conn.xenapi.task.get_result(task)
275+ logging.info('Task %s status: success. %s', task, result)
276+ deferred.callback(_parse_xmlrpc_value(result))
277+ else:
278+ error_info = self._conn.xenapi.task.get_error_info(task)
279+ logging.warn('Task %s status: %s. %s', task, status,
280+ error_info)
281+ deferred.errback(XenAPI.Failure(error_info))
282+ #logging.debug('Polling task %s done.', task)
283+ except Exception, exn:
284+ logging.warn(exn)
285+ deferred.errback(exn)
286+
287+
288+ @deferredToThread
289+ def _call_xenapi(self, method, *args):
290+ """Call the specified XenAPI method on a background thread. Returns
291+ a Deferred for the result."""
292+ f = self._conn.xenapi
293+ for m in method.split('.'):
294+ f = f.__getattr__(m)
295+ return f(*args)
296+
297+
298+ @deferredToThread
299+ def _async_call_plugin(self, plugin, fn, args):
300+ """Call Async.host.call_plugin on a background thread. Returns a
301+ Deferred with the task reference."""
302 return _unwrap_plugin_exceptions(
303- self._conn.xenapi.host.call_plugin,
304+ self._conn.xenapi.Async.host.call_plugin,
305 self._get_xenapi_host(), plugin, fn, args)
306
307+
308 def _get_xenapi_host(self):
309 return self._conn.xenapi.session.get_this_host(self._conn.handle)
310
311@@ -281,3 +395,15 @@
312 except xmlrpclib.ProtocolError, exn:
313 logging.debug("Got exception: %s", exn)
314 raise
315+
316+
317+def _parse_xmlrpc_value(val):
318+ """Parse the given value as if it were an XML-RPC value. This is
319+ sometimes used as the format for the task.result field."""
320+ if not val:
321+ return val
322+ x = xmlrpclib.loads(
323+ '<?xml version="1.0"?><methodResponse><params><param>' +
324+ val +
325+ '</param></params></methodResponse>')
326+ return x[0][0]