Merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721 into lp:ubuntu/trusty-proposed/neutron
- Trusty (14.04)
- lp1318721
- Merge into trusty-proposed
Proposed by
Xiang Hui
Status: | Approved | ||||
---|---|---|---|---|---|
Approved by: | Serge Hallyn | ||||
Approved revision: | 37 | ||||
Proposed branch: | lp:~xianghui/ubuntu/trusty/neutron/lp1318721 | ||||
Merge into: | lp:ubuntu/trusty-proposed/neutron | ||||
Diff against target: |
1047 lines (+992/-1) 6 files modified
.pc/applied-patches (+1/-0) .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py (+858/-0) debian/changelog (+8/-0) debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+77/-0) debian/patches/series (+1/-0) neutron/openstack/common/rpc/impl_kombu.py (+47/-1) |
||||
To merge this branch: | bzr merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721 | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Development Team | Pending | ||
Review via email: mp+280835@code.launchpad.net |
Commit message
Description of the change
* Backport of upstream release. (LP: #1318721):-
- d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
Redeclare if an exception is caught after self.queue.declare() fails.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file '.pc/applied-patches' |
2 | --- .pc/applied-patches 2014-08-08 08:30:52 +0000 |
3 | +++ .pc/applied-patches 2016-01-08 08:41:27 +0000 |
4 | @@ -2,3 +2,4 @@ |
5 | disable-udev-tests.patch |
6 | skip-ipv6-tests.patch |
7 | use-concurrency.patch |
8 | +fix-reconnect-race-condition-with-rabbitmq-cluster.patch |
9 | |
10 | === added file '.pc/disable-udev-tests.patch/.timestamp' |
11 | === added file '.pc/fix-quantum-configuration.patch/.timestamp' |
12 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch' |
13 | === added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/.timestamp' |
14 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron' |
15 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack' |
16 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common' |
17 | === added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc' |
18 | === added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py' |
19 | --- .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 1970-01-01 00:00:00 +0000 |
20 | +++ .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000 |
21 | @@ -0,0 +1,858 @@ |
22 | +# Copyright 2011 OpenStack Foundation |
23 | +# |
24 | +# Licensed under the Apache License, Version 2.0 (the "License"); you may |
25 | +# not use this file except in compliance with the License. You may obtain |
26 | +# a copy of the License at |
27 | +# |
28 | +# http://www.apache.org/licenses/LICENSE-2.0 |
29 | +# |
30 | +# Unless required by applicable law or agreed to in writing, software |
31 | +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
32 | +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
33 | +# License for the specific language governing permissions and limitations |
34 | +# under the License. |
35 | + |
36 | +import functools |
37 | +import itertools |
38 | +import socket |
39 | +import ssl |
40 | +import time |
41 | +import uuid |
42 | + |
43 | +import eventlet |
44 | +import greenlet |
45 | +import kombu |
46 | +import kombu.connection |
47 | +import kombu.entity |
48 | +import kombu.messaging |
49 | +from oslo.config import cfg |
50 | +import six |
51 | + |
52 | +from neutron.openstack.common import excutils |
53 | +from neutron.openstack.common.gettextutils import _, _LE, _LI |
54 | +from neutron.openstack.common import network_utils |
55 | +from neutron.openstack.common.rpc import amqp as rpc_amqp |
56 | +from neutron.openstack.common.rpc import common as rpc_common |
57 | +from neutron.openstack.common import sslutils |
58 | + |
59 | +kombu_opts = [ |
60 | + cfg.StrOpt('kombu_ssl_version', |
61 | + default='', |
62 | + help='If SSL is enabled, the SSL version to use. Valid ' |
63 | + 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might ' |
64 | + 'be available on some distributions.' |
65 | + ), |
66 | + cfg.StrOpt('kombu_ssl_keyfile', |
67 | + default='', |
68 | + help='SSL key file (valid only if SSL enabled)'), |
69 | + cfg.StrOpt('kombu_ssl_certfile', |
70 | + default='', |
71 | + help='SSL cert file (valid only if SSL enabled)'), |
72 | + cfg.StrOpt('kombu_ssl_ca_certs', |
73 | + default='', |
74 | + help=('SSL certification authority file ' |
75 | + '(valid only if SSL enabled)')), |
76 | + cfg.StrOpt('rabbit_host', |
77 | + default='localhost', |
78 | + help='The RabbitMQ broker address where a single node is used'), |
79 | + cfg.IntOpt('rabbit_port', |
80 | + default=5672, |
81 | + help='The RabbitMQ broker port where a single node is used'), |
82 | + cfg.ListOpt('rabbit_hosts', |
83 | + default=['$rabbit_host:$rabbit_port'], |
84 | + help='RabbitMQ HA cluster host:port pairs'), |
85 | + cfg.BoolOpt('rabbit_use_ssl', |
86 | + default=False, |
87 | + help='Connect over SSL for RabbitMQ'), |
88 | + cfg.StrOpt('rabbit_userid', |
89 | + default='guest', |
90 | + help='The RabbitMQ userid'), |
91 | + cfg.StrOpt('rabbit_password', |
92 | + default='guest', |
93 | + help='The RabbitMQ password', |
94 | + secret=True), |
95 | + cfg.StrOpt('rabbit_virtual_host', |
96 | + default='/', |
97 | + help='The RabbitMQ virtual host'), |
98 | + cfg.IntOpt('rabbit_retry_interval', |
99 | + default=1, |
100 | + help='How frequently to retry connecting with RabbitMQ'), |
101 | + cfg.IntOpt('rabbit_retry_backoff', |
102 | + default=2, |
103 | + help='How long to backoff for between retries when connecting ' |
104 | + 'to RabbitMQ'), |
105 | + cfg.IntOpt('rabbit_max_retries', |
106 | + default=0, |
107 | + help='Maximum number of RabbitMQ connection retries. ' |
108 | + 'Default is 0 (infinite retry count)'), |
109 | + cfg.BoolOpt('rabbit_ha_queues', |
110 | + default=False, |
111 | + help='Use HA queues in RabbitMQ (x-ha-policy: all). ' |
112 | + 'If you change this option, you must wipe the ' |
113 | + 'RabbitMQ database.'), |
114 | + |
115 | +] |
116 | + |
117 | +cfg.CONF.register_opts(kombu_opts) |
118 | + |
119 | +LOG = rpc_common.LOG |
120 | + |
121 | + |
122 | +def _get_queue_arguments(conf): |
123 | + """Construct the arguments for declaring a queue. |
124 | + |
125 | + If the rabbit_ha_queues option is set, we declare a mirrored queue |
126 | + as described here: |
127 | + |
128 | + http://www.rabbitmq.com/ha.html |
129 | + |
130 | + Setting x-ha-policy to all means that the queue will be mirrored |
131 | + to all nodes in the cluster. |
132 | + """ |
133 | + return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} |
134 | + |
135 | + |
136 | +class ConsumerBase(object): |
137 | + """Consumer base class.""" |
138 | + |
139 | + def __init__(self, channel, callback, tag, **kwargs): |
140 | + """Declare a queue on an amqp channel. |
141 | + |
142 | + 'channel' is the amqp channel to use |
143 | + 'callback' is the callback to call when messages are received |
144 | + 'tag' is a unique ID for the consumer on the channel |
145 | + |
146 | + queue name, exchange name, and other kombu options are |
147 | + passed in here as a dictionary. |
148 | + """ |
149 | + self.callback = callback |
150 | + self.tag = str(tag) |
151 | + self.kwargs = kwargs |
152 | + self.queue = None |
153 | + self.ack_on_error = kwargs.get('ack_on_error', True) |
154 | + self.reconnect(channel) |
155 | + |
156 | + def reconnect(self, channel): |
157 | + """Re-declare the queue after a rabbit reconnect.""" |
158 | + self.channel = channel |
159 | + self.kwargs['channel'] = channel |
160 | + self.queue = kombu.entity.Queue(**self.kwargs) |
161 | + self.queue.declare() |
162 | + |
163 | + def _callback_handler(self, message, callback): |
164 | + """Call callback with deserialized message. |
165 | + |
166 | + Messages that are processed without exception are ack'ed. |
167 | + |
168 | + If the message processing generates an exception, it will be |
169 | + ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed. |
170 | + """ |
171 | + |
172 | + try: |
173 | + msg = rpc_common.deserialize_msg(message.payload) |
174 | + callback(msg) |
175 | + except Exception: |
176 | + if self.ack_on_error: |
177 | + LOG.exception(_LE("Failed to process message" |
178 | + " ... skipping it.")) |
179 | + message.ack() |
180 | + else: |
181 | + LOG.exception(_LE("Failed to process message" |
182 | + " ... will requeue.")) |
183 | + message.requeue() |
184 | + else: |
185 | + message.ack() |
186 | + |
187 | + def consume(self, *args, **kwargs): |
188 | + """Actually declare the consumer on the amqp channel. This will |
189 | + start the flow of messages from the queue. Using the |
190 | + Connection.iterconsume() iterator will process the messages, |
191 | + calling the appropriate callback. |
192 | + |
193 | + If a callback is specified in kwargs, use that. Otherwise, |
194 | + use the callback passed during __init__() |
195 | + |
196 | + If kwargs['nowait'] is True, then this call will block until |
197 | + a message is read. |
198 | + |
199 | + """ |
200 | + |
201 | + options = {'consumer_tag': self.tag} |
202 | + options['nowait'] = kwargs.get('nowait', False) |
203 | + callback = kwargs.get('callback', self.callback) |
204 | + if not callback: |
205 | + raise ValueError("No callback defined") |
206 | + |
207 | + def _callback(raw_message): |
208 | + message = self.channel.message_to_python(raw_message) |
209 | + self._callback_handler(message, callback) |
210 | + |
211 | + self.queue.consume(*args, callback=_callback, **options) |
212 | + |
213 | + def cancel(self): |
214 | + """Cancel the consuming from the queue, if it has started.""" |
215 | + try: |
216 | + self.queue.cancel(self.tag) |
217 | + except KeyError as e: |
218 | + # NOTE(comstud): Kludge to get around a amqplib bug |
219 | + if str(e) != "u'%s'" % self.tag: |
220 | + raise |
221 | + self.queue = None |
222 | + |
223 | + |
224 | +class DirectConsumer(ConsumerBase): |
225 | + """Queue/consumer class for 'direct'.""" |
226 | + |
227 | + def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): |
228 | + """Init a 'direct' queue. |
229 | + |
230 | + 'channel' is the amqp channel to use |
231 | + 'msg_id' is the msg_id to listen on |
232 | + 'callback' is the callback to call when messages are received |
233 | + 'tag' is a unique ID for the consumer on the channel |
234 | + |
235 | + Other kombu options may be passed |
236 | + """ |
237 | + # Default options |
238 | + options = {'durable': False, |
239 | + 'queue_arguments': _get_queue_arguments(conf), |
240 | + 'auto_delete': True, |
241 | + 'exclusive': False} |
242 | + options.update(kwargs) |
243 | + exchange = kombu.entity.Exchange(name=msg_id, |
244 | + type='direct', |
245 | + durable=options['durable'], |
246 | + auto_delete=options['auto_delete']) |
247 | + super(DirectConsumer, self).__init__(channel, |
248 | + callback, |
249 | + tag, |
250 | + name=msg_id, |
251 | + exchange=exchange, |
252 | + routing_key=msg_id, |
253 | + **options) |
254 | + |
255 | + |
256 | +class TopicConsumer(ConsumerBase): |
257 | + """Consumer class for 'topic'.""" |
258 | + |
259 | + def __init__(self, conf, channel, topic, callback, tag, name=None, |
260 | + exchange_name=None, **kwargs): |
261 | + """Init a 'topic' queue. |
262 | + |
263 | + :param channel: the amqp channel to use |
264 | + :param topic: the topic to listen on |
265 | + :paramtype topic: str |
266 | + :param callback: the callback to call when messages are received |
267 | + :param tag: a unique ID for the consumer on the channel |
268 | + :param name: optional queue name, defaults to topic |
269 | + :paramtype name: str |
270 | + |
271 | + Other kombu options may be passed as keyword arguments |
272 | + """ |
273 | + # Default options |
274 | + options = {'durable': conf.amqp_durable_queues, |
275 | + 'queue_arguments': _get_queue_arguments(conf), |
276 | + 'auto_delete': conf.amqp_auto_delete, |
277 | + 'exclusive': False} |
278 | + options.update(kwargs) |
279 | + exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) |
280 | + exchange = kombu.entity.Exchange(name=exchange_name, |
281 | + type='topic', |
282 | + durable=options['durable'], |
283 | + auto_delete=options['auto_delete']) |
284 | + super(TopicConsumer, self).__init__(channel, |
285 | + callback, |
286 | + tag, |
287 | + name=name or topic, |
288 | + exchange=exchange, |
289 | + routing_key=topic, |
290 | + **options) |
291 | + |
292 | + |
293 | +class FanoutConsumer(ConsumerBase): |
294 | + """Consumer class for 'fanout'.""" |
295 | + |
296 | + def __init__(self, conf, channel, topic, callback, tag, **kwargs): |
297 | + """Init a 'fanout' queue. |
298 | + |
299 | + 'channel' is the amqp channel to use |
300 | + 'topic' is the topic to listen on |
301 | + 'callback' is the callback to call when messages are received |
302 | + 'tag' is a unique ID for the consumer on the channel |
303 | + |
304 | + Other kombu options may be passed |
305 | + """ |
306 | + unique = uuid.uuid4().hex |
307 | + exchange_name = '%s_fanout' % topic |
308 | + queue_name = '%s_fanout_%s' % (topic, unique) |
309 | + |
310 | + # Default options |
311 | + options = {'durable': False, |
312 | + 'queue_arguments': _get_queue_arguments(conf), |
313 | + 'auto_delete': True, |
314 | + 'exclusive': False} |
315 | + options.update(kwargs) |
316 | + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', |
317 | + durable=options['durable'], |
318 | + auto_delete=options['auto_delete']) |
319 | + super(FanoutConsumer, self).__init__(channel, callback, tag, |
320 | + name=queue_name, |
321 | + exchange=exchange, |
322 | + routing_key=topic, |
323 | + **options) |
324 | + |
325 | + |
326 | +class Publisher(object): |
327 | + """Base Publisher class.""" |
328 | + |
329 | + def __init__(self, channel, exchange_name, routing_key, **kwargs): |
330 | + """Init the Publisher class with the exchange_name, routing_key, |
331 | + and other options |
332 | + """ |
333 | + self.exchange_name = exchange_name |
334 | + self.routing_key = routing_key |
335 | + self.kwargs = kwargs |
336 | + self.reconnect(channel) |
337 | + |
338 | + def reconnect(self, channel): |
339 | + """Re-establish the Producer after a rabbit reconnection.""" |
340 | + self.exchange = kombu.entity.Exchange(name=self.exchange_name, |
341 | + **self.kwargs) |
342 | + self.producer = kombu.messaging.Producer(exchange=self.exchange, |
343 | + channel=channel, |
344 | + routing_key=self.routing_key) |
345 | + |
346 | + def send(self, msg, timeout=None): |
347 | + """Send a message.""" |
348 | + if timeout: |
349 | + # |
350 | + # AMQP TTL is in milliseconds when set in the header. |
351 | + # |
352 | + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) |
353 | + else: |
354 | + self.producer.publish(msg) |
355 | + |
356 | + |
357 | +class DirectPublisher(Publisher): |
358 | + """Publisher class for 'direct'.""" |
359 | + def __init__(self, conf, channel, msg_id, **kwargs): |
360 | + """init a 'direct' publisher. |
361 | + |
362 | + Kombu options may be passed as keyword args to override defaults |
363 | + """ |
364 | + |
365 | + options = {'durable': False, |
366 | + 'auto_delete': True, |
367 | + 'exclusive': False} |
368 | + options.update(kwargs) |
369 | + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, |
370 | + type='direct', **options) |
371 | + |
372 | + |
373 | +class TopicPublisher(Publisher): |
374 | + """Publisher class for 'topic'.""" |
375 | + def __init__(self, conf, channel, topic, **kwargs): |
376 | + """init a 'topic' publisher. |
377 | + |
378 | + Kombu options may be passed as keyword args to override defaults |
379 | + """ |
380 | + options = {'durable': conf.amqp_durable_queues, |
381 | + 'auto_delete': conf.amqp_auto_delete, |
382 | + 'exclusive': False} |
383 | + options.update(kwargs) |
384 | + exchange_name = rpc_amqp.get_control_exchange(conf) |
385 | + super(TopicPublisher, self).__init__(channel, |
386 | + exchange_name, |
387 | + topic, |
388 | + type='topic', |
389 | + **options) |
390 | + |
391 | + |
392 | +class FanoutPublisher(Publisher): |
393 | + """Publisher class for 'fanout'.""" |
394 | + def __init__(self, conf, channel, topic, **kwargs): |
395 | + """init a 'fanout' publisher. |
396 | + |
397 | + Kombu options may be passed as keyword args to override defaults |
398 | + """ |
399 | + options = {'durable': False, |
400 | + 'auto_delete': True, |
401 | + 'exclusive': False} |
402 | + options.update(kwargs) |
403 | + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, |
404 | + None, type='fanout', **options) |
405 | + |
406 | + |
407 | +class NotifyPublisher(TopicPublisher): |
408 | + """Publisher class for 'notify'.""" |
409 | + |
410 | + def __init__(self, conf, channel, topic, **kwargs): |
411 | + self.durable = kwargs.pop('durable', conf.amqp_durable_queues) |
412 | + self.queue_arguments = _get_queue_arguments(conf) |
413 | + super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) |
414 | + |
415 | + def reconnect(self, channel): |
416 | + super(NotifyPublisher, self).reconnect(channel) |
417 | + |
418 | + # NOTE(jerdfelt): Normally the consumer would create the queue, but |
419 | + # we do this to ensure that messages don't get dropped if the |
420 | + # consumer is started after we do |
421 | + queue = kombu.entity.Queue(channel=channel, |
422 | + exchange=self.exchange, |
423 | + durable=self.durable, |
424 | + name=self.routing_key, |
425 | + routing_key=self.routing_key, |
426 | + queue_arguments=self.queue_arguments) |
427 | + queue.declare() |
428 | + |
429 | + |
430 | +class Connection(object): |
431 | + """Connection object.""" |
432 | + |
433 | + pool = None |
434 | + |
435 | + def __init__(self, conf, server_params=None): |
436 | + self.consumers = [] |
437 | + self.consumer_thread = None |
438 | + self.proxy_callbacks = [] |
439 | + self.conf = conf |
440 | + self.max_retries = self.conf.rabbit_max_retries |
441 | + # Try forever? |
442 | + if self.max_retries <= 0: |
443 | + self.max_retries = None |
444 | + self.interval_start = self.conf.rabbit_retry_interval |
445 | + self.interval_stepping = self.conf.rabbit_retry_backoff |
446 | + # max retry-interval = 30 seconds |
447 | + self.interval_max = 30 |
448 | + self.memory_transport = False |
449 | + |
450 | + if server_params is None: |
451 | + server_params = {} |
452 | + # Keys to translate from server_params to kombu params |
453 | + server_params_to_kombu_params = {'username': 'userid'} |
454 | + |
455 | + ssl_params = self._fetch_ssl_params() |
456 | + params_list = [] |
457 | + for adr in self.conf.rabbit_hosts: |
458 | + hostname, port = network_utils.parse_host_port( |
459 | + adr, default_port=self.conf.rabbit_port) |
460 | + |
461 | + params = { |
462 | + 'hostname': hostname, |
463 | + 'port': port, |
464 | + 'userid': self.conf.rabbit_userid, |
465 | + 'password': self.conf.rabbit_password, |
466 | + 'virtual_host': self.conf.rabbit_virtual_host, |
467 | + } |
468 | + |
469 | + for sp_key, value in six.iteritems(server_params): |
470 | + p_key = server_params_to_kombu_params.get(sp_key, sp_key) |
471 | + params[p_key] = value |
472 | + |
473 | + if self.conf.fake_rabbit: |
474 | + params['transport'] = 'memory' |
475 | + if self.conf.rabbit_use_ssl: |
476 | + params['ssl'] = ssl_params |
477 | + |
478 | + params_list.append(params) |
479 | + |
480 | + self.params_list = params_list |
481 | + |
482 | + brokers_count = len(self.params_list) |
483 | + self.next_broker_indices = itertools.cycle(range(brokers_count)) |
484 | + |
485 | + self.memory_transport = self.conf.fake_rabbit |
486 | + |
487 | + self.connection = None |
488 | + self.reconnect() |
489 | + |
490 | + def _fetch_ssl_params(self): |
491 | + """Handles fetching what ssl params should be used for the connection |
492 | + (if any). |
493 | + """ |
494 | + ssl_params = dict() |
495 | + |
496 | + # http://docs.python.org/library/ssl.html - ssl.wrap_socket |
497 | + if self.conf.kombu_ssl_version: |
498 | + ssl_params['ssl_version'] = sslutils.validate_ssl_version( |
499 | + self.conf.kombu_ssl_version) |
500 | + if self.conf.kombu_ssl_keyfile: |
501 | + ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile |
502 | + if self.conf.kombu_ssl_certfile: |
503 | + ssl_params['certfile'] = self.conf.kombu_ssl_certfile |
504 | + if self.conf.kombu_ssl_ca_certs: |
505 | + ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs |
506 | + # We might want to allow variations in the |
507 | + # future with this? |
508 | + ssl_params['cert_reqs'] = ssl.CERT_REQUIRED |
509 | + |
510 | + # Return the extended behavior or just have the default behavior |
511 | + return ssl_params or True |
512 | + |
513 | + def _connect(self, params): |
514 | + """Connect to rabbit. Re-establish any queues that may have |
515 | + been declared before if we are reconnecting. Exceptions should |
516 | + be handled by the caller. |
517 | + """ |
518 | + if self.connection: |
519 | + LOG.info(_LI("Reconnecting to AMQP server on " |
520 | + "%(hostname)s:%(port)d") % params) |
521 | + try: |
522 | + self.connection.release() |
523 | + except self.connection_errors: |
524 | + pass |
525 | + # Setting this in case the next statement fails, though |
526 | + # it shouldn't be doing any network operations, yet. |
527 | + self.connection = None |
528 | + self.connection = kombu.connection.BrokerConnection(**params) |
529 | + self.connection_errors = self.connection.connection_errors |
530 | + if self.memory_transport: |
531 | + # Kludge to speed up tests. |
532 | + self.connection.transport.polling_interval = 0.0 |
533 | + self.consumer_num = itertools.count(1) |
534 | + self.connection.connect() |
535 | + self.channel = self.connection.channel() |
536 | + # work around 'memory' transport bug in 1.1.3 |
537 | + if self.memory_transport: |
538 | + self.channel._new_queue('ae.undeliver') |
539 | + for consumer in self.consumers: |
540 | + consumer.reconnect(self.channel) |
541 | + LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') % |
542 | + params) |
543 | + |
544 | + def reconnect(self): |
545 | + """Handles reconnecting and re-establishing queues. |
546 | + Will retry up to self.max_retries number of times. |
547 | + self.max_retries = 0 means to retry forever. |
548 | + Sleep between tries, starting at self.interval_start |
549 | + seconds, backing off self.interval_stepping number of seconds |
550 | + each attempt. |
551 | + """ |
552 | + |
553 | + attempt = 0 |
554 | + while True: |
555 | + params = self.params_list[next(self.next_broker_indices)] |
556 | + attempt += 1 |
557 | + try: |
558 | + self._connect(params) |
559 | + return |
560 | + except (IOError, self.connection_errors) as e: |
561 | + pass |
562 | + except Exception as e: |
563 | + # NOTE(comstud): Unfortunately it's possible for amqplib |
564 | + # to return an error not covered by its transport |
565 | + # connection_errors in the case of a timeout waiting for |
566 | + # a protocol response. (See paste link in LP888621) |
567 | + # So, we check all exceptions for 'timeout' in them |
568 | + # and try to reconnect in this case. |
569 | + if 'timeout' not in str(e): |
570 | + raise |
571 | + |
572 | + log_info = {} |
573 | + log_info['err_str'] = str(e) |
574 | + log_info['max_retries'] = self.max_retries |
575 | + log_info.update(params) |
576 | + |
577 | + if self.max_retries and attempt == self.max_retries: |
578 | + msg = _('Unable to connect to AMQP server on ' |
579 | + '%(hostname)s:%(port)d after %(max_retries)d ' |
580 | + 'tries: %(err_str)s') % log_info |
581 | + LOG.error(msg) |
582 | + raise rpc_common.RPCException(msg) |
583 | + |
584 | + if attempt == 1: |
585 | + sleep_time = self.interval_start or 1 |
586 | + elif attempt > 1: |
587 | + sleep_time += self.interval_stepping |
588 | + if self.interval_max: |
589 | + sleep_time = min(sleep_time, self.interval_max) |
590 | + |
591 | + log_info['sleep_time'] = sleep_time |
592 | + LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is ' |
593 | + 'unreachable: %(err_str)s. Trying again in ' |
594 | + '%(sleep_time)d seconds.') % log_info) |
595 | + time.sleep(sleep_time) |
596 | + |
597 | + def ensure(self, error_callback, method, *args, **kwargs): |
598 | + while True: |
599 | + try: |
600 | + return method(*args, **kwargs) |
601 | + except (self.connection_errors, socket.timeout, IOError) as e: |
602 | + if error_callback: |
603 | + error_callback(e) |
604 | + except Exception as e: |
605 | + # NOTE(comstud): Unfortunately it's possible for amqplib |
606 | + # to return an error not covered by its transport |
607 | + # connection_errors in the case of a timeout waiting for |
608 | + # a protocol response. (See paste link in LP888621) |
609 | + # So, we check all exceptions for 'timeout' in them |
610 | + # and try to reconnect in this case. |
611 | + if 'timeout' not in str(e): |
612 | + raise |
613 | + if error_callback: |
614 | + error_callback(e) |
615 | + self.reconnect() |
616 | + |
617 | + def get_channel(self): |
618 | + """Convenience call for bin/clear_rabbit_queues.""" |
619 | + return self.channel |
620 | + |
621 | + def close(self): |
622 | + """Close/release this connection.""" |
623 | + self.cancel_consumer_thread() |
624 | + self.wait_on_proxy_callbacks() |
625 | + self.connection.release() |
626 | + self.connection = None |
627 | + |
628 | + def reset(self): |
629 | + """Reset a connection so it can be used again.""" |
630 | + self.cancel_consumer_thread() |
631 | + self.wait_on_proxy_callbacks() |
632 | + self.channel.close() |
633 | + self.channel = self.connection.channel() |
634 | + # work around 'memory' transport bug in 1.1.3 |
635 | + if self.memory_transport: |
636 | + self.channel._new_queue('ae.undeliver') |
637 | + self.consumers = [] |
638 | + |
639 | + def declare_consumer(self, consumer_cls, topic, callback): |
640 | + """Create a Consumer using the class that was passed in and |
641 | + add it to our list of consumers |
642 | + """ |
643 | + |
644 | + def _connect_error(exc): |
645 | + log_info = {'topic': topic, 'err_str': str(exc)} |
646 | + LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " |
647 | + "%(err_str)s") % log_info) |
648 | + |
649 | + def _declare_consumer(): |
650 | + consumer = consumer_cls(self.conf, self.channel, topic, callback, |
651 | + six.next(self.consumer_num)) |
652 | + self.consumers.append(consumer) |
653 | + return consumer |
654 | + |
655 | + return self.ensure(_connect_error, _declare_consumer) |
656 | + |
657 | + def iterconsume(self, limit=None, timeout=None): |
658 | + """Return an iterator that will consume from all queues/consumers.""" |
659 | + |
660 | + info = {'do_consume': True} |
661 | + |
662 | + def _error_callback(exc): |
663 | + if isinstance(exc, socket.timeout): |
664 | + LOG.debug('Timed out waiting for RPC response: %s' % |
665 | + str(exc)) |
666 | + raise rpc_common.Timeout() |
667 | + else: |
668 | + LOG.exception(_LE('Failed to consume message from queue: %s') % |
669 | + str(exc)) |
670 | + info['do_consume'] = True |
671 | + |
672 | + def _consume(): |
673 | + if info['do_consume']: |
674 | + queues_head = self.consumers[:-1] # not fanout. |
675 | + queues_tail = self.consumers[-1] # fanout |
676 | + for queue in queues_head: |
677 | + queue.consume(nowait=True) |
678 | + queues_tail.consume(nowait=False) |
679 | + info['do_consume'] = False |
680 | + return self.connection.drain_events(timeout=timeout) |
681 | + |
682 | + for iteration in itertools.count(0): |
683 | + if limit and iteration >= limit: |
684 | + raise StopIteration |
685 | + yield self.ensure(_error_callback, _consume) |
686 | + |
687 | + def cancel_consumer_thread(self): |
688 | + """Cancel a consumer thread.""" |
689 | + if self.consumer_thread is not None: |
690 | + self.consumer_thread.kill() |
691 | + try: |
692 | + self.consumer_thread.wait() |
693 | + except greenlet.GreenletExit: |
694 | + pass |
695 | + self.consumer_thread = None |
696 | + |
697 | + def wait_on_proxy_callbacks(self): |
698 | + """Wait for all proxy callback threads to exit.""" |
699 | + for proxy_cb in self.proxy_callbacks: |
700 | + proxy_cb.wait() |
701 | + |
702 | + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): |
703 | + """Send to a publisher based on the publisher class.""" |
704 | + |
705 | + def _error_callback(exc): |
706 | + log_info = {'topic': topic, 'err_str': str(exc)} |
707 | + LOG.exception(_LE("Failed to publish message to topic " |
708 | + "'%(topic)s': %(err_str)s") % log_info) |
709 | + |
710 | + def _publish(): |
711 | + publisher = cls(self.conf, self.channel, topic, **kwargs) |
712 | + publisher.send(msg, timeout) |
713 | + |
714 | + self.ensure(_error_callback, _publish) |
715 | + |
716 | + def declare_direct_consumer(self, topic, callback): |
717 | + """Create a 'direct' queue. |
718 | + In nova's use, this is generally a msg_id queue used for |
719 | + responses for call/multicall |
720 | + """ |
721 | + self.declare_consumer(DirectConsumer, topic, callback) |
722 | + |
723 | + def declare_topic_consumer(self, topic, callback=None, queue_name=None, |
724 | + exchange_name=None, ack_on_error=True): |
725 | + """Create a 'topic' consumer.""" |
726 | + self.declare_consumer(functools.partial(TopicConsumer, |
727 | + name=queue_name, |
728 | + exchange_name=exchange_name, |
729 | + ack_on_error=ack_on_error, |
730 | + ), |
731 | + topic, callback) |
732 | + |
733 | + def declare_fanout_consumer(self, topic, callback): |
734 | + """Create a 'fanout' consumer.""" |
735 | + self.declare_consumer(FanoutConsumer, topic, callback) |
736 | + |
737 | + def direct_send(self, msg_id, msg): |
738 | + """Send a 'direct' message.""" |
739 | + self.publisher_send(DirectPublisher, msg_id, msg) |
740 | + |
741 | + def topic_send(self, topic, msg, timeout=None): |
742 | + """Send a 'topic' message.""" |
743 | + self.publisher_send(TopicPublisher, topic, msg, timeout) |
744 | + |
745 | + def fanout_send(self, topic, msg): |
746 | + """Send a 'fanout' message.""" |
747 | + self.publisher_send(FanoutPublisher, topic, msg) |
748 | + |
749 | + def notify_send(self, topic, msg, **kwargs): |
750 | + """Send a notify message on a topic.""" |
751 | + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) |
752 | + |
753 | + def consume(self, limit=None): |
754 | + """Consume from all queues/consumers.""" |
755 | + it = self.iterconsume(limit=limit) |
756 | + while True: |
757 | + try: |
758 | + six.next(it) |
759 | + except StopIteration: |
760 | + return |
761 | + |
762 | + def consume_in_thread(self): |
763 | + """Consumer from all queues/consumers in a greenthread.""" |
764 | + @excutils.forever_retry_uncaught_exceptions |
765 | + def _consumer_thread(): |
766 | + try: |
767 | + self.consume() |
768 | + except greenlet.GreenletExit: |
769 | + return |
770 | + if self.consumer_thread is None: |
771 | + self.consumer_thread = eventlet.spawn(_consumer_thread) |
772 | + return self.consumer_thread |
773 | + |
774 | + def create_consumer(self, topic, proxy, fanout=False): |
775 | + """Create a consumer that calls a method in a proxy object.""" |
776 | + proxy_cb = rpc_amqp.ProxyCallback( |
777 | + self.conf, proxy, |
778 | + rpc_amqp.get_connection_pool(self.conf, Connection)) |
779 | + self.proxy_callbacks.append(proxy_cb) |
780 | + |
781 | + if fanout: |
782 | + self.declare_fanout_consumer(topic, proxy_cb) |
783 | + else: |
784 | + self.declare_topic_consumer(topic, proxy_cb) |
785 | + |
786 | + def create_worker(self, topic, proxy, pool_name): |
787 | + """Create a worker that calls a method in a proxy object.""" |
788 | + proxy_cb = rpc_amqp.ProxyCallback( |
789 | + self.conf, proxy, |
790 | + rpc_amqp.get_connection_pool(self.conf, Connection)) |
791 | + self.proxy_callbacks.append(proxy_cb) |
792 | + self.declare_topic_consumer(topic, proxy_cb, pool_name) |
793 | + |
794 | + def join_consumer_pool(self, callback, pool_name, topic, |
795 | + exchange_name=None, ack_on_error=True): |
796 | + """Register as a member of a group of consumers for a given topic from |
797 | + the specified exchange. |
798 | + |
799 | + Exactly one member of a given pool will receive each message. |
800 | + |
801 | + A message will be delivered to multiple pools, if more than |
802 | + one is created. |
803 | + """ |
804 | + callback_wrapper = rpc_amqp.CallbackWrapper( |
805 | + conf=self.conf, |
806 | + callback=callback, |
807 | + connection_pool=rpc_amqp.get_connection_pool(self.conf, |
808 | + Connection), |
809 | + wait_for_consumers=not ack_on_error |
810 | + ) |
811 | + self.proxy_callbacks.append(callback_wrapper) |
812 | + self.declare_topic_consumer( |
813 | + queue_name=pool_name, |
814 | + topic=topic, |
815 | + exchange_name=exchange_name, |
816 | + callback=callback_wrapper, |
817 | + ack_on_error=ack_on_error, |
818 | + ) |
819 | + |
820 | + |
821 | +def create_connection(conf, new=True): |
822 | + """Create a connection.""" |
823 | + return rpc_amqp.create_connection( |
824 | + conf, new, |
825 | + rpc_amqp.get_connection_pool(conf, Connection)) |
826 | + |
827 | + |
828 | +def multicall(conf, context, topic, msg, timeout=None): |
829 | + """Make a call that returns multiple times.""" |
830 | + return rpc_amqp.multicall( |
831 | + conf, context, topic, msg, timeout, |
832 | + rpc_amqp.get_connection_pool(conf, Connection)) |
833 | + |
834 | + |
835 | +def call(conf, context, topic, msg, timeout=None): |
836 | + """Sends a message on a topic and wait for a response.""" |
837 | + return rpc_amqp.call( |
838 | + conf, context, topic, msg, timeout, |
839 | + rpc_amqp.get_connection_pool(conf, Connection)) |
840 | + |
841 | + |
842 | +def cast(conf, context, topic, msg): |
843 | + """Sends a message on a topic without waiting for a response.""" |
844 | + return rpc_amqp.cast( |
845 | + conf, context, topic, msg, |
846 | + rpc_amqp.get_connection_pool(conf, Connection)) |
847 | + |
848 | + |
849 | +def fanout_cast(conf, context, topic, msg): |
850 | + """Sends a message on a fanout exchange without waiting for a response.""" |
851 | + return rpc_amqp.fanout_cast( |
852 | + conf, context, topic, msg, |
853 | + rpc_amqp.get_connection_pool(conf, Connection)) |
854 | + |
855 | + |
856 | +def cast_to_server(conf, context, server_params, topic, msg): |
857 | + """Sends a message on a topic to a specific server.""" |
858 | + return rpc_amqp.cast_to_server( |
859 | + conf, context, server_params, topic, msg, |
860 | + rpc_amqp.get_connection_pool(conf, Connection)) |
861 | + |
862 | + |
863 | +def fanout_cast_to_server(conf, context, server_params, topic, msg): |
864 | + """Sends a message on a fanout exchange to a specific server.""" |
865 | + return rpc_amqp.fanout_cast_to_server( |
866 | + conf, context, server_params, topic, msg, |
867 | + rpc_amqp.get_connection_pool(conf, Connection)) |
868 | + |
869 | + |
870 | +def notify(conf, context, topic, msg, envelope): |
871 | + """Sends a notification event on a topic.""" |
872 | + return rpc_amqp.notify( |
873 | + conf, context, topic, msg, |
874 | + rpc_amqp.get_connection_pool(conf, Connection), |
875 | + envelope) |
876 | + |
877 | + |
878 | +def cleanup(): |
879 | + return rpc_amqp.cleanup(Connection.pool) |
880 | |
881 | === added file '.pc/skip-ipv6-tests.patch/.timestamp' |
882 | === added file '.pc/use-concurrency.patch/.timestamp' |
883 | === modified file 'debian/changelog' |
884 | --- debian/changelog 2015-06-22 10:14:52 +0000 |
885 | +++ debian/changelog 2016-01-08 08:41:27 +0000 |
886 | @@ -1,3 +1,11 @@ |
887 | +neutron (1:2014.1.5-0ubuntu2) trusty; urgency=medium |
888 | + |
889 | + * Backport upstream release. (LP: #1318721): |
890 | + - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch: |
891 | + Redeclare if exception is caught after self.queue.declare() failed. |
892 | + |
893 | + -- Hui Xiang <hui.xiang@canonical.com> Thu, 17 Dec 2015 16:35:40 +0800 |
894 | + |
895 | neutron (1:2014.1.5-0ubuntu1) trusty; urgency=medium |
896 | |
897 | * Resynchronize with stable/icehouse (877df58) (LP: #1467533): |
898 | |
899 | === added file 'debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch' |
900 | --- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 00:00:00 +0000 |
901 | +++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-08 08:41:27 +0000 |
902 | @@ -0,0 +1,77 @@ |
903 | +--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-07 15:15:26.000000000 +0800 |
904 | ++++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 08:00:00.000000000 +0800 |
905 | +@@ -1,74 +0,0 @@ |
906 | +-Description: Fix reconnect race condition with RabbitMQ cluster |
907 | +- |
908 | +- Retry declaring Queue if it fail to workaround the race condition |
909 | +- that may happen when both Neutron and RabbitMQ cluster are trying |
910 | +- to create and delete (Respectively) Queues and Exchanges that have |
911 | +- auto-delete flag set. |
912 | +- |
913 | +- Change-Id: I90febd7c8358b7cccf852c272810e2fad3d91dcd |
914 | +- Closes-Bug: #1318721 |
915 | +- |
916 | +-Author: Mouad Benchchaoui <m.benchchaoui@x-ion.de> |
917 | +-Origin: backport, https://review.openstack.org/#/c/93399/1 |
918 | +-Bug: https://bugs.launchpad.net/neutron/+bug/1318721 |
919 | +---- |
920 | +-This patch header follows DEP-3: http://dep.debian.net/deps/dep3/ |
921 | +-Index: neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py |
922 | +-=================================================================== |
923 | +---- neutron-2014.1.5.orig/neutron/openstack/common/rpc/impl_kombu.py 2015-06-18 22:26:11.000000000 +0000 |
924 | +-+++ neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py 2015-12-01 02:40:27.806902000 +0000 |
925 | +-@@ -137,7 +137,53 @@ |
926 | +- self.channel = channel |
927 | +- self.kwargs['channel'] = channel |
928 | +- self.queue = kombu.entity.Queue(**self.kwargs) |
929 | +-- self.queue.declare() |
930 | +-+ try: |
931 | +-+ self.queue.declare() |
932 | +-+ except Exception: |
933 | +-+ # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper |
934 | +-+ # error; instead it raises whatever the underlying transport library |
935 | +-+ # raises, which can be either 'amqp.NotFound' or |
936 | +-+ # 'librabbitmq.ChannelError' depending on which transport library |
937 | +-+ # is used. |
938 | +-+ LOG.exception("Declaring queue fail retrying ...") |
939 | +-+ # NOTE(Mouad): We need to re-try Queue creation in case the Queue |
940 | +-+ # was created with auto-delete this instruct the Broker to delete |
941 | +-+ # the Queue when the last Consumer disconnect from it, and the |
942 | +-+ # Exchange when the last Queue is deleted from this Exchange. |
943 | +-+ # |
944 | +-+ # Now in a RabbitMQ cluster setup, if the cluster node that we are |
945 | +-+ # connected to go down, 2 things will happen: |
946 | +-+ # |
947 | +-+ # 1. From RabbitMQ side the Queues will be deleted from the other |
948 | +-+ # cluster nodes and then the corresponding Exchanges will also |
949 | +-+ # be deleted. |
950 | +-+ # 2. From Neutron side, Neutron will reconnect to another cluster |
951 | +-+ # node and start creating Exchanges then Queues then Binding |
952 | +-+ # (The order is important to understand the problem). |
953 | +-+ # |
954 | +-+ # Now this may lead to a race condition, specially if Neutron create |
955 | +-+ # Exchange and Queue and before Neutron could bind Queue to Exchange |
956 | +-+ # RabbitMQ nodes just received the 'signal' that they need to delete |
957 | +-+ # Exchanges with auto-delete that belong to the down node, so they |
958 | +-+ # will delete the Exchanges that was just created, and when Neutron |
959 | +-+ # try to bind Queue with Exchange, the Binding will fail b/c the |
960 | +-+ # Exchange is not found. |
961 | +-+ # |
962 | +-+ # But if the first Queue declaration works and binding was created we |
963 | +-+ # suppose that RabbitMQ will not try to delete the Exchange even if |
964 | +-+ # the auto-delete propagation wasn't received yet, b/c the Queue |
965 | +-+ # have a Consumer now and a Binding exist with the Exchange. |
966 | +-+ # |
967 | +-+ # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges |
968 | +-+ # but according to here[1] RabbitMQ doesn't seem that he will delete |
969 | +-+ # it: |
970 | +-+ # |
971 | +-+ # The 'auto-delete' flag on 'exchange.declare' got deprecated in |
972 | +-+ # 0-9-1. Auto-delete exchanges are actually quite useful, so this |
973 | +-+ # flag should be restored. |
974 | +-+ # |
975 | +-+ # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html |
976 | +-+ self.queue.declare() |
977 | +- |
978 | +- def _callback_handler(self, message, callback): |
979 | +- """Call callback with deserialized message. |
980 | |
981 | === modified file 'debian/patches/series' |
982 | --- debian/patches/series 2014-08-08 08:30:52 +0000 |
983 | +++ debian/patches/series 2016-01-08 08:41:27 +0000 |
984 | @@ -2,3 +2,4 @@ |
985 | disable-udev-tests.patch |
986 | skip-ipv6-tests.patch |
987 | use-concurrency.patch |
988 | +fix-reconnect-race-condition-with-rabbitmq-cluster.patch |
989 | |
990 | === modified file 'neutron/openstack/common/rpc/impl_kombu.py' |
991 | --- neutron/openstack/common/rpc/impl_kombu.py 2014-04-01 16:22:54 +0000 |
992 | +++ neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000 |
993 | @@ -137,7 +137,53 @@ |
994 | self.channel = channel |
995 | self.kwargs['channel'] = channel |
996 | self.queue = kombu.entity.Queue(**self.kwargs) |
997 | - self.queue.declare() |
998 | + try: |
999 | + self.queue.declare() |
1000 | + except Exception: |
1001 | + # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper |
1002 | + # error; instead it raises whatever the underlying transport library |
1003 | + # raises, which can be either 'amqp.NotFound' or |
1004 | + # 'librabbitmq.ChannelError' depending on which transport library |
1005 | + # is used. |
1006 | + LOG.exception("Declaring queue fail retrying ...") |
1007 | + # NOTE(Mouad): We need to re-try Queue creation in case the Queue |
1008 | + # was created with auto-delete this instruct the Broker to delete |
1009 | + # the Queue when the last Consumer disconnect from it, and the |
1010 | + # Exchange when the last Queue is deleted from this Exchange. |
1011 | + # |
1012 | + # Now in a RabbitMQ cluster setup, if the cluster node that we are |
1013 | + # connected to go down, 2 things will happen: |
1014 | + # |
1015 | + # 1. From RabbitMQ side the Queues will be deleted from the other |
1016 | + # cluster nodes and then the corresponding Exchanges will also |
1017 | + # be deleted. |
1018 | + # 2. From Neutron side, Neutron will reconnect to another cluster |
1019 | + # node and start creating Exchanges then Queues then Binding |
1020 | + # (The order is important to understand the problem). |
1021 | + # |
1022 | + # Now this may lead to a race condition, specially if Neutron create |
1023 | + # Exchange and Queue and before Neutron could bind Queue to Exchange |
1024 | + # RabbitMQ nodes just received the 'signal' that they need to delete |
1025 | + # Exchanges with auto-delete that belong to the down node, so they |
1026 | + # will delete the Exchanges that was just created, and when Neutron |
1027 | + # try to bind Queue with Exchange, the Binding will fail b/c the |
1028 | + # Exchange is not found. |
1029 | + # |
1030 | + # But if the first Queue declaration works and binding was created we |
1031 | + # suppose that RabbitMQ will not try to delete the Exchange even if |
1032 | + # the auto-delete propagation wasn't received yet, b/c the Queue |
1033 | + # have a Consumer now and a Binding exist with the Exchange. |
1034 | + # |
1035 | + # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges |
1036 | + # but according to here[1] RabbitMQ doesn't seem that he will delete |
1037 | + # it: |
1038 | + # |
1039 | + # The 'auto-delete' flag on 'exchange.declare' got deprecated in |
1040 | + # 0-9-1. Auto-delete exchanges are actually quite useful, so this |
1041 | + # flag should be restored. |
1042 | + # |
1043 | + # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html |
1044 | + self.queue.declare() |
1045 | |
1046 | def _callback_handler(self, message, callback): |
1047 | """Call callback with deserialized message. |