Merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721 into lp:ubuntu/trusty-proposed/neutron

Proposed by Xiang Hui
Status: Approved
Approved by: Serge Hallyn
Approved revision: 37
Proposed branch: lp:~xianghui/ubuntu/trusty/neutron/lp1318721
Merge into: lp:ubuntu/trusty-proposed/neutron
Diff against target: 1047 lines (+992/-1)
6 files modified
.pc/applied-patches (+1/-0)
.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py (+858/-0)
debian/changelog (+8/-0)
debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch (+77/-0)
debian/patches/series (+1/-0)
neutron/openstack/common/rpc/impl_kombu.py (+47/-1)
To merge this branch: bzr merge lp:~xianghui/ubuntu/trusty/neutron/lp1318721
Reviewer Review Type Date Requested Status
Ubuntu Development Team Pending
Review via email: mp+280835@code.launchpad.net

Description of the change

  * Backport of upstream release. (LP: #1318721):-
    - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare the queue if an exception is caught after self.queue.declare() fails.

To post a comment you must log in.
37. By Xiang Hui

Add dep3 header

Unmerged revisions

37. By Xiang Hui

Add dep3 header

36. By Xiang Hui

  * Backport of upstream release. (LP: #1318721):-
    - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
      Redeclare the queue if an exception is caught after self.queue.declare() fails.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file '.pc/applied-patches'
2--- .pc/applied-patches 2014-08-08 08:30:52 +0000
3+++ .pc/applied-patches 2016-01-08 08:41:27 +0000
4@@ -2,3 +2,4 @@
5 disable-udev-tests.patch
6 skip-ipv6-tests.patch
7 use-concurrency.patch
8+fix-reconnect-race-condition-with-rabbitmq-cluster.patch
9
10=== added file '.pc/disable-udev-tests.patch/.timestamp'
11=== added file '.pc/fix-quantum-configuration.patch/.timestamp'
12=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch'
13=== added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/.timestamp'
14=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron'
15=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack'
16=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common'
17=== added directory '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc'
18=== added file '.pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py'
19--- .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 1970-01-01 00:00:00 +0000
20+++ .pc/fix-reconnect-race-condition-with-rabbitmq-cluster.patch/neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000
21@@ -0,0 +1,858 @@
22+# Copyright 2011 OpenStack Foundation
23+#
24+# Licensed under the Apache License, Version 2.0 (the "License"); you may
25+# not use this file except in compliance with the License. You may obtain
26+# a copy of the License at
27+#
28+# http://www.apache.org/licenses/LICENSE-2.0
29+#
30+# Unless required by applicable law or agreed to in writing, software
31+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
32+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
33+# License for the specific language governing permissions and limitations
34+# under the License.
35+
36+import functools
37+import itertools
38+import socket
39+import ssl
40+import time
41+import uuid
42+
43+import eventlet
44+import greenlet
45+import kombu
46+import kombu.connection
47+import kombu.entity
48+import kombu.messaging
49+from oslo.config import cfg
50+import six
51+
52+from neutron.openstack.common import excutils
53+from neutron.openstack.common.gettextutils import _, _LE, _LI
54+from neutron.openstack.common import network_utils
55+from neutron.openstack.common.rpc import amqp as rpc_amqp
56+from neutron.openstack.common.rpc import common as rpc_common
57+from neutron.openstack.common import sslutils
58+
59+kombu_opts = [
60+ cfg.StrOpt('kombu_ssl_version',
61+ default='',
62+ help='If SSL is enabled, the SSL version to use. Valid '
63+ 'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
64+ 'be available on some distributions.'
65+ ),
66+ cfg.StrOpt('kombu_ssl_keyfile',
67+ default='',
68+ help='SSL key file (valid only if SSL enabled)'),
69+ cfg.StrOpt('kombu_ssl_certfile',
70+ default='',
71+ help='SSL cert file (valid only if SSL enabled)'),
72+ cfg.StrOpt('kombu_ssl_ca_certs',
73+ default='',
74+ help=('SSL certification authority file '
75+ '(valid only if SSL enabled)')),
76+ cfg.StrOpt('rabbit_host',
77+ default='localhost',
78+ help='The RabbitMQ broker address where a single node is used'),
79+ cfg.IntOpt('rabbit_port',
80+ default=5672,
81+ help='The RabbitMQ broker port where a single node is used'),
82+ cfg.ListOpt('rabbit_hosts',
83+ default=['$rabbit_host:$rabbit_port'],
84+ help='RabbitMQ HA cluster host:port pairs'),
85+ cfg.BoolOpt('rabbit_use_ssl',
86+ default=False,
87+ help='Connect over SSL for RabbitMQ'),
88+ cfg.StrOpt('rabbit_userid',
89+ default='guest',
90+ help='The RabbitMQ userid'),
91+ cfg.StrOpt('rabbit_password',
92+ default='guest',
93+ help='The RabbitMQ password',
94+ secret=True),
95+ cfg.StrOpt('rabbit_virtual_host',
96+ default='/',
97+ help='The RabbitMQ virtual host'),
98+ cfg.IntOpt('rabbit_retry_interval',
99+ default=1,
100+ help='How frequently to retry connecting with RabbitMQ'),
101+ cfg.IntOpt('rabbit_retry_backoff',
102+ default=2,
103+ help='How long to backoff for between retries when connecting '
104+ 'to RabbitMQ'),
105+ cfg.IntOpt('rabbit_max_retries',
106+ default=0,
107+ help='Maximum number of RabbitMQ connection retries. '
108+ 'Default is 0 (infinite retry count)'),
109+ cfg.BoolOpt('rabbit_ha_queues',
110+ default=False,
111+ help='Use HA queues in RabbitMQ (x-ha-policy: all). '
112+ 'If you change this option, you must wipe the '
113+ 'RabbitMQ database.'),
114+
115+]
116+
117+cfg.CONF.register_opts(kombu_opts)
118+
119+LOG = rpc_common.LOG
120+
121+
122+def _get_queue_arguments(conf):
123+ """Construct the arguments for declaring a queue.
124+
125+ If the rabbit_ha_queues option is set, we declare a mirrored queue
126+ as described here:
127+
128+ http://www.rabbitmq.com/ha.html
129+
130+ Setting x-ha-policy to all means that the queue will be mirrored
131+ to all nodes in the cluster.
132+ """
133+ return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
134+
135+
136+class ConsumerBase(object):
137+ """Consumer base class."""
138+
139+ def __init__(self, channel, callback, tag, **kwargs):
140+ """Declare a queue on an amqp channel.
141+
142+ 'channel' is the amqp channel to use
143+ 'callback' is the callback to call when messages are received
144+ 'tag' is a unique ID for the consumer on the channel
145+
146+ queue name, exchange name, and other kombu options are
147+ passed in here as a dictionary.
148+ """
149+ self.callback = callback
150+ self.tag = str(tag)
151+ self.kwargs = kwargs
152+ self.queue = None
153+ self.ack_on_error = kwargs.get('ack_on_error', True)
154+ self.reconnect(channel)
155+
156+ def reconnect(self, channel):
157+ """Re-declare the queue after a rabbit reconnect."""
158+ self.channel = channel
159+ self.kwargs['channel'] = channel
160+ self.queue = kombu.entity.Queue(**self.kwargs)
161+ self.queue.declare()
162+
163+ def _callback_handler(self, message, callback):
164+ """Call callback with deserialized message.
165+
166+ Messages that are processed without exception are ack'ed.
167+
168+ If the message processing generates an exception, it will be
169+ ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
170+ """
171+
172+ try:
173+ msg = rpc_common.deserialize_msg(message.payload)
174+ callback(msg)
175+ except Exception:
176+ if self.ack_on_error:
177+ LOG.exception(_LE("Failed to process message"
178+ " ... skipping it."))
179+ message.ack()
180+ else:
181+ LOG.exception(_LE("Failed to process message"
182+ " ... will requeue."))
183+ message.requeue()
184+ else:
185+ message.ack()
186+
187+ def consume(self, *args, **kwargs):
188+ """Actually declare the consumer on the amqp channel. This will
189+ start the flow of messages from the queue. Using the
190+ Connection.iterconsume() iterator will process the messages,
191+ calling the appropriate callback.
192+
193+ If a callback is specified in kwargs, use that. Otherwise,
194+ use the callback passed during __init__()
195+
196+ If kwargs['nowait'] is True, then this call will block until
197+ a message is read.
198+
199+ """
200+
201+ options = {'consumer_tag': self.tag}
202+ options['nowait'] = kwargs.get('nowait', False)
203+ callback = kwargs.get('callback', self.callback)
204+ if not callback:
205+ raise ValueError("No callback defined")
206+
207+ def _callback(raw_message):
208+ message = self.channel.message_to_python(raw_message)
209+ self._callback_handler(message, callback)
210+
211+ self.queue.consume(*args, callback=_callback, **options)
212+
213+ def cancel(self):
214+ """Cancel the consuming from the queue, if it has started."""
215+ try:
216+ self.queue.cancel(self.tag)
217+ except KeyError as e:
218+ # NOTE(comstud): Kludge to get around a amqplib bug
219+ if str(e) != "u'%s'" % self.tag:
220+ raise
221+ self.queue = None
222+
223+
224+class DirectConsumer(ConsumerBase):
225+ """Queue/consumer class for 'direct'."""
226+
227+ def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
228+ """Init a 'direct' queue.
229+
230+ 'channel' is the amqp channel to use
231+ 'msg_id' is the msg_id to listen on
232+ 'callback' is the callback to call when messages are received
233+ 'tag' is a unique ID for the consumer on the channel
234+
235+ Other kombu options may be passed
236+ """
237+ # Default options
238+ options = {'durable': False,
239+ 'queue_arguments': _get_queue_arguments(conf),
240+ 'auto_delete': True,
241+ 'exclusive': False}
242+ options.update(kwargs)
243+ exchange = kombu.entity.Exchange(name=msg_id,
244+ type='direct',
245+ durable=options['durable'],
246+ auto_delete=options['auto_delete'])
247+ super(DirectConsumer, self).__init__(channel,
248+ callback,
249+ tag,
250+ name=msg_id,
251+ exchange=exchange,
252+ routing_key=msg_id,
253+ **options)
254+
255+
256+class TopicConsumer(ConsumerBase):
257+ """Consumer class for 'topic'."""
258+
259+ def __init__(self, conf, channel, topic, callback, tag, name=None,
260+ exchange_name=None, **kwargs):
261+ """Init a 'topic' queue.
262+
263+ :param channel: the amqp channel to use
264+ :param topic: the topic to listen on
265+ :paramtype topic: str
266+ :param callback: the callback to call when messages are received
267+ :param tag: a unique ID for the consumer on the channel
268+ :param name: optional queue name, defaults to topic
269+ :paramtype name: str
270+
271+ Other kombu options may be passed as keyword arguments
272+ """
273+ # Default options
274+ options = {'durable': conf.amqp_durable_queues,
275+ 'queue_arguments': _get_queue_arguments(conf),
276+ 'auto_delete': conf.amqp_auto_delete,
277+ 'exclusive': False}
278+ options.update(kwargs)
279+ exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
280+ exchange = kombu.entity.Exchange(name=exchange_name,
281+ type='topic',
282+ durable=options['durable'],
283+ auto_delete=options['auto_delete'])
284+ super(TopicConsumer, self).__init__(channel,
285+ callback,
286+ tag,
287+ name=name or topic,
288+ exchange=exchange,
289+ routing_key=topic,
290+ **options)
291+
292+
293+class FanoutConsumer(ConsumerBase):
294+ """Consumer class for 'fanout'."""
295+
296+ def __init__(self, conf, channel, topic, callback, tag, **kwargs):
297+ """Init a 'fanout' queue.
298+
299+ 'channel' is the amqp channel to use
300+ 'topic' is the topic to listen on
301+ 'callback' is the callback to call when messages are received
302+ 'tag' is a unique ID for the consumer on the channel
303+
304+ Other kombu options may be passed
305+ """
306+ unique = uuid.uuid4().hex
307+ exchange_name = '%s_fanout' % topic
308+ queue_name = '%s_fanout_%s' % (topic, unique)
309+
310+ # Default options
311+ options = {'durable': False,
312+ 'queue_arguments': _get_queue_arguments(conf),
313+ 'auto_delete': True,
314+ 'exclusive': False}
315+ options.update(kwargs)
316+ exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
317+ durable=options['durable'],
318+ auto_delete=options['auto_delete'])
319+ super(FanoutConsumer, self).__init__(channel, callback, tag,
320+ name=queue_name,
321+ exchange=exchange,
322+ routing_key=topic,
323+ **options)
324+
325+
326+class Publisher(object):
327+ """Base Publisher class."""
328+
329+ def __init__(self, channel, exchange_name, routing_key, **kwargs):
330+ """Init the Publisher class with the exchange_name, routing_key,
331+ and other options
332+ """
333+ self.exchange_name = exchange_name
334+ self.routing_key = routing_key
335+ self.kwargs = kwargs
336+ self.reconnect(channel)
337+
338+ def reconnect(self, channel):
339+ """Re-establish the Producer after a rabbit reconnection."""
340+ self.exchange = kombu.entity.Exchange(name=self.exchange_name,
341+ **self.kwargs)
342+ self.producer = kombu.messaging.Producer(exchange=self.exchange,
343+ channel=channel,
344+ routing_key=self.routing_key)
345+
346+ def send(self, msg, timeout=None):
347+ """Send a message."""
348+ if timeout:
349+ #
350+ # AMQP TTL is in milliseconds when set in the header.
351+ #
352+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
353+ else:
354+ self.producer.publish(msg)
355+
356+
357+class DirectPublisher(Publisher):
358+ """Publisher class for 'direct'."""
359+ def __init__(self, conf, channel, msg_id, **kwargs):
360+ """init a 'direct' publisher.
361+
362+ Kombu options may be passed as keyword args to override defaults
363+ """
364+
365+ options = {'durable': False,
366+ 'auto_delete': True,
367+ 'exclusive': False}
368+ options.update(kwargs)
369+ super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
370+ type='direct', **options)
371+
372+
373+class TopicPublisher(Publisher):
374+ """Publisher class for 'topic'."""
375+ def __init__(self, conf, channel, topic, **kwargs):
376+ """init a 'topic' publisher.
377+
378+ Kombu options may be passed as keyword args to override defaults
379+ """
380+ options = {'durable': conf.amqp_durable_queues,
381+ 'auto_delete': conf.amqp_auto_delete,
382+ 'exclusive': False}
383+ options.update(kwargs)
384+ exchange_name = rpc_amqp.get_control_exchange(conf)
385+ super(TopicPublisher, self).__init__(channel,
386+ exchange_name,
387+ topic,
388+ type='topic',
389+ **options)
390+
391+
392+class FanoutPublisher(Publisher):
393+ """Publisher class for 'fanout'."""
394+ def __init__(self, conf, channel, topic, **kwargs):
395+ """init a 'fanout' publisher.
396+
397+ Kombu options may be passed as keyword args to override defaults
398+ """
399+ options = {'durable': False,
400+ 'auto_delete': True,
401+ 'exclusive': False}
402+ options.update(kwargs)
403+ super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
404+ None, type='fanout', **options)
405+
406+
407+class NotifyPublisher(TopicPublisher):
408+ """Publisher class for 'notify'."""
409+
410+ def __init__(self, conf, channel, topic, **kwargs):
411+ self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
412+ self.queue_arguments = _get_queue_arguments(conf)
413+ super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
414+
415+ def reconnect(self, channel):
416+ super(NotifyPublisher, self).reconnect(channel)
417+
418+ # NOTE(jerdfelt): Normally the consumer would create the queue, but
419+ # we do this to ensure that messages don't get dropped if the
420+ # consumer is started after we do
421+ queue = kombu.entity.Queue(channel=channel,
422+ exchange=self.exchange,
423+ durable=self.durable,
424+ name=self.routing_key,
425+ routing_key=self.routing_key,
426+ queue_arguments=self.queue_arguments)
427+ queue.declare()
428+
429+
430+class Connection(object):
431+ """Connection object."""
432+
433+ pool = None
434+
435+ def __init__(self, conf, server_params=None):
436+ self.consumers = []
437+ self.consumer_thread = None
438+ self.proxy_callbacks = []
439+ self.conf = conf
440+ self.max_retries = self.conf.rabbit_max_retries
441+ # Try forever?
442+ if self.max_retries <= 0:
443+ self.max_retries = None
444+ self.interval_start = self.conf.rabbit_retry_interval
445+ self.interval_stepping = self.conf.rabbit_retry_backoff
446+ # max retry-interval = 30 seconds
447+ self.interval_max = 30
448+ self.memory_transport = False
449+
450+ if server_params is None:
451+ server_params = {}
452+ # Keys to translate from server_params to kombu params
453+ server_params_to_kombu_params = {'username': 'userid'}
454+
455+ ssl_params = self._fetch_ssl_params()
456+ params_list = []
457+ for adr in self.conf.rabbit_hosts:
458+ hostname, port = network_utils.parse_host_port(
459+ adr, default_port=self.conf.rabbit_port)
460+
461+ params = {
462+ 'hostname': hostname,
463+ 'port': port,
464+ 'userid': self.conf.rabbit_userid,
465+ 'password': self.conf.rabbit_password,
466+ 'virtual_host': self.conf.rabbit_virtual_host,
467+ }
468+
469+ for sp_key, value in six.iteritems(server_params):
470+ p_key = server_params_to_kombu_params.get(sp_key, sp_key)
471+ params[p_key] = value
472+
473+ if self.conf.fake_rabbit:
474+ params['transport'] = 'memory'
475+ if self.conf.rabbit_use_ssl:
476+ params['ssl'] = ssl_params
477+
478+ params_list.append(params)
479+
480+ self.params_list = params_list
481+
482+ brokers_count = len(self.params_list)
483+ self.next_broker_indices = itertools.cycle(range(brokers_count))
484+
485+ self.memory_transport = self.conf.fake_rabbit
486+
487+ self.connection = None
488+ self.reconnect()
489+
490+ def _fetch_ssl_params(self):
491+ """Handles fetching what ssl params should be used for the connection
492+ (if any).
493+ """
494+ ssl_params = dict()
495+
496+ # http://docs.python.org/library/ssl.html - ssl.wrap_socket
497+ if self.conf.kombu_ssl_version:
498+ ssl_params['ssl_version'] = sslutils.validate_ssl_version(
499+ self.conf.kombu_ssl_version)
500+ if self.conf.kombu_ssl_keyfile:
501+ ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
502+ if self.conf.kombu_ssl_certfile:
503+ ssl_params['certfile'] = self.conf.kombu_ssl_certfile
504+ if self.conf.kombu_ssl_ca_certs:
505+ ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
506+ # We might want to allow variations in the
507+ # future with this?
508+ ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
509+
510+ # Return the extended behavior or just have the default behavior
511+ return ssl_params or True
512+
513+ def _connect(self, params):
514+ """Connect to rabbit. Re-establish any queues that may have
515+ been declared before if we are reconnecting. Exceptions should
516+ be handled by the caller.
517+ """
518+ if self.connection:
519+ LOG.info(_LI("Reconnecting to AMQP server on "
520+ "%(hostname)s:%(port)d") % params)
521+ try:
522+ self.connection.release()
523+ except self.connection_errors:
524+ pass
525+ # Setting this in case the next statement fails, though
526+ # it shouldn't be doing any network operations, yet.
527+ self.connection = None
528+ self.connection = kombu.connection.BrokerConnection(**params)
529+ self.connection_errors = self.connection.connection_errors
530+ if self.memory_transport:
531+ # Kludge to speed up tests.
532+ self.connection.transport.polling_interval = 0.0
533+ self.consumer_num = itertools.count(1)
534+ self.connection.connect()
535+ self.channel = self.connection.channel()
536+ # work around 'memory' transport bug in 1.1.3
537+ if self.memory_transport:
538+ self.channel._new_queue('ae.undeliver')
539+ for consumer in self.consumers:
540+ consumer.reconnect(self.channel)
541+ LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
542+ params)
543+
544+ def reconnect(self):
545+ """Handles reconnecting and re-establishing queues.
546+ Will retry up to self.max_retries number of times.
547+ self.max_retries = 0 means to retry forever.
548+ Sleep between tries, starting at self.interval_start
549+ seconds, backing off self.interval_stepping number of seconds
550+ each attempt.
551+ """
552+
553+ attempt = 0
554+ while True:
555+ params = self.params_list[next(self.next_broker_indices)]
556+ attempt += 1
557+ try:
558+ self._connect(params)
559+ return
560+ except (IOError, self.connection_errors) as e:
561+ pass
562+ except Exception as e:
563+ # NOTE(comstud): Unfortunately it's possible for amqplib
564+ # to return an error not covered by its transport
565+ # connection_errors in the case of a timeout waiting for
566+ # a protocol response. (See paste link in LP888621)
567+ # So, we check all exceptions for 'timeout' in them
568+ # and try to reconnect in this case.
569+ if 'timeout' not in str(e):
570+ raise
571+
572+ log_info = {}
573+ log_info['err_str'] = str(e)
574+ log_info['max_retries'] = self.max_retries
575+ log_info.update(params)
576+
577+ if self.max_retries and attempt == self.max_retries:
578+ msg = _('Unable to connect to AMQP server on '
579+ '%(hostname)s:%(port)d after %(max_retries)d '
580+ 'tries: %(err_str)s') % log_info
581+ LOG.error(msg)
582+ raise rpc_common.RPCException(msg)
583+
584+ if attempt == 1:
585+ sleep_time = self.interval_start or 1
586+ elif attempt > 1:
587+ sleep_time += self.interval_stepping
588+ if self.interval_max:
589+ sleep_time = min(sleep_time, self.interval_max)
590+
591+ log_info['sleep_time'] = sleep_time
592+ LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
593+ 'unreachable: %(err_str)s. Trying again in '
594+ '%(sleep_time)d seconds.') % log_info)
595+ time.sleep(sleep_time)
596+
597+ def ensure(self, error_callback, method, *args, **kwargs):
598+ while True:
599+ try:
600+ return method(*args, **kwargs)
601+ except (self.connection_errors, socket.timeout, IOError) as e:
602+ if error_callback:
603+ error_callback(e)
604+ except Exception as e:
605+ # NOTE(comstud): Unfortunately it's possible for amqplib
606+ # to return an error not covered by its transport
607+ # connection_errors in the case of a timeout waiting for
608+ # a protocol response. (See paste link in LP888621)
609+ # So, we check all exceptions for 'timeout' in them
610+ # and try to reconnect in this case.
611+ if 'timeout' not in str(e):
612+ raise
613+ if error_callback:
614+ error_callback(e)
615+ self.reconnect()
616+
617+ def get_channel(self):
618+ """Convenience call for bin/clear_rabbit_queues."""
619+ return self.channel
620+
621+ def close(self):
622+ """Close/release this connection."""
623+ self.cancel_consumer_thread()
624+ self.wait_on_proxy_callbacks()
625+ self.connection.release()
626+ self.connection = None
627+
628+ def reset(self):
629+ """Reset a connection so it can be used again."""
630+ self.cancel_consumer_thread()
631+ self.wait_on_proxy_callbacks()
632+ self.channel.close()
633+ self.channel = self.connection.channel()
634+ # work around 'memory' transport bug in 1.1.3
635+ if self.memory_transport:
636+ self.channel._new_queue('ae.undeliver')
637+ self.consumers = []
638+
639+ def declare_consumer(self, consumer_cls, topic, callback):
640+ """Create a Consumer using the class that was passed in and
641+ add it to our list of consumers
642+ """
643+
644+ def _connect_error(exc):
645+ log_info = {'topic': topic, 'err_str': str(exc)}
646+ LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
647+ "%(err_str)s") % log_info)
648+
649+ def _declare_consumer():
650+ consumer = consumer_cls(self.conf, self.channel, topic, callback,
651+ six.next(self.consumer_num))
652+ self.consumers.append(consumer)
653+ return consumer
654+
655+ return self.ensure(_connect_error, _declare_consumer)
656+
657+ def iterconsume(self, limit=None, timeout=None):
658+ """Return an iterator that will consume from all queues/consumers."""
659+
660+ info = {'do_consume': True}
661+
662+ def _error_callback(exc):
663+ if isinstance(exc, socket.timeout):
664+ LOG.debug('Timed out waiting for RPC response: %s' %
665+ str(exc))
666+ raise rpc_common.Timeout()
667+ else:
668+ LOG.exception(_LE('Failed to consume message from queue: %s') %
669+ str(exc))
670+ info['do_consume'] = True
671+
672+ def _consume():
673+ if info['do_consume']:
674+ queues_head = self.consumers[:-1] # not fanout.
675+ queues_tail = self.consumers[-1] # fanout
676+ for queue in queues_head:
677+ queue.consume(nowait=True)
678+ queues_tail.consume(nowait=False)
679+ info['do_consume'] = False
680+ return self.connection.drain_events(timeout=timeout)
681+
682+ for iteration in itertools.count(0):
683+ if limit and iteration >= limit:
684+ raise StopIteration
685+ yield self.ensure(_error_callback, _consume)
686+
687+ def cancel_consumer_thread(self):
688+ """Cancel a consumer thread."""
689+ if self.consumer_thread is not None:
690+ self.consumer_thread.kill()
691+ try:
692+ self.consumer_thread.wait()
693+ except greenlet.GreenletExit:
694+ pass
695+ self.consumer_thread = None
696+
697+ def wait_on_proxy_callbacks(self):
698+ """Wait for all proxy callback threads to exit."""
699+ for proxy_cb in self.proxy_callbacks:
700+ proxy_cb.wait()
701+
702+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
703+ """Send to a publisher based on the publisher class."""
704+
705+ def _error_callback(exc):
706+ log_info = {'topic': topic, 'err_str': str(exc)}
707+ LOG.exception(_LE("Failed to publish message to topic "
708+ "'%(topic)s': %(err_str)s") % log_info)
709+
710+ def _publish():
711+ publisher = cls(self.conf, self.channel, topic, **kwargs)
712+ publisher.send(msg, timeout)
713+
714+ self.ensure(_error_callback, _publish)
715+
716+ def declare_direct_consumer(self, topic, callback):
717+ """Create a 'direct' queue.
718+ In nova's use, this is generally a msg_id queue used for
719+ responses for call/multicall
720+ """
721+ self.declare_consumer(DirectConsumer, topic, callback)
722+
723+ def declare_topic_consumer(self, topic, callback=None, queue_name=None,
724+ exchange_name=None, ack_on_error=True):
725+ """Create a 'topic' consumer."""
726+ self.declare_consumer(functools.partial(TopicConsumer,
727+ name=queue_name,
728+ exchange_name=exchange_name,
729+ ack_on_error=ack_on_error,
730+ ),
731+ topic, callback)
732+
733+ def declare_fanout_consumer(self, topic, callback):
734+ """Create a 'fanout' consumer."""
735+ self.declare_consumer(FanoutConsumer, topic, callback)
736+
737+ def direct_send(self, msg_id, msg):
738+ """Send a 'direct' message."""
739+ self.publisher_send(DirectPublisher, msg_id, msg)
740+
741+ def topic_send(self, topic, msg, timeout=None):
742+ """Send a 'topic' message."""
743+ self.publisher_send(TopicPublisher, topic, msg, timeout)
744+
745+ def fanout_send(self, topic, msg):
746+ """Send a 'fanout' message."""
747+ self.publisher_send(FanoutPublisher, topic, msg)
748+
749+ def notify_send(self, topic, msg, **kwargs):
750+ """Send a notify message on a topic."""
751+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
752+
753+ def consume(self, limit=None):
754+ """Consume from all queues/consumers."""
755+ it = self.iterconsume(limit=limit)
756+ while True:
757+ try:
758+ six.next(it)
759+ except StopIteration:
760+ return
761+
762+ def consume_in_thread(self):
763+ """Consumer from all queues/consumers in a greenthread."""
764+ @excutils.forever_retry_uncaught_exceptions
765+ def _consumer_thread():
766+ try:
767+ self.consume()
768+ except greenlet.GreenletExit:
769+ return
770+ if self.consumer_thread is None:
771+ self.consumer_thread = eventlet.spawn(_consumer_thread)
772+ return self.consumer_thread
773+
774+ def create_consumer(self, topic, proxy, fanout=False):
775+ """Create a consumer that calls a method in a proxy object."""
776+ proxy_cb = rpc_amqp.ProxyCallback(
777+ self.conf, proxy,
778+ rpc_amqp.get_connection_pool(self.conf, Connection))
779+ self.proxy_callbacks.append(proxy_cb)
780+
781+ if fanout:
782+ self.declare_fanout_consumer(topic, proxy_cb)
783+ else:
784+ self.declare_topic_consumer(topic, proxy_cb)
785+
786+ def create_worker(self, topic, proxy, pool_name):
787+ """Create a worker that calls a method in a proxy object."""
788+ proxy_cb = rpc_amqp.ProxyCallback(
789+ self.conf, proxy,
790+ rpc_amqp.get_connection_pool(self.conf, Connection))
791+ self.proxy_callbacks.append(proxy_cb)
792+ self.declare_topic_consumer(topic, proxy_cb, pool_name)
793+
794+ def join_consumer_pool(self, callback, pool_name, topic,
795+ exchange_name=None, ack_on_error=True):
796+ """Register as a member of a group of consumers for a given topic from
797+ the specified exchange.
798+
799+ Exactly one member of a given pool will receive each message.
800+
801+ A message will be delivered to multiple pools, if more than
802+ one is created.
803+ """
804+ callback_wrapper = rpc_amqp.CallbackWrapper(
805+ conf=self.conf,
806+ callback=callback,
807+ connection_pool=rpc_amqp.get_connection_pool(self.conf,
808+ Connection),
809+ wait_for_consumers=not ack_on_error
810+ )
811+ self.proxy_callbacks.append(callback_wrapper)
812+ self.declare_topic_consumer(
813+ queue_name=pool_name,
814+ topic=topic,
815+ exchange_name=exchange_name,
816+ callback=callback_wrapper,
817+ ack_on_error=ack_on_error,
818+ )
819+
820+
821+def create_connection(conf, new=True):
822+ """Create a connection."""
823+ return rpc_amqp.create_connection(
824+ conf, new,
825+ rpc_amqp.get_connection_pool(conf, Connection))
826+
827+
828+def multicall(conf, context, topic, msg, timeout=None):
829+ """Make a call that returns multiple times."""
830+ return rpc_amqp.multicall(
831+ conf, context, topic, msg, timeout,
832+ rpc_amqp.get_connection_pool(conf, Connection))
833+
834+
835+def call(conf, context, topic, msg, timeout=None):
836+ """Sends a message on a topic and wait for a response."""
837+ return rpc_amqp.call(
838+ conf, context, topic, msg, timeout,
839+ rpc_amqp.get_connection_pool(conf, Connection))
840+
841+
842+def cast(conf, context, topic, msg):
843+ """Sends a message on a topic without waiting for a response."""
844+ return rpc_amqp.cast(
845+ conf, context, topic, msg,
846+ rpc_amqp.get_connection_pool(conf, Connection))
847+
848+
849+def fanout_cast(conf, context, topic, msg):
850+ """Sends a message on a fanout exchange without waiting for a response."""
851+ return rpc_amqp.fanout_cast(
852+ conf, context, topic, msg,
853+ rpc_amqp.get_connection_pool(conf, Connection))
854+
855+
856+def cast_to_server(conf, context, server_params, topic, msg):
857+ """Sends a message on a topic to a specific server."""
858+ return rpc_amqp.cast_to_server(
859+ conf, context, server_params, topic, msg,
860+ rpc_amqp.get_connection_pool(conf, Connection))
861+
862+
863+def fanout_cast_to_server(conf, context, server_params, topic, msg):
864+ """Sends a message on a fanout exchange to a specific server."""
865+ return rpc_amqp.fanout_cast_to_server(
866+ conf, context, server_params, topic, msg,
867+ rpc_amqp.get_connection_pool(conf, Connection))
868+
869+
870+def notify(conf, context, topic, msg, envelope):
871+ """Sends a notification event on a topic."""
872+ return rpc_amqp.notify(
873+ conf, context, topic, msg,
874+ rpc_amqp.get_connection_pool(conf, Connection),
875+ envelope)
876+
877+
878+def cleanup():
879+ return rpc_amqp.cleanup(Connection.pool)
880
881=== added file '.pc/skip-ipv6-tests.patch/.timestamp'
882=== added file '.pc/use-concurrency.patch/.timestamp'
883=== modified file 'debian/changelog'
884--- debian/changelog 2015-06-22 10:14:52 +0000
885+++ debian/changelog 2016-01-08 08:41:27 +0000
886@@ -1,3 +1,11 @@
887+neutron (1:2014.1.5-0ubuntu2) trusty; urgency=medium
888+
889+ * Backport upstream release. (LP: #1318721):
890+ - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
891+ Redeclare the queue if an exception is caught when self.queue.declare() fails.
892+
893+ -- Hui Xiang <hui.xiang@canonical.com> Thu, 17 Dec 2015 16:35:40 +0800
894+
895 neutron (1:2014.1.5-0ubuntu1) trusty; urgency=medium
896
897 * Resynchronize with stable/icehouse (877df58) (LP: #1467533):
898
899=== added file 'debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch'
900--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 00:00:00 +0000
901+++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-08 08:41:27 +0000
902@@ -0,0 +1,77 @@
903+--- debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 2016-01-07 15:15:26.000000000 +0800
904++++ debian/patches/fix-reconnect-race-condition-with-rabbitmq-cluster.patch 1970-01-01 08:00:00.000000000 +0800
905+@@ -1,74 +0,0 @@
906+-Description: Fix reconnect race condition with RabbitMQ cluster
907+-
908+- Retry declaring the Queue if it fails, to work around the race
909+- condition that may happen when Neutron and the RabbitMQ cluster are
910+- trying to create and delete (respectively) Queues and Exchanges that
911+- have the auto-delete flag set.
912+-
913+- Change-Id: I90febd7c8358b7cccf852c272810e2fad3d91dcd
914+- Closes-Bug: #1318721
915+-
916+-Author: Mouad Benchchaoui <m.benchchaoui@x-ion.de>
917+-Origin: backport, https://review.openstack.org/#/c/93399/1
918+-Bug: https://bugs.launchpad.net/neutron/+bug/1318721
919+----
920+-This patch header follows DEP-3: http://dep.debian.net/deps/dep3/
921+-Index: neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py
922+-===================================================================
923+---- neutron-2014.1.5.orig/neutron/openstack/common/rpc/impl_kombu.py 2015-06-18 22:26:11.000000000 +0000
924+-+++ neutron-2014.1.5/neutron/openstack/common/rpc/impl_kombu.py 2015-12-01 02:40:27.806902000 +0000
925+-@@ -137,7 +137,53 @@
926+- self.channel = channel
927+- self.kwargs['channel'] = channel
928+- self.queue = kombu.entity.Queue(**self.kwargs)
929+-- self.queue.declare()
930+-+ try:
931+-+ self.queue.declare()
932+-+ except Exception:
933+-+ # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper
934+-+ # error; instead it raises what the underlying transport library
935+-+ # raises, which can be either 'amqp.NotFound' or
936+-+ # 'librabbitmq.ChannelError' depending on which transport library
937+-+ # is used.
938+-+ LOG.exception("Declaring queue fail retrying ...")
939+-+ # NOTE(Mouad): We need to re-try Queue creation in case the Queue
940+-+ # was created with auto-delete this instruct the Broker to delete
941+-+ # the Queue when the last Consumer disconnect from it, and the
942+-+ # Exchange when the last Queue is deleted from this Exchange.
943+-+ #
944+-+ # Now in a RabbitMQ cluster setup, if the cluster node that we are
945+-+ # connected to go down, 2 things will happen:
946+-+ #
947+-+ # 1. From RabbitMQ side the Queues will be deleted from the other
948+-+ # cluster nodes and then the corresponding Exchanges will also
949+-+ # be deleted.
950+-+ # 2. From Neutron side, Neutron will reconnect to another cluster
951+-+ # node and start creating Exchanges then Queues then Binding
952+-+ # (The order is important to understand the problem).
953+-+ #
954+-+ # Now this may lead to a race condition, specially if Neutron create
955+-+ # Exchange and Queue and before Neutron could bind Queue to Exchange
956+-+ # RabbitMQ nodes just received the 'signal' that they need to delete
957+-+ # Exchanges with auto-delete that belong to the down node, so they
958+-+ # will delete the Exchanges that was just created, and when Neutron
959+-+ # try to bind Queue with Exchange, the Binding will fail b/c the
960+-+ # Exchange is not found.
961+-+ #
962+-+ # But if the first Queue declaration works and binding was created
963+-+ # we suppose that RabbitMQ will not try to delete the Exchange even
964+-+ # if the auto-delete propagation wasn't received yet, b/c the Queue
965+-+ # has a Consumer now and a Binding exists with the Exchange.
966+-+ #
967+-+ # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges
968+-+ # but according to [1] RabbitMQ does not seem to intend to remove
969+-+ # it:
970+-+ #
971+-+ # The 'auto-delete' flag on 'exchange.declare' got deprecated in
972+-+ # 0-9-1. Auto-delete exchanges are actually quite useful, so this
973+-+ # flag should be restored.
974+-+ #
975+-+ # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html
976+-+ self.queue.declare()
977+-
978+- def _callback_handler(self, message, callback):
979+- """Call callback with deserialized message.
980
981=== modified file 'debian/patches/series'
982--- debian/patches/series 2014-08-08 08:30:52 +0000
983+++ debian/patches/series 2016-01-08 08:41:27 +0000
984@@ -2,3 +2,4 @@
985 disable-udev-tests.patch
986 skip-ipv6-tests.patch
987 use-concurrency.patch
988+fix-reconnect-race-condition-with-rabbitmq-cluster.patch
989
990=== modified file 'neutron/openstack/common/rpc/impl_kombu.py'
991--- neutron/openstack/common/rpc/impl_kombu.py 2014-04-01 16:22:54 +0000
992+++ neutron/openstack/common/rpc/impl_kombu.py 2016-01-08 08:41:27 +0000
993@@ -137,7 +137,53 @@
994 self.channel = channel
995 self.kwargs['channel'] = channel
996 self.queue = kombu.entity.Queue(**self.kwargs)
997- self.queue.declare()
998+ try:
999+ self.queue.declare()
1000+ except Exception:
1001+ # NOTE(Mouad): Catching Exception b/c Kombu doesn't raise a proper
1002+ # error; instead it raises what the underlying transport library
1003+ # raises, which can be either 'amqp.NotFound' or
1004+ # 'librabbitmq.ChannelError' depending on which transport library
1005+ # is used.
1006+ LOG.exception("Declaring queue fail retrying ...")
1007+ # NOTE(Mouad): We need to re-try Queue creation in case the Queue
1008+ # was created with auto-delete this instruct the Broker to delete
1009+ # the Queue when the last Consumer disconnect from it, and the
1010+ # Exchange when the last Queue is deleted from this Exchange.
1011+ #
1012+ # Now in a RabbitMQ cluster setup, if the cluster node that we are
1013+ # connected to go down, 2 things will happen:
1014+ #
1015+ # 1. From RabbitMQ side the Queues will be deleted from the other
1016+ # cluster nodes and then the corresponding Exchanges will also
1017+ # be deleted.
1018+ # 2. From Neutron side, Neutron will reconnect to another cluster
1019+ # node and start creating Exchanges then Queues then Binding
1020+ # (The order is important to understand the problem).
1021+ #
1022+ # Now this may lead to a race condition, specially if Neutron create
1023+ # Exchange and Queue and before Neutron could bind Queue to Exchange
1024+ # RabbitMQ nodes just received the 'signal' that they need to delete
1025+ # Exchanges with auto-delete that belong to the down node, so they
1026+ # will delete the Exchanges that was just created, and when Neutron
1027+ # try to bind Queue with Exchange, the Binding will fail b/c the
1028+ # Exchange is not found.
1029+ #
1030+ # But if the first Queue declaration works and binding was created
1031+ # we suppose that RabbitMQ will not try to delete the Exchange even
1032+ # if the auto-delete propagation wasn't received yet, b/c the Queue
1033+ # has a Consumer now and a Binding exists with the Exchange.
1034+ #
1035+ # Note: AMQP 0-9-1 deprecated the use of 'auto-delete' for Exchanges
1036+ # but according to [1] RabbitMQ does not seem to intend to remove
1037+ # it:
1038+ #
1039+ # The 'auto-delete' flag on 'exchange.declare' got deprecated in
1040+ # 0-9-1. Auto-delete exchanges are actually quite useful, so this
1041+ # flag should be restored.
1042+ #
1043+ # [1] http://www.rabbitmq.com/amqp-0-9-1-errata.html
1044+ self.queue.declare()
1045
1046 def _callback_handler(self, message, callback):
1047 """Call callback with deserialized message.

Subscribers

People subscribed via source and target branches

to all changes: