Merge lp:~cjwatson/python-oops-amqp/pytest-full into lp:python-oops-amqp

Proposed by Colin Watson
Status: Merged
Merged at revision: 26
Proposed branch: lp:~cjwatson/python-oops-amqp/pytest-full
Merge into: lp:python-oops-amqp
Diff against target: 903 lines (+405/-416)
8 files modified
NEWS (+1/-0)
README (+5/-9)
oops_amqp/tests/__init__.py (+0/-135)
setup.cfg (+1/-2)
tests/conftest.py (+104/-0)
tests/test_publisher.py (+122/-115)
tests/test_receiver.py (+172/-154)
tox.ini (+0/-1)
To merge this branch: bzr merge lp:~cjwatson/python-oops-amqp/pytest-full
Reviewer Review Type Date Requested Status
Guruprasad Approve
Andrey Fedoseev (community) Approve
Review via email: mp+430882@code.launchpad.net

Commit message

Port test suite to pytest.

Description of the change

This turns out to be about three times faster on my system, probably mainly due to using a session-scoped server fixture.

To post a comment you must log in.
Revision history for this message
Andrey Fedoseev (andrey-fedoseev) wrote :

Nice

review: Approve
Revision history for this message
Guruprasad (lgp171188) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2022-09-30 22:53:58 +0000
3+++ NEWS 2022-10-03 10:15:17 +0000
4@@ -8,6 +8,7 @@
5
6 - Switch from buildout to tox. (Colin Watson)
7 - Switch to declarative setup.cfg. (Colin Watson)
8+- Port test suite to pytest. (Colin Watson)
9
10 0.1.0
11 -----
12
13=== modified file 'README'
14--- README 2022-09-30 22:41:01 +0000
15+++ README 2022-10-03 10:15:17 +0000
16@@ -17,7 +17,7 @@
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 GNU Lesser General Public License version 3 (see the file LICENSE).
19
20-The oops_amqp package provides an AMQP OOPS http://pypi.python.org/pypi/oops)
21+The oops_amqp package provides an AMQP OOPS (https://pypi.org/project/oops)
22 publisher, and a small daemon that listens on amqp for OOPS reports and
23 republishes them (into a supplied publisher). The OOPS framework permits
24 falling back to additional publishers if AMQP is down.
25@@ -29,20 +29,16 @@
26
27 * bson
28
29-* oops (http://pypi.python.org/pypi/oops) 0.0.11 or newer.
30+* oops (https://pypi.org/project/oops) 0.0.11 or newer.
31
32 * amqp
33
34 Testing Dependencies
35 ====================
36
37-* oops-datedir-repo (http://pypi.python.org/pypi/oops_datedir_repo)
38-
39-* rabbitfixture (http://pypi.python.org/pypi/rabbitfixture)
40-
41-* testresources (http://pypi.python.org/pypi/testresources)
42-
43-* testtools (http://pypi.python.org/pypi/testtools)
44+* pytest (https://pypi.org/project/pytest)
45+
46+* rabbitfixture (https://pypi.org/project/rabbitfixture)
47
48 Usage
49 =====
50
51=== removed file 'oops_amqp/tests/__init__.py'
52--- oops_amqp/tests/__init__.py 2018-03-12 11:48:55 +0000
53+++ oops_amqp/tests/__init__.py 1970-01-01 00:00:00 +0000
54@@ -1,135 +0,0 @@
55-# Copyright (c) 2011, Canonical Ltd
56-#
57-# This program is free software: you can redistribute it and/or modify
58-# it under the terms of the GNU Lesser General Public License as published by
59-# the Free Software Foundation, version 3 only.
60-#
61-# This program is distributed in the hope that it will be useful,
62-# but WITHOUT ANY WARRANTY; without even the implied warranty of
63-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
64-# GNU Lesser General Public License for more details.
65-#
66-# You should have received a copy of the GNU Lesser General Public License
67-# along with this program. If not, see <http://www.gnu.org/licenses/>.
68-# GNU Lesser General Public License version 3 (see the file LICENSE).
69-
70-"""Tests for oops_amqp."""
71-
72-from __future__ import absolute_import, print_function
73-
74-from unittest import TestLoader
75-
76-import amqp
77-from fixtures import Fixture
78-from rabbitfixture.server import RabbitServer
79-import testtools
80-from testresources import (
81- _get_result,
82- FixtureResource,
83- OptimisingTestSuite,
84- setUpResources,
85- tearDownResources,
86- )
87-
88-from oops_amqp.utils import close_ignoring_connection_errors
89-
90-__all__ = [
91- 'ChannelFixture',
92- 'QueueFixture',
93- 'test_suite',
94- 'TestCase',
95- ]
96-
97-
98-class QueueFixture(Fixture):
99- """Create an exchange with a subscribed queue on an AMQP instance.
100-
101- The exchange and queue are made durable (to permit testing with amqp server
102- restarts) and non-auto-delete (to permit dropping the test connection and
103- restablishing it without losing the config (e.g. server restarts).
104-
105- If the server is restarted, the channel may need to be updated before
106- teardown, or the teardown will not be able to delete the exchange and queue.
107-
108- Possibly wants to live in rabbitfixture, or a amqpfixture.
109- """
110-
111- def __init__(self, channel, unique_string_factory):
112- """Create a QueueFixture.
113-
114- :param channel: An amqplib channel to request the exchange and queue
115- over.
116- :param unique_string_factory: A helper that will return a (process
117- lifetime scope) unique string.
118- """
119- self.channel = channel
120- self.unique_string_factory = unique_string_factory
121-
122- def setUp(self):
123- super(QueueFixture, self).setUp()
124- self.exchange_name = self.unique_string_factory()
125- self.channel.exchange_declare(
126- exchange=self.exchange_name, type="fanout", durable=True,
127- auto_delete=False)
128- self.addCleanup(self.delete_exchange)
129- self.queue_name, _, _ = self.channel.queue_declare(
130- durable=True, auto_delete=False)
131- self.addCleanup(self.delete_queue)
132- self.channel.queue_bind(self.queue_name, self.exchange_name)
133-
134- def delete_queue(self):
135- self.channel.queue_delete(self.queue_name)
136-
137- def delete_exchange(self):
138- self.channel.exchange_delete(self.exchange_name)
139-
140-
141-class ChannelFixture(Fixture):
142- """Create an AMQP connection and channel for tests.
143-
144- :ivar connection: an amqplib connection.
145- :ivar channel: an amqplib channel
146- """
147-
148- def __init__(self, connection_factory):
149- super(ChannelFixture, self).__init__()
150- self.connection_factory = connection_factory
151-
152- def setUp(self):
153- super(ChannelFixture, self).setUp()
154- self.connection = self.connection_factory()
155- self.connection.connect()
156- self.addCleanup(close_ignoring_connection_errors, self.connection)
157- self.channel = self.connection.channel()
158- self.addCleanup(close_ignoring_connection_errors, self.channel)
159-
160-
161-class TestCase(testtools.TestCase):
162- """Subclass to start a RabbitMQ server."""
163-
164- resources = [('rabbit', FixtureResource(RabbitServer()))]
165-
166- def setUp(self):
167- super(TestCase, self).setUp()
168- # ResourcedTestCase handles teardown in the wrong order for us (we
169- # need to ensure that the RabbitServer fixture is only cleaned up
170- # after any other fixtures registered by individual tests), so we
171- # imitate it manually.
172- result = _get_result()
173- setUpResources(self, self.resources, result)
174- self.addCleanup(tearDownResources, self, self.resources, result)
175-
176- def connection_factory(self):
177- """When called, return an amqplib connection."""
178- return amqp.Connection(host="%s:%s" % (self.rabbit.config.hostname,
179- self.rabbit.config.port), userid="guest", password="guest",
180- virtual_host="/")
181-
182-
183-def test_suite():
184- test_mod_names = [
185- 'publisher',
186- 'receiver',
187- ]
188- return OptimisingTestSuite(TestLoader().loadTestsFromNames(
189- ['oops_amqp.tests.test_' + name for name in test_mod_names]))
190
191=== modified file 'setup.cfg'
192--- setup.cfg 2022-09-30 22:53:58 +0000
193+++ setup.cfg 2022-10-03 10:15:17 +0000
194@@ -36,7 +36,6 @@
195
196 [options.extras_require]
197 test =
198+ pytest
199 rabbitfixture
200 six
201- testresources
202- testtools
203
204=== renamed directory 'oops_amqp/tests' => 'tests'
205=== added file 'tests/conftest.py'
206--- tests/conftest.py 1970-01-01 00:00:00 +0000
207+++ tests/conftest.py 2022-10-03 10:15:17 +0000
208@@ -0,0 +1,104 @@
209+# Copyright (C) 2011-2022, Canonical Ltd.
210+#
211+# This program is free software: you can redistribute it and/or modify
212+# it under the terms of the GNU Lesser General Public License as published by
213+# the Free Software Foundation, version 3 only.
214+#
215+# This program is distributed in the hope that it will be useful,
216+# but WITHOUT ANY WARRANTY; without even the implied warranty of
217+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
218+# GNU Lesser General Public License for more details.
219+#
220+# You should have received a copy of the GNU Lesser General Public License
221+# along with this program. If not, see <http://www.gnu.org/licenses/>.
222+# GNU Lesser General Public License version 3 (see the file LICENSE).
223+
224+import itertools
225+from functools import partial
226+
227+import amqp
228+import pytest
229+from rabbitfixture.server import RabbitServer
230+
231+from oops_amqp.utils import close_ignoring_connection_errors
232+
233+
234+_unique_id_gen = itertools.count(1)
235+
236+
237+@pytest.fixture
238+def get_unique_integer():
239+ def get():
240+ return next(_unique_id_gen)
241+
242+ return get
243+
244+
245+@pytest.fixture
246+def get_unique_string(get_unique_integer):
247+ def get(prefix):
248+ return "%s-%d" % (prefix, get_unique_integer())
249+
250+ return get
251+
252+
253+@pytest.fixture(scope="session")
254+def rabbit():
255+ rabbit = RabbitServer()
256+ rabbit.setUp()
257+ try:
258+ yield rabbit
259+ finally:
260+ rabbit.cleanUp()
261+
262+
263+@pytest.fixture
264+def connection_factory(rabbit):
265+ return partial(
266+ amqp.Connection,
267+ host="%s:%s" % (rabbit.config.hostname, rabbit.config.port),
268+ userid="guest",
269+ password="guest",
270+ virtual_host="/",
271+ )
272+
273+
274+@pytest.fixture
275+def connection(connection_factory):
276+ connection = connection_factory()
277+ connection.connect()
278+ try:
279+ yield connection
280+ finally:
281+ close_ignoring_connection_errors(connection)
282+
283+
284+@pytest.fixture
285+def channel(connection):
286+ channel = connection.channel()
287+ try:
288+ yield channel
289+ finally:
290+ close_ignoring_connection_errors(channel)
291+
292+
293+@pytest.fixture
294+def exchange_name(channel, get_unique_string):
295+ exchange_name = get_unique_string("exchange")
296+ channel.exchange_declare(
297+ exchange=exchange_name, type="fanout", durable=True, auto_delete=False
298+ )
299+ try:
300+ yield exchange_name
301+ finally:
302+ channel.exchange_delete(exchange_name)
303+
304+
305+@pytest.fixture
306+def queue_name(channel, exchange_name):
307+ queue_name, _, _ = channel.queue_declare(durable=True, auto_delete=False)
308+ try:
309+ channel.queue_bind(queue_name, exchange_name)
310+ yield queue_name
311+ finally:
312+ channel.queue_delete(queue_name)
313
314=== modified file 'tests/test_publisher.py'
315--- oops_amqp/tests/test_publisher.py 2018-03-12 11:48:55 +0000
316+++ tests/test_publisher.py 2022-10-03 10:15:17 +0000
317@@ -22,118 +22,125 @@
318 from oops_amqp import (
319 anybson as bson,
320 Publisher,
321- )
322-from oops_amqp.tests import (
323- ChannelFixture,
324- QueueFixture,
325- TestCase,
326- )
327-
328-
329-class TestPublisher(TestCase):
330-
331- def test_publish_inherit_id(self):
332- # OOPS id's can be set outside of Publisher().
333- channel = self.useFixture(
334- ChannelFixture(self.connection_factory)).channel
335- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
336- publisher = Publisher(self.connection_factory, queue.exchange_name, "",
337- inherit_id=True)
338- reference_oops = {'id': 'kept', 'akey': 'avalue'}
339- oops = dict(reference_oops)
340- expected_id = 'kept'
341- oops_ids = publisher(oops)
342- # Publication returns the oops ID allocated.
343- self.assertEqual([expected_id], oops_ids)
344- # The oops should not be altered by publication.
345- self.assertEqual(reference_oops, oops)
346- # The received OOPS should have the ID embedded and be a bson dict.
347- def check_oops(msg):
348- body = msg.body
349- if not isinstance(body, bytes):
350- body = body.encode(msg.content_encoding or 'UTF-8')
351- self.assertEqual(reference_oops, bson.loads(body))
352- channel.basic_ack(msg.delivery_tag)
353- channel.basic_cancel(queue.queue_name)
354- channel.basic_consume(
355- queue.queue_name, callback=check_oops,
356- consumer_tag=queue.queue_name)
357- channel.connection.drain_events()
358-
359- def test_publish(self):
360- # Publishing an oops sends it to the exchange, making a connection as
361- # it goes.
362- channel = self.useFixture(
363- ChannelFixture(self.connection_factory)).channel
364- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
365- publisher = Publisher(self.connection_factory, queue.exchange_name, "")
366- reference_oops = {'akey': 'avalue'}
367- oops = dict(reference_oops)
368- id_bson = md5(bson.dumps(oops)).hexdigest()
369- expected_id = "OOPS-%s" % id_bson
370- oops_ids = publisher(oops)
371- # Publication returns the oops ID allocated.
372- self.assertEqual([expected_id], oops_ids)
373- # The oops should not be altered by publication.
374- self.assertEqual(reference_oops, oops)
375- # The received OOPS should have the ID embedded and be a bson dict.
376- expected_oops = dict(reference_oops)
377- expected_oops['id'] = oops_ids[0]
378- def check_oops(msg):
379- body = msg.body
380- if not isinstance(body, bytes):
381- body = body.encode(msg.content_encoding or 'UTF-8')
382- self.assertEqual(expected_oops, bson.loads(body))
383- channel.basic_ack(msg.delivery_tag)
384- channel.basic_cancel(queue.queue_name)
385- channel.basic_consume(
386- queue.queue_name, callback=check_oops,
387- consumer_tag=queue.queue_name)
388- channel.connection.drain_events()
389-
390- def test_publish_amqp_already_down(self):
391- # If amqp is down when a connection is attempted, None is returned to
392- # indicate that publication failed - and publishing after it comes back
393- # works.
394- channel = self.useFixture(
395- ChannelFixture(self.connection_factory)).channel
396- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
397- # The private method use and the restart of rabbit before it gets torn
398- # down are bugs in rabbitfixture that will be fixed in a future
399- # release.
400- self.rabbit.runner._stop()
401- try:
402- publisher = Publisher(
403- self.connection_factory, queue.exchange_name, "")
404- oops = {'akey': 42}
405- self.assertEqual([], publisher(oops))
406- finally:
407- self.rabbit.runner._start()
408- connection = self.connection_factory()
409- connection.connect()
410- queue.channel = connection.channel()
411- self.assertNotEqual([], publisher(oops))
412-
413- def test_publish_amqp_down_after_use(self):
414- # If amqp goes down after its been successfully used, None is returned
415- # to indicate that publication failed - and publishing after it comes
416- # back works.
417- channel = self.useFixture(
418- ChannelFixture(self.connection_factory)).channel
419- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
420- publisher = Publisher(self.connection_factory, queue.exchange_name, "")
421- oops = {'akey': 42}
422- self.assertNotEqual(None, publisher(oops))
423- # The private method use and the restart of rabbit before it gets torn
424- # down are bugs in rabbitfixture that will be fixed in a future
425- # release.
426- self.rabbit.runner._stop()
427- try:
428- self.assertEqual([], publisher(oops))
429- finally:
430- self.rabbit.runner._start()
431- connection = self.connection_factory()
432- connection.connect()
433- queue.channel = connection.channel()
434- self.assertNotEqual([], publisher(oops))
435-
436+)
437+
438+
439+def test_publish_inherit_id(
440+ connection_factory, channel, exchange_name, queue_name
441+):
442+ # OOPS IDs can be set outside of Publisher().
443+ publisher = Publisher(
444+ connection_factory, exchange_name, "", inherit_id=True
445+ )
446+ reference_oops = {"id": "kept", "akey": "avalue"}
447+ oops = dict(reference_oops)
448+ expected_id = "kept"
449+ oops_ids = publisher(oops)
450+ # Publication returns the oops ID allocated.
451+ assert oops_ids == [expected_id]
452+ # The oops should not be altered by publication.
453+ assert oops == reference_oops
454+
455+ # The received OOPS should have the ID embedded and be a bson dict.
456+ def check_oops(msg):
457+ body = msg.body
458+ if not isinstance(body, bytes):
459+ body = body.encode(msg.content_encoding or "UTF-8")
460+ assert bson.loads(body) == reference_oops
461+ channel.basic_ack(msg.delivery_tag)
462+ channel.basic_cancel(queue_name)
463+
464+ channel.basic_consume(
465+ queue_name, callback=check_oops, consumer_tag=queue_name
466+ )
467+ channel.connection.drain_events()
468+
469+
470+def test_publish(connection_factory, channel, exchange_name, queue_name):
471+ # Publishing an oops sends it to the exchange, making a connection as
472+ # it goes.
473+ publisher = Publisher(connection_factory, exchange_name, "")
474+ reference_oops = {"akey": "avalue"}
475+ oops = dict(reference_oops)
476+ id_bson = md5(bson.dumps(oops)).hexdigest()
477+ expected_id = "OOPS-%s" % id_bson
478+ oops_ids = publisher(oops)
479+ # Publication returns the oops ID allocated.
480+ assert oops_ids == [expected_id]
481+ # The oops should not be altered by publication.
482+ assert oops == reference_oops
483+ # The received OOPS should have the ID embedded and be a bson dict.
484+ expected_oops = dict(reference_oops)
485+ expected_oops["id"] = oops_ids[0]
486+
487+ def check_oops(msg):
488+ body = msg.body
489+ if not isinstance(body, bytes):
490+ body = body.encode(msg.content_encoding or "UTF-8")
491+ assert bson.loads(body) == expected_oops
492+ channel.basic_ack(msg.delivery_tag)
493+ channel.basic_cancel(queue_name)
494+
495+ channel.basic_consume(
496+ queue_name, callback=check_oops, consumer_tag=queue_name
497+ )
498+ channel.connection.drain_events()
499+
500+
501+def test_publish_amqp_already_down(
502+ rabbit, connection_factory, channel, get_unique_string
503+):
504+ # If amqp is down when a connection is attempted, an empty list is
505+ # returned to indicate that publication failed - and publishing after it
506+ # comes back works.
507+ # The private method use and the restart of rabbit before it gets torn
508+ # down are bugs in rabbitfixture that will be fixed in a future
509+ # release.
510+ exchange_name = get_unique_string("exchange")
511+ channel.exchange_declare(
512+ exchange=exchange_name, type="fanout", durable=True, auto_delete=False
513+ )
514+ try:
515+ rabbit.runner._stop()
516+ try:
517+ publisher = Publisher(connection_factory, exchange_name, "")
518+ oops = {"akey": 42}
519+ assert publisher(oops) == []
520+ finally:
521+ rabbit.runner._start()
522+ connection = connection_factory()
523+ connection.connect()
524+ channel = connection.channel()
525+ assert publisher(oops) != []
526+ finally:
527+ channel.exchange_delete(exchange_name)
528+
529+
530+def test_publish_amqp_down_after_use(
531+ rabbit, connection_factory, channel, get_unique_string
532+):
533+ # If amqp goes down after it's been successfully used, an empty list is
534+ # returned to indicate that publication failed - and publishing after it
535+ # comes back works.
536+ exchange_name = get_unique_string("exchange")
537+ channel.exchange_declare(
538+ exchange=exchange_name, type="fanout", durable=True, auto_delete=False
539+ )
540+ try:
541+ publisher = Publisher(connection_factory, exchange_name, "")
542+ oops = {"akey": 42}
543+ assert publisher(oops) is not None
544+ # The private method use and the restart of rabbit before it gets
545+ # torn down are bugs in rabbitfixture that will be fixed in a future
546+ # release.
547+ rabbit.runner._stop()
548+ try:
549+ assert publisher(oops) == []
550+ finally:
551+ rabbit.runner._start()
552+ connection = connection_factory()
553+ connection.connect()
554+ channel = connection.channel()
555+ assert publisher(oops) != []
556+ finally:
557+ channel.exchange_delete(exchange_name)
558
559=== modified file 'tests/test_receiver.py'
560--- oops_amqp/tests/test_receiver.py 2018-03-12 11:48:55 +0000
561+++ tests/test_receiver.py 2022-10-03 10:15:17 +0000
562@@ -27,158 +27,176 @@
563 from oops_amqp import (
564 anybson as bson,
565 Receiver,
566- )
567-from oops_amqp.tests import (
568- ChannelFixture,
569- QueueFixture,
570- TestCase,
571- )
572-
573-
574-class TestReceiver(TestCase):
575-
576- def test_stop_on_sentinel(self):
577- # A sentinel can be used to stop the receiver (useful for testing).
578- reports = []
579- def capture(report):
580- reports.append(report)
581- return [report['id']]
582- expected_report = {'id': 'foo', 'otherkey': 42}
583- message = amqp.Message(bson.dumps(expected_report))
584- channel = self.useFixture(
585- ChannelFixture(self.connection_factory)).channel
586- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
587- channel.basic_publish(
588- message, queue.exchange_name, routing_key="")
589- sentinel = b"xxx"
590- channel.basic_publish(
591- amqp.Message(sentinel), queue.exchange_name, routing_key="")
592- config = Config()
593- config.publisher = capture
594- receiver = Receiver(config, self.connection_factory, queue.queue_name)
595- receiver.sentinel = sentinel
596- receiver.run_forever()
597- self.assertEqual([expected_report], reports)
598-
599- def test_stop_via_stopping(self):
600- # Setting the stopping field should stop the run_forever loop.
601- reports = []
602- def capture(report):
603- reports.append(report)
604- return [report['id']]
605- expected_report = {'id': 'foo', 'otherkey': 42}
606- message = amqp.Message(bson.dumps(expected_report))
607- channel = self.useFixture(
608- ChannelFixture(self.connection_factory)).channel
609- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
610- channel.basic_publish(
611- message, queue.exchange_name, routing_key="")
612- config = Config()
613- config.publisher = capture
614- # We don't want to loop forever: patch the channel so that after one
615- # call to wait (which will get our injected message) the loop will shut
616- # down.
617- def patching_factory():
618- connection = self.connection_factory()
619- old_channel = connection.channel
620- def new_channel():
621- result = old_channel()
622- old_wait = result.wait
623- def new_wait(*args, **kwargs):
624- receiver.stopping = True
625- return old_wait(*args, **kwargs)
626- result.wait = new_wait
627- return result
628- connection.channel = new_channel
629- return connection
630- receiver = Receiver(config, patching_factory, queue.queue_name)
631- receiver.run_forever()
632- self.assertEqual([expected_report], reports)
633-
634- def test_run_forever(self):
635- # run_forever subscribes and then calls drain_events in a loop.
636- calls = []
637- class FakeChannel:
638- def __init__(self, calls):
639- self.calls = calls
640- self.is_open = True
641- def basic_consume(self, queue_name, callback=None):
642- self.calls.append(('basic_consume', queue_name, callback))
643- return 'tag'
644- def basic_cancel(self, tag):
645- self.calls.append(('basic_cancel', tag))
646- def close(self):
647- self.is_open = False
648- class FakeConnection:
649- def __init__(self, calls):
650- self.calls = calls
651- def connect(self):
652- pass
653- def channel(self):
654- return FakeChannel(calls)
655- def drain_events(self, timeout=None):
656- self.calls.append(('drain_events', timeout))
657- if len(self.calls) > 2:
658- receiver.stopping = True
659- def close(self):
660- pass
661- receiver = Receiver(None, lambda: FakeConnection(calls), 'foo')
662- receiver.run_forever()
663- self.assertEqual(
664- [('basic_consume', 'foo', receiver.handle_report),
665- ('drain_events', 1),
666- ('drain_events', 1),
667- ('basic_cancel', 'tag')],
668- calls)
669-
670- def test_tolerates_amqp_trouble(self):
671- # If the AMQP server is unavailable for a short period, the receiver
672- # will automatically reconnect.
673- # Break a connection to raise socket.error (which we know from the
674- # publisher tests is what leaks through when rabbit is shutdown).
675- # We raise it the first time on each amqp method call.
676- reports = []
677- def capture(report):
678- reports.append(report)
679- return [report['id']]
680- expected_report = {'id': 'foo', 'otherkey': 42}
681- message = amqp.Message(bson.dumps(expected_report))
682- channel = self.useFixture(
683- ChannelFixture(self.connection_factory)).channel
684- queue = self.useFixture(QueueFixture(channel, self.getUniqueString))
685- channel.basic_publish(message, queue.exchange_name, routing_key="")
686- config = Config()
687- config.publisher = capture
688- state = {}
689- def error_once(func):
690- def wrapped(*args, **kwargs):
691- func_ref = six.get_function_code(func)
692- if func_ref in state:
693- return func(*args, **kwargs)
694- else:
695- state[func_ref] = True
696- # Use EPIPE because the close() code checks that (though
697- # the rest doesn't)
698- raise socket.error(errno.EPIPE, "booyah")
699- return wrapped
700+)
701+
702+
703+def test_stop_on_sentinel(
704+ connection_factory, channel, exchange_name, queue_name
705+):
706+ # A sentinel can be used to stop the receiver (useful for testing).
707+ reports = []
708+
709+ def capture(report):
710+ reports.append(report)
711+ return [report["id"]]
712+
713+ expected_report = {"id": "foo", "otherkey": 42}
714+ message = amqp.Message(bson.dumps(expected_report))
715+ channel.basic_publish(message, exchange_name, routing_key="")
716+ sentinel = b"xxx"
717+ channel.basic_publish(
718+ amqp.Message(sentinel), exchange_name, routing_key=""
719+ )
720+ config = Config()
721+ config.publisher = capture
722+ receiver = Receiver(config, connection_factory, queue_name)
723+ receiver.sentinel = sentinel
724+ receiver.run_forever()
725+ assert reports == [expected_report]
726+
727+
728+def test_stop_via_stopping(
729+ connection_factory, channel, exchange_name, queue_name
730+):
731+ # Setting the stopping field should stop the run_forever loop.
732+ reports = []
733+
734+ def capture(report):
735+ reports.append(report)
736+ return [report["id"]]
737+
738+ expected_report = {"id": "foo", "otherkey": 42}
739+ message = amqp.Message(bson.dumps(expected_report))
740+ channel.basic_publish(message, exchange_name, routing_key="")
741+ config = Config()
742+ config.publisher = capture
743+ # We don't want to loop forever: patch the channel so that after one
744+ # call to wait (which will get our injected message) the loop will shut
745+ # down.
746+ def patching_factory():
747+ connection = connection_factory()
748+ old_channel = connection.channel
749+
750+ def new_channel():
751+ result = old_channel()
752+ old_wait = result.wait
753+
754+ def new_wait(*args, **kwargs):
755+ receiver.stopping = True
756+ return old_wait(*args, **kwargs)
757+
758+ result.wait = new_wait
759+ return result
760+
761+ connection.channel = new_channel
762+ return connection
763+
764+ receiver = Receiver(config, patching_factory, queue_name)
765+ receiver.run_forever()
766+ assert reports == [expected_report]
767+
768+
769+def test_run_forever():
770+ # run_forever subscribes and then calls drain_events in a loop.
771+ calls = []
772+
773+ class FakeChannel:
774+ def __init__(self, calls):
775+ self.calls = calls
776+ self.is_open = True
777+
778+ def basic_consume(self, queue_name, callback=None):
779+ self.calls.append(("basic_consume", queue_name, callback))
780+ return "tag"
781+
782+ def basic_cancel(self, tag):
783+ self.calls.append(("basic_cancel", tag))
784+
785+ def close(self):
786+ self.is_open = False
787+
788+ class FakeConnection:
789+ def __init__(self, calls):
790+ self.calls = calls
791+
792+ def connect(self):
793+ pass
794+
795+ def channel(self):
796+ return FakeChannel(calls)
797+
798+ def drain_events(self, timeout=None):
799+ self.calls.append(("drain_events", timeout))
800+ if len(self.calls) > 2:
801+ receiver.stopping = True
802+
803+ def close(self):
804+ pass
805+
806+ receiver = Receiver(None, lambda: FakeConnection(calls), "foo")
807+ receiver.run_forever()
808+ assert calls == [
809+ ("basic_consume", "foo", receiver.handle_report),
810+ ("drain_events", 1),
811+ ("drain_events", 1),
812+ ("basic_cancel", "tag"),
813+ ]
814+
815+
816+def test_tolerates_amqp_trouble(
817+ connection_factory, channel, exchange_name, queue_name
818+):
819+ # If the AMQP server is unavailable for a short period, the receiver
820+ # will automatically reconnect.
821+ # Break a connection to raise socket.error (which we know from the
822+ # publisher tests is what leaks through when rabbit is shutdown).
823+ # We raise it the first time on each amqp method call.
824+ reports = []
825+
826+ def capture(report):
827+ reports.append(report)
828+ return [report["id"]]
829+
830+ expected_report = {"id": "foo", "otherkey": 42}
831+ message = amqp.Message(bson.dumps(expected_report))
832+ channel.basic_publish(message, exchange_name, routing_key="")
833+ config = Config()
834+ config.publisher = capture
835+ state = {}
836+
837+ def error_once(func):
838+ def wrapped(*args, **kwargs):
839+ func_ref = six.get_function_code(func)
840+ if func_ref in state:
841+ return func(*args, **kwargs)
842+ else:
843+ state[func_ref] = True
844+ # Use EPIPE because the close() code checks that (though
845+ # the rest doesn't)
846+ raise socket.error(errno.EPIPE, "booyah")
847+
848+ return wrapped
849+
850+ @error_once
851+ def patching_factory():
852+ connection = connection_factory()
853+ old_channel = connection.channel
854+
855 @error_once
856- def patching_factory():
857- connection = self.connection_factory()
858- old_channel = connection.channel
859- @error_once
860- def new_channel():
861- result = old_channel()
862- result.basic_consume = error_once(result.basic_consume)
863- result.basic_cancel = error_once(result.basic_cancel)
864- result.close = error_once(result.close)
865- return result
866- connection.channel = new_channel
867- connection.drain_events = error_once(connection.drain_events)
868- connection.close = error_once(connection.close)
869- return connection
870- receiver = Receiver(config, patching_factory, queue.queue_name)
871- receiver.sentinel = b"arhh"
872- channel.basic_publish(
873- amqp.Message(b"arhh"), queue.exchange_name, routing_key="")
874- receiver.run_forever()
875- self.assertEqual([expected_report], reports)
876+ def new_channel():
877+ result = old_channel()
878+ result.basic_consume = error_once(result.basic_consume)
879+ result.basic_cancel = error_once(result.basic_cancel)
880+ result.close = error_once(result.close)
881+ return result
882+
883+ connection.channel = new_channel
884+ connection.drain_events = error_once(connection.drain_events)
885+ connection.close = error_once(connection.close)
886+ return connection
887+
888+ receiver = Receiver(config, patching_factory, queue_name)
889+ receiver.sentinel = b"arhh"
890+ channel.basic_publish(amqp.Message(b"arhh"), exchange_name, routing_key="")
891+ receiver.run_forever()
892+ assert reports == [expected_report]
893
894=== modified file 'tox.ini'
895--- tox.ini 2022-09-30 22:41:01 +0000
896+++ tox.ini 2022-10-03 10:15:17 +0000
897@@ -11,6 +11,5 @@
898 [testenv]
899 deps =
900 .[test]
901- pytest
902 commands =
903 pytest {posargs}

Subscribers

People subscribed via source and target branches

to all changes: