Merge lp:~abentley/launchpad/ampoulejob into lp:launchpad
- ampoulejob
- Merge into devel
Proposed by
Aaron Bentley
Status: | Merged |
---|---|
Approved by: | Aaron Bentley |
Approved revision: | not available |
Merged at revision: | not available |
Proposed branch: | lp:~abentley/launchpad/ampoulejob |
Merge into: | lp:launchpad |
Prerequisite: | lp:~abentley/launchpad/twistedjob |
Diff against target: |
515 lines (+195/-48) 11 files modified
cronscripts/update_preview_diffs.py (+1/-7) cronscripts/upgrade_branches.py (+0/-6) lib/lp/code/configure.zcml (+1/-0) lib/lp/code/interfaces/branchjob.py (+3/-0) lib/lp/code/interfaces/branchmergeproposal.py (+3/-0) lib/lp/code/model/branchjob.py (+13/-2) lib/lp/code/model/branchmergeproposaljob.py (+22/-2) lib/lp/code/scripts/tests/test_update_preview_diffs.py (+1/-1) lib/lp/services/job/runner.py (+144/-30) setup.py (+1/-0) versions.cfg (+6/-0) |
To merge this branch: | bzr merge lp:~abentley/launchpad/ampoulejob |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Paul Hummer (community) | Approve | ||
Review via email: mp+15982@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) wrote : | # |
Revision history for this message
Paul Hummer (rockstar) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'cronscripts/update_preview_diffs.py' | |||
2 | --- cronscripts/update_preview_diffs.py 2009-12-18 16:03:15 +0000 | |||
3 | +++ cronscripts/update_preview_diffs.py 2009-12-18 16:03:17 +0000 | |||
4 | @@ -11,7 +11,6 @@ | |||
5 | 11 | 11 | ||
6 | 12 | import _pythonpath | 12 | import _pythonpath |
7 | 13 | 13 | ||
8 | 14 | from lp.codehosting.vfs import get_scanner_server | ||
9 | 15 | from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner | 14 | from lp.services.job.runner import JobCronScript, JobRunner, TwistedJobRunner |
10 | 16 | from lp.code.interfaces.branchmergeproposal import ( | 15 | from lp.code.interfaces.branchmergeproposal import ( |
11 | 17 | IUpdatePreviewDiffJobSource,) | 16 | IUpdatePreviewDiffJobSource,) |
12 | @@ -24,7 +23,7 @@ | |||
13 | 24 | source_interface = IUpdatePreviewDiffJobSource | 23 | source_interface = IUpdatePreviewDiffJobSource |
14 | 25 | 24 | ||
15 | 26 | def __init__(self): | 25 | def __init__(self): |
17 | 27 | super(JobCronScript, self).__init__() | 26 | super(RunUpdatePreviewDiffJobs, self).__init__() |
18 | 28 | if self.options.twisted: | 27 | if self.options.twisted: |
19 | 29 | self.runner_class = TwistedJobRunner | 28 | self.runner_class = TwistedJobRunner |
20 | 30 | else: | 29 | else: |
21 | @@ -33,11 +32,6 @@ | |||
22 | 33 | def add_my_options(self): | 32 | def add_my_options(self): |
23 | 34 | self.parser.add_option('--twisted', action='store_true') | 33 | self.parser.add_option('--twisted', action='store_true') |
24 | 35 | 34 | ||
25 | 36 | def setUp(self): | ||
26 | 37 | server = get_scanner_server() | ||
27 | 38 | server.setUp() | ||
28 | 39 | return [server.tearDown] | ||
29 | 40 | |||
30 | 41 | 35 | ||
31 | 42 | if __name__ == '__main__': | 36 | if __name__ == '__main__': |
32 | 43 | script = RunUpdatePreviewDiffJobs() | 37 | script = RunUpdatePreviewDiffJobs() |
33 | 44 | 38 | ||
34 | === modified file 'cronscripts/upgrade_branches.py' | |||
35 | --- cronscripts/upgrade_branches.py 2009-10-31 12:07:16 +0000 | |||
36 | +++ cronscripts/upgrade_branches.py 2009-12-18 16:03:17 +0000 | |||
37 | @@ -11,7 +11,6 @@ | |||
38 | 11 | 11 | ||
39 | 12 | from lp.services.job.runner import JobCronScript | 12 | from lp.services.job.runner import JobCronScript |
40 | 13 | from lp.code.interfaces.branchjob import IBranchUpgradeJobSource | 13 | from lp.code.interfaces.branchjob import IBranchUpgradeJobSource |
41 | 14 | from lp.codehosting.vfs import get_multi_server | ||
42 | 15 | 14 | ||
43 | 16 | 15 | ||
44 | 17 | class RunUpgradeBranches(JobCronScript): | 16 | class RunUpgradeBranches(JobCronScript): |
45 | @@ -20,11 +19,6 @@ | |||
46 | 20 | config_name = 'upgrade_branches' | 19 | config_name = 'upgrade_branches' |
47 | 21 | source_interface = IBranchUpgradeJobSource | 20 | source_interface = IBranchUpgradeJobSource |
48 | 22 | 21 | ||
49 | 23 | def setUp(self): | ||
50 | 24 | server = get_multi_server(write_hosted=True) | ||
51 | 25 | server.setUp() | ||
52 | 26 | return [server.tearDown] | ||
53 | 27 | |||
54 | 28 | 22 | ||
55 | 29 | if __name__ == '__main__': | 23 | if __name__ == '__main__': |
56 | 30 | script = RunUpgradeBranches() | 24 | script = RunUpgradeBranches() |
57 | 31 | 25 | ||
58 | === modified file 'lib/lp/code/configure.zcml' | |||
59 | --- lib/lp/code/configure.zcml 2009-12-11 00:56:16 +0000 | |||
60 | +++ lib/lp/code/configure.zcml 2009-12-18 16:03:17 +0000 | |||
61 | @@ -889,6 +889,7 @@ | |||
62 | 889 | </securedutility> | 889 | </securedutility> |
63 | 890 | <class class="lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob"> | 890 | <class class="lp.code.model.branchmergeproposaljob.UpdatePreviewDiffJob"> |
64 | 891 | <allow interface="lp.services.job.interfaces.job.IRunnableJob" /> | 891 | <allow interface="lp.services.job.interfaces.job.IRunnableJob" /> |
65 | 892 | <allow interface="lp.code.interfaces.branchmergeproposal.IBranchMergeProposalJob" /> | ||
66 | 892 | </class> | 893 | </class> |
67 | 893 | 894 | ||
68 | 894 | <securedutility | 895 | <securedutility |
69 | 895 | 896 | ||
70 | === modified file 'lib/lp/code/interfaces/branchjob.py' | |||
71 | --- lib/lp/code/interfaces/branchjob.py 2009-10-29 05:50:08 +0000 | |||
72 | +++ lib/lp/code/interfaces/branchjob.py 2009-12-18 16:03:17 +0000 | |||
73 | @@ -90,6 +90,9 @@ | |||
74 | 90 | def iterReady(): | 90 | def iterReady(): |
75 | 91 | """Iterate through all IBranchUpgradeJobs.""" | 91 | """Iterate through all IBranchUpgradeJobs.""" |
76 | 92 | 92 | ||
77 | 93 | def contextManager(): | ||
78 | 94 | """Get a context for running this kind of job in.""" | ||
79 | 95 | |||
80 | 93 | 96 | ||
81 | 94 | class IRevisionMailJob(IRunnableJob): | 97 | class IRevisionMailJob(IRunnableJob): |
82 | 95 | """A Job to send email a revision change in a branch.""" | 98 | """A Job to send email a revision change in a branch.""" |
83 | 96 | 99 | ||
84 | === modified file 'lib/lp/code/interfaces/branchmergeproposal.py' | |||
85 | --- lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:15 +0000 | |||
86 | +++ lib/lp/code/interfaces/branchmergeproposal.py 2009-12-18 16:03:17 +0000 | |||
87 | @@ -597,6 +597,9 @@ | |||
88 | 597 | def iterReady(): | 597 | def iterReady(): |
89 | 598 | """Iterate through jobs ready to update preview diffs.""" | 598 | """Iterate through jobs ready to update preview diffs.""" |
90 | 599 | 599 | ||
91 | 600 | def contextManager(): | ||
92 | 601 | """Get a context for running this kind of job in.""" | ||
93 | 602 | |||
94 | 600 | 603 | ||
95 | 601 | def notify_modified(proposal, func, *args, **kwargs): | 604 | def notify_modified(proposal, func, *args, **kwargs): |
96 | 602 | """Call func, then notify about the changes it made. | 605 | """Call func, then notify about the changes it made. |
97 | 603 | 606 | ||
98 | === modified file 'lib/lp/code/model/branchjob.py' | |||
99 | --- lib/lp/code/model/branchjob.py 2009-10-29 17:07:18 +0000 | |||
100 | +++ lib/lp/code/model/branchjob.py 2009-12-18 16:03:17 +0000 | |||
101 | @@ -9,6 +9,7 @@ | |||
102 | 9 | 'RosettaUploadJob', | 9 | 'RosettaUploadJob', |
103 | 10 | ] | 10 | ] |
104 | 11 | 11 | ||
105 | 12 | import contextlib | ||
106 | 12 | import os | 13 | import os |
107 | 13 | import shutil | 14 | import shutil |
108 | 14 | from StringIO import StringIO | 15 | from StringIO import StringIO |
109 | @@ -40,14 +41,14 @@ | |||
110 | 40 | from canonical.config import config | 41 | from canonical.config import config |
111 | 41 | from canonical.database.enumcol import EnumCol | 42 | from canonical.database.enumcol import EnumCol |
112 | 42 | from canonical.database.sqlbase import SQLBase | 43 | from canonical.database.sqlbase import SQLBase |
114 | 43 | from canonical.launchpad.webapp import canonical_url | 44 | from canonical.launchpad.webapp import canonical_url, errorlog |
115 | 44 | from lp.code.bzr import ( | 45 | from lp.code.bzr import ( |
116 | 45 | BRANCH_FORMAT_UPGRADE_PATH, REPOSITORY_FORMAT_UPGRADE_PATH) | 46 | BRANCH_FORMAT_UPGRADE_PATH, REPOSITORY_FORMAT_UPGRADE_PATH) |
117 | 46 | from lp.code.model.branch import Branch | 47 | from lp.code.model.branch import Branch |
118 | 47 | from lp.code.model.branchmergeproposal import BranchMergeProposal | 48 | from lp.code.model.branchmergeproposal import BranchMergeProposal |
119 | 48 | from lp.code.model.diff import StaticDiff | 49 | from lp.code.model.diff import StaticDiff |
120 | 49 | from lp.code.model.revision import RevisionSet | 50 | from lp.code.model.revision import RevisionSet |
122 | 50 | from lp.codehosting.vfs import branch_id_to_path | 51 | from lp.codehosting.vfs import branch_id_to_path, get_multi_server |
123 | 51 | from lp.services.job.model.job import Job | 52 | from lp.services.job.model.job import Job |
124 | 52 | from lp.services.job.interfaces.job import JobStatus | 53 | from lp.services.job.interfaces.job import JobStatus |
125 | 53 | from lp.services.job.runner import BaseRunnableJob | 54 | from lp.services.job.runner import BaseRunnableJob |
126 | @@ -258,6 +259,16 @@ | |||
127 | 258 | branch_job = BranchJob(branch, BranchJobType.UPGRADE_BRANCH, {}) | 259 | branch_job = BranchJob(branch, BranchJobType.UPGRADE_BRANCH, {}) |
128 | 259 | return cls(branch_job) | 260 | return cls(branch_job) |
129 | 260 | 261 | ||
130 | 262 | @staticmethod | ||
131 | 263 | @contextlib.contextmanager | ||
132 | 264 | def contextManager(): | ||
133 | 265 | """See `IBranchUpgradeJobSource`.""" | ||
134 | 266 | errorlog.globalErrorUtility.configure('upgrade_branches') | ||
135 | 267 | server = get_multi_server(write_hosted=True) | ||
136 | 268 | server.setUp() | ||
137 | 269 | yield | ||
138 | 270 | server.tearDown() | ||
139 | 271 | |||
140 | 261 | def run(self): | 272 | def run(self): |
141 | 262 | """See `IBranchUpgradeJob`.""" | 273 | """See `IBranchUpgradeJob`.""" |
142 | 263 | # Set up the new branch structure | 274 | # Set up the new branch structure |
143 | 264 | 275 | ||
144 | === modified file 'lib/lp/code/model/branchmergeproposaljob.py' | |||
145 | --- lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:15 +0000 | |||
146 | +++ lib/lp/code/model/branchmergeproposaljob.py 2009-12-18 16:03:17 +0000 | |||
147 | @@ -14,6 +14,7 @@ | |||
148 | 14 | 'MergeProposalCreatedJob', | 14 | 'MergeProposalCreatedJob', |
149 | 15 | ] | 15 | ] |
150 | 16 | 16 | ||
151 | 17 | import contextlib | ||
152 | 17 | from email.Utils import parseaddr | 18 | from email.Utils import parseaddr |
153 | 18 | import transaction | 19 | import transaction |
154 | 19 | 20 | ||
155 | @@ -31,6 +32,7 @@ | |||
156 | 31 | from canonical.database.enumcol import EnumCol | 32 | from canonical.database.enumcol import EnumCol |
157 | 32 | from canonical.launchpad.database.message import MessageJob, MessageJobAction | 33 | from canonical.launchpad.database.message import MessageJob, MessageJobAction |
158 | 33 | from canonical.launchpad.interfaces.message import IMessageJob | 34 | from canonical.launchpad.interfaces.message import IMessageJob |
159 | 35 | from canonical.launchpad.webapp import errorlog | ||
160 | 34 | from canonical.launchpad.webapp.interaction import setupInteraction | 36 | from canonical.launchpad.webapp.interaction import setupInteraction |
161 | 35 | from canonical.launchpad.webapp.interfaces import ( | 37 | from canonical.launchpad.webapp.interfaces import ( |
162 | 36 | DEFAULT_FLAVOR, IPlacelessAuthUtility, IStoreSelector, MAIN_STORE, | 38 | DEFAULT_FLAVOR, IPlacelessAuthUtility, IStoreSelector, MAIN_STORE, |
163 | @@ -44,10 +46,10 @@ | |||
164 | 44 | from lp.code.mail.branchmergeproposal import BMPMailer | 46 | from lp.code.mail.branchmergeproposal import BMPMailer |
165 | 45 | from lp.code.model.branchmergeproposal import BranchMergeProposal | 47 | from lp.code.model.branchmergeproposal import BranchMergeProposal |
166 | 46 | from lp.code.model.diff import PreviewDiff, StaticDiff | 48 | from lp.code.model.diff import PreviewDiff, StaticDiff |
168 | 47 | from lp.codehosting.vfs import get_multi_server | 49 | from lp.codehosting.vfs import get_multi_server, get_scanner_server |
169 | 48 | from lp.services.job.model.job import Job | 50 | from lp.services.job.model.job import Job |
170 | 49 | from lp.services.job.interfaces.job import IRunnableJob | 51 | from lp.services.job.interfaces.job import IRunnableJob |
172 | 50 | from lp.services.job.runner import BaseRunnableJob | 52 | from lp.services.job.runner import BaseRunnableJob, JobRunnerProcess |
173 | 51 | 53 | ||
174 | 52 | 54 | ||
175 | 53 | class BranchMergeProposalJobType(DBEnumeratedType): | 55 | class BranchMergeProposalJobType(DBEnumeratedType): |
176 | @@ -287,6 +289,16 @@ | |||
177 | 287 | 289 | ||
178 | 288 | class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF | 290 | class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF |
179 | 289 | 291 | ||
180 | 292 | @staticmethod | ||
181 | 293 | @contextlib.contextmanager | ||
182 | 294 | def contextManager(): | ||
183 | 295 | """See `IUpdatePreviewDiffJobSource`.""" | ||
184 | 296 | errorlog.globalErrorUtility.configure('update_preview_diffs') | ||
185 | 297 | server = get_scanner_server() | ||
186 | 298 | server.setUp() | ||
187 | 299 | yield | ||
188 | 300 | server.tearDown() | ||
189 | 301 | |||
190 | 290 | def run(self): | 302 | def run(self): |
191 | 291 | """See `IRunnableJob`""" | 303 | """See `IRunnableJob`""" |
192 | 292 | preview = PreviewDiff.fromBranchMergeProposal( | 304 | preview = PreviewDiff.fromBranchMergeProposal( |
193 | @@ -294,6 +306,14 @@ | |||
194 | 294 | self.branch_merge_proposal.preview_diff = preview | 306 | self.branch_merge_proposal.preview_diff = preview |
195 | 295 | 307 | ||
196 | 296 | 308 | ||
197 | 309 | class UpdatePreviewDiffProcess(JobRunnerProcess): | ||
198 | 310 | """A process that runs UpdatePreviewDiffJobs""" | ||
199 | 311 | job_class = UpdatePreviewDiffJob | ||
200 | 312 | |||
201 | 313 | |||
202 | 314 | UpdatePreviewDiffJob.amp = UpdatePreviewDiffProcess | ||
203 | 315 | |||
204 | 316 | |||
205 | 297 | class CreateMergeProposalJob(BaseRunnableJob): | 317 | class CreateMergeProposalJob(BaseRunnableJob): |
206 | 298 | """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`.""" | 318 | """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`.""" |
207 | 299 | 319 | ||
208 | 300 | 320 | ||
209 | === modified file 'lib/lp/code/scripts/tests/test_update_preview_diffs.py' | |||
210 | --- lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:15 +0000 | |||
211 | +++ lib/lp/code/scripts/tests/test_update_preview_diffs.py 2009-12-18 16:03:17 +0000 | |||
212 | @@ -78,7 +78,7 @@ | |||
213 | 78 | error_utility.configure('update_preview_diffs') | 78 | error_utility.configure('update_preview_diffs') |
214 | 79 | old_id = error_utility.getLastOopsReport().id | 79 | old_id = error_utility.getLastOopsReport().id |
215 | 80 | retcode, stdout, stderr = run_script( | 80 | retcode, stdout, stderr = run_script( |
217 | 81 | 'cronscripts/update_preview_diffs.py', []) | 81 | 'cronscripts/update_preview_diffs.py', ['--twisted']) |
218 | 82 | self.assertEqual(0, retcode) | 82 | self.assertEqual(0, retcode) |
219 | 83 | self.assertIn( | 83 | self.assertIn( |
220 | 84 | 'INFO 1 IUpdatePreviewDiffJobSource jobs did not complete.\n', | 84 | 'INFO 1 IUpdatePreviewDiffJobSource jobs did not complete.\n', |
221 | 85 | 85 | ||
222 | === modified file 'lib/lp/services/job/runner.py' | |||
223 | --- lib/lp/services/job/runner.py 2009-12-18 16:03:15 +0000 | |||
224 | +++ lib/lp/services/job/runner.py 2009-12-18 16:03:17 +0000 | |||
225 | @@ -11,10 +11,16 @@ | |||
226 | 11 | __all__ = ['JobRunner'] | 11 | __all__ = ['JobRunner'] |
227 | 12 | 12 | ||
228 | 13 | 13 | ||
229 | 14 | import os | ||
230 | 15 | from signal import getsignal, SIGCHLD, signal | ||
231 | 14 | import sys | 16 | import sys |
232 | 15 | 17 | ||
233 | 18 | from ampoule import child, pool, main | ||
234 | 19 | |||
235 | 16 | from twisted.internet import reactor, defer | 20 | from twisted.internet import reactor, defer |
236 | 21 | from twisted.protocols import amp | ||
237 | 17 | from zope.component import getUtility | 22 | from zope.component import getUtility |
238 | 23 | from zope.security.proxy import removeSecurityProxy | ||
239 | 18 | 24 | ||
240 | 19 | from canonical.config import config | 25 | from canonical.config import config |
241 | 20 | from canonical.twistedsupport.task import ( | 26 | from canonical.twistedsupport.task import ( |
242 | @@ -27,6 +33,27 @@ | |||
243 | 27 | from lp.services.mail.sendmail import MailController | 33 | from lp.services.mail.sendmail import MailController |
244 | 28 | from canonical.launchpad.webapp import errorlog | 34 | from canonical.launchpad.webapp import errorlog |
245 | 29 | 35 | ||
246 | 36 | BOOTSTRAP = """\ | ||
247 | 37 | import sys | ||
248 | 38 | |||
249 | 39 | def main(reactor, ampChildPath): | ||
250 | 40 | from twisted.application import reactors | ||
251 | 41 | reactors.installReactor(reactor) | ||
252 | 42 | |||
253 | 43 | from twisted.python import log | ||
254 | 44 | log.startLogging(sys.stderr) | ||
255 | 45 | |||
256 | 46 | from twisted.internet import reactor, stdio | ||
257 | 47 | from twisted.python import reflect | ||
258 | 48 | |||
259 | 49 | ampChild = reflect.namedAny(ampChildPath) | ||
260 | 50 | stdio.StandardIO(ampChild(), 3, 4) | ||
261 | 51 | from canonical.launchpad import scripts | ||
262 | 52 | scripts.execute_zcml_for_scripts(use_web_security=False) | ||
263 | 53 | reactor.run() | ||
264 | 54 | main(sys.argv[-2], sys.argv[-1]) | ||
265 | 55 | """ | ||
266 | 56 | |||
267 | 30 | 57 | ||
268 | 31 | class BaseRunnableJob: | 58 | class BaseRunnableJob: |
269 | 32 | """Base class for jobs to be run via JobRunner. | 59 | """Base class for jobs to be run via JobRunner. |
270 | @@ -102,10 +129,13 @@ | |||
271 | 102 | class BaseJobRunner(object): | 129 | class BaseJobRunner(object): |
272 | 103 | """Runner of Jobs.""" | 130 | """Runner of Jobs.""" |
273 | 104 | 131 | ||
275 | 105 | def __init__(self, logger=None): | 132 | def __init__(self, logger=None, error_utility=None): |
276 | 106 | self.completed_jobs = [] | 133 | self.completed_jobs = [] |
277 | 107 | self.incomplete_jobs = [] | 134 | self.incomplete_jobs = [] |
278 | 108 | self.logger = logger | 135 | self.logger = logger |
279 | 136 | self.error_utility = error_utility | ||
280 | 137 | if self.error_utility is None: | ||
281 | 138 | self.error_utility = errorlog.globalErrorUtility | ||
282 | 109 | 139 | ||
283 | 110 | def runJob(self, job): | 140 | def runJob(self, job): |
284 | 111 | """Attempt to run a job, updating its status as appropriate.""" | 141 | """Attempt to run a job, updating its status as appropriate.""" |
285 | @@ -139,7 +169,7 @@ | |||
286 | 139 | The list of complete or incomplete jobs will be updated. | 169 | The list of complete or incomplete jobs will be updated. |
287 | 140 | """ | 170 | """ |
288 | 141 | job = IRunnableJob(job) | 171 | job = IRunnableJob(job) |
290 | 142 | with errorlog.globalErrorUtility.oopsMessage( | 172 | with self.error_utility.oopsMessage( |
291 | 143 | dict(job.getOopsVars())): | 173 | dict(job.getOopsVars())): |
292 | 144 | try: | 174 | try: |
293 | 145 | self.runJob(job) | 175 | self.runJob(job) |
294 | @@ -149,11 +179,24 @@ | |||
295 | 149 | job.notifyUserError(e) | 179 | job.notifyUserError(e) |
296 | 150 | except Exception: | 180 | except Exception: |
297 | 151 | info = sys.exc_info() | 181 | info = sys.exc_info() |
303 | 152 | errorlog.globalErrorUtility.raising(info) | 182 | return self._doOops(job, info) |
304 | 153 | oops = errorlog.globalErrorUtility.getLastOopsReport() | 183 | |
305 | 154 | job.notifyOops(oops) | 184 | def _doOops(self, job, info): |
306 | 155 | if self.logger is not None: | 185 | """Report an OOPS for the provided job and info. |
307 | 156 | self.logger.info('Job resulted in OOPS: %s' % oops.id) | 186 | |
308 | 187 | :param job: The IRunnableJob whose run failed. | ||
309 | 188 | :param info: The standard sys.exc_info() value. | ||
310 | 189 | :return: the Oops that was reported. | ||
311 | 190 | """ | ||
312 | 191 | self.error_utility.raising(info) | ||
313 | 192 | oops = self.error_utility.getLastOopsReport() | ||
314 | 193 | job.notifyOops(oops) | ||
315 | 194 | return oops | ||
316 | 195 | |||
317 | 196 | def _logOopsId(self, oops_id): | ||
318 | 197 | """Report oopses by id to the log.""" | ||
319 | 198 | if self.logger is not None: | ||
320 | 199 | self.logger.info('Job resulted in OOPS: %s' % oops_id) | ||
321 | 157 | 200 | ||
322 | 158 | 201 | ||
323 | 159 | class JobRunner(BaseJobRunner): | 202 | class JobRunner(BaseJobRunner): |
324 | @@ -170,30 +213,97 @@ | |||
325 | 170 | @classmethod | 213 | @classmethod |
326 | 171 | def runFromSource(cls, job_source, logger): | 214 | def runFromSource(cls, job_source, logger): |
327 | 172 | """Run all ready jobs provided by the specified source.""" | 215 | """Run all ready jobs provided by the specified source.""" |
331 | 173 | logger.info("Running synchronously.") | 216 | with removeSecurityProxy(job_source.contextManager()): |
332 | 174 | runner = cls.fromReady(job_source, logger) | 217 | logger.info("Running synchronously.") |
333 | 175 | runner.runAll() | 218 | runner = cls.fromReady(job_source, logger) |
334 | 219 | runner.runAll() | ||
335 | 176 | return runner | 220 | return runner |
336 | 177 | 221 | ||
337 | 178 | def runAll(self): | 222 | def runAll(self): |
338 | 179 | """Run all the Jobs for this JobRunner.""" | 223 | """Run all the Jobs for this JobRunner.""" |
339 | 180 | for job in self.jobs: | 224 | for job in self.jobs: |
341 | 181 | self.runJobHandleError(job) | 225 | oops = self.runJobHandleError(job) |
342 | 226 | if oops is not None: | ||
343 | 227 | self._logOopsId(oops.id) | ||
344 | 228 | |||
345 | 229 | |||
346 | 230 | class RunJobCommand(amp.Command): | ||
347 | 231 | |||
348 | 232 | arguments = [('job_id', amp.Integer())] | ||
349 | 233 | response = [('success', amp.Integer()), ('oops_id', amp.String())] | ||
350 | 234 | |||
351 | 235 | |||
352 | 236 | class JobRunnerProcess(child.AMPChild): | ||
353 | 237 | """Base class for processes that run jobs.""" | ||
354 | 238 | |||
355 | 239 | def __init__(self): | ||
356 | 240 | child.AMPChild.__init__(self) | ||
357 | 241 | self.context_manager = self.job_class.contextManager() | ||
358 | 242 | |||
359 | 243 | def makeConnection(self, transport): | ||
360 | 244 | """The Job context is entered on connect.""" | ||
361 | 245 | child.AMPChild.makeConnection(self, transport) | ||
362 | 246 | self.context_manager.__enter__() | ||
363 | 247 | |||
364 | 248 | def connectionLost(self, reason): | ||
365 | 249 | """The Job context is left on disconnect.""" | ||
366 | 250 | self.context_manager.__exit__(None, None, None) | ||
367 | 251 | child.AMPChild.connectionLost(self, reason) | ||
368 | 252 | |||
369 | 253 | @RunJobCommand.responder | ||
370 | 254 | def runJobCommand(self, job_id): | ||
371 | 255 | """Run a job of this job_class according to its job id.""" | ||
372 | 256 | runner = BaseJobRunner() | ||
373 | 257 | job = self.job_class.get(job_id) | ||
374 | 258 | oops = runner.runJobHandleError(job) | ||
375 | 259 | if oops is None: | ||
376 | 260 | oops_id = '' | ||
377 | 261 | else: | ||
378 | 262 | oops_id = oops.id | ||
379 | 263 | return {'success': len(runner.completed_jobs), 'oops_id': oops_id} | ||
380 | 182 | 264 | ||
381 | 183 | 265 | ||
382 | 184 | class TwistedJobRunner(BaseJobRunner): | 266 | class TwistedJobRunner(BaseJobRunner): |
383 | 185 | """Run Jobs via twisted.""" | 267 | """Run Jobs via twisted.""" |
384 | 186 | 268 | ||
387 | 187 | def __init__(self, job_source, logger=None): | 269 | def __init__(self, job_source, job_amp, logger=None, error_utility=None): |
388 | 188 | super(TwistedJobRunner, self).__init__(logger=logger) | 270 | starter = main.ProcessStarter( |
389 | 271 | bootstrap=BOOTSTRAP, packages=('twisted', 'ampoule'), | ||
390 | 272 | env={'PYTHONPATH': os.environ['PYTHONPATH'], | ||
391 | 273 | 'PATH': os.environ['PATH'], | ||
392 | 274 | 'LPCONFIG': os.environ['LPCONFIG']}) | ||
393 | 275 | super(TwistedJobRunner, self).__init__(logger, error_utility) | ||
394 | 189 | self.job_source = job_source | 276 | self.job_source = job_source |
395 | 277 | self.pool = pool.ProcessPool(job_amp, starter=starter, min=0) | ||
396 | 278 | |||
397 | 279 | def runJobInSubprocess(self, job): | ||
398 | 280 | """Run the job_class with the specified id in the process pool. | ||
399 | 281 | |||
400 | 282 | :return: a Deferred that fires when the job has completed. | ||
401 | 283 | """ | ||
402 | 284 | job_id = job.id | ||
403 | 285 | deferred = self.pool.doWork(RunJobCommand, job_id=job_id) | ||
404 | 286 | def update(response): | ||
405 | 287 | if response['success']: | ||
406 | 288 | self.completed_jobs.append(job) | ||
407 | 289 | else: | ||
408 | 290 | self.incomplete_jobs.append(job) | ||
409 | 291 | if response['oops_id'] != '': | ||
410 | 292 | self._logOopsId(response['oops_id']) | ||
411 | 293 | def job_raised(failure): | ||
412 | 294 | self.incomplete_jobs.append(job) | ||
413 | 295 | info = (failure.type, failure.value, failure.tb) | ||
414 | 296 | oops = self._doOops(job, info) | ||
415 | 297 | self._logOopsId(oops.id) | ||
416 | 298 | deferred.addCallbacks(update, job_raised) | ||
417 | 299 | return deferred | ||
418 | 190 | 300 | ||
419 | 191 | def getTaskSource(self): | 301 | def getTaskSource(self): |
420 | 192 | """Return a task source for all jobs in job_source.""" | 302 | """Return a task source for all jobs in job_source.""" |
421 | 193 | def producer(): | 303 | def producer(): |
422 | 194 | while True: | 304 | while True: |
423 | 195 | for job in self.job_source.iterReady(): | 305 | for job in self.job_source.iterReady(): |
425 | 196 | yield lambda: self.runJobHandleError(job) | 306 | yield lambda: self.runJobInSubprocess(job) |
426 | 197 | yield None | 307 | yield None |
427 | 198 | return PollingTaskSource(5, producer().next) | 308 | return PollingTaskSource(5, producer().next) |
428 | 199 | 309 | ||
429 | @@ -204,22 +314,32 @@ | |||
430 | 204 | 314 | ||
431 | 205 | def runAll(self): | 315 | def runAll(self): |
432 | 206 | """Run all ready jobs, and any that become ready while running.""" | 316 | """Run all ready jobs, and any that become ready while running.""" |
433 | 317 | self.pool.start() | ||
434 | 207 | d = defer.maybeDeferred(self.doConsumer) | 318 | d = defer.maybeDeferred(self.doConsumer) |
439 | 208 | d.addCallbacks(lambda ignored: reactor.stop(), self.failed) | 319 | d.addCallbacks(self.terminated, self.failed) |
440 | 209 | 320 | ||
441 | 210 | @staticmethod | 321 | def terminated(self, ignored=None): |
442 | 211 | def failed(failure): | 322 | """Callback to stop the processpool and reactor.""" |
443 | 323 | deferred = self.pool.stop() | ||
444 | 324 | deferred.addBoth(lambda ignored: reactor.stop()) | ||
445 | 325 | |||
446 | 326 | def failed(self, failure): | ||
447 | 212 | """Callback for when the job fails.""" | 327 | """Callback for when the job fails.""" |
448 | 213 | failure.printTraceback() | 328 | failure.printTraceback() |
450 | 214 | reactor.stop() | 329 | self.terminated() |
451 | 215 | 330 | ||
452 | 216 | @classmethod | 331 | @classmethod |
454 | 217 | def runFromSource(cls, job_source, logger): | 332 | def runFromSource(cls, job_source, logger, error_utility=None): |
455 | 218 | """Run all ready jobs provided by the specified source.""" | 333 | """Run all ready jobs provided by the specified source.""" |
456 | 219 | logger.info("Running through Twisted.") | 334 | logger.info("Running through Twisted.") |
458 | 220 | runner = cls(job_source, logger) | 335 | runner = cls(job_source, removeSecurityProxy(job_source).amp, logger, |
459 | 336 | error_utility) | ||
460 | 221 | reactor.callWhenRunning(runner.runAll) | 337 | reactor.callWhenRunning(runner.runAll) |
462 | 222 | reactor.run() | 338 | handler = getsignal(SIGCHLD) |
463 | 339 | try: | ||
464 | 340 | reactor.run() | ||
465 | 341 | finally: | ||
466 | 342 | signal(SIGCHLD, handler) | ||
467 | 223 | return runner | 343 | return runner |
468 | 224 | 344 | ||
469 | 225 | 345 | ||
470 | @@ -232,14 +352,8 @@ | |||
471 | 232 | self.runner_class = runner_class | 352 | self.runner_class = runner_class |
472 | 233 | 353 | ||
473 | 234 | def main(self): | 354 | def main(self): |
482 | 235 | errorlog.globalErrorUtility.configure(self.config_name) | 355 | job_source = getUtility(self.source_interface) |
483 | 236 | cleanups = self.setUp() | 356 | runner = self.runner_class.runFromSource(job_source, self.logger) |
476 | 237 | try: | ||
477 | 238 | job_source = getUtility(self.source_interface) | ||
478 | 239 | runner = self.runner_class.runFromSource(job_source, self.logger) | ||
479 | 240 | finally: | ||
480 | 241 | for cleanup in reversed(cleanups): | ||
481 | 242 | cleanup() | ||
484 | 243 | self.logger.info( | 357 | self.logger.info( |
485 | 244 | 'Ran %d %s jobs.', | 358 | 'Ran %d %s jobs.', |
486 | 245 | len(runner.completed_jobs), self.source_interface.__name__) | 359 | len(runner.completed_jobs), self.source_interface.__name__) |
487 | 246 | 360 | ||
488 | === modified file 'setup.py' | |||
489 | --- setup.py 2009-11-25 10:59:02 +0000 | |||
490 | +++ setup.py 2009-12-18 16:03:17 +0000 | |||
491 | @@ -24,6 +24,7 @@ | |||
492 | 24 | # this list should only contain direct dependencies--things imported or | 24 | # this list should only contain direct dependencies--things imported or |
493 | 25 | # used in zcml. | 25 | # used in zcml. |
494 | 26 | install_requires=[ | 26 | install_requires=[ |
495 | 27 | 'ampoule', | ||
496 | 27 | 'bzr', | 28 | 'bzr', |
497 | 28 | 'chameleon.core', | 29 | 'chameleon.core', |
498 | 29 | 'chameleon.zpt', | 30 | 'chameleon.zpt', |
499 | 30 | 31 | ||
500 | === modified file 'versions.cfg' | |||
501 | --- versions.cfg 2009-12-16 14:14:41 +0000 | |||
502 | +++ versions.cfg 2009-12-18 16:03:17 +0000 | |||
503 | @@ -3,6 +3,12 @@ | |||
504 | 3 | 3 | ||
505 | 4 | [versions] | 4 | [versions] |
506 | 5 | # Alphabetical, case-insensitive, please! :-) | 5 | # Alphabetical, case-insensitive, please! :-) |
507 | 6 | |||
508 | 7 | # from -r 3:lp:~launchpad/ampoule/launchpad-tweaked | ||
509 | 8 | # To reproduce: | ||
510 | 9 | # bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\ | ||
511 | 10 | # -r 3 | ||
512 | 11 | ampoule = 0.1.0-lp-1 | ||
513 | 6 | bzr = 2.1b3 | 12 | bzr = 2.1b3 |
514 | 7 | chameleon.core = 1.0b35 | 13 | chameleon.core = 1.0b35 |
515 | 8 | chameleon.zpt = 1.0b17 | 14 | chameleon.zpt = 1.0b17 |
= Summary =
Modify the twisted job runner to use a subprocess.
== Proposed fix ==
Use the Ampoule process pool. Note that this implementation does not provide
parallelism, because the ParallelLimitedTaskConsumer is parameterized to run
1 parallel process.
== Pre-implementation notes ==
Preimplementation discussion with thumper, mwhudson.
== Implementation details ==
Make ampoule a dependency.
Because process set-up should happen in a subprocess, with Ampoule, remove the
setUp concept from JobCronScript, and instead provide a context manager.
Configure the globalErrorUtility there. Use the context manager in JobRunner.
Add an Amp command called RunJobCommand. Add a JobRunnerProcess class that can
provide RunJobCommand. Have it enter the context when a connection is made,
and exit it when the connection is lost. (These should happen only once in the
JobRunnerProcess's lifecycle.)
Subclass RunJobCommand as UpdatePreviewDiffProcess. Provide
UpdatePreviewDiffJob.amp so that TwistedJobRunner.runFromSource can find the
right process class to use.
Update runAll to start the process pool, using the specified job_amp kind.
Provide a customized BOOTSTRAP to invoke scripts.execute_zcml_for_scripts.
Change getTaskSource to return runJobInSubprocess, which uses the process pool
to run the job.
Update TwistedJobRunner.runFromSource to clean up SIGCHLD signal handlers after
reactor.run.
== Tests ==
bin/test test_update_preview_diffs
== Demo and Q/A ==
Have a LOSA update the staging config to use --twisted, and ensure the log
says it ran under Twisted.