Merge lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp
- pytest-full
- Merge into trunk
Proposed by
Colin Watson
Status: | Merged |
---|---|
Merged at revision: | 26 |
Proposed branch: | lp:~cjwatson/python-oops-amqp/pytest-full |
Merge into: | lp:python-oops-amqp |
Diff against target: |
903 lines (+405/-416) 8 files modified
NEWS (+1/-0) README (+5/-9) oops_amqp/tests/__init__.py (+0/-135) setup.cfg (+1/-2) tests/conftest.py (+104/-0) tests/test_publisher.py (+122/-115) tests/test_receiver.py (+172/-154) tox.ini (+0/-1) |
To merge this branch: | bzr merge lp:~cjwatson/python-oops-amqp/pytest-full |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Guruprasad | Approve | ||
Andrey Fedoseev (community) | Approve | ||
Review via email: mp+430882@code.launchpad.net |
Commit message
Port test suite to pytest.
Description of the change
This turns out to be about three times faster on my system, probably mainly due to using a session-scoped server fixture.
To post a comment you must log in.
Revision history for this message
Guruprasad (lgp171188) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'NEWS' |
2 | --- NEWS 2022-09-30 22:53:58 +0000 |
3 | +++ NEWS 2022-10-03 10:15:17 +0000 |
4 | @@ -8,6 +8,7 @@ |
5 | |
6 | - Switch from buildout to tox. (Colin Watson) |
7 | - Switch to declarative setup.cfg. (Colin Watson) |
8 | +- Port test suite to pytest. (Colin Watson) |
9 | |
10 | 0.1.0 |
11 | ----- |
12 | |
13 | === modified file 'README' |
14 | --- README 2022-09-30 22:41:01 +0000 |
15 | +++ README 2022-10-03 10:15:17 +0000 |
16 | @@ -17,7 +17,7 @@ |
17 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | GNU Lesser General Public License version 3 (see the file LICENSE). |
19 | |
20 | -The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops) |
21 | +The oops_amqp package provides an AMQP OOPS (https://pypi.org/project/oops) |
22 | publisher, and a small daemon that listens on amqp for OOPS reports and |
23 | republishes them (into a supplied publisher). The OOPS framework permits |
24 | falling back to additional publishers if AMQP is down. |
25 | @@ -29,20 +29,16 @@ |
26 | |
27 | * bson |
28 | |
29 | -* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer. |
30 | +* oops (https://pypi.org/project/oops) 0.0.11 or newer. |
31 | |
32 | * amqp |
33 | |
34 | Testing Dependencies |
35 | ==================== |
36 | |
37 | -* oops-datedir-repo (http://pypi.python.org/pypi/oops_datedir_repo) |
38 | - |
39 | -* rabbitfixture (http://pypi.python.org/pypi/rabbitfixture) |
40 | - |
41 | -* testresources (http://pypi.python.org/pypi/testresources) |
42 | - |
43 | -* testtools (http://pypi.python.org/pypi/testtools) |
44 | +* pytest (https://pypi.org/project/pytest) |
45 | + |
46 | +* rabbitfixture (https://pypi.org/project/rabbitfixture) |
47 | |
48 | Usage |
49 | ===== |
50 | |
51 | === removed file 'oops_amqp/tests/__init__.py' |
52 | --- oops_amqp/tests/__init__.py 2018-03-12 11:48:55 +0000 |
53 | +++ oops_amqp/tests/__init__.py 1970-01-01 00:00:00 +0000 |
54 | @@ -1,135 +0,0 @@ |
55 | -# Copyright (c) 2011, Canonical Ltd |
56 | -# |
57 | -# This program is free software: you can redistribute it and/or modify |
58 | -# it under the terms of the GNU Lesser General Public License as published by |
59 | -# the Free Software Foundation, version 3 only. |
60 | -# |
61 | -# This program is distributed in the hope that it will be useful, |
62 | -# but WITHOUT ANY WARRANTY; without even the implied warranty of |
63 | -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
64 | -# GNU Lesser General Public License for more details. |
65 | -# |
66 | -# You should have received a copy of the GNU Lesser General Public License |
67 | -# along with this program. If not, see <http://www.gnu.org/licenses/>. |
68 | -# GNU Lesser General Public License version 3 (see the file LICENSE). |
69 | - |
70 | -"""Tests for oops_amqp.""" |
71 | - |
72 | -from __future__ import absolute_import, print_function |
73 | - |
74 | -from unittest import TestLoader |
75 | - |
76 | -import amqp |
77 | -from fixtures import Fixture |
78 | -from rabbitfixture.server import RabbitServer |
79 | -import testtools |
80 | -from testresources import ( |
81 | - _get_result, |
82 | - FixtureResource, |
83 | - OptimisingTestSuite, |
84 | - setUpResources, |
85 | - tearDownResources, |
86 | - ) |
87 | - |
88 | -from oops_amqp.utils import close_ignoring_connection_errors |
89 | - |
90 | -__all__ = [ |
91 | - 'ChannelFixture', |
92 | - 'QueueFixture', |
93 | - 'test_suite', |
94 | - 'TestCase', |
95 | - ] |
96 | - |
97 | - |
98 | -class QueueFixture(Fixture): |
99 | - """Create an exchange with a subscribed queue on an AMQP instance. |
100 | - |
101 | - The exchange and queue are made durable (to permit testing with amqp server |
102 | - restarts) and non-auto-delete (to permit dropping the test connection and |
103 | - restablishing it without losing the config (e.g. server restarts). |
104 | - |
105 | - If the server is restarted, the channel may need to be updated before |
106 | - teardown, or the teardown will not be able to delete the exchange and queue. |
107 | - |
108 | - Possibly wants to live in rabbitfixture, or a amqpfixture. |
109 | - """ |
110 | - |
111 | - def __init__(self, channel, unique_string_factory): |
112 | - """Create a QueueFixture. |
113 | - |
114 | - :param channel: An amqplib channel to request the exchange and queue |
115 | - over. |
116 | - :param unique_string_factory: A helper that will return a (process |
117 | - lifetime scope) unique string. |
118 | - """ |
119 | - self.channel = channel |
120 | - self.unique_string_factory = unique_string_factory |
121 | - |
122 | - def setUp(self): |
123 | - super(QueueFixture, self).setUp() |
124 | - self.exchange_name = self.unique_string_factory() |
125 | - self.channel.exchange_declare( |
126 | - exchange=self.exchange_name, type="fanout", durable=True, |
127 | - auto_delete=False) |
128 | - self.addCleanup(self.delete_exchange) |
129 | - self.queue_name, _, _ = self.channel.queue_declare( |
130 | - durable=True, auto_delete=False) |
131 | - self.addCleanup(self.delete_queue) |
132 | - self.channel.queue_bind(self.queue_name, self.exchange_name) |
133 | - |
134 | - def delete_queue(self): |
135 | - self.channel.queue_delete(self.queue_name) |
136 | - |
137 | - def delete_exchange(self): |
138 | - self.channel.exchange_delete(self.exchange_name) |
139 | - |
140 | - |
141 | -class ChannelFixture(Fixture): |
142 | - """Create an AMQP connection and channel for tests. |
143 | - |
144 | - :ivar connection: an amqplib connection. |
145 | - :ivar channel: an amqplib channel |
146 | - """ |
147 | - |
148 | - def __init__(self, connection_factory): |
149 | - super(ChannelFixture, self).__init__() |
150 | - self.connection_factory = connection_factory |
151 | - |
152 | - def setUp(self): |
153 | - super(ChannelFixture, self).setUp() |
154 | - self.connection = self.connection_factory() |
155 | - self.connection.connect() |
156 | - self.addCleanup(close_ignoring_connection_errors, self.connection) |
157 | - self.channel = self.connection.channel() |
158 | - self.addCleanup(close_ignoring_connection_errors, self.channel) |
159 | - |
160 | - |
161 | -class TestCase(testtools.TestCase): |
162 | - """Subclass to start a RabbitMQ server.""" |
163 | - |
164 | - resources = [('rabbit', FixtureResource(RabbitServer()))] |
165 | - |
166 | - def setUp(self): |
167 | - super(TestCase, self).setUp() |
168 | - # ResourcedTestCase handles teardown in the wrong order for us (we |
169 | - # need to ensure that the RabbitServer fixture is only cleaned up |
170 | - # after any other fixtures registered by individual tests), so we |
171 | - # imitate it manually. |
172 | - result = _get_result() |
173 | - setUpResources(self, self.resources, result) |
174 | - self.addCleanup(tearDownResources, self, self.resources, result) |
175 | - |
176 | - def connection_factory(self): |
177 | - """When called, return an amqplib connection.""" |
178 | - return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname, |
179 | - self.rabbit.config.port), userid="guest", password="guest", |
180 | - virtual_host="/") |
181 | - |
182 | - |
183 | -def test_suite(): |
184 | - test_mod_names = [ |
185 | - 'publisher', |
186 | - 'receiver', |
187 | - ] |
188 | - return OptimisingTestSuite(TestLoader().loadTestsFromNames( |
189 | - ['oops_amqp.tests.test_' + name for name in test_mod_names])) |
190 | |
191 | === modified file 'setup.cfg' |
192 | --- setup.cfg 2022-09-30 22:53:58 +0000 |
193 | +++ setup.cfg 2022-10-03 10:15:17 +0000 |
194 | @@ -36,7 +36,6 @@ |
195 | |
196 | [options.extras_require] |
197 | test = |
198 | + pytest |
199 | rabbitfixture |
200 | six |
201 | - testresources |
202 | - testtools |
203 | |
204 | === renamed directory 'oops_amqp/tests' => 'tests' |
205 | === added file 'tests/conftest.py' |
206 | --- tests/conftest.py 1970-01-01 00:00:00 +0000 |
207 | +++ tests/conftest.py 2022-10-03 10:15:17 +0000 |
208 | @@ -0,0 +1,104 @@ |
209 | +# Copyright (C) 2011-2022, Canonical Ltd. |
210 | +# |
211 | +# This program is free software: you can redistribute it and/or modify |
212 | +# it under the terms of the GNU Lesser General Public License as published by |
213 | +# the Free Software Foundation, version 3 only. |
214 | +# |
215 | +# This program is distributed in the hope that it will be useful, |
216 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
217 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
218 | +# GNU Lesser General Public License for more details. |
219 | +# |
220 | +# You should have received a copy of the GNU Lesser General Public License |
221 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
222 | +# GNU Lesser General Public License version 3 (see the file LICENSE). |
223 | + |
224 | +import itertools |
225 | +from functools import partial |
226 | + |
227 | +import amqp |
228 | +import pytest |
229 | +from rabbitfixture.server import RabbitServer |
230 | + |
231 | +from oops_amqp.utils import close_ignoring_connection_errors |
232 | + |
233 | + |
234 | +_unique_id_gen = itertools.count(1) |
235 | + |
236 | + |
237 | +@pytest.fixture |
238 | +def get_unique_integer(): |
239 | + def get(): |
240 | + return next(_unique_id_gen) |
241 | + |
242 | + return get |
243 | + |
244 | + |
245 | +@pytest.fixture |
246 | +def get_unique_string(get_unique_integer): |
247 | + def get(prefix): |
248 | + return "%s-%d" % (prefix, get_unique_integer()) |
249 | + |
250 | + return get |
251 | + |
252 | + |
253 | +@pytest.fixture(scope="session") |
254 | +def rabbit(): |
255 | + rabbit = RabbitServer() |
256 | + rabbit.setUp() |
257 | + try: |
258 | + yield rabbit |
259 | + finally: |
260 | + rabbit.cleanUp() |
261 | + |
262 | + |
263 | +@pytest.fixture |
264 | +def connection_factory(rabbit): |
265 | + return partial( |
266 | + amqp.Connection, |
267 | + host="%s:%s" % (rabbit.config.hostname, rabbit.config.port), |
268 | + userid="guest", |
269 | + password="guest", |
270 | + virtual_host="/", |
271 | + ) |
272 | + |
273 | + |
274 | +@pytest.fixture |
275 | +def connection(connection_factory): |
276 | + connection = connection_factory() |
277 | + connection.connect() |
278 | + try: |
279 | + yield connection |
280 | + finally: |
281 | + close_ignoring_connection_errors(connection) |
282 | + |
283 | + |
284 | +@pytest.fixture |
285 | +def channel(connection): |
286 | + channel = connection.channel() |
287 | + try: |
288 | + yield channel |
289 | + finally: |
290 | + close_ignoring_connection_errors(channel) |
291 | + |
292 | + |
293 | +@pytest.fixture |
294 | +def exchange_name(channel, get_unique_string): |
295 | + exchange_name = get_unique_string("exchange") |
296 | + channel.exchange_declare( |
297 | + exchange=exchange_name, type="fanout", durable=True, auto_delete=False |
298 | + ) |
299 | + try: |
300 | + yield exchange_name |
301 | + finally: |
302 | + channel.exchange_delete(exchange_name) |
303 | + |
304 | + |
305 | +@pytest.fixture |
306 | +def queue_name(channel, exchange_name): |
307 | + queue_name, _, _ = channel.queue_declare(durable=True, auto_delete=False) |
308 | + try: |
309 | + channel.queue_bind(queue_name, exchange_name) |
310 | + yield queue_name |
311 | + finally: |
312 | + channel.queue_delete(queue_name) |
313 | |
314 | === modified file 'tests/test_publisher.py' |
315 | --- oops_amqp/tests/test_publisher.py 2018-03-12 11:48:55 +0000 |
316 | +++ tests/test_publisher.py 2022-10-03 10:15:17 +0000 |
317 | @@ -22,118 +22,125 @@ |
318 | from oops_amqp import ( |
319 | anybson as bson, |
320 | Publisher, |
321 | - ) |
322 | -from oops_amqp.tests import ( |
323 | - ChannelFixture, |
324 | - QueueFixture, |
325 | - TestCase, |
326 | - ) |
327 | - |
328 | - |
329 | -class TestPublisher(TestCase): |
330 | - |
331 | - def test_publish_inherit_id(self): |
332 | - # OOPS id's can be set outside of Publisher(). |
333 | - channel = self.useFixture( |
334 | - ChannelFixture(self.connection_factory)).channel |
335 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
336 | - publisher = Publisher(self.connection_factory, queue.exchange_name, "", |
337 | - inherit_id=True) |
338 | - reference_oops = {'id': 'kept', 'akey': 'avalue'} |
339 | - oops = dict(reference_oops) |
340 | - expected_id = 'kept' |
341 | - oops_ids = publisher(oops) |
342 | - # Publication returns the oops ID allocated. |
343 | - self.assertEqual([expected_id], oops_ids) |
344 | - # The oops should not be altered by publication. |
345 | - self.assertEqual(reference_oops, oops) |
346 | - # The received OOPS should have the ID embedded and be a bson dict. |
347 | - def check_oops(msg): |
348 | - body = msg.body |
349 | - if not isinstance(body, bytes): |
350 | - body = body.encode(msg.content_encoding or 'UTF-8') |
351 | - self.assertEqual(reference_oops, bson.loads(body)) |
352 | - channel.basic_ack(msg.delivery_tag) |
353 | - channel.basic_cancel(queue.queue_name) |
354 | - channel.basic_consume( |
355 | - queue.queue_name, callback=check_oops, |
356 | - consumer_tag=queue.queue_name) |
357 | - channel.connection.drain_events() |
358 | - |
359 | - def test_publish(self): |
360 | - # Publishing an oops sends it to the exchange, making a connection as |
361 | - # it goes. |
362 | - channel = self.useFixture( |
363 | - ChannelFixture(self.connection_factory)).channel |
364 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
365 | - publisher = Publisher(self.connection_factory, queue.exchange_name, "") |
366 | - reference_oops = {'akey': 'avalue'} |
367 | - oops = dict(reference_oops) |
368 | - id_bson = md5(bson.dumps(oops)).hexdigest() |
369 | - expected_id = "OOPS-%s" % id_bson |
370 | - oops_ids = publisher(oops) |
371 | - # Publication returns the oops ID allocated. |
372 | - self.assertEqual([expected_id], oops_ids) |
373 | - # The oops should not be altered by publication. |
374 | - self.assertEqual(reference_oops, oops) |
375 | - # The received OOPS should have the ID embedded and be a bson dict. |
376 | - expected_oops = dict(reference_oops) |
377 | - expected_oops['id'] = oops_ids[0] |
378 | - def check_oops(msg): |
379 | - body = msg.body |
380 | - if not isinstance(body, bytes): |
381 | - body = body.encode(msg.content_encoding or 'UTF-8') |
382 | - self.assertEqual(expected_oops, bson.loads(body)) |
383 | - channel.basic_ack(msg.delivery_tag) |
384 | - channel.basic_cancel(queue.queue_name) |
385 | - channel.basic_consume( |
386 | - queue.queue_name, callback=check_oops, |
387 | - consumer_tag=queue.queue_name) |
388 | - channel.connection.drain_events() |
389 | - |
390 | - def test_publish_amqp_already_down(self): |
391 | - # If amqp is down when a connection is attempted, None is returned to |
392 | - # indicate that publication failed - and publishing after it comes back |
393 | - # works. |
394 | - channel = self.useFixture( |
395 | - ChannelFixture(self.connection_factory)).channel |
396 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
397 | - # The private method use and the restart of rabbit before it gets torn |
398 | - # down are bugs in rabbitfixture that will be fixed in a future |
399 | - # release. |
400 | - self.rabbit.runner._stop() |
401 | - try: |
402 | - publisher = Publisher( |
403 | - self.connection_factory, queue.exchange_name, "") |
404 | - oops = {'akey': 42} |
405 | - self.assertEqual([], publisher(oops)) |
406 | - finally: |
407 | - self.rabbit.runner._start() |
408 | - connection = self.connection_factory() |
409 | - connection.connect() |
410 | - queue.channel = connection.channel() |
411 | - self.assertNotEqual([], publisher(oops)) |
412 | - |
413 | - def test_publish_amqp_down_after_use(self): |
414 | - # If amqp goes down after its been successfully used, None is returned |
415 | - # to indicate that publication failed - and publishing after it comes |
416 | - # back works. |
417 | - channel = self.useFixture( |
418 | - ChannelFixture(self.connection_factory)).channel |
419 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
420 | - publisher = Publisher(self.connection_factory, queue.exchange_name, "") |
421 | - oops = {'akey': 42} |
422 | - self.assertNotEqual(None, publisher(oops)) |
423 | - # The private method use and the restart of rabbit before it gets torn |
424 | - # down are bugs in rabbitfixture that will be fixed in a future |
425 | - # release. |
426 | - self.rabbit.runner._stop() |
427 | - try: |
428 | - self.assertEqual([], publisher(oops)) |
429 | - finally: |
430 | - self.rabbit.runner._start() |
431 | - connection = self.connection_factory() |
432 | - connection.connect() |
433 | - queue.channel = connection.channel() |
434 | - self.assertNotEqual([], publisher(oops)) |
435 | - |
436 | +) |
437 | + |
438 | + |
439 | +def test_publish_inherit_id( |
440 | + connection_factory, channel, exchange_name, queue_name |
441 | +): |
442 | + # OOPS IDs can be set outside of Publisher(). |
443 | + publisher = Publisher( |
444 | + connection_factory, exchange_name, "", inherit_id=True |
445 | + ) |
446 | + reference_oops = {"id": "kept", "akey": "avalue"} |
447 | + oops = dict(reference_oops) |
448 | + expected_id = "kept" |
449 | + oops_ids = publisher(oops) |
450 | + # Publication returns the oops ID allocated. |
451 | + assert oops_ids == [expected_id] |
452 | + # The oops should not be altered by publication. |
453 | + assert oops == reference_oops |
454 | + |
455 | + # The received OOPS should have the ID embedded and be a bson dict. |
456 | + def check_oops(msg): |
457 | + body = msg.body |
458 | + if not isinstance(body, bytes): |
459 | + body = body.encode(msg.content_encoding or "UTF-8") |
460 | + assert bson.loads(body) == reference_oops |
461 | + channel.basic_ack(msg.delivery_tag) |
462 | + channel.basic_cancel(queue_name) |
463 | + |
464 | + channel.basic_consume( |
465 | + queue_name, callback=check_oops, consumer_tag=queue_name |
466 | + ) |
467 | + channel.connection.drain_events() |
468 | + |
469 | + |
470 | +def test_publish(connection_factory, channel, exchange_name, queue_name): |
471 | + # Publishing an oops sends it to the exchange, making a connection as |
472 | + # it goes. |
473 | + publisher = Publisher(connection_factory, exchange_name, "") |
474 | + reference_oops = {"akey": "avalue"} |
475 | + oops = dict(reference_oops) |
476 | + id_bson = md5(bson.dumps(oops)).hexdigest() |
477 | + expected_id = "OOPS-%s" % id_bson |
478 | + oops_ids = publisher(oops) |
479 | + # Publication returns the oops ID allocated. |
480 | + assert oops_ids == [expected_id] |
481 | + # The oops should not be altered by publication. |
482 | + assert oops == reference_oops |
483 | + # The received OOPS should have the ID embedded and be a bson dict. |
484 | + expected_oops = dict(reference_oops) |
485 | + expected_oops["id"] = oops_ids[0] |
486 | + |
487 | + def check_oops(msg): |
488 | + body = msg.body |
489 | + if not isinstance(body, bytes): |
490 | + body = body.encode(msg.content_encoding or "UTF-8") |
491 | + assert bson.loads(body) == expected_oops |
492 | + channel.basic_ack(msg.delivery_tag) |
493 | + channel.basic_cancel(queue_name) |
494 | + |
495 | + channel.basic_consume( |
496 | + queue_name, callback=check_oops, consumer_tag=queue_name |
497 | + ) |
498 | + channel.connection.drain_events() |
499 | + |
500 | + |
501 | +def test_publish_amqp_already_down( |
502 | + rabbit, connection_factory, channel, get_unique_string |
503 | +): |
504 | + # If amqp is down when a connection is attempted, None is returned to |
505 | + # indicate that publication failed - and publishing after it comes back |
506 | + # works. |
507 | + # The private method use and the restart of rabbit before it gets torn |
508 | + # down are bugs in rabbitfixture that will be fixed in a future |
509 | + # release. |
510 | + exchange_name = get_unique_string("exchange") |
511 | + channel.exchange_declare( |
512 | + exchange=exchange_name, type="fanout", durable=True, auto_delete=False |
513 | + ) |
514 | + try: |
515 | + rabbit.runner._stop() |
516 | + try: |
517 | + publisher = Publisher(connection_factory, exchange_name, "") |
518 | + oops = {"akey": 42} |
519 | + assert publisher(oops) == [] |
520 | + finally: |
521 | + rabbit.runner._start() |
522 | + connection = connection_factory() |
523 | + connection.connect() |
524 | + channel = connection.channel() |
525 | + assert publisher(oops) != [] |
526 | + finally: |
527 | + channel.exchange_delete(exchange_name) |
528 | + |
529 | + |
530 | +def test_publish_amqp_down_after_use( |
531 | + rabbit, connection_factory, channel, get_unique_string |
532 | +): |
533 | + # If amqp goes down after its been successfully used, None is returned |
534 | + # to indicate that publication failed - and publishing after it comes |
535 | + # back works. |
536 | + exchange_name = get_unique_string("exchange") |
537 | + channel.exchange_declare( |
538 | + exchange=exchange_name, type="fanout", durable=True, auto_delete=False |
539 | + ) |
540 | + try: |
541 | + publisher = Publisher(connection_factory, exchange_name, "") |
542 | + oops = {"akey": 42} |
543 | + assert publisher(oops) is not None |
544 | + # The private method use and the restart of rabbit before it gets |
545 | + # torn down are bugs in rabbitfixture that will be fixed in a future |
546 | + # release. |
547 | + rabbit.runner._stop() |
548 | + try: |
549 | + assert publisher(oops) == [] |
550 | + finally: |
551 | + rabbit.runner._start() |
552 | + connection = connection_factory() |
553 | + connection.connect() |
554 | + channel = connection.channel() |
555 | + assert publisher(oops) != [] |
556 | + finally: |
557 | + channel.exchange_delete(exchange_name) |
558 | |
559 | === modified file 'tests/test_receiver.py' |
560 | --- oops_amqp/tests/test_receiver.py 2018-03-12 11:48:55 +0000 |
561 | +++ tests/test_receiver.py 2022-10-03 10:15:17 +0000 |
562 | @@ -27,158 +27,176 @@ |
563 | from oops_amqp import ( |
564 | anybson as bson, |
565 | Receiver, |
566 | - ) |
567 | -from oops_amqp.tests import ( |
568 | - ChannelFixture, |
569 | - QueueFixture, |
570 | - TestCase, |
571 | - ) |
572 | - |
573 | - |
574 | -class TestReceiver(TestCase): |
575 | - |
576 | - def test_stop_on_sentinel(self): |
577 | - # A sentinel can be used to stop the receiver (useful for testing). |
578 | - reports = [] |
579 | - def capture(report): |
580 | - reports.append(report) |
581 | - return [report['id']] |
582 | - expected_report = {'id': 'foo', 'otherkey': 42} |
583 | - message = amqp.Message(bson.dumps(expected_report)) |
584 | - channel = self.useFixture( |
585 | - ChannelFixture(self.connection_factory)).channel |
586 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
587 | - channel.basic_publish( |
588 | - message, queue.exchange_name, routing_key="") |
589 | - sentinel = b"xxx" |
590 | - channel.basic_publish( |
591 | - amqp.Message(sentinel), queue.exchange_name, routing_key="") |
592 | - config = Config() |
593 | - config.publisher = capture |
594 | - receiver = Receiver(config, self.connection_factory, queue.queue_name) |
595 | - receiver.sentinel = sentinel |
596 | - receiver.run_forever() |
597 | - self.assertEqual([expected_report], reports) |
598 | - |
599 | - def test_stop_via_stopping(self): |
600 | - # Setting the stopping field should stop the run_forever loop. |
601 | - reports = [] |
602 | - def capture(report): |
603 | - reports.append(report) |
604 | - return [report['id']] |
605 | - expected_report = {'id': 'foo', 'otherkey': 42} |
606 | - message = amqp.Message(bson.dumps(expected_report)) |
607 | - channel = self.useFixture( |
608 | - ChannelFixture(self.connection_factory)).channel |
609 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
610 | - channel.basic_publish( |
611 | - message, queue.exchange_name, routing_key="") |
612 | - config = Config() |
613 | - config.publisher = capture |
614 | - # We don't want to loop forever: patch the channel so that after one |
615 | - # call to wait (which will get our injected message) the loop will shut |
616 | - # down. |
617 | - def patching_factory(): |
618 | - connection = self.connection_factory() |
619 | - old_channel = connection.channel |
620 | - def new_channel(): |
621 | - result = old_channel() |
622 | - old_wait = result.wait |
623 | - def new_wait(*args, **kwargs): |
624 | - receiver.stopping = True |
625 | - return old_wait(*args, **kwargs) |
626 | - result.wait = new_wait |
627 | - return result |
628 | - connection.channel = new_channel |
629 | - return connection |
630 | - receiver = Receiver(config, patching_factory, queue.queue_name) |
631 | - receiver.run_forever() |
632 | - self.assertEqual([expected_report], reports) |
633 | - |
634 | - def test_run_forever(self): |
635 | - # run_forever subscribes and then calls drain_events in a loop. |
636 | - calls = [] |
637 | - class FakeChannel: |
638 | - def __init__(self, calls): |
639 | - self.calls = calls |
640 | - self.is_open = True |
641 | - def basic_consume(self, queue_name, callback=None): |
642 | - self.calls.append(('basic_consume', queue_name, callback)) |
643 | - return 'tag' |
644 | - def basic_cancel(self, tag): |
645 | - self.calls.append(('basic_cancel', tag)) |
646 | - def close(self): |
647 | - self.is_open = False |
648 | - class FakeConnection: |
649 | - def __init__(self, calls): |
650 | - self.calls = calls |
651 | - def connect(self): |
652 | - pass |
653 | - def channel(self): |
654 | - return FakeChannel(calls) |
655 | - def drain_events(self, timeout=None): |
656 | - self.calls.append(('drain_events', timeout)) |
657 | - if len(self.calls) > 2: |
658 | - receiver.stopping = True |
659 | - def close(self): |
660 | - pass |
661 | - receiver = Receiver(None, lambda: FakeConnection(calls), 'foo') |
662 | - receiver.run_forever() |
663 | - self.assertEqual( |
664 | - [('basic_consume', 'foo', receiver.handle_report), |
665 | - ('drain_events', 1), |
666 | - ('drain_events', 1), |
667 | - ('basic_cancel', 'tag')], |
668 | - calls) |
669 | - |
670 | - def test_tolerates_amqp_trouble(self): |
671 | - # If the AMQP server is unavailable for a short period, the receiver |
672 | - # will automatically reconnect. |
673 | - # Break a connection to raise socket.error (which we know from the |
674 | - # publisher tests is what leaks through when rabbit is shutdown). |
675 | - # We raise it the first time on each amqp method call. |
676 | - reports = [] |
677 | - def capture(report): |
678 | - reports.append(report) |
679 | - return [report['id']] |
680 | - expected_report = {'id': 'foo', 'otherkey': 42} |
681 | - message = amqp.Message(bson.dumps(expected_report)) |
682 | - channel = self.useFixture( |
683 | - ChannelFixture(self.connection_factory)).channel |
684 | - queue = self.useFixture(QueueFixture(channel, self.getUniqueString)) |
685 | - channel.basic_publish(message, queue.exchange_name, routing_key="") |
686 | - config = Config() |
687 | - config.publisher = capture |
688 | - state = {} |
689 | - def error_once(func): |
690 | - def wrapped(*args, **kwargs): |
691 | - func_ref = six.get_function_code(func) |
692 | - if func_ref in state: |
693 | - return func(*args, **kwargs) |
694 | - else: |
695 | - state[func_ref] = True |
696 | - # Use EPIPE because the close() code checks that (though |
697 | - # the rest doesn't) |
698 | - raise socket.error(errno.EPIPE, "booyah") |
699 | - return wrapped |
700 | +) |
701 | + |
702 | + |
703 | +def test_stop_on_sentinel( |
704 | + connection_factory, channel, exchange_name, queue_name |
705 | +): |
706 | + # A sentinel can be used to stop the receiver (useful for testing). |
707 | + reports = [] |
708 | + |
709 | + def capture(report): |
710 | + reports.append(report) |
711 | + return [report["id"]] |
712 | + |
713 | + expected_report = {"id": "foo", "otherkey": 42} |
714 | + message = amqp.Message(bson.dumps(expected_report)) |
715 | + channel.basic_publish(message, exchange_name, routing_key="") |
716 | + sentinel = b"xxx" |
717 | + channel.basic_publish( |
718 | + amqp.Message(sentinel), exchange_name, routing_key="" |
719 | + ) |
720 | + config = Config() |
721 | + config.publisher = capture |
722 | + receiver = Receiver(config, connection_factory, queue_name) |
723 | + receiver.sentinel = sentinel |
724 | + receiver.run_forever() |
725 | + assert reports == [expected_report] |
726 | + |
727 | + |
728 | +def test_stop_via_stopping( |
729 | + connection_factory, channel, exchange_name, queue_name |
730 | +): |
731 | + # Setting the stopping field should stop the run_forever loop. |
732 | + reports = [] |
733 | + |
734 | + def capture(report): |
735 | + reports.append(report) |
736 | + return [report["id"]] |
737 | + |
738 | + expected_report = {"id": "foo", "otherkey": 42} |
739 | + message = amqp.Message(bson.dumps(expected_report)) |
740 | + channel.basic_publish(message, exchange_name, routing_key="") |
741 | + config = Config() |
742 | + config.publisher = capture |
743 | + # We don't want to loop forever: patch the channel so that after one |
744 | + # call to wait (which will get our injected message) the loop will shut |
745 | + # down. |
746 | + def patching_factory(): |
747 | + connection = connection_factory() |
748 | + old_channel = connection.channel |
749 | + |
750 | + def new_channel(): |
751 | + result = old_channel() |
752 | + old_wait = result.wait |
753 | + |
754 | + def new_wait(*args, **kwargs): |
755 | + receiver.stopping = True |
756 | + return old_wait(*args, **kwargs) |
757 | + |
758 | + result.wait = new_wait |
759 | + return result |
760 | + |
761 | + connection.channel = new_channel |
762 | + return connection |
763 | + |
764 | + receiver = Receiver(config, patching_factory, queue_name) |
765 | + receiver.run_forever() |
766 | + assert reports == [expected_report] |
767 | + |
768 | + |
769 | +def test_run_forever(): |
770 | + # run_forever subscribes and then calls drain_events in a loop. |
771 | + calls = [] |
772 | + |
773 | + class FakeChannel: |
774 | + def __init__(self, calls): |
775 | + self.calls = calls |
776 | + self.is_open = True |
777 | + |
778 | + def basic_consume(self, queue_name, callback=None): |
779 | + self.calls.append(("basic_consume", queue_name, callback)) |
780 | + return "tag" |
781 | + |
782 | + def basic_cancel(self, tag): |
783 | + self.calls.append(("basic_cancel", tag)) |
784 | + |
785 | + def close(self): |
786 | + self.is_open = False |
787 | + |
788 | + class FakeConnection: |
789 | + def __init__(self, calls): |
790 | + self.calls = calls |
791 | + |
792 | + def connect(self): |
793 | + pass |
794 | + |
795 | + def channel(self): |
796 | + return FakeChannel(calls) |
797 | + |
798 | + def drain_events(self, timeout=None): |
799 | + self.calls.append(("drain_events", timeout)) |
800 | + if len(self.calls) > 2: |
801 | + receiver.stopping = True |
802 | + |
803 | + def close(self): |
804 | + pass |
805 | + |
806 | + receiver = Receiver(None, lambda: FakeConnection(calls), "foo") |
807 | + receiver.run_forever() |
808 | + assert calls == [ |
809 | + ("basic_consume", "foo", receiver.handle_report), |
810 | + ("drain_events", 1), |
811 | + ("drain_events", 1), |
812 | + ("basic_cancel", "tag"), |
813 | + ] |
814 | + |
815 | + |
816 | +def test_tolerates_amqp_trouble( |
817 | + connection_factory, channel, exchange_name, queue_name |
818 | +): |
819 | + # If the AMQP server is unavailable for a short period, the receiver |
820 | + # will automatically reconnect. |
821 | + # Break a connection to raise socket.error (which we know from the |
822 | + # publisher tests is what leaks through when rabbit is shutdown). |
823 | + # We raise it the first time on each amqp method call. |
824 | + reports = [] |
825 | + |
826 | + def capture(report): |
827 | + reports.append(report) |
828 | + return [report["id"]] |
829 | + |
830 | + expected_report = {"id": "foo", "otherkey": 42} |
831 | + message = amqp.Message(bson.dumps(expected_report)) |
832 | + channel.basic_publish(message, exchange_name, routing_key="") |
833 | + config = Config() |
834 | + config.publisher = capture |
835 | + state = {} |
836 | + |
837 | + def error_once(func): |
838 | + def wrapped(*args, **kwargs): |
839 | + func_ref = six.get_function_code(func) |
840 | + if func_ref in state: |
841 | + return func(*args, **kwargs) |
842 | + else: |
843 | + state[func_ref] = True |
844 | + # Use EPIPE because the close() code checks that (though |
845 | + # the rest doesn't) |
846 | + raise socket.error(errno.EPIPE, "booyah") |
847 | + |
848 | + return wrapped |
849 | + |
850 | + @error_once |
851 | + def patching_factory(): |
852 | + connection = connection_factory() |
853 | + old_channel = connection.channel |
854 | + |
855 | @error_once |
856 | - def patching_factory(): |
857 | - connection = self.connection_factory() |
858 | - old_channel = connection.channel |
859 | - @error_once |
860 | - def new_channel(): |
861 | - result = old_channel() |
862 | - result.basic_consume = error_once(result.basic_consume) |
863 | - result.basic_cancel = error_once(result.basic_cancel) |
864 | - result.close = error_once(result.close) |
865 | - return result |
866 | - connection.channel = new_channel |
867 | - connection.drain_events = error_once(connection.drain_events) |
868 | - connection.close = error_once(connection.close) |
869 | - return connection |
870 | - receiver = Receiver(config, patching_factory, queue.queue_name) |
871 | - receiver.sentinel = b"arhh" |
872 | - channel.basic_publish( |
873 | - amqp.Message(b"arhh"), queue.exchange_name, routing_key="") |
874 | - receiver.run_forever() |
875 | - self.assertEqual([expected_report], reports) |
876 | + def new_channel(): |
877 | + result = old_channel() |
878 | + result.basic_consume = error_once(result.basic_consume) |
879 | + result.basic_cancel = error_once(result.basic_cancel) |
880 | + result.close = error_once(result.close) |
881 | + return result |
882 | + |
883 | + connection.channel = new_channel |
884 | + connection.drain_events = error_once(connection.drain_events) |
885 | + connection.close = error_once(connection.close) |
886 | + return connection |
887 | + |
888 | + receiver = Receiver(config, patching_factory, queue_name) |
889 | + receiver.sentinel = b"arhh" |
890 | + channel.basic_publish(amqp.Message(b"arhh"), exchange_name, routing_key="") |
891 | + receiver.run_forever() |
892 | + assert reports == [expected_report] |
893 | |
894 | === modified file 'tox.ini' |
895 | --- tox.ini 2022-09-30 22:41:01 +0000 |
896 | +++ tox.ini 2022-10-03 10:15:17 +0000 |
897 | @@ -11,6 +11,5 @@ |
898 | [testenv] |
899 | deps = |
900 | .[test] |
901 | - pytest |
902 | commands = |
903 | pytest {posargs} |
Nice