Merge lp:~mwhudson/launchpad/reduce-concurrent-job-count into lp:launchpad
- reduce-concurrent-job-count
- Merge into devel
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Michael Hudson-Doyle | ||||
Approved revision: | not available | ||||
Merged at revision: | not available | ||||
Proposed branch: | lp:~mwhudson/launchpad/reduce-concurrent-job-count | ||||
Merge into: | lp:launchpad | ||||
Diff against target: |
492 lines (+93/-79) 15 files modified
cronscripts/code-import-dispatcher.py (+8/-1) lib/canonical/config/schema-lazr.conf (+1/-1) lib/lp/code/doc/xmlrpc-codeimport-scheduler.txt (+3/-6) lib/lp/code/interfaces/codeimportjob.py (+1/-1) lib/lp/code/interfaces/codeimportmachine.py (+1/-1) lib/lp/code/interfaces/codeimportscheduler.py (+1/-1) lib/lp/code/model/codeimportjob.py (+2/-2) lib/lp/code/model/codeimportmachine.py (+2/-3) lib/lp/code/model/tests/test_codeimport.py (+1/-1) lib/lp/code/model/tests/test_codeimportjob.py (+3/-3) lib/lp/code/model/tests/test_codeimportmachine.py (+9/-11) lib/lp/code/xmlrpc/codeimportscheduler.py (+3/-2) lib/lp/codehosting/codeimport/dispatcher.py (+4/-2) lib/lp/codehosting/codeimport/tests/test_dispatcher.py (+53/-43) lib/lp/codehosting/codeimport/tests/test_workermonitor.py (+1/-1) |
||||
To merge this branch: | bzr merge lp:~mwhudson/launchpad/reduce-concurrent-job-count | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Tim Penhey (community) | Approve | ||
Michael Nelson (community) | code | Needs Information | |
Review via email: mp+19574@code.launchpad.net |
Commit message
Allow a code import machine to set its own limit for how many jobs to run, and lower the default to 3.
Description of the change
Michael Hudson-Doyle (mwhudson) wrote : | # |
Michael Nelson (michael.nelson) wrote : | # |
Hi Michael,
doesn't this need to be a change on the lp-production-configs branch?
production/
55:max_jobs_per_machine
Michael Hudson-Doyle (mwhudson) wrote : | # |
Hm, I don't think that config setting takes effect at the moment. I'll investigate further!
Michael Hudson-Doyle (mwhudson) wrote : | # |
Yeah, it doesn't apply, so I'll remove it: https:/
Tim Penhey (thumper) : | # |
Michael Hudson-Doyle (mwhudson) wrote : | # |
OK, so as discussed in IRC, I've made the worker limit settable on the code-import-dispatcher command line.
Tim Penhey (thumper) wrote : | # |
lib/lp/codehosting/codeimport/dispatcher.py
You have:
if worker_limit is None:
Which doesn't set self.worker_limit if worker_limit is passed in.
if worker_limit is None:
self.
perhaps.
How hard would it be to add an int Option? That way the OptionParser object does the integer validation and conversion for you.
--max-jobs
Michael Hudson-Doyle (mwhudson) wrote : | # |
Tim,
can you rereview now please?
Cheers,
mwh
Preview Diff
1 | === modified file 'cronscripts/code-import-dispatcher.py' | |||
2 | --- cronscripts/code-import-dispatcher.py 2009-10-13 14:38:07 +0000 | |||
3 | +++ cronscripts/code-import-dispatcher.py 2010-02-22 19:42:20 +0000 | |||
4 | @@ -18,6 +18,12 @@ | |||
5 | 18 | 18 | ||
6 | 19 | class CodeImportDispatcherScript(LaunchpadScript): | 19 | class CodeImportDispatcherScript(LaunchpadScript): |
7 | 20 | 20 | ||
8 | 21 | def add_my_options(self): | ||
9 | 22 | self.parser.add_option( | ||
10 | 23 | "--max-jobs", dest="max_jobs", type=int, | ||
11 | 24 | default=config.codeimportdispatcher.max_jobs_per_machine, | ||
12 | 25 | help="The maximum number of jobs to run on this machine.") | ||
13 | 26 | |||
14 | 21 | def run(self, use_web_security=False, implicit_begin=True, | 27 | def run(self, use_web_security=False, implicit_begin=True, |
15 | 22 | isolation=None): | 28 | isolation=None): |
16 | 23 | """See `LaunchpadScript.run`. | 29 | """See `LaunchpadScript.run`. |
17 | @@ -30,7 +36,8 @@ | |||
18 | 30 | def main(self): | 36 | def main(self): |
19 | 31 | globalErrorUtility.configure('codeimportdispatcher') | 37 | globalErrorUtility.configure('codeimportdispatcher') |
20 | 32 | 38 | ||
22 | 33 | CodeImportDispatcher(self.logger).findAndDispatchJob( | 39 | dispatcher = CodeImportDispatcher(self.logger, self.options.max_jobs) |
23 | 40 | dispatcher.findAndDispatchJob( | ||
24 | 34 | ServerProxy(config.codeimportdispatcher.codeimportscheduler_url)) | 41 | ServerProxy(config.codeimportdispatcher.codeimportscheduler_url)) |
25 | 35 | 42 | ||
26 | 36 | 43 | ||
27 | 37 | 44 | ||
28 | === modified file 'lib/canonical/config/schema-lazr.conf' | |||
29 | --- lib/canonical/config/schema-lazr.conf 2010-02-20 04:13:47 +0000 | |||
30 | +++ lib/canonical/config/schema-lazr.conf 2010-02-22 19:42:20 +0000 | |||
31 | @@ -449,7 +449,7 @@ | |||
32 | 449 | codeimportscheduler_url: http://xmlrpc-private.launchpad.dev:8087/codeimportscheduler | 449 | codeimportscheduler_url: http://xmlrpc-private.launchpad.dev:8087/codeimportscheduler |
33 | 450 | 450 | ||
34 | 451 | # The maximum number of jobs to run on a machine at one time. | 451 | # The maximum number of jobs to run on a machine at one time. |
36 | 452 | max_jobs_per_machine: 10 | 452 | max_jobs_per_machine: 3 |
37 | 453 | 453 | ||
38 | 454 | # See [error_reports]. | 454 | # See [error_reports]. |
39 | 455 | copy_to_zlog: False | 455 | copy_to_zlog: False |
40 | 456 | 456 | ||
41 | === modified file 'lib/lp/code/doc/xmlrpc-codeimport-scheduler.txt' | |||
42 | --- lib/lp/code/doc/xmlrpc-codeimport-scheduler.txt 2009-10-22 11:55:51 +0000 | |||
43 | +++ lib/lp/code/doc/xmlrpc-codeimport-scheduler.txt 2010-02-22 19:42:20 +0000 | |||
44 | @@ -32,12 +32,9 @@ | |||
45 | 32 | getJobForMachine(), that returns the id of the job that the code | 32 | getJobForMachine(), that returns the id of the job that the code |
46 | 33 | import slave should next run. | 33 | import slave should next run. |
47 | 34 | 34 | ||
49 | 35 | >>> codeimportscheduler_api.getJobForMachine('bazaar-importer') | 35 | >>> codeimportscheduler_api.getJobForMachine('bazaar-importer', 2) |
50 | 36 | 1 | 36 | 1 |
51 | 37 | 37 | ||
52 | 38 | >>> from canonical.database.sqlbase import flush_database_updates | ||
53 | 39 | >>> flush_database_updates() | ||
54 | 40 | |||
55 | 41 | The method just calls the 'getJobForMachine' method from the | 38 | The method just calls the 'getJobForMachine' method from the |
56 | 42 | ICodeImportJobSet interface, and tests all the details of what it does | 39 | ICodeImportJobSet interface, and tests all the details of what it does |
57 | 43 | can be found in the tests for ICodeImportJobSet. | 40 | can be found in the tests for ICodeImportJobSet. |
58 | @@ -49,7 +46,7 @@ | |||
59 | 49 | >>> codeimportscheduler = xmlrpclib.ServerProxy( | 46 | >>> codeimportscheduler = xmlrpclib.ServerProxy( |
60 | 50 | ... 'http://xmlrpc-private.launchpad.dev:8087/codeimportscheduler', | 47 | ... 'http://xmlrpc-private.launchpad.dev:8087/codeimportscheduler', |
61 | 51 | ... transport=XMLRPCTestTransport()) | 48 | ... transport=XMLRPCTestTransport()) |
63 | 52 | >>> codeimportscheduler.getJobForMachine('bazaar-importer') | 49 | >>> codeimportscheduler.getJobForMachine('bazaar-importer', 2) |
64 | 53 | 0 | 50 | 0 |
65 | 54 | 51 | ||
66 | 55 | This includes the behaviour of auto-creating machine rows for | 52 | This includes the behaviour of auto-creating machine rows for |
67 | @@ -60,7 +57,7 @@ | |||
68 | 60 | >>> print getUtility(ICodeImportMachineSet).getByHostname( | 57 | >>> print getUtility(ICodeImportMachineSet).getByHostname( |
69 | 61 | ... 'doesnt-exist-yet') | 58 | ... 'doesnt-exist-yet') |
70 | 62 | None | 59 | None |
72 | 63 | >>> codeimportscheduler.getJobForMachine('doesnt-exist-yet') | 60 | >>> codeimportscheduler.getJobForMachine('doesnt-exist-yet', 1) |
73 | 64 | 0 | 61 | 0 |
74 | 65 | >>> new_machine = getUtility(ICodeImportMachineSet).getByHostname( | 62 | >>> new_machine = getUtility(ICodeImportMachineSet).getByHostname( |
75 | 66 | ... 'doesnt-exist-yet') | 63 | ... 'doesnt-exist-yet') |
76 | 67 | 64 | ||
77 | === modified file 'lib/lp/code/interfaces/codeimportjob.py' | |||
78 | --- lib/lp/code/interfaces/codeimportjob.py 2009-06-25 04:06:00 +0000 | |||
79 | +++ lib/lp/code/interfaces/codeimportjob.py 2010-02-22 19:42:20 +0000 | |||
80 | @@ -122,7 +122,7 @@ | |||
81 | 122 | # we implement endpoint specific authentication for the private xml-rpc | 122 | # we implement endpoint specific authentication for the private xml-rpc |
82 | 123 | # server. | 123 | # server. |
83 | 124 | 124 | ||
85 | 125 | def getJobForMachine(hostname): | 125 | def getJobForMachine(hostname, worker_limit): |
86 | 126 | """Select a job for the given machine to run and mark it as started. | 126 | """Select a job for the given machine to run and mark it as started. |
87 | 127 | 127 | ||
88 | 128 | If there is not already a CodeImportMachine with the given hostname, | 128 | If there is not already a CodeImportMachine with the given hostname, |
89 | 129 | 129 | ||
90 | === modified file 'lib/lp/code/interfaces/codeimportmachine.py' | |||
91 | --- lib/lp/code/interfaces/codeimportmachine.py 2009-06-25 04:06:00 +0000 | |||
92 | +++ lib/lp/code/interfaces/codeimportmachine.py 2010-02-22 19:42:20 +0000 | |||
93 | @@ -47,7 +47,7 @@ | |||
94 | 47 | description=_("When the controller deamon last recorded it was" | 47 | description=_("When the controller deamon last recorded it was" |
95 | 48 | " running.")) | 48 | " running.")) |
96 | 49 | 49 | ||
98 | 50 | def shouldLookForJob(): | 50 | def shouldLookForJob(worker_limit): |
99 | 51 | """Should we look for a job to run on this machine? | 51 | """Should we look for a job to run on this machine? |
100 | 52 | 52 | ||
101 | 53 | There are three reasons we might not look for a job: | 53 | There are three reasons we might not look for a job: |
102 | 54 | 54 | ||
103 | === modified file 'lib/lp/code/interfaces/codeimportscheduler.py' | |||
104 | --- lib/lp/code/interfaces/codeimportscheduler.py 2009-06-25 04:06:00 +0000 | |||
105 | +++ lib/lp/code/interfaces/codeimportscheduler.py 2010-02-22 19:42:20 +0000 | |||
106 | @@ -28,7 +28,7 @@ | |||
107 | 28 | when they need more work to do. | 28 | when they need more work to do. |
108 | 29 | """ | 29 | """ |
109 | 30 | 30 | ||
111 | 31 | def getJobForMachine(hostname): | 31 | def getJobForMachine(hostname, worker_limit): |
112 | 32 | """Get a job to run on the slave 'hostname'. | 32 | """Get a job to run on the slave 'hostname'. |
113 | 33 | 33 | ||
114 | 34 | This method selects the most appropriate job for the machine, | 34 | This method selects the most appropriate job for the machine, |
115 | 35 | 35 | ||
116 | === modified file 'lib/lp/code/model/codeimportjob.py' | |||
117 | --- lib/lp/code/model/codeimportjob.py 2010-01-27 02:48:13 +0000 | |||
118 | +++ lib/lp/code/model/codeimportjob.py 2010-02-22 19:42:20 +0000 | |||
119 | @@ -102,7 +102,7 @@ | |||
120 | 102 | except SQLObjectNotFound: | 102 | except SQLObjectNotFound: |
121 | 103 | return None | 103 | return None |
122 | 104 | 104 | ||
124 | 105 | def getJobForMachine(self, hostname): | 105 | def getJobForMachine(self, hostname, worker_limit): |
125 | 106 | """See `ICodeImportJobSet`.""" | 106 | """See `ICodeImportJobSet`.""" |
126 | 107 | job_workflow = getUtility(ICodeImportJobWorkflow) | 107 | job_workflow = getUtility(ICodeImportJobWorkflow) |
127 | 108 | for job in self.getReclaimableJobs(): | 108 | for job in self.getReclaimableJobs(): |
128 | @@ -111,7 +111,7 @@ | |||
129 | 111 | if machine is None: | 111 | if machine is None: |
130 | 112 | machine = getUtility(ICodeImportMachineSet).new( | 112 | machine = getUtility(ICodeImportMachineSet).new( |
131 | 113 | hostname, CodeImportMachineState.ONLINE) | 113 | hostname, CodeImportMachineState.ONLINE) |
133 | 114 | elif not machine.shouldLookForJob(): | 114 | elif not machine.shouldLookForJob(worker_limit): |
134 | 115 | return None | 115 | return None |
135 | 116 | job = CodeImportJob.selectOne( | 116 | job = CodeImportJob.selectOne( |
136 | 117 | """id IN (SELECT id FROM CodeImportJob | 117 | """id IN (SELECT id FROM CodeImportJob |
137 | 118 | 118 | ||
138 | === modified file 'lib/lp/code/model/codeimportmachine.py' | |||
139 | --- lib/lp/code/model/codeimportmachine.py 2009-06-25 04:06:00 +0000 | |||
140 | +++ lib/lp/code/model/codeimportmachine.py 2010-02-22 19:42:20 +0000 | |||
141 | @@ -51,7 +51,7 @@ | |||
142 | 51 | 'CodeImportEvent', joinColumn='machine', | 51 | 'CodeImportEvent', joinColumn='machine', |
143 | 52 | orderBy=['-date_created', '-id']) | 52 | orderBy=['-date_created', '-id']) |
144 | 53 | 53 | ||
146 | 54 | def shouldLookForJob(self): | 54 | def shouldLookForJob(self, worker_limit): |
147 | 55 | """See `ICodeImportMachine`.""" | 55 | """See `ICodeImportMachine`.""" |
148 | 56 | job_count = self.current_jobs.count() | 56 | job_count = self.current_jobs.count() |
149 | 57 | 57 | ||
150 | @@ -64,8 +64,7 @@ | |||
151 | 64 | CodeImportMachineOfflineReason.QUIESCED) | 64 | CodeImportMachineOfflineReason.QUIESCED) |
152 | 65 | return False | 65 | return False |
153 | 66 | elif self.state == CodeImportMachineState.ONLINE: | 66 | elif self.state == CodeImportMachineState.ONLINE: |
156 | 67 | max_jobs = config.codeimportdispatcher.max_jobs_per_machine | 67 | return job_count < worker_limit |
155 | 68 | return job_count < max_jobs | ||
157 | 69 | else: | 68 | else: |
158 | 70 | raise AssertionError( | 69 | raise AssertionError( |
159 | 71 | "Unknown machine state %r??" % self.state) | 70 | "Unknown machine state %r??" % self.state) |
160 | 72 | 71 | ||
161 | === modified file 'lib/lp/code/model/tests/test_codeimport.py' | |||
162 | --- lib/lp/code/model/tests/test_codeimport.py 2010-02-01 03:55:59 +0000 | |||
163 | +++ lib/lp/code/model/tests/test_codeimport.py 2010-02-22 19:42:20 +0000 | |||
164 | @@ -195,7 +195,7 @@ | |||
165 | 195 | 195 | ||
166 | 196 | def makeApprovedImportWithRunningJob(self): | 196 | def makeApprovedImportWithRunningJob(self): |
167 | 197 | code_import = self.makeApprovedImportWithPendingJob() | 197 | code_import = self.makeApprovedImportWithPendingJob() |
169 | 198 | job = CodeImportJobSet().getJobForMachine('machine') | 198 | job = CodeImportJobSet().getJobForMachine('machine', 10) |
170 | 199 | self.assertEqual(code_import.import_job, job) | 199 | self.assertEqual(code_import.import_job, job) |
171 | 200 | return code_import | 200 | return code_import |
172 | 201 | 201 | ||
173 | 202 | 202 | ||
174 | === modified file 'lib/lp/code/model/tests/test_codeimportjob.py' | |||
175 | --- lib/lp/code/model/tests/test_codeimportjob.py 2010-02-01 03:55:59 +0000 | |||
176 | +++ lib/lp/code/model/tests/test_codeimportjob.py 2010-02-22 19:42:20 +0000 | |||
177 | @@ -114,7 +114,7 @@ | |||
178 | 114 | def assertJobIsSelected(self, desired_job): | 114 | def assertJobIsSelected(self, desired_job): |
179 | 115 | """Assert that the expected job is chosen by getJobForMachine.""" | 115 | """Assert that the expected job is chosen by getJobForMachine.""" |
180 | 116 | observed_job = getUtility(ICodeImportJobSet).getJobForMachine( | 116 | observed_job = getUtility(ICodeImportJobSet).getJobForMachine( |
182 | 117 | self.machine.hostname) | 117 | self.machine.hostname, 10) |
183 | 118 | self.assert_(observed_job is not None, "No job was selected.") | 118 | self.assert_(observed_job is not None, "No job was selected.") |
184 | 119 | self.assertEqual(desired_job, observed_job, | 119 | self.assertEqual(desired_job, observed_job, |
185 | 120 | "Expected job not selected.") | 120 | "Expected job not selected.") |
186 | @@ -122,7 +122,7 @@ | |||
187 | 122 | def assertNoJobSelected(self): | 122 | def assertNoJobSelected(self): |
188 | 123 | """Assert that no job is selected.""" | 123 | """Assert that no job is selected.""" |
189 | 124 | observed_job = getUtility(ICodeImportJobSet).getJobForMachine( | 124 | observed_job = getUtility(ICodeImportJobSet).getJobForMachine( |
191 | 125 | 'machine') | 125 | 'machine', 10) |
192 | 126 | self.assert_(observed_job is None, "Job unexpectedly selected.") | 126 | self.assert_(observed_job is None, "Job unexpectedly selected.") |
193 | 127 | 127 | ||
194 | 128 | def test_nothingSelectedIfNothingCreated(self): | 128 | def test_nothingSelectedIfNothingCreated(self): |
195 | @@ -274,7 +274,7 @@ | |||
196 | 274 | machine = self.factory.makeCodeImportMachine(set_online=True) | 274 | machine = self.factory.makeCodeImportMachine(set_online=True) |
197 | 275 | login(ANONYMOUS) | 275 | login(ANONYMOUS) |
198 | 276 | getUtility(ICodeImportJobSet).getJobForMachine( | 276 | getUtility(ICodeImportJobSet).getJobForMachine( |
200 | 277 | machine.hostname) | 277 | machine.hostname, 10) |
201 | 278 | login_for_code_imports() | 278 | login_for_code_imports() |
202 | 279 | # Now there are no reclaimable jobs. | 279 | # Now there are no reclaimable jobs. |
203 | 280 | self.assertReclaimableJobs([]) | 280 | self.assertReclaimableJobs([]) |
204 | 281 | 281 | ||
205 | === modified file 'lib/lp/code/model/tests/test_codeimportmachine.py' | |||
206 | --- lib/lp/code/model/tests/test_codeimportmachine.py 2009-06-25 04:06:00 +0000 | |||
207 | +++ lib/lp/code/model/tests/test_codeimportmachine.py 2010-02-22 19:42:20 +0000 | |||
208 | @@ -9,7 +9,6 @@ | |||
209 | 9 | 9 | ||
210 | 10 | from zope.component import getUtility | 10 | from zope.component import getUtility |
211 | 11 | 11 | ||
212 | 12 | from canonical.config import config | ||
213 | 13 | from canonical.database.constants import UTC_NOW | 12 | from canonical.database.constants import UTC_NOW |
214 | 14 | from lp.code.enums import ( | 13 | from lp.code.enums import ( |
215 | 15 | CodeImportMachineOfflineReason, CodeImportMachineState) | 14 | CodeImportMachineOfflineReason, CodeImportMachineState) |
216 | @@ -39,13 +38,13 @@ | |||
217 | 39 | def test_machineIsOffline(self): | 38 | def test_machineIsOffline(self): |
218 | 40 | # When the machine is offline, we shouldn't look for any jobs. | 39 | # When the machine is offline, we shouldn't look for any jobs. |
219 | 41 | self.machine.setOffline(CodeImportMachineOfflineReason.STOPPED) | 40 | self.machine.setOffline(CodeImportMachineOfflineReason.STOPPED) |
221 | 42 | self.assertFalse(self.machine.shouldLookForJob()) | 41 | self.assertFalse(self.machine.shouldLookForJob(10)) |
222 | 43 | 42 | ||
223 | 44 | def test_machineIsQuiescingNoJobsRunning(self): | 43 | def test_machineIsQuiescingNoJobsRunning(self): |
224 | 45 | # When the machine is quiescing and no jobs are running on this | 44 | # When the machine is quiescing and no jobs are running on this |
225 | 46 | # machine, we should set the machine to OFFLINE and not look for jobs. | 45 | # machine, we should set the machine to OFFLINE and not look for jobs. |
226 | 47 | self.machine.setQuiescing(self.factory.makePerson()) | 46 | self.machine.setQuiescing(self.factory.makePerson()) |
228 | 48 | self.assertFalse(self.machine.shouldLookForJob()) | 47 | self.assertFalse(self.machine.shouldLookForJob(10)) |
229 | 49 | self.assertEqual(self.machine.state, CodeImportMachineState.OFFLINE) | 48 | self.assertEqual(self.machine.state, CodeImportMachineState.OFFLINE) |
230 | 50 | 49 | ||
231 | 51 | def test_machineIsQuiescingWithJobsRunning(self): | 50 | def test_machineIsQuiescingWithJobsRunning(self): |
232 | @@ -53,20 +52,19 @@ | |||
233 | 53 | # machine, we shouldn't look for any more jobs. | 52 | # machine, we shouldn't look for any more jobs. |
234 | 54 | self.createJobRunningOnMachine(self.machine) | 53 | self.createJobRunningOnMachine(self.machine) |
235 | 55 | self.machine.setQuiescing(self.factory.makePerson()) | 54 | self.machine.setQuiescing(self.factory.makePerson()) |
237 | 56 | self.assertFalse(self.machine.shouldLookForJob()) | 55 | self.assertFalse(self.machine.shouldLookForJob(10)) |
238 | 57 | self.assertEqual(self.machine.state, CodeImportMachineState.QUIESCING) | 56 | self.assertEqual(self.machine.state, CodeImportMachineState.QUIESCING) |
239 | 58 | 57 | ||
240 | 59 | def test_enoughJobsRunningOnMachine(self): | 58 | def test_enoughJobsRunningOnMachine(self): |
241 | 60 | # When there are already enough jobs running on this machine, we | 59 | # When there are already enough jobs running on this machine, we |
242 | 61 | # shouldn't look for any more jobs. | 60 | # shouldn't look for any more jobs. |
246 | 62 | for i in range(config.codeimportdispatcher.max_jobs_per_machine): | 61 | self.createJobRunningOnMachine(self.machine) |
247 | 63 | self.createJobRunningOnMachine(self.machine) | 62 | self.assertFalse(self.machine.shouldLookForJob(worker_limit=1)) |
245 | 64 | self.assertFalse(self.machine.shouldLookForJob()) | ||
248 | 65 | 63 | ||
249 | 66 | def test_shouldLook(self): | 64 | def test_shouldLook(self): |
250 | 67 | # If the machine is online and there are not already | 65 | # If the machine is online and there are not already |
251 | 68 | # max_jobs_per_machine jobs running, then we should look for a job. | 66 | # max_jobs_per_machine jobs running, then we should look for a job. |
253 | 69 | self.assertTrue(self.machine.shouldLookForJob()) | 67 | self.assertTrue(self.machine.shouldLookForJob(worker_limit=1)) |
254 | 70 | 68 | ||
255 | 71 | def test_noHeartbeatWhenCreated(self): | 69 | def test_noHeartbeatWhenCreated(self): |
256 | 72 | # Machines are created with a NULL heartbeat. | 70 | # Machines are created with a NULL heartbeat. |
257 | @@ -75,18 +73,18 @@ | |||
258 | 75 | def test_noHeartbeatUpdateWhenOffline(self): | 73 | def test_noHeartbeatUpdateWhenOffline(self): |
259 | 76 | # When the machine is offline, the heartbeat is not updated. | 74 | # When the machine is offline, the heartbeat is not updated. |
260 | 77 | self.machine.setOffline(CodeImportMachineOfflineReason.STOPPED) | 75 | self.machine.setOffline(CodeImportMachineOfflineReason.STOPPED) |
262 | 78 | self.machine.shouldLookForJob() | 76 | self.machine.shouldLookForJob(10) |
263 | 79 | self.assertTrue(self.machine.heartbeat is None) | 77 | self.assertTrue(self.machine.heartbeat is None) |
264 | 80 | 78 | ||
265 | 81 | def test_heartbeatUpdateWhenQuiescing(self): | 79 | def test_heartbeatUpdateWhenQuiescing(self): |
266 | 82 | # When the machine is quiescing, the heartbeat is updated. | 80 | # When the machine is quiescing, the heartbeat is updated. |
267 | 83 | self.machine.setQuiescing(self.factory.makePerson()) | 81 | self.machine.setQuiescing(self.factory.makePerson()) |
269 | 84 | self.machine.shouldLookForJob() | 82 | self.machine.shouldLookForJob(10) |
270 | 85 | self.assertSqlAttributeEqualsDate(self.machine, 'heartbeat', UTC_NOW) | 83 | self.assertSqlAttributeEqualsDate(self.machine, 'heartbeat', UTC_NOW) |
271 | 86 | 84 | ||
272 | 87 | def test_heartbeatUpdateWhenOnline(self): | 85 | def test_heartbeatUpdateWhenOnline(self): |
273 | 88 | # When the machine is online, the heartbeat is updated. | 86 | # When the machine is online, the heartbeat is updated. |
275 | 89 | self.machine.shouldLookForJob() | 87 | self.machine.shouldLookForJob(10) |
276 | 90 | self.assertSqlAttributeEqualsDate(self.machine, 'heartbeat', UTC_NOW) | 88 | self.assertSqlAttributeEqualsDate(self.machine, 'heartbeat', UTC_NOW) |
277 | 91 | 89 | ||
278 | 92 | 90 | ||
279 | 93 | 91 | ||
280 | === modified file 'lib/lp/code/xmlrpc/codeimportscheduler.py' | |||
281 | --- lib/lp/code/xmlrpc/codeimportscheduler.py 2009-06-25 04:06:00 +0000 | |||
282 | +++ lib/lp/code/xmlrpc/codeimportscheduler.py 2010-02-22 19:42:20 +0000 | |||
283 | @@ -22,9 +22,10 @@ | |||
284 | 22 | 22 | ||
285 | 23 | implements(ICodeImportScheduler) | 23 | implements(ICodeImportScheduler) |
286 | 24 | 24 | ||
288 | 25 | def getJobForMachine(self, hostname): | 25 | def getJobForMachine(self, hostname, worker_limit): |
289 | 26 | """See `ICodeImportScheduler`.""" | 26 | """See `ICodeImportScheduler`.""" |
291 | 27 | job = getUtility(ICodeImportJobSet).getJobForMachine(hostname) | 27 | job = getUtility(ICodeImportJobSet).getJobForMachine( |
292 | 28 | hostname, worker_limit) | ||
293 | 28 | if job is not None: | 29 | if job is not None: |
294 | 29 | return job.id | 30 | return job.id |
295 | 30 | else: | 31 | else: |
296 | 31 | 32 | ||
297 | === modified file 'lib/lp/codehosting/codeimport/dispatcher.py' | |||
298 | --- lib/lp/codehosting/codeimport/dispatcher.py 2009-06-25 04:06:00 +0000 | |||
299 | +++ lib/lp/codehosting/codeimport/dispatcher.py 2010-02-22 19:42:20 +0000 | |||
300 | @@ -32,12 +32,13 @@ | |||
301 | 32 | worker_script = os.path.join( | 32 | worker_script = os.path.join( |
302 | 33 | config.root, 'scripts', 'code-import-worker-db.py') | 33 | config.root, 'scripts', 'code-import-worker-db.py') |
303 | 34 | 34 | ||
305 | 35 | def __init__(self, logger): | 35 | def __init__(self, logger, worker_limit): |
306 | 36 | """Initialize an instance. | 36 | """Initialize an instance. |
307 | 37 | 37 | ||
308 | 38 | :param logger: A `Logger` object. | 38 | :param logger: A `Logger` object. |
309 | 39 | """ | 39 | """ |
310 | 40 | self.logger = logger | 40 | self.logger = logger |
311 | 41 | self.worker_limit = worker_limit | ||
312 | 41 | 42 | ||
313 | 42 | def getHostname(self): | 43 | def getHostname(self): |
314 | 43 | """Return the hostname of this machine. | 44 | """Return the hostname of this machine. |
315 | @@ -66,7 +67,8 @@ | |||
316 | 66 | def findAndDispatchJob(self, scheduler_client): | 67 | def findAndDispatchJob(self, scheduler_client): |
317 | 67 | """Check for and dispatch a job if necessary.""" | 68 | """Check for and dispatch a job if necessary.""" |
318 | 68 | 69 | ||
320 | 69 | job_id = scheduler_client.getJobForMachine(self.getHostname()) | 70 | job_id = scheduler_client.getJobForMachine( |
321 | 71 | self.getHostname(), self.worker_limit) | ||
322 | 70 | 72 | ||
323 | 71 | if job_id == 0: | 73 | if job_id == 0: |
324 | 72 | self.logger.info("No jobs pending.") | 74 | self.logger.info("No jobs pending.") |
325 | 73 | 75 | ||
326 | === modified file 'lib/lp/codehosting/codeimport/tests/test_dispatcher.py' | |||
327 | --- lib/lp/codehosting/codeimport/tests/test_dispatcher.py 2009-12-14 18:11:07 +0000 | |||
328 | +++ lib/lp/codehosting/codeimport/tests/test_dispatcher.py 2010-02-22 19:42:20 +0000 | |||
329 | @@ -11,64 +11,60 @@ | |||
330 | 11 | import shutil | 11 | import shutil |
331 | 12 | import socket | 12 | import socket |
332 | 13 | import tempfile | 13 | import tempfile |
333 | 14 | from textwrap import dedent | ||
334 | 15 | from unittest import TestLoader | 14 | from unittest import TestLoader |
335 | 16 | 15 | ||
336 | 17 | from twisted.trial.unittest import TestCase | ||
337 | 18 | |||
338 | 19 | from canonical.config import config | ||
339 | 20 | from lp.codehosting.codeimport.dispatcher import CodeImportDispatcher | ||
340 | 21 | from canonical.launchpad import scripts | 16 | from canonical.launchpad import scripts |
341 | 22 | from canonical.launchpad.scripts.logger import QuietFakeLogger | 17 | from canonical.launchpad.scripts.logger import QuietFakeLogger |
343 | 23 | from canonical.testing.layers import TwistedLaunchpadZopelessLayer | 18 | from canonical.testing.layers import BaseLayer |
344 | 19 | |||
345 | 20 | from lp.codehosting.codeimport.dispatcher import CodeImportDispatcher | ||
346 | 21 | from lp.testing import TestCase | ||
347 | 24 | 22 | ||
348 | 25 | 23 | ||
349 | 26 | class StubSchedulerClient: | 24 | class StubSchedulerClient: |
351 | 27 | """A stub scheduler client that returns a pre-arranged answer.""" | 25 | """A scheduler client that returns a pre-arranged answer.""" |
352 | 28 | 26 | ||
353 | 29 | def __init__(self, id_to_return): | 27 | def __init__(self, id_to_return): |
354 | 30 | self.id_to_return = id_to_return | 28 | self.id_to_return = id_to_return |
355 | 31 | 29 | ||
357 | 32 | def getJobForMachine(self, machine): | 30 | def getJobForMachine(self, machine, limit): |
358 | 33 | return self.id_to_return | 31 | return self.id_to_return |
359 | 34 | 32 | ||
360 | 35 | 33 | ||
361 | 34 | class MockSchedulerClient: | ||
362 | 35 | """A scheduler client that records calls to `getJobForMachine`.""" | ||
363 | 36 | |||
364 | 37 | def __init__(self): | ||
365 | 38 | self.calls = [] | ||
366 | 39 | |||
367 | 40 | def getJobForMachine(self, machine, limit): | ||
368 | 41 | self.calls.append((machine, limit)) | ||
369 | 42 | return 0 | ||
370 | 43 | |||
371 | 44 | |||
372 | 36 | class TestCodeImportDispatcherUnit(TestCase): | 45 | class TestCodeImportDispatcherUnit(TestCase): |
373 | 37 | """Unit tests for `CodeImportDispatcher`.""" | 46 | """Unit tests for `CodeImportDispatcher`.""" |
374 | 38 | 47 | ||
376 | 39 | layer = TwistedLaunchpadZopelessLayer | 48 | layer = BaseLayer |
377 | 40 | 49 | ||
378 | 41 | def setUp(self): | 50 | def setUp(self): |
396 | 42 | self.config_count = 0 | 51 | TestCase.setUp(self) |
397 | 43 | self.pushConfig(forced_hostname='none') | 52 | self.pushConfig('codeimportdispatcher', forced_hostname='none') |
398 | 44 | self.dispatcher = CodeImportDispatcher(QuietFakeLogger()) | 53 | |
399 | 45 | 54 | def makeDispatcher(self, worker_limit=10): | |
400 | 46 | def pushConfig(self, **args): | 55 | """Make a `CodeImportDispatcher`.""" |
401 | 47 | """Push some key-value pairs into the codeimportdispatcher config. | 56 | return CodeImportDispatcher(QuietFakeLogger(), worker_limit) |
385 | 48 | |||
386 | 49 | The config values will be restored during test tearDown. | ||
387 | 50 | """ | ||
388 | 51 | self.config_count += 1 | ||
389 | 52 | name = 'test%d' % self.config_count | ||
390 | 53 | body = '\n'.join(["%s: %s"%(k, v) for k, v in args.iteritems()]) | ||
391 | 54 | config.push(name, dedent(""" | ||
392 | 55 | [codeimportdispatcher] | ||
393 | 56 | %s | ||
394 | 57 | """ % body)) | ||
395 | 58 | self.addCleanup(config.pop, name) | ||
402 | 59 | 57 | ||
403 | 60 | def test_getHostname(self): | 58 | def test_getHostname(self): |
404 | 61 | # By default, getHostname return the same as socket.gethostname() | 59 | # By default, getHostname return the same as socket.gethostname() |
408 | 62 | self.assertEqual( | 60 | dispatcher = self.makeDispatcher() |
409 | 63 | self.dispatcher.getHostname(), | 61 | self.assertEqual(socket.gethostname(), dispatcher.getHostname()) |
407 | 64 | socket.gethostname()) | ||
410 | 65 | 62 | ||
411 | 66 | def test_getHostnameOverride(self): | 63 | def test_getHostnameOverride(self): |
412 | 67 | # getHostname can be overriden by the config for testing, however. | 64 | # getHostname can be overriden by the config for testing, however. |
417 | 68 | self.pushConfig(forced_hostname='test-value') | 65 | dispatcher = self.makeDispatcher() |
418 | 69 | self.assertEqual( | 66 | self.pushConfig('codeimportdispatcher', forced_hostname='test-value') |
419 | 70 | self.dispatcher.getHostname(), | 67 | self.assertEqual('test-value', dispatcher.getHostname()) |
416 | 71 | 'test-value') | ||
420 | 72 | 68 | ||
421 | 73 | def writePythonScript(self, script_path, script_body): | 69 | def writePythonScript(self, script_path, script_body): |
422 | 74 | """Write out an executable Python script. | 70 | """Write out an executable Python script. |
423 | @@ -94,6 +90,7 @@ | |||
424 | 94 | 90 | ||
425 | 95 | # We create a script that writes its command line arguments to | 91 | # We create a script that writes its command line arguments to |
426 | 96 | # some a temporary file and examine that. | 92 | # some a temporary file and examine that. |
427 | 93 | dispatcher = self.makeDispatcher() | ||
428 | 97 | tmpdir = tempfile.mkdtemp() | 94 | tmpdir = tempfile.mkdtemp() |
429 | 98 | self.addCleanup(shutil.rmtree, tmpdir) | 95 | self.addCleanup(shutil.rmtree, tmpdir) |
430 | 99 | script_path = os.path.join(tmpdir, 'script.py') | 96 | script_path = os.path.join(tmpdir, 'script.py') |
431 | @@ -102,26 +99,39 @@ | |||
432 | 102 | script_path, | 99 | script_path, |
433 | 103 | ['import sys', | 100 | ['import sys', |
434 | 104 | 'open(%r, "w").write(str(sys.argv[1:]))' % output_path]) | 101 | 'open(%r, "w").write(str(sys.argv[1:]))' % output_path]) |
437 | 105 | self.dispatcher.worker_script = script_path | 102 | dispatcher.worker_script = script_path |
438 | 106 | proc = self.dispatcher.dispatchJob(10) | 103 | proc = dispatcher.dispatchJob(10) |
439 | 107 | proc.wait() | 104 | proc.wait() |
440 | 108 | arglist = self.filterOutLoggingOptions(eval(open(output_path).read())) | 105 | arglist = self.filterOutLoggingOptions(eval(open(output_path).read())) |
442 | 109 | self.assertEqual(arglist, ['10']) | 106 | self.assertEqual(['10'], arglist) |
443 | 110 | 107 | ||
444 | 111 | def test_findAndDispatchJob_jobWaiting(self): | 108 | def test_findAndDispatchJob_jobWaiting(self): |
446 | 112 | # If there is a job to dispatch, then we call dispatchJob with its id. | 109 | # If there is a job to dispatch, then we call dispatchJob with its id |
447 | 110 | # and the worker_limit supplied to the dispatcher. | ||
448 | 113 | calls = [] | 111 | calls = [] |
452 | 114 | self.dispatcher.dispatchJob = lambda job_id: calls.append(job_id) | 112 | dispatcher = self.makeDispatcher() |
453 | 115 | self.dispatcher.findAndDispatchJob(StubSchedulerClient(10)) | 113 | dispatcher.dispatchJob = lambda job_id: calls.append(job_id) |
454 | 116 | self.assertEqual(calls, [10]) | 114 | dispatcher.findAndDispatchJob(StubSchedulerClient(10)) |
455 | 115 | self.assertEqual([10], calls) | ||
456 | 117 | 116 | ||
457 | 118 | def test_findAndDispatchJob_noJobWaiting(self): | 117 | def test_findAndDispatchJob_noJobWaiting(self): |
458 | 119 | # If there is no job to dispatch, then we just exit quietly. | 118 | # If there is no job to dispatch, then we just exit quietly. |
459 | 120 | calls = [] | 119 | calls = [] |
463 | 121 | self.dispatcher.dispatchJob = lambda job_id: calls.append(job_id) | 120 | dispatcher = self.makeDispatcher() |
464 | 122 | self.dispatcher.findAndDispatchJob(StubSchedulerClient(0)) | 121 | dispatcher.dispatchJob = lambda job_id: calls.append(job_id) |
465 | 123 | self.assertEqual(calls, []) | 122 | dispatcher.findAndDispatchJob(StubSchedulerClient(0)) |
466 | 123 | self.assertEqual([], calls) | ||
467 | 124 | 124 | ||
468 | 125 | def test_findAndDispatchJob_calls_getJobForMachine_with_limit(self): | ||
469 | 126 | # findAndDispatchJob calls getJobForMachine on the scheduler client | ||
470 | 127 | # with the hostname and supplied worker limit. | ||
471 | 128 | worker_limit = self.factory.getUniqueInteger() | ||
472 | 129 | dispatcher = self.makeDispatcher(worker_limit) | ||
473 | 130 | scheduler_client = MockSchedulerClient() | ||
474 | 131 | dispatcher.findAndDispatchJob(scheduler_client) | ||
475 | 132 | self.assertEqual( | ||
476 | 133 | [(dispatcher.getHostname(), worker_limit)], | ||
477 | 134 | scheduler_client.calls) | ||
478 | 125 | 135 | ||
479 | 126 | def test_suite(): | 136 | def test_suite(): |
480 | 127 | return TestLoader().loadTestsFromName(__name__) | 137 | return TestLoader().loadTestsFromName(__name__) |
481 | 128 | 138 | ||
482 | === modified file 'lib/lp/codehosting/codeimport/tests/test_workermonitor.py' | |||
483 | --- lib/lp/codehosting/codeimport/tests/test_workermonitor.py 2010-02-03 19:29:27 +0000 | |||
484 | +++ lib/lp/codehosting/codeimport/tests/test_workermonitor.py 2010-02-22 19:42:20 +0000 | |||
485 | @@ -566,7 +566,7 @@ | |||
486 | 566 | code_import.updateFromData( | 566 | code_import.updateFromData( |
487 | 567 | {'review_status': CodeImportReviewStatus.REVIEWED}, | 567 | {'review_status': CodeImportReviewStatus.REVIEWED}, |
488 | 568 | self.factory.makePerson()) | 568 | self.factory.makePerson()) |
490 | 569 | job = getUtility(ICodeImportJobSet).getJobForMachine('machine') | 569 | job = getUtility(ICodeImportJobSet).getJobForMachine('machine', 10) |
491 | 570 | self.assertEqual(code_import, job.code_import) | 570 | self.assertEqual(code_import, job.code_import) |
492 | 571 | return job | 571 | return job |
493 | 572 | 572 |
As discussed in the linked bug, we should probably do something more sophisticated here to give a machine a reasonable number of jobs to run, but this is much easier for sure.