Merge lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032 into lp:ubuntu/trusty-updates/oslo.messaging

Proposed by Ubuntu Package Importer
Status: Needs review
Proposed branch: lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032
Merge into: lp:ubuntu/trusty-updates/oslo.messaging
Diff against target: 6223 lines (+22/-5952) (has conflicts)
20 files modified
.pc/.quilt_patches (+0/-1)
.pc/.quilt_series (+0/-1)
.pc/.version (+0/-1)
.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py (+0/-443)
.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py (+0/-509)
.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py (+0/-784)
.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py (+0/-535)
.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py (+0/-49)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py (+0/-535)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py (+0/-793)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py (+0/-646)
.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py (+0/-64)
.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py (+0/-819)
.pc/skip-qpid-tests.patch/tests/test_qpid.py (+0/-615)
oslo/messaging/_drivers/amqpdriver.py (+8/-15)
oslo/messaging/_drivers/common.py (+0/-26)
oslo/messaging/_drivers/impl_rabbit.py (+11/-53)
tests/test_qpid.py (+1/-9)
tests/test_rabbit.py (+2/-21)
tests/test_utils.py (+0/-33)
Conflict: can't delete .pc because it is not empty.  Not deleting.
Conflict because .pc is not versioned, but has versioned children.  Versioned directory.
Contents conflict in .pc/applied-patches
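These conflicts are the usual fallout of quilt's .pc metadata being versioned on one side of the merge only. One possible local resolution (a sketch, assuming the archive's view of .pc should win):
  rm -rf .pc
  bzr resolve .pc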
To merge this branch: bzr merge lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032
Reviewer: Ubuntu Development Team (status: Pending)
Review via email: mp+286852@code.launchpad.net

Description of the change

The package importer has detected a possible inconsistency between the package history in the archive and the history in bzr. As the archive is authoritative, the importer has made lp:ubuntu/trusty-updates/oslo.messaging reflect what is in the archive, and the old bzr branch has been pushed to lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032. This merge proposal was created so that an Ubuntu developer can review the situation and perform a merge/upload if necessary. There are three typical cases where this can happen.
  1. Where someone pushes a change to bzr and someone else uploads the package without that change. This is the case the importer's check is designed to catch. If this appears to be what happened, a merge/upload should be done if the changes that were in bzr are still desirable (a rough sketch of that workflow follows this description).
  2. The importer incorrectly detected the above situation when someone made a change in bzr and then uploaded it.
  3. The importer incorrectly detected the above situation when someone just uploaded a package and didn't touch bzr.

If this does not appear to be the first situation, set the status of the merge proposal to "Rejected" and help avoid the problem in future by filing a bug at https://bugs.launchpad.net/udd linking to this merge proposal.

(this is an automatically generated message)
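
For the first case, the merge/upload would go roughly as follows (a sketch only: the changelog text is illustrative, dch and debcommit come from the devscripts package, and bzr builddeb needs the bzr-builddeb plugin):

  bzr branch lp:ubuntu/trusty-updates/oslo.messaging
  cd oslo.messaging
  bzr merge lp:~ubuntu-branches/ubuntu/trusty/oslo.messaging/trusty-updates-201602230032
  # resolve the .pc conflicts noted above, then mark them resolved
  bzr resolve --all
  dch -i "Merge changes that were only in bzr back into the package."
  debcommit
  bzr builddeb -S
  # ...then have the resulting source package sponsored into trusty-updates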


Unmerged revisions

9. By Chuck Short

* Backport upstream fix (LP: #1318721):
  - d/p/fix-reconnect-race-condition-with-rabbitmq-cluster.patch:
    Redeclare if an exception is caught after self.queue.declare() failed.
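
The gist of the backported fix, as a minimal sketch (modelled on ConsumerBase.reconnect() from the impl_rabbit.py code in the preview diff below; the exact exception types the real patch catches may differ):

    def reconnect(self, channel):
        """Re-declare the queue after a rabbit reconnect."""
        self.channel = channel
        self.kwargs['channel'] = channel
        self.queue = kombu.entity.Queue(**self.kwargs)
        try:
            self.queue.declare()
        except Exception:
            # Assumption: when racing a RabbitMQ cluster node that is
            # still tearing the old queue down, the first declare can
            # fail transiently; declaring a second time usually
            # succeeds (LP: #1318721).
            self.queue.declare()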

Preview Diff

=== removed file '.pc/.quilt_patches'
--- .pc/.quilt_patches 2013-08-14 14:11:08 +0000
+++ .pc/.quilt_patches 1970-01-01 00:00:00 +0000
@@ -1,1 +0,0 @@
-debian/patches
=== removed file '.pc/.quilt_series'
--- .pc/.quilt_series 2013-08-14 14:11:08 +0000
+++ .pc/.quilt_series 1970-01-01 00:00:00 +0000
@@ -1,1 +0,0 @@
-series
=== removed file '.pc/.version'
--- .pc/.version 2013-08-14 14:11:08 +0000
+++ .pc/.version 1970-01-01 00:00:00 +0000
@@ -1,1 +0,0 @@
-2
=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch'
=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo'
=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging'
=== removed directory '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers'
=== removed file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py'
--- .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py 2015-04-23 15:56:08 +0000
+++ .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/amqpdriver.py 1970-01-01 00:00:00 +0000
@@ -1,443 +0,0 @@
1
2# Copyright 2013 Red Hat, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16__all__ = ['AMQPDriverBase']
17
18import logging
19import threading
20import uuid
21
22from six import moves
23
24from oslo import messaging
25from oslo.messaging._drivers import amqp as rpc_amqp
26from oslo.messaging._drivers import base
27from oslo.messaging._drivers import common as rpc_common
28
29LOG = logging.getLogger(__name__)
30
31
32class AMQPIncomingMessage(base.IncomingMessage):
33
34 def __init__(self, listener, ctxt, message, unique_id, msg_id, reply_q):
35 super(AMQPIncomingMessage, self).__init__(listener, ctxt,
36 dict(message))
37
38 self.unique_id = unique_id
39 self.msg_id = msg_id
40 self.reply_q = reply_q
41 self.acknowledge_callback = message.acknowledge
42 self.requeue_callback = message.requeue
43
44 def _send_reply(self, conn, reply=None, failure=None,
45 ending=False, log_failure=True):
46 if failure:
47 failure = rpc_common.serialize_remote_exception(failure,
48 log_failure)
49
50 msg = {'result': reply, 'failure': failure}
51 if ending:
52 msg['ending'] = True
53
54 rpc_amqp._add_unique_id(msg)
55
56 # If a reply_q exists, add the msg_id to the reply and pass the
57 # reply_q to direct_send() to use it as the response queue.
58 # Otherwise use the msg_id for backward compatibility.
59 if self.reply_q:
60 msg['_msg_id'] = self.msg_id
61 conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
62 else:
63 conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
64
65 def reply(self, reply=None, failure=None, log_failure=True):
66 with self.listener.driver._get_connection() as conn:
67 self._send_reply(conn, reply, failure, log_failure=log_failure)
68 self._send_reply(conn, ending=True)
69
70 def acknowledge(self):
71 self.listener.msg_id_cache.add(self.unique_id)
72 self.acknowledge_callback()
73
74 def requeue(self):
75 # NOTE(sileht): In case of the connection is lost between receiving the
76 # message and requeing it, this requeue call fail
77 # but because the message is not acknowledged and not added to the
78 # msg_id_cache, the message will be reconsumed, the only difference is
79 # the message stay at the beginning of the queue instead of moving to
80 # the end.
81 self.requeue_callback()
82
83
84class AMQPListener(base.Listener):
85
86 def __init__(self, driver, conn):
87 super(AMQPListener, self).__init__(driver)
88 self.conn = conn
89 self.msg_id_cache = rpc_amqp._MsgIdCache()
90 self.incoming = []
91
92 def __call__(self, message):
93 # FIXME(markmc): logging isn't driver specific
94 rpc_common._safe_log(LOG.debug, 'received %s', dict(message))
95
96 unique_id = self.msg_id_cache.check_duplicate_message(message)
97 ctxt = rpc_amqp.unpack_context(self.conf, message)
98
99 self.incoming.append(AMQPIncomingMessage(self,
100 ctxt.to_dict(),
101 message,
102 unique_id,
103 ctxt.msg_id,
104 ctxt.reply_q))
105
106 def poll(self):
107 while True:
108 if self.incoming:
109 return self.incoming.pop(0)
110 self.conn.consume(limit=1)
111
112
113class ReplyWaiters(object):
114
115 WAKE_UP = object()
116
117 def __init__(self):
118 self._queues = {}
119 self._wrn_threshold = 10
120
121 def get(self, msg_id, timeout):
122 try:
123 return self._queues[msg_id].get(block=True, timeout=timeout)
124 except moves.queue.Empty:
125 raise messaging.MessagingTimeout('Timed out waiting for a reply '
126 'to message ID %s' % msg_id)
127
128 def check(self, msg_id):
129 try:
130 return self._queues[msg_id].get(block=False)
131 except moves.queue.Empty:
132 return None
133
134 def put(self, msg_id, message_data):
135 queue = self._queues.get(msg_id)
136 if not queue:
137 LOG.warn('No calling threads waiting for msg_id : %(msg_id)s'
138 ', message : %(data)s', {'msg_id': msg_id,
139 'data': message_data})
140 LOG.warn('_queues: %s' % str(self._queues))
141 else:
142 queue.put(message_data)
143
144 def wake_all(self, except_id):
145 msg_ids = [i for i in self._queues.keys() if i != except_id]
146 for msg_id in msg_ids:
147 self.put(msg_id, self.WAKE_UP)
148
149 def add(self, msg_id, queue):
150 self._queues[msg_id] = queue
151 if len(self._queues) > self._wrn_threshold:
152 LOG.warn('Number of call queues is greater than warning '
153 'threshold: %d. There could be a leak.' %
154 self._wrn_threshold)
155 self._wrn_threshold *= 2
156
157 def remove(self, msg_id):
158 del self._queues[msg_id]
159
160
161class ReplyWaiter(object):
162
163 def __init__(self, conf, reply_q, conn, allowed_remote_exmods):
164 self.conf = conf
165 self.conn = conn
166 self.reply_q = reply_q
167 self.allowed_remote_exmods = allowed_remote_exmods
168
169 self.conn_lock = threading.Lock()
170 self.incoming = []
171 self.msg_id_cache = rpc_amqp._MsgIdCache()
172 self.waiters = ReplyWaiters()
173
174 conn.declare_direct_consumer(reply_q, self)
175
176 def __call__(self, message):
177 message.acknowledge()
178 self.incoming.append(message)
179
180 def listen(self, msg_id):
181 queue = moves.queue.Queue()
182 self.waiters.add(msg_id, queue)
183
184 def unlisten(self, msg_id):
185 self.waiters.remove(msg_id)
186
187 def _process_reply(self, data):
188 result = None
189 ending = False
190 self.msg_id_cache.check_duplicate_message(data)
191 if data['failure']:
192 failure = data['failure']
193 result = rpc_common.deserialize_remote_exception(
194 failure, self.allowed_remote_exmods)
195 elif data.get('ending', False):
196 ending = True
197 else:
198 result = data['result']
199 return result, ending
200
201 def _poll_connection(self, msg_id, timeout):
202 while True:
203 while self.incoming:
204 message_data = self.incoming.pop(0)
205
206 incoming_msg_id = message_data.pop('_msg_id', None)
207 if incoming_msg_id == msg_id:
208 return self._process_reply(message_data)
209
210 self.waiters.put(incoming_msg_id, message_data)
211
212 try:
213 self.conn.consume(limit=1, timeout=timeout)
214 except rpc_common.Timeout:
215 raise messaging.MessagingTimeout('Timed out waiting for a '
216 'reply to message ID %s'
217 % msg_id)
218
219 def _poll_queue(self, msg_id, timeout):
220 message = self.waiters.get(msg_id, timeout)
221 if message is self.waiters.WAKE_UP:
222 return None, None, True # lock was released
223
224 reply, ending = self._process_reply(message)
225 return reply, ending, False
226
227 def _check_queue(self, msg_id):
228 while True:
229 message = self.waiters.check(msg_id)
230 if message is self.waiters.WAKE_UP:
231 continue
232 if message is None:
233 return None, None, True # queue is empty
234
235 reply, ending = self._process_reply(message)
236 return reply, ending, False
237
238 def wait(self, msg_id, timeout):
239 #
240 # NOTE(markmc): we're waiting for a reply for msg_id to come in for on
241 # the reply_q, but there may be other threads also waiting for replies
242 # to other msg_ids
243 #
244 # Only one thread can be consuming from the queue using this connection
245 # and we don't want to hold open a connection per thread, so instead we
246 # have the first thread take responsibility for passing replies not
247 # intended for itself to the appropriate thread.
248 #
249 final_reply = None
250 while True:
251 if self.conn_lock.acquire(False):
252 # Ok, we're the thread responsible for polling the connection
253 try:
254 # Check the queue to see if a previous lock-holding thread
255 # queued up a reply already
256 while True:
257 reply, ending, empty = self._check_queue(msg_id)
258 if empty:
259 break
260 if not ending:
261 final_reply = reply
262 else:
263 return final_reply
264
265 # Now actually poll the connection
266 while True:
267 reply, ending = self._poll_connection(msg_id, timeout)
268 if not ending:
269 final_reply = reply
270 else:
271 return final_reply
272 finally:
273 self.conn_lock.release()
274 # We've got our reply, tell the other threads to wake up
275 # so that one of them will take over the responsibility for
276 # polling the connection
277 self.waiters.wake_all(msg_id)
278 else:
279 # We're going to wait for the first thread to pass us our reply
280 reply, ending, trylock = self._poll_queue(msg_id, timeout)
281 if trylock:
282 # The first thread got its reply, let's try and take over
283 # the responsibility for polling
284 continue
285 if not ending:
286 final_reply = reply
287 else:
288 return final_reply
289
290
291class AMQPDriverBase(base.BaseDriver):
292
293 def __init__(self, conf, url, connection_pool,
294 default_exchange=None, allowed_remote_exmods=[]):
295 super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
296 allowed_remote_exmods)
297
298 self._server_params = self._server_params_from_url(self._url)
299
300 self._default_exchange = default_exchange
301
302 # FIXME(markmc): temp hack
303 if self._default_exchange:
304 self.conf.set_override('control_exchange', self._default_exchange)
305
306 self._connection_pool = connection_pool
307
308 self._reply_q_lock = threading.Lock()
309 self._reply_q = None
310 self._reply_q_conn = None
311 self._waiter = None
312
313 def _server_params_from_url(self, url):
314 sp = {}
315
316 if url.virtual_host is not None:
317 sp['virtual_host'] = url.virtual_host
318
319 if url.hosts:
320 # FIXME(markmc): support multiple hosts
321 host = url.hosts[0]
322
323 sp['hostname'] = host.hostname
324 if host.port is not None:
325 sp['port'] = host.port
326 sp['username'] = host.username or ''
327 sp['password'] = host.password or ''
328
329 return sp
330
331 def _get_connection(self, pooled=True):
332 # FIXME(markmc): we don't yet have a connection pool for each
333 # Transport instance, so we'll only use the pool with the
334 # transport configuration from the config file
335 server_params = self._server_params or None
336 if server_params:
337 pooled = False
338 return rpc_amqp.ConnectionContext(self.conf,
339 self._connection_pool,
340 pooled=pooled,
341 server_params=server_params)
342
343 def _get_reply_q(self):
344 with self._reply_q_lock:
345 if self._reply_q is not None:
346 return self._reply_q
347
348 reply_q = 'reply_' + uuid.uuid4().hex
349
350 conn = self._get_connection(pooled=False)
351
352 self._waiter = ReplyWaiter(self.conf, reply_q, conn,
353 self._allowed_remote_exmods)
354
355 self._reply_q = reply_q
356 self._reply_q_conn = conn
357
358 return self._reply_q
359
360 def _send(self, target, ctxt, message,
361 wait_for_reply=None, timeout=None,
362 envelope=True, notify=False):
363
364 # FIXME(markmc): remove this temporary hack
365 class Context(object):
366 def __init__(self, d):
367 self.d = d
368
369 def to_dict(self):
370 return self.d
371
372 context = Context(ctxt)
373 msg = message
374
375 if wait_for_reply:
376 msg_id = uuid.uuid4().hex
377 msg.update({'_msg_id': msg_id})
378 LOG.debug('MSG_ID is %s' % (msg_id))
379 msg.update({'_reply_q': self._get_reply_q()})
380
381 rpc_amqp._add_unique_id(msg)
382 rpc_amqp.pack_context(msg, context)
383
384 if envelope:
385 msg = rpc_common.serialize_msg(msg)
386
387 if wait_for_reply:
388 self._waiter.listen(msg_id)
389
390 try:
391 with self._get_connection() as conn:
392 if notify:
393 conn.notify_send(target.topic, msg)
394 elif target.fanout:
395 conn.fanout_send(target.topic, msg)
396 else:
397 topic = target.topic
398 if target.server:
399 topic = '%s.%s' % (target.topic, target.server)
400 conn.topic_send(topic, msg, timeout=timeout)
401
402 if wait_for_reply:
403 result = self._waiter.wait(msg_id, timeout)
404 if isinstance(result, Exception):
405 raise result
406 return result
407 finally:
408 if wait_for_reply:
409 self._waiter.unlisten(msg_id)
410
411 def send(self, target, ctxt, message, wait_for_reply=None, timeout=None):
412 return self._send(target, ctxt, message, wait_for_reply, timeout)
413
414 def send_notification(self, target, ctxt, message, version):
415 return self._send(target, ctxt, message,
416 envelope=(version == 2.0), notify=True)
417
418 def listen(self, target):
419 conn = self._get_connection(pooled=False)
420
421 listener = AMQPListener(self, conn)
422
423 conn.declare_topic_consumer(target.topic, listener)
424 conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),
425 listener)
426 conn.declare_fanout_consumer(target.topic, listener)
427
428 return listener
429
430 def listen_for_notifications(self, targets_and_priorities):
431 conn = self._get_connection(pooled=False)
432
433 listener = AMQPListener(self, conn)
434 for target, priority in targets_and_priorities:
435 conn.declare_topic_consumer('%s.%s' % (target.topic, priority),
436 callback=listener,
437 exchange_name=target.exchange)
438 return listener
439
440 def cleanup(self):
441 if self._connection_pool:
442 self._connection_pool.empty()
443 self._connection_pool = None
=== removed file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py'
--- .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py 2015-04-23 15:56:08 +0000
+++ .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
@@ -1,509 +0,0 @@
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright 2011 Red Hat, Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18import copy
19import logging
20import sys
21import traceback
22
23from oslo.config import cfg
24from oslo import messaging
25import six
26
27from oslo.messaging import _utils as utils
28from oslo.messaging.openstack.common import importutils
29from oslo.messaging.openstack.common import jsonutils
30
31# FIXME(markmc): remove this
32_ = lambda s: s
33
34LOG = logging.getLogger(__name__)
35
36_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
37
38
39'''RPC Envelope Version.
40
41This version number applies to the top level structure of messages sent out.
42It does *not* apply to the message payload, which must be versioned
43independently. For example, when using rpc APIs, a version number is applied
44for changes to the API being exposed over rpc. This version number is handled
45in the rpc proxy and dispatcher modules.
46
47This version number applies to the message envelope that is used in the
48serialization done inside the rpc layer. See serialize_msg() and
49deserialize_msg().
50
51The current message format (version 2.0) is very simple. It is:
52
53 {
54 'oslo.version': <RPC Envelope Version as a String>,
55 'oslo.message': <Application Message Payload, JSON encoded>
56 }
57
58Message format version '1.0' is just considered to be the messages we sent
59without a message envelope.
60
61So, the current message envelope just includes the envelope version. It may
62eventually contain additional information, such as a signature for the message
63payload.
64
65We will JSON encode the application message payload. The message envelope,
66which includes the JSON encoded application message body, will be passed down
67to the messaging libraries as a dict.
68'''
69_RPC_ENVELOPE_VERSION = '2.0'
70
71_VERSION_KEY = 'oslo.version'
72_MESSAGE_KEY = 'oslo.message'
73
74_REMOTE_POSTFIX = '_Remote'
75
76_exception_opts = [
77 cfg.ListOpt('allowed_rpc_exception_modules',
78 default=['oslo.messaging.exceptions',
79 'nova.exception',
80 'cinder.exception',
81 _EXCEPTIONS_MODULE,
82 ],
83 help='Modules of exceptions that are permitted to be '
84 'recreated upon receiving exception data from an rpc '
85 'call.'),
86]
87
88
89class RPCException(Exception):
90 msg_fmt = _("An unknown RPC related exception occurred.")
91
92 def __init__(self, message=None, **kwargs):
93 self.kwargs = kwargs
94
95 if not message:
96 try:
97 message = self.msg_fmt % kwargs
98
99 except Exception:
100 # kwargs doesn't match a variable in the message
101 # log the issue and the kwargs
102 LOG.exception(_('Exception in string format operation'))
103 for name, value in six.iteritems(kwargs):
104 LOG.error("%s: %s" % (name, value))
105 # at least get the core message out if something happened
106 message = self.msg_fmt
107
108 super(RPCException, self).__init__(message)
109
110
111class RemoteError(RPCException):
112 """Signifies that a remote class has raised an exception.
113
114 Contains a string representation of the type of the original exception,
115 the value of the original exception, and the traceback. These are
116 sent to the parent as a joined string so printing the exception
117 contains all of the relevant info.
118
119 """
120 msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
121
122 def __init__(self, exc_type=None, value=None, traceback=None):
123 self.exc_type = exc_type
124 self.value = value
125 self.traceback = traceback
126 super(RemoteError, self).__init__(exc_type=exc_type,
127 value=value,
128 traceback=traceback)
129
130
131class Timeout(RPCException):
132 """Signifies that a timeout has occurred.
133
134 This exception is raised if the rpc_response_timeout is reached while
135 waiting for a response from the remote side.
136 """
137 msg_fmt = _('Timeout while waiting on RPC response - '
138 'topic: "%(topic)s", RPC method: "%(method)s" '
139 'info: "%(info)s"')
140
141 def __init__(self, info=None, topic=None, method=None):
142 """Initiates Timeout object.
143
144 :param info: Extra info to convey to the user
145 :param topic: The topic that the rpc call was sent to
146 :param rpc_method_name: The name of the rpc method being
147 called
148 """
149 self.info = info
150 self.topic = topic
151 self.method = method
152 super(Timeout, self).__init__(
153 None,
154 info=info or _('<unknown>'),
155 topic=topic or _('<unknown>'),
156 method=method or _('<unknown>'))
157
158
159class DuplicateMessageError(RPCException):
160 msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
161
162
163class InvalidRPCConnectionReuse(RPCException):
164 msg_fmt = _("Invalid reuse of an RPC connection.")
165
166
167class UnsupportedRpcVersion(RPCException):
168 msg_fmt = _("Specified RPC version, %(version)s, not supported by "
169 "this endpoint.")
170
171
172class UnsupportedRpcEnvelopeVersion(RPCException):
173 msg_fmt = _("Specified RPC envelope version, %(version)s, "
174 "not supported by this endpoint.")
175
176
177class RpcVersionCapError(RPCException):
178 msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
179
180
181class Connection(object):
182 """A connection, returned by rpc.create_connection().
183
184 This class represents a connection to the message bus used for rpc.
185 An instance of this class should never be created by users of the rpc API.
186 Use rpc.create_connection() instead.
187 """
188 def close(self):
189 """Close the connection.
190
191 This method must be called when the connection will no longer be used.
192 It will ensure that any resources associated with the connection, such
193 as a network connection, and cleaned up.
194 """
195 raise NotImplementedError()
196
197 def create_consumer(self, topic, proxy, fanout=False):
198 """Create a consumer on this connection.
199
200 A consumer is associated with a message queue on the backend message
201 bus. The consumer will read messages from the queue, unpack them, and
202 dispatch them to the proxy object. The contents of the message pulled
203 off of the queue will determine which method gets called on the proxy
204 object.
205
206 :param topic: This is a name associated with what to consume from.
207 Multiple instances of a service may consume from the same
208 topic. For example, all instances of nova-compute consume
209 from a queue called "compute". In that case, the
210 messages will get distributed amongst the consumers in a
211 round-robin fashion if fanout=False. If fanout=True,
212 every consumer associated with this topic will get a
213 copy of every message.
214 :param proxy: The object that will handle all incoming messages.
215 :param fanout: Whether or not this is a fanout topic. See the
216 documentation for the topic parameter for some
217 additional comments on this.
218 """
219 raise NotImplementedError()
220
221 def create_worker(self, topic, proxy, pool_name):
222 """Create a worker on this connection.
223
224 A worker is like a regular consumer of messages directed to a
225 topic, except that it is part of a set of such consumers (the
226 "pool") which may run in parallel. Every pool of workers will
227 receive a given message, but only one worker in the pool will
228 be asked to process it. Load is distributed across the members
229 of the pool in round-robin fashion.
230
231 :param topic: This is a name associated with what to consume from.
232 Multiple instances of a service may consume from the same
233 topic.
234 :param proxy: The object that will handle all incoming messages.
235 :param pool_name: String containing the name of the pool of workers
236 """
237 raise NotImplementedError()
238
239 def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
240 """Register as a member of a group of consumers.
241
242 Uses given topic from the specified exchange.
243 Exactly one member of a given pool will receive each message.
244
245 A message will be delivered to multiple pools, if more than
246 one is created.
247
248 :param callback: Callable to be invoked for each message.
249 :type callback: callable accepting one argument
250 :param pool_name: The name of the consumer pool.
251 :type pool_name: str
252 :param topic: The routing topic for desired messages.
253 :type topic: str
254 :param exchange_name: The name of the message exchange where
255 the client should attach. Defaults to
256 the configured exchange.
257 :type exchange_name: str
258 """
259 raise NotImplementedError()
260
261 def consume_in_thread(self):
262 """Spawn a thread to handle incoming messages.
263
264 Spawn a thread that will be responsible for handling all incoming
265 messages for consumers that were set up on this connection.
266
267 Message dispatching inside of this is expected to be implemented in a
268 non-blocking manner. An example implementation would be having this
269 thread pull messages in for all of the consumers, but utilize a thread
270 pool for dispatching the messages to the proxy objects.
271 """
272 raise NotImplementedError()
273
274
275def _safe_log(log_func, msg, msg_data):
276 """Sanitizes the msg_data field before logging."""
277 SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
278
279 def _fix_passwords(d):
280 """Sanitizes the password fields in the dictionary."""
281 for k in six.iterkeys(d):
282 if k.lower().find('password') != -1:
283 d[k] = '<SANITIZED>'
284 elif k.lower() in SANITIZE:
285 d[k] = '<SANITIZED>'
286 elif isinstance(d[k], dict):
287 _fix_passwords(d[k])
288 return d
289
290 return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
291
292
293def serialize_remote_exception(failure_info, log_failure=True):
294 """Prepares exception data to be sent over rpc.
295
296 Failure_info should be a sys.exc_info() tuple.
297
298 """
299 tb = traceback.format_exception(*failure_info)
300 failure = failure_info[1]
301 if log_failure:
302 LOG.error(_("Returning exception %s to caller"),
303 six.text_type(failure))
304 LOG.error(tb)
305
306 kwargs = {}
307 if hasattr(failure, 'kwargs'):
308 kwargs = failure.kwargs
309
310 # NOTE(matiu): With cells, it's possible to re-raise remote, remote
311 # exceptions. Lets turn it back into the original exception type.
312 cls_name = str(failure.__class__.__name__)
313 mod_name = str(failure.__class__.__module__)
314 if (cls_name.endswith(_REMOTE_POSTFIX) and
315 mod_name.endswith(_REMOTE_POSTFIX)):
316 cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
317 mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
318
319 data = {
320 'class': cls_name,
321 'module': mod_name,
322 'message': six.text_type(failure),
323 'tb': tb,
324 'args': failure.args,
325 'kwargs': kwargs
326 }
327
328 json_data = jsonutils.dumps(data)
329
330 return json_data
331
332
333def deserialize_remote_exception(data, allowed_remote_exmods):
334 failure = jsonutils.loads(str(data))
335
336 trace = failure.get('tb', [])
337 message = failure.get('message', "") + "\n" + "\n".join(trace)
338 name = failure.get('class')
339 module = failure.get('module')
340
341 # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
342 # order to prevent arbitrary code execution.
343 if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
344 return messaging.RemoteError(name, failure.get('message'), trace)
345
346 try:
347 mod = importutils.import_module(module)
348 klass = getattr(mod, name)
349 if not issubclass(klass, Exception):
350 raise TypeError("Can only deserialize Exceptions")
351
352 failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
353 except (AttributeError, TypeError, ImportError):
354 return messaging.RemoteError(name, failure.get('message'), trace)
355
356 ex_type = type(failure)
357 str_override = lambda self: message
358 new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
359 {'__str__': str_override, '__unicode__': str_override})
360 new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
361 try:
362 # NOTE(ameade): Dynamically create a new exception type and swap it in
363 # as the new type for the exception. This only works on user defined
364 # Exceptions and not core Python exceptions. This is important because
365 # we cannot necessarily change an exception message so we must override
366 # the __str__ method.
367 failure.__class__ = new_ex_type
368 except TypeError:
369 # NOTE(ameade): If a core exception then just add the traceback to the
370 # first exception argument.
371 failure.args = (message,) + failure.args[1:]
372 return failure
373
374
375class CommonRpcContext(object):
376 def __init__(self, **kwargs):
377 self.values = kwargs
378
379 def __getattr__(self, key):
380 try:
381 return self.values[key]
382 except KeyError:
383 raise AttributeError(key)
384
385 def to_dict(self):
386 return copy.deepcopy(self.values)
387
388 @classmethod
389 def from_dict(cls, values):
390 return cls(**values)
391
392 def deepcopy(self):
393 return self.from_dict(self.to_dict())
394
395 def update_store(self):
396 #local.store.context = self
397 pass
398
399 def elevated(self, read_deleted=None, overwrite=False):
400 """Return a version of this context with admin flag set."""
401 # TODO(russellb) This method is a bit of a nova-ism. It makes
402 # some assumptions about the data in the request context sent
403 # across rpc, while the rest of this class does not. We could get
404 # rid of this if we changed the nova code that uses this to
405 # convert the RpcContext back to its native RequestContext doing
406 # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
407
408 context = self.deepcopy()
409 context.values['is_admin'] = True
410
411 context.values.setdefault('roles', [])
412
413 if 'admin' not in context.values['roles']:
414 context.values['roles'].append('admin')
415
416 if read_deleted is not None:
417 context.values['read_deleted'] = read_deleted
418
419 return context
420
421
422class ClientException(Exception):
423 """Encapsulates actual exception expected to be hit by a RPC proxy object.
424
425 Merely instantiating it records the current exception information, which
426 will be passed back to the RPC client without exceptional logging.
427 """
428 def __init__(self):
429 self._exc_info = sys.exc_info()
430
431
432def catch_client_exception(exceptions, func, *args, **kwargs):
433 try:
434 return func(*args, **kwargs)
435 except Exception as e:
436 if type(e) in exceptions:
437 raise ClientException()
438 else:
439 raise
440
441
442def client_exceptions(*exceptions):
443 """Decorator for manager methods that raise expected exceptions.
444
445 Marking a Manager method with this decorator allows the declaration
446 of expected exceptions that the RPC layer should not consider fatal,
447 and not log as if they were generated in a real error scenario. Note
448 that this will cause listed exceptions to be wrapped in a
449 ClientException, which is used internally by the RPC layer.
450 """
451 def outer(func):
452 def inner(*args, **kwargs):
453 return catch_client_exception(exceptions, func, *args, **kwargs)
454 return inner
455 return outer
456
457
458def serialize_msg(raw_msg):
459 # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
460 # information about this format.
461 msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
462 _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
463
464 return msg
465
466
467def deserialize_msg(msg):
468 # NOTE(russellb): Hang on to your hats, this road is about to
469 # get a little bumpy.
470 #
471 # Robustness Principle:
472 # "Be strict in what you send, liberal in what you accept."
473 #
474 # At this point we have to do a bit of guessing about what it
475 # is we just received. Here is the set of possibilities:
476 #
477 # 1) We received a dict. This could be 2 things:
478 #
479 # a) Inspect it to see if it looks like a standard message envelope.
480 # If so, great!
481 #
482 # b) If it doesn't look like a standard message envelope, it could either
483 # be a notification, or a message from before we added a message
484 # envelope (referred to as version 1.0).
485 # Just return the message as-is.
486 #
487 # 2) It's any other non-dict type. Just return it and hope for the best.
488 # This case covers return values from rpc.call() from before message
489 # envelopes were used. (messages to call a method were always a dict)
490
491 if not isinstance(msg, dict):
492 # See #2 above.
493 return msg
494
495 base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
496 if not all(map(lambda key: key in msg, base_envelope_keys)):
497 # See #1.b above.
498 return msg
499
500 # At this point we think we have the message envelope
501 # format we were expecting. (#1.a above)
502
503 if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
504 msg[_VERSION_KEY]):
505 raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
506
507 raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
508
509 return raw_msg
=== removed file '.pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-04-23 15:56:08 +0000
+++ .pc/0001-rabbit-more-precise-iterconsume-timeout.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
@@ -1,784 +0,0 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import logging
18import socket
19import ssl
20import time
21import uuid
22
23import kombu
24import kombu.connection
25import kombu.entity
26import kombu.messaging
27from oslo.config import cfg
28import six
29
30from oslo.messaging._drivers import amqp as rpc_amqp
31from oslo.messaging._drivers import amqpdriver
32from oslo.messaging._drivers import common as rpc_common
33from oslo.messaging.openstack.common import network_utils
34
35# FIXME(markmc): remove this
36_ = lambda s: s
37
38rabbit_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='SSL version to use (valid only if SSL enabled). '
42 'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled).'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled).'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help=('SSL certification authority file '
54 '(valid only if SSL enabled).')),
55 cfg.FloatOpt('kombu_reconnect_delay',
56 default=1.0,
57 help='How long to wait before reconnecting in response to an '
58 'AMQP consumer cancel notification.'),
59 cfg.StrOpt('rabbit_host',
60 default='localhost',
61 help='The RabbitMQ broker address where a single node is '
62 'used.'),
63 cfg.IntOpt('rabbit_port',
64 default=5672,
65 help='The RabbitMQ broker port where a single node is used.'),
66 cfg.ListOpt('rabbit_hosts',
67 default=['$rabbit_host:$rabbit_port'],
68 help='RabbitMQ HA cluster host:port pairs.'),
69 cfg.BoolOpt('rabbit_use_ssl',
70 default=False,
71 help='Connect over SSL for RabbitMQ.'),
72 cfg.StrOpt('rabbit_userid',
73 default='guest',
74 help='The RabbitMQ userid.'),
75 cfg.StrOpt('rabbit_password',
76 default='guest',
77 help='The RabbitMQ password.',
78 secret=True),
79 cfg.StrOpt('rabbit_login_method',
80 default='AMQPLAIN',
81 help='the RabbitMQ login method'),
82 cfg.StrOpt('rabbit_virtual_host',
83 default='/',
84 help='The RabbitMQ virtual host.'),
85 cfg.IntOpt('rabbit_retry_interval',
86 default=1,
87 help='How frequently to retry connecting with RabbitMQ.'),
88 cfg.IntOpt('rabbit_retry_backoff',
89 default=2,
90 help='How long to backoff for between retries when connecting '
91 'to RabbitMQ.'),
92 cfg.IntOpt('rabbit_max_retries',
93 default=0,
94 help='Maximum number of RabbitMQ connection retries. '
95 'Default is 0 (infinite retry count).'),
96 cfg.BoolOpt('rabbit_ha_queues',
97 default=False,
98 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
99 'If you change this option, you must wipe the '
100 'RabbitMQ database.'),
101
102 # FIXME(markmc): this was toplevel in openstack.common.rpc
103 cfg.BoolOpt('fake_rabbit',
104 default=False,
105 help='If passed, use a fake RabbitMQ provider.'),
106]
107
108LOG = logging.getLogger(__name__)
109
110
111def _get_queue_arguments(conf):
112 """Construct the arguments for declaring a queue.
113
114 If the rabbit_ha_queues option is set, we declare a mirrored queue
115 as described here:
116
117 http://www.rabbitmq.com/ha.html
118
119 Setting x-ha-policy to all means that the queue will be mirrored
120 to all nodes in the cluster.
121 """
122 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
123
124
125class RabbitMessage(dict):
126 def __init__(self, raw_message):
127 super(RabbitMessage, self).__init__(
128 rpc_common.deserialize_msg(raw_message.payload))
129 self._raw_message = raw_message
130
131 def acknowledge(self):
132 self._raw_message.ack()
133
134 def requeue(self):
135 self._raw_message.requeue()
136
137
138class ConsumerBase(object):
139 """Consumer base class."""
140
141 def __init__(self, channel, callback, tag, **kwargs):
142 """Declare a queue on an amqp channel.
143
144 'channel' is the amqp channel to use
145 'callback' is the callback to call when messages are received
146 'tag' is a unique ID for the consumer on the channel
147
148 queue name, exchange name, and other kombu options are
149 passed in here as a dictionary.
150 """
151 self.callback = callback
152 self.tag = str(tag)
153 self.kwargs = kwargs
154 self.queue = None
155 self.reconnect(channel)
156
157 def reconnect(self, channel):
158 """Re-declare the queue after a rabbit reconnect."""
159 self.channel = channel
160 self.kwargs['channel'] = channel
161 self.queue = kombu.entity.Queue(**self.kwargs)
162 self.queue.declare()
163
164 def _callback_handler(self, message, callback):
165 """Call callback with deserialized message.
166
167 Messages that are processed and ack'ed.
168 """
169
170 try:
171 callback(RabbitMessage(message))
172 except Exception:
173 LOG.exception(_("Failed to process message"
174 " ... skipping it."))
175 message.ack()
176
177 def consume(self, *args, **kwargs):
178 """Actually declare the consumer on the amqp channel. This will
179 start the flow of messages from the queue. Using the
180 Connection.iterconsume() iterator will process the messages,
181 calling the appropriate callback.
182
183 If a callback is specified in kwargs, use that. Otherwise,
184 use the callback passed during __init__()
185
186 If kwargs['nowait'] is True, then this call will block until
187 a message is read.
188
189 """
190
191 options = {'consumer_tag': self.tag}
192 options['nowait'] = kwargs.get('nowait', False)
193 callback = kwargs.get('callback', self.callback)
194 if not callback:
195 raise ValueError("No callback defined")
196
197 def _callback(raw_message):
198 message = self.channel.message_to_python(raw_message)
199 self._callback_handler(message, callback)
200
201 self.queue.consume(*args, callback=_callback, **options)
202
203 def cancel(self):
204 """Cancel the consuming from the queue, if it has started."""
205 try:
206 self.queue.cancel(self.tag)
207 except KeyError as e:
208 # NOTE(comstud): Kludge to get around a amqplib bug
209 if str(e) != "u'%s'" % self.tag:
210 raise
211 self.queue = None
212
213
214class DirectConsumer(ConsumerBase):
215 """Queue/consumer class for 'direct'."""
216
217 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
218 """Init a 'direct' queue.
219
220 'channel' is the amqp channel to use
221 'msg_id' is the msg_id to listen on
222 'callback' is the callback to call when messages are received
223 'tag' is a unique ID for the consumer on the channel
224
225 Other kombu options may be passed
226 """
227 # Default options
228 options = {'durable': False,
229 'queue_arguments': _get_queue_arguments(conf),
230 'auto_delete': True,
231 'exclusive': False}
232 options.update(kwargs)
233 exchange = kombu.entity.Exchange(name=msg_id,
234 type='direct',
235 durable=options['durable'],
236 auto_delete=options['auto_delete'])
237 super(DirectConsumer, self).__init__(channel,
238 callback,
239 tag,
240 name=msg_id,
241 exchange=exchange,
242 routing_key=msg_id,
243 **options)
244
245
246class TopicConsumer(ConsumerBase):
247 """Consumer class for 'topic'."""
248
249 def __init__(self, conf, channel, topic, callback, tag, name=None,
250 exchange_name=None, **kwargs):
251 """Init a 'topic' queue.
252
253 :param channel: the amqp channel to use
254 :param topic: the topic to listen on
255 :paramtype topic: str
256 :param callback: the callback to call when messages are received
257 :param tag: a unique ID for the consumer on the channel
258 :param name: optional queue name, defaults to topic
259 :paramtype name: str
260
261 Other kombu options may be passed as keyword arguments
262 """
263 # Default options
264 options = {'durable': conf.amqp_durable_queues,
265 'queue_arguments': _get_queue_arguments(conf),
266 'auto_delete': conf.amqp_auto_delete,
267 'exclusive': False}
268 options.update(kwargs)
269 exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
270 exchange = kombu.entity.Exchange(name=exchange_name,
271 type='topic',
272 durable=options['durable'],
273 auto_delete=options['auto_delete'])
274 super(TopicConsumer, self).__init__(channel,
275 callback,
276 tag,
277 name=name or topic,
278 exchange=exchange,
279 routing_key=topic,
280 **options)
281
282
283class FanoutConsumer(ConsumerBase):
284 """Consumer class for 'fanout'."""
285
286 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
287 """Init a 'fanout' queue.
288
289 'channel' is the amqp channel to use
290 'topic' is the topic to listen on
291 'callback' is the callback to call when messages are received
292 'tag' is a unique ID for the consumer on the channel
293
294 Other kombu options may be passed
295 """
296 unique = uuid.uuid4().hex
297 exchange_name = '%s_fanout' % topic
298 queue_name = '%s_fanout_%s' % (topic, unique)
299
300 # Default options
301 options = {'durable': False,
302 'queue_arguments': _get_queue_arguments(conf),
303 'auto_delete': True,
304 'exclusive': False}
305 options.update(kwargs)
306 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
307 durable=options['durable'],
308 auto_delete=options['auto_delete'])
309 super(FanoutConsumer, self).__init__(channel, callback, tag,
310 name=queue_name,
311 exchange=exchange,
312 routing_key=topic,
313 **options)
314
315
316class Publisher(object):
317 """Base Publisher class."""
318
319 def __init__(self, channel, exchange_name, routing_key, **kwargs):
320 """Init the Publisher class with the exchange_name, routing_key,
321 and other options
322 """
323 self.exchange_name = exchange_name
324 self.routing_key = routing_key
325 self.kwargs = kwargs
326 self.reconnect(channel)
327
328 def reconnect(self, channel):
329 """Re-establish the Producer after a rabbit reconnection."""
330 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
331 **self.kwargs)
332 self.producer = kombu.messaging.Producer(exchange=self.exchange,
333 channel=channel,
334 routing_key=self.routing_key)
335
336 def send(self, msg, timeout=None):
337 """Send a message."""
338 if timeout:
339 #
340 # AMQP TTL is in milliseconds when set in the header.
341 #
342 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
343 else:
344 self.producer.publish(msg)
345
346
347class DirectPublisher(Publisher):
348 """Publisher class for 'direct'."""
349 def __init__(self, conf, channel, msg_id, **kwargs):
350 """Init a 'direct' publisher.
351
352 Kombu options may be passed as keyword args to override defaults
353 """
354
355 options = {'durable': False,
356 'auto_delete': True,
357 'exclusive': False}
358 options.update(kwargs)
359 super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
360 type='direct', **options)
361
362
363class TopicPublisher(Publisher):
364 """Publisher class for 'topic'."""
365 def __init__(self, conf, channel, topic, **kwargs):
366 """Init a 'topic' publisher.
367
368 Kombu options may be passed as keyword args to override defaults
369 """
370 options = {'durable': conf.amqp_durable_queues,
371 'auto_delete': conf.amqp_auto_delete,
372 'exclusive': False}
373 options.update(kwargs)
374 exchange_name = rpc_amqp.get_control_exchange(conf)
375 super(TopicPublisher, self).__init__(channel,
376 exchange_name,
377 topic,
378 type='topic',
379 **options)
380
381
382class FanoutPublisher(Publisher):
383 """Publisher class for 'fanout'."""
384 def __init__(self, conf, channel, topic, **kwargs):
385 """Init a 'fanout' publisher.
386
387 Kombu options may be passed as keyword args to override defaults
388 """
389 options = {'durable': False,
390 'auto_delete': True,
391 'exclusive': False}
392 options.update(kwargs)
393 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
394 None, type='fanout', **options)
395
396
397class NotifyPublisher(TopicPublisher):
398 """Publisher class for 'notify'."""
399
400 def __init__(self, conf, channel, topic, **kwargs):
401 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
402 self.queue_arguments = _get_queue_arguments(conf)
403 super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
404
405 def reconnect(self, channel):
406 super(NotifyPublisher, self).reconnect(channel)
407
408 # NOTE(jerdfelt): Normally the consumer would create the queue, but
409 # we do this to ensure that messages don't get dropped if the
410 # consumer is started after we do
411 queue = kombu.entity.Queue(channel=channel,
412 exchange=self.exchange,
413 durable=self.durable,
414 name=self.routing_key,
415 routing_key=self.routing_key,
416 queue_arguments=self.queue_arguments)
417 queue.declare()
418
419
420class Connection(object):
421 """Connection object."""
422
423 pool = None
424
425 def __init__(self, conf, server_params=None):
426 self.consumers = []
427 self.conf = conf
428 self.max_retries = self.conf.rabbit_max_retries
429 # Try forever?
430 if self.max_retries <= 0:
431 self.max_retries = None
432 self.interval_start = self.conf.rabbit_retry_interval
433 self.interval_stepping = self.conf.rabbit_retry_backoff
434 # max retry-interval = 30 seconds
435 self.interval_max = 30
436 self.memory_transport = False
437
438 if server_params is None:
439 server_params = {}
440 # Keys to translate from server_params to kombu params
441 server_params_to_kombu_params = {'username': 'userid'}
442
443 ssl_params = self._fetch_ssl_params()
444 params_list = []
445 for adr in self.conf.rabbit_hosts:
446 hostname, port = network_utils.parse_host_port(
447 adr, default_port=self.conf.rabbit_port)
448
449 params = {
450 'hostname': hostname,
451 'port': port,
452 'userid': self.conf.rabbit_userid,
453 'password': self.conf.rabbit_password,
454 'login_method': self.conf.rabbit_login_method,
455 'virtual_host': self.conf.rabbit_virtual_host,
456 }
457
458 for sp_key, value in six.iteritems(server_params):
459 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
460 params[p_key] = value
461
462 if self.conf.fake_rabbit:
463 params['transport'] = 'memory'
464 if self.conf.rabbit_use_ssl:
465 params['ssl'] = ssl_params
466
467 params_list.append(params)
468
469 self.params_list = itertools.cycle(params_list)
470
471 self.memory_transport = self.conf.fake_rabbit
472
473 self.connection = None
474 self.do_consume = None
475 self.reconnect()
476
477 # FIXME(markmc): use oslo sslutils when it is available as a library
478 _SSL_PROTOCOLS = {
479 "tlsv1": ssl.PROTOCOL_TLSv1,
480 "sslv23": ssl.PROTOCOL_SSLv23,
481 "sslv3": ssl.PROTOCOL_SSLv3
482 }
483
484 try:
485 _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
486 except AttributeError:
487 pass
488
489 @classmethod
490 def validate_ssl_version(cls, version):
491 key = version.lower()
492 try:
493 return cls._SSL_PROTOCOLS[key]
494 except KeyError:
495 raise RuntimeError(_("Invalid SSL version : %s") % version)
496
497 def _fetch_ssl_params(self):
498 """Handles fetching what ssl params should be used for the connection
499 (if any).
500 """
501 ssl_params = dict()
502
503 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
504 if self.conf.kombu_ssl_version:
505 ssl_params['ssl_version'] = self.validate_ssl_version(
506 self.conf.kombu_ssl_version)
507 if self.conf.kombu_ssl_keyfile:
508 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
509 if self.conf.kombu_ssl_certfile:
510 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
511 if self.conf.kombu_ssl_ca_certs:
512 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
513 # We might want to allow variations in the
514 # future with this?
515 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
516
517 # Return the extended behavior or just have the default behavior
518 return ssl_params or True
519
520 def _connect(self, params):
521 """Connect to rabbit. Re-establish any queues that may have
522 been declared before if we are reconnecting. Exceptions should
523 be handled by the caller.
524 """
525 if self.connection:
526 LOG.info(_("Reconnecting to AMQP server on "
527 "%(hostname)s:%(port)d") % params)
528 try:
529 # XXX(nic): when reconnecting to a RabbitMQ cluster
530 # with mirrored queues in use, the attempt to release the
531 # connection can hang "indefinitely" somewhere deep down
532 # in Kombu. Blocking the thread for a bit prior to
533 # release seems to kludge around the problem where it is
534 # otherwise reproduceable.
535 if self.conf.kombu_reconnect_delay > 0:
536 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
537 self.conf.kombu_reconnect_delay)
538 time.sleep(self.conf.kombu_reconnect_delay)
539
540 self.connection.release()
541 except self.connection_errors:
542 pass
543 # Setting this in case the next statement fails, though
544 # it shouldn't be doing any network operations, yet.
545 self.connection = None
546 self.connection = kombu.connection.BrokerConnection(**params)
547 self.connection_errors = self.connection.connection_errors
548 self.channel_errors = self.connection.channel_errors
549 if self.memory_transport:
550 # Kludge to speed up tests.
551 self.connection.transport.polling_interval = 0.0
552 self.do_consume = True
553 self.consumer_num = itertools.count(1)
554 self.connection.connect()
555 self.channel = self.connection.channel()
556 # work around 'memory' transport bug in 1.1.3
557 if self.memory_transport:
558 self.channel._new_queue('ae.undeliver')
559 for consumer in self.consumers:
560 consumer.reconnect(self.channel)
561 LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
562 params)
563
564 def reconnect(self):
565 """Handles reconnecting and re-establishing queues.
566 Will retry up to self.max_retries number of times.
567 self.max_retries = 0 means to retry forever.
568 Sleep between tries, starting at self.interval_start
569 seconds, backing off self.interval_stepping number of seconds
570 each attempt.
571 """
572
573 attempt = 0
574 while True:
575 params = six.next(self.params_list)
576 attempt += 1
577 try:
578 self._connect(params)
579 return
580 except IOError as e:
581 pass
582 except self.connection_errors as e:
583 pass
584 except Exception as e:
585 # NOTE(comstud): Unfortunately it's possible for amqplib
586 # to return an error not covered by its transport
587 # connection_errors in the case of a timeout waiting for
588 # a protocol response. (See paste link in LP888621)
589 # So, we check all exceptions for 'timeout' in them
590 # and try to reconnect in this case.
591 if 'timeout' not in str(e):
592 raise
593
594 log_info = {}
595 log_info['err_str'] = str(e)
596 log_info['max_retries'] = self.max_retries
597 log_info.update(params)
598
599 if self.max_retries and attempt == self.max_retries:
600 msg = _('Unable to connect to AMQP server on '
601 '%(hostname)s:%(port)d after %(max_retries)d '
602 'tries: %(err_str)s') % log_info
603 LOG.error(msg)
604 raise rpc_common.RPCException(msg)
605
606 if attempt == 1:
607 sleep_time = self.interval_start or 1
608 elif attempt > 1:
609 sleep_time += self.interval_stepping
610 if self.interval_max:
611 sleep_time = min(sleep_time, self.interval_max)
612
613 log_info['sleep_time'] = sleep_time
614 LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
615 'unreachable: %(err_str)s. Trying again in '
616 '%(sleep_time)d seconds.') % log_info)
617 time.sleep(sleep_time)
618
619 def ensure(self, error_callback, method, *args, **kwargs):
620 while True:
621 try:
622 return method(*args, **kwargs)
623 except self.connection_errors as e:
624 if error_callback:
625 error_callback(e)
626 except self.channel_errors as e:
627 if error_callback:
628 error_callback(e)
629 except (socket.timeout, IOError) as e:
630 if error_callback:
631 error_callback(e)
632 except Exception as e:
633 # NOTE(comstud): Unfortunately it's possible for amqplib
634 # to return an error not covered by its transport
635 # connection_errors in the case of a timeout waiting for
636 # a protocol response. (See paste link in LP888621)
637 # So, we check all exceptions for 'timeout' in them
638 # and try to reconnect in this case.
639 if 'timeout' not in str(e):
640 raise
641 if error_callback:
642 error_callback(e)
643 self.reconnect()
644
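The ensure() loop above retries indefinitely on connection- and channel-level errors (reconnecting between attempts) and re-raises anything else unless 'timeout' appears in its text. A hypothetical caller, mirroring the publisher_send() pattern further below; conn and conf are assumed to be in scope:

    def _error_callback(exc):
        # Called once per failed attempt, before conn.reconnect() runs.
        LOG.error('publish failed, will reconnect: %s', exc)

    def _publish():
        publisher = DirectPublisher(conf, conn.channel, 'reply_12345')
        publisher.send({'result': 42})

    # Loops until _publish() returns without a connection-level error.
    conn.ensure(_error_callback, _publish)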
645 def get_channel(self):
646 """Convenience call for bin/clear_rabbit_queues."""
647 return self.channel
648
649 def close(self):
650 """Close/release this connection."""
651 self.connection.release()
652 self.connection = None
653
654 def reset(self):
655 """Reset a connection so it can be used again."""
656 self.channel.close()
657 self.channel = self.connection.channel()
658 # work around 'memory' transport bug in 1.1.3
659 if self.memory_transport:
660 self.channel._new_queue('ae.undeliver')
661 self.consumers = []
662
663 def declare_consumer(self, consumer_cls, topic, callback):
664 """Create a Consumer using the class that was passed in and
665 add it to our list of consumers.
666 """
667
668 def _connect_error(exc):
669 log_info = {'topic': topic, 'err_str': str(exc)}
670 LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
671 "%(err_str)s") % log_info)
672
673 def _declare_consumer():
674 consumer = consumer_cls(self.conf, self.channel, topic, callback,
675 six.next(self.consumer_num))
676 self.consumers.append(consumer)
677 return consumer
678
679 return self.ensure(_connect_error, _declare_consumer)
680
681 def iterconsume(self, limit=None, timeout=None):
682 """Return an iterator that will consume from all queues/consumers."""
683
684 def _error_callback(exc):
685 if isinstance(exc, socket.timeout):
686 LOG.debug(_('Timed out waiting for RPC response: %s') %
687 str(exc))
688 raise rpc_common.Timeout()
689 else:
690 LOG.exception(_('Failed to consume message from queue: %s') %
691 str(exc))
692 self.do_consume = True
693
694 def _consume():
695 if self.do_consume:
696 queues_head = self.consumers[:-1] # not fanout.
697 queues_tail = self.consumers[-1] # fanout
698 for queue in queues_head:
699 queue.consume(nowait=True)
700 queues_tail.consume(nowait=False)
701 self.do_consume = False
702 return self.connection.drain_events(timeout=timeout)
703
704 for iteration in itertools.count(0):
705 if limit and iteration >= limit:
706 raise StopIteration
707 yield self.ensure(_error_callback, _consume)
708
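A sketch of driving the iterator above, assuming consumers were already declared on conn; the per-message callbacks fire inside connection.drain_events():

    # Pull at most 10 messages; if the broker stays silent for longer
    # than 5 seconds, _error_callback turns the socket.timeout into
    # rpc_common.Timeout.
    for _ in conn.iterconsume(limit=10, timeout=5):
        pass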
709 def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
710 """Send to a publisher based on the publisher class."""
711
712 def _error_callback(exc):
713 log_info = {'topic': topic, 'err_str': str(exc)}
714 LOG.exception(_("Failed to publish message to topic "
715 "'%(topic)s': %(err_str)s") % log_info)
716
717 def _publish():
718 publisher = cls(self.conf, self.channel, topic, **kwargs)
719 publisher.send(msg, timeout)
720
721 self.ensure(_error_callback, _publish)
722
723 def declare_direct_consumer(self, topic, callback):
724 """Create a 'direct' queue.
725 In nova's use, this is generally a msg_id queue used for
726 responses to call/multicall.
727 """
728 self.declare_consumer(DirectConsumer, topic, callback)
729
730 def declare_topic_consumer(self, topic, callback=None, queue_name=None,
731 exchange_name=None):
732 """Create a 'topic' consumer."""
733 self.declare_consumer(functools.partial(TopicConsumer,
734 name=queue_name,
735 exchange_name=exchange_name,
736 ),
737 topic, callback)
738
739 def declare_fanout_consumer(self, topic, callback):
740 """Create a 'fanout' consumer."""
741 self.declare_consumer(FanoutConsumer, topic, callback)
742
743 def direct_send(self, msg_id, msg):
744 """Send a 'direct' message."""
745 self.publisher_send(DirectPublisher, msg_id, msg)
746
747 def topic_send(self, topic, msg, timeout=None):
748 """Send a 'topic' message."""
749 self.publisher_send(TopicPublisher, topic, msg, timeout)
750
751 def fanout_send(self, topic, msg):
752 """Send a 'fanout' message."""
753 self.publisher_send(FanoutPublisher, topic, msg)
754
755 def notify_send(self, topic, msg, **kwargs):
756 """Send a notify message on a topic."""
757 self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
758
759 def consume(self, limit=None, timeout=None):
760 """Consume from all queues/consumers."""
761 it = self.iterconsume(limit=limit, timeout=timeout)
762 while True:
763 try:
764 six.next(it)
765 except StopIteration:
766 return
767
768
769class RabbitDriver(amqpdriver.AMQPDriverBase):
770
771 def __init__(self, conf, url, default_exchange=None,
772 allowed_remote_exmods=[]):
773 conf.register_opts(rabbit_opts)
774 conf.register_opts(rpc_amqp.amqp_opts)
775
776 connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
777
778 super(RabbitDriver, self).__init__(conf, url,
779 connection_pool,
780 default_exchange,
781 allowed_remote_exmods)
782
783 def require_features(self, requeue=True):
784 pass
7850
=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch'
=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo'
=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging'
=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers'
=== removed file '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py'
--- .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py 2015-04-23 15:56:08 +0000
+++ .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
@@ -1,535 +0,0 @@
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright 2011 Red Hat, Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18import copy
19import logging
20import sys
21import time
22import traceback
23
24from oslo.config import cfg
25from oslo import messaging
26import six
27
28from oslo.messaging import _utils as utils
29from oslo.messaging.openstack.common import importutils
30from oslo.messaging.openstack.common import jsonutils
31
32# FIXME(markmc): remove this
33_ = lambda s: s
34
35LOG = logging.getLogger(__name__)
36
37_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
38
39
40'''RPC Envelope Version.
41
42This version number applies to the top level structure of messages sent out.
43It does *not* apply to the message payload, which must be versioned
44independently. For example, when using rpc APIs, a version number is applied
45for changes to the API being exposed over rpc. This version number is handled
46in the rpc proxy and dispatcher modules.
47
48This version number applies to the message envelope that is used in the
49serialization done inside the rpc layer. See serialize_msg() and
50deserialize_msg().
51
52The current message format (version 2.0) is very simple. It is:
53
54 {
55 'oslo.version': <RPC Envelope Version as a String>,
56 'oslo.message': <Application Message Payload, JSON encoded>
57 }
58
59Message format version '1.0' is just considered to be the messages we sent
60without a message envelope.
61
62So, the current message envelope just includes the envelope version. It may
63eventually contain additional information, such as a signature for the message
64payload.
65
66We will JSON encode the application message payload. The message envelope,
67which includes the JSON encoded application message body, will be passed down
68to the messaging libraries as a dict.
69'''
70_RPC_ENVELOPE_VERSION = '2.0'
71
72_VERSION_KEY = 'oslo.version'
73_MESSAGE_KEY = 'oslo.message'
74
75_REMOTE_POSTFIX = '_Remote'
76
77_exception_opts = [
78 cfg.ListOpt('allowed_rpc_exception_modules',
79 default=['oslo.messaging.exceptions',
80 'nova.exception',
81 'cinder.exception',
82 _EXCEPTIONS_MODULE,
83 ],
84 help='Modules of exceptions that are permitted to be '
85 'recreated upon receiving exception data from an rpc '
86 'call.'),
87]
88
89
90class RPCException(Exception):
91 msg_fmt = _("An unknown RPC related exception occurred.")
92
93 def __init__(self, message=None, **kwargs):
94 self.kwargs = kwargs
95
96 if not message:
97 try:
98 message = self.msg_fmt % kwargs
99
100 except Exception:
101 # kwargs doesn't match a variable in the message
102 # log the issue and the kwargs
103 LOG.exception(_('Exception in string format operation'))
104 for name, value in six.iteritems(kwargs):
105 LOG.error("%s: %s" % (name, value))
106 # at least get the core message out if something happened
107 message = self.msg_fmt
108
109 super(RPCException, self).__init__(message)
110
111
112class RemoteError(RPCException):
113 """Signifies that a remote class has raised an exception.
114
115 Contains a string representation of the type of the original exception,
116 the value of the original exception, and the traceback. These are
117 sent to the parent as a joined string so printing the exception
118 contains all of the relevant info.
119
120 """
121 msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
122
123 def __init__(self, exc_type=None, value=None, traceback=None):
124 self.exc_type = exc_type
125 self.value = value
126 self.traceback = traceback
127 super(RemoteError, self).__init__(exc_type=exc_type,
128 value=value,
129 traceback=traceback)
130
131
132class Timeout(RPCException):
133 """Signifies that a timeout has occurred.
134
135 This exception is raised if the rpc_response_timeout is reached while
136 waiting for a response from the remote side.
137 """
138 msg_fmt = _('Timeout while waiting on RPC response - '
139 'topic: "%(topic)s", RPC method: "%(method)s" '
140 'info: "%(info)s"')
141
142 def __init__(self, info=None, topic=None, method=None):
143 """Initiates Timeout object.
144
145 :param info: Extra info to convey to the user
146 :param topic: The topic that the rpc call was sent to
147 :param method: The name of the rpc method being
148 called
149 """
150 self.info = info
151 self.topic = topic
152 self.method = method
153 super(Timeout, self).__init__(
154 None,
155 info=info or _('<unknown>'),
156 topic=topic or _('<unknown>'),
157 method=method or _('<unknown>'))
158
159
160class DuplicateMessageError(RPCException):
161 msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
162
163
164class InvalidRPCConnectionReuse(RPCException):
165 msg_fmt = _("Invalid reuse of an RPC connection.")
166
167
168class UnsupportedRpcVersion(RPCException):
169 msg_fmt = _("Specified RPC version, %(version)s, not supported by "
170 "this endpoint.")
171
172
173class UnsupportedRpcEnvelopeVersion(RPCException):
174 msg_fmt = _("Specified RPC envelope version, %(version)s, "
175 "not supported by this endpoint.")
176
177
178class RpcVersionCapError(RPCException):
179 msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
180
181
182class Connection(object):
183 """A connection, returned by rpc.create_connection().
184
185 This class represents a connection to the message bus used for rpc.
186 An instance of this class should never be created by users of the rpc API.
187 Use rpc.create_connection() instead.
188 """
189 def close(self):
190 """Close the connection.
191
192 This method must be called when the connection will no longer be used.
193 It will ensure that any resources associated with the connection, such
194 as a network connection, are cleaned up.
195 """
196 raise NotImplementedError()
197
198 def create_consumer(self, topic, proxy, fanout=False):
199 """Create a consumer on this connection.
200
201 A consumer is associated with a message queue on the backend message
202 bus. The consumer will read messages from the queue, unpack them, and
203 dispatch them to the proxy object. The contents of the message pulled
204 off of the queue will determine which method gets called on the proxy
205 object.
206
207 :param topic: This is a name associated with what to consume from.
208 Multiple instances of a service may consume from the same
209 topic. For example, all instances of nova-compute consume
210 from a queue called "compute". In that case, the
211 messages will get distributed amongst the consumers in a
212 round-robin fashion if fanout=False. If fanout=True,
213 every consumer associated with this topic will get a
214 copy of every message.
215 :param proxy: The object that will handle all incoming messages.
216 :param fanout: Whether or not this is a fanout topic. See the
217 documentation for the topic parameter for some
218 additional comments on this.
219 """
220 raise NotImplementedError()
221
222 def create_worker(self, topic, proxy, pool_name):
223 """Create a worker on this connection.
224
225 A worker is like a regular consumer of messages directed to a
226 topic, except that it is part of a set of such consumers (the
227 "pool") which may run in parallel. Every pool of workers will
228 receive a given message, but only one worker in the pool will
229 be asked to process it. Load is distributed across the members
230 of the pool in round-robin fashion.
231
232 :param topic: This is a name associated with what to consume from.
233 Multiple instances of a service may consume from the same
234 topic.
235 :param proxy: The object that will handle all incoming messages.
236 :param pool_name: String containing the name of the pool of workers
237 """
238 raise NotImplementedError()
239
240 def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
241 """Register as a member of a group of consumers.
242
243 Uses given topic from the specified exchange.
244 Exactly one member of a given pool will receive each message.
245
246 A message will be delivered to multiple pools, if more than
247 one is created.
248
249 :param callback: Callable to be invoked for each message.
250 :type callback: callable accepting one argument
251 :param pool_name: The name of the consumer pool.
252 :type pool_name: str
253 :param topic: The routing topic for desired messages.
254 :type topic: str
255 :param exchange_name: The name of the message exchange where
256 the client should attach. Defaults to
257 the configured exchange.
258 :type exchange_name: str
259 """
260 raise NotImplementedError()
261
262 def consume_in_thread(self):
263 """Spawn a thread to handle incoming messages.
264
265 Spawn a thread that will be responsible for handling all incoming
266 messages for consumers that were set up on this connection.
267
268 Message dispatching inside of this is expected to be implemented in a
269 non-blocking manner. An example implementation would be having this
270 thread pull messages in for all of the consumers, but utilize a thread
271 pool for dispatching the messages to the proxy objects.
272 """
273 raise NotImplementedError()
274
275
276def _safe_log(log_func, msg, msg_data):
277 """Sanitizes the msg_data field before logging."""
278 SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
279
280 def _fix_passwords(d):
281 """Sanitizes the password fields in the dictionary."""
282 for k in six.iterkeys(d):
283 if k.lower().find('password') != -1:
284 d[k] = '<SANITIZED>'
285 elif k.lower() in SANITIZE:
286 d[k] = '<SANITIZED>'
287 elif isinstance(d[k], dict):
288 _fix_passwords(d[k])
289 return d
290
291 return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
292
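For illustration, the sanitization above applied to a nested payload (all values made up):

    msg_data = {'method': 'create_user',
                'args': {'user_password': 'secret',
                         'auth_token': 'abc123',
                         'nested': {'admin_password': 'hunter2'}}}
    _safe_log(LOG.debug, 'sending: %s', msg_data)
    # Logged dict: user_password, auth_token and admin_password are all
    # replaced with '<SANITIZED>'; other keys pass through untouched.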
293
294def serialize_remote_exception(failure_info, log_failure=True):
295 """Prepares exception data to be sent over rpc.
296
297 Failure_info should be a sys.exc_info() tuple.
298
299 """
300 tb = traceback.format_exception(*failure_info)
301 failure = failure_info[1]
302 if log_failure:
303 LOG.error(_("Returning exception %s to caller"),
304 six.text_type(failure))
305 LOG.error(tb)
306
307 kwargs = {}
308 if hasattr(failure, 'kwargs'):
309 kwargs = failure.kwargs
310
311 # NOTE(matiu): With cells, it's possible to re-raise remote, remote
312 # exceptions. Let's turn them back into the original exception type.
313 cls_name = str(failure.__class__.__name__)
314 mod_name = str(failure.__class__.__module__)
315 if (cls_name.endswith(_REMOTE_POSTFIX) and
316 mod_name.endswith(_REMOTE_POSTFIX)):
317 cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
318 mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
319
320 data = {
321 'class': cls_name,
322 'module': mod_name,
323 'message': six.text_type(failure),
324 'tb': tb,
325 'args': failure.args,
326 'kwargs': kwargs
327 }
328
329 json_data = jsonutils.dumps(data)
330
331 return json_data
332
333
334def deserialize_remote_exception(data, allowed_remote_exmods):
335 failure = jsonutils.loads(str(data))
336
337 trace = failure.get('tb', [])
338 message = failure.get('message', "") + "\n" + "\n".join(trace)
339 name = failure.get('class')
340 module = failure.get('module')
341
342 # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
343 # order to prevent arbitrary code execution.
344 if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
345 return messaging.RemoteError(name, failure.get('message'), trace)
346
347 try:
348 mod = importutils.import_module(module)
349 klass = getattr(mod, name)
350 if not issubclass(klass, Exception):
351 raise TypeError("Can only deserialize Exceptions")
352
353 failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
354 except (AttributeError, TypeError, ImportError):
355 return messaging.RemoteError(name, failure.get('message'), trace)
356
357 ex_type = type(failure)
358 str_override = lambda self: message
359 new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
360 {'__str__': str_override, '__unicode__': str_override})
361 new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
362 try:
363 # NOTE(ameade): Dynamically create a new exception type and swap it in
364 # as the new type for the exception. This only works on user defined
365 # Exceptions and not core Python exceptions. This is important because
366 # we cannot necessarily change an exception message so we must override
367 # the __str__ method.
368 failure.__class__ = new_ex_type
369 except TypeError:
370 # NOTE(ameade): If a core exception then just add the traceback to the
371 # first exception argument.
372 failure.args = (message,) + failure.args[1:]
373 return failure
374
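A standalone sketch of the type-swap trick used above: build a *_Remote subclass whose __str__ carries the remote traceback (MyError is an invented example class):

    class MyError(Exception):
        pass

    failure = MyError('boom')
    message = 'boom\nTraceback (most recent call last): ...'
    str_override = lambda self: message
    new_type = type('MyError_Remote', (MyError,),
                    {'__str__': str_override,
                     '__unicode__': str_override})
    failure.__class__ = new_type  # allowed for user-defined classes
    assert str(failure) == message
    assert isinstance(failure, MyError)  # original type is preserved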
375
376class CommonRpcContext(object):
377 def __init__(self, **kwargs):
378 self.values = kwargs
379
380 def __getattr__(self, key):
381 try:
382 return self.values[key]
383 except KeyError:
384 raise AttributeError(key)
385
386 def to_dict(self):
387 return copy.deepcopy(self.values)
388
389 @classmethod
390 def from_dict(cls, values):
391 return cls(**values)
392
393 def deepcopy(self):
394 return self.from_dict(self.to_dict())
395
396 def update_store(self):
397 #local.store.context = self
398 pass
399
400 def elevated(self, read_deleted=None, overwrite=False):
401 """Return a version of this context with admin flag set."""
402 # TODO(russellb) This method is a bit of a nova-ism. It makes
403 # some assumptions about the data in the request context sent
404 # across rpc, while the rest of this class does not. We could get
405 # rid of this if we changed the nova code that uses this to
406 # convert the RpcContext back to its native RequestContext doing
407 # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
408
409 context = self.deepcopy()
410 context.values['is_admin'] = True
411
412 context.values.setdefault('roles', [])
413
414 if 'admin' not in context.values['roles']:
415 context.values['roles'].append('admin')
416
417 if read_deleted is not None:
418 context.values['read_deleted'] = read_deleted
419
420 return context
421
422
423class ClientException(Exception):
424 """Encapsulates actual exception expected to be hit by a RPC proxy object.
425
426 Merely instantiating it records the current exception information, which
427 will be passed back to the RPC client without exceptional logging.
428 """
429 def __init__(self):
430 self._exc_info = sys.exc_info()
431
432
433def catch_client_exception(exceptions, func, *args, **kwargs):
434 try:
435 return func(*args, **kwargs)
436 except Exception as e:
437 if type(e) in exceptions:
438 raise ClientException()
439 else:
440 raise
441
442
443def client_exceptions(*exceptions):
444 """Decorator for manager methods that raise expected exceptions.
445
446 Marking a Manager method with this decorator allows the declaration
447 of expected exceptions that the RPC layer should not consider fatal,
448 and not log as if they were generated in a real error scenario. Note
449 that this will cause listed exceptions to be wrapped in a
450 ClientException, which is used internally by the RPC layer.
451 """
452 def outer(func):
453 def inner(*args, **kwargs):
454 return catch_client_exception(exceptions, func, *args, **kwargs)
455 return inner
456 return outer
457
458
459def serialize_msg(raw_msg):
460 # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
461 # information about this format.
462 msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
463 _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
464
465 return msg
466
467
468def deserialize_msg(msg):
469 # NOTE(russellb): Hang on to your hats, this road is about to
470 # get a little bumpy.
471 #
472 # Robustness Principle:
473 # "Be strict in what you send, liberal in what you accept."
474 #
475 # At this point we have to do a bit of guessing about what it
476 # is we just received. Here is the set of possibilities:
477 #
478 # 1) We received a dict. This could be 2 things:
479 #
480 # a) Inspect it to see if it looks like a standard message envelope.
481 # If so, great!
482 #
483 # b) If it doesn't look like a standard message envelope, it could either
484 # be a notification, or a message from before we added a message
485 # envelope (referred to as version 1.0).
486 # Just return the message as-is.
487 #
488 # 2) It's any other non-dict type. Just return it and hope for the best.
489 # This case covers return values from rpc.call() from before message
490 # envelopes were used. (messages to call a method were always a dict)
491
492 if not isinstance(msg, dict):
493 # See #2 above.
494 return msg
495
496 base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
497 if not all(map(lambda key: key in msg, base_envelope_keys)):
498 # See #1.b above.
499 return msg
500
501 # At this point we think we have the message envelope
502 # format we were expecting. (#1.a above)
503
504 if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
505 msg[_VERSION_KEY]):
506 raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
507
508 raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
509
510 return raw_msg
511
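Round-tripping a payload through the 2.0 envelope described above (JSON key order may vary):

    payload = {'method': 'ping', 'args': {}}
    envelope = serialize_msg(payload)
    # envelope ~= {'oslo.version': '2.0',
    #              'oslo.message': '{"method": "ping", "args": {}}'}
    assert deserialize_msg(envelope) == payload
    # Pre-envelope (version 1.0) dicts pass through unchanged:
    assert deserialize_msg(payload) == payload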
512
513class DecayingTimer(object):
514 def __init__(self, duration=None):
515 self._duration = duration
516 self._ends_at = None
517
518 def start(self):
519 if self._duration is not None:
520 self._ends_at = time.time() + max(0, self._duration)
521 return self
522
523 def check_return(self, timeout_callback, *args, **kwargs):
524 if self._duration is None:
525 return None
526 if self._ends_at is None:
527 raise RuntimeError("Can not check/return a timeout from a timer"
528 " that has not been started")
529
530 maximum = kwargs.pop('maximum', None)
531 left = self._ends_at - time.time()
532 if left <= 0:
533 timeout_callback(*args, **kwargs)
534
535 return left if maximum is None else min(left, maximum)
5360
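A usage sketch for this (pre-0002-patch) DecayingTimer; note that with duration=None, check_return() always returns None here, regardless of 'maximum':

    def _raise_timeout():
        raise Timeout()

    timer = DecayingTimer(duration=30).start()
    # ... do some work ...
    # Returns the seconds left (capped at 1.0), or calls
    # _raise_timeout() once the 30-second budget is exhausted.
    left = timer.check_return(_raise_timeout, maximum=1.0)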
=== removed directory '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests'
=== removed file '.pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py'
--- .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py 2015-04-23 15:56:08 +0000
+++ .pc/0002-rabbit-fix-timeout-timer-when-duration-is-None.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000
@@ -1,49 +0,0 @@
1
2# Copyright 2013 Red Hat, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16from oslo.messaging import _utils as utils
17from tests import utils as test_utils
18
19
20class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
21 def test_version_is_compatible_same(self):
22 self.assertTrue(utils.version_is_compatible('1.23', '1.23'))
23
24 def test_version_is_compatible_newer_minor(self):
25 self.assertTrue(utils.version_is_compatible('1.24', '1.23'))
26
27 def test_version_is_compatible_older_minor(self):
28 self.assertFalse(utils.version_is_compatible('1.22', '1.23'))
29
30 def test_version_is_compatible_major_difference1(self):
31 self.assertFalse(utils.version_is_compatible('2.23', '1.23'))
32
33 def test_version_is_compatible_major_difference2(self):
34 self.assertFalse(utils.version_is_compatible('1.23', '2.23'))
35
36 def test_version_is_compatible_newer_rev(self):
37 self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))
38
39 def test_version_is_compatible_newer_rev_both(self):
40 self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))
41
42 def test_version_is_compatible_older_rev_both(self):
43 self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))
44
45 def test_version_is_compatible_older_rev(self):
46 self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))
47
48 def test_version_is_compatible_no_rev_is_zero(self):
49 self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
500
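One implementation consistent with the tests above; the real oslo.messaging._utils.version_is_compatible may differ in details:

    def version_is_compatible(imp_version, version):
        # 'imp_version' is what the endpoint implements; 'version' is
        # what the incoming message requires.
        imp = [int(p) for p in imp_version.split('.')]
        req = [int(p) for p in version.split('.')]
        imp += [0] * (3 - len(imp))   # a missing revision counts as 0
        req += [0] * (3 - len(req))
        if imp[0] != req[0]:          # major versions must match
            return False
        if imp[1] != req[1]:          # newer minor satisfies older requests
            return imp[1] > req[1]
        return imp[2] >= req[2]       # same minor: need at least that rev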
=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch'
=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo'
=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging'
=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers'
=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py'
--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py 2015-04-23 15:56:08 +0000
+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/common.py 1970-01-01 00:00:00 +0000
@@ -1,535 +0,0 @@
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# All Rights Reserved.
4# Copyright 2011 Red Hat, Inc.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
17
18import copy
19import logging
20import sys
21import time
22import traceback
23
24from oslo.config import cfg
25from oslo import messaging
26import six
27
28from oslo.messaging import _utils as utils
29from oslo.messaging.openstack.common import importutils
30from oslo.messaging.openstack.common import jsonutils
31
32# FIXME(markmc): remove this
33_ = lambda s: s
34
35LOG = logging.getLogger(__name__)
36
37_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
38
39
40'''RPC Envelope Version.
41
42This version number applies to the top level structure of messages sent out.
43It does *not* apply to the message payload, which must be versioned
44independently. For example, when using rpc APIs, a version number is applied
45for changes to the API being exposed over rpc. This version number is handled
46in the rpc proxy and dispatcher modules.
47
48This version number applies to the message envelope that is used in the
49serialization done inside the rpc layer. See serialize_msg() and
50deserialize_msg().
51
52The current message format (version 2.0) is very simple. It is:
53
54 {
55 'oslo.version': <RPC Envelope Version as a String>,
56 'oslo.message': <Application Message Payload, JSON encoded>
57 }
58
59Message format version '1.0' is just considered to be the messages we sent
60without a message envelope.
61
62So, the current message envelope just includes the envelope version. It may
63eventually contain additional information, such as a signature for the message
64payload.
65
66We will JSON encode the application message payload. The message envelope,
67which includes the JSON encoded application message body, will be passed down
68to the messaging libraries as a dict.
69'''
70_RPC_ENVELOPE_VERSION = '2.0'
71
72_VERSION_KEY = 'oslo.version'
73_MESSAGE_KEY = 'oslo.message'
74
75_REMOTE_POSTFIX = '_Remote'
76
77_exception_opts = [
78 cfg.ListOpt('allowed_rpc_exception_modules',
79 default=['oslo.messaging.exceptions',
80 'nova.exception',
81 'cinder.exception',
82 _EXCEPTIONS_MODULE,
83 ],
84 help='Modules of exceptions that are permitted to be '
85 'recreated upon receiving exception data from an rpc '
86 'call.'),
87]
88
89
90class RPCException(Exception):
91 msg_fmt = _("An unknown RPC related exception occurred.")
92
93 def __init__(self, message=None, **kwargs):
94 self.kwargs = kwargs
95
96 if not message:
97 try:
98 message = self.msg_fmt % kwargs
99
100 except Exception:
101 # kwargs doesn't match a variable in the message
102 # log the issue and the kwargs
103 LOG.exception(_('Exception in string format operation'))
104 for name, value in six.iteritems(kwargs):
105 LOG.error("%s: %s" % (name, value))
106 # at least get the core message out if something happened
107 message = self.msg_fmt
108
109 super(RPCException, self).__init__(message)
110
111
112class RemoteError(RPCException):
113 """Signifies that a remote class has raised an exception.
114
115 Contains a string representation of the type of the original exception,
116 the value of the original exception, and the traceback. These are
117 sent to the parent as a joined string so printing the exception
118 contains all of the relevant info.
119
120 """
121 msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
122
123 def __init__(self, exc_type=None, value=None, traceback=None):
124 self.exc_type = exc_type
125 self.value = value
126 self.traceback = traceback
127 super(RemoteError, self).__init__(exc_type=exc_type,
128 value=value,
129 traceback=traceback)
130
131
132class Timeout(RPCException):
133 """Signifies that a timeout has occurred.
134
135 This exception is raised if the rpc_response_timeout is reached while
136 waiting for a response from the remote side.
137 """
138 msg_fmt = _('Timeout while waiting on RPC response - '
139 'topic: "%(topic)s", RPC method: "%(method)s" '
140 'info: "%(info)s"')
141
142 def __init__(self, info=None, topic=None, method=None):
143 """Initiates Timeout object.
144
145 :param info: Extra info to convey to the user
146 :param topic: The topic that the rpc call was sent to
147 :param method: The name of the rpc method being
148 called
149 """
150 self.info = info
151 self.topic = topic
152 self.method = method
153 super(Timeout, self).__init__(
154 None,
155 info=info or _('<unknown>'),
156 topic=topic or _('<unknown>'),
157 method=method or _('<unknown>'))
158
159
160class DuplicateMessageError(RPCException):
161 msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
162
163
164class InvalidRPCConnectionReuse(RPCException):
165 msg_fmt = _("Invalid reuse of an RPC connection.")
166
167
168class UnsupportedRpcVersion(RPCException):
169 msg_fmt = _("Specified RPC version, %(version)s, not supported by "
170 "this endpoint.")
171
172
173class UnsupportedRpcEnvelopeVersion(RPCException):
174 msg_fmt = _("Specified RPC envelope version, %(version)s, "
175 "not supported by this endpoint.")
176
177
178class RpcVersionCapError(RPCException):
179 msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
180
181
182class Connection(object):
183 """A connection, returned by rpc.create_connection().
184
185 This class represents a connection to the message bus used for rpc.
186 An instance of this class should never be created by users of the rpc API.
187 Use rpc.create_connection() instead.
188 """
189 def close(self):
190 """Close the connection.
191
192 This method must be called when the connection will no longer be used.
193 It will ensure that any resources associated with the connection, such
194 as a network connection, are cleaned up.
195 """
196 raise NotImplementedError()
197
198 def create_consumer(self, topic, proxy, fanout=False):
199 """Create a consumer on this connection.
200
201 A consumer is associated with a message queue on the backend message
202 bus. The consumer will read messages from the queue, unpack them, and
203 dispatch them to the proxy object. The contents of the message pulled
204 off of the queue will determine which method gets called on the proxy
205 object.
206
207 :param topic: This is a name associated with what to consume from.
208 Multiple instances of a service may consume from the same
209 topic. For example, all instances of nova-compute consume
210 from a queue called "compute". In that case, the
211 messages will get distributed amongst the consumers in a
212 round-robin fashion if fanout=False. If fanout=True,
213 every consumer associated with this topic will get a
214 copy of every message.
215 :param proxy: The object that will handle all incoming messages.
216 :param fanout: Whether or not this is a fanout topic. See the
217 documentation for the topic parameter for some
218 additional comments on this.
219 """
220 raise NotImplementedError()
221
222 def create_worker(self, topic, proxy, pool_name):
223 """Create a worker on this connection.
224
225 A worker is like a regular consumer of messages directed to a
226 topic, except that it is part of a set of such consumers (the
227 "pool") which may run in parallel. Every pool of workers will
228 receive a given message, but only one worker in the pool will
229 be asked to process it. Load is distributed across the members
230 of the pool in round-robin fashion.
231
232 :param topic: This is a name associated with what to consume from.
233 Multiple instances of a service may consume from the same
234 topic.
235 :param proxy: The object that will handle all incoming messages.
236 :param pool_name: String containing the name of the pool of workers
237 """
238 raise NotImplementedError()
239
240 def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
241 """Register as a member of a group of consumers.
242
243 Uses given topic from the specified exchange.
244 Exactly one member of a given pool will receive each message.
245
246 A message will be delivered to multiple pools if more than
247 one is created.
248
249 :param callback: Callable to be invoked for each message.
250 :type callback: callable accepting one argument
251 :param pool_name: The name of the consumer pool.
252 :type pool_name: str
253 :param topic: The routing topic for desired messages.
254 :type topic: str
255 :param exchange_name: The name of the message exchange where
256 the client should attach. Defaults to
257 the configured exchange.
258 :type exchange_name: str
259 """
260 raise NotImplementedError()
261
262 def consume_in_thread(self):
263 """Spawn a thread to handle incoming messages.
264
265 Spawn a thread that will be responsible for handling all incoming
266 messages for consumers that were set up on this connection.
267
268 Message dispatching inside of this is expected to be implemented in a
269 non-blocking manner. An example implementation would be having this
270 thread pull messages in for all of the consumers, but utilize a thread
271 pool for dispatching the messages to the proxy objects.
272 """
273 raise NotImplementedError()
274
275
276def _safe_log(log_func, msg, msg_data):
277 """Sanitizes the msg_data field before logging."""
278 SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
279
280 def _fix_passwords(d):
281 """Sanitizes the password fields in the dictionary."""
282 for k in six.iterkeys(d):
283 if k.lower().find('password') != -1:
284 d[k] = '<SANITIZED>'
285 elif k.lower() in SANITIZE:
286 d[k] = '<SANITIZED>'
287 elif isinstance(d[k], dict):
288 _fix_passwords(d[k])
289 return d
290
291 return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
292
293
294def serialize_remote_exception(failure_info, log_failure=True):
295 """Prepares exception data to be sent over rpc.
296
297 Failure_info should be a sys.exc_info() tuple.
298
299 """
300 tb = traceback.format_exception(*failure_info)
301 failure = failure_info[1]
302 if log_failure:
303 LOG.error(_("Returning exception %s to caller"),
304 six.text_type(failure))
305 LOG.error(tb)
306
307 kwargs = {}
308 if hasattr(failure, 'kwargs'):
309 kwargs = failure.kwargs
310
311 # NOTE(matiu): With cells, it's possible to re-raise remote, remote
312 # exceptions. Let's turn them back into the original exception type.
313 cls_name = str(failure.__class__.__name__)
314 mod_name = str(failure.__class__.__module__)
315 if (cls_name.endswith(_REMOTE_POSTFIX) and
316 mod_name.endswith(_REMOTE_POSTFIX)):
317 cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
318 mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
319
320 data = {
321 'class': cls_name,
322 'module': mod_name,
323 'message': six.text_type(failure),
324 'tb': tb,
325 'args': failure.args,
326 'kwargs': kwargs
327 }
328
329 json_data = jsonutils.dumps(data)
330
331 return json_data
332
333
334def deserialize_remote_exception(data, allowed_remote_exmods):
335 failure = jsonutils.loads(str(data))
336
337 trace = failure.get('tb', [])
338 message = failure.get('message', "") + "\n" + "\n".join(trace)
339 name = failure.get('class')
340 module = failure.get('module')
341
342 # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
343 # order to prevent arbitrary code execution.
344 if module != _EXCEPTIONS_MODULE and module not in allowed_remote_exmods:
345 return messaging.RemoteError(name, failure.get('message'), trace)
346
347 try:
348 mod = importutils.import_module(module)
349 klass = getattr(mod, name)
350 if not issubclass(klass, Exception):
351 raise TypeError("Can only deserialize Exceptions")
352
353 failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
354 except (AttributeError, TypeError, ImportError):
355 return messaging.RemoteError(name, failure.get('message'), trace)
356
357 ex_type = type(failure)
358 str_override = lambda self: message
359 new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
360 {'__str__': str_override, '__unicode__': str_override})
361 new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
362 try:
363 # NOTE(ameade): Dynamically create a new exception type and swap it in
364 # as the new type for the exception. This only works on user defined
365 # Exceptions and not core Python exceptions. This is important because
366 # we cannot necessarily change an exception message so we must override
367 # the __str__ method.
368 failure.__class__ = new_ex_type
369 except TypeError:
370 # NOTE(ameade): If a core exception then just add the traceback to the
371 # first exception argument.
372 failure.args = (message,) + failure.args[1:]
373 return failure
374
375
376class CommonRpcContext(object):
377 def __init__(self, **kwargs):
378 self.values = kwargs
379
380 def __getattr__(self, key):
381 try:
382 return self.values[key]
383 except KeyError:
384 raise AttributeError(key)
385
386 def to_dict(self):
387 return copy.deepcopy(self.values)
388
389 @classmethod
390 def from_dict(cls, values):
391 return cls(**values)
392
393 def deepcopy(self):
394 return self.from_dict(self.to_dict())
395
396 def update_store(self):
397 #local.store.context = self
398 pass
399
400 def elevated(self, read_deleted=None, overwrite=False):
401 """Return a version of this context with admin flag set."""
402 # TODO(russellb) This method is a bit of a nova-ism. It makes
403 # some assumptions about the data in the request context sent
404 # across rpc, while the rest of this class does not. We could get
405 # rid of this if we changed the nova code that uses this to
406 # convert the RpcContext back to its native RequestContext doing
407 # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
408
409 context = self.deepcopy()
410 context.values['is_admin'] = True
411
412 context.values.setdefault('roles', [])
413
414 if 'admin' not in context.values['roles']:
415 context.values['roles'].append('admin')
416
417 if read_deleted is not None:
418 context.values['read_deleted'] = read_deleted
419
420 return context
421
422
423class ClientException(Exception):
424 """Encapsulates actual exception expected to be hit by a RPC proxy object.
425
426 Merely instantiating it records the current exception information, which
427 will be passed back to the RPC client without exceptional logging.
428 """
429 def __init__(self):
430 self._exc_info = sys.exc_info()
431
432
433def catch_client_exception(exceptions, func, *args, **kwargs):
434 try:
435 return func(*args, **kwargs)
436 except Exception as e:
437 if type(e) in exceptions:
438 raise ClientException()
439 else:
440 raise
441
442
443def client_exceptions(*exceptions):
444 """Decorator for manager methods that raise expected exceptions.
445
446 Marking a Manager method with this decorator allows the declaration
447 of expected exceptions that the RPC layer should not consider fatal,
448 and not log as if they were generated in a real error scenario. Note
449 that this will cause listed exceptions to be wrapped in a
450 ClientException, which is used internally by the RPC layer.
451 """
452 def outer(func):
453 def inner(*args, **kwargs):
454 return catch_client_exception(exceptions, func, *args, **kwargs)
455 return inner
456 return outer
457
458
459def serialize_msg(raw_msg):
460 # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
461 # information about this format.
462 msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
463 _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
464
465 return msg
466
467
468def deserialize_msg(msg):
469 # NOTE(russellb): Hang on to your hats, this road is about to
470 # get a little bumpy.
471 #
472 # Robustness Principle:
473 # "Be strict in what you send, liberal in what you accept."
474 #
475 # At this point we have to do a bit of guessing about what it
476 # is we just received. Here is the set of possibilities:
477 #
478 # 1) We received a dict. This could be 2 things:
479 #
480 # a) Inspect it to see if it looks like a standard message envelope.
481 # If so, great!
482 #
483 # b) If it doesn't look like a standard message envelope, it could either
484 # be a notification, or a message from before we added a message
485 # envelope (referred to as version 1.0).
486 # Just return the message as-is.
487 #
488 # 2) It's any other non-dict type. Just return it and hope for the best.
489 # This case covers return values from rpc.call() from before message
490 # envelopes were used. (messages to call a method were always a dict)
491
492 if not isinstance(msg, dict):
493 # See #2 above.
494 return msg
495
496 base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
497 if not all(map(lambda key: key in msg, base_envelope_keys)):
498 # See #1.b above.
499 return msg
500
501 # At this point we think we have the message envelope
502 # format we were expecting. (#1.a above)
503
504 if not utils.version_is_compatible(_RPC_ENVELOPE_VERSION,
505 msg[_VERSION_KEY]):
506 raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
507
508 raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
509
510 return raw_msg
511
512
513class DecayingTimer(object):
514 def __init__(self, duration=None):
515 self._duration = duration
516 self._ends_at = None
517
518 def start(self):
519 if self._duration is not None:
520 self._ends_at = time.time() + max(0, self._duration)
521 return self
522
523 def check_return(self, timeout_callback, *args, **kwargs):
524 maximum = kwargs.pop('maximum', None)
525 if self._duration is None:
526 return None if maximum is None else maximum
527 if self._ends_at is None:
528 raise RuntimeError("Can not check/return a timeout from a timer"
529 " that has not been started")
530
531 left = self._ends_at - time.time()
532 if left <= 0:
533 timeout_callback(*args, **kwargs)
534
535 return left if maximum is None else min(left, maximum)
5360
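This copy differs from the pre-0002 copy of common.py above only in check_return(): with duration=None it now honors 'maximum', so callers can poll in bounded slices instead of blocking without bound:

    t = DecayingTimer(duration=None).start()
    left = t.check_return(lambda: None, maximum=0.5)
    # pre-0002 copy above: left is None  (wait without bound)
    # this copy:           left == 0.5   (wait at most 0.5s per poll)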
=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-04-23 15:56:08 +0000
+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
@@ -1,793 +0,0 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import logging
18import socket
19import ssl
20import time
21import uuid
22
23import kombu
24import kombu.connection
25import kombu.entity
26import kombu.messaging
27from oslo.config import cfg
28import six
29
30from oslo.messaging._drivers import amqp as rpc_amqp
31from oslo.messaging._drivers import amqpdriver
32from oslo.messaging._drivers import common as rpc_common
33from oslo.messaging.openstack.common import network_utils
34
35# FIXME(markmc): remove this
36_ = lambda s: s
37
38rabbit_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='SSL version to use (valid only if SSL enabled). '
42 'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled).'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled).'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help=('SSL certification authority file '
54 '(valid only if SSL enabled).')),
55 cfg.FloatOpt('kombu_reconnect_delay',
56 default=1.0,
57 help='How long to wait before reconnecting in response to an '
58 'AMQP consumer cancel notification.'),
59 cfg.StrOpt('rabbit_host',
60 default='localhost',
61 help='The RabbitMQ broker address where a single node is '
62 'used.'),
63 cfg.IntOpt('rabbit_port',
64 default=5672,
65 help='The RabbitMQ broker port where a single node is used.'),
66 cfg.ListOpt('rabbit_hosts',
67 default=['$rabbit_host:$rabbit_port'],
68 help='RabbitMQ HA cluster host:port pairs.'),
69 cfg.BoolOpt('rabbit_use_ssl',
70 default=False,
71 help='Connect over SSL for RabbitMQ.'),
72 cfg.StrOpt('rabbit_userid',
73 default='guest',
74 help='The RabbitMQ userid.'),
75 cfg.StrOpt('rabbit_password',
76 default='guest',
77 help='The RabbitMQ password.',
78 secret=True),
79 cfg.StrOpt('rabbit_login_method',
80 default='AMQPLAIN',
81 help='The RabbitMQ login method.'),
82 cfg.StrOpt('rabbit_virtual_host',
83 default='/',
84 help='The RabbitMQ virtual host.'),
85 cfg.IntOpt('rabbit_retry_interval',
86 default=1,
87 help='How frequently to retry connecting with RabbitMQ.'),
88 cfg.IntOpt('rabbit_retry_backoff',
89 default=2,
90 help='How long to back off between retries when connecting '
91 'to RabbitMQ.'),
92 cfg.IntOpt('rabbit_max_retries',
93 default=0,
94 help='Maximum number of RabbitMQ connection retries. '
95 'Default is 0 (infinite retry count).'),
96 cfg.BoolOpt('rabbit_ha_queues',
97 default=False,
98 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
99 'If you change this option, you must wipe the '
100 'RabbitMQ database.'),
101
102 # FIXME(markmc): this was toplevel in openstack.common.rpc
103 cfg.BoolOpt('fake_rabbit',
104 default=False,
105 help='If passed, use a fake RabbitMQ provider.'),
106]
107
108LOG = logging.getLogger(__name__)
109
110
111def _get_queue_arguments(conf):
112 """Construct the arguments for declaring a queue.
113
114 If the rabbit_ha_queues option is set, we declare a mirrored queue
115 as described here:
116
117 http://www.rabbitmq.com/ha.html
118
119 Setting x-ha-policy to all means that the queue will be mirrored
120 to all nodes in the cluster.
121 """
122 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
123
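The effect of the option, for illustration:

    # conf.rabbit_ha_queues = True
    _get_queue_arguments(conf)   # -> {'x-ha-policy': 'all'}
    # conf.rabbit_ha_queues = False
    _get_queue_arguments(conf)   # -> {}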
124
125class RabbitMessage(dict):
126 def __init__(self, raw_message):
127 super(RabbitMessage, self).__init__(
128 rpc_common.deserialize_msg(raw_message.payload))
129 self._raw_message = raw_message
130
131 def acknowledge(self):
132 self._raw_message.ack()
133
134 def requeue(self):
135 self._raw_message.requeue()
136
137
138class ConsumerBase(object):
139 """Consumer base class."""
140
141 def __init__(self, channel, callback, tag, **kwargs):
142 """Declare a queue on an amqp channel.
143
144 'channel' is the amqp channel to use
145 'callback' is the callback to call when messages are received
146 'tag' is a unique ID for the consumer on the channel
147
148 queue name, exchange name, and other kombu options are
149 passed in here as a dictionary.
150 """
151 self.callback = callback
152 self.tag = str(tag)
153 self.kwargs = kwargs
154 self.queue = None
155 self.reconnect(channel)
156
157 def reconnect(self, channel):
158 """Re-declare the queue after a rabbit reconnect."""
159 self.channel = channel
160 self.kwargs['channel'] = channel
161 self.queue = kombu.entity.Queue(**self.kwargs)
162 self.queue.declare()
163
164 def _callback_handler(self, message, callback):
165 """Call callback with deserialized message.
166
167 Messages are processed and then ack'ed.
168 """
169
170 try:
171 callback(RabbitMessage(message))
172 except Exception:
173 LOG.exception(_("Failed to process message"
174 " ... skipping it."))
175 message.ack()
176
177 def consume(self, *args, **kwargs):
178 """Actually declare the consumer on the amqp channel. This will
179 start the flow of messages from the queue. Using the
180 Connection.iterconsume() iterator will process the messages,
181 calling the appropriate callback.
182
183 If a callback is specified in kwargs, use that. Otherwise,
184 use the callback passed during __init__()
185
186 If kwargs['nowait'] is False (the default), this call will
187 block until the broker confirms the consumer registration.
188
189 """
190
191 options = {'consumer_tag': self.tag}
192 options['nowait'] = kwargs.get('nowait', False)
193 callback = kwargs.get('callback', self.callback)
194 if not callback:
195 raise ValueError("No callback defined")
196
197 def _callback(raw_message):
198 message = self.channel.message_to_python(raw_message)
199 self._callback_handler(message, callback)
200
201 self.queue.consume(*args, callback=_callback, **options)
202
203 def cancel(self):
204 """Cancel the consuming from the queue, if it has started."""
205 try:
206 self.queue.cancel(self.tag)
207 except KeyError as e:
208 # NOTE(comstud): Kludge to get around an amqplib bug
209 if str(e) != "u'%s'" % self.tag:
210 raise
211 self.queue = None
212
213
214class DirectConsumer(ConsumerBase):
215 """Queue/consumer class for 'direct'."""
216
217 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
218 """Init a 'direct' queue.
219
220 'channel' is the amqp channel to use
221 'msg_id' is the msg_id to listen on
222 'callback' is the callback to call when messages are received
223 'tag' is a unique ID for the consumer on the channel
224
225 Other kombu options may be passed
226 """
227 # Default options
228 options = {'durable': False,
229 'queue_arguments': _get_queue_arguments(conf),
230 'auto_delete': True,
231 'exclusive': False}
232 options.update(kwargs)
233 exchange = kombu.entity.Exchange(name=msg_id,
234 type='direct',
235 durable=options['durable'],
236 auto_delete=options['auto_delete'])
237 super(DirectConsumer, self).__init__(channel,
238 callback,
239 tag,
240 name=msg_id,
241 exchange=exchange,
242 routing_key=msg_id,
243 **options)
244
245
246class TopicConsumer(ConsumerBase):
247 """Consumer class for 'topic'."""
248
249 def __init__(self, conf, channel, topic, callback, tag, name=None,
250 exchange_name=None, **kwargs):
251 """Init a 'topic' queue.
252
253 :param channel: the amqp channel to use
254 :param topic: the topic to listen on
255 :paramtype topic: str
256 :param callback: the callback to call when messages are received
257 :param tag: a unique ID for the consumer on the channel
258 :param name: optional queue name, defaults to topic
259 :paramtype name: str
260
261 Other kombu options may be passed as keyword arguments
262 """
263 # Default options
264 options = {'durable': conf.amqp_durable_queues,
265 'queue_arguments': _get_queue_arguments(conf),
266 'auto_delete': conf.amqp_auto_delete,
267 'exclusive': False}
268 options.update(kwargs)
269 exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
270 exchange = kombu.entity.Exchange(name=exchange_name,
271 type='topic',
272 durable=options['durable'],
273 auto_delete=options['auto_delete'])
274 super(TopicConsumer, self).__init__(channel,
275 callback,
276 tag,
277 name=name or topic,
278 exchange=exchange,
279 routing_key=topic,
280 **options)
281
282
283class FanoutConsumer(ConsumerBase):
284 """Consumer class for 'fanout'."""
285
286 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
287 """Init a 'fanout' queue.
288
289 'channel' is the amqp channel to use
290 'topic' is the topic to listen on
291 'callback' is the callback to call when messages are received
292 'tag' is a unique ID for the consumer on the channel
293
294 Other kombu options may be passed
295 """
296 unique = uuid.uuid4().hex
297 exchange_name = '%s_fanout' % topic
298 queue_name = '%s_fanout_%s' % (topic, unique)
299
300 # Default options
301 options = {'durable': False,
302 'queue_arguments': _get_queue_arguments(conf),
303 'auto_delete': True,
304 'exclusive': False}
305 options.update(kwargs)
306 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
307 durable=options['durable'],
308 auto_delete=options['auto_delete'])
309 super(FanoutConsumer, self).__init__(channel, callback, tag,
310 name=queue_name,
311 exchange=exchange,
312 routing_key=topic,
313 **options)
314
315
316class Publisher(object):
317 """Base Publisher class."""
318
319 def __init__(self, channel, exchange_name, routing_key, **kwargs):
320 """Init the Publisher class with the exchange_name, routing_key,
321 and other options.
322 """
323 self.exchange_name = exchange_name
324 self.routing_key = routing_key
325 self.kwargs = kwargs
326 self.reconnect(channel)
327
328 def reconnect(self, channel):
329 """Re-establish the Producer after a rabbit reconnection."""
330 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
331 **self.kwargs)
332 self.producer = kombu.messaging.Producer(exchange=self.exchange,
333 channel=channel,
334 routing_key=self.routing_key)
335
336 def send(self, msg, timeout=None):
337 """Send a message."""
338 if timeout:
339 #
340 # AMQP TTL is in milliseconds when set in the header.
341 #
342 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
343 else:
344 self.producer.publish(msg)
345
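The seconds-to-milliseconds conversion above matters because the AMQP per-message TTL header is expressed in ms. For example, with an already-constructed publisher:

    publisher.send({'method': 'ping'}, timeout=30)
    # published with headers={'ttl': 30000}, i.e. 30s in milliseconds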
346
347class DirectPublisher(Publisher):
348 """Publisher class for 'direct'."""
349 def __init__(self, conf, channel, msg_id, **kwargs):
350 """Init a 'direct' publisher.
351
352 Kombu options may be passed as keyword args to override defaults
353 """
354
355 options = {'durable': False,
356 'auto_delete': True,
357 'exclusive': False}
358 options.update(kwargs)
359 super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
360 type='direct', **options)
361
362
363class TopicPublisher(Publisher):
364 """Publisher class for 'topic'."""
365 def __init__(self, conf, channel, topic, **kwargs):
366 """Init a 'topic' publisher.
367
368 Kombu options may be passed as keyword args to override defaults
369 """
370 options = {'durable': conf.amqp_durable_queues,
371 'auto_delete': conf.amqp_auto_delete,
372 'exclusive': False}
373 options.update(kwargs)
374 exchange_name = rpc_amqp.get_control_exchange(conf)
375 super(TopicPublisher, self).__init__(channel,
376 exchange_name,
377 topic,
378 type='topic',
379 **options)
380
381
382class FanoutPublisher(Publisher):
383 """Publisher class for 'fanout'."""
384 def __init__(self, conf, channel, topic, **kwargs):
385 """Init a 'fanout' publisher.
386
387 Kombu options may be passed as keyword args to override defaults
388 """
389 options = {'durable': False,
390 'auto_delete': True,
391 'exclusive': False}
392 options.update(kwargs)
393 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
394 None, type='fanout', **options)
395
396
397class NotifyPublisher(TopicPublisher):
398 """Publisher class for 'notify'."""
399
400 def __init__(self, conf, channel, topic, **kwargs):
401 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
402 self.queue_arguments = _get_queue_arguments(conf)
403 super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
404
405 def reconnect(self, channel):
406 super(NotifyPublisher, self).reconnect(channel)
407
408 # NOTE(jerdfelt): Normally the consumer would create the queue, but
409 # we do this to ensure that messages don't get dropped if the
410 # consumer is started after we do
411 queue = kombu.entity.Queue(channel=channel,
412 exchange=self.exchange,
413 durable=self.durable,
414 name=self.routing_key,
415 routing_key=self.routing_key,
416 queue_arguments=self.queue_arguments)
417 queue.declare()
418
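
Because the publisher declares the queue itself, notifications survive even when no consumer has started yet. Roughly the same pattern against kombu's in-memory transport (names are illustrative only, and error handling is omitted):

    import kombu

    conn = kombu.Connection(transport='memory')
    conn.connect()
    channel = conn.channel()
    exchange = kombu.Exchange('notify_demo', type='topic', durable=False)
    queue = kombu.Queue('notifications.info', exchange=exchange,
                        routing_key='notifications.info', channel=channel)
    queue.declare()  # declared by the *publisher*, before any consumer
    producer = kombu.Producer(channel, exchange=exchange,
                              routing_key='notifications.info')
    producer.publish({'event': 'demo'})  # kept until a consumer arrives
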
419
420class Connection(object):
421 """Connection object."""
422
423 pool = None
424
425 def __init__(self, conf, server_params=None):
426 self.consumers = []
427 self.conf = conf
428 self.max_retries = self.conf.rabbit_max_retries
429 # Try forever?
430 if self.max_retries <= 0:
431 self.max_retries = None
432 self.interval_start = self.conf.rabbit_retry_interval
433 self.interval_stepping = self.conf.rabbit_retry_backoff
434 # max retry-interval = 30 seconds
435 self.interval_max = 30
436 self.memory_transport = False
437
438 if server_params is None:
439 server_params = {}
440 # Keys to translate from server_params to kombu params
441 server_params_to_kombu_params = {'username': 'userid'}
442
443 ssl_params = self._fetch_ssl_params()
444 params_list = []
445 for adr in self.conf.rabbit_hosts:
446 hostname, port = network_utils.parse_host_port(
447 adr, default_port=self.conf.rabbit_port)
448
449 params = {
450 'hostname': hostname,
451 'port': port,
452 'userid': self.conf.rabbit_userid,
453 'password': self.conf.rabbit_password,
454 'login_method': self.conf.rabbit_login_method,
455 'virtual_host': self.conf.rabbit_virtual_host,
456 }
457
458 for sp_key, value in six.iteritems(server_params):
459 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
460 params[p_key] = value
461
462 if self.conf.fake_rabbit:
463 params['transport'] = 'memory'
464 if self.conf.rabbit_use_ssl:
465 params['ssl'] = ssl_params
466
467 params_list.append(params)
468
469 self.params_list = itertools.cycle(params_list)
470
471 self.memory_transport = self.conf.fake_rabbit
472
473 self.connection = None
474 self.do_consume = None
475 self.reconnect()
476
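
Wrapping params_list in itertools.cycle is what gives reconnect() its round-robin behaviour over rabbit_hosts: every attempt simply takes the next parameter set, wrapping around forever. The same idea in miniature (plain strings instead of parameter dicts):

    import itertools

    hosts = itertools.cycle(['host1:5672', 'host2:5672'])
    attempts = [next(hosts) for _ in range(5)]
    assert attempts == ['host1:5672', 'host2:5672', 'host1:5672',
                        'host2:5672', 'host1:5672']
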
477 # FIXME(markmc): use oslo sslutils when it is available as a library
478 _SSL_PROTOCOLS = {
479 "tlsv1": ssl.PROTOCOL_TLSv1,
480 "sslv23": ssl.PROTOCOL_SSLv23,
481 "sslv3": ssl.PROTOCOL_SSLv3
482 }
483
484 try:
485 _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
486 except AttributeError:
487 pass
488
489 @classmethod
490 def validate_ssl_version(cls, version):
491 key = version.lower()
492 try:
493 return cls._SSL_PROTOCOLS[key]
494 except KeyError:
495 raise RuntimeError(_("Invalid SSL version: %s") % version)
496
497 def _fetch_ssl_params(self):
498 """Handles fetching what ssl params should be used for the connection
499 (if any).
500 """
501 ssl_params = dict()
502
503 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
504 if self.conf.kombu_ssl_version:
505 ssl_params['ssl_version'] = self.validate_ssl_version(
506 self.conf.kombu_ssl_version)
507 if self.conf.kombu_ssl_keyfile:
508 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
509 if self.conf.kombu_ssl_certfile:
510 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
511 if self.conf.kombu_ssl_ca_certs:
512 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
513 # We might want to allow variations in the
514 # future with this?
515 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
516
517 # Return the extended behavior or just have the default behavior
518 return ssl_params or True
519
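
The `ssl_params or True` return deserves a second look: an empty dict is falsy, so when no kombu_ssl_* option is set the method hands kombu True, meaning "enable SSL with library defaults". A reduced sketch of the same idiom (fetch_ssl_params here is a stand-in, not the driver method):

    import ssl

    def fetch_ssl_params(ca_certs=''):
        ssl_params = {}
        if ca_certs:
            ssl_params['ca_certs'] = ca_certs
            # Requiring certs only makes sense once a CA bundle is given.
            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
        return ssl_params or True

    assert fetch_ssl_params() is True
    assert fetch_ssl_params('/tmp/ca.pem')['cert_reqs'] == ssl.CERT_REQUIRED
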
520 def _connect(self, params):
521 """Connect to rabbit. Re-establish any queues that may have
522 been declared before if we are reconnecting. Exceptions should
523 be handled by the caller.
524 """
525 if self.connection:
526 LOG.info(_("Reconnecting to AMQP server on "
527 "%(hostname)s:%(port)d") % params)
528 try:
529 # XXX(nic): when reconnecting to a RabbitMQ cluster
530 # with mirrored queues in use, the attempt to release the
531 # connection can hang "indefinitely" somewhere deep down
532 # in Kombu. Blocking the thread for a bit prior to
533 # release seems to kludge around the problem where it is
534 # otherwise reproducible.
535 if self.conf.kombu_reconnect_delay > 0:
536 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
537 self.conf.kombu_reconnect_delay)
538 time.sleep(self.conf.kombu_reconnect_delay)
539
540 self.connection.release()
541 except self.connection_errors:
542 pass
543 # Setting this in case the next statement fails, though
544 # it shouldn't be doing any network operations, yet.
545 self.connection = None
546 self.connection = kombu.connection.BrokerConnection(**params)
547 self.connection_errors = self.connection.connection_errors
548 self.channel_errors = self.connection.channel_errors
549 if self.memory_transport:
550 # Kludge to speed up tests.
551 self.connection.transport.polling_interval = 0.0
552 self.do_consume = True
553 self.consumer_num = itertools.count(1)
554 self.connection.connect()
555 self.channel = self.connection.channel()
556 # work around 'memory' transport bug in 1.1.3
557 if self.memory_transport:
558 self.channel._new_queue('ae.undeliver')
559 for consumer in self.consumers:
560 consumer.reconnect(self.channel)
561 LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
562 params)
563
564 def reconnect(self):
565 """Handles reconnecting and re-establishing queues.
566 Will retry up to self.max_retries number of times.
567 self.max_retries = 0 means to retry forever.
568 Sleep between tries, starting at self.interval_start
569 seconds, backing off self.interval_stepping number of seconds
570 each attempt.
571 """
572
573 attempt = 0
574 while True:
575 params = six.next(self.params_list)
576 attempt += 1
577 try:
578 self._connect(params)
579 return
580 except IOError as e:
581 pass
582 except self.connection_errors as e:
583 pass
584 except Exception as e:
585 # NOTE(comstud): Unfortunately it's possible for amqplib
586 # to return an error not covered by its transport
587 # connection_errors in the case of a timeout waiting for
588 # a protocol response. (See paste link in LP888621)
589 # So, we check all exceptions for 'timeout' in them
590 # and try to reconnect in this case.
591 if 'timeout' not in str(e):
592 raise
593
594 log_info = {}
595 log_info['err_str'] = str(e)
596 log_info['max_retries'] = self.max_retries
597 log_info.update(params)
598
599 if self.max_retries and attempt == self.max_retries:
600 msg = _('Unable to connect to AMQP server on '
601 '%(hostname)s:%(port)d after %(max_retries)d '
602 'tries: %(err_str)s') % log_info
603 LOG.error(msg)
604 raise rpc_common.RPCException(msg)
605
606 if attempt == 1:
607 sleep_time = self.interval_start or 1
608 elif attempt > 1:
609 sleep_time += self.interval_stepping
610 if self.interval_max:
611 sleep_time = min(sleep_time, self.interval_max)
612
613 log_info['sleep_time'] = sleep_time
614 LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
615 'unreachable: %(err_str)s. Trying again in '
616 '%(sleep_time)d seconds.') % log_info)
617 time.sleep(sleep_time)
618
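
The sleep computation above is a linear backoff: interval_start on the first retry, plus interval_stepping for each subsequent attempt, capped at interval_max. Extracted as a pure function (a hypothetical helper, for illustration only):

    def next_sleep_time(prev, attempt, start=1, step=2, maximum=30):
        # The first attempt sleeps `start`; later attempts add `step`,
        # but never exceed `maximum`.
        if attempt == 1:
            return start or 1
        return min(prev + step, maximum)

    s = next_sleep_time(None, 1)   # 1
    s = next_sleep_time(s, 2)      # 3
    s = next_sleep_time(s, 3)      # 5
    assert s == 5
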
619 def ensure(self, error_callback, method, *args, **kwargs):
620 while True:
621 try:
622 return method(*args, **kwargs)
623 except self.connection_errors as e:
624 if error_callback:
625 error_callback(e)
626 except self.channel_errors as e:
627 if error_callback:
628 error_callback(e)
629 except (socket.timeout, IOError) as e:
630 if error_callback:
631 error_callback(e)
632 except Exception as e:
633 # NOTE(comstud): Unfortunately it's possible for amqplib
634 # to return an error not covered by its transport
635 # connection_errors in the case of a timeout waiting for
636 # a protocol response. (See paste link in LP888621)
637 # So, we check all exceptions for 'timeout' in them
638 # and try to reconnect in this case.
639 if 'timeout' not in str(e):
640 raise
641 if error_callback:
642 error_callback(e)
643 self.reconnect()
644
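
ensure() is the driver's generic retry wrapper: any transport-level error runs the error callback, reconnects, and retries the whole operation from scratch. The bare shape of the pattern, with the exception filtering reduced to IOError for brevity (a sketch, not the production logic):

    def ensure(reconnect, method, *args, **kwargs):
        while True:
            try:
                return method(*args, **kwargs)
            except IOError:
                reconnect()  # real code also handles kombu channel errors

    calls = {'n': 0}

    def flaky():
        calls['n'] += 1
        if calls['n'] < 3:
            raise IOError('transient broker failure')
        return 'ok'

    assert ensure(lambda: None, flaky) == 'ok' and calls['n'] == 3
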
645 def get_channel(self):
646 """Convenience call for bin/clear_rabbit_queues."""
647 return self.channel
648
649 def close(self):
650 """Close/release this connection."""
651 self.connection.release()
652 self.connection = None
653
654 def reset(self):
655 """Reset a connection so it can be used again."""
656 self.channel.close()
657 self.channel = self.connection.channel()
658 # work around 'memory' transport bug in 1.1.3
659 if self.memory_transport:
660 self.channel._new_queue('ae.undeliver')
661 self.consumers = []
662
663 def declare_consumer(self, consumer_cls, topic, callback):
664 """Create a Consumer using the class that was passed in and
665 add it to our list of consumers
666 """
667
668 def _connect_error(exc):
669 log_info = {'topic': topic, 'err_str': str(exc)}
670 LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
671 "%(err_str)s") % log_info)
672
673 def _declare_consumer():
674 consumer = consumer_cls(self.conf, self.channel, topic, callback,
675 six.next(self.consumer_num))
676 self.consumers.append(consumer)
677 return consumer
678
679 return self.ensure(_connect_error, _declare_consumer)
680
681 def iterconsume(self, limit=None, timeout=None):
682 """Return an iterator that will consume from all queues/consumers."""
683 timer = rpc_common.DecayingTimer(duration=timeout)
684 timer.start()
685
686 def _raise_timeout(exc):
687 LOG.debug('Timed out waiting for RPC response: %s', exc)
688 raise rpc_common.Timeout()
689
690 def _error_callback(exc):
691 self.do_consume = True
692 timer.check_return(_raise_timeout, exc)
693 LOG.exception(_('Failed to consume message from queue: %s'),
694 exc)
695
696 def _consume():
697 if self.do_consume:
698 queues_head = self.consumers[:-1] # not fanout.
699 queues_tail = self.consumers[-1] # fanout
700 for queue in queues_head:
701 queue.consume(nowait=True)
702 queues_tail.consume(nowait=False)
703 self.do_consume = False
704
705 poll_timeout = 1 if timeout is None else min(timeout, 1)
706 while True:
707 try:
708 return self.connection.drain_events(timeout=poll_timeout)
709 except socket.timeout as exc:
710 poll_timeout = timer.check_return(_raise_timeout, exc,
711 maximum=1)
712
713 for iteration in itertools.count(0):
714 if limit and iteration >= limit:
715 raise StopIteration
716 yield self.ensure(_error_callback, _consume)
717
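
The interplay between poll_timeout and the timer is what keeps drain_events() responsive: each individual wait is clamped to at most one second, while check_return() enforces the overall deadline. DecayingTimer itself lives in the common driver module; a self-contained approximation of the behaviour relied on here:

    import time

    class DecayingTimerSketch(object):
        # Approximation of rpc_common.DecayingTimer, for illustration.
        def __init__(self, duration=None):
            self._duration = duration
            self._ends_at = None

        def start(self):
            if self._duration is not None:
                self._ends_at = time.time() + self._duration

        def check_return(self, timeout_callback, *args, **kwargs):
            maximum = kwargs.pop('maximum', None)
            if self._ends_at is None:
                return maximum  # no duration: never times out
            left = self._ends_at - time.time()
            if left <= 0 and timeout_callback is not None:
                timeout_callback(*args, **kwargs)
            return left if maximum is None else min(left, maximum)
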
718 def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
719 """Send to a publisher based on the publisher class."""
720
721 def _error_callback(exc):
722 log_info = {'topic': topic, 'err_str': str(exc)}
723 LOG.exception(_("Failed to publish message to topic "
724 "'%(topic)s': %(err_str)s") % log_info)
725
726 def _publish():
727 publisher = cls(self.conf, self.channel, topic, **kwargs)
728 publisher.send(msg, timeout)
729
730 self.ensure(_error_callback, _publish)
731
732 def declare_direct_consumer(self, topic, callback):
733 """Create a 'direct' queue.
734 In nova's use, this is generally a msg_id queue used for
735 responses to call/multicall.
736 """
737 self.declare_consumer(DirectConsumer, topic, callback)
738
739 def declare_topic_consumer(self, topic, callback=None, queue_name=None,
740 exchange_name=None):
741 """Create a 'topic' consumer."""
742 self.declare_consumer(functools.partial(TopicConsumer,
743 name=queue_name,
744 exchange_name=exchange_name,
745 ),
746 topic, callback)
747
748 def declare_fanout_consumer(self, topic, callback):
749 """Create a 'fanout' consumer."""
750 self.declare_consumer(FanoutConsumer, topic, callback)
751
752 def direct_send(self, msg_id, msg):
753 """Send a 'direct' message."""
754 self.publisher_send(DirectPublisher, msg_id, msg)
755
756 def topic_send(self, topic, msg, timeout=None):
757 """Send a 'topic' message."""
758 self.publisher_send(TopicPublisher, topic, msg, timeout)
759
760 def fanout_send(self, topic, msg):
761 """Send a 'fanout' message."""
762 self.publisher_send(FanoutPublisher, topic, msg)
763
764 def notify_send(self, topic, msg, **kwargs):
765 """Send a notify message on a topic."""
766 self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
767
768 def consume(self, limit=None, timeout=None):
769 """Consume from all queues/consumers."""
770 it = self.iterconsume(limit=limit, timeout=timeout)
771 while True:
772 try:
773 six.next(it)
774 except StopIteration:
775 return
776
777
778class RabbitDriver(amqpdriver.AMQPDriverBase):
779
780 def __init__(self, conf, url, default_exchange=None,
781 allowed_remote_exmods=[]):
782 conf.register_opts(rabbit_opts)
783 conf.register_opts(rpc_amqp.amqp_opts)
784
785 connection_pool = rpc_amqp.get_connection_pool(conf, Connection)
786
787 super(RabbitDriver, self).__init__(conf, url,
788 connection_pool,
789 default_exchange,
790 allowed_remote_exmods)
791
792 def require_features(self, requeue=True):
793 pass
794
=== removed directory '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests'
=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py'
--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py 2015-04-23 15:56:08 +0000
+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_rabbit.py 1970-01-01 00:00:00 +0000
@@ -1,646 +0,0 @@
1# Copyright 2013 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import datetime
16import sys
17import threading
18import uuid
19
20import fixtures
21import kombu
22import testscenarios
23
24from oslo import messaging
25from oslo.messaging._drivers import amqpdriver
26from oslo.messaging._drivers import common as driver_common
27from oslo.messaging._drivers import impl_rabbit as rabbit_driver
28from oslo.messaging.openstack.common import jsonutils
29from tests import utils as test_utils
30
31load_tests = testscenarios.load_tests_apply_scenarios
32
33
34class TestRabbitDriverLoad(test_utils.BaseTestCase):
35
36 def setUp(self):
37 super(TestRabbitDriverLoad, self).setUp()
38 self.messaging_conf.transport_driver = 'rabbit'
39 self.messaging_conf.in_memory = True
40
41 def test_driver_load(self):
42 transport = messaging.get_transport(self.conf)
43 self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
44
45
46class TestRabbitTransportURL(test_utils.BaseTestCase):
47
48 scenarios = [
49 ('none', dict(url=None, expected=None)),
50 ('empty',
51 dict(url='rabbit:///',
52 expected=dict(virtual_host=''))),
53 ('localhost',
54 dict(url='rabbit://localhost/',
55 expected=dict(hostname='localhost',
56 username='',
57 password='',
58 virtual_host=''))),
59 ('virtual_host',
60 dict(url='rabbit:///vhost',
61 expected=dict(virtual_host='vhost'))),
62 ('no_creds',
63 dict(url='rabbit://host/virtual_host',
64 expected=dict(hostname='host',
65 username='',
66 password='',
67 virtual_host='virtual_host'))),
68 ('no_port',
69 dict(url='rabbit://user:password@host/virtual_host',
70 expected=dict(hostname='host',
71 username='user',
72 password='password',
73 virtual_host='virtual_host'))),
74 ('full_url',
75 dict(url='rabbit://user:password@host:10/virtual_host',
76 expected=dict(hostname='host',
77 port=10,
78 username='user',
79 password='password',
80 virtual_host='virtual_host'))),
81 ]
82
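
The scenarios above pin down how a rabbit:// transport URL decomposes into connection parameters; the decomposition itself is ordinary URL parsing. A rough sketch of the full_url case (not the driver's actual parsing code):

    from six.moves.urllib import parse

    url = parse.urlparse('rabbit://user:password@host:10/virtual_host')
    assert (url.hostname, url.port) == ('host', 10)
    assert (url.username, url.password) == ('user', 'password')
    assert url.path.lstrip('/') == 'virtual_host'
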
83 def setUp(self):
84 super(TestRabbitTransportURL, self).setUp()
85
86 self.messaging_conf.transport_driver = 'rabbit'
87 self.messaging_conf.in_memory = True
88
89 self._server_params = []
90 cnx_init = rabbit_driver.Connection.__init__
91
92 def record_params(cnx, conf, server_params=None):
93 self._server_params.append(server_params)
94 return cnx_init(cnx, conf, server_params)
95
96 def dummy_send(cnx, topic, msg, timeout=None):
97 pass
98
99 self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
100 self.stubs.Set(rabbit_driver.Connection, 'topic_send', dummy_send)
101
102 self._driver = messaging.get_transport(self.conf, self.url)._driver
103 self._target = messaging.Target(topic='testtopic')
104
105 def test_transport_url_listen(self):
106 self._driver.listen(self._target)
107 self.assertEqual(self._server_params[0], self.expected)
108
109 def test_transport_url_listen_for_notification(self):
110 self._driver.listen_for_notifications(
111 [(messaging.Target(topic='topic'), 'info')])
112 self.assertEqual(self._server_params[0], self.expected)
113
114 def test_transport_url_send(self):
115 self._driver.send(self._target, {}, {})
116 self.assertEqual(self._server_params[0], self.expected)
117
118
119class TestSendReceive(test_utils.BaseTestCase):
120
121 _n_senders = [
122 ('single_sender', dict(n_senders=1)),
123 ('multiple_senders', dict(n_senders=10)),
124 ]
125
126 _context = [
127 ('empty_context', dict(ctxt={})),
128 ('with_context', dict(ctxt={'user': 'mark'})),
129 ]
130
131 _reply = [
132 ('rx_id', dict(rx_id=True, reply=None)),
133 ('none', dict(rx_id=False, reply=None)),
134 ('empty_list', dict(rx_id=False, reply=[])),
135 ('empty_dict', dict(rx_id=False, reply={})),
136 ('false', dict(rx_id=False, reply=False)),
137 ('zero', dict(rx_id=False, reply=0)),
138 ]
139
140 _failure = [
141 ('success', dict(failure=False)),
142 ('failure', dict(failure=True, expected=False)),
143 ('expected_failure', dict(failure=True, expected=True)),
144 ]
145
146 _timeout = [
147 ('no_timeout', dict(timeout=None)),
148 ('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
149 ]
150
151 @classmethod
152 def generate_scenarios(cls):
153 cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
154 cls._context,
155 cls._reply,
156 cls._failure,
157 cls._timeout)
158
159 def setUp(self):
160 super(TestSendReceive, self).setUp()
161 self.messaging_conf.transport_driver = 'rabbit'
162 self.messaging_conf.in_memory = True
163
164 def test_send_receive(self):
165 transport = messaging.get_transport(self.conf)
166 self.addCleanup(transport.cleanup)
167
168 driver = transport._driver
169
170 target = messaging.Target(topic='testtopic')
171
172 listener = driver.listen(target)
173
174 senders = []
175 replies = []
176 msgs = []
177 errors = []
178
179 def stub_error(msg, *a, **kw):
180 if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
181 a = a[0]
182 errors.append(str(msg) % a)
183
184 self.stubs.Set(driver_common.LOG, 'error', stub_error)
185
186 def send_and_wait_for_reply(i):
187 try:
188 replies.append(driver.send(target,
189 self.ctxt,
190 {'tx_id': i},
191 wait_for_reply=True,
192 timeout=self.timeout))
193 self.assertFalse(self.failure)
194 self.assertIsNone(self.timeout)
195 except (ZeroDivisionError, messaging.MessagingTimeout) as e:
196 replies.append(e)
197 self.assertTrue(self.failure or self.timeout is not None)
198
199 while len(senders) < self.n_senders:
200 senders.append(threading.Thread(target=send_and_wait_for_reply,
201 args=(len(senders), )))
202
203 for i in range(len(senders)):
204 senders[i].start()
205
206 received = listener.poll()
207 self.assertIsNotNone(received)
208 self.assertEqual(received.ctxt, self.ctxt)
209 self.assertEqual(received.message, {'tx_id': i})
210 msgs.append(received)
211
212 # reply in reverse, except reply to the first guy second from last
213 order = list(range(len(senders)-1, -1, -1))
214 if len(order) > 1:
215 order[-1], order[-2] = order[-2], order[-1]
216
217 for i in order:
218 if self.timeout is None:
219 if self.failure:
220 try:
221 raise ZeroDivisionError
222 except Exception:
223 failure = sys.exc_info()
224 msgs[i].reply(failure=failure,
225 log_failure=not self.expected)
226 elif self.rx_id:
227 msgs[i].reply({'rx_id': i})
228 else:
229 msgs[i].reply(self.reply)
230 senders[i].join()
231
232 self.assertEqual(len(replies), len(senders))
233 for i, reply in enumerate(replies):
234 if self.timeout is not None:
235 self.assertIsInstance(reply, messaging.MessagingTimeout)
236 elif self.failure:
237 self.assertIsInstance(reply, ZeroDivisionError)
238 elif self.rx_id:
239 self.assertEqual(reply, {'rx_id': order[i]})
240 else:
241 self.assertEqual(reply, self.reply)
242
243 if not self.timeout and self.failure and not self.expected:
244 self.assertTrue(len(errors) > 0, errors)
245 else:
246 self.assertEqual(len(errors), 0, errors)
247
248
249TestSendReceive.generate_scenarios()
250
251
252class TestRacyWaitForReply(test_utils.BaseTestCase):
253
254 def setUp(self):
255 super(TestRacyWaitForReply, self).setUp()
256 self.messaging_conf.transport_driver = 'rabbit'
257 self.messaging_conf.in_memory = True
258
259 def test_send_receive(self):
260 transport = messaging.get_transport(self.conf)
261 self.addCleanup(transport.cleanup)
262
263 driver = transport._driver
264
265 target = messaging.Target(topic='testtopic')
266
267 listener = driver.listen(target)
268
269 senders = []
270 replies = []
271 msgs = []
272
273 wait_conditions = []
274 orig_reply_waiter = amqpdriver.ReplyWaiter.wait
275
276 def reply_waiter(self, msg_id, timeout):
277 if wait_conditions:
278 with wait_conditions[0]:
279 wait_conditions.pop().wait()
280 return orig_reply_waiter(self, msg_id, timeout)
281
282 self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
283
284 def send_and_wait_for_reply(i):
285 replies.append(driver.send(target,
286 {},
287 {'tx_id': i},
288 wait_for_reply=True,
289 timeout=None))
290
291 while len(senders) < 2:
292 t = threading.Thread(target=send_and_wait_for_reply,
293 args=(len(senders), ))
294 t.daemon = True
295 senders.append(t)
296
297 # Start the first guy, receive his message, but delay his polling
298 notify_condition = threading.Condition()
299 wait_conditions.append(notify_condition)
300 senders[0].start()
301
302 msgs.append(listener.poll())
303 self.assertEqual(msgs[-1].message, {'tx_id': 0})
304
305 # Start the second guy, receive his message
306 senders[1].start()
307
308 msgs.append(listener.poll())
309 self.assertEqual(msgs[-1].message, {'tx_id': 1})
310
311 # Reply to both in order, making the second thread queue
312 # the reply meant for the first thread
313 msgs[0].reply({'rx_id': 0})
314 msgs[1].reply({'rx_id': 1})
315
316 # Wait for the second thread to finish
317 senders[1].join()
318
319 # Let the first thread continue
320 with notify_condition:
321 notify_condition.notify()
322
323 # Wait for the first thread to finish
324 senders[0].join()
325
326 # Verify replies were received out of order
327 self.assertEqual(len(replies), len(senders))
328 self.assertEqual(replies[0], {'rx_id': 1})
329 self.assertEqual(replies[1], {'rx_id': 0})
330
331
332def _declare_queue(target):
333 connection = kombu.connection.BrokerConnection(transport='memory')
334
335 # Kludge to speed up tests.
336 connection.transport.polling_interval = 0.0
337
338 connection.connect()
339 channel = connection.channel()
340
341 # work around 'memory' transport bug in 1.1.3
342 channel._new_queue('ae.undeliver')
343
344 if target.fanout:
345 exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
346 type='fanout',
347 durable=False,
348 auto_delete=True)
349 queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
350 channel=channel,
351 exchange=exchange,
352 routing_key=target.topic)
353 if target.server:
354 exchange = kombu.entity.Exchange(name='openstack',
355 type='topic',
356 durable=False,
357 auto_delete=False)
358 topic = '%s.%s' % (target.topic, target.server)
359 queue = kombu.entity.Queue(name=topic,
360 channel=channel,
361 exchange=exchange,
362 routing_key=topic)
363 else:
364 exchange = kombu.entity.Exchange(name='openstack',
365 type='topic',
366 durable=False,
367 auto_delete=False)
368 queue = kombu.entity.Queue(name=target.topic,
369 channel=channel,
370 exchange=exchange,
371 routing_key=target.topic)
372
373 queue.declare()
374
375 return connection, channel, queue
376
377
378class TestRequestWireFormat(test_utils.BaseTestCase):
379
380 _target = [
381 ('topic_target',
382 dict(topic='testtopic', server=None, fanout=False)),
383 ('server_target',
384 dict(topic='testtopic', server='testserver', fanout=False)),
385 # NOTE(markmc): https://github.com/celery/kombu/issues/195
386 ('fanout_target',
387 dict(topic='testtopic', server=None, fanout=True,
388 skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
389 ]
390
391 _msg = [
392 ('empty_msg',
393 dict(msg={}, expected={})),
394 ('primitive_msg',
395 dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
396 ('complex_msg',
397 dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
398 expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
399 ]
400
401 _context = [
402 ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
403 ('user_project_ctxt',
404 dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
405 expected_ctxt={'_context_user': 'mark',
406 '_context_project': 'snarkybunch'})),
407 ]
408
409 @classmethod
410 def generate_scenarios(cls):
411 cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
412 cls._context,
413 cls._target)
414
415 def setUp(self):
416 super(TestRequestWireFormat, self).setUp()
417 self.messaging_conf.transport_driver = 'rabbit'
418 self.messaging_conf.in_memory = True
419
420 self.uuids = []
421 self.orig_uuid4 = uuid.uuid4
422 self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
423
424 def mock_uuid4(self):
425 self.uuids.append(self.orig_uuid4())
426 return self.uuids[-1]
427
428 def test_request_wire_format(self):
429 if hasattr(self, 'skip_msg'):
430 self.skipTest(self.skip_msg)
431
432 transport = messaging.get_transport(self.conf)
433 self.addCleanup(transport.cleanup)
434
435 driver = transport._driver
436
437 target = messaging.Target(topic=self.topic,
438 server=self.server,
439 fanout=self.fanout)
440
441 connection, channel, queue = _declare_queue(target)
442 self.addCleanup(connection.release)
443
444 driver.send(target, self.ctxt, self.msg)
445
446 msgs = []
447
448 def callback(msg):
449 msg = channel.message_to_python(msg)
450 msg.ack()
451 msgs.append(msg.payload)
452
453 queue.consume(callback=callback,
454 consumer_tag='1',
455 nowait=False)
456
457 connection.drain_events()
458
459 self.assertEqual(1, len(msgs))
460 self.assertIn('oslo.message', msgs[0])
461
462 received = msgs[0]
463 received['oslo.message'] = jsonutils.loads(received['oslo.message'])
464
465 # FIXME(markmc): add _msg_id and _reply_q check
466 expected_msg = {
467 '_unique_id': self.uuids[0].hex,
468 }
469 expected_msg.update(self.expected)
470 expected_msg.update(self.expected_ctxt)
471
472 expected = {
473 'oslo.version': '2.0',
474 'oslo.message': expected_msg,
475 }
476
477 self.assertEqual(expected, received)
478
479
480TestRequestWireFormat.generate_scenarios()
481
482
483def _create_producer(target):
484 connection = kombu.connection.BrokerConnection(transport='memory')
485
486 # Kludge to speed up tests.
487 connection.transport.polling_interval = 0.0
488
489 connection.connect()
490 channel = connection.channel()
491
492 # work around 'memory' transport bug in 1.1.3
493 channel._new_queue('ae.undeliver')
494
495 if target.fanout:
496 exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
497 type='fanout',
498 durable=False,
499 auto_delete=True)
500 producer = kombu.messaging.Producer(exchange=exchange,
501 channel=channel,
502 routing_key=target.topic)
503 elif target.server:
504 exchange = kombu.entity.Exchange(name='openstack',
505 type='topic',
506 durable=False,
507 auto_delete=False)
508 topic = '%s.%s' % (target.topic, target.server)
509 producer = kombu.messaging.Producer(exchange=exchange,
510 channel=channel,
511 routing_key=topic)
512 else:
513 exchange = kombu.entity.Exchange(name='openstack',
514 type='topic',
515 durable=False,
516 auto_delete=False)
517 producer = kombu.messaging.Producer(exchange=exchange,
518 channel=channel,
519 routing_key=target.topic)
520
521 return connection, producer
522
523
524class TestReplyWireFormat(test_utils.BaseTestCase):
525
526 _target = [
527 ('topic_target',
528 dict(topic='testtopic', server=None, fanout=False)),
529 ('server_target',
530 dict(topic='testtopic', server='testserver', fanout=False)),
531 # NOTE(markmc): https://github.com/celery/kombu/issues/195
532 ('fanout_target',
533 dict(topic='testtopic', server=None, fanout=True,
534 skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
535 ]
536
537 _msg = [
538 ('empty_msg',
539 dict(msg={}, expected={})),
540 ('primitive_msg',
541 dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
542 ('complex_msg',
543 dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
544 expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
545 ]
546
547 _context = [
548 ('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
549 ('user_project_ctxt',
550 dict(ctxt={'_context_user': 'mark',
551 '_context_project': 'snarkybunch'},
552 expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
553 ]
554
555 @classmethod
556 def generate_scenarios(cls):
557 cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
558 cls._context,
559 cls._target)
560
561 def setUp(self):
562 super(TestReplyWireFormat, self).setUp()
563 self.messaging_conf.transport_driver = 'rabbit'
564 self.messaging_conf.in_memory = True
565
566 def test_reply_wire_format(self):
567 if hasattr(self, 'skip_msg'):
568 self.skipTest(self.skip_msg)
569
570 transport = messaging.get_transport(self.conf)
571 self.addCleanup(transport.cleanup)
572
573 driver = transport._driver
574
575 target = messaging.Target(topic=self.topic,
576 server=self.server,
577 fanout=self.fanout)
578
579 listener = driver.listen(target)
580
581 connection, producer = _create_producer(target)
582 self.addCleanup(connection.release)
583
584 msg = {
585 'oslo.version': '2.0',
586 'oslo.message': {}
587 }
588
589 msg['oslo.message'].update(self.msg)
590 msg['oslo.message'].update(self.ctxt)
591
592 msg['oslo.message'].update({
593 '_msg_id': uuid.uuid4().hex,
594 '_unique_id': uuid.uuid4().hex,
595 '_reply_q': 'reply_' + uuid.uuid4().hex,
596 })
597
598 msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
599
600 producer.publish(msg)
601
602 received = listener.poll()
603 self.assertIsNotNone(received)
604 self.assertEqual(self.expected_ctxt, received.ctxt)
605 self.assertEqual(self.expected, received.message)
606
607
608TestReplyWireFormat.generate_scenarios()
609
610
611class RpcKombuHATestCase(test_utils.BaseTestCase):
612
613 def test_reconnect_order(self):
614 brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
615 brokers_count = len(brokers)
616
617 self.conf.rabbit_hosts = brokers
618 self.conf.rabbit_max_retries = 1
619
620 info = {'attempt': 0}
621
622 def _connect(myself, params):
623 # do just enough work for the connection attempt to succeed
624 myself.connection = kombu.connection.BrokerConnection(**params)
625 myself.connection_errors = myself.connection.connection_errors
626
627 expected_broker = brokers[info['attempt'] % brokers_count]
628 self.assertEqual(params['hostname'], expected_broker)
629
630 info['attempt'] += 1
631
632 # just make sure connection instantiation does not fail with an
633 # exception
634 self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
635
636 # starting from the first broker in the list
637 connection = rabbit_driver.Connection(self.conf)
638
639 # now that we have connection object, revert to the real 'connect'
640 # implementation
641 self.stubs.UnsetAll()
642
643 for i in range(len(brokers)):
644 self.assertRaises(driver_common.RPCException, connection.reconnect)
645
646 connection.close()
647
=== removed file '.pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py'
--- .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py 2015-04-23 15:56:08 +0000
+++ .pc/0003-Declare-DirectPublisher-exchanges-with-passive-True.patch/tests/test_utils.py 1970-01-01 00:00:00 +0000
@@ -1,64 +0,0 @@
1
2# Copyright 2013 Red Hat, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16from oslo.messaging._drivers import common
17from oslo.messaging import _utils as utils
18from tests import utils as test_utils
19
20
21class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
22 def test_version_is_compatible_same(self):
23 self.assertTrue(utils.version_is_compatible('1.23', '1.23'))
24
25 def test_version_is_compatible_newer_minor(self):
26 self.assertTrue(utils.version_is_compatible('1.24', '1.23'))
27
28 def test_version_is_compatible_older_minor(self):
29 self.assertFalse(utils.version_is_compatible('1.22', '1.23'))
30
31 def test_version_is_compatible_major_difference1(self):
32 self.assertFalse(utils.version_is_compatible('2.23', '1.23'))
33
34 def test_version_is_compatible_major_difference2(self):
35 self.assertFalse(utils.version_is_compatible('1.23', '2.23'))
36
37 def test_version_is_compatible_newer_rev(self):
38 self.assertFalse(utils.version_is_compatible('1.23', '1.23.1'))
39
40 def test_version_is_compatible_newer_rev_both(self):
41 self.assertFalse(utils.version_is_compatible('1.23.1', '1.23.2'))
42
43 def test_version_is_compatible_older_rev_both(self):
44 self.assertTrue(utils.version_is_compatible('1.23.2', '1.23.1'))
45
46 def test_version_is_compatible_older_rev(self):
47 self.assertTrue(utils.version_is_compatible('1.24', '1.23.1'))
48
49 def test_version_is_compatible_no_rev_is_zero(self):
50 self.assertTrue(utils.version_is_compatible('1.23.0', '1.23'))
51
52
53class TimerTestCase(test_utils.BaseTestCase):
54 def test_duration_is_none(self):
55 t = common.DecayingTimer()
56 t.start()
57 remaining = t.check_return(None)
58 self.assertEqual(None, remaining)
59
60 def test_duration_is_none_and_maximum_set(self):
61 t = common.DecayingTimer()
62 t.start()
63 remaining = t.check_return(None, maximum=2)
64 self.assertEqual(2, remaining)
65
=== renamed file '.pc/applied-patches' => '.pc/applied-patches.THIS'
=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch'
=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo'
=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging'
=== removed directory '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers'
=== removed file '.pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py'
--- .pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py 2015-06-25 09:59:42 +0000
+++ .pc/redeclare-consumers-when-ack-requeue-fails.patch/oslo/messaging/_drivers/impl_rabbit.py 1970-01-01 00:00:00 +0000
@@ -1,819 +0,0 @@
1# Copyright 2011 OpenStack Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import functools
16import itertools
17import logging
18import socket
19import ssl
20import time
21import uuid
22
23import kombu
24import kombu.connection
25import kombu.entity
26import kombu.messaging
27from oslo.config import cfg
28import six
29
30from oslo.messaging._drivers import amqp as rpc_amqp
31from oslo.messaging._drivers import amqpdriver
32from oslo.messaging._drivers import common as rpc_common
33from oslo.messaging.openstack.common import network_utils
34
35# FIXME(markmc): remove this
36_ = lambda s: s
37
38rabbit_opts = [
39 cfg.StrOpt('kombu_ssl_version',
40 default='',
41 help='SSL version to use (valid only if SSL enabled). '
42 'Valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
43 'be available on some distributions.'
44 ),
45 cfg.StrOpt('kombu_ssl_keyfile',
46 default='',
47 help='SSL key file (valid only if SSL enabled).'),
48 cfg.StrOpt('kombu_ssl_certfile',
49 default='',
50 help='SSL cert file (valid only if SSL enabled).'),
51 cfg.StrOpt('kombu_ssl_ca_certs',
52 default='',
53 help=('SSL certification authority file '
54 '(valid only if SSL enabled).')),
55 cfg.FloatOpt('kombu_reconnect_delay',
56 default=1.0,
57 help='How long to wait before reconnecting in response to an '
58 'AMQP consumer cancel notification.'),
59 cfg.StrOpt('rabbit_host',
60 default='localhost',
61 help='The RabbitMQ broker address where a single node is '
62 'used.'),
63 cfg.IntOpt('rabbit_port',
64 default=5672,
65 help='The RabbitMQ broker port where a single node is used.'),
66 cfg.ListOpt('rabbit_hosts',
67 default=['$rabbit_host:$rabbit_port'],
68 help='RabbitMQ HA cluster host:port pairs.'),
69 cfg.BoolOpt('rabbit_use_ssl',
70 default=False,
71 help='Connect over SSL for RabbitMQ.'),
72 cfg.StrOpt('rabbit_userid',
73 default='guest',
74 help='The RabbitMQ userid.'),
75 cfg.StrOpt('rabbit_password',
76 default='guest',
77 help='The RabbitMQ password.',
78 secret=True),
79 cfg.StrOpt('rabbit_login_method',
80 default='AMQPLAIN',
81 help='The RabbitMQ login method.'),
82 cfg.StrOpt('rabbit_virtual_host',
83 default='/',
84 help='The RabbitMQ virtual host.'),
85 cfg.IntOpt('rabbit_retry_interval',
86 default=1,
87 help='How frequently to retry connecting with RabbitMQ.'),
88 cfg.IntOpt('rabbit_retry_backoff',
89 default=2,
90 help='How long to backoff for between retries when connecting '
91 'to RabbitMQ.'),
92 cfg.IntOpt('rabbit_max_retries',
93 default=0,
94 help='Maximum number of RabbitMQ connection retries. '
95 'Default is 0 (infinite retry count).'),
96 cfg.BoolOpt('rabbit_ha_queues',
97 default=False,
98 help='Use HA queues in RabbitMQ (x-ha-policy: all). '
99 'If you change this option, you must wipe the '
100 'RabbitMQ database.'),
101
102 # FIXME(markmc): this was toplevel in openstack.common.rpc
103 cfg.BoolOpt('fake_rabbit',
104 default=False,
105 help='If passed, use a fake RabbitMQ provider.'),
106]
107
108LOG = logging.getLogger(__name__)
109
110
111def _get_queue_arguments(conf):
112 """Construct the arguments for declaring a queue.
113
114 If the rabbit_ha_queues option is set, we declare a mirrored queue
115 as described here:
116
117 http://www.rabbitmq.com/ha.html
118
119 Setting x-ha-policy to all means that the queue will be mirrored
120 to all nodes in the cluster.
121 """
122 return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
123
124
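
So with rabbit_ha_queues enabled, every queue declared in this file carries x-ha-policy=all and RabbitMQ mirrors it to every node in the cluster; otherwise the arguments dict is empty. The conditional in miniature (FakeConf stands in for an oslo.config object):

    class FakeConf(object):
        rabbit_ha_queues = True

    def get_queue_arguments(conf):
        return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}

    assert get_queue_arguments(FakeConf()) == {'x-ha-policy': 'all'}
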
125class RabbitMessage(dict):
126 def __init__(self, raw_message):
127 super(RabbitMessage, self).__init__(
128 rpc_common.deserialize_msg(raw_message.payload))
129 self._raw_message = raw_message
130
131 def acknowledge(self):
132 self._raw_message.ack()
133
134 def requeue(self):
135 self._raw_message.requeue()
136
137
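
RabbitMessage is a thin dict wrapper that keeps a handle on the raw kombu message so the consumer can ack or requeue after the payload has been processed. The same wrapping pattern in miniature (FakeRawMessage stands in for kombu's message object):

    class FakeRawMessage(object):
        payload = {'method': 'ping'}
        acked = False

        def ack(self):
            self.acked = True

    class MessageSketch(dict):
        def __init__(self, raw):
            super(MessageSketch, self).__init__(raw.payload)
            self._raw = raw

        def acknowledge(self):
            self._raw.ack()

    msg = MessageSketch(FakeRawMessage())
    msg.acknowledge()
    assert msg['method'] == 'ping' and msg._raw.acked
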
138class ConsumerBase(object):
139 """Consumer base class."""
140
141 def __init__(self, channel, callback, tag, **kwargs):
142 """Declare a queue on an amqp channel.
143
144 'channel' is the amqp channel to use
145 'callback' is the callback to call when messages are received
146 'tag' is a unique ID for the consumer on the channel
147
148 queue name, exchange name, and other kombu options are
149 passed in here as a dictionary.
150 """
151 self.callback = callback
152 self.tag = str(tag)
153 self.kwargs = kwargs
154 self.queue = None
155 self.reconnect(channel)
156
157 def reconnect(self, channel):
158 """Re-declare the queue after a rabbit reconnect."""
159 self.channel = channel
160 self.kwargs['channel'] = channel
161 self.queue = kombu.entity.Queue(**self.kwargs)
162 self.queue.declare()
163
164 def _callback_handler(self, message, callback):
165 """Call callback with deserialized message.
166
167 Messages are processed and then ack'ed.
168 """
169
170 try:
171 callback(RabbitMessage(message))
172 except Exception:
173 LOG.exception(_("Failed to process message"
174 " ... skipping it."))
175 message.ack()
176
177 def consume(self, *args, **kwargs):
178 """Actually declare the consumer on the amqp channel. This will
179 start the flow of messages from the queue. Using the
180 Connection.iterconsume() iterator will process the messages,
181 calling the appropriate callback.
182
183 If a callback is specified in kwargs, use that. Otherwise,
184 use the callback passed during __init__()
185
186 If kwargs['nowait'] is False, this call will block until the
187 broker confirms the consumer registration.
188
189 """
190
191 options = {'consumer_tag': self.tag}
192 options['nowait'] = kwargs.get('nowait', False)
193 callback = kwargs.get('callback', self.callback)
194 if not callback:
195 raise ValueError("No callback defined")
196
197 def _callback(raw_message):
198 message = self.channel.message_to_python(raw_message)
199 self._callback_handler(message, callback)
200
201 self.queue.consume(*args, callback=_callback, **options)
202
203 def cancel(self):
204 """Cancel the consuming from the queue, if it has started."""
205 try:
206 self.queue.cancel(self.tag)
207 except KeyError as e:
208 # NOTE(comstud): Kludge to get around an amqplib bug
209 if str(e) != "u'%s'" % self.tag:
210 raise
211 self.queue = None
212
213
214class DirectConsumer(ConsumerBase):
215 """Queue/consumer class for 'direct'."""
216
217 def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
218 """Init a 'direct' queue.
219
220 'channel' is the amqp channel to use
221 'msg_id' is the msg_id to listen on
222 'callback' is the callback to call when messages are received
223 'tag' is a unique ID for the consumer on the channel
224
225 Other kombu options may be passed
226 """
227 # Default options
228 options = {'durable': False,
229 'queue_arguments': _get_queue_arguments(conf),
230 'auto_delete': True,
231 'exclusive': False}
232 options.update(kwargs)
233 exchange = kombu.entity.Exchange(name=msg_id,
234 type='direct',
235 durable=options['durable'],
236 auto_delete=options['auto_delete'])
237 super(DirectConsumer, self).__init__(channel,
238 callback,
239 tag,
240 name=msg_id,
241 exchange=exchange,
242 routing_key=msg_id,
243 **options)
244
245
246class TopicConsumer(ConsumerBase):
247 """Consumer class for 'topic'."""
248
249 def __init__(self, conf, channel, topic, callback, tag, name=None,
250 exchange_name=None, **kwargs):
251 """Init a 'topic' queue.
252
253 :param channel: the amqp channel to use
254 :param topic: the topic to listen on
255 :paramtype topic: str
256 :param callback: the callback to call when messages are received
257 :param tag: a unique ID for the consumer on the channel
258 :param name: optional queue name, defaults to topic
259 :paramtype name: str
260
261 Other kombu options may be passed as keyword arguments
262 """
263 # Default options
264 options = {'durable': conf.amqp_durable_queues,
265 'queue_arguments': _get_queue_arguments(conf),
266 'auto_delete': conf.amqp_auto_delete,
267 'exclusive': False}
268 options.update(kwargs)
269 exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
270 exchange = kombu.entity.Exchange(name=exchange_name,
271 type='topic',
272 durable=options['durable'],
273 auto_delete=options['auto_delete'])
274 super(TopicConsumer, self).__init__(channel,
275 callback,
276 tag,
277 name=name or topic,
278 exchange=exchange,
279 routing_key=topic,
280 **options)
281
282
283class FanoutConsumer(ConsumerBase):
284 """Consumer class for 'fanout'."""
285
286 def __init__(self, conf, channel, topic, callback, tag, **kwargs):
287 """Init a 'fanout' queue.
288
289 'channel' is the amqp channel to use
290 'topic' is the topic to listen on
291 'callback' is the callback to call when messages are received
292 'tag' is a unique ID for the consumer on the channel
293
294 Other kombu options may be passed
295 """
296 unique = uuid.uuid4().hex
297 exchange_name = '%s_fanout' % topic
298 queue_name = '%s_fanout_%s' % (topic, unique)
299
300 # Default options
301 options = {'durable': False,
302 'queue_arguments': _get_queue_arguments(conf),
303 'auto_delete': True,
304 'exclusive': False}
305 options.update(kwargs)
306 exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
307 durable=options['durable'],
308 auto_delete=options['auto_delete'])
309 super(FanoutConsumer, self).__init__(channel, callback, tag,
310 name=queue_name,
311 exchange=exchange,
312 routing_key=topic,
313 **options)
314
315
316class Publisher(object):
317 """Base Publisher class."""
318
319 def __init__(self, channel, exchange_name, routing_key, **kwargs):
320 """Init the Publisher class with the exchange_name, routing_key,
321 and other options
322 """
323 self.exchange_name = exchange_name
324 self.routing_key = routing_key
325 self.kwargs = kwargs
326 self.reconnect(channel)
327
328 def reconnect(self, channel):
329 """Re-establish the Producer after a rabbit reconnection."""
330 self.exchange = kombu.entity.Exchange(name=self.exchange_name,
331 **self.kwargs)
332 self.producer = kombu.messaging.Producer(exchange=self.exchange,
333 channel=channel,
334 routing_key=self.routing_key)
335
336 def send(self, msg, timeout=None):
337 """Send a message."""
338 if timeout:
339 #
340 # AMQP TTL is in milliseconds when set in the header.
341 #
342 self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
343 else:
344 self.producer.publish(msg)
345
346
347class DirectPublisher(Publisher):
348 """Publisher class for 'direct'."""
349 def __init__(self, conf, channel, msg_id, **kwargs):
350 """Init a 'direct' publisher.
351
352 Kombu options may be passed as keyword args to override defaults
353 """
354
355 options = {'durable': False,
356 'auto_delete': True,
357 'exclusive': False,
358 'passive': True}
359 options.update(kwargs)
360 super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
361 type='direct', **options)
362
363
364class TopicPublisher(Publisher):
365 """Publisher class for 'topic'."""
366 def __init__(self, conf, channel, topic, **kwargs):
367 """Init a 'topic' publisher.
368
369 Kombu options may be passed as keyword args to override defaults
370 """
371 options = {'durable': conf.amqp_durable_queues,
372 'auto_delete': conf.amqp_auto_delete,
373 'exclusive': False}
374
375 options.update(kwargs)
376 exchange_name = rpc_amqp.get_control_exchange(conf)
377 super(TopicPublisher, self).__init__(channel,
378 exchange_name,
379 topic,
380 type='topic',
381 **options)
382
383
384class FanoutPublisher(Publisher):
385 """Publisher class for 'fanout'."""
386 def __init__(self, conf, channel, topic, **kwargs):
387 """Init a 'fanout' publisher.
388
389 Kombu options may be passed as keyword args to override defaults
390 """
391 options = {'durable': False,
392 'auto_delete': True,
393 'exclusive': False}
394 options.update(kwargs)
395 super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
396 None, type='fanout', **options)
397
398
399class NotifyPublisher(TopicPublisher):
400 """Publisher class for 'notify'."""
401
402 def __init__(self, conf, channel, topic, **kwargs):
403 self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
404 self.queue_arguments = _get_queue_arguments(conf)
405 super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
406
407 def reconnect(self, channel):
408 super(NotifyPublisher, self).reconnect(channel)
409
410 # NOTE(jerdfelt): Normally the consumer would create the queue, but
411 # we do this to ensure that messages don't get dropped if the
412 # consumer is started after we do
413 queue = kombu.entity.Queue(channel=channel,
414 exchange=self.exchange,
415 durable=self.durable,
416 name=self.routing_key,
417 routing_key=self.routing_key,
418 queue_arguments=self.queue_arguments)
419 queue.declare()
420
421
422class Connection(object):
423 """Connection object."""
424
425 pool = None
426
427 def __init__(self, conf, server_params=None):
428 self.consumers = []
429 self.conf = conf
430 self.max_retries = self.conf.rabbit_max_retries
431 # Try forever?
432 if self.max_retries <= 0:
433 self.max_retries = None
434 self.interval_start = self.conf.rabbit_retry_interval
435 self.interval_stepping = self.conf.rabbit_retry_backoff
436 # max retry-interval = 30 seconds
437 self.interval_max = 30
438 self.memory_transport = False
439
440 if server_params is None:
441 server_params = {}
442 # Keys to translate from server_params to kombu params
443 server_params_to_kombu_params = {'username': 'userid'}
444
445 ssl_params = self._fetch_ssl_params()
446 params_list = []
447 for adr in self.conf.rabbit_hosts:
448 hostname, port = network_utils.parse_host_port(
449 adr, default_port=self.conf.rabbit_port)
450
451 params = {
452 'hostname': hostname,
453 'port': port,
454 'userid': self.conf.rabbit_userid,
455 'password': self.conf.rabbit_password,
456 'login_method': self.conf.rabbit_login_method,
457 'virtual_host': self.conf.rabbit_virtual_host,
458 }
459
460 for sp_key, value in six.iteritems(server_params):
461 p_key = server_params_to_kombu_params.get(sp_key, sp_key)
462 params[p_key] = value
463
464 if self.conf.fake_rabbit:
465 params['transport'] = 'memory'
466 if self.conf.rabbit_use_ssl:
467 params['ssl'] = ssl_params
468
469 params_list.append(params)
470
471 self.params_list = itertools.cycle(params_list)
472
473 self.memory_transport = self.conf.fake_rabbit
474
475 self.connection = None
476 self.do_consume = None
477 self.reconnect()
478
479 # FIXME(markmc): use oslo sslutils when it is available as a library
480 _SSL_PROTOCOLS = {
481 "tlsv1": ssl.PROTOCOL_TLSv1,
482 "sslv23": ssl.PROTOCOL_SSLv23,
483 "sslv3": ssl.PROTOCOL_SSLv3
484 }
485
486 try:
487 _SSL_PROTOCOLS["sslv2"] = ssl.PROTOCOL_SSLv2
488 except AttributeError:
489 pass
490
491 @classmethod
492 def validate_ssl_version(cls, version):
493 key = version.lower()
494 try:
495 return cls._SSL_PROTOCOLS[key]
496 except KeyError:
497 raise RuntimeError(_("Invalid SSL version: %s") % version)
498
499 def _fetch_ssl_params(self):
500 """Handles fetching what ssl params should be used for the connection
501 (if any).
502 """
503 ssl_params = dict()
504
505 # http://docs.python.org/library/ssl.html - ssl.wrap_socket
506 if self.conf.kombu_ssl_version:
507 ssl_params['ssl_version'] = self.validate_ssl_version(
508 self.conf.kombu_ssl_version)
509 if self.conf.kombu_ssl_keyfile:
510 ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
511 if self.conf.kombu_ssl_certfile:
512 ssl_params['certfile'] = self.conf.kombu_ssl_certfile
513 if self.conf.kombu_ssl_ca_certs:
514 ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
515 # We might want to allow variations in the
516 # future with this?
517 ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
518
519 # Return the extended behavior or just have the default behavior
520 return ssl_params or True
521
522 def _connect(self, params):
523 """Connect to rabbit. Re-establish any queues that may have
524 been declared before if we are reconnecting. Exceptions should
525 be handled by the caller.
526 """
527 if self.connection:
528 LOG.info(_("Reconnecting to AMQP server on "
529 "%(hostname)s:%(port)d") % params)
530 try:
531 # XXX(nic): when reconnecting to a RabbitMQ cluster
532 # with mirrored queues in use, the attempt to release the
533 # connection can hang "indefinitely" somewhere deep down
534 # in Kombu. Blocking the thread for a bit prior to
535 # release seems to kludge around the problem where it is
536 # otherwise reproducible.
537 if self.conf.kombu_reconnect_delay > 0:
538 LOG.info(_("Delaying reconnect for %1.1f seconds...") %
539 self.conf.kombu_reconnect_delay)
540 time.sleep(self.conf.kombu_reconnect_delay)
541
542 self.connection.release()
543 except self.connection_errors:
544 pass
545 # Setting this in case the next statement fails, though
546 # it shouldn't be doing any network operations, yet.
547 self.connection = None
548 self.connection = kombu.connection.BrokerConnection(**params)
549 self.connection_errors = self.connection.connection_errors
550 self.channel_errors = self.connection.channel_errors
551 if self.memory_transport:
552 # Kludge to speed up tests.
553 self.connection.transport.polling_interval = 0.0
554 self.do_consume = True
555 self.consumer_num = itertools.count(1)
556 self.connection.connect()
The diff has been truncated for viewing.
