Merge lp:~jtv/launchpad/commandspawner into lp:launchpad

Proposed by Jeroen T. Vermeulen
Status: Merged
Approved by: Leonard Richardson
Approved revision: no longer in the source branch.
Merged at revision: 12300
Proposed branch: lp:~jtv/launchpad/commandspawner
Merge into: lp:launchpad
Diff against target: 803 lines (+691/-53)
4 files modified
lib/lp/archivepublisher/ftparchive.py (+29/-51)
lib/lp/services/command_spawner.py (+252/-0)
lib/lp/services/tests/test_command_spawner.py (+401/-0)
lib/lp/testing/fakemethod.py (+9/-2)
To merge this branch: bzr merge lp:~jtv/launchpad/commandspawner
Reviewers:
Leonard Richardson (community): Approve
Launchpad code reviewers (code): Pending
Review via email: mp+48226@code.launchpad.net

Commit message

[r=leonardr][ui=none][bug=181368] CommandSpawner.

Description of the change

= CommandSpawner =

In order to fix bug 181368, we need to parallelize some runs of external commands. There doesn't seem to be any good reusable way to do this, so this branch provides one: CommandSpawner. It lets you run multiple commands in parallel, like so:

spawner = CommandSpawner()
spawner.start("/usr/local/bin/frobnicate")
spawner.start(["wget", url])
spawner.start("do_other_processing")
spawner.complete()

This example runs three commands in parallel, as separate processes; the last line waits for them all to complete. Optional callback hooks let you handle regular output, error output, and process completion. Two simple handler classes, OutputLineHandler and ReturnCodeReceiver, provide a starting point for these hooks.
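
To make the hooks concrete, here is a rough sketch of how they combine, modelled on the runApt() conversion in the diff below. The logger and the "apt.conf" path are placeholder assumptions for illustration, not part of this branch.

import logging

from lp.services.command_spawner import (
    CommandSpawner,
    OutputLineHandler,
    ReturnCodeReceiver,
    )

log = logging.getLogger("example")  # Placeholder logger.

# OutputLineHandler reassembles chunked output into lines and prefixes them.
stdout_handler = OutputLineHandler(log.debug, "a-f: ")
stderr_handler = OutputLineHandler(log.warning, "a-f: ")
# ReturnCodeReceiver just remembers the process's return code.
completion_handler = ReturnCodeReceiver()

spawner = CommandSpawner()
spawner.start(
    ["apt-ftparchive", "--no-contents", "generate", "apt.conf"],
    stdout_handler=stdout_handler, stderr_handler=stderr_handler,
    completion_handler=completion_handler)
spawner.complete()

# Flush any buffered partial lines, then check how the command fared.
stdout_handler.finalize()
stderr_handler.finalize()
if completion_handler.returncode != 0:
    raise AssertionError(
        "Command failed with return code %s" % completion_handler.returncode)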

(I also added two convenience methods to FakeMethod, extract_args and extract_kwargs, to make the tests easier to write. They may come in handy elsewhere.)
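
For instance, a test that uses a FakeMethod as an output handler can now check just the positional arguments it was called with, instead of unpacking (args, kwargs) tuples by hand. A hypothetical snippet; the call arguments here are made up:

from lp.testing.fakemethod import FakeMethod

handler = FakeMethod()
handler("readme.txt\n")
handler("COPYING\n", source="stdout")

# Each recorded call is an (args, kwargs) tuple; the new helpers project
# out just the positional or just the keyword arguments.
assert handler.extract_args() == [("readme.txt\n", ), ("COPYING\n", )]
assert handler.extract_kwargs() == [{}, {"source": "stdout"}]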

I also converted ftparchive to use the CommandSpawner, though I left the actual parallelization of that run to a separate branch to keep this one a manageable size.

To test this I ran all Soyuz tests, as well as all tests that mention "archive" or "publish." In addition, a full EC2 run is underway.

For Q/A, we'll want to ensure that the publish-distro script still works as expected and still runs its usual apt-ftparchive command line. This takes long enough that it's easy to observe in "top" or "ps."

No lint,

Jeroen

Leonard Richardson (leonardr) wrote :

This is a big new piece of code, and as lifeless said on IRC it could be a standalone module. But you have the test coverage necessary to give me confidence that it works. You've already pushed the changes we talked about on IRC, so r=me.

review: Approve

Preview Diff

=== modified file 'lib/lp/archivepublisher/ftparchive.py'
--- lib/lp/archivepublisher/ftparchive.py 2010-12-19 22:47:25 +0000
+++ lib/lp/archivepublisher/ftparchive.py 2011-02-01 20:06:35 +0000
@@ -2,9 +2,7 @@
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 import os
-from select import select
 from StringIO import StringIO
-import subprocess
 
 from storm.expr import (
     Desc,
@@ -24,6 +22,11 @@
 from lp.archivepublisher.utils import process_in_batches
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.registry.model.sourcepackagename import SourcePackageName
+from lp.services.command_spawner import (
+    CommandSpawner,
+    OutputLineHandler,
+    ReturnCodeReceiver,
+    )
 from lp.soyuz.enums import PackagePublishingStatus
 from lp.soyuz.model.component import Component
 from lp.soyuz.model.section import Section
@@ -46,49 +49,6 @@
         os.makedirs(path, 0755)
 
 
-# XXX malcc 2006-09-20 : Move this somewhere useful. If generalised with
-# timeout handling and stderr passthrough, could be a single method used for
-# this and the similar requirement in test_on_merge.py.
-
-def run_subprocess_with_logging(process_and_args, log, prefix):
-    """Run a subprocess, gathering the output as it runs and logging it.
-
-    process_and_args is a list containing the process to run and the
-    arguments for it, just as passed in the first argument to
-    subprocess.Popen.
-
-    log is a logger to pass the output we gather.
-
-    prefix is a prefix to attach to each line of output when we log it.
-    """
-    proc = subprocess.Popen(process_and_args,
-                            stdin=subprocess.PIPE,
-                            stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE,
-                            close_fds=True)
-    proc.stdin.close()
-    open_readers = set([proc.stdout, proc.stderr])
-    buf = ""
-    while open_readers:
-        rlist, wlist, xlist = select(open_readers, [], [])
-
-        for reader in rlist:
-            chunk = os.read(reader.fileno(), 1024)
-            if chunk == "":
-                open_readers.remove(reader)
-                if buf:
-                    log.debug(buf)
-            else:
-                buf += chunk
-                lines = buf.split("\n")
-                for line in lines[0:-1]:
-                    log.debug("%s%s" % (prefix, line))
-                buf = lines[-1]
-
-    ret = proc.wait()
-    return ret
-
-
 DEFAULT_COMPONENT = "main"
 
 CONFIG_HEADER = """
@@ -170,13 +130,31 @@
     def runApt(self, apt_config_filename):
         """Run apt in a subprocess and verify its return value. """
         self.log.debug("Filepath: %s" % apt_config_filename)
-        ret = run_subprocess_with_logging(["apt-ftparchive", "--no-contents",
-                                           "generate", apt_config_filename],
-                                          self.log, "a-f: ")
-        if ret:
+        # XXX JeroenVermeulen 2011-02-01 bug=181368: Run parallel
+        # apt-ftparchive processes for the various architectures (plus
+        # source).
+        stdout_handler = OutputLineHandler(self.log.debug, "a-f: ")
+        stderr_handler = OutputLineHandler(self.log.warning, "a-f: ")
+        completion_handler = ReturnCodeReceiver()
+        command = [
+            "apt-ftparchive",
+            "--no-contents",
+            "generate",
+            apt_config_filename,
+            ]
+        spawner = CommandSpawner()
+        spawner.start(
+            command, stdout_handler=stdout_handler,
+            stderr_handler=stderr_handler,
+            completion_handler=completion_handler)
+        spawner.complete()
+        stdout_handler.finalize()
+        stderr_handler.finalize()
+        if completion_handler.returncode != 0:
             raise AssertionError(
-                "Failure from apt-ftparchive. Return code %s" % ret)
-        return ret
+                "Failure from apt-ftparchive. Return code %s"
+                % completion_handler.returncode)
+        return completion_handler.returncode
 
     #
     # Empty Pocket Requests
=== added file 'lib/lp/services/command_spawner.py'
--- lib/lp/services/command_spawner.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/command_spawner.py 2011-02-01 20:06:35 +0000
@@ -0,0 +1,252 @@
+# Copyright 2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Execute commands in parallel sub-processes."""
+
+__metaclass__ = type
+__all__ = [
+    'CommandSpawner',
+    'OutputLineHandler',
+    'ReturnCodeReceiver',
+    ]
+
+import errno
+from fcntl import (
+    fcntl,
+    F_GETFL,
+    F_SETFL,
+    )
+from os import O_NONBLOCK
+import select
+import subprocess
+
+
+def get_process_output_files(process):
+    """Return the files we watch for output coming from `process`."""
+    return [
+        process.stdout,
+        process.stderr,
+        ]
+
+
+def make_files_nonblocking(files):
+    """Put each of `files` in non-blocking mode.
+
+    This allows the `CommandSpawner` to read all available output from a
+    process without blocking until the process completes.
+    """
+    for this_file in files:
+        fcntl(this_file, F_SETFL, fcntl(this_file, F_GETFL) | O_NONBLOCK)
+
+
+def has_pending_output(poll_event):
+    """Does the given event mask from `poll` indicate there's data to read?"""
+    input_mask = (select.POLLIN | select.POLLPRI)
+    return (poll_event & input_mask) != 0
+
+
+def has_terminated(poll_event):
+    """Does the given event mask from `poll` indicate process death?"""
+    death_mask = (select.POLLERR | select.POLLHUP | select.POLLNVAL)
+    return (poll_event & death_mask) != 0
+
+
+STDOUT = 1
+STDERR = 2
+COMPLETION = 3
+
+
+class CommandSpawner:
+    """Simple manager to execute commands in parallel.
+
+    Lets you run commands in sub-processes that will run simultaneously.
+    The CommandSpawner looks for output from the running processes, and
+    manages their cleanup.
+
+    The typical usage pattern is:
+
+    >>> spawner = CommandSpawner()
+    >>> spawner.start(["echo", "One parallel process"])
+    >>> spawner.start(["echo", "Another parallel process"])
+    >>> spawner.complete()
+
+    There are facilities for processing output and error output from the
+    sub-processes, as well as dealing with success and failure. You can
+    pass callbacks to the `start` method, to be called when these events
+    occur.
+
+    As yet there is no facility for feeding input to the processes.
+    """
+
+    def __init__(self):
+        self.running_processes = {}
+        self.poll = select.poll()
+
+    def start(self, command, stdout_handler=None, stderr_handler=None,
+              completion_handler=None):
+        """Run `command` in a sub-process.
+
+        This starts the command, but does not wait for it to complete.
+        Instead of waiting for completion, you can pass handlers that
+        will be called when certain events occur.
+
+        :param command: Command line to execute in a sub-process. May be
+            either a string (for a single executable name) or a list of
+            strings (for an executable name plus arguments).
+        :param stdout_handler: Callback to handle output received from the
+            sub-process. Must take the output as its sole argument. May be
+            called any number of times as output comes in.
+        :param stderr_handler: Callback to handle error output received from
+            the sub-process. Must take the output as its sole argument. May
+            be called any number of times as output comes in.
+        :param completion_handler: Callback to be invoked, exactly once, when
+            the sub-process exits. Must take `command`'s numeric return code
+            as its sole argument.
105 """
106 process = self._spawn(command)
107 handlers = {
108 STDOUT: stdout_handler,
109 STDERR: stderr_handler,
110 COMPLETION: completion_handler,
111 }
112 self.running_processes[process] = handlers
113 pipes = get_process_output_files(process)
114 for pipe in pipes:
115 self.poll.register(pipe, select.POLLIN | select.POLLPRI)
116 make_files_nonblocking(pipes)
117
118 def communicate(self):
119 """Execute one iteration of the main event loop. Blocks."""
120 # Poll for output, but also wake up periodically to check for
121 # completed processes.
122 milliseconds = 1
123 poll_result = self.poll.poll(milliseconds)
124
125 # Map each file descriptor to its poll events.
126 events_by_fd = dict(poll_result)
127
128 # Iterate over a copy of the processes list: we may be removing
129 # items from the original as processes complete.
130 processes = self.running_processes.keys()
131 for process in processes:
132 self._service(process, events_by_fd)
133 if process.returncode is not None:
134 # Process has completed. Remove it.
135 try:
136 self._handle(process, COMPLETION, process.returncode)
137 finally:
138 for pipe in get_process_output_files(process):
139 self.poll.unregister(pipe)
140 del self.running_processes[process]
141
142 def complete(self):
143 """Run `self.communicate` until all sub-processes have completed."""
144 while len(self.running_processes) > 0:
145 self.communicate()
146
147 def kill(self):
148 """Kill any remaining child processes.
149
150 You'll still need to call `complete` to make sure that the child
151 processes are cleaned up. Until then, they will stay around as
152 zombies.
153 """
154 for process in self.running_processes.iterkeys():
155 process.terminate()
156
157 def _spawn(self, command):
158 """Spawn a sub-process for `command`. Overridable in tests."""
159 return subprocess.Popen(
160 command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
161 close_fds=True)
162
163 def _handle(self, process, event, *args):
164 """If we have a handler for `event` on `process`, call it."""
165 process_handlers = self.running_processes[process]
166 handler = process_handlers.get(event)
167 if handler is not None:
168 handler(*args)
169
170 def _read(self, process, pipe_file, event):
171 """Read output from `pipe_file`."""
172 try:
173 output = pipe_file.read()
174 except IOError, e:
175 if e.errno != errno.EAGAIN:
176 # "Resource temporarily unavailable"--not an error
177 # really, just means there's nothing to read.
178 raise
179 else:
180 if len(output) > 0:
181 self._handle(process, event, output)
182
183 def _service(self, process, events_by_fd):
184 """Service `process`."""
185 stdout_events = events_by_fd.get(process.stdout.fileno(), 0)
186 stderr_events = events_by_fd.get(process.stderr.fileno(), 0)
187 if has_pending_output(stdout_events):
188 self._read(process, process.stdout, STDOUT)
189 if has_pending_output(stderr_events):
190 self._read(process, process.stderr, STDERR)
191 if has_terminated(stdout_events):
192 process.wait()
193
194
195class OutputLineHandler:
196 """Collect and handle lines of output from a sub-process.
197
198 Output received from a sub-process may not be neatly broken down by
199 line. This class collects them into lines and processes them one by
200 one. If desired, it can also add a prefix to each.
201 """
202
203 def __init__(self, line_processor, prefix=""):
204 """Set up an output line handler.
205
206 :param line_processor: A callback to be invoked for each line of
207 output received. Will receive exactly one argument: a single
208 nonempty line of text, without the trailing newline.
209 :param prefix: An optional string to be prefixed to each line of
210 output before it is sent into the `line_processor`.
211 """
212 self.line_processor = line_processor
213 self.prefix = prefix
214 self.incomplete_buffer = ""
215
216 def process_line(self, line):
217 """Process a single line of output."""
218 if line != "":
219 self.line_processor("%s%s" % (self.prefix, line))
220
221 def __call__(self, output):
222 """Process multi-line output.
223
224 Any trailing text not (yet) terminated with a newline is buffered.
225 """
226 lines = (self.incomplete_buffer + output).split("\n")
227 if not output.endswith("\n") and len(lines) > 0:
228 self.incomplete_buffer = lines[-1]
229 lines = lines[:-1]
230 for line in lines:
231 self.process_line(line)
232
233 def finalize(self):
234 """Process the remaining incomplete line, if any."""
235 if self.incomplete_buffer != "":
236 self.process_line(self.incomplete_buffer)
237 self.incomplete_buffer = ""
238
239
240class ReturnCodeReceiver:
241 """A minimal completion handler for `CommandSpawner` processes.
242
243 Does nothing but collect the command's return code.
244
245 :ivar returncode: The numerical return code retrieved from the
246 process. Stays None until the process completes.
247 """
248
249 returncode = None
250
251 def __call__(self, returncode):
252 self.returncode = returncode
0253
=== added file 'lib/lp/services/tests/test_command_spawner.py'
--- lib/lp/services/tests/test_command_spawner.py 1970-01-01 00:00:00 +0000
+++ lib/lp/services/tests/test_command_spawner.py 2011-02-01 20:06:35 +0000
@@ -0,0 +1,401 @@
+# Copyright 2011 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for `CommandSpawner`."""
+
+__metaclass__ = type
+
+from datetime import (
+    datetime,
+    timedelta,
+    )
+from fcntl import (
+    fcntl,
+    F_GETFL,
+    )
+from os import (
+    fdopen,
+    O_NONBLOCK,
+    pipe,
+    )
+from pytz import utc
+from testtools.matchers import LessThan
+
+from lp.testing import TestCase
+from lp.testing.fakemethod import FakeMethod
+from lp.services.command_spawner import (
+    CommandSpawner,
+    OutputLineHandler,
+    ReturnCodeReceiver,
+    )
+
+
+def make_pipe():
+    """Create a pipe of `file` objects."""
+    r, w = pipe()
+    return fdopen(r, 'r'), fdopen(w, 'w')
+
+
+def write_and_flush(pipe, text):
+    """Write `text` into `pipe`, and flush."""
+    pipe.write(text)
+    pipe.flush()
+
+
+class FakeProcess:
+    """Fake `subprocess.Popen` result."""
+
+    def __init__(self, returncode=None):
+        self.returncode = returncode
+        self.stdout, self.stdout_sink = make_pipe()
+        self.stderr, self.stderr_sink = make_pipe()
+
+
+def instrument_spawn(spawner, process):
+    """Instrument `spawner` to spawn a fake process."""
+    spawner._spawn = FakeMethod(result=process)
+
+
+def is_nonblocking(this_file):
+    """Is `this_file` in non-blocking mode?"""
+    flags = fcntl(this_file, F_GETFL)
+    return flags & O_NONBLOCK != 0
+
+
+class TestCommandSpawner(TestCase):
+    """Unit tests for `CommandSpawner`.
+
+    Uses fake processes, so does not test all the way down to the bare metal.
+    Commands are not actually run.
+    """
+
+    def _makeSpawnerAndProcess(self, returncode=None):
+        """Create a `CommandSpawner` and instrument it with a `FakeProcess`.
+
+        :return: A tuple of the spawner and the fake process it will "run."
+        """
+        spawner = CommandSpawner()
+        process = FakeProcess(returncode=returncode)
+        instrument_spawn(spawner, process)
+        return spawner, process
+
+    def test_starts_out_with_no_processes(self):
+        spawner = CommandSpawner()
+        self.assertEqual({}, spawner.running_processes)
+
+    def test_completes_with_no_processes(self):
+        spawner = CommandSpawner()
+        spawner.complete()
+        self.assertEqual({}, spawner.running_processes)
+
+    def test_kill_works_with_no_processes(self):
+        spawner = CommandSpawner()
+        spawner.kill()
+        self.assertEqual({}, spawner.running_processes)
+
+    def test_start_adds_a_process(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        spawner.start("/bin/true")
+        self.assertEqual([process], spawner.running_processes.keys())
+
+    def test_start_runs_its_command(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        spawner.start("/bin/true")
+        spawn_calls = spawner._spawn.calls
+        self.assertEqual([("/bin/true", )], spawner._spawn.extract_args())
+
+    def test_output_is_nonblocking(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        spawner.start("/bin/true")
+        self.assertTrue(is_nonblocking(process.stdout))
+        self.assertTrue(is_nonblocking(process.stderr))
+
+    def test_can_add_multiple_processes(self):
+        spawner = CommandSpawner()
+
+        first_process = FakeProcess()
+        instrument_spawn(spawner, first_process)
+        spawner.start(["/bin/echo", "1"])
+
+        second_process = FakeProcess()
+        instrument_spawn(spawner, second_process)
+        spawner.start(["/bin/echo", "2"])
+
+        self.assertContentEqual(
+            [first_process, second_process], spawner.running_processes)
+
+    def test_kill_terminates_processes(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        process.terminate = FakeMethod()
+        spawner.start("/bin/cat")
+        spawner.kill()
+        self.assertNotEqual(0, process.terminate.call_count)
+
+    def test_handles_output(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        stdout_handler = FakeMethod()
+        spawner.start("ls", stdout_handler=stdout_handler)
+        write_and_flush(process.stdout_sink, "readme.txt\n")
+        spawner.communicate()
+        self.assertEqual([("readme.txt\n", )], stdout_handler.extract_args())
+
+    def test_handles_error_output(self):
+        spawner, process = self._makeSpawnerAndProcess()
+        stderr_handler = FakeMethod()
+        spawner.start("ls", stderr_handler=stderr_handler)
+        write_and_flush(process.stderr_sink, "File not found.\n")
+        spawner.communicate()
+        self.assertEqual(
+            [("File not found.\n", )], stderr_handler.extract_args())
+
+    def test_does_not_call_completion_handler_until_completion(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=None)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.communicate()
+        self.assertEqual(0, completion_handler.call_count)
+
+    def test_calls_completion_handler_on_success(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=0)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(1, completion_handler.call_count)
+
+    def test_calls_completion_handler_on_failure(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=1)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(1, completion_handler.call_count)
+
+    def test_does_not_call_completion_handler_twice(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=0)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        spawner.complete()
+        self.assertEqual(1, completion_handler.call_count)
+
+    def test_passes_return_code_to_completion_handler(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=101)
+        completion_handler = FakeMethod()
+        spawner.start("echo", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(((101, ), {}), completion_handler.calls[-1])
+
+    def test_handles_output_before_completion(self):
+        spawner, process = self._makeSpawnerAndProcess(returncode=0)
+        handler = FakeMethod()
+        spawner.start(
+            "hello", stdout_handler=handler, completion_handler=handler)
+        write_and_flush(process.stdout_sink, "Hello\n")
+        spawner.complete()
+        self.assertEqual([("Hello\n", ), (0, )], handler.extract_args())
+
+    def test_handles_multiple_processes(self):
+        spawner = CommandSpawner()
+        handler = FakeMethod()
+
+        first_process = FakeProcess(returncode=1)
+        instrument_spawn(spawner, first_process)
+        spawner.start(["/bin/echo", "1"], completion_handler=handler)
+
+        second_process = FakeProcess(returncode=2)
+        instrument_spawn(spawner, second_process)
+        spawner.start(["/bin/echo", "2"], completion_handler=handler)
+
+        spawner.complete()
+        self.assertContentEqual([(1, ), (2, )], handler.extract_args())
+
+
+class AcceptOutput:
+    """Simple stdout or stderr handler."""
+
+    def __call__(self, output):
+        self.output = output
+
+
+class TestCommandSpawnerAcceptance(TestCase):
+    """Acceptance tests for `CommandSpawner`.
+
+    This test spawns actual processes, so be careful:
+    * Think about security when running commands.
+    * Don't rely on nonstandard commands.
+    * Don't hold up the test suite with slow commands.
+    """
+
+    def _makeSpawner(self):
+        """Create a `CommandSpawner`, and make sure it gets cleaned up."""
+        spawner = CommandSpawner()
+        self.addCleanup(spawner.complete)
+        self.addCleanup(spawner.kill)
+        return spawner
+
+    def test_command_can_be_string(self):
+        spawner = self._makeSpawner()
+        spawner.start("/bin/pwd")
+        spawner.complete()
+
+    def test_command_can_be_list(self):
+        spawner = self._makeSpawner()
+        spawner.start(["/bin/pwd"])
+        spawner.complete()
+
+    def test_calls_stdout_handler(self):
+        spawner = self._makeSpawner()
+        stdout_handler = AcceptOutput()
+        spawner.start(["echo", "hi"], stdout_handler=stdout_handler)
+        spawner.complete()
+        self.assertEqual("hi\n", stdout_handler.output)
+
+    def test_calls_completion_handler(self):
+        spawner = self._makeSpawner()
+        completion_handler = ReturnCodeReceiver()
+        spawner.start("/bin/true", completion_handler=completion_handler)
+        spawner.complete()
+        self.assertEqual(0, completion_handler.returncode)
+
+    def test_communicate_returns_after_event(self):
+        spawner = self._makeSpawner()
+        before = datetime.now(utc)
+        spawner.start(["/bin/sleep", "10"])
+        spawner.start("/bin/pwd")
+        spawner.communicate()
+        after = datetime.now(utc)
+        self.assertThat(after - before, LessThan(timedelta(seconds=10)))
+
+    def test_kill_terminates_processes(self):
+        spawner = self._makeSpawner()
+        spawner.start(["/bin/sleep", "10"])
+        spawner.start(["/bin/sleep", "10"])
+        before = datetime.now(utc)
+        spawner.kill()
+        spawner.complete()
+        after = datetime.now(utc)
+        self.assertThat(after - before, LessThan(timedelta(seconds=10)))
+
+    def test_start_does_not_block(self):
+        spawner = self._makeSpawner()
+        before = datetime.now(utc)
+        spawner.start(["/bin/sleep", "10"])
+        after = datetime.now(utc)
+        self.assertThat(after - before, LessThan(timedelta(seconds=10)))
+
+    def test_subprocesses_run_in_parallel(self):
+        spawner = self._makeSpawner()
+
+        processes = 10
+        seconds = 0.2
+        for counter in xrange(processes):
+            spawner.start(["/bin/sleep", str(seconds)])
+
+        before = datetime.now(utc)
+        spawner.complete()
+        after = datetime.now(utc)
+
+        sequential_time = timedelta(seconds=(seconds * processes))
+        self.assertThat(after - before, LessThan(sequential_time))
+
+    def test_integrates_with_outputlinehandler(self):
+        spawner = self._makeSpawner()
+        handler = OutputLineHandler(FakeMethod())
+        spawner.start(["echo", "hello"], stdout_handler=handler)
+        spawner.complete()
+        self.assertEqual([("hello", )], handler.line_processor.extract_args())
+
+    def test_integrates_with_returncodereceiver(self):
+        spawner = self._makeSpawner()
+        handler = ReturnCodeReceiver()
+        spawner.start("/bin/true", completion_handler=handler)
+        spawner.complete()
+        self.assertEqual(0, handler.returncode)
+
+
+class TestOutputLineHandler(TestCase):
+    """Unit tests for `OutputLineHandler`."""
+
+    def setUp(self):
+        super(TestOutputLineHandler, self).setUp()
+        self.handler = OutputLineHandler(FakeMethod())
+
+    def _getLines(self):
+        """Get the lines that were passed to `handler`'s line processor."""
+        return [
+            line
+            for (line, ) in self.handler.line_processor.extract_args()]
+
+    def test_processes_line(self):
+        self.handler("x\n")
+        self.assertEqual(["x"], self._getLines())
+
+    def test_buffers_partial_line(self):
+        self.handler("x")
+        self.assertEqual([], self._getLines())
+
+    def test_splits_lines(self):
+        self.handler("a\nb\n")
+        self.assertEqual(["a", "b"], self._getLines())
+
+    def test_ignores_empty_output(self):
+        self.handler("")
+        self.assertEqual([], self._getLines())
+
+    def test_finalize_ignores_empty_output(self):
+        self.handler("")
+        self.handler.finalize()
+        self.assertEqual([], self._getLines())
+
+    def test_ignores_empty_line(self):
+        self.handler("\n")
+        self.assertEqual([], self._getLines())
+
+    def test_joins_partial_lines(self):
+        self.handler("h")
+        self.handler("i\n")
+        self.assertEqual(["hi"], self._getLines())
+
+    def test_joins_lines_across_multiple_calls(self):
+        self.handler("h")
+        self.handler("i")
+        self.handler("!\n")
+        self.assertEqual(["hi!"], self._getLines())
+
+    def test_joins_lines_across_empty_calls(self):
+        self.handler("h")
+        self.handler("")
+        self.handler("i\n")
+        self.assertEqual(["hi"], self._getLines())
+
+    def test_finalize_processes_remaining_partial_line(self):
+        self.handler("foo")
+        self.handler.finalize()
+        self.assertEqual(["foo"], self._getLines())
+
+    def test_finalize_is_idempotent(self):
+        self.handler("foo")
+        self.handler.finalize()
+        self.handler.finalize()
+        self.assertEqual(["foo"], self._getLines())
+
+    def test_finalize_joins_partial_lines(self):
+        self.handler("h")
+        self.handler("i")
+        self.handler.finalize()
+        self.assertEqual(["hi"], self._getLines())
+
+    def test_adds_prefix(self):
+        self.handler.prefix = "->"
+        self.handler("here\n")
+        self.assertEqual(["->here"], self._getLines())
+
+    def test_finalize_adds_prefix(self):
+        self.handler.prefix = "->"
+        self.handler("here")
+        self.handler.finalize()
+        self.assertEqual(["->here"], self._getLines())
+
+    def test_empty_lines_are_ignored_despite_prefix(self):
+        self.handler.prefix = "->"
+        self.handler("\n")
+        self.assertEqual([], self._getLines())
=== modified file 'lib/lp/testing/fakemethod.py'
--- lib/lp/testing/fakemethod.py 2010-04-01 09:39:20 +0000
+++ lib/lp/testing/fakemethod.py 2011-02-01 20:06:35 +0000
@@ -1,9 +1,8 @@
-# Copyright 2009, 2010 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 # pylint: disable-msg=E0702
 
-
 __metaclass__ = type
 __all__ = [
     'FakeMethod',
@@ -55,3 +54,11 @@
     @property
     def call_count(self):
         return len(self.calls)
+
+    def extract_args(self):
+        """Return just the calls' positional-arguments tuples."""
+        return [args for args, kwargs in self.calls]
+
+    def extract_kwargs(self):
+        """Return just the calls' keyword-arguments dicts."""
+        return [kwargs for args, kwargs in self.calls]