Merge lp:~mwhudson/launchpad/smarter-code-import-scheduling-bug-236973 into lp:launchpad

Proposed by Michael Hudson-Doyle
Status: Merged
Approved by: Tim Penhey
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~mwhudson/launchpad/smarter-code-import-scheduling-bug-236973
Merge into: lp:launchpad
Prerequisite: lp:~mwhudson/launchpad/reduce-concurrent-job-count
Diff against target: 164 lines (+63/-13)
3 files modified
cronscripts/code-import-dispatcher.py (+1/-1)
lib/lp/codehosting/codeimport/dispatcher.py (+28/-3)
lib/lp/codehosting/codeimport/tests/test_dispatcher.py (+34/-9)
To merge this branch: bzr merge lp:~mwhudson/launchpad/smarter-code-import-scheduling-bug-236973
Reviewer Review Type Date Requested Status
Tim Penhey (community) Approve
Review via email: mp+19841@code.launchpad.net

Commit message

Make the code import dispatcher keep asking for work until none is provided.

To post a comment you must log in.
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Hi Tim,

We've talked about this, I think -- I've modified the code import dispatcher to keep asking for work until it finds none, sleeping between requests for a period that depends on the load of the machine.

Revision history for this message
Tim Penhey (thumper) wrote :

Looks good. I'm eager to see how this works on staging :)

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'cronscripts/code-import-dispatcher.py'
2--- cronscripts/code-import-dispatcher.py 2010-02-22 01:36:30 +0000
3+++ cronscripts/code-import-dispatcher.py 2010-02-23 00:34:17 +0000
4@@ -37,7 +37,7 @@
5 globalErrorUtility.configure('codeimportdispatcher')
6
7 dispatcher = CodeImportDispatcher(self.logger, self.options.max_jobs)
8- dispatcher.findAndDispatchJob(
9+ dispatcher.findAndDispatchJobs(
10 ServerProxy(config.codeimportdispatcher.codeimportscheduler_url))
11
12
13
14=== modified file 'lib/lp/codehosting/codeimport/dispatcher.py'
15--- lib/lp/codehosting/codeimport/dispatcher.py 2010-02-22 01:36:30 +0000
16+++ lib/lp/codehosting/codeimport/dispatcher.py 2010-02-23 00:34:17 +0000
17@@ -16,6 +16,7 @@
18 import os
19 import socket
20 import subprocess
21+import time
22
23 from canonical.config import config
24
25@@ -32,13 +33,14 @@
26 worker_script = os.path.join(
27 config.root, 'scripts', 'code-import-worker-db.py')
28
29- def __init__(self, logger, worker_limit):
30+ def __init__(self, logger, worker_limit, _sleep=time.sleep):
31 """Initialize an instance.
32
33 :param logger: A `Logger` object.
34 """
35 self.logger = logger
36 self.worker_limit = worker_limit
37+ self._sleep = _sleep
38
39 def getHostname(self):
40 """Return the hostname of this machine.
41@@ -65,15 +67,38 @@
42
43
44 def findAndDispatchJob(self, scheduler_client):
45- """Check for and dispatch a job if necessary."""
46+ """Check for and dispatch a job if necessary.
47+
48+ :return: A boolean, true if a job was found and dispatched.
49+ """
50
51 job_id = scheduler_client.getJobForMachine(
52 self.getHostname(), self.worker_limit)
53
54 if job_id == 0:
55 self.logger.info("No jobs pending.")
56- return
57+ return False
58
59 self.logger.info("Dispatching job %d." % job_id)
60
61 self.dispatchJob(job_id)
62+ return True
63+
64+ def _getSleepInterval(self):
65+ """How long to sleep for until asking for a new job.
66+
67+ The basic idea is to wait longer if the machine is more heavily
68+ loaded, so that less loaded slaves get a chance to grab some jobs.
69+
70+ We assume worker_limit will be roughly the number of CPUs in the
71+ machine, so load/worker_limit is roughly how loaded the machine is.
72+ """
73+ return 5*os.getloadavg()[0]/self.worker_limit
74+
75+ def findAndDispatchJobs(self, scheduler_client):
76+ """Call findAndDispatchJob until no job is found."""
77+ while True:
78+ found = self.findAndDispatchJob(scheduler_client)
79+ if not found:
80+ break
81+ self._sleep(self._getSleepInterval())
82
83=== modified file 'lib/lp/codehosting/codeimport/tests/test_dispatcher.py'
84--- lib/lp/codehosting/codeimport/tests/test_dispatcher.py 2010-02-22 02:06:57 +0000
85+++ lib/lp/codehosting/codeimport/tests/test_dispatcher.py 2010-02-23 00:34:17 +0000
86@@ -24,11 +24,11 @@
87 class StubSchedulerClient:
88 """A scheduler client that returns a pre-arranged answer."""
89
90- def __init__(self, id_to_return):
91- self.id_to_return = id_to_return
92+ def __init__(self, ids_to_return):
93+ self.ids_to_return = ids_to_return
94
95 def getJobForMachine(self, machine, limit):
96- return self.id_to_return
97+ return self.ids_to_return.pop(0)
98
99
100 class MockSchedulerClient:
101@@ -51,9 +51,10 @@
102 TestCase.setUp(self)
103 self.pushConfig('codeimportdispatcher', forced_hostname='none')
104
105- def makeDispatcher(self, worker_limit=10):
106+ def makeDispatcher(self, worker_limit=10, _sleep=lambda delay: None):
107 """Make a `CodeImportDispatcher`."""
108- return CodeImportDispatcher(QuietFakeLogger(), worker_limit)
109+ return CodeImportDispatcher(
110+ QuietFakeLogger(), worker_limit, _sleep=_sleep)
111
112 def test_getHostname(self):
113 # By default, getHostname return the same as socket.gethostname()
114@@ -111,16 +112,16 @@
115 calls = []
116 dispatcher = self.makeDispatcher()
117 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)
118- dispatcher.findAndDispatchJob(StubSchedulerClient(10))
119- self.assertEqual([10], calls)
120+ found = dispatcher.findAndDispatchJob(StubSchedulerClient([10]))
121+ self.assertEqual(([10], True), (calls, found))
122
123 def test_findAndDispatchJob_noJobWaiting(self):
124 # If there is no job to dispatch, then we just exit quietly.
125 calls = []
126 dispatcher = self.makeDispatcher()
127 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)
128- dispatcher.findAndDispatchJob(StubSchedulerClient(0))
129- self.assertEqual([], calls)
130+ found = dispatcher.findAndDispatchJob(StubSchedulerClient([0]))
131+ self.assertEqual(([], False), (calls, found))
132
133 def test_findAndDispatchJob_calls_getJobForMachine_with_limit(self):
134 # findAndDispatchJob calls getJobForMachine on the scheduler client
135@@ -133,5 +134,29 @@
136 [(dispatcher.getHostname(), worker_limit)],
137 scheduler_client.calls)
138
139+ def test_findAndDispatchJobs(self):
140+ # findAndDispatchJobs calls getJobForMachine on the scheduler_client,
141+ # dispatching jobs, until it indicates that there are no more jobs to
142+ # dispatch.
143+ calls = []
144+ dispatcher = self.makeDispatcher()
145+ dispatcher.dispatchJob = lambda job_id: calls.append(job_id)
146+ dispatcher.findAndDispatchJobs(StubSchedulerClient([10, 9, 0]))
147+ self.assertEqual([10, 9], calls)
148+
149+ def test_findAndDispatchJobs_sleeps(self):
150+ # After finding a job, findAndDispatchJobs sleeps for an interval as
151+ # returned by _getSleepInterval.
152+ sleep_calls = []
153+ interval = self.factory.getUniqueInteger()
154+ def _sleep(delay):
155+ sleep_calls.append(delay)
156+ dispatcher = self.makeDispatcher(_sleep=_sleep)
157+ dispatcher.dispatchJob = lambda job_id: None
158+ dispatcher._getSleepInterval = lambda : interval
159+ dispatcher.findAndDispatchJobs(StubSchedulerClient([10, 0]))
160+ self.assertEqual([interval], sleep_calls)
161+
162+
163 def test_suite():
164 return TestLoader().loadTestsFromName(__name__)