Merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:~hudson-openstack/nova/trunk
Status: Superseded
Proposed branch: lp:~ewanmellor/nova/xenapi-concurrency-model
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 315 lines (+144/-31), 2 files modified: nova/utils.py (+8/-0), nova/virt/xenapi.py (+136/-31)
To merge this branch: bzr merge lp:~ewanmellor/nova/xenapi-concurrency-model
Related bugs: (none)
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| OZAWA Tsuyoshi | community | | Approve |
| justinsb | community | | Approve |
| termie | community | | Needs Fixing |
| Jay Pipes | community | | Approve |
| Rick Clark | | | Pending |

Review via email: mp+32939@code.launchpad.net
This proposal supersedes a proposal from 2010-08-15.
This proposal has been superseded by a proposal from 2010-08-19.
Commit message
Description of the change
Rework virt.xenapi's concurrency model. There were many places where we were
inadvertently blocking the reactor thread. The reworking puts all calls to
XenAPI on background threads, so that they won't block the reactor thread.
Long-lived operations (VM start, reboot, etc) are invoked asynchronously
at the XenAPI level (Async.VM.start, etc). These return a XenAPI task. We
relinquish the background thread at this point, so as not to hold threads in
the pool for too long, and use reactor.callLater to poll the task.
This combination of techniques means that we don't block the reactor thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.
There is a FIXME in here: get_info does not conform to these new rules.
Changes are required in compute.service before we can make get_info
non-blocking.
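As a rough illustration of the model described above (a hypothetical stdlib sketch, not Nova's Twisted code): blocking remote calls are handed to worker threads, while long-running operations hand back a task that is polled on a timer, so no thread is held for the whole operation. The FakeTask class and the poll interval below are invented for the example.

```python
# Hypothetical sketch of the concurrency model: poll a remote task on a
# timer instead of parking a thread until the operation completes.
import threading
import queue

POLL_INTERVAL = 0.01  # stand-in for the 0.5s xenapi_task_poll_interval flag


class FakeTask:
    """Stands in for a XenAPI task: reports 'pending' twice, then 'success'."""

    def __init__(self, result):
        self._polls_left = 2
        self._result = result

    def get_status(self):
        if self._polls_left > 0:
            self._polls_left -= 1
            return 'pending'
        return 'success'

    def get_result(self):
        return self._result


def start_async_operation(result_holder, task):
    """Poll the task on a timer (analogous to reactor.callLater), so no
    thread sits blocked while the operation runs remotely."""
    def poll():
        if task.get_status() == 'pending':
            threading.Timer(POLL_INTERVAL, poll).start()
        else:
            result_holder.put(task.get_result())
    threading.Timer(0, poll).start()


results = queue.Queue()
start_async_operation(results, FakeTask('vm-started'))
print(results.get(timeout=5))  # -> vm-started
```

In the branch itself, reactor.callLater plays the role of threading.Timer here, and deferToThread provides the worker threads for the blocking calls.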
Jay Pipes (jaypipes) wrote (posted in a previous version of this proposal):
Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)
Ewan Mellor (ewanmellor) wrote (posted in a previous version of this proposal):
I thought that those two were a bit loud even for debug level -- that's
two messages every .5 seconds when polling a task (in the default
configuration).
Ewan.
On Mon, Aug 16, 2010 at 06:35:56PM +0100, Jay Pipes wrote:
> Review: Approve
> Really nice work, Ewan! No criticism at all from me! Feel free to uncomment the logging.debug() output, though. :)
> --
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.
Jay Pipes (jaypipes) wrote (posted in a previous version of this proposal):
> I thought that those two were a bit loud even for debug level -- that's
> two messages every .5 seconds when polling a task (in the default
> configuration).
Hmm, I suppose that is a bit loud...but then again it's debugging information. Well, I approve regardless. I'd prefer to see the debug statements uncommented, but it's certainly no reason to hold up this excellent patch :)
-jay
OpenStack Infra (hudson-openstack) wrote (posted in a previous version of this proposal):
Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:
text conflict in nova/virt/xenapi.py
Ewan Mellor (ewanmellor) wrote (posted in a previous version of this proposal):
I've remerged this with trunk. The style cleanups that went in today caused
inevitable conflicts.
Ewan.
On Tue, Aug 17, 2010 at 10:33:45PM +0100, OpenStack Hudson wrote:
> Attempt to merge lp:~ewanmellor/nova/xenapi-concurrency-model into lp:nova failed due to merge conflicts:
>
> text conflict in nova/virt/xenapi.py
> --
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.
Jay Pipes (jaypipes):
termie (termie) wrote:
Code looks good to the degree that I understand what xenapi does.
Style fixes: only one line of whitespace between methods, please read HACKING
It also looks like you may have been a little overzealous in wrapping the description xenapi_
If you think deferredToThread is globally useful you may consider putting it in utils.
Other than those small things code looks awesome, a welcome upgrade.
justinsb (justin-fathomdb) wrote:
Nice.
Hopefully all these deferred twists and turns will soon be a distant and painful memory, in the compute layer at least. If we have to spawn threads (even pooled threads) for each of these method calls, I can't see Twisted having any advantages here.
OZAWA Tsuyoshi (ozawa-tsuyoshi):
231. By Ewan Mellor: Remove whitespace to match style guide.
232. By Ewan Mellor: Move deferredToThread into utils, as suggested by termie.
Ewan Mellor (ewanmellor) wrote:
I've removed the additional whitespace between the methods.
I've left the description for xenapi_ wrapped as it was. Given that I
should wrap at spaces (not at equals signs, for example) and that I should
keep the description lined up on the LHS with the open parenthesis, in
both lines the next break would be at column 82, and I presume that we're
working in 80 columns. There are big words at the end there, which is why
they look a bit odd.
I've put deferredToThread into utils, as suggested.
Cheers,
Ewan.
On Wed, Aug 18, 2010 at 05:13:46PM +0100, termie wrote:
> Review: Needs Fixing
> Code looks good to the degree that I understand what xenapi does.
>
> Style fixes: only one line of whitespace between methods, please read HACKING
>
> It also looks like you may have been a little overzealous in wrapping the description xenapi_
>
> If you think deferredToThread is globally useful you may consider putting it in utils.
>
> Other than those small things code looks awesome, a welcome upgrade.
> --
> You are the owner of lp:~ewanmellor/nova/xenapi-concurrency-model.
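For context, the deferredToThread decorator under discussion wraps a function so that each call runs on a background thread and returns a Deferred. A rough stdlib analogy, using concurrent.futures.Future instead of Twisted's Deferred (the names below are illustrative, not Nova's code):

```python
# Stdlib analogy to deferredToThread: each call to the wrapped function is
# submitted to a thread pool and returns a Future, so the caller's event
# loop is never blocked by the (possibly slow) remote call.
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=4)


def deferred_to_thread(f):
    def g(*args, **kwargs):
        return _pool.submit(f, *args, **kwargs)
    return g


@deferred_to_thread
def slow_remote_call(x):
    # imagine a blocking XML-RPC round trip to XenAPI here
    return x * 2


future = slow_remote_call(21)
print(future.result())  # -> 42
```

The real decorator in nova/utils.py does the same thing with twisted.internet.threads.deferToThread, as shown in the preview diff below.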
Unmerged revisions
Preview Diff
```diff
=== modified file 'nova/utils.py'
--- nova/utils.py 2010-08-16 12:16:21 +0000
+++ nova/utils.py 2010-08-19 14:16:14 +0000
@@ -29,6 +29,8 @@
 import socket
 import sys
 
+from twisted.internet.threads import deferToThread
+
 from nova import exception
 from nova import flags
 
@@ -142,3 +144,9 @@
 
 def parse_isotime(timestr):
     return datetime.datetime.strptime(timestr, TIME_FORMAT)
+
+
+def deferredToThread(f):
+    def g(*args, **kwargs):
+        return deferToThread(f, *args, **kwargs)
+    return g
 
```
```diff
=== modified file 'nova/virt/xenapi.py'
--- nova/virt/xenapi.py 2010-08-17 11:53:30 +0000
+++ nova/virt/xenapi.py 2010-08-19 14:16:14 +0000
@@ -16,17 +16,35 @@
 
 """
 A connection to XenServer or Xen Cloud Platform.
+
+The concurrency model for this class is as follows:
+
+All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
+deferredToThread). They are remote calls, and so may hang for the usual
+reasons. They should not be allowed to block the reactor thread.
+
+All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
+(using XenAPI.VM.async_start etc). These return a task, which can then be
+polled for completion. Polling is handled using reactor.callLater.
+
+This combination of techniques means that we don't block the reactor thread at
+all, and at the same time we don't hold lots of threads waiting for
+long-running operations.
+
+FIXME: get_info currently doesn't conform to these rules, and will block the
+reactor thread if the VM.get_by_name_label or VM.get_record calls block.
 """
 
 import logging
 import xmlrpclib
 
 from twisted.internet import defer
+from twisted.internet import reactor
 from twisted.internet import task
 
-from nova import exception
 from nova import flags
 from nova import process
+from nova import utils
 from nova.auth.manager import AuthManager
 from nova.compute import power_state
 from nova.virt import images
@@ -47,6 +65,11 @@
     None,
     'Password for connection to XenServer/Xen Cloud Platform.'
     ' Used only if connection_type=xenapi.')
+flags.DEFINE_float('xenapi_task_poll_interval',
+                   0.5,
+                   'The interval used for polling of remote tasks '
+                   '(Async.VM.start, etc). Used only if '
+                   'connection_type=xenapi.')
 
 
 XENAPI_POWER_STATE = {
@@ -84,9 +107,8 @@
                 for vm in self._conn.xenapi.VM.get_all()]
 
     @defer.inlineCallbacks
-    @exception.wrap_exception
     def spawn(self, instance):
-        vm = yield self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is not None:
             raise Exception('Attempted to create non-unique name %s' %
                             instance.name)
@@ -105,21 +127,27 @@
 
         user = AuthManager().get_user(instance.datamodel['user_id'])
         project = AuthManager().get_project(instance.datamodel['project_id'])
-        vdi_uuid = yield self.fetch_image(
+        vdi_uuid = yield self._fetch_image(
             instance.datamodel['image_id'], user, project, True)
-        kernel = yield self.fetch_image(
+        kernel = yield self._fetch_image(
             instance.datamodel['kernel_id'], user, project, False)
-        ramdisk = yield self.fetch_image(
+        ramdisk = yield self._fetch_image(
             instance.datamodel['ramdisk_id'], user, project, False)
-        vdi_ref = yield self._conn.xenapi.VDI.get_by_uuid(vdi_uuid)
+        vdi_ref = yield self._call_xenapi('VDI.get_by_uuid', vdi_uuid)
 
-        vm_ref = yield self.create_vm(instance, kernel, ramdisk)
-        yield self.create_vbd(vm_ref, vdi_ref, 0, True)
+        vm_ref = yield self._create_vm(instance, kernel, ramdisk)
+        yield self._create_vbd(vm_ref, vdi_ref, 0, True)
         if network_ref:
             yield self._create_vif(vm_ref, network_ref, mac_address)
-        yield self._conn.xenapi.VM.start(vm_ref, False, False)
+        logging.debug('Starting VM %s...', vm_ref)
+        yield self._call_xenapi('VM.start', vm_ref, False, False)
+        logging.info('Spawning VM %s created %s.', instance.name, vm_ref)
 
-    def create_vm(self, instance, kernel, ramdisk):
+    @defer.inlineCallbacks
+    def _create_vm(self, instance, kernel, ramdisk):
+        """Create a VM record. Returns a Deferred that gives the new
+        VM reference."""
+
         mem = str(long(instance.datamodel['memory_kb']) * 1024)
         vcpus = str(instance.datamodel['vcpus'])
         rec = {
@@ -152,11 +180,15 @@
             'other_config': {},
             }
         logging.debug('Created VM %s...', instance.name)
-        vm_ref = self._conn.xenapi.VM.create(rec)
+        vm_ref = yield self._call_xenapi('VM.create', rec)
         logging.debug('Created VM %s as %s.', instance.name, vm_ref)
-        return vm_ref
+        defer.returnValue(vm_ref)
 
-    def create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+    @defer.inlineCallbacks
+    def _create_vbd(self, vm_ref, vdi_ref, userdevice, bootable):
+        """Create a VBD record. Returns a Deferred that gives the new
+        VBD reference."""
+
         vbd_rec = {}
         vbd_rec['VM'] = vm_ref
         vbd_rec['VDI'] = vdi_ref
@@ -171,12 +203,16 @@
         vbd_rec['qos_algorithm_params'] = {}
         vbd_rec['qos_supported_algorithms'] = []
         logging.debug('Creating VBD for VM %s, VDI %s ... ', vm_ref, vdi_ref)
-        vbd_ref = self._conn.xenapi.VBD.create(vbd_rec)
+        vbd_ref = yield self._call_xenapi('VBD.create', vbd_rec)
         logging.debug('Created VBD %s for VM %s, VDI %s.', vbd_ref, vm_ref,
                       vdi_ref)
-        return vbd_ref
+        defer.returnValue(vbd_ref)
 
+    @defer.inlineCallbacks
     def _create_vif(self, vm_ref, network_ref, mac_address):
+        """Create a VIF record. Returns a Deferred that gives the new
+        VIF reference."""
+
         vif_rec = {}
         vif_rec['device'] = '0'
         vif_rec['network']= network_ref
@@ -188,25 +224,29 @@
         vif_rec['qos_algorithm_params'] = {}
         logging.debug('Creating VIF for VM %s, network %s ... ', vm_ref,
                       network_ref)
-        vif_ref = self._conn.xenapi.VIF.create(vif_rec)
+        vif_ref = yield self._call_xenapi('VIF.create', vif_rec)
         logging.debug('Created VIF %s for VM %s, network %s.', vif_ref,
                       vm_ref, network_ref)
-        return vif_ref
+        defer.returnValue(vif_ref)
 
+    @defer.inlineCallbacks
     def _find_network_with_bridge(self, bridge):
         expr = 'field "bridge" = "%s"' % bridge
-        networks = self._conn.xenapi.network.get_all_records_where(expr)
+        networks = yield self._call_xenapi('network.get_all_records_where',
+                                           expr)
         if len(networks) == 1:
-            return networks.keys()[0]
+            defer.returnValue(networks.keys()[0])
         elif len(networks) > 1:
             raise Exception('Found non-unique network for bridge %s' % bridge)
         else:
             raise Exception('Found no network for bridge %s' % bridge)
 
-    def fetch_image(self, image, user, project, use_sr):
+    @defer.inlineCallbacks
+    def _fetch_image(self, image, user, project, use_sr):
         """use_sr: True to put the image as a VDI in an SR, False to place
         it on dom0's filesystem. The former is for VM disks, the latter for
-        its kernel and ramdisk (if external kernels are being used)."""
+        its kernel and ramdisk (if external kernels are being used).
+        Returns a Deferred that gives the new VDI UUID."""
 
         url = images.image_url(image)
         access = AuthManager().get_access_key(user, project)
@@ -218,22 +258,28 @@
         args['password'] = user.secret
         if use_sr:
             args['add_partition'] = 'true'
-        return self._call_plugin('objectstore', fn, args)
+        task = yield self._async_call_plugin('objectstore', fn, args)
+        uuid = yield self._wait_for_task(task)
+        defer.returnValue(uuid)
 
+    @defer.inlineCallbacks
     def reboot(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.clean_reboot(vm)
+        task = yield self._call_xenapi('Async.VM.clean_reboot', vm)
+        yield self._wait_for_task(task)
 
+    @defer.inlineCallbacks
     def destroy(self, instance):
-        vm = self.lookup(instance.name)
+        vm = yield self._lookup(instance.name)
         if vm is None:
             raise Exception('instance not present %s' % instance.name)
-        yield self._conn.xenapi.VM.destroy(vm)
+        task = yield self._call_xenapi('Async.VM.destroy', vm)
+        yield self._wait_for_task(task)
 
     def get_info(self, instance_id):
-        vm = self.lookup(instance_id)
+        vm = self._lookup_blocking(instance_id)
         if vm is None:
             raise Exception('instance not present %s' % instance_id)
         rec = self._conn.xenapi.VM.get_record(vm)
@@ -243,7 +289,11 @@
                 'num_cpu': rec['VCPUs_max'],
                 'cpu_time': 0}
 
-    def lookup(self, i):
+    @utils.deferredToThread
+    def _lookup(self, i):
+        return self._lookup_blocking(i)
+
+    def _lookup_blocking(self, i):
         vms = self._conn.xenapi.VM.get_by_name_label(i)
         n = len(vms)
         if n == 0:
@@ -253,9 +303,52 @@
         else:
             return vms[0]
 
-    def _call_plugin(self, plugin, fn, args):
+    def _wait_for_task(self, task):
+        """Return a Deferred that will give the result of the given task.
+        The task is polled until it completes."""
+        d = defer.Deferred()
+        reactor.callLater(0, self._poll_task, task, d)
+        return d
+
+    @utils.deferredToThread
+    def _poll_task(self, task, deferred):
+        """Poll the given XenAPI task, and fire the given Deferred if we
+        get a result."""
+        try:
+            #logging.debug('Polling task %s...', task)
+            status = self._conn.xenapi.task.get_status(task)
+            if status == 'pending':
+                reactor.callLater(FLAGS.xenapi_task_poll_interval,
+                                  self._poll_task, task, deferred)
+            elif status == 'success':
+                result = self._conn.xenapi.task.get_result(task)
+                logging.info('Task %s status: success. %s', task, result)
+                deferred.callback(_parse_xmlrpc_value(result))
+            else:
+                error_info = self._conn.xenapi.task.get_error_info(task)
+                logging.warn('Task %s status: %s. %s', task, status,
+                             error_info)
+                deferred.errback(XenAPI.Failure(error_info))
+            #logging.debug('Polling task %s done.', task)
+        except Exception, exn:
+            logging.warn(exn)
+            deferred.errback(exn)
+
+    @utils.deferredToThread
+    def _call_xenapi(self, method, *args):
+        """Call the specified XenAPI method on a background thread. Returns
+        a Deferred for the result."""
+        f = self._conn.xenapi
+        for m in method.split('.'):
+            f = f.__getattr__(m)
+        return f(*args)
+
+    @utils.deferredToThread
+    def _async_call_plugin(self, plugin, fn, args):
+        """Call Async.host.call_plugin on a background thread. Returns a
+        Deferred with the task reference."""
         return _unwrap_plugin_exceptions(
-            self._conn.xenapi.host.call_plugin,
+            self._conn.xenapi.Async.host.call_plugin,
             self._get_xenapi_host(), plugin, fn, args)
 
     def _get_xenapi_host(self):
@@ -281,3 +374,15 @@
         except xmlrpclib.ProtocolError, exn:
             logging.debug("Got exception: %s", exn)
             raise
+
+
+def _parse_xmlrpc_value(val):
+    """Parse the given value as if it were an XML-RPC value. This is
+    sometimes used as the format for the task.result field."""
+    if not val:
+        return val
+    x = xmlrpclib.loads(
+        '<?xml version="1.0"?><methodResponse><params><param>' +
+        val +
+        '</param></params></methodResponse>')
+    return x[0][0]
```
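The _parse_xmlrpc_value helper at the end of the diff can be exercised standalone. Here is the same logic translated to Python 3's xmlrpc.client (the diff itself is Python 2 xmlrpclib); the function name is kept, but this is a sketch, not the Nova module:

```python
# Python 3 translation of _parse_xmlrpc_value from the diff above.
import xmlrpc.client


def parse_xmlrpc_value(val):
    """Parse val as an XML-RPC value, the format sometimes used for a
    XenAPI task.result field. Falsy values pass through unchanged."""
    if not val:
        return val
    # Wrap the bare <value> in a methodResponse so loads() can parse it.
    x = xmlrpc.client.loads(
        '<?xml version="1.0"?><methodResponse><params><param>' +
        val +
        '</param></params></methodResponse>')
    return x[0][0]  # loads returns (params, methodname); take the first param


print(parse_xmlrpc_value('<value><int>42</int></value>'))  # -> 42
```

This is why a task result like `<value><string>some-uuid</string></value>` comes back from _wait_for_task as a plain Python string.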