Merge lp:~lifeless/bzr/test-speed into lp:~bzr/bzr/trunk-old

Proposed by Robert Collins
Status: Merged
Merge reported by: Robert Collins
Merged at revision: not available
Proposed branch: lp:~lifeless/bzr/test-speed
Merge into: lp:~bzr/bzr/trunk-old
Diff against target: 1320 lines
To merge this branch: bzr merge lp:~lifeless/bzr/test-speed
Reviewer Review Type Date Requested Status
Vincent Ladeuil Approve
Review via email: mp+10590@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Robert Collins (lifeless) wrote :

This finishes the thread I started tugging on this morning.

It does two basic things:
 - moves tests that are not for the bzr selftest command line out of
   bzrlib.tests.blackbox.test_selftest
 - where possible, stops invoking child processes to test how we call such
   processes, instead extending the existing idioms for intercepting
   such calls to test how the calls are set up.

This saves another 50% of test time in 'test_selftest' tests over my
patch earlier today.

-Rob

--

Revision history for this message
Vincent Ladeuil (vila) wrote :

Nice work, and particular kudos for the clarification comments.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'bzrlib/tests/__init__.py'
--- bzrlib/tests/__init__.py 2009-08-20 05:05:59 +0000
+++ bzrlib/tests/__init__.py 2009-08-24 06:35:12 +0000
@@ -3207,6 +3207,7 @@
3207 starting_with=None,3207 starting_with=None,
3208 runner_class=None,3208 runner_class=None,
3209 suite_decorators=None,3209 suite_decorators=None,
3210 stream=None,
3210 ):3211 ):
3211 """Run the whole test suite under the enhanced runner"""3212 """Run the whole test suite under the enhanced runner"""
3212 # XXX: Very ugly way to do this...3213 # XXX: Very ugly way to do this...
@@ -3230,9 +3231,14 @@
3230 else:3231 else:
3231 keep_only = load_test_id_list(load_list)3232 keep_only = load_test_id_list(load_list)
3232 if test_suite_factory is None:3233 if test_suite_factory is None:
3234 # Reduce loading time by loading modules based on the starting_with
3235 # patterns.
3233 suite = test_suite(keep_only, starting_with)3236 suite = test_suite(keep_only, starting_with)
3234 else:3237 else:
3235 suite = test_suite_factory()3238 suite = test_suite_factory()
3239 if starting_with:
3240 # But always filter as requested.
3241 suite = filter_suite_by_id_startswith(suite, starting_with)
3236 return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,3242 return run_suite(suite, 'testbzr', verbose=verbose, pattern=pattern,
3237 stop_on_failure=stop_on_failure,3243 stop_on_failure=stop_on_failure,
3238 transport=transport,3244 transport=transport,
@@ -3245,6 +3251,7 @@
3245 strict=strict,3251 strict=strict,
3246 runner_class=runner_class,3252 runner_class=runner_class,
3247 suite_decorators=suite_decorators,3253 suite_decorators=suite_decorators,
3254 stream=stream,
3248 )3255 )
3249 finally:3256 finally:
3250 default_transport = old_transport3257 default_transport = old_transport
@@ -3683,9 +3690,6 @@
3683 reload(sys)3690 reload(sys)
3684 sys.setdefaultencoding(default_encoding)3691 sys.setdefaultencoding(default_encoding)
36853692
3686 if starting_with:
3687 suite = filter_suite_by_id_startswith(suite, starting_with)
3688
3689 if keep_only is not None:3693 if keep_only is not None:
3690 # Now that the referred modules have loaded their tests, keep only the3694 # Now that the referred modules have loaded their tests, keep only the
3691 # requested ones.3695 # requested ones.
36923696
=== modified file 'bzrlib/tests/blackbox/test_selftest.py'
--- bzrlib/tests/blackbox/test_selftest.py 2009-07-10 07:14:02 +0000
+++ bzrlib/tests/blackbox/test_selftest.py 2009-08-24 06:35:13 +0000
@@ -16,474 +16,111 @@
1616
17"""UI tests for the test framework."""17"""UI tests for the test framework."""
1818
19from cStringIO import StringIO19import bzrlib.transport
20import os
21import re
22import signal
23import sys
24import unittest
25
26import bzrlib
27from bzrlib import (20from bzrlib import (
28 osutils,21 benchmarks,
22 tests,
29 )23 )
30from bzrlib.errors import ParamikoNotPresent24from bzrlib.errors import ParamikoNotPresent
31from bzrlib.tests import (25from bzrlib.tests import (
32 SubUnitFeature,26 SubUnitFeature,
33 TestCase,27 TestCase,
34 TestCaseInTempDir,28 TestCaseInTempDir,
35 TestCaseWithMemoryTransport,
36 TestCaseWithTransport,
37 TestUIFactory,
38 TestSkipped,29 TestSkipped,
39 )30 )
40from bzrlib.tests.blackbox import ExternalBase31
4132
4233class SelfTestPatch:
43class TestOptions(TestCase):34
4435 def get_params_passed_to_core(self, cmdline):
45 current_test = None36 params = []
37 def selftest(*args, **kwargs):
38 """Capture the arguments selftest was run with."""
39 params.append((args, kwargs))
40 return True
41 # Yes this prevents using threads to run the test suite in parallel,
42 # however we don't have a clean dependency injector for commands,
43 # and even if we did - we'd still be testing that the glue is wired
44 # up correctly. XXX: TODO: Solve this testing problem.
45 original_selftest = tests.selftest
46 tests.selftest = selftest
47 try:
48 self.run_bzr(cmdline)
49 return params[0]
50 finally:
51 tests.selftest = original_selftest
52
53
54class TestOptionsWritingToDisk(TestCaseInTempDir, SelfTestPatch):
55
56 def test_benchmark_runs_benchmark_tests(self):
57 """selftest --benchmark should change the suite factory."""
58 params = self.get_params_passed_to_core('selftest --benchmark')
59 self.assertEqual(benchmarks.test_suite,
60 params[1]['test_suite_factory'])
61 self.assertNotEqual(None, params[1]['bench_history'])
62 benchfile = open(".perf_history", "rt")
63 try:
64 lines = benchfile.readlines()
65 finally:
66 benchfile.close()
67 # Because we don't run the actual test code no output is made to the
68 # file.
69 self.assertEqual(0, len(lines))
70
71
72class TestOptions(TestCase, SelfTestPatch):
73
74 def test_load_list(self):
75 params = self.get_params_passed_to_core('selftest --load-list foo')
76 self.assertEqual('foo', params[1]['load_list'])
4677
47 def test_transport_set_to_sftp(self):78 def test_transport_set_to_sftp(self):
48 # test the --transport option has taken effect from within the79 # Test that we can pass a transport to the selftest core - sftp
49 # test_transport test80 # version.
50 try:81 try:
51 import bzrlib.transport.sftp82 import bzrlib.transport.sftp
52 except ParamikoNotPresent:83 except ParamikoNotPresent:
53 raise TestSkipped("Paramiko not present")84 raise TestSkipped("Paramiko not present")
54 if TestOptions.current_test != "test_transport_set_to_sftp":85 params = self.get_params_passed_to_core('selftest --transport=sftp')
55 return
56 self.assertEqual(bzrlib.transport.sftp.SFTPAbsoluteServer,86 self.assertEqual(bzrlib.transport.sftp.SFTPAbsoluteServer,
57 bzrlib.tests.default_transport)87 params[1]["transport"])
5888
59 def test_transport_set_to_memory(self):89 def test_transport_set_to_memory(self):
60 # test the --transport option has taken effect from within the90 # Test that we can pass a transport to the selftest core - memory
61 # test_transport test91 # version.
62 import bzrlib.transport.memory92 import bzrlib.transport.memory
63 if TestOptions.current_test != "test_transport_set_to_memory":93 params = self.get_params_passed_to_core('selftest --transport=memory')
64 return
65 self.assertEqual(bzrlib.transport.memory.MemoryServer,94 self.assertEqual(bzrlib.transport.memory.MemoryServer,
66 bzrlib.tests.default_transport)95 params[1]["transport"])
6796
68 def test_transport(self):97 def test_parameters_passed_to_core(self):
69 # test that --transport=sftp works98 params = self.get_params_passed_to_core('selftest --list-only')
70 try:99 self.assertTrue("list_only" in params[1])
71 import bzrlib.transport.sftp100 params = self.get_params_passed_to_core('selftest --list-only selftest')
72 except ParamikoNotPresent:101 self.assertTrue("list_only" in params[1])
73 raise TestSkipped("Paramiko not present")102 params = self.get_params_passed_to_core(['selftest', '--list-only',
74 old_transport = bzrlib.tests.default_transport103 '--exclude', 'selftest'])
75 old_root = TestCaseWithMemoryTransport.TEST_ROOT104 self.assertTrue("list_only" in params[1])
76 TestCaseWithMemoryTransport.TEST_ROOT = None105 params = self.get_params_passed_to_core(['selftest', '--list-only',
77 try:106 'selftest', '--randomize', 'now'])
78 TestOptions.current_test = "test_transport_set_to_sftp"107 self.assertSubset(["list_only", "random_seed"], params[1])
79 stdout = self.run_bzr(108
80 'selftest --transport=sftp test_transport_set_to_sftp')[0]109 def test_starting_with(self):
81 self.assertContainsRe(stdout, 'Ran 1 test')110 params = self.get_params_passed_to_core('selftest --starting-with foo')
82 self.assertEqual(old_transport, bzrlib.tests.default_transport)111 self.assertEqual(['foo'], params[1]['starting_with'])
83112
84 TestOptions.current_test = "test_transport_set_to_memory"113 def test_starting_with_multiple_argument(self):
85 stdout = self.run_bzr(114 params = self.get_params_passed_to_core(
86 'selftest --transport=memory test_transport_set_to_memory')[0]115 'selftest --starting-with foo --starting-with bar')
87 self.assertContainsRe(stdout, 'Ran 1 test')116 self.assertEqual(['foo', 'bar'], params[1]['starting_with'])
88 self.assertEqual(old_transport, bzrlib.tests.default_transport)
89 finally:
90 bzrlib.tests.default_transport = old_transport
91 TestOptions.current_test = None
92 TestCaseWithMemoryTransport.TEST_ROOT = old_root
93117
94 def test_subunit(self):118 def test_subunit(self):
95 """Passing --subunit results in subunit output."""
96 self.requireFeature(SubUnitFeature)119 self.requireFeature(SubUnitFeature)
97 from subunit import ProtocolTestCase120 params = self.get_params_passed_to_core('selftest --subunit')
98 stdout = self.run_bzr(121 self.assertEqual(tests.SubUnitBzrRunner, params[1]['runner_class'])
99 'selftest --subunit --no-plugins '122
100 'tests.test_selftest.SelftestTests.test_import_tests')[0]123 def _parse_test_list(self, lines, newlines_in_header=0):
101 stream = StringIO(str(stdout))
102 test = ProtocolTestCase(stream)
103 result = unittest.TestResult()
104 test.run(result)
105 self.assertEqual(1, result.testsRun)
106
107
108class TestRunBzr(ExternalBase):
109
110 def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
111 working_dir=None):
112 """Override _run_bzr_core to test how it is invoked by run_bzr.
113
114 Attempts to run bzr from inside this class don't actually run it.
115
116 We test how run_bzr actually invokes bzr in another location.
117 Here we only need to test that it is run_bzr passes the right
118 parameters to run_bzr.
119 """
120 self.argv = list(argv)
121 self.retcode = retcode
122 self.encoding = encoding
123 self.stdin = stdin
124 self.working_dir = working_dir
125 return '', ''
126
127 def test_encoding(self):
128 """Test that run_bzr passes encoding to _run_bzr_core"""
129 self.run_bzr('foo bar')
130 self.assertEqual(None, self.encoding)
131 self.assertEqual(['foo', 'bar'], self.argv)
132
133 self.run_bzr('foo bar', encoding='baz')
134 self.assertEqual('baz', self.encoding)
135 self.assertEqual(['foo', 'bar'], self.argv)
136
137 def test_retcode(self):
138 """Test that run_bzr passes retcode to _run_bzr_core"""
139 # Default is retcode == 0
140 self.run_bzr('foo bar')
141 self.assertEqual(0, self.retcode)
142 self.assertEqual(['foo', 'bar'], self.argv)
143
144 self.run_bzr('foo bar', retcode=1)
145 self.assertEqual(1, self.retcode)
146 self.assertEqual(['foo', 'bar'], self.argv)
147
148 self.run_bzr('foo bar', retcode=None)
149 self.assertEqual(None, self.retcode)
150 self.assertEqual(['foo', 'bar'], self.argv)
151
152 self.run_bzr(['foo', 'bar'], retcode=3)
153 self.assertEqual(3, self.retcode)
154 self.assertEqual(['foo', 'bar'], self.argv)
155
156 def test_stdin(self):
157 # test that the stdin keyword to run_bzr is passed through to
158 # _run_bzr_core as-is. We do this by overriding
159 # _run_bzr_core in this class, and then calling run_bzr,
160 # which is a convenience function for _run_bzr_core, so
161 # should invoke it.
162 self.run_bzr('foo bar', stdin='gam')
163 self.assertEqual('gam', self.stdin)
164 self.assertEqual(['foo', 'bar'], self.argv)
165
166 self.run_bzr('foo bar', stdin='zippy')
167 self.assertEqual('zippy', self.stdin)
168 self.assertEqual(['foo', 'bar'], self.argv)
169
170 def test_working_dir(self):
171 """Test that run_bzr passes working_dir to _run_bzr_core"""
172 self.run_bzr('foo bar')
173 self.assertEqual(None, self.working_dir)
174 self.assertEqual(['foo', 'bar'], self.argv)
175
176 self.run_bzr('foo bar', working_dir='baz')
177 self.assertEqual('baz', self.working_dir)
178 self.assertEqual(['foo', 'bar'], self.argv)
179
180 def test_reject_extra_keyword_arguments(self):
181 self.assertRaises(TypeError, self.run_bzr, "foo bar",
182 error_regex=['error message'])
183
184
185class TestBenchmarkTests(TestCaseWithTransport):
186
187 def test_benchmark_runs_benchmark_tests(self):
188 """bzr selftest --benchmark should not run the default test suite."""
189 # We test this by passing a regression test name to --benchmark, which
190 # should result in 0 tests run.
191 old_root = TestCaseWithMemoryTransport.TEST_ROOT
192 try:
193 TestCaseWithMemoryTransport.TEST_ROOT = None
194 out, err = self.run_bzr('selftest --benchmark'
195 ' per_workingtree')
196 finally:
197 TestCaseWithMemoryTransport.TEST_ROOT = old_root
198 self.assertContainsRe(out, 'Ran 0 tests.*\n\nOK')
199 self.assertContainsRe(out, 'tests passed\n')
200 benchfile = open(".perf_history", "rt")
201 try:
202 lines = benchfile.readlines()
203 finally:
204 benchfile.close()
205 self.assertEqual(1, len(lines))
206 self.assertContainsRe(lines[0], "--date [0-9.]+")
207
208
209class TestRunBzrCaptured(ExternalBase):
210
211 def apply_redirected(self, stdin=None, stdout=None, stderr=None,
212 a_callable=None, *args, **kwargs):
213 self.stdin = stdin
214 self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
215 self.factory = bzrlib.ui.ui_factory
216 self.working_dir = osutils.getcwd()
217 stdout.write('foo\n')
218 stderr.write('bar\n')
219 return 0
220
221 def test_stdin(self):
222 # test that the stdin keyword to _run_bzr_core is passed through to
223 # apply_redirected as a StringIO. We do this by overriding
224 # apply_redirected in this class, and then calling _run_bzr_core,
225 # which calls apply_redirected.
226 self.run_bzr(['foo', 'bar'], stdin='gam')
227 self.assertEqual('gam', self.stdin.read())
228 self.assertTrue(self.stdin is self.factory_stdin)
229 self.run_bzr(['foo', 'bar'], stdin='zippy')
230 self.assertEqual('zippy', self.stdin.read())
231 self.assertTrue(self.stdin is self.factory_stdin)
232
233 def test_ui_factory(self):
234 # each invocation of self.run_bzr should get its
235 # own UI factory, which is an instance of TestUIFactory,
236 # with stdin, stdout and stderr attached to the stdin,
237 # stdout and stderr of the invoked run_bzr
238 current_factory = bzrlib.ui.ui_factory
239 self.run_bzr(['foo'])
240 self.failIf(current_factory is self.factory)
241 self.assertNotEqual(sys.stdout, self.factory.stdout)
242 self.assertNotEqual(sys.stderr, self.factory.stderr)
243 self.assertEqual('foo\n', self.factory.stdout.getvalue())
244 self.assertEqual('bar\n', self.factory.stderr.getvalue())
245 self.assertIsInstance(self.factory, TestUIFactory)
246
247 def test_working_dir(self):
248 self.build_tree(['one/', 'two/'])
249 cwd = osutils.getcwd()
250
251 # Default is to work in the current directory
252 self.run_bzr(['foo', 'bar'])
253 self.assertEqual(cwd, self.working_dir)
254
255 self.run_bzr(['foo', 'bar'], working_dir=None)
256 self.assertEqual(cwd, self.working_dir)
257
258 # The function should be run in the alternative directory
259 # but afterwards the current working dir shouldn't be changed
260 self.run_bzr(['foo', 'bar'], working_dir='one')
261 self.assertNotEqual(cwd, self.working_dir)
262 self.assertEndsWith(self.working_dir, 'one')
263 self.assertEqual(cwd, osutils.getcwd())
264
265 self.run_bzr(['foo', 'bar'], working_dir='two')
266 self.assertNotEqual(cwd, self.working_dir)
267 self.assertEndsWith(self.working_dir, 'two')
268 self.assertEqual(cwd, osutils.getcwd())
269
270
271class TestRunBzrSubprocess(TestCaseWithTransport):
272
273 def test_run_bzr_subprocess(self):
274 """The run_bzr_helper_external command behaves nicely."""
275 result = self.run_bzr_subprocess('--version')
276 result = self.run_bzr_subprocess(['--version'])
277 result = self.run_bzr_subprocess('--version', retcode=None)
278 self.assertContainsRe(result[0], 'is free software')
279 self.assertRaises(AssertionError, self.run_bzr_subprocess,
280 '--versionn')
281 result = self.run_bzr_subprocess('--versionn', retcode=3)
282 result = self.run_bzr_subprocess('--versionn', retcode=None)
283 self.assertContainsRe(result[1], 'unknown command')
284 err = self.run_bzr_subprocess(['merge', '--merge-type',
285 'magic merge'], retcode=3)[1]
286 self.assertContainsRe(err, 'Bad value "magic merge" for option'
287 ' "merge-type"')
288
289 def test_run_bzr_subprocess_env(self):
290 """run_bzr_subprocess can set environment variables in the child only.
291
292 These changes should not change the running process, only the child.
293 """
294 # The test suite should unset this variable
295 self.assertEqual(None, os.environ.get('BZR_EMAIL'))
296 out, err = self.run_bzr_subprocess('whoami', env_changes={
297 'BZR_EMAIL':'Joe Foo <joe@foo.com>'
298 }, universal_newlines=True)
299 self.assertEqual('', err)
300 self.assertEqual('Joe Foo <joe@foo.com>\n', out)
301 # And it should not be modified
302 self.assertEqual(None, os.environ.get('BZR_EMAIL'))
303
304 # Do it again with a different address, just to make sure
305 # it is actually changing
306 out, err = self.run_bzr_subprocess('whoami', env_changes={
307 'BZR_EMAIL':'Barry <bar@foo.com>'
308 }, universal_newlines=True)
309 self.assertEqual('', err)
310 self.assertEqual('Barry <bar@foo.com>\n', out)
311 self.assertEqual(None, os.environ.get('BZR_EMAIL'))
312
313 def test_run_bzr_subprocess_env_del(self):
314 """run_bzr_subprocess can remove environment variables too."""
315 # Create a random email, so we are sure this won't collide
316 rand_bzr_email = 'John Doe <jdoe@%s.com>' % (osutils.rand_chars(20),)
317 rand_email = 'Jane Doe <jdoe@%s.com>' % (osutils.rand_chars(20),)
318 os.environ['BZR_EMAIL'] = rand_bzr_email
319 os.environ['EMAIL'] = rand_email
320 try:
321 # By default, the child will inherit the current env setting
322 out, err = self.run_bzr_subprocess('whoami', universal_newlines=True)
323 self.assertEqual('', err)
324 self.assertEqual(rand_bzr_email + '\n', out)
325
326 # Now that BZR_EMAIL is not set, it should fall back to EMAIL
327 out, err = self.run_bzr_subprocess('whoami',
328 env_changes={'BZR_EMAIL':None},
329 universal_newlines=True)
330 self.assertEqual('', err)
331 self.assertEqual(rand_email + '\n', out)
332
333 # This switches back to the default email guessing logic
334 # Which shouldn't match either of the above addresses
335 out, err = self.run_bzr_subprocess('whoami',
336 env_changes={'BZR_EMAIL':None, 'EMAIL':None},
337 universal_newlines=True)
338
339 self.assertEqual('', err)
340 self.assertNotEqual(rand_bzr_email + '\n', out)
341 self.assertNotEqual(rand_email + '\n', out)
342 finally:
343 # TestCase cleans up BZR_EMAIL, and EMAIL at startup
344 del os.environ['BZR_EMAIL']
345 del os.environ['EMAIL']
346
347 def test_run_bzr_subprocess_env_del_missing(self):
348 """run_bzr_subprocess won't fail if deleting a nonexistant env var"""
349 self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
350 out, err = self.run_bzr_subprocess('rocks',
351 env_changes={'NON_EXISTANT_ENV_VAR':None},
352 universal_newlines=True)
353 self.assertEqual('It sure does!\n', out)
354 self.assertEqual('', err)
355
356 def test_run_bzr_subprocess_working_dir(self):
357 """Test that we can specify the working dir for the child"""
358 cwd = osutils.getcwd()
359
360 self.make_branch_and_tree('.')
361 self.make_branch_and_tree('one')
362 self.make_branch_and_tree('two')
363
364 def get_root(**kwargs):
365 """Spawn a process to get the 'root' of the tree.
366
367 You can pass in arbitrary new arguments. This just makes
368 sure that the returned path doesn't have trailing whitespace.
369 """
370 return self.run_bzr_subprocess('root', **kwargs)[0].rstrip()
371
372 self.assertEqual(cwd, get_root())
373 self.assertEqual(cwd, get_root(working_dir=None))
374 # Has our path changed?
375 self.assertEqual(cwd, osutils.getcwd())
376
377 dir1 = get_root(working_dir='one')
378 self.assertEndsWith(dir1, 'one')
379 self.assertEqual(cwd, osutils.getcwd())
380
381 dir2 = get_root(working_dir='two')
382 self.assertEndsWith(dir2, 'two')
383 self.assertEqual(cwd, osutils.getcwd())
384
385
386class _DontSpawnProcess(Exception):
387 """A simple exception which just allows us to skip unnecessary steps"""
388
389
390class TestRunBzrSubprocessCommands(TestCaseWithTransport):
391
392 def _popen(self, *args, **kwargs):
393 """Record the command that is run, so that we can ensure it is correct"""
394 self._popen_args = args
395 self._popen_kwargs = kwargs
396 raise _DontSpawnProcess()
397
398 def test_run_bzr_subprocess_no_plugins(self):
399 self.assertRaises(_DontSpawnProcess, self.run_bzr_subprocess, '')
400 command = self._popen_args[0]
401 self.assertEqual(sys.executable, command[0])
402 self.assertEqual(self.get_bzr_path(), command[1])
403 self.assertEqual(['--no-plugins'], command[2:])
404
405 def test_allow_plugins(self):
406 self.assertRaises(_DontSpawnProcess,
407 self.run_bzr_subprocess, '', allow_plugins=True)
408 command = self._popen_args[0]
409 self.assertEqual([], command[2:])
410
411
412class TestBzrSubprocess(TestCaseWithTransport):
413
414 def test_start_and_stop_bzr_subprocess(self):
415 """We can start and perform other test actions while that process is
416 still alive.
417 """
418 process = self.start_bzr_subprocess(['--version'])
419 result = self.finish_bzr_subprocess(process)
420 self.assertContainsRe(result[0], 'is free software')
421 self.assertEqual('', result[1])
422
423 def test_start_and_stop_bzr_subprocess_with_error(self):
424 """finish_bzr_subprocess allows specification of the desired exit code.
425 """
426 process = self.start_bzr_subprocess(['--versionn'])
427 result = self.finish_bzr_subprocess(process, retcode=3)
428 self.assertEqual('', result[0])
429 self.assertContainsRe(result[1], 'unknown command')
430
431 def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
432 """finish_bzr_subprocess allows the exit code to be ignored."""
433 process = self.start_bzr_subprocess(['--versionn'])
434 result = self.finish_bzr_subprocess(process, retcode=None)
435 self.assertEqual('', result[0])
436 self.assertContainsRe(result[1], 'unknown command')
437
438 def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
439 """finish_bzr_subprocess raises self.failureException if the retcode is
440 not the expected one.
441 """
442 process = self.start_bzr_subprocess(['--versionn'])
443 self.assertRaises(self.failureException, self.finish_bzr_subprocess,
444 process)
445
446 def test_start_and_stop_bzr_subprocess_send_signal(self):
447 """finish_bzr_subprocess raises self.failureException if the retcode is
448 not the expected one.
449 """
450 process = self.start_bzr_subprocess(['wait-until-signalled'],
451 skip_if_plan_to_signal=True)
452 self.assertEqual('running\n', process.stdout.readline())
453 result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
454 retcode=3)
455 self.assertEqual('', result[0])
456 self.assertEqual('bzr: interrupted\n', result[1])
457
458 def test_start_and_stop_working_dir(self):
459 cwd = osutils.getcwd()
460
461 self.make_branch_and_tree('one')
462
463 process = self.start_bzr_subprocess(['root'], working_dir='one')
464 result = self.finish_bzr_subprocess(process, universal_newlines=True)
465 self.assertEndsWith(result[0], 'one\n')
466 self.assertEqual('', result[1])
467
468
469class TestRunBzrError(ExternalBase):
470
471 def test_run_bzr_error(self):
472 # retcode=0 is specially needed here because run_bzr_error expects
473 # an error (oddly enough) but we want to test the case of not
474 # actually getting one
475 out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=0)
476 self.assertEqual(out, 'It sure does!\n')
477 # now test actually getting an error
478 out, err = self.run_bzr_error(
479 ["bzr: ERROR: foobarbaz is not versioned"],
480 ['file-id', 'foobarbaz'])
481
482
483class TestSelftestListOnly(TestCase):
484
485 @staticmethod
486 def _parse_test_list(lines, newlines_in_header=0):
487 "Parse a list of lines into a tuple of 3 lists (header,body,footer)."124 "Parse a list of lines into a tuple of 3 lists (header,body,footer)."
488 in_header = newlines_in_header != 0125 in_header = newlines_in_header != 0
489 in_footer = False126 in_footer = False
@@ -512,92 +149,26 @@
512 return (header,body,footer)149 return (header,body,footer)
513150
514 def test_list_only(self):151 def test_list_only(self):
515 # check that bzr selftest --list-only works correctly152 # check that bzr selftest --list-only outputs no ui noise
516 out,err = self.run_bzr('selftest selftest --list-only')153 def selftest(*args, **kwargs):
517 (header,body,footer) = self._parse_test_list(out.splitlines())154 """Capture the arguments selftest was run with."""
518 num_tests = len(body)155 return True
519 self.assertLength(0, header)156 def outputs_nothing(cmdline):
520 self.assertLength(0, footer)157 out,err = self.run_bzr(cmdline)
521 self.assertEqual('', err)158 (header,body,footer) = self._parse_test_list(out.splitlines())
522159 num_tests = len(body)
523 def test_list_only_filtered(self):160 self.assertLength(0, header)
524 # check that a filtered --list-only works, both include and exclude161 self.assertLength(0, footer)
525 out_all,err_all = self.run_bzr('selftest --list-only')162 self.assertEqual('', err)
526 tests_all = self._parse_test_list(out_all.splitlines())[1]163 # Yes this prevents using threads to run the test suite in parallel,
527 out_incl,err_incl = self.run_bzr('selftest --list-only selftest')164 # however we don't have a clean dependency injector for commands,
528 tests_incl = self._parse_test_list(out_incl.splitlines())[1]165 # and even if we did - we'd still be testing that the glue is wired
529 self.assertSubset(tests_incl, tests_all)166 # up correctly. XXX: TODO: Solve this testing problem.
530 out_excl,err_excl = self.run_bzr(['selftest', '--list-only',167 original_selftest = tests.selftest
531 '--exclude', 'selftest'])168 tests.selftest = selftest
532 tests_excl = self._parse_test_list(out_excl.splitlines())[1]169 try:
533 self.assertSubset(tests_excl, tests_all)170 outputs_nothing('selftest --list-only')
534 set_incl = set(tests_incl)171 outputs_nothing('selftest --list-only selftest')
535 set_excl = set(tests_excl)172 outputs_nothing(['selftest', '--list-only', '--exclude', 'selftest'])
536 intersection = set_incl.intersection(set_excl)173 finally:
537 self.assertEquals(0, len(intersection))174 tests.selftest = original_selftest
538 self.assertEquals(len(tests_all), len(tests_incl) + len(tests_excl))
539
540 def test_list_only_random(self):
541 # check that --randomize works correctly
542 out_all,err_all = self.run_bzr('selftest --list-only selftest')
543 tests_all = self._parse_test_list(out_all.splitlines())[1]
544 # XXX: It looks like there are some orders for generating tests that
545 # fail as of 20070504 - maybe because of import order dependencies.
546 # So unfortunately this will rarely intermittently fail at the moment.
547 # -- mbp 20070504
548 out_rand,err_rand = self.run_bzr(['selftest', '--list-only',
549 'selftest', '--randomize', 'now'])
550 (header_rand,tests_rand,dummy) = self._parse_test_list(
551 out_rand.splitlines(), 1)
552 # XXX: The following line asserts that the randomized order is not the
553 # same as the default order. It is just possible that they'll get
554 # randomized into the same order and this will falsely fail, but
555 # that's very unlikely in practice because there are thousands of
556 # tests.
557 self.assertNotEqual(tests_all, tests_rand)
558 self.assertEqual(sorted(tests_all), sorted(tests_rand))
559 # Check that the seed can be reused to get the exact same order
560 seed_re = re.compile('Randomizing test order using seed (\w+)')
561 match_obj = seed_re.search(header_rand[-1])
562 seed = match_obj.group(1)
563 out_rand2,err_rand2 = self.run_bzr(['selftest', '--list-only',
564 'selftest', '--randomize', seed])
565 (header_rand2,tests_rand2,dummy) = self._parse_test_list(
566 out_rand2.splitlines(), 1)
567 self.assertEqual(tests_rand, tests_rand2)
568
569
570class TestSelftestWithIdList(TestCaseInTempDir):
571
572 def test_load_list(self):
573 # We don't want to call selftest for the whole suite, so we start with
574 # a reduced list.
575 test_list_fname = 'test.list'
576 fl = open(test_list_fname, 'wt')
577 fl.write('%s\n' % self.id())
578 fl.close()
579 out, err = self.run_bzr(
580 ['selftest', '--load-list', test_list_fname, '--list'])
581 self.assertContainsRe(out, "TestSelftestWithIdList")
582 self.assertLength(1, out.splitlines())
583
584 def test_load_unknown(self):
585 out, err = self.run_bzr('selftest --load-list I_do_not_exist ',
586 retcode=3)
587
588
589class TestSelftestStartingWith(TestCase):
590
591 def test_starting_with_single_argument(self):
592 out, err = self.run_bzr(
593 ['selftest', '--starting-with', self.id(), '--list'])
594 self.assertContainsRe(out, self.id())
595
596 def test_starting_with_multiple_argument(self):
597 out, err = self.run_bzr(
598 ['selftest',
599 '--starting-with', self.id(),
600 '--starting-with', 'bzrlib.tests.test_sampler',
601 '--list'])
602 self.assertContainsRe(out, self.id())
603 self.assertContainsRe(out, 'bzrlib.tests.test_sampler')
604175
=== modified file 'bzrlib/tests/test_selftest.py'
--- bzrlib/tests/test_selftest.py 2009-08-17 03:47:03 +0000
+++ bzrlib/tests/test_selftest.py 2009-08-24 06:35:12 +0000
@@ -18,6 +18,7 @@
1818
19from cStringIO import StringIO19from cStringIO import StringIO
20import os20import os
21import signal
21import sys22import sys
22import time23import time
23import unittest24import unittest
@@ -50,6 +51,7 @@
50 deprecated_method,51 deprecated_method,
51 )52 )
52from bzrlib.tests import (53from bzrlib.tests import (
54 SubUnitFeature,
53 test_lsprof,55 test_lsprof,
54 test_sftp_transport,56 test_sftp_transport,
55 TestUtil,57 TestUtil,
@@ -1745,7 +1747,7 @@
17451747
1746 def test_make_tree_for_sftp_branch(self):1748 def test_make_tree_for_sftp_branch(self):
1747 """Transports backed by local directories create local trees."""1749 """Transports backed by local directories create local trees."""
17481750 # NB: This is arguably a bug in the definition of make_branch_and_tree.
1749 tree = self.make_branch_and_tree('t1')1751 tree = self.make_branch_and_tree('t1')
1750 base = tree.bzrdir.root_transport.base1752 base = tree.bzrdir.root_transport.base
1751 self.failIf(base.startswith('sftp'),1753 self.failIf(base.startswith('sftp'),
@@ -1756,7 +1758,24 @@
1756 tree.branch.repository.bzrdir.root_transport)1758 tree.branch.repository.bzrdir.root_transport)
17571759
17581760
1759class TestSelftest(tests.TestCase):1761class SelfTestHelper:
1762
1763 def run_selftest(self, **kwargs):
1764 """Run selftest returning its output."""
1765 output = StringIO()
1766 old_transport = bzrlib.tests.default_transport
1767 old_root = tests.TestCaseWithMemoryTransport.TEST_ROOT
1768 tests.TestCaseWithMemoryTransport.TEST_ROOT = None
1769 try:
1770 self.assertEqual(True, tests.selftest(stream=output, **kwargs))
1771 finally:
1772 bzrlib.tests.default_transport = old_transport
1773 tests.TestCaseWithMemoryTransport.TEST_ROOT = old_root
1774 output.seek(0)
1775 return output
1776
1777
1778class TestSelftest(tests.TestCase, SelfTestHelper):
1760 """Tests of bzrlib.tests.selftest."""1779 """Tests of bzrlib.tests.selftest."""
17611780
1762 def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):1781 def test_selftest_benchmark_parameter_invokes_test_suite__benchmark__(self):
@@ -1770,6 +1789,511 @@
1770 test_suite_factory=factory)1789 test_suite_factory=factory)
1771 self.assertEqual([True], factory_called)1790 self.assertEqual([True], factory_called)
17721791
1792 def factory(self):
1793 """A test suite factory."""
1794 class Test(tests.TestCase):
1795 def a(self):
1796 pass
1797 def b(self):
1798 pass
1799 def c(self):
1800 pass
1801 return TestUtil.TestSuite([Test("a"), Test("b"), Test("c")])
1802
1803 def test_list_only(self):
1804 output = self.run_selftest(test_suite_factory=self.factory,
1805 list_only=True)
1806 self.assertEqual(3, len(output.readlines()))
1807
1808 def test_list_only_filtered(self):
1809 output = self.run_selftest(test_suite_factory=self.factory,
1810 list_only=True, pattern="Test.b")
1811 self.assertEndsWith(output.getvalue(), "Test.b\n")
1812 self.assertLength(1, output.readlines())
1813
1814 def test_list_only_excludes(self):
1815 output = self.run_selftest(test_suite_factory=self.factory,
1816 list_only=True, exclude_pattern="Test.b")
1817 self.assertNotContainsRe("Test.b", output.getvalue())
1818 self.assertLength(2, output.readlines())
1819
1820 def test_random(self):
1821 # test randomising by listing a number of tests.
1822 output_123 = self.run_selftest(test_suite_factory=self.factory,
1823 list_only=True, random_seed="123")
1824 output_234 = self.run_selftest(test_suite_factory=self.factory,
1825 list_only=True, random_seed="234")
1826 self.assertNotEqual(output_123, output_234)
1827 # "Randomizing test order..\n\n
1828 self.assertLength(5, output_123.readlines())
1829 self.assertLength(5, output_234.readlines())
1830
1831 def test_random_reuse_is_same_order(self):
1832 # test randomising by listing a number of tests.
1833 expected = self.run_selftest(test_suite_factory=self.factory,
1834 list_only=True, random_seed="123")
1835 repeated = self.run_selftest(test_suite_factory=self.factory,
1836 list_only=True, random_seed="123")
1837 self.assertEqual(expected.getvalue(), repeated.getvalue())
1838
1839 def test_runner_class(self):
1840 self.requireFeature(SubUnitFeature)
1841 from subunit import ProtocolTestCase
1842 stream = self.run_selftest(runner_class=tests.SubUnitBzrRunner,
1843 test_suite_factory=self.factory)
1844 test = ProtocolTestCase(stream)
1845 result = unittest.TestResult()
1846 test.run(result)
1847 self.assertEqual(3, result.testsRun)
1848
1849 def test_starting_with_single_argument(self):
1850 output = self.run_selftest(test_suite_factory=self.factory,
1851 starting_with=['bzrlib.tests.test_selftest.Test.a'],
1852 list_only=True)
1853 self.assertEqual('bzrlib.tests.test_selftest.Test.a\n',
1854 output.getvalue())
1855
1856 def test_starting_with_multiple_argument(self):
1857 output = self.run_selftest(test_suite_factory=self.factory,
1858 starting_with=['bzrlib.tests.test_selftest.Test.a',
1859 'bzrlib.tests.test_selftest.Test.b'],
1860 list_only=True)
1861 self.assertEqual('bzrlib.tests.test_selftest.Test.a\n'
1862 'bzrlib.tests.test_selftest.Test.b\n',
1863 output.getvalue())
1864
1865 def check_transport_set(self, transport_server):
1866 captured_transport = []
1867 def seen_transport(a_transport):
1868 captured_transport.append(a_transport)
1869 class Capture(tests.TestCase):
1870 def a(self):
1871 seen_transport(bzrlib.tests.default_transport)
1872 def factory():
1873 return TestUtil.TestSuite([Capture("a")])
1874 self.run_selftest(transport=transport_server, test_suite_factory=factory)
1875 self.assertEqual(transport_server, captured_transport[0])
1876
1877 def test_transport_sftp(self):
1878 try:
1879 import bzrlib.transport.sftp
1880 except ParamikoNotPresent:
1881 raise TestSkipped("Paramiko not present")
1882 self.check_transport_set(bzrlib.transport.sftp.SFTPAbsoluteServer)
1883
1884 def test_transport_memory(self):
1885 self.check_transport_set(bzrlib.transport.memory.MemoryServer)
1886
1887
1888class TestSelftestWithIdList(tests.TestCaseInTempDir, SelfTestHelper):
1889 # Does IO: reads test.list
1890
1891 def test_load_list(self):
1892 # Provide a list with one test - this test.
1893 test_id_line = '%s\n' % self.id()
1894 self.build_tree_contents([('test.list', test_id_line)])
1895 # And generate a list of the tests in the suite.
1896 stream = self.run_selftest(load_list='test.list', list_only=True)
1897 self.assertEqual(test_id_line, stream.getvalue())
1898
1899 def test_load_unknown(self):
1900 # Point load_list at a file name that does not exist.
1901 # Attempting to list the tests must then raise NoSuchFile.
1902 err = self.assertRaises(errors.NoSuchFile, self.run_selftest,
1903 load_list='missing file name', list_only=True)
1904
1905
1906class TestRunBzr(tests.TestCase):
1907
1908 out = ''
1909 err = ''
1910
1911 def _run_bzr_core(self, argv, retcode=0, encoding=None, stdin=None,
1912 working_dir=None):
1913 """Override _run_bzr_core to test how it is invoked by run_bzr.
1914
1915 Attempts to run bzr from inside this class don't actually run it.
1916
1917 We test how run_bzr actually invokes bzr in another location.
1918 Here we only need to test that run_bzr passes the right
1919 parameters to _run_bzr_core.
1920 """
1921 self.argv = list(argv)
1922 self.retcode = retcode
1923 self.encoding = encoding
1924 self.stdin = stdin
1925 self.working_dir = working_dir
1926 return self.out, self.err
1927
1928 def test_run_bzr_error(self):
1929 self.out = "It sure does!\n"
1930 out, err = self.run_bzr_error(['^$'], ['rocks'], retcode=34)
1931 self.assertEqual(['rocks'], self.argv)
1932 self.assertEqual(34, self.retcode)
1933 self.assertEqual(out, 'It sure does!\n')
1934
1935 def test_run_bzr_error_regexes(self):
1936 self.out = ''
1937 self.err = "bzr: ERROR: foobarbaz is not versioned"
1938 out, err = self.run_bzr_error(
1939 ["bzr: ERROR: foobarbaz is not versioned"],
1940 ['file-id', 'foobarbaz'])
1941
1942 def test_encoding(self):
1943 """Test that run_bzr passes encoding to _run_bzr_core"""
1944 self.run_bzr('foo bar')
1945 self.assertEqual(None, self.encoding)
1946 self.assertEqual(['foo', 'bar'], self.argv)
1947
1948 self.run_bzr('foo bar', encoding='baz')
1949 self.assertEqual('baz', self.encoding)
1950 self.assertEqual(['foo', 'bar'], self.argv)
1951
1952 def test_retcode(self):
1953 """Test that run_bzr passes retcode to _run_bzr_core"""
1954 # Default is retcode == 0
1955 self.run_bzr('foo bar')
1956 self.assertEqual(0, self.retcode)
1957 self.assertEqual(['foo', 'bar'], self.argv)
1958
1959 self.run_bzr('foo bar', retcode=1)
1960 self.assertEqual(1, self.retcode)
1961 self.assertEqual(['foo', 'bar'], self.argv)
1962
1963 self.run_bzr('foo bar', retcode=None)
1964 self.assertEqual(None, self.retcode)
1965 self.assertEqual(['foo', 'bar'], self.argv)
1966
1967 self.run_bzr(['foo', 'bar'], retcode=3)
1968 self.assertEqual(3, self.retcode)
1969 self.assertEqual(['foo', 'bar'], self.argv)
1970
1971 def test_stdin(self):
1972 # test that the stdin keyword to run_bzr is passed through to
1973 # _run_bzr_core as-is. We do this by overriding
1974 # _run_bzr_core in this class, and then calling run_bzr,
1975 # which is a convenience function for _run_bzr_core, so
1976 # should invoke it.
1977 self.run_bzr('foo bar', stdin='gam')
1978 self.assertEqual('gam', self.stdin)
1979 self.assertEqual(['foo', 'bar'], self.argv)
1980
1981 self.run_bzr('foo bar', stdin='zippy')
1982 self.assertEqual('zippy', self.stdin)
1983 self.assertEqual(['foo', 'bar'], self.argv)
1984
1985 def test_working_dir(self):
1986 """Test that run_bzr passes working_dir to _run_bzr_core"""
1987 self.run_bzr('foo bar')
1988 self.assertEqual(None, self.working_dir)
1989 self.assertEqual(['foo', 'bar'], self.argv)
1990
1991 self.run_bzr('foo bar', working_dir='baz')
1992 self.assertEqual('baz', self.working_dir)
1993 self.assertEqual(['foo', 'bar'], self.argv)
1994
1995 def test_reject_extra_keyword_arguments(self):
1996 self.assertRaises(TypeError, self.run_bzr, "foo bar",
1997 error_regex=['error message'])
1998
1999
2000class TestRunBzrCaptured(tests.TestCaseWithTransport):
2001 # Does IO when testing the working_dir parameter.
2002
2003 def apply_redirected(self, stdin=None, stdout=None, stderr=None,
2004 a_callable=None, *args, **kwargs):
2005 self.stdin = stdin
2006 self.factory_stdin = getattr(bzrlib.ui.ui_factory, "stdin", None)
2007 self.factory = bzrlib.ui.ui_factory
2008 self.working_dir = osutils.getcwd()
2009 stdout.write('foo\n')
2010 stderr.write('bar\n')
2011 return 0
2012
2013 def test_stdin(self):
2014 # test that the stdin keyword to _run_bzr_core is passed through to
2015 # apply_redirected as a StringIO. We do this by overriding
2016 # apply_redirected in this class, and then calling _run_bzr_core,
2017 # which calls apply_redirected.
2018 self.run_bzr(['foo', 'bar'], stdin='gam')
2019 self.assertEqual('gam', self.stdin.read())
2020 self.assertTrue(self.stdin is self.factory_stdin)
2021 self.run_bzr(['foo', 'bar'], stdin='zippy')
2022 self.assertEqual('zippy', self.stdin.read())
2023 self.assertTrue(self.stdin is self.factory_stdin)
2024
2025 def test_ui_factory(self):
2026 # each invocation of self.run_bzr should get its
2027 # own UI factory, which is an instance of TestUIFactory,
2028 # with stdin, stdout and stderr attached to the stdin,
2029 # stdout and stderr of the invoked run_bzr
2030 current_factory = bzrlib.ui.ui_factory
2031 self.run_bzr(['foo'])
2032 self.failIf(current_factory is self.factory)
2033 self.assertNotEqual(sys.stdout, self.factory.stdout)
2034 self.assertNotEqual(sys.stderr, self.factory.stderr)
2035 self.assertEqual('foo\n', self.factory.stdout.getvalue())
2036 self.assertEqual('bar\n', self.factory.stderr.getvalue())
2037 self.assertIsInstance(self.factory, tests.TestUIFactory)
2038
2039 def test_working_dir(self):
2040 self.build_tree(['one/', 'two/'])
2041 cwd = osutils.getcwd()
2042
2043 # Default is to work in the current directory
2044 self.run_bzr(['foo', 'bar'])
2045 self.assertEqual(cwd, self.working_dir)
2046
2047 self.run_bzr(['foo', 'bar'], working_dir=None)
2048 self.assertEqual(cwd, self.working_dir)
2049
2050 # The function should be run in the alternative directory
2051 # but afterwards the current working dir shouldn't be changed
2052 self.run_bzr(['foo', 'bar'], working_dir='one')
2053 self.assertNotEqual(cwd, self.working_dir)
2054 self.assertEndsWith(self.working_dir, 'one')
2055 self.assertEqual(cwd, osutils.getcwd())
2056
2057 self.run_bzr(['foo', 'bar'], working_dir='two')
2058 self.assertNotEqual(cwd, self.working_dir)
2059 self.assertEndsWith(self.working_dir, 'two')
2060 self.assertEqual(cwd, osutils.getcwd())
2061
2062
2063class StubProcess(object):
2064 """A stub process for testing run_bzr_subprocess."""
2065
2066 def __init__(self, out="", err="", retcode=0):
2067 self.out = out
2068 self.err = err
2069 self.returncode = retcode
2070
2071 def communicate(self):
2072 return self.out, self.err
2073
2074
2075class TestRunBzrSubprocess(tests.TestCaseWithTransport):
2076
2077 def setUp(self):
2078 tests.TestCaseWithTransport.setUp(self)
2079 self.subprocess_calls = []
2080
2081 def start_bzr_subprocess(self, process_args, env_changes=None,
2082 skip_if_plan_to_signal=False,
2083 working_dir=None,
2084 allow_plugins=False):
2085 """capture what run_bzr_subprocess tries to do."""
2086 self.subprocess_calls.append({'process_args':process_args,
2087 'env_changes':env_changes,
2088 'skip_if_plan_to_signal':skip_if_plan_to_signal,
2089 'working_dir':working_dir, 'allow_plugins':allow_plugins})
2090 return self.next_subprocess
2091
2092 def assertRunBzrSubprocess(self, expected_args, process, *args, **kwargs):
2093 """Run run_bzr_subprocess with args and kwargs using a stubbed process.
2094
2095 Inside TestRunBzrSubprocessCommands we use a stub start_bzr_subprocess
2096 that will return static results. This assertion method populates those
2097 results and also checks the arguments run_bzr_subprocess generates.
2098 """
2099 self.next_subprocess = process
2100 try:
2101 result = self.run_bzr_subprocess(*args, **kwargs)
2102 except:
2103 self.next_subprocess = None
2104 for key, expected in expected_args.iteritems():
2105 self.assertEqual(expected, self.subprocess_calls[-1][key])
2106 raise
2107 else:
2108 self.next_subprocess = None
2109 for key, expected in expected_args.iteritems():
2110 self.assertEqual(expected, self.subprocess_calls[-1][key])
2111 return result
2112
2113 def test_run_bzr_subprocess(self):
2114 """The run_bzr_helper_external command behaves nicely."""
2115 self.assertRunBzrSubprocess({'process_args':['--version']},
2116 StubProcess(), '--version')
2117 self.assertRunBzrSubprocess({'process_args':['--version']},
2118 StubProcess(), ['--version'])
2119 # retcode=None disables retcode checking
2120 result = self.assertRunBzrSubprocess({},
2121 StubProcess(retcode=3), '--version', retcode=None)
2122 result = self.assertRunBzrSubprocess({},
2123 StubProcess(out="is free software"), '--version')
2124 self.assertContainsRe(result[0], 'is free software')
2125 # Running a subcommand that is missing errors
2126 self.assertRaises(AssertionError, self.assertRunBzrSubprocess,
2127 {'process_args':['--versionn']}, StubProcess(retcode=3),
2128 '--versionn')
2129 # Unless it is told to expect the error from the subprocess
2130 result = self.assertRunBzrSubprocess({},
2131 StubProcess(retcode=3), '--versionn', retcode=3)
2132 # Or to ignore retcode checking
2133 result = self.assertRunBzrSubprocess({},
2134 StubProcess(err="unknown command", retcode=3), '--versionn',
2135 retcode=None)
2136 self.assertContainsRe(result[1], 'unknown command')
2137
2138 def test_env_change_passes_through(self):
2139 self.assertRunBzrSubprocess(
2140 {'env_changes':{'new':'value', 'changed':'newvalue', 'deleted':None}},
2141 StubProcess(), '',
2142 env_changes={'new':'value', 'changed':'newvalue', 'deleted':None})
2143
2144 def test_no_working_dir_passed_as_None(self):
2145 self.assertRunBzrSubprocess({'working_dir': None}, StubProcess(), '')
2146
2147 def test_no_working_dir_passed_through(self):
2148 self.assertRunBzrSubprocess({'working_dir': 'dir'}, StubProcess(), '',
2149 working_dir='dir')
2150
2151 def test_run_bzr_subprocess_no_plugins(self):
2152 self.assertRunBzrSubprocess({'allow_plugins': False},
2153 StubProcess(), '')
2154
2155 def test_allow_plugins(self):
2156 self.assertRunBzrSubprocess({'allow_plugins': True},
2157 StubProcess(), '', allow_plugins=True)
2158
2159
2160class _DontSpawnProcess(Exception):
2161 """A simple exception which just allows us to skip unnecessary steps"""
2162
2163
2164class TestStartBzrSubProcess(tests.TestCase):
2165
2166 def check_popen_state(self):
2167 """Replace to make assertions when popen is called."""
2168
2169 def _popen(self, *args, **kwargs):
2170 """Record the command that is run, so that we can ensure it is correct"""
2171 self.check_popen_state()
2172 self._popen_args = args
2173 self._popen_kwargs = kwargs
2174 raise _DontSpawnProcess()
2175
2176 def test_run_bzr_subprocess_no_plugins(self):
2177 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [])
2178 command = self._popen_args[0]
2179 self.assertEqual(sys.executable, command[0])
2180 self.assertEqual(self.get_bzr_path(), command[1])
2181 self.assertEqual(['--no-plugins'], command[2:])
2182
2183 def test_allow_plugins(self):
2184 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2185 allow_plugins=True)
2186 command = self._popen_args[0]
2187 self.assertEqual([], command[2:])
2188
2189 def test_set_env(self):
2190 self.failIf('EXISTANT_ENV_VAR' in os.environ)
2191 # set in the child
2192 def check_environment():
2193 self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2194 self.check_popen_state = check_environment
2195 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2196 env_changes={'EXISTANT_ENV_VAR':'set variable'})
2197 # not set in the parent
2198 self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2199
2200 def test_run_bzr_subprocess_env_del(self):
2201 """run_bzr_subprocess can remove environment variables too."""
2202 self.failIf('EXISTANT_ENV_VAR' in os.environ)
2203 def check_environment():
2204 self.assertFalse('EXISTANT_ENV_VAR' in os.environ)
2205 os.environ['EXISTANT_ENV_VAR'] = 'set variable'
2206 self.check_popen_state = check_environment
2207 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2208 env_changes={'EXISTANT_ENV_VAR':None})
2209 # Still set in parent
2210 self.assertEqual('set variable', os.environ['EXISTANT_ENV_VAR'])
2211 del os.environ['EXISTANT_ENV_VAR']
2212
2213 def test_env_del_missing(self):
2214 self.failIf('NON_EXISTANT_ENV_VAR' in os.environ)
2215 def check_environment():
2216 self.assertFalse('NON_EXISTANT_ENV_VAR' in os.environ)
2217 self.check_popen_state = check_environment
2218 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2219 env_changes={'NON_EXISTANT_ENV_VAR':None})
2220
2221 def test_working_dir(self):
2222 """Test that we can specify the working dir for the child"""
2223 orig_getcwd = osutils.getcwd
2224 orig_chdir = os.chdir
2225 chdirs = []
2226 def chdir(path):
2227 chdirs.append(path)
2228 os.chdir = chdir
2229 try:
2230 def getcwd():
2231 return 'current'
2232 osutils.getcwd = getcwd
2233 try:
2234 self.assertRaises(_DontSpawnProcess, self.start_bzr_subprocess, [],
2235 working_dir='foo')
2236 finally:
2237 osutils.getcwd = orig_getcwd
2238 finally:
2239 os.chdir = orig_chdir
2240 self.assertEqual(['foo', 'current'], chdirs)
2241
2242
2243class TestBzrSubprocess(tests.TestCaseWithTransport):
2244
2245 def test_start_and_stop_bzr_subprocess(self):
2246 """We can start and perform other test actions while that process is
2247 still alive.
2248 """
2249 process = self.start_bzr_subprocess(['--version'])
2250 result = self.finish_bzr_subprocess(process)
2251 self.assertContainsRe(result[0], 'is free software')
2252 self.assertEqual('', result[1])
2253
2254 def test_start_and_stop_bzr_subprocess_with_error(self):
2255 """finish_bzr_subprocess allows specification of the desired exit code.
2256 """
2257 process = self.start_bzr_subprocess(['--versionn'])
2258 result = self.finish_bzr_subprocess(process, retcode=3)
2259 self.assertEqual('', result[0])
2260 self.assertContainsRe(result[1], 'unknown command')
2261
2262 def test_start_and_stop_bzr_subprocess_ignoring_retcode(self):
2263 """finish_bzr_subprocess allows the exit code to be ignored."""
2264 process = self.start_bzr_subprocess(['--versionn'])
2265 result = self.finish_bzr_subprocess(process, retcode=None)
2266 self.assertEqual('', result[0])
2267 self.assertContainsRe(result[1], 'unknown command')
2268
2269 def test_start_and_stop_bzr_subprocess_with_unexpected_retcode(self):
2270 """finish_bzr_subprocess raises self.failureException if the retcode is
2271 not the expected one.
2272 """
2273 process = self.start_bzr_subprocess(['--versionn'])
2274 self.assertRaises(self.failureException, self.finish_bzr_subprocess,
2275 process)
2276
2277 def test_start_and_stop_bzr_subprocess_send_signal(self):
2278 """finish_bzr_subprocess can send a signal to the running process and
2279 then check the retcode and output that result from it.
2280 """
2281 process = self.start_bzr_subprocess(['wait-until-signalled'],
2282 skip_if_plan_to_signal=True)
2283 self.assertEqual('running\n', process.stdout.readline())
2284 result = self.finish_bzr_subprocess(process, send_signal=signal.SIGINT,
2285 retcode=3)
2286 self.assertEqual('', result[0])
2287 self.assertEqual('bzr: interrupted\n', result[1])
2288
2289 def test_start_and_stop_working_dir(self):
2290 cwd = osutils.getcwd()
2291 self.make_branch_and_tree('one')
2292 process = self.start_bzr_subprocess(['root'], working_dir='one')
2293 result = self.finish_bzr_subprocess(process, universal_newlines=True)
2294 self.assertEndsWith(result[0], 'one\n')
2295 self.assertEqual('', result[1])
2296
17732297
1774class TestKnownFailure(tests.TestCase):2298class TestKnownFailure(tests.TestCase):
17752299
@@ -1840,8 +2364,8 @@
1840 tests.TestCase.setUp(self)2364 tests.TestCase.setUp(self)
1841 self.suite = TestUtil.TestSuite()2365 self.suite = TestUtil.TestSuite()
1842 self.loader = TestUtil.TestLoader()2366 self.loader = TestUtil.TestLoader()
1843 self.suite.addTest(self.loader.loadTestsFromModuleNames([2367 self.suite.addTest(self.loader.loadTestsFromModule(
1844 'bzrlib.tests.test_selftest']))2368 sys.modules['bzrlib.tests.test_selftest']))
1845 self.all_names = _test_ids(self.suite)2369 self.all_names = _test_ids(self.suite)
18462370
1847 def test_condition_id_re(self):2371 def test_condition_id_re(self):
@@ -2158,8 +2682,8 @@
2158class TestTestSuite(tests.TestCase):2682class TestTestSuite(tests.TestCase):
21592683
2160 def test_test_suite(self):2684 def test_test_suite(self):
2161 # This test is slow, so we do a single test with one test in each2685 # This test is slow - it loads the entire test suite to operate, so we
2162 # category2686 # do a single test with one test in each category
2163 test_list = [2687 test_list = [
2164 # testmod_names2688 # testmod_names
2165 'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',2689 'bzrlib.tests.blackbox.test_branch.TestBranch.test_branch',
@@ -2175,6 +2699,9 @@
2175 self.assertEquals(test_list, _test_ids(suite))2699 self.assertEquals(test_list, _test_ids(suite))
21762700
2177 def test_test_suite_list_and_start(self):2701 def test_test_suite_list_and_start(self):
2702 # We cannot test this at the same time as the main load, because we want
2703 # to know that starting_with == None works. So a second full load is
2704 # incurred.
2178 test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']2705 test_list = ['bzrlib.tests.test_selftest.TestTestSuite.test_test_suite']
2179 suite = tests.test_suite(test_list,2706 suite = tests.test_suite(test_list,
2180 ['bzrlib.tests.test_selftest.TestTestSuite'])2707 ['bzrlib.tests.test_selftest.TestTestSuite'])