Merge lp:~jtv/launchpad/commandspawner into lp:launchpad

Proposed by Jeroen T. Vermeulen
Status: Merged
Approved by: Leonard Richardson
Approved revision: no longer in the source branch.
Merged at revision: 12300
Proposed branch: lp:~jtv/launchpad/commandspawner
Merge into: lp:launchpad
Diff against target: 803 lines (+691/-53)
4 files modified
lib/lp/archivepublisher/ftparchive.py (+29/-51)
lib/lp/services/command_spawner.py (+252/-0)
lib/lp/services/tests/test_command_spawner.py (+401/-0)
lib/lp/testing/fakemethod.py (+9/-2)
To merge this branch: bzr merge lp:~jtv/launchpad/commandspawner
Reviewer                         Review Type   Date Requested   Status
Leonard Richardson (community)                                  Approve
Launchpad code reviewers         code                           Pending
Review via email: mp+48226@code.launchpad.net

Commit message

[r=leonardr][ui=none][bug=181368] CommandSpawner.

Description of the change

= CommandSpawner =

In order to fix bug 181368, we need to parallelize some runs of external commands. There doesn't seem to be any good reusable way to do this, so this branch provides one: CommandSpawner. It lets you run multiple commands in parallel, like so:

spawner = CommandSpawner()
spawner.start("/usr/local/bin/frobnicate")
spawner.start(["wget", url])
spawner.start("do_other_processing")
spawner.complete()

This example runs three commands in parallel, as separate processes, and in the last line waits for them all to complete. Optional callback hooks let you handle standard output and error output, and another hook fires on process completion. Two simple handler classes, OutputLineHandler and ReturnCodeReceiver, provide a starting point for these hooks; a sketch of how they fit together follows.
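
A minimal sketch (mirroring the ftparchive conversion in the diff below; the "log" object here is an assumed logger, and the command is just the example from above):

handler = OutputLineHandler(log.debug, "frob: ")
completion = ReturnCodeReceiver()
spawner = CommandSpawner()
spawner.start(
    ["/usr/local/bin/frobnicate"],
    stdout_handler=handler,
    stderr_handler=handler,
    completion_handler=completion)
spawner.complete()
handler.finalize()
# completion.returncode now holds the command's exit status.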

(I also added two convenience methods, extract_args and extract_kwargs, to FakeMethod to facilitate tests. They may come in handy elsewhere.)
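
A minimal illustration of the new FakeMethod helpers (assuming the usual call recording, where each call's positional and keyword arguments are stored):

fake = FakeMethod()
fake("first", option=1)
fake("second", option=2)
fake.extract_args()    # [("first",), ("second",)]
fake.extract_kwargs()  # [{"option": 1}, {"option": 2}]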

I also converted ftparchive to use the CommandSpawner, though I left the actual parallelization of the apt-ftparchive runs for a separate branch, to keep this one under control.

To test this I ran all the Soyuz tests, as well as every test mentioning "archive" or "publish." In addition, a full EC2 run is underway.

For Q/A, we'll want to ensure that the publish-distro script still works as expected and still runs its usual apt-ftparchive command line. This takes long enough that it's easy to observe in "top" or "ps."

No lint,

Jeroen

Revision history for this message
Leonard Richardson (leonardr) wrote :

This is a big new piece of code, and as lifeless said on IRC it could be a standalone module. But you have the test coverage necessary to give me confidence that it works. You've already pushed the changes we talked about on IRC, so r=me.

review: Approve

Preview Diff

1=== modified file 'lib/lp/archivepublisher/ftparchive.py'
2--- lib/lp/archivepublisher/ftparchive.py 2010-12-19 22:47:25 +0000
3+++ lib/lp/archivepublisher/ftparchive.py 2011-02-01 20:06:35 +0000
4@@ -2,9 +2,7 @@
5 # GNU Affero General Public License version 3 (see the file LICENSE).
6
7 import os
8-from select import select
9 from StringIO import StringIO
10-import subprocess
11
12 from storm.expr import (
13 Desc,
14@@ -24,6 +22,11 @@
15 from lp.archivepublisher.utils import process_in_batches
16 from lp.registry.interfaces.pocket import PackagePublishingPocket
17 from lp.registry.model.sourcepackagename import SourcePackageName
18+from lp.services.command_spawner import (
19+ CommandSpawner,
20+ OutputLineHandler,
21+ ReturnCodeReceiver,
22+ )
23 from lp.soyuz.enums import PackagePublishingStatus
24 from lp.soyuz.model.component import Component
25 from lp.soyuz.model.section import Section
26@@ -46,49 +49,6 @@
27 os.makedirs(path, 0755)
28
29
30-# XXX malcc 2006-09-20 : Move this somewhere useful. If generalised with
31-# timeout handling and stderr passthrough, could be a single method used for
32-# this and the similar requirement in test_on_merge.py.
33-
34-def run_subprocess_with_logging(process_and_args, log, prefix):
35- """Run a subprocess, gathering the output as it runs and logging it.
36-
37- process_and_args is a list containing the process to run and the
38- arguments for it, just as passed in the first argument to
39- subprocess.Popen.
40-
41- log is a logger to pass the output we gather.
42-
43- prefix is a prefix to attach to each line of output when we log it.
44- """
45- proc = subprocess.Popen(process_and_args,
46- stdin=subprocess.PIPE,
47- stdout=subprocess.PIPE,
48- stderr=subprocess.PIPE,
49- close_fds=True)
50- proc.stdin.close()
51- open_readers = set([proc.stdout, proc.stderr])
52- buf = ""
53- while open_readers:
54- rlist, wlist, xlist = select(open_readers, [], [])
55-
56- for reader in rlist:
57- chunk = os.read(reader.fileno(), 1024)
58- if chunk == "":
59- open_readers.remove(reader)
60- if buf:
61- log.debug(buf)
62- else:
63- buf += chunk
64- lines = buf.split("\n")
65- for line in lines[0:-1]:
66- log.debug("%s%s" % (prefix, line))
67- buf = lines[-1]
68-
69- ret = proc.wait()
70- return ret
71-
72-
73 DEFAULT_COMPONENT = "main"
74
75 CONFIG_HEADER = """
76@@ -170,13 +130,31 @@
77 def runApt(self, apt_config_filename):
78 """Run apt in a subprocess and verify its return value. """
79 self.log.debug("Filepath: %s" % apt_config_filename)
80- ret = run_subprocess_with_logging(["apt-ftparchive", "--no-contents",
81- "generate", apt_config_filename],
82- self.log, "a-f: ")
83- if ret:
84+ # XXX JeroenVermeulen 2011-02-01 bug=181368: Run parallel
85+ # apt-ftparchive processes for the various architectures (plus
86+ # source).
87+ stdout_handler = OutputLineHandler(self.log.debug, "a-f: ")
88+ stderr_handler = OutputLineHandler(self.log.warning, "a-f: ")
89+ completion_handler = ReturnCodeReceiver()
90+ command = [
91+ "apt-ftparchive",
92+ "--no-contents",
93+ "generate",
94+ apt_config_filename,
95+ ]
96+ spawner = CommandSpawner()
97+ spawner.start(
98+ command, stdout_handler=stdout_handler,
99+ stderr_handler=stderr_handler,
100+ completion_handler=completion_handler)
101+ spawner.complete()
102+ stdout_handler.finalize()
103+ stderr_handler.finalize()
104+ if completion_handler.returncode != 0:
105 raise AssertionError(
106- "Failure from apt-ftparchive. Return code %s" % ret)
107- return ret
108+ "Failure from apt-ftparchive. Return code %s"
109+ % completion_handler.returncode)
110+ return completion_handler.returncode
111
112 #
113 # Empty Pocket Requests
114
115=== added file 'lib/lp/services/command_spawner.py'
116--- lib/lp/services/command_spawner.py 1970-01-01 00:00:00 +0000
117+++ lib/lp/services/command_spawner.py 2011-02-01 20:06:35 +0000
118@@ -0,0 +1,252 @@
119+# Copyright 2011 Canonical Ltd. This software is licensed under the
120+# GNU Affero General Public License version 3 (see the file LICENSE).
121+
122+"""Execute commands in parallel sub-processes."""
123+
124+__metaclass__ = type
125+__all__ = [
126+ 'CommandSpawner',
127+ 'OutputLineHandler',
128+ 'ReturnCodeReceiver',
129+ ]
130+
131+import errno
132+from fcntl import (
133+ fcntl,
134+ F_GETFL,
135+ F_SETFL,
136+ )
137+from os import O_NONBLOCK
138+import select
139+import subprocess
140+
141+
142+def get_process_output_files(process):
143+ """Return the files we watch for output coming from `process`."""
144+ return [
145+ process.stdout,
146+ process.stderr,
147+ ]
148+
149+
150+def make_files_nonblocking(files):
151+ """Put each of `files` in non-blocking mode.
152+
153+ This allows the `CommandSpawner` to read all available output from a
154+ process without blocking until the process completes.
155+ """
156+ for this_file in files:
157+ fcntl(this_file, F_SETFL, fcntl(this_file, F_GETFL) | O_NONBLOCK)
158+
159+
160+def has_pending_output(poll_event):
161+ """Does the given event mask from `poll` indicate there's data to read?"""
162+ input_mask = (select.POLLIN | select.POLLPRI)
163+ return (poll_event & input_mask) != 0
164+
165+
166+def has_terminated(poll_event):
167+ """Does the given event mask from `poll` indicate process death?"""
168+ death_mask = (select.POLLERR | select.POLLHUP | select.POLLNVAL)
169+ return (poll_event & death_mask) != 0
170+
171+
172+STDOUT = 1
173+STDERR = 2
174+COMPLETION = 3
175+
176+
177+class CommandSpawner:
178+ """Simple manager to execute commands in parallel.
179+
180+ Lets you run commands in sub-processes that will run simultaneously.
181+ The CommandSpawner looks for output from the running processes, and
182+ manages their cleanup.
183+
184+ The typical usage pattern is:
185+
186+ >>> spawner = CommandSpawner()
187+ >>> spawner.start(["echo", "One parallel process"])
188+ >>> spawner.start(["echo", "Another parallel process"])
189+ >>> spawner.complete()
190+
191+ There are facilities for processing output and error output from the
192+ sub-processes, as well as dealing with success and failure. You can
193+ pass callbacks to the `start` method, to be called when these events
194+ occur.
195+
196+ As yet there is no facility for feeding input to the processes.
197+ """
198+
199+ def __init__(self):
200+ self.running_processes = {}
201+ self.poll = select.poll()
202+
203+ def start(self, command, stdout_handler=None, stderr_handler=None,
204+ completion_handler=None):
205+ """Run `command` in a sub-process.
206+
207+ This starts the command, but does not wait for it to complete.
208+ Instead of waiting for completion, you can pass handlers that
209+ will be called when certain events occur.
210+
211+ :param command: Command line to execute in a sub-process. May be
212+ either a string (for a single executable name) or a list of
213+ strings (for an executable name plus arguments).
214+ :param stdout_handler: Callback to handle output received from the
215+ sub-process. Must take the output as its sole argument. May be
216+ called any number of times as output comes in.
217+ :param stderr_handler: Callback to handle error output received from
218+ the sub-process. Must take the output as its sole argument. May
219+ be called any number of times as output comes in.
220+ :param completion_handler: Callback to be invoked, exactly once, when the
221+ sub-process exits. Must take `command`'s numeric return code as
222+ its sole argument.
223+ """
224+ process = self._spawn(command)
225+ handlers = {
226+ STDOUT: stdout_handler,
227+ STDERR: stderr_handler,
228+ COMPLETION: completion_handler,
229+ }
230+ self.running_processes[process] = handlers
231+ pipes = get_process_output_files(process)
232+ for pipe in pipes:
233+ self.poll.register(pipe, select.POLLIN | select.POLLPRI)
234+ make_files_nonblocking(pipes)
235+
236+ def communicate(self):
237+ """Execute one iteration of the main event loop. Blocks."""
238+ # Poll for output, but also wake up periodically to check for
239+ # completed processes.
240+ milliseconds = 1
241+ poll_result = self.poll.poll(milliseconds)
242+
243+ # Map each file descriptor to its poll events.
244+ events_by_fd = dict(poll_result)
245+
246+ # Iterate over a copy of the processes list: we may be removing
247+ # items from the original as processes complete.
248+ processes = self.running_processes.keys()
249+ for process in processes:
250+ self._service(process, events_by_fd)
251+ if process.returncode is not None:
252+ # Process has completed. Remove it.
253+ try:
254+ self._handle(process, COMPLETION, process.returncode)
255+ finally:
256+ for pipe in get_process_output_files(process):
257+ self.poll.unregister(pipe)
258+ del self.running_processes[process]
259+
260+ def complete(self):
261+ """Run `self.communicate` until all sub-processes have completed."""
262+ while len(self.running_processes) > 0:
263+ self.communicate()
264+
265+ def kill(self):
266+ """Kill any remaining child processes.
267+
268+ You'll still need to call `complete` to make sure that the child
269+ processes are cleaned up. Until then, they will stay around as
270+ zombies.
271+ """
272+ for process in self.running_processes.iterkeys():
273+ process.terminate()
274+
275+ def _spawn(self, command):
276+ """Spawn a sub-process for `command`. Overridable in tests."""
277+ return subprocess.Popen(
278+ command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
279+ close_fds=True)
280+
281+ def _handle(self, process, event, *args):
282+ """If we have a handler for `event` on `process`, call it."""
283+ process_handlers = self.running_processes[process]
284+ handler = process_handlers.get(event)
285+ if handler is not None:
286+ handler(*args)
287+
288+ def _read(self, process, pipe_file, event):
289+ """Read output from `pipe_file`."""
290+ try:
291+ output = pipe_file.read()
292+ except IOError, e:
293+ if e.errno != errno.EAGAIN:
294+ # "Resource temporarily unavailable"--not an error
295+ # really, just means there's nothing to read.
296+ raise
297+ else:
298+ if len(output) > 0:
299+ self._handle(process, event, output)
300+
301+ def _service(self, process, events_by_fd):
302+ """Service `process`."""
303+ stdout_events = events_by_fd.get(process.stdout.fileno(), 0)
304+ stderr_events = events_by_fd.get(process.stderr.fileno(), 0)
305+ if has_pending_output(stdout_events):
306+ self._read(process, process.stdout, STDOUT)
307+ if has_pending_output(stderr_events):
308+ self._read(process, process.stderr, STDERR)
309+ if has_terminated(stdout_events):
310+ process.wait()
311+
312+
313+class OutputLineHandler:
314+ """Collect and handle lines of output from a sub-process.
315+
316+ Output received from a sub-process may not be neatly broken down by
317+ line. This class collects them into lines and processes them one by
318+ one. If desired, it can also add a prefix to each.
319+ """
320+
321+ def __init__(self, line_processor, prefix=""):
322+ """Set up an output line handler.
323+
324+ :param line_processor: A callback to be invoked for each line of
325+ output received. Will receive exactly one argument: a single
326+ nonempty line of text, without the trailing newline.
327+ :param prefix: An optional string to be prefixed to each line of
328+ output before it is sent into the `line_processor`.
329+ """
330+ self.line_processor = line_processor
331+ self.prefix = prefix
332+ self.incomplete_buffer = ""
333+
334+ def process_line(self, line):
335+ """Process a single line of output."""
336+ if line != "":
337+ self.line_processor("%s%s" % (self.prefix, line))
338+
339+ def __call__(self, output):
340+ """Process multi-line output.
341+
342+ Any trailing text not (yet) terminated with a newline is buffered.
343+ """
344+ lines = (self.incomplete_buffer + output).split("\n")
345+ if not output.endswith("\n") and len(lines) > 0:
346+ self.incomplete_buffer = lines[-1]
347+ lines = lines[:-1]
348+ for line in lines:
349+ self.process_line(line)
350+
351+ def finalize(self):
352+ """Process the remaining incomplete line, if any."""
353+ if self.incomplete_buffer != "":
354+ self.process_line(self.incomplete_buffer)
355+ self.incomplete_buffer = ""
356+
357+
358+class ReturnCodeReceiver:
359+ """A minimal completion handler for `CommandSpawner` processes.
360+
361+ Does nothing but collect the command's return code.
362+
363+ :ivar returncode: The numerical return code retrieved from the
364+ process. Stays None until the process completes.
365+ """
366+
367+ returncode = None
368+
369+ def __call__(self, returncode):
370+ self.returncode = returncode
371
372=== added file 'lib/lp/services/tests/test_command_spawner.py'
373--- lib/lp/services/tests/test_command_spawner.py 1970-01-01 00:00:00 +0000
374+++ lib/lp/services/tests/test_command_spawner.py 2011-02-01 20:06:35 +0000
375@@ -0,0 +1,401 @@
376+# Copyright 2011 Canonical Ltd. This software is licensed under the
377+# GNU Affero General Public License version 3 (see the file LICENSE).
378+
379+"""Tests for `CommandSpawner`."""
380+
381+__metaclass__ = type
382+
383+from datetime import (
384+ datetime,
385+ timedelta,
386+ )
387+from fcntl import (
388+ fcntl,
389+ F_GETFL,
390+ )
391+from os import (
392+ fdopen,
393+ O_NONBLOCK,
394+ pipe,
395+ )
396+from pytz import utc
397+from testtools.matchers import LessThan
398+
399+from lp.testing import TestCase
400+from lp.testing.fakemethod import FakeMethod
401+from lp.services.command_spawner import (
402+ CommandSpawner,
403+ OutputLineHandler,
404+ ReturnCodeReceiver,
405+ )
406+
407+
408+def make_pipe():
409+ """Create a pipe of `file` objects."""
410+ r, w = pipe()
411+ return fdopen(r, 'r'), fdopen(w, 'w')
412+
413+
414+def write_and_flush(pipe, text):
415+ """Write `text` into `pipe`, and flush."""
416+ pipe.write(text)
417+ pipe.flush()
418+
419+
420+class FakeProcess:
421+ """Fake `subprocess.Popen` result."""
422+
423+ def __init__(self, returncode=None):
424+ self.returncode = returncode
425+ self.stdout, self.stdout_sink = make_pipe()
426+ self.stderr, self.stderr_sink = make_pipe()
427+
428+
429+def instrument_spawn(spawner, process):
430+ """Instrument `spawner` to spawn a fake process."""
431+ spawner._spawn = FakeMethod(result=process)
432+
433+
434+def is_nonblocking(this_file):
435+ """Is `this_file` in non-blocking mode?"""
436+ flags = fcntl(this_file, F_GETFL)
437+ return flags & O_NONBLOCK != 0
438+
439+
440+class TestCommandSpawner(TestCase):
441+ """Unit tests for `CommandSpawner`.
442+
443+ Uses fake processes, so does not test all the way down to the bare metal.
444+ Commands are not actually run.
445+ """
446+
447+ def _makeSpawnerAndProcess(self, returncode=None):
448+ """Create a `CommandSpawner` and instrument it with a `FakeProcess`.
449+
450+ :return: A tuple of the spawner and the fake process it will "run."
451+ """
452+ spawner = CommandSpawner()
453+ process = FakeProcess(returncode=returncode)
454+ instrument_spawn(spawner, process)
455+ return spawner, process
456+
457+ def test_starts_out_with_no_processes(self):
458+ spawner = CommandSpawner()
459+ self.assertEqual({}, spawner.running_processes)
460+
461+ def test_completes_with_no_processes(self):
462+ spawner = CommandSpawner()
463+ spawner.complete()
464+ self.assertEqual({}, spawner.running_processes)
465+
466+ def test_kill_works_with_no_processes(self):
467+ spawner = CommandSpawner()
468+ spawner.kill()
469+ self.assertEqual({}, spawner.running_processes)
470+
471+ def test_start_adds_a_process(self):
472+ spawner, process = self._makeSpawnerAndProcess()
473+ spawner.start("/bin/true")
474+ self.assertEqual([process], spawner.running_processes.keys())
475+
476+ def test_start_runs_its_command(self):
477+ spawner, process = self._makeSpawnerAndProcess()
478+ spawner.start("/bin/true")
479+ spawn_calls = spawner._spawn.calls
480+ self.assertEqual([("/bin/true", )], spawner._spawn.extract_args())
481+
482+ def test_output_is_nonblocking(self):
483+ spawner, process = self._makeSpawnerAndProcess()
484+ spawner.start("/bin/true")
485+ self.assertTrue(is_nonblocking(process.stdout))
486+ self.assertTrue(is_nonblocking(process.stderr))
487+
488+ def test_can_add_multiple_processes(self):
489+ spawner = CommandSpawner()
490+
491+ first_process = FakeProcess()
492+ instrument_spawn(spawner, first_process)
493+ spawner.start(["/bin/echo", "1"])
494+
495+ second_process = FakeProcess()
496+ instrument_spawn(spawner, second_process)
497+ spawner.start(["/bin/echo", "2"])
498+
499+ self.assertContentEqual(
500+ [first_process, second_process], spawner.running_processes)
501+
502+ def test_kill_terminates_processes(self):
503+ spawner, process = self._makeSpawnerAndProcess()
504+ process.terminate = FakeMethod()
505+ spawner.start("/bin/cat")
506+ spawner.kill()
507+ self.assertNotEqual(0, process.terminate.call_count)
508+
509+ def test_handles_output(self):
510+ spawner, process = self._makeSpawnerAndProcess()
511+ stdout_handler = FakeMethod()
512+ spawner.start("ls", stdout_handler=stdout_handler)
513+ write_and_flush(process.stdout_sink, "readme.txt\n")
514+ spawner.communicate()
515+ self.assertEqual([("readme.txt\n", )], stdout_handler.extract_args())
516+
517+ def test_handles_error_output(self):
518+ spawner, process = self._makeSpawnerAndProcess()
519+ stderr_handler = FakeMethod()
520+ spawner.start("ls", stderr_handler=stderr_handler)
521+ write_and_flush(process.stderr_sink, "File not found.\n")
522+ spawner.communicate()
523+ self.assertEqual(
524+ [("File not found.\n", )], stderr_handler.extract_args())
525+
526+ def test_does_not_call_completion_handler_until_completion(self):
527+ spawner, process = self._makeSpawnerAndProcess(returncode=None)
528+ completion_handler = FakeMethod()
529+ spawner.start("echo", completion_handler=completion_handler)
530+ spawner.communicate()
531+ self.assertEqual(0, completion_handler.call_count)
532+
533+ def test_calls_completion_handler_on_success(self):
534+ spawner, process = self._makeSpawnerAndProcess(returncode=0)
535+ completion_handler = FakeMethod()
536+ spawner.start("echo", completion_handler=completion_handler)
537+ spawner.complete()
538+ self.assertEqual(1, completion_handler.call_count)
539+
540+ def test_calls_completion_handler_on_failure(self):
541+ spawner, process = self._makeSpawnerAndProcess(returncode=1)
542+ completion_handler = FakeMethod()
543+ spawner.start("echo", completion_handler=completion_handler)
544+ spawner.complete()
545+ self.assertEqual(1, completion_handler.call_count)
546+
547+ def test_does_not_call_completion_handler_twice(self):
548+ spawner, process = self._makeSpawnerAndProcess(returncode=0)
549+ completion_handler = FakeMethod()
550+ spawner.start("echo", completion_handler=completion_handler)
551+ spawner.complete()
552+ spawner.complete()
553+ self.assertEqual(1, completion_handler.call_count)
554+
555+ def test_passes_return_code_to_completion_handler(self):
556+ spawner, process = self._makeSpawnerAndProcess(returncode=101)
557+ completion_handler = FakeMethod()
558+ spawner.start("echo", completion_handler=completion_handler)
559+ spawner.complete()
560+ self.assertEqual(((101, ), {}), completion_handler.calls[-1])
561+
562+ def test_handles_output_before_completion(self):
563+ spawner, process = self._makeSpawnerAndProcess(returncode=0)
564+ handler = FakeMethod()
565+ spawner.start(
566+ "hello", stdout_handler=handler, completion_handler=handler)
567+ write_and_flush(process.stdout_sink, "Hello\n")
568+ spawner.complete()
569+ self.assertEqual([("Hello\n", ), (0, )], handler.extract_args())
570+
571+ def test_handles_multiple_processes(self):
572+ spawner = CommandSpawner()
573+ handler = FakeMethod()
574+
575+ first_process = FakeProcess(returncode=1)
576+ instrument_spawn(spawner, first_process)
577+ spawner.start(["/bin/echo", "1"], completion_handler=handler)
578+
579+ second_process = FakeProcess(returncode=2)
580+ instrument_spawn(spawner, second_process)
581+ spawner.start(["/bin/echo", "2"], completion_handler=handler)
582+
583+ spawner.complete()
584+ self.assertContentEqual([(1, ), (2, )], handler.extract_args())
585+
586+
587+class AcceptOutput:
588+ """Simple stdout or stderr handler."""
589+
590+ def __call__(self, output):
591+ self.output = output
592+
593+
594+class TestCommandSpawnerAcceptance(TestCase):
595+ """Acceptance tests for `CommandSpawner`.
596+
597+ This test spawns actual processes, so be careful:
598+ * Think about security when running commands.
599+ * Don't rely on nonstandard commands.
600+ * Don't hold up the test suite with slow commands.
601+ """
602+
603+ def _makeSpawner(self):
604+ """Create a `CommandSpawner`, and make sure it gets cleaned up."""
605+ spawner = CommandSpawner()
606+ self.addCleanup(spawner.complete)
607+ self.addCleanup(spawner.kill)
608+ return spawner
609+
610+ def test_command_can_be_string(self):
611+ spawner = self._makeSpawner()
612+ spawner.start("/bin/pwd")
613+ spawner.complete()
614+
615+ def test_command_can_be_list(self):
616+ spawner = self._makeSpawner()
617+ spawner.start(["/bin/pwd"])
618+ spawner.complete()
619+
620+ def test_calls_stdout_handler(self):
621+ spawner = self._makeSpawner()
622+ stdout_handler = AcceptOutput()
623+ spawner.start(["echo", "hi"], stdout_handler=stdout_handler)
624+ spawner.complete()
625+ self.assertEqual("hi\n", stdout_handler.output)
626+
627+ def test_calls_completion_handler(self):
628+ spawner = self._makeSpawner()
629+ completion_handler = ReturnCodeReceiver()
630+ spawner.start("/bin/true", completion_handler=completion_handler)
631+ spawner.complete()
632+ self.assertEqual(0, completion_handler.returncode)
633+
634+ def test_communicate_returns_after_event(self):
635+ spawner = self._makeSpawner()
636+ before = datetime.now(utc)
637+ spawner.start(["/bin/sleep", "10"])
638+ spawner.start("/bin/pwd")
639+ spawner.communicate()
640+ after = datetime.now(utc)
641+ self.assertThat(after - before, LessThan(timedelta(seconds=10)))
642+
643+ def test_kill_terminates_processes(self):
644+ spawner = self._makeSpawner()
645+ spawner.start(["/bin/sleep", "10"])
646+ spawner.start(["/bin/sleep", "10"])
647+ before = datetime.now(utc)
648+ spawner.kill()
649+ spawner.complete()
650+ after = datetime.now(utc)
651+ self.assertThat(after - before, LessThan(timedelta(seconds=10)))
652+
653+ def test_start_does_not_block(self):
654+ spawner = self._makeSpawner()
655+ before = datetime.now(utc)
656+ spawner.start(["/bin/sleep", "10"])
657+ after = datetime.now(utc)
658+ self.assertThat(after - before, LessThan(timedelta(seconds=10)))
659+
660+ def test_subprocesses_run_in_parallel(self):
661+ spawner = self._makeSpawner()
662+
663+ processes = 10
664+ seconds = 0.2
665+ for counter in xrange(processes):
666+ spawner.start(["/bin/sleep", str(seconds)])
667+
668+ before = datetime.now(utc)
669+ spawner.complete()
670+ after = datetime.now(utc)
671+
672+ sequential_time = timedelta(seconds=(seconds * processes))
673+ self.assertThat(after - before, LessThan(sequential_time))
674+
675+ def test_integrates_with_outputlinehandler(self):
676+ spawner = self._makeSpawner()
677+ handler = OutputLineHandler(FakeMethod())
678+ spawner.start(["echo", "hello"], stdout_handler=handler)
679+ spawner.complete()
680+ self.assertEqual([("hello", )], handler.line_processor.extract_args())
681+
682+ def test_integrates_with_returncodereceiver(self):
683+ spawner = self._makeSpawner()
684+ handler = ReturnCodeReceiver()
685+ spawner.start("/bin/true", completion_handler=handler)
686+ spawner.complete()
687+ self.assertEqual(0, handler.returncode)
688+
689+
690+class TestOutputLineHandler(TestCase):
691+ """Unit tests for `OutputLineHandler`."""
692+
693+ def setUp(self):
694+ super(TestOutputLineHandler, self).setUp()
695+ self.handler = OutputLineHandler(FakeMethod())
696+
697+ def _getLines(self):
698+ """Get the lines that were passed to `handler`'s line processor."""
699+ return [
700+ line
701+ for (line, ) in self.handler.line_processor.extract_args()]
702+
703+ def test_processes_line(self):
704+ self.handler("x\n")
705+ self.assertEqual(["x"], self._getLines())
706+
707+ def test_buffers_partial_line(self):
708+ self.handler("x")
709+ self.assertEqual([], self._getLines())
710+
711+ def test_splits_lines(self):
712+ self.handler("a\nb\n")
713+ self.assertEqual(["a", "b"], self._getLines())
714+
715+ def test_ignores_empty_output(self):
716+ self.handler("")
717+ self.assertEqual([], self._getLines())
718+
719+ def test_finalize_ignores_empty_output(self):
720+ self.handler("")
721+ self.handler.finalize()
722+ self.assertEqual([], self._getLines())
723+
724+ def test_ignores_empty_line(self):
725+ self.handler("\n")
726+ self.assertEqual([], self._getLines())
727+
728+ def test_joins_partial_lines(self):
729+ self.handler("h")
730+ self.handler("i\n")
731+ self.assertEqual(["hi"], self._getLines())
732+
733+ def test_joins_lines_across_multiple_calls(self):
734+ self.handler("h")
735+ self.handler("i")
736+ self.handler("!\n")
737+ self.assertEqual(["hi!"], self._getLines())
738+
739+ def test_joins_lines_across_empty_calls(self):
740+ self.handler("h")
741+ self.handler("")
742+ self.handler("i\n")
743+ self.assertEqual(["hi"], self._getLines())
744+
745+ def test_finalize_processes_remaining_partial_line(self):
746+ self.handler("foo")
747+ self.handler.finalize()
748+ self.assertEqual(["foo"], self._getLines())
749+
750+ def test_finalize_is_idempotent(self):
751+ self.handler("foo")
752+ self.handler.finalize()
753+ self.handler.finalize()
754+ self.assertEqual(["foo"], self._getLines())
755+
756+ def test_finalize_joins_partial_lines(self):
757+ self.handler("h")
758+ self.handler("i")
759+ self.handler.finalize()
760+ self.assertEqual(["hi"], self._getLines())
761+
762+ def test_adds_prefix(self):
763+ self.handler.prefix = "->"
764+ self.handler("here\n")
765+ self.assertEqual(["->here"], self._getLines())
766+
767+ def test_finalize_adds_prefix(self):
768+ self.handler.prefix = "->"
769+ self.handler("here")
770+ self.handler.finalize()
771+ self.assertEqual(["->here"], self._getLines())
772+
773+ def test_empty_lines_are_ignored_despite_prefix(self):
774+ self.handler.prefix = "->"
775+ self.handler("\n")
776+ self.assertEqual([], self._getLines())
777
778=== modified file 'lib/lp/testing/fakemethod.py'
779--- lib/lp/testing/fakemethod.py 2010-04-01 09:39:20 +0000
780+++ lib/lp/testing/fakemethod.py 2011-02-01 20:06:35 +0000
781@@ -1,9 +1,8 @@
782-# Copyright 2009, 2010 Canonical Ltd. This software is licensed under the
783+# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
784 # GNU Affero General Public License version 3 (see the file LICENSE).
785
786 # pylint: disable-msg=E0702
787
788-
789 __metaclass__ = type
790 __all__ = [
791 'FakeMethod',
792@@ -55,3 +54,11 @@
793 @property
794 def call_count(self):
795 return len(self.calls)
796+
797+ def extract_args(self):
798+ """Return just the calls' positional-arguments tuples."""
799+ return [args for args, kwargs in self.calls]
800+
801+ def extract_kwargs(self):
802+ """Return just the calls' keyword-arguments dicts."""
803+ return [kwargs for args, kwargs in self.calls]