Merge lp:~jameinel/launchpad/lp-service into lp:launchpad/db-devel
| Status: | Merged |
|---|---|
| Approved by: | Michael Hudson-Doyle |
| Approved revision: | no longer in the source branch |
| Merged at revision: | 9914 |
| Proposed branch: | lp:~jameinel/launchpad/lp-service |
| Merge into: | lp:launchpad/db-devel |
| Diff against target: | 2134 lines (+1776/-86), 10 files modified |
| To merge this branch: | bzr merge lp:~jameinel/launchpad/lp-service |
| Related bugs: | |

Files modified:

- Makefile (+2/-2)
- bzrplugins/lpserve/__init__.py (+737/-4)
- bzrplugins/lpserve/test_lpserve.py (+534/-0)
- configs/development/launchpad-lazr.conf (+1/-0)
- lib/canonical/config/schema-lazr.conf (+12/-0)
- lib/canonical/launchpad/scripts/runlaunchpad.py (+47/-0)
- lib/lp/codehosting/sshserver/session.py (+299/-12)
- lib/lp/codehosting/sshserver/tests/test_session.py (+73/-0)
- lib/lp/codehosting/tests/test_acceptance.py (+61/-0)
- lib/lp/codehosting/tests/test_lpserve.py (+10/-68)

| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Michael Hudson-Doyle | | | Approve |
| Gavin Panella (community) | | | Abstain |
| Jonathan Lange | | | Pending |

Review via email: mp+37531@code.launchpad.net
Commit message
Implement LaunchpadForkin
Description of the change
Retargeted from https:/ because I accidentally started from db-devel in the beginning. This patch includes all suggestions from the original submission.
The goal of this submission is to improve the time for "bzr+ssh" to connect and be useful. The new method must be activated by setting:
[codehosting]
use_forking_daemon: True
It is set to be enabled in "development" mode, but the default is still disabled. I can't give a recommendation for the production config, because the branch is private.
This implements a new service (LaunchpadForki
The benefit is seen with: time echo hello | ssh localhost bzr serve --inet ...
Without this patch, it is 2.5s to serve on the loopback. With this patch, it is 0.25s.
I'm very happy to work with someone to smooth out the finer points of this submission. I tried to be explicit about places in the code that I had a decision to make, and why I chose the method I did.
I didn't use the FeatureFlag system, because setting it up via the config file was a lot more straightforward (globally enabling/disabling the functionality).
As near as I can tell, there should be no impact of rolling this code out on production if use_forking_daemon is False.
(The make run_codehosting target will not actually spawn the daemon, and the twisted Conch code just gets an 'if' check to see that it should use the old code path.)
I haven't run the full test suite, but I have:
1) Run all of the locally relevant tests: 'bzr selftest -s bt.lp_serve' and 'bin/test lp.
2) Manually started the sftp service and run commands through the local instance, both with and without the forking service enabled. (And I have confirmed that it isn't used when disabled, and is used when enabled, etc.)
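The fifo handoff the service relies on can be sketched in miniature. This is an illustrative toy, not the patch's code: the parent plays both the forking service (creating the directory and fifo) and the client (opening the fifo), while the forked child attaches its output to the fifo, so the open() calls on the two sides synchronize the handoff.

```python
import os
import shutil
import tempfile


def demo_fifo_handoff():
    # The service creates a directory of named pipes for the new process.
    path = tempfile.mkdtemp(prefix='lp-forking-demo-')
    fifo = os.path.join(path, 'stdout')  # real service also makes stdin/stderr
    os.mkfifo(fifo)
    pid = os.fork()
    if pid == 0:
        # Child: attach to the fifo and pretend to run a bzr command.
        fd = os.open(fifo, os.O_WRONLY)  # blocks until the reader opens
        os.write(fd, b'hello from child\n')
        os.close(fd)
        os._exit(0)
    # Parent (playing the client): open() blocks until the child opens its
    # end, so client and child rendezvous on the fifo itself.
    with open(fifo, 'rb') as f:
        data = f.read()  # EOF once the child closes its end
    os.waitpid(pid, 0)
    shutil.rmtree(path)
    return data
```

The blocking open() on both ends is what lets the service send the directory path to the client before the child is fully wired up.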
Michael Hudson-Doyle (mwhudson) wrote:
Wow, what a branch. I've been quite picky in this, that's not because
I hate you <wink> but because it's subtle code and has potential to
cause issues, so I wanted to make sure all the comments make sense and
there's no redundant code, etc.
I worry slightly about what would happen if the forking service fell
over -- but clearly, we rely on the sftp server itself staying up, so
this probably isn't such a big deal. I guess we also need to make
sure the forking service is running in production.
All that said, I have no real problems with the code and look forward
to getting my branches onto and off of Launchpad that bit faster!
I'm marking this needs information because I want to see a response to my
comments. But I don't think any of them are deep.
> === modified file 'Makefile'
> --- Makefile 2010-09-07 18:15:01 +0000
> +++ Makefile 2010-10-06 00:19:31 +0000
> @@ -253,7 +253,7 @@
>
> run_codehosting: check_schema inplace stop
> $(RM) thread*.request
> - bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
> + bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>
>
> start_librarian: compile
You should add the forker service to the run_all target too.
> === added directory 'bzrplugins/lpserve'
> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
> --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
> +++ bzrplugins/lpserve/__init__.py 2010-10-06 00:19:31 +0000
> @@ -8,15 +8,33 @@
>
> __metaclass__ = type
>
> -__all__ = ['cmd_launchpad_server']
> -
> -
> +__all__ = ['cmd_launchpad_server',
> +           'cmd_launchpad_forking_service',
> +           ]
This should be formatted like this:
__all__ = [
    'cmd_launchpad_server',
    'cmd_launchpad_forking_service',
    ]
> +
> +
> +import errno
> +import os
> import resource
> +import shlex
> +import shutil
> +import signal
> +import socket
> import sys
> +import tempfile
> +import threading
> +import time
>
> from bzrlib.commands import Command, register_command
> from bzrlib.option import Option
> -from bzrlib import lockdir, ui
> +from bzrlib import (
> + commands,
> + errors,
> + lockdir,
> + osutils,
> + trace,
> + ui,
> + )
>
> from bzrlib.smart import medium, server
> from bzrlib.transport import get_transport
> @@ -110,3 +128,724 @@
>
>
> register_command(cmd_launchpad_server)
> +
> +
> +class LPForkingService(object):
> + """A service that can be asked to start a new bzr subprocess via fork.
> +
> + The basic idea is that python startup is very expensive. For example, the
> + original 'lp-serve' command could take 2.5s just to start up, before any
> + actual actions could be performed.
Well, it's not really Python startup, it's more the importing that's
slow. Maybe phrase this as "starting Python and importing Launchpad
is very...".
> + This class provides a service sitting on a socket, which can then be
> + requested to fork and run a given bzr command.
> +
> + Clients connect to the socket and make a simple request, which then
> + receives a response. The possible requests are:
> +
> + "hello\n": Trigger a heartbeat to report that the program is still
> + running...
Martin Pool (mbp) wrote:
notes from talking this over with spm:
* at the moment you can see in the process listing which branch is
being accessed; it would be very nice to keep that; but at least we
should log the pid/user/branch
* can run this on staging, then edge, then lpnet
* propose changes into lp:lp-production-configs to gradually turn it on
* might want an rt describing how to monitor the new daemon process,
which just needs to mention the socket location and the hello command;
they can write the script from there
* francis should formally tell tom and charlie we'll be making this change
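The monitoring check described above could be as small as the following sketch. The socket path is whatever the config points at, and the assumption that the daemon writes some reply to "hello" (rather than only logging the heartbeat) is mine, not something this branch guarantees:

```python
import socket


def check_forking_service(socket_path, timeout=5.0):
    """Return True if the forking daemon answers a 'hello' request.

    Assumes the daemon replies with some bytes on the socket; if it only
    logs the heartbeat, a successful connect() alone may be the right test.
    """
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        s.connect(socket_path)
        s.sendall(b'hello\n')
        return bool(s.recv(1024))
    except (OSError, socket.timeout):
        # Socket missing, daemon down, or no reply within the timeout.
        return False
    finally:
        s.close()
```

Wrapped in a script that exits 0/2, this is essentially the whole monitoring job the RT would need to describe.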
Robert Collins (lifeless) wrote:
On Wed, Oct 6, 2010 at 9:43 PM, Martin Pool <email address hidden> wrote:
> * can run this on staging, then edge, then lpnet
There is no edge.
> * francis should formally tell tom and charlie we'll be making this change
The rt describing should mention this; it will get enough airplay ;)
We make architectural changes -all- the time, there's no need to get
particularly cautious about this one.
Jonathan Lange (jml) wrote:
On Wed, Oct 6, 2010 at 9:52 AM, Robert Collins
<email address hidden> wrote:
> On Wed, Oct 6, 2010 at 9:43 PM, Martin Pool <email address hidden> wrote:
>
>> * can run this on staging, then edge, then lpnet
>
> There is no edge.
Yeah there is.
$ ssh bazaar.
No shells on this server.
Connection to bazaar.
jml
Robert Collins (lifeless) wrote:
It will be gone extremely soon. I want people to stop thinking of
'edge' as a way to test things.
Michael Hudson-Doyle (mwhudson) wrote:
On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
> notes from talking this over with spm:
>
> * at the moment you can see in the process listing which branch is
> being accessed; it would be very nice to keep that; but at least we
> should log the pid/user/branch
This isn't affected by this branch, although the process name ends up
being a bit unwieldy:
21266 pts/1 S+ 0:00 /usr/bin/python /home/mwh/
Maybe inserting a call to setproctitle in become_child would be useful,
especially as we have the dependency already -- one piece of information
we've lost is the user who has connected, although we can probably
figure that out from logs still.
> * can run this on staging, then edge, then lpnet
> * propose changes into lp:lp-production-configs to gradually turn it on
> * might want an rt describing how to monitor the new daemon process,
> which just needs to mention the socket location and the hello command;
> they can write the script from there
> * francis should formally tell tom and charlie we'll be making this change
I still vaguely wonder about controlling this with a feature flag. The
thing is, the codehosting systems don't connect to the database
currently, and that's actually kind of nice in some ways. We could add
an XML-RPC method to query a flag, and have the ssh daemon check that
every 30s or so -- I don't think we want an XML-RPC call delaying the
launch of the lp-serve process on every connection.
Cheers,
mwh
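The setproctitle idea above might look like the following sketch. The helper names and the title format are my invention; only the general approach (calling setproctitle from become_child so `ps` shows the user and command) comes from the discussion:

```python
def child_process_title(command, user=None):
    """Build a process title for a forked child, for 'ps' output.

    The exact format is a guess at what would be useful: the bzr command
    being run plus, when known, the connecting user.
    """
    title = 'bzr lp-serve (forked): %s' % (command,)
    if user is not None:
        title += ' for %s' % (user,)
    return title


def set_child_title(command, user=None):
    try:
        # Optional third-party dependency, mentioned above as already
        # available; without it we keep the default argv-based title.
        from setproctitle import setproctitle
    except ImportError:
        return
    setproctitle(child_process_title(command, user))
```

Since the branch is not yet opened when the child starts, the title could be updated a second time once the branch path is known.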
John A Meinel (jameinel) wrote:
For starters, thanks for the thorough review. It is really nice to get feedback on it, and you have certainly thought a lot about the code.
On 10/6/2010 12:02 AM, Michael Hudson-Doyle wrote:
> Review: Needs Information
> Wow, what a branch. I've been quite picky in this, that's not because
> I hate you <wink> but because it's subtle code and has potential to
> cause issues, so I wanted to make sure all the comments make sense and
> there's no redundant code, etc.
>
> I worry slightly about what would happen if the forking service fell
> over -- but clearly, we rely on the sftp server itself staying up, so
> this probably isn't such a big deal. I guess we also need to make
> sure the forking service is running in production.
We could trap failures to connect to the forking server and fall back to regular spawnProcess. The code is already a simple if/else block. However, I wouldn't really want silent degradation of connection. If we want it, it could probably be done with:

transport = None
if config.codehosting.use_forking_daemon:
    try:
        transport = ...
    except ???:
        pass  # We failed, we'll try again
if transport is None:
    transport = reactor.spawnProcess(...)
>
> All that said, I have no real problems with the code and look forward
> to getting my branches onto and off of Launchpad that bit faster!
>
> I'm marking this needs information because I want to see a response to my
> comments. But I don't think any of them are deep.
>
>> === modified file 'Makefile'
>> --- Makefile 2010-09-07 18:15:01 +0000
>> +++ Makefile 2010-10-06 00:19:31 +0000
>> @@ -253,7 +253,7 @@
>>
>> run_codehosting: check_schema inplace stop
>> $(RM) thread*.request
>> - bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
>> + bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>>
>>
>> start_librarian: compile
>
> You should add the forker service to the run_all target too.
Done.
>
>> === added directory 'bzrplugins/lpserve'
>> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
>> --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
>> +++ bzrplugins/lpserve/__init__.py 2010-10-06 00:19:31 +0000
>> @@ -8,15 +8,33 @@
>>
>> __metaclass__ = type
>>
>> -__all__ = ['cmd_launchpad_server']
>> -
>> -
>> +__all__ = ['cmd_launchpad_server',
>> +           'cmd_launchpad_forking_service',
>> +           ]
>
> This should be formatted like this:
>
> __all__ = [
>     'cmd_launchpad_server',
>     'cmd_launchpad_forking_service',
>     ]
check
...
>> +class LPForkingService(object):
>> + """A service that can be asked to start a new bzr subprocess via fork.
>> +
>> + The basic idea is that python startup is very expensive. For example, the
>> + original 'lp-serve' command could take 2.5s just to start up, before any
>> + actual actions could be performed.
>
> Well, it's not really Python startup, it's more the importing that's
> slow. Maybe phrase this as "starting Python and importing Launchpad
> is very...".
>
>> + This class provides a service sitting on a socket, which can then be
>> + requested to fork and run a given bzr command.
>> +
>> + Clients connect to the socket and make a simple...
Martin Pool (mbp) wrote:
On 7 October 2010 07:15, Michael Hudson-Doyle <email address hidden> wrote:
> On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
>> notes from talking this over with spm:
>>
>> * at the moment you can see in the process listing which branch is
>> being accessed; it would be very nice to keep that; but at least we
>> should log the pid/user/branch
>
> This isn't affected by this branch, although the process name ends up
> being a bit unwieldy:
>
> 21266 pts/1  S+  0:00 /usr/bin/python /home/mwh/
>
> Maybe inserting a call to setproctitle in become_child would be useful,
> especially as we have the dependency already -- one piece of information
> we've lost is the user who has connected, although we can probably
> figure that out from logs still.
Without the call to setproctitle, we would be losing this thing spm
said he liked, which is to immediately show which user and branch it
corresponds to.
To my mind it's more important to log the username, branch and pid, so
that you can see who it was both while the process is running and
afterwards. It's also nice that jam logs the rusage. But putting
them into the argv is more immediately accessible.
>> * can run this on staging, then edge, then lpnet
>> * propose changes into lp:lp-production-configs to gradually turn it on
>> * might want an rt describing how to monitor the new daemon process,
>> which just needs to mention the socket location and the hello command;
>> they can write the script from there
>> * francis should formally tell tom and charlie we'll be making this change
>
> I still vaguely wonder about controlling this with a feature flag. The
> thing is, the codehosting systems don't connect to the database
> currently, and that's actually kind of nice in some ways. We could add
> an XML-RPC method to query a flag, and have the ssh daemon check that
> every 30s or so -- I don't think we want an XML-RPC call delaying the
> launch of the lp-serve process on every connection.
Actually I was thinking of just having an http URL, accessible only
within the DC, that just presents all the feature flag rules in a
machine readable form: maybe json by default, perhaps others. There's
no need for xmlrpc when we just want to read, and avoiding startup
dependencies is good. Then we can have a FeatureRuleSource that gets
and parses this rather than talking to storm, and it will be fast to
start up for this type of thing; probably fast enough to run on every
request if we want to. bug 656031.
--
Martin
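Martin's idea could be sketched roughly as below. The payload shape, rule fields, and the flag name in the example are all hypothetical; bug 656031 only outlines the approach of serving the rules in machine-readable form:

```python
import json


def parse_feature_rules(payload):
    """Parse a JSON feature-rule payload into (flag, scope, priority, value).

    A FeatureRuleSource built on this would fetch the payload from the
    DC-internal URL instead of talking to storm/the database.
    """
    rules = []
    for flag, entries in json.loads(payload).items():
        for entry in entries:
            rules.append(
                (flag, entry['scope'], entry['priority'], entry['value']))
    # Evaluate higher-priority rules first; scopes are resolved locally.
    rules.sort(key=lambda rule: -rule[2])
    return rules
```

Parsing a small payload like this on each connection would plausibly be cheap enough to avoid the startup-dependency concern raised above.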
Michael Hudson-Doyle (mwhudson) wrote:
On Wed, 06 Oct 2010 22:34:40 -0000, Martin Pool <email address hidden> wrote:
> On 7 October 2010 07:15, Michael Hudson-Doyle <email address hidden> wrote:
> > On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
> >> notes from talking this over with spm:
> >>
> >>  * at the moment you can see in the process listing which branch is
> >> being accessed; it would be very nice to keep that; but at least we
> >> should log the pid/user/branch
> >
> > This isn't affected by this branch, although the process name ends up
> > being a bit unwieldy:
> >
> > 21266 pts/1   S+   0:00 /usr/bin/python /home/mwh/
> >
> > Maybe inserting a call to setproctitle in become_child would be useful,
> > especially as we have the dependency already -- one piece of information
> > we've lost is the user who has connected, although we can probably
> > figure that out from logs still.
>
> Without the call to setproctitle, we would be losing this thing spm
> said he liked, which is to immediately show which user and branch it
> corresponds to.
Uh? No, that's the bit we retain. The ps output I showed above was
from running with the branch being reviewed.
> To my mind it's more important to log the username, branch and pid, so
> that you can see who it was both while the process is running and
> afterwards. It's also nice that jam logs the rusage. But putting
> them into the argv is more immediately accessible.
I'm not sure if you're recommending this branch be changed before
landing here.
> >>  * can run this on staging, then edge, then lpnet
> >>  * propose changes into lp:lp-production-configs to gradually turn it on
> >>  * might want an rt describing how to monitor the new daemon process,
> >> which just needs to mention the socket location and the hello command;
> >> they can write the script from there
> >>  * francis should formally tell tom and charlie we'll be making this change
> >
> > I still vaguely wonder about controlling this with a feature flag.  The
> > thing is, the codehosting systems don't connect to the database
> > currently, and that's actually kind of nice in some ways.  We could add
> > an XML-RPC method to query a flag, and have the ssh daemon check that
> > every 30s or so -- I don't think we want an XML-RPC call delaying the
> > launch of the lp-servce process on every connection.
>
> Actually I was thinking of just having an http URL, accessible only
> within the DC, that just presents all the feature flag rules in a
> machine readable form: maybe json by default, perhaps others. There's
> no need for xmlrpc when we just want to read, and avoiding startup
> dependencies is good. Then we can have a FeatureRuleSource that gets
> and parses this rather than talking to storm, and it will be fast to
> start up for this type of thing; probably fast enough to run on every
> request if we want to. bug 656031.
That would be interesting. The sftp server being a twisted app, access
to this URL should really be nonblocking which always adds a wrinkle.
Maybe that would be fine though, or maybe we can just cheat.
John A Meinel (jameinel) wrote:
I do log the username and pid; I don't (at that time) have the branch, because we haven't opened it yet. If we changed "bzrlib.
Martin Pool (mbp) wrote:
On 7 October 2010 10:09, Michael Hudson-Doyle <email address hidden> wrote:
>> > 21266 pts/1   S+   0:00 /usr/bin/python /home/mwh/
>> >
>> > Maybe inserting a call to setproctitle in become_child would be useful,
>> > especially as we have the dependency already -- one piece of information
>> > we've lost is the user who has connected, although we can probably
>> > figure that out from logs still.
>>
>> Without the call to setproctitle, we would be losing this thing spm
>> said he liked, which is to immediately show which user and branch it
>> corresponds to.
>
> Uh? No, that's the bit we retain. The ps output I showed above was
> from running with the branch being reviewed.
The username is not shown. I don't know if it's a big deal; this
branch is old enough already.
>
>> To my mind it's more important to log the username, branch and pid, so
>> that you can see who it was both while the process is running and
>> afterwards. It's also nice that jam logs the rusage. But putting
>> them into the argv is more immediately accessible.
>
> I'm not sure if you're recommending this branch be changed before
> landing here.
what john said.
> That would be interesting. The sftp server being a twisted app, access
> to this URL should really be nonblocking which always adds a wrinkle.
> Maybe that would be fine though, or maybe we can just cheat.
That's actually kind of a feature...
--
Martin
Michael Hudson-Doyle (mwhudson) wrote:
On Wed, 06 Oct 2010 20:39:01 -0000, John A Meinel <email address hidden> wrote:
> For starters, thanks for the thorough review. It is really nice to get feedback on it, and you have certainly thought a lot about the code.
>
> On 10/6/2010 12:02 AM, Michael Hudson-Doyle wrote:
> > Review: Needs Information
> > Wow, what a branch. I've been quite picky in this, that's not because
> > I hate you <wink> but because it's subtle code and has potential to
> > cause issues, so I wanted to make sure all the comments make sense and
> > there's no redundant code, etc.
> >
> > I worry slightly about what would happen if the forking service fell
> > over -- but clearly, we rely on the sftp server itself staying up, so
> > this probably isn't such a big deal. I guess we also need to make
> > sure the forking service is running in production.
>
>
> We could trap failures to connect to the forking server and fall back
> to regular spawnProcess. The code is already a simple if/else
> block. However, I wouldn't really want silent degradation of
> connection.
Yeah, I don't really like that idea either.
> >> + "fork <command>\n": Request a new subprocess to be started.
> >> + <command> is the bzr command to be run, such as "rocks" or
> >> + "lp-serve --inet 12".
> >> + The immediate response will be the path-on-disk to a directory full
> >> + of named pipes (fifos) that will be the stdout/stderr/stdin of the
> >> + new process.
> >
> > This doesn't quite make it clear what the names of the files will be.
> > The obvious guess is correct, but I'd rather be certain.
> >
> >> + If a client holds the socket open, when the child process exits,
> >> + the exit status (as given by 'wait()') will be written to the
> >> + socket.
> >
> > This appears to be out of date -- there's also a fork-env command now.
> > Is the fork command still useful?
>
> It has been useful in testing, it isn't used in the 'production' code.
If the fork command is still implemented, I think it should still be
documented...
> ...
>
>
> >
> >> + DEFAULT_PATH = '/var/run/
> >
> > I'm not sure of the value of providing a default here, as it seems
> > that its always in practice going to be overridden by the config. If
> > it makes testing easier, it's worth it I guess.
>
> Well, we override it for testing as well. I wanted to support "bzr
> lp-forking-service" as something you could run. Also, that seemed to
> be the 'best' place to put it, which gives hints as to where you would
> want to put it.
OK, let's leave it.
> >> + self._children_
> >> +
> >> + def _create_
> >> + self._server_socket = socket.
> >> + self._server_
> >> + if self._perms is not None:
> >> + os.chmod(
> >
> > The pedant in me thinks this is a bit racy. But I don't think it's
> > important (in fact, I wonder if the permission stuff is necessary
> > really).
>
> I'm sure it isn't necessary ...
Robert Collins (lifeless) wrote:
On feature flags... I'd suggest an xmlrpc/API for evaluating flags.
Evaluating flags requires DB access for group membership of a user and
potentially other things like time limited trials, access to opt-in
features and so forth.
The exact API would need to take a user, possibly feature names, and
return a list of the rules whose scopes need to be locally resolved.
Whether this is json or not is immaterial to me - at least for this
patch, the feature evaluating code would load once.
It would be very interesting to do this though, because we could phase
this in per-user.
-Rob
Martin Pool (mbp) wrote:
rt 41791 for adding the monitoring service <https:/
Gavin Panella (allenap) wrote:
Looks like this is taken care of, so abstaining on behalf of the Launchpad code reviewers.
Michael Hudson-Doyle (mwhudson) wrote:
So I would like to see the code that communicates with the service more twisted-ized, but until it turns out to be an actual problem it's not worth the bother. Let's land this.
Michael Hudson-Doyle (mwhudson) wrote:
I think your latest changes are basically OK. Some of it feels a bit cargo-culted though -- I don't see the reason for the lazy getForker method; just stash the forker on the instance normally. And the atexit calls aren't needed IMHO. If you inherited the forker helper class from fixture, you'd be able to use addCleanup, which would probably be a bit cleaner. I'm not sure the big TODO comment about paths is warranted -- it's all true, but much of Launchpad suffers this problem. If you can de-cargo-cult a bit, I'll land this ASAP.
Robert Collins (lifeless) wrote:
lets just land and iterate.
-Rob
Michael Hudson-Doyle (mwhudson) wrote:
On Mon, 18 Oct 2010 22:22:39 -0000, Robert Collins <email address hidden> wrote:
> lets just land and iterate.
Yeah fair enough. Throwing it at ec2 now.
Cheers,
mwh
Preview Diff
1 | === modified file 'Makefile' | |||
2 | --- Makefile 2010-10-18 21:53:28 +0000 | |||
3 | +++ Makefile 2010-10-19 22:56:30 +0000 | |||
4 | @@ -239,7 +239,7 @@ | |||
5 | 239 | 239 | ||
6 | 240 | run_all: check_schema inplace stop | 240 | run_all: check_schema inplace stop |
7 | 241 | $(RM) thread*.request | 241 | $(RM) thread*.request |
9 | 242 | bin/run -r librarian,sftp,mailman,codebrowse,google-webservice,memcached \ | 242 | bin/run -r librarian,sftp,forker,mailman,codebrowse,google-webservice,memcached \ |
10 | 243 | -i $(LPCONFIG) | 243 | -i $(LPCONFIG) |
11 | 244 | 244 | ||
12 | 245 | run_codebrowse: build | 245 | run_codebrowse: build |
13 | @@ -253,7 +253,7 @@ | |||
14 | 253 | 253 | ||
15 | 254 | run_codehosting: check_schema inplace stop | 254 | run_codehosting: check_schema inplace stop |
16 | 255 | $(RM) thread*.request | 255 | $(RM) thread*.request |
18 | 256 | bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG) | 256 | bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG) |
19 | 257 | 257 | ||
20 | 258 | start_librarian: compile | 258 | start_librarian: compile |
21 | 259 | bin/start_librarian | 259 | bin/start_librarian |
22 | 260 | 260 | ||
23 | === added directory 'bzrplugins/lpserve' | |||
24 | === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py' | |||
25 | --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000 | |||
26 | +++ bzrplugins/lpserve/__init__.py 2010-10-19 22:56:30 +0000 | |||
27 | @@ -8,15 +8,34 @@ | |||
28 | 8 | 8 | ||
29 | 9 | __metaclass__ = type | 9 | __metaclass__ = type |
30 | 10 | 10 | ||
34 | 11 | __all__ = ['cmd_launchpad_server'] | 11 | __all__ = [ |
35 | 12 | 12 | 'cmd_launchpad_server', | |
36 | 13 | 13 | 'cmd_launchpad_forking_service', | |
37 | 14 | ] | ||
38 | 15 | |||
39 | 16 | |||
40 | 17 | import errno | ||
41 | 18 | import logging | ||
42 | 19 | import os | ||
43 | 14 | import resource | 20 | import resource |
44 | 21 | import shlex | ||
45 | 22 | import shutil | ||
46 | 23 | import signal | ||
47 | 24 | import socket | ||
48 | 15 | import sys | 25 | import sys |
49 | 26 | import tempfile | ||
50 | 27 | import threading | ||
51 | 28 | import time | ||
52 | 16 | 29 | ||
53 | 17 | from bzrlib.commands import Command, register_command | 30 | from bzrlib.commands import Command, register_command |
54 | 18 | from bzrlib.option import Option | 31 | from bzrlib.option import Option |
56 | 19 | from bzrlib import lockdir, ui | 32 | from bzrlib import ( |
57 | 33 | commands, | ||
58 | 34 | lockdir, | ||
59 | 35 | osutils, | ||
60 | 36 | trace, | ||
61 | 37 | ui, | ||
62 | 38 | ) | ||
63 | 20 | 39 | ||
64 | 21 | from bzrlib.smart import medium, server | 40 | from bzrlib.smart import medium, server |
65 | 22 | from bzrlib.transport import get_transport | 41 | from bzrlib.transport import get_transport |
66 | @@ -110,3 +129,717 @@ | |||
67 | 110 | 129 | ||
68 | 111 | 130 | ||
69 | 112 | register_command(cmd_launchpad_server) | 131 | register_command(cmd_launchpad_server) |
70 | 132 | |||
71 | 133 | |||
72 | 134 | class LPForkingService(object): | ||
73 | 135 | """A service that can be asked to start a new bzr subprocess via fork. | ||
74 | 136 | |||
75 | 137 | The basic idea is that bootstrapping time is long. Most of this is time | ||
76 | 138 | spent during import of all needed libraries (lp.*). For example, the | ||
77 | 139 | original 'lp-serve' command could take 2.5s just to start up, before any | ||
78 | 140 | actual actions could be performed. | ||
79 | 141 | |||
80 | 142 | This class provides a service sitting on a socket, which can then be | ||
81 | 143 | requested to fork and run a given bzr command. | ||
82 | 144 | |||
83 | 145 | Clients connect to the socket and make a single request, which then | ||
84 | 146 | receives a response. The possible requests are: | ||
85 | 147 | |||
86 | 148 | "hello\n": Trigger a heartbeat to report that the program is still | ||
87 | 149 | running, and write status information to the log file. | ||
88 | 150 | "quit\n": Stop the service, but do so 'nicely', waiting for children | ||
89 | 151 | to exit, etc. Once this is received the service will stop | ||
90 | 152 | taking new requests on the port. | ||
91 | 153 | "fork-env <command>\n<env>\nend\n": Request a new subprocess to be | ||
92 | 154 | started. <command> is the bzr command to be run, such as "rocks" | ||
93 | 155 | or "lp-serve --inet 12". | ||
94 | 156 | The immediate response will be the path-on-disk to a directory full | ||
95 | 157 | of named pipes (fifos) that will be the stdout/stderr/stdin (named | ||
96 | 158 | accordingly) of the new process. | ||
97 | 159 | If a client holds the socket open, when the child process exits, | ||
98 | 160 | the exit status (as given by 'wait()') will be written to the | ||
99 | 161 | socket. | ||
100 | 162 | |||
101 | 163 | Note that one of the key bits is that the client will not be | ||
102 | 164 | started with exec*, we just call 'commands.run_bzr*()' directly. | ||
103 | 165 | This way, any modules that are already loaded will not need to be | ||
104 | 166 | loaded again. However, care must be taken with any global-state | ||
105 | 167 | that should be reset. | ||
106 | 168 | |||
107 | 169 | fork-env allows you to supply environment variables such as | ||
108 | 170 | "BZR_EMAIL: joe@foo.com" which will be set in os.environ before the | ||
109 | 171 | command is run. | ||
110 | 172 | """ | ||
111 | 173 | |||
112 | 174 | # Design decisions. These are bits where we could have chosen a different | ||
113 | 175 | # method/implementation and weren't sure what would be best. Documenting | ||
114 | 176 | # the current decision, and the alternatives. | ||
115 | 177 | # | ||
116 | 178 | # [Decision #1] | ||
117 | 179 | # Serve on a named AF_UNIX socket. | ||
118 | 180 | # 1) It doesn't make sense to serve to arbitrary hosts, we only want | ||
119 | 181 | # the local host to make requests. (Since the client needs to | ||
120 | 182 | # access the named fifos on the current filesystem.) | ||
121 | 183 | # 2) You can set security parameters on a filesystem path (g+rw, | ||
122 | 184 | # a-rw). | ||
123 | 185 | # [Decision #2] | ||
124 | 186 | # SIGCHLD | ||
125 | 187 | # We want to quickly detect that children have exited so that we can | ||
126 | 188 | # inform the client process quickly. At the moment, we register a | ||
127 | 189 | # SIGCHLD handler that doesn't do anything. However, it means that | ||
128 | 190 | # when we get the signal, if we are currently blocked in something | ||
129 | 191 | # like '.accept()', we will jump out temporarily. At that point the | ||
130 | 192 | # main loop will check if any children have exited. We could have | ||
131 | 193 | # done this work as part of the signal handler, but that felt 'racy' | ||
132 | 194 | # doing any serious work in a signal handler. | ||
133 | 195 | # If we just used socket.timeout as the indicator to go poll for | ||
134 | 196 | # children exiting, it slows the disconnect by as much as the full | ||
135 | 197 | # timeout. (So a timeout of 1.0s will cause the process to hang by | ||
136 | 198 | # that long until it determines that a child has exited, and can | ||
137 | 199 | # close the connection.) | ||
138 | 200 | # The current flow means that we'll notice exited children whenever | ||
139 | 201 | # we finish the current work. | ||
    # [Decision #3]
    #   Child vs Parent actions.
    #   There are several actions that are done when we get a new request.
    #   We have to create the fifos on disk, fork a new child, connect the
    #   child to those handles, and inform the client of the new path (not
    #   necessarily in that order). It makes sense to wait to send the
    #   path message until after the fifos have been created. That way the
    #   client can just try to open them immediately, and the
    #   client-and-child will be synchronized by the open() calls.
    #   However, should the client be the one doing the mkfifo, or the
    #   server? Who should send the message? Should we fork before or
    #   after the mkfifo?
    #   The current thoughts:
    #       1) Try to do work in the child when possible. This should
    #          allow for 'scaling', because the server is single-threaded.
    #       2) We create the directory itself in the server, because that
    #          allows the server to monitor whether the client failed to
    #          clean up after itself or not.
    #       3) Otherwise we create the fifos in the child, and then send
    #          the message back.
    # [Decision #4]
    #   Exit information
    #   Inform the client that the child has exited on the socket they
    #   used to request the fork.
    #       1) Arguably they could see that stdout and stderr have been
    #          closed, and thus stop reading. In testing, I wrote a client
    #          which uses select.poll() over stdin/stdout/stderr and used
    #          that to ferry the content to the appropriate local handle.
    #          However for the FIFOs, when the remote end closed, I
    #          wouldn't see any corresponding information on the local
    #          end. There obviously wasn't any data to be read, so they
    #          wouldn't show up as 'readable' (for me to try to read, and
    #          get 0 bytes, indicating it was closed). I also wasn't
    #          seeing POLLHUP, which seemed to be the correct indicator.
    #          As such, we decided to inform the client on the socket that
    #          they originally made the fork request on, rather than just
    #          closing the socket immediately.
    #       2) We could have had the forking server close the socket, and
    #          only the child hold the socket open. When the child exits,
    #          the OS then naturally closes the socket.
    #          If we want the returncode, then we should put that as bytes
    #          on the socket before we exit. Having the child do the work
    #          means that in error conditions it could easily die before
    #          being able to write anything (think SEGFAULT, etc). The
    #          forking server is already wait()ing on its children, so
    #          that we don't get zombies, and with wait3() we can get the
    #          rusage (user time, memory consumption, etc.)
    #          As such, it seems reasonable that the server can then also
    #          report back when a child is seen as exiting.
    # [Decision #5]
    #   cleanup once connected
    #   The child process blocks during 'open()' waiting for the client to
    #   connect to its fifos. Once the client has connected, the child
    #   then deletes the temporary directory and the fifos from disk. This
    #   means that there isn't much left for diagnosis, but it also means
    #   that the client won't leave garbage around if it crashes, etc.
    #   Note that the forking service itself still monitors the paths
    #   created, and will delete garbage if it sees that a child failed to
    #   do so.
    # [Decision #6]
    #   os._exit(retcode) in the child
    #   Calling sys.exit(retcode) raises an exception, which then bubbles
    #   up the stack and runs exit functions (and finally statements).
    #   When I tried using it originally, I would see the current child
    #   bubble all the way up the stack (through the server code that it
    #   fork()ed through), and then get to main() returning code 0. The
    #   process would still exit nonzero. My guess is that something in
    #   the atexit functions was failing, but that it was happening after
    #   logging, etc. had been shut down.
    #   Any global state from the child process should be flushed before
    #   run_bzr_* has exited (which we *do* wait for), and any other
    #   global state is probably a remnant from the service process, which
    #   will be cleaned up by the service itself, rather than the child.
    #   There is some concern that log files may not get flushed, so we
    #   currently call sys.exitfunc() first. The main problem is that I
    #   don't know any way to *remove* a function registered via
    #   'atexit()', so if the forking service has some state, we may try
    #   to clean it up incorrectly.
    #   Note that the bzr script itself uses sys.exitfunc(); os._exit() in
    #   the 'bzr' main script, as the teardown time of all the python
    #   state was quite noticeable in real-world runtime. As such, bzrlib
    #   should be pretty safe, or it would have been failing for people
    #   already.
    # [Decision #7]
    #   prefork vs max children vs ?
    #   For simplicity it seemed easiest to just fork when requested. Over
    #   time, I realized it would be easy to allow running an arbitrary
    #   command (no harder than just running one command), so it seemed
    #   reasonable to switch over. If we go the prefork route, then we'll
    #   need a way to tell the pre-forked children what command to run.
    #   This could be as easy as just adding one more fifo that they wait
    #   on in the same directory.
    #   For now, I've chosen not to limit the number of forked children. I
    #   don't know what a reasonable value is, and probably there are
    #   already limitations at play. (If Conch limits connections, then it
    #   will already be doing all the work, etc.)
    # [Decision #8]
    #   nicer errors on the request socket
    #   This service is meant to be run only on the local system. As such,
    #   we don't try to be extra defensive about leaking information to
    #   the one connecting to the socket. (We should still watch out what
    #   we send across the per-child fifos, since those are connected to
    #   remote clients.) Instead we try to be helpful, and tell them as
    #   much as we know about what went wrong.

    DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
    DEFAULT_PERMISSIONS = 00660  # Permissions on the master socket (rw-rw----)
    WAIT_FOR_CHILDREN_TIMEOUT = 5 * 60  # Wait no more than 5 min for children
    SOCKET_TIMEOUT = 1.0
    SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
    WAIT_FOR_REQUEST_TIMEOUT = 1.0  # No request should take longer than
                                    # this to be read

    _fork_function = os.fork

    def __init__(self, path=DEFAULT_PATH, perms=DEFAULT_PERMISSIONS):
        self.master_socket_path = path
        self._perms = perms
        self._start_time = None
        self._should_terminate = threading.Event()
        # We reference these locally, since at shutdown the socket module
        # may be gc'd before we are.
        self._socket_timeout = socket.timeout
        self._socket_error = socket.error
        # Map from pid => (temp_path_for_handles, request_socket)
        self._child_processes = {}
        self._children_spawned = 0

    def _create_master_socket(self):
        self._server_socket = socket.socket(socket.AF_UNIX,
                                            socket.SOCK_STREAM)
        self._server_socket.bind(self.master_socket_path)
        if self._perms is not None:
            os.chmod(self.master_socket_path, self._perms)
        self._server_socket.listen(5)
        self._server_socket.settimeout(self.SOCKET_TIMEOUT)
        trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))

    def _cleanup_master_socket(self):
        self._server_socket.close()
        try:
            os.remove(self.master_socket_path)
        except (OSError, IOError), e:
            # If we don't delete it, then we get 'address already in
            # use' failures.
            trace.mutter('failed to cleanup: %s'
                         % (self.master_socket_path,))

    def _handle_sigchld(self, signum, frm):
        # We don't actually do anything here, we just want an interrupt
        # (EINTR) on socket.accept() when SIGCHLD occurs.
        pass

    def _handle_sigterm(self, signum, frm):
        # Unregister this as the default handler, 2 SIGTERMs will exit us.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        # SIGTERM should also generate EINTR on our wait loop, so this
        # should be enough.
        self._should_terminate.set()

    def _register_signals(self):
        """Register a SIGCHLD and SIGTERM handler.

        If we have a trigger for SIGCHLD then we can quickly respond to
        clients when their process exits. The main risk is getting more
        EAGAIN errors elsewhere.

        SIGTERM allows us to cleanup nicely before we exit.
        """
        signal.signal(signal.SIGCHLD, self._handle_sigchld)
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _unregister_signals(self):
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

    def _create_child_file_descriptors(self, base_path):
        stdin_path = os.path.join(base_path, 'stdin')
        stdout_path = os.path.join(base_path, 'stdout')
        stderr_path = os.path.join(base_path, 'stderr')
        os.mkfifo(stdin_path)
        os.mkfifo(stdout_path)
        os.mkfifo(stderr_path)

    def _bind_child_file_descriptors(self, base_path):
        stdin_path = os.path.join(base_path, 'stdin')
        stdout_path = os.path.join(base_path, 'stdout')
        stderr_path = os.path.join(base_path, 'stderr')
        # These open calls will block until another process connects
        # (which must connect in the same order).
        stdin_fid = os.open(stdin_path, os.O_RDONLY)
        stdout_fid = os.open(stdout_path, os.O_WRONLY)
        stderr_fid = os.open(stderr_path, os.O_WRONLY)
        # Note: by this point bzrlib has opened stderr for logging
        # (as part of starting the service process in the first place).
        # As such, it has a stream handler that writes to stderr. logging
        # tries to flush and close that, but the file is already closed.
        # This just suppresses that exception.
        logging.raiseExceptions = False
        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()
        os.dup2(stdin_fid, 0)
        os.dup2(stdout_fid, 1)
        os.dup2(stderr_fid, 2)
        sys.stdin = os.fdopen(stdin_fid, 'rb')
        sys.stdout = os.fdopen(stdout_fid, 'wb')
        sys.stderr = os.fdopen(stderr_fid, 'wb')
        ui.ui_factory.stdin = sys.stdin
        ui.ui_factory.stdout = sys.stdout
        ui.ui_factory.stderr = sys.stderr
        # Now that we've opened the handles, delete everything so that
        # we don't leave garbage around. Because the open() is done in
        # blocking mode, we know that someone has already connected to
        # them, and we don't want anyone else getting confused and
        # connecting.
        # See [Decision #5]
        os.remove(stderr_path)
        os.remove(stdout_path)
        os.remove(stdin_path)
        os.rmdir(base_path)

    def _close_child_file_descriptors(self):
        sys.stdin.close()
        sys.stderr.close()
        sys.stdout.close()

    def become_child(self, command_argv, path):
        """We are in the spawned child code, do our magic voodoo."""
        # Stop tracking new signals
        self._unregister_signals()
        # Reset the start time
        trace._bzr_log_start_time = time.time()
        trace.mutter('%d starting %r'
                     % (os.getpid(), command_argv))
        self._bind_child_file_descriptors(path)
        self._run_child_command(command_argv)

    def _run_child_command(self, command_argv):
        # This is the point where we would actually want to do something
        # with our life.
        # TODO: We may want to consider special-casing the 'lp-serve'
        #       command. As that is the primary use-case for this service,
        #       it might be interesting to have an already-instantiated
        #       instance, where we can just pop on an extra argument and
        #       be ready to go. However, that would probably only really
        #       be measurable if we prefork, as it looks like ~200ms is
        #       'fork()' time, but only 50ms is run-the-command time.
        retcode = commands.run_bzr_catch_errors(command_argv)
        self._close_child_file_descriptors()
        trace.mutter('%d finished %r'
                     % (os.getpid(), command_argv))
        # We force os._exit() here, because we don't want to unwind the
        # stack, which has complex results. (We can get it to unwind back
        # to the cmd_launchpad_forking_service code, and even back to
        # main() reporting the return code, but after that, suddenly the
        # return code changes from a '0' to a '1', with no logging of
        # info.)
        # TODO: Should we call sys.exitfunc() here? It allows atexit
        #       functions to fire; however, some of those may still be
        #       around from the parent process, which we don't really
        #       want.
        sys.exitfunc()
        # See [Decision #6]
        os._exit(retcode)

    @staticmethod
    def command_to_argv(command_str):
        """Convert a 'foo bar' style command to [u'foo', u'bar']."""
        # command_str must be a utf-8 string
        return [s.decode('utf-8') for s in shlex.split(command_str)]

    @staticmethod
    def parse_env(env_str):
        """Convert the environment information into a dict.

        :param env_str: A string full of environment variable
            declarations. Each key is simple ascii "key: value\n".
            The string must end with "end\n".
        :return: A dict of environment variables.
        """
        env = {}
        if not env_str.endswith('end\n'):
            raise ValueError('Invalid env-str: %r' % (env_str,))
        env_str = env_str[:-5]
        if not env_str:
            return env
        env_entries = env_str.split('\n')
        for entry in env_entries:
            key, value = entry.split(': ', 1)
            env[key] = value
        return env

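The env wire format handled by `parse_env` above is simple enough to sketch in isolation. The following is a minimal, self-contained re-implementation of the `"key: value\n"` … `"end\n"` parsing, outside the class so it can be run standalone (the free-standing function is illustrative; only the method above is part of the plugin):

```python
def parse_env(env_str):
    """Parse the forking service's env wire format into a dict.

    Each variable is encoded as "key: value\n"; the whole block is
    terminated by a literal "end\n".
    """
    if not env_str.endswith('end\n'):
        raise ValueError('Invalid env-str: %r' % (env_str,))
    body = env_str[:-len('end\n')]
    env = {}
    if not body:
        return env
    # Each entry sits on its own line; the value may contain spaces.
    for entry in body.rstrip('\n').split('\n'):
        key, value = entry.split(': ', 1)
        env[key] = value
    return env
```

For example, `parse_env('BZR_EMAIL: test@example.com\nend\n')` yields a one-entry dict, and a bare `'end\n'` yields an empty one.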
    def fork_one_request(self, conn, client_addr, command_argv, env):
        """Fork myself and serve a request."""
        temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
        # Now that we've set everything up, send the response to the
        # client. We create the fifos first, so that the client can just
        # start trying to connect to them, while we fork and have the
        # child do the same.
        self._children_spawned += 1
        pid = self._fork_function()
        if pid == 0:
            pid = os.getpid()
            trace.mutter('%d spawned' % (pid,))
            self._server_socket.close()
            for env_var, value in env.iteritems():
                osutils.set_or_unset_env(env_var, value)
            # See [Decision #3]
            self._create_child_file_descriptors(temp_name)
            conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
            conn.close()
            self.become_child(command_argv, temp_name)
            trace.warning('become_child returned!!!')
            sys.exit(1)
        else:
            self._child_processes[pid] = (temp_name, conn)
            self.log(client_addr, 'Spawned process %s for %r: %s'
                     % (pid, command_argv, temp_name))

    def main_loop(self):
        self._start_time = time.time()
        self._should_terminate.clear()
        self._register_signals()
        self._create_master_socket()
        trace.note('Listening on socket: %s' % (self.master_socket_path,))
        try:
            try:
                self._do_loop()
            finally:
                # Stop talking to others, we are shutting down
                self._cleanup_master_socket()
        except KeyboardInterrupt:
            # SIGINT received, try to shutdown cleanly
            pass
        trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
                   % (self.WAIT_FOR_CHILDREN_TIMEOUT,
                      len(self._child_processes)))
        self._shutdown_children()
        trace.note('Exiting')

    def _do_loop(self):
        while not self._should_terminate.isSet():
            try:
                conn, client_addr = self._server_socket.accept()
            except self._socket_timeout:
                pass  # run shutdown and children checks
            except self._socket_error, e:
                if e.args[0] == errno.EINTR:
                    pass  # run shutdown and children checks
                elif e.args[0] == errno.EBADF:
                    # We can get EBADF here while we are shutting down,
                    # so we just ignore it for now.
                    pass
                else:
                    # Log any other failure mode
                    trace.warning("listening socket error: %s", e)
            else:
                self.log(client_addr, 'connected')
                # TODO: We should probably trap exceptions coming out of
                #       this and log them, so that we don't kill the
                #       service because of an unhandled error.
                # Note: settimeout is used so that a malformed request
                #       doesn't cause us to hang forever. Note that the
                #       particular implementation means that a malicious
                #       client could probably send us one byte every Xms,
                #       and we would just keep trying to read it. However,
                #       as a local service, we aren't worrying about it.
                conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
                try:
                    self.serve_one_connection(conn, client_addr)
                except self._socket_timeout, e:
                    trace.log_exception_quietly()
                    self.log(client_addr,
                             'request timeout failure: %s' % (e,))
                    conn.sendall('FAILURE\nrequest timed out\n')
                    conn.close()
            self._poll_children()

    def log(self, client_addr, message):
        """Log a message to the trace log.

        Include the information about what connection is being served.
        """
        if client_addr is not None:
            # Note, we don't use conn.getpeername() because if a client
            # disconnects before we get here, that raises an exception.
            conn_info = '[%s] ' % (client_addr,)
        else:
            conn_info = ''
        trace.mutter('%s%s' % (conn_info, message))

    def log_information(self):
        """Log the status information.

        This includes stuff like number of children, and ... ?
        """
        self._poll_children()
        self.log(None, 'Running for %.3fs'
                 % (time.time() - self._start_time))
        self.log(None, '%d children currently running (spawned %d total)'
                 % (len(self._child_processes), self._children_spawned))
        # Read the current information about memory consumption, etc.
        self.log(None, 'Self: %s'
                 % (resource.getrusage(resource.RUSAGE_SELF),))
        # This seems to be the sum of all rusage for all children that
        # have been collected (not for currently running children, or
        # ones we haven't "wait"ed on). We may want to read
        # /proc/PID/status, since 'live' information is probably more
        # useful.
        self.log(None, 'Finished children: %s'
                 % (resource.getrusage(resource.RUSAGE_CHILDREN),))

    def _poll_children(self):
        """See if children are still running, etc.

        One interesting hook here would be to track memory consumption,
        etc.
        """
        while self._child_processes:
            try:
                c_id, exit_code, rusage = os.wait3(os.WNOHANG)
            except OSError, e:
                if e.errno == errno.ECHILD:
                    # TODO: We handle this right now because the test
                    #       suite fakes a child, since we wanted to test
                    #       some code without actually forking anything.
                    trace.mutter('_poll_children() called, and'
                        ' self._child_processes indicates there are'
                        ' children, but os.wait3() says there are not.'
                        ' current_children: %s' % (self._child_processes,))
                    return
                raise
            if c_id == 0:
                # No more children stopped right now
                return
            c_path, sock = self._child_processes.pop(c_id)
            trace.mutter('%s exited %s and usage: %s'
                         % (c_id, exit_code, rusage))
            # See [Decision #4]
            try:
                sock.sendall('exited\n%s\n' % (exit_code,))
            except (self._socket_timeout, self._socket_error), e:
                # The client disconnected before we wanted them to,
                # no big deal.
                trace.mutter('%s\'s socket already closed: %s'
                             % (c_id, e))
            else:
                sock.close()
            if os.path.exists(c_path):
                # The child failed to cleanup after itself, do the work
                # here.
                trace.warning('Had to clean up after child %d: %s\n'
                              % (c_id, c_path))
                shutil.rmtree(c_path, ignore_errors=True)

    def _wait_for_children(self, secs):
        start = time.time()
        end = start + secs
        while self._child_processes:
            self._poll_children()
            if secs > 0 and time.time() > end:
                break
            time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)

    def _shutdown_children(self):
        self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
        if self._child_processes:
            trace.warning('Children still running: %s'
                          % ', '.join(map(str, self._child_processes)))
            for c_id in self._child_processes:
                trace.warning('sending SIGINT to %d' % (c_id,))
                os.kill(c_id, signal.SIGINT)
            # We sent the SIGINT signal, see if they exited
            self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
        if self._child_processes:
            # No? Then maybe something more powerful
            for c_id in self._child_processes:
                trace.warning('sending SIGKILL to %d' % (c_id,))
                os.kill(c_id, signal.SIGKILL)
            # We sent the SIGKILL signal, see if they exited
            self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
        if self._child_processes:
            for c_id, (c_path, sock) in self._child_processes.iteritems():
                # TODO: We should probably put something into this
                #       message? However, the likelihood is very small
                #       that this isn't already closed because of
                #       SIGKILL + _wait_for_children. And I don't really
                #       know what to say...
                sock.close()
                if os.path.exists(c_path):
                    trace.warning('Cleaning up after immortal child'
                                  ' %d: %s\n' % (c_id, c_path))
                    shutil.rmtree(c_path)

    def _parse_fork_request(self, conn, client_addr, request):
        if request.startswith('fork-env '):
            while not request.endswith('end\n'):
                request += osutils.read_bytes_from_socket(conn)
            command, env = request[9:].split('\n', 1)
        else:
            command = request[5:].strip()
            env = 'end\n'  # No env set
        try:
            command_argv = self.command_to_argv(command)
            env = self.parse_env(env)
        except Exception, e:
            # TODO: Log the traceback?
            self.log(client_addr, 'command or env parsing failed: %r'
                     % (str(e),))
            conn.sendall('FAILURE\ncommand or env parsing failed: %r'
                         % (str(e),))
        else:
            return command_argv, env
        return None, None

    def serve_one_connection(self, conn, client_addr):
        request = ''
        while '\n' not in request:
            request += osutils.read_bytes_from_socket(conn)
        # telnet likes to use '\r\n' rather than '\n', and it is nice to
        # have an easy way to debug.
        request = request.replace('\r\n', '\n')
        self.log(client_addr, 'request: %r' % (request,))
        if request == 'hello\n':
            conn.sendall('ok\nyep, still alive\n')
            self.log_information()
            conn.close()
        elif request == 'quit\n':
            self._should_terminate.set()
            conn.sendall('ok\nquit command requested... exiting\n')
            conn.close()
        elif request.startswith('fork ') or request.startswith('fork-env '):
            command_argv, env = self._parse_fork_request(conn, client_addr,
                                                         request)
            if command_argv is not None:
                # See [Decision #7]
                # TODO: Do we want to limit the number of children? And/or
                #       prefork additional instances? (The design will
                #       need to change if we prefork and run arbitrary
                #       commands.)
                self.fork_one_request(conn, client_addr, command_argv, env)
                # We don't close the conn like other code paths, since we
                # use it again later.
            else:
                conn.close()
        else:
            self.log(client_addr, 'FAILURE: unknown request: %r'
                     % (request,))
            # See [Decision #8]
            conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
            conn.close()

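The request/response protocol that `serve_one_connection` implements is line-based: a client sends `'hello\n'`, `'quit\n'`, `'fork <command>\n'`, or `'fork-env <command>\n<env>end\n'`, and a successful fork reply is `'ok\n<pid>\n<path>\n'` (failures are `'FAILURE\n<reason>\n'`). A small client-side helper for splitting that reply might look like the sketch below; the function name is illustrative, the plugin itself ships no such helper:

```python
def parse_fork_response(response):
    """Split the forking service's reply to a 'fork'/'fork-env' request.

    A success reply is 'ok\n<pid>\n<path>\n'; anything else (e.g.
    'FAILURE\n<reason>\n') is treated as an error.  Returns a
    (pid, fifo_directory_path) tuple.
    """
    lines = response.splitlines()
    if len(lines) >= 3 and lines[0] == 'ok':
        return int(lines[1]), lines[2]
    raise RuntimeError('fork request failed: %r' % (response,))
```

After parsing the reply, a client would open `<path>/stdin`, `<path>/stdout`, and `<path>/stderr` in that order (matching the child's blocking `open()` calls), then keep the request socket open to receive the later `'exited\n<code>\n'` message.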
class cmd_launchpad_forking_service(Command):
    """Launch a long-running process, where you can ask for new processes.

    The process will block on a given AF_UNIX socket waiting for requests
    to be made. When a request is made, it will fork itself and redirect
    stdout/in/err to fifos on the filesystem, and start running the
    requested command. The caller will be informed where those file
    handles can be found. Thus it only makes sense that the process
    connecting to the socket must be on the same system.
    """

    aliases = ['lp-service']

    takes_options = [Option('path',
                        help='Listen for connections at PATH',
                        type=str),
                     Option('perms',
                        help='Set the mode bits for the socket,'
                             ' interpreted as an octal integer (same as'
                             ' chmod)'),
                     Option('preload',
                        help="Do/don't preload libraries before startup."),
                     Option('children-timeout', type=int, argname='SEC',
                        help="Only wait SEC seconds for children to exit"),
                    ]

    def _preload_libraries(self):
        for pyname in libraries_to_preload:
            try:
                __import__(pyname)
            except ImportError, e:
                trace.mutter('failed to preload %s: %s' % (pyname, e))

    def run(self, path=None, perms=None, preload=True,
            children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT):
        if path is None:
            path = LPForkingService.DEFAULT_PATH
        if perms is None:
            perms = LPForkingService.DEFAULT_PERMISSIONS
        if preload:
            # We 'note' this because it often takes a fair amount of time.
            trace.note('Preloading %d modules'
                       % (len(libraries_to_preload),))
            self._preload_libraries()
        service = LPForkingService(path, perms)
        service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
        service.main_loop()

register_command(cmd_launchpad_forking_service)


class cmd_launchpad_replay(Command):
    """Write input from stdin back to stdout or stderr.

    This is a hidden command, primarily available for testing
    cmd_launchpad_forking_service.
    """

    hidden = True

    def run(self):
        # Just read line-by-line from stdin, and write out to stdout or
        # stderr depending on the prefix.
        for line in sys.stdin:
            channel, contents = line.split(' ', 1)
            channel = int(channel)
            if channel == 1:
                sys.stdout.write(contents)
                sys.stdout.flush()
            elif channel == 2:
                sys.stderr.write(contents)
                sys.stderr.flush()
            else:
                raise RuntimeError('Invalid channel request.')
        return 0

register_command(cmd_launchpad_replay)

753 | 815 | # This list was generated by lsprofiling a spawned child, and looking for | ||
754 | 816 | # <module ...> times, which indicate an import occurred. Other possibilities are | ||
755 | 817 | # to just run "bzr lp-serve --profile-imports" manually, and observe what was | ||
756 | 818 | # expensive to import. It doesn't seem very easy to get this right | ||
757 | 819 | # automatically. | ||
758 | 820 | libraries_to_preload = [ | ||
759 | 821 | 'bzrlib.errors', | ||
760 | 822 | 'bzrlib.repofmt.groupcompress_repo', | ||
761 | 823 | 'bzrlib.repository', | ||
762 | 824 | 'bzrlib.smart', | ||
763 | 825 | 'bzrlib.smart.protocol', | ||
764 | 826 | 'bzrlib.smart.request', | ||
765 | 827 | 'bzrlib.smart.server', | ||
766 | 828 | 'bzrlib.smart.vfs', | ||
767 | 829 | 'bzrlib.transport.local', | ||
768 | 830 | 'bzrlib.transport.readonly', | ||
769 | 831 | 'lp.codehosting.bzrutils', | ||
770 | 832 | 'lp.codehosting.vfs', | ||
771 | 833 | 'lp.codehosting.vfs.branchfs', | ||
772 | 834 | 'lp.codehosting.vfs.branchfsclient', | ||
773 | 835 | 'lp.codehosting.vfs.hooks', | ||
774 | 836 | 'lp.codehosting.vfs.transport', | ||
775 | 837 | ] | ||
776 | 838 | |||
777 | 839 | |||
778 | 840 | def load_tests(standard_tests, module, loader): | ||
779 | 841 | standard_tests.addTests(loader.loadTestsFromModuleNames( | ||
780 | 842 | [__name__ + '.' + x for x in [ | ||
781 | 843 | 'test_lpserve', | ||
782 | 844 | ]])) | ||
783 | 845 | return standard_tests | ||
784 | 113 | 846 | ||
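The `_preload_libraries` loop above deliberately tolerates a failed import (logging it rather than aborting), so a missing optional module never prevents the service from starting. A minimal standalone sketch of that pattern — the function name and return shape are illustrative, not part of the plugin:

```python
import importlib


def preload(module_names):
    """Import each named module, collecting failures instead of aborting.

    Mirrors the tolerant style of _preload_libraries: an ImportError is
    recorded (the plugin logs it via trace.mutter) and iteration continues.
    """
    failed = {}
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError as e:
            failed[name] = str(e)
    return failed


# Stdlib modules import fine; a bogus name is recorded, not fatal.
failures = preload(['os', 'json', 'no_such_module_xyz_123'])
```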
785 | === added file 'bzrplugins/lpserve/test_lpserve.py' | |||
786 | --- bzrplugins/lpserve/test_lpserve.py 1970-01-01 00:00:00 +0000 | |||
787 | +++ bzrplugins/lpserve/test_lpserve.py 2010-10-19 22:56:30 +0000 | |||
788 | @@ -0,0 +1,534 @@ | |||
789 | 1 | # Copyright 2010 Canonical Ltd. This software is licensed under the | ||
790 | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). | ||
791 | 3 | |||
792 | 4 | import os | ||
793 | 5 | import signal | ||
794 | 6 | import socket | ||
795 | 7 | import subprocess | ||
796 | 8 | import tempfile | ||
797 | 9 | import threading | ||
798 | 10 | import time | ||
799 | 11 | |||
800 | 12 | from testtools import content | ||
801 | 13 | |||
802 | 14 | from bzrlib import ( | ||
803 | 15 | osutils, | ||
804 | 16 | tests, | ||
805 | 17 | trace, | ||
806 | 18 | ) | ||
807 | 19 | from bzrlib.plugins import lpserve | ||
808 | 20 | |||
809 | 21 | from canonical.config import config | ||
810 | 22 | from lp.codehosting import get_bzr_path, get_BZR_PLUGIN_PATH_for_subprocess | ||
811 | 23 | |||
812 | 24 | |||
813 | 25 | class TestingLPForkingServiceInAThread(lpserve.LPForkingService): | ||
814 | 26 | """A test-double to run a "forking service" in a thread. | ||
815 | 27 | |||
816 | 28 | Note that actual forking is disallowed here, but this still lets us | ||
817 | 29 | interact with the service for its other operations. | ||
818 | 30 | """ | ||
819 | 31 | |||
820 | 32 | # For testing, we set the timeouts much lower, because we want the tests to | ||
821 | 33 | # run quickly | ||
822 | 34 | WAIT_FOR_CHILDREN_TIMEOUT = 0.5 | ||
823 | 35 | SOCKET_TIMEOUT = 0.01 | ||
824 | 36 | SLEEP_FOR_CHILDREN_TIMEOUT = 0.01 | ||
825 | 37 | WAIT_FOR_REQUEST_TIMEOUT = 0.1 | ||
826 | 38 | |||
827 | 39 | # We're running in a thread as part of the test suite; blow up if we try | ||
828 | 40 | # to fork | ||
829 | 41 | _fork_function = None | ||
830 | 42 | |||
831 | 43 | def __init__(self, path, perms=None): | ||
832 | 44 | self.service_started = threading.Event() | ||
833 | 45 | self.service_stopped = threading.Event() | ||
834 | 46 | self.this_thread = None | ||
835 | 47 | self.fork_log = [] | ||
836 | 48 | super(TestingLPForkingServiceInAThread, self).__init__( | ||
837 | 49 | path=path, perms=None) | ||
838 | 50 | |||
839 | 51 | def _register_signals(self): | ||
840 | 52 | pass # Don't register it for the test suite | ||
841 | 53 | |||
842 | 54 | def _unregister_signals(self): | ||
843 | 55 | pass # We don't fork, and didn't register, so don't unregister | ||
844 | 56 | |||
845 | 57 | def _create_master_socket(self): | ||
846 | 58 | super(TestingLPForkingServiceInAThread, self)._create_master_socket() | ||
847 | 59 | self.service_started.set() | ||
848 | 60 | |||
849 | 61 | def main_loop(self): | ||
850 | 62 | self.service_stopped.clear() | ||
851 | 63 | super(TestingLPForkingServiceInAThread, self).main_loop() | ||
852 | 64 | self.service_stopped.set() | ||
853 | 65 | |||
854 | 66 | def fork_one_request(self, conn, client_addr, command, env): | ||
855 | 67 | # We intentionally don't allow the test suite to request a fork, as | ||
856 | 68 | # threads + forks and everything else don't exactly play well together | ||
857 | 69 | self.fork_log.append((command, env)) | ||
858 | 70 | conn.sendall('ok\nfake forking\n') | ||
859 | 71 | conn.close() | ||
860 | 72 | |||
861 | 73 | @staticmethod | ||
862 | 74 | def start_service(test): | ||
863 | 75 | """Start a new LPForkingService in a thread at a random path. | ||
864 | 76 | |||
865 | 77 | This will block until the service has created its socket, and is ready | ||
866 | 78 | to communicate. | ||
867 | 79 | |||
868 | 80 | :return: A new TestingLPForkingServiceInAThread instance | ||
869 | 81 | """ | ||
870 | 82 | fd, path = tempfile.mkstemp(prefix='tmp-lp-forking-service-', | ||
871 | 83 | suffix='.sock') | ||
872 | 84 | # We don't want a temp file; we want a temp socket | ||
873 | 85 | os.close(fd) | ||
874 | 86 | os.remove(path) | ||
875 | 87 | new_service = TestingLPForkingServiceInAThread(path=path) | ||
876 | 88 | thread = threading.Thread(target=new_service.main_loop, | ||
877 | 89 | name='TestingLPForkingServiceInAThread') | ||
878 | 90 | new_service.this_thread = thread | ||
879 | 91 | # should we be doing thread.setDaemon(True) ? | ||
880 | 92 | thread.start() | ||
881 | 93 | new_service.service_started.wait(10.0) | ||
882 | 94 | if not new_service.service_started.isSet(): | ||
883 | 95 | raise RuntimeError( | ||
884 | 96 | 'Failed to start the TestingLPForkingServiceInAThread') | ||
885 | 97 | test.addCleanup(new_service.stop_service) | ||
886 | 98 | # what about returning new_service._sockname ? | ||
887 | 99 | return new_service | ||
888 | 100 | |||
889 | 101 | def stop_service(self): | ||
890 | 102 | """Stop the test-server thread. This can be called multiple times.""" | ||
891 | 103 | if self.this_thread is None: | ||
892 | 104 | # We already stopped the process | ||
893 | 105 | return | ||
894 | 106 | self._should_terminate.set() | ||
895 | 107 | self.service_stopped.wait(10.0) | ||
896 | 108 | if not self.service_stopped.isSet(): | ||
897 | 109 | raise RuntimeError( | ||
898 | 110 | 'Failed to stop the TestingLPForkingServiceInAThread') | ||
899 | 111 | self.this_thread.join() | ||
900 | 112 | # Break any refcycles | ||
901 | 113 | self.this_thread = None | ||
902 | 114 | |||
903 | 115 | |||
904 | 116 | class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport): | ||
905 | 117 | |||
906 | 118 | def test_start_and_stop_service(self): | ||
907 | 119 | service = TestingLPForkingServiceInAThread.start_service(self) | ||
908 | 120 | service.stop_service() | ||
909 | 121 | |||
910 | 122 | def test_multiple_stops(self): | ||
911 | 123 | service = TestingLPForkingServiceInAThread.start_service(self) | ||
912 | 124 | service.stop_service() | ||
913 | 125 | # calling stop_service repeatedly is a no-op (and not an error) | ||
914 | 126 | service.stop_service() | ||
915 | 127 | |||
916 | 128 | def test_autostop(self): | ||
917 | 129 | # We shouldn't leak a thread here, as it should be part of the test | ||
918 | 130 | # case teardown. | ||
919 | 131 | service = TestingLPForkingServiceInAThread.start_service(self) | ||
920 | 132 | |||
921 | 133 | |||
922 | 134 | class TestCaseWithLPForkingService(tests.TestCaseWithTransport): | ||
923 | 135 | |||
924 | 136 | def setUp(self): | ||
925 | 137 | super(TestCaseWithLPForkingService, self).setUp() | ||
926 | 138 | self.service = TestingLPForkingServiceInAThread.start_service(self) | ||
927 | 139 | |||
928 | 140 | def send_message_to_service(self, message, one_byte_at_a_time=False): | ||
929 | 141 | client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
930 | 142 | client_sock.connect(self.service.master_socket_path) | ||
931 | 143 | if one_byte_at_a_time: | ||
932 | 144 | for byte in message: | ||
933 | 145 | client_sock.send(byte) | ||
934 | 146 | else: | ||
935 | 147 | client_sock.sendall(message) | ||
936 | 148 | response = client_sock.recv(1024) | ||
937 | 149 | return response | ||
938 | 150 | |||
939 | 151 | |||
940 | 152 | class TestLPForkingServiceCommandToArgv(tests.TestCase): | ||
941 | 153 | |||
942 | 154 | def assertAsArgv(self, argv, command_str): | ||
943 | 155 | self.assertEqual(argv, | ||
944 | 156 | lpserve.LPForkingService.command_to_argv(command_str)) | ||
945 | 157 | |||
946 | 158 | def test_simple(self): | ||
947 | 159 | self.assertAsArgv([u'foo'], 'foo') | ||
948 | 160 | self.assertAsArgv([u'foo', u'bar'], 'foo bar') | ||
949 | 161 | |||
950 | 162 | def test_quoted(self): | ||
951 | 163 | self.assertAsArgv([u'foo'], 'foo') | ||
952 | 164 | self.assertAsArgv([u'foo bar'], '"foo bar"') | ||
953 | 165 | |||
954 | 166 | def test_unicode(self): | ||
955 | 167 | self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5') | ||
956 | 168 | |||
957 | 169 | |||
958 | 170 | class TestLPForkingServiceParseEnv(tests.TestCase): | ||
959 | 171 | |||
960 | 172 | def assertEnv(self, env, env_str): | ||
961 | 173 | self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str)) | ||
962 | 174 | |||
963 | 175 | def assertInvalid(self, env_str): | ||
964 | 176 | self.assertRaises(ValueError, lpserve.LPForkingService.parse_env, | ||
965 | 177 | env_str) | ||
966 | 178 | |||
967 | 179 | def test_no_entries(self): | ||
968 | 180 | self.assertEnv({}, 'end\n') | ||
969 | 181 | |||
970 | 182 | def test_one_entries(self): | ||
971 | 183 | self.assertEnv({'BZR_EMAIL': 'joe@foo.com'}, | ||
972 | 184 | 'BZR_EMAIL: joe@foo.com\n' | ||
973 | 185 | 'end\n') | ||
974 | 186 | |||
975 | 187 | def test_two_entries(self): | ||
976 | 188 | self.assertEnv({'BZR_EMAIL': 'joe@foo.com', 'BAR': 'foo'}, | ||
977 | 189 | 'BZR_EMAIL: joe@foo.com\n' | ||
978 | 190 | 'BAR: foo\n' | ||
979 | 191 | 'end\n') | ||
980 | 192 | |||
981 | 193 | def test_invalid_empty(self): | ||
982 | 194 | self.assertInvalid('') | ||
983 | 195 | |||
984 | 196 | def test_invalid_end(self): | ||
985 | 197 | self.assertInvalid("BZR_EMAIL: joe@foo.com\n") | ||
986 | 198 | |||
987 | 199 | def test_invalid_entry(self): | ||
988 | 200 | self.assertInvalid("BZR_EMAIL joe@foo.com\nend\n") | ||
989 | 201 | |||
990 | 202 | |||
991 | 203 | class TestLPForkingService(TestCaseWithLPForkingService): | ||
992 | 204 | |||
993 | 205 | def test_send_quit_message(self): | ||
994 | 206 | response = self.send_message_to_service('quit\n') | ||
995 | 207 | self.assertEqual('ok\nquit command requested... exiting\n', response) | ||
996 | 208 | self.service.service_stopped.wait(10.0) | ||
997 | 209 | self.assertTrue(self.service.service_stopped.isSet()) | ||
998 | 210 | |||
999 | 211 | def test_send_invalid_message_fails(self): | ||
1000 | 212 | response = self.send_message_to_service('unknown\n') | ||
1001 | 213 | self.assertStartsWith(response, 'FAILURE') | ||
1002 | 214 | |||
1003 | 215 | def test_send_hello_heartbeat(self): | ||
1004 | 216 | response = self.send_message_to_service('hello\n') | ||
1005 | 217 | self.assertEqual('ok\nyep, still alive\n', response) | ||
1006 | 218 | |||
1007 | 219 | def test_send_simple_fork(self): | ||
1008 | 220 | response = self.send_message_to_service('fork rocks\n') | ||
1009 | 221 | self.assertEqual('ok\nfake forking\n', response) | ||
1010 | 222 | self.assertEqual([(['rocks'], {})], self.service.fork_log) | ||
1011 | 223 | |||
1012 | 224 | def test_send_fork_env_with_empty_env(self): | ||
1013 | 225 | response = self.send_message_to_service( | ||
1014 | 226 | 'fork-env rocks\n' | ||
1015 | 227 | 'end\n') | ||
1016 | 228 | self.assertEqual('ok\nfake forking\n', response) | ||
1017 | 229 | self.assertEqual([(['rocks'], {})], self.service.fork_log) | ||
1018 | 230 | |||
1019 | 231 | def test_send_fork_env_with_env(self): | ||
1020 | 232 | response = self.send_message_to_service( | ||
1021 | 233 | 'fork-env rocks\n' | ||
1022 | 234 | 'BZR_EMAIL: joe@example.com\n' | ||
1023 | 235 | 'end\n') | ||
1024 | 236 | self.assertEqual('ok\nfake forking\n', response) | ||
1025 | 237 | self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@example.com'})], | ||
1026 | 238 | self.service.fork_log) | ||
1027 | 239 | |||
1028 | 240 | def test_send_fork_env_slowly(self): | ||
1029 | 241 | response = self.send_message_to_service( | ||
1030 | 242 | 'fork-env rocks\n' | ||
1031 | 243 | 'BZR_EMAIL: joe@example.com\n' | ||
1032 | 244 | 'end\n', one_byte_at_a_time=True) | ||
1033 | 245 | self.assertEqual('ok\nfake forking\n', response) | ||
1034 | 246 | self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@example.com'})], | ||
1035 | 247 | self.service.fork_log) | ||
1036 | 248 | |||
1037 | 249 | def test_send_incomplete_fork_env_timeout(self): | ||
1038 | 250 | # We should get a failure message if we can't quickly read the whole | ||
1039 | 251 | # content | ||
1040 | 252 | response = self.send_message_to_service( | ||
1041 | 253 | 'fork-env rocks\n' | ||
1042 | 254 | 'BZR_EMAIL: joe@example.com\n', | ||
1043 | 255 | one_byte_at_a_time=True) | ||
1044 | 256 | # Note that we *don't* send a final 'end\n' | ||
1045 | 257 | self.assertStartsWith(response, 'FAILURE\n') | ||
1046 | 258 | |||
1047 | 259 | def test_send_incomplete_request_timeout(self): | ||
1048 | 260 | # Requests end with '\n', send one without it | ||
1049 | 261 | response = self.send_message_to_service('hello', | ||
1050 | 262 | one_byte_at_a_time=True) | ||
1051 | 263 | self.assertStartsWith(response, 'FAILURE\n') | ||
1052 | 264 | |||
1053 | 265 | |||
1054 | 266 | class TestCaseWithSubprocess(tests.TestCaseWithTransport): | ||
1055 | 267 | """Override the bzr start_bzr_subprocess command. | ||
1056 | 268 | |||
1057 | 269 | The Launchpad infrastructure requires a fair amount of configuration to get | ||
1058 | 270 | paths, etc. correct. This provides a "start_bzr_subprocess" command that | ||
1059 | 271 | has all of those paths appropriately set, but otherwise functions the same | ||
1060 | 272 | as the bzrlib.tests.TestCase version. | ||
1061 | 273 | """ | ||
1062 | 274 | |||
1063 | 275 | def get_python_path(self): | ||
1064 | 276 | """Return the path to the Python interpreter.""" | ||
1065 | 277 | return '%s/bin/py' % config.root | ||
1066 | 278 | |||
1067 | 279 | def start_bzr_subprocess(self, process_args, env_changes=None, | ||
1068 | 280 | working_dir=None): | ||
1069 | 281 | """Start bzr in a subprocess for testing. | ||
1070 | 282 | |||
1071 | 283 | Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`. | ||
1072 | 284 | This version removes some of the skipping stuff, some of the | ||
1073 | 285 | irrelevant comments (e.g. about win32) and uses Launchpad's own | ||
1074 | 286 | mechanisms for getting the path to 'bzr'. | ||
1075 | 287 | |||
1076 | 288 | Comments starting with 'LAUNCHPAD' are comments about our | ||
1077 | 289 | modifications. | ||
1078 | 290 | """ | ||
1079 | 291 | if env_changes is None: | ||
1080 | 292 | env_changes = {} | ||
1081 | 293 | env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess() | ||
1082 | 294 | old_env = {} | ||
1083 | 295 | |||
1084 | 296 | def cleanup_environment(): | ||
1085 | 297 | for env_var, value in env_changes.iteritems(): | ||
1086 | 298 | old_env[env_var] = osutils.set_or_unset_env(env_var, value) | ||
1087 | 299 | |||
1088 | 300 | def restore_environment(): | ||
1089 | 301 | for env_var, value in old_env.iteritems(): | ||
1090 | 302 | osutils.set_or_unset_env(env_var, value) | ||
1091 | 303 | |||
1092 | 304 | cwd = None | ||
1093 | 305 | if working_dir is not None: | ||
1094 | 306 | cwd = osutils.getcwd() | ||
1095 | 307 | os.chdir(working_dir) | ||
1096 | 308 | |||
1097 | 309 | # LAUNCHPAD: Because of buildout, we need to get a custom Python | ||
1098 | 310 | # binary, not sys.executable. | ||
1099 | 311 | python_path = self.get_python_path() | ||
1100 | 312 | # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find | ||
1101 | 313 | # lib/bzrlib, rather than the path to sourcecode/bzr/bzr. | ||
1102 | 314 | bzr_path = get_bzr_path() | ||
1103 | 315 | try: | ||
1104 | 316 | cleanup_environment() | ||
1105 | 317 | command = [python_path, bzr_path] | ||
1106 | 318 | command.extend(process_args) | ||
1107 | 319 | process = self._popen( | ||
1108 | 320 | command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, | ||
1109 | 321 | stderr=subprocess.PIPE) | ||
1110 | 322 | finally: | ||
1111 | 323 | restore_environment() | ||
1112 | 324 | if cwd is not None: | ||
1113 | 325 | os.chdir(cwd) | ||
1114 | 326 | |||
1115 | 327 | return process | ||
1116 | 328 | |||
1117 | 329 | |||
1118 | 330 | class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess): | ||
1119 | 331 | """Tests will get a separate process to communicate to. | ||
1120 | 332 | |||
1121 | 333 | The number of these tests should be small, because it is expensive to start | ||
1122 | 334 | and stop the daemon. | ||
1123 | 335 | |||
1124 | 336 | TODO: This should probably use testresources, or layers somehow... | ||
1125 | 337 | """ | ||
1126 | 338 | |||
1127 | 339 | def setUp(self): | ||
1128 | 340 | super(TestCaseWithLPForkingServiceSubprocess, self).setUp() | ||
1129 | 341 | (self.service_process, | ||
1130 | 342 | self.service_path) = self.start_service_subprocess() | ||
1131 | 343 | self.addCleanup(self.stop_service) | ||
1132 | 344 | |||
1133 | 345 | def start_conversation(self, message, one_byte_at_a_time=False): | ||
1134 | 346 | """Start talking to the service, and get the initial response.""" | ||
1135 | 347 | client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
1136 | 348 | trace.mutter('sending %r to socket %s' % (message, self.service_path)) | ||
1137 | 349 | client_sock.connect(self.service_path) | ||
1138 | 350 | if one_byte_at_a_time: | ||
1139 | 351 | for byte in message: | ||
1140 | 352 | client_sock.send(byte) | ||
1141 | 353 | else: | ||
1142 | 354 | client_sock.sendall(message) | ||
1143 | 355 | response = client_sock.recv(1024) | ||
1144 | 356 | trace.mutter('response: %r' % (response,)) | ||
1145 | 357 | if response.startswith("FAILURE"): | ||
1146 | 358 | raise RuntimeError('Failed to send message: %r' % (response,)) | ||
1147 | 359 | return response, client_sock | ||
1148 | 360 | |||
1149 | 361 | def send_message_to_service(self, message, one_byte_at_a_time=False): | ||
1150 | 362 | response, client_sock = self.start_conversation(message, | ||
1151 | 363 | one_byte_at_a_time=one_byte_at_a_time) | ||
1152 | 364 | client_sock.close() | ||
1153 | 365 | return response | ||
1154 | 366 | |||
1155 | 367 | def send_fork_request(self, command, env=None): | ||
1156 | 368 | if env is not None: | ||
1157 | 369 | request_lines = ['fork-env %s\n' % (command,)] | ||
1158 | 370 | for key, value in env.iteritems(): | ||
1159 | 371 | request_lines.append('%s: %s\n' % (key, value)) | ||
1160 | 372 | request_lines.append('end\n') | ||
1161 | 373 | request = ''.join(request_lines) | ||
1162 | 374 | else: | ||
1163 | 375 | request = 'fork %s\n' % (command,) | ||
1164 | 376 | response, sock = self.start_conversation(request) | ||
1165 | 377 | ok, pid, path, tail = response.split('\n') | ||
1166 | 378 | self.assertEqual('ok', ok) | ||
1167 | 379 | self.assertEqual('', tail) | ||
1168 | 380 | # Don't really care what it is, but should be an integer | ||
1169 | 381 | pid = int(pid) | ||
1170 | 382 | path = path.strip() | ||
1171 | 383 | self.assertContainsRe(path, '/lp-forking-service-child-') | ||
1172 | 384 | return path, pid, sock | ||
1173 | 385 | |||
1174 | 386 | def start_service_subprocess(self): | ||
1175 | 387 | # Make sure this plugin is exposed to the subprocess | ||
1176 | 388 | # SLOOWWW (~2 seconds, which is why we are doing the work anyway) | ||
1177 | 389 | fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-') | ||
1178 | 390 | # I'm not 100% sure about when cleanup runs versus addDetail, but I | ||
1179 | 391 | # think this will work. | ||
1180 | 392 | self.addCleanup(os.remove, tempname) | ||
1181 | 393 | |||
1182 | 394 | def read_log(): | ||
1183 | 395 | f = os.fdopen(fd) | ||
1184 | 396 | f.seek(0) | ||
1185 | 397 | content = f.read() | ||
1186 | 398 | f.close() | ||
1187 | 399 | return [content] | ||
1188 | 400 | self.addDetail('server-log', content.Content( | ||
1189 | 401 | content.ContentType('text', 'plain', {"charset": "utf8"}), | ||
1190 | 402 | read_log)) | ||
1191 | 403 | service_fd, path = tempfile.mkstemp(prefix='tmp-lp-service-', | ||
1192 | 404 | suffix='.sock') | ||
1193 | 405 | os.close(service_fd) | ||
1194 | 406 | os.remove(path) # the service wants to create it as a socket | ||
1195 | 407 | env_changes = {'BZR_PLUGIN_PATH': lpserve.__path__[0], | ||
1196 | 408 | 'BZR_LOG': tempname} | ||
1197 | 409 | proc = self.start_bzr_subprocess( | ||
1198 | 410 | ['lp-service', '--path', path, '--no-preload', | ||
1199 | 411 | '--children-timeout=1'], | ||
1200 | 412 | env_changes=env_changes) | ||
1201 | 413 | trace.mutter('started lp-service subprocess') | ||
1202 | 414 | expected = 'Listening on socket: %s\n' % (path,) | ||
1203 | 415 | path_line = proc.stderr.readline() | ||
1204 | 416 | trace.mutter(path_line) | ||
1205 | 417 | self.assertEqual(expected, path_line) | ||
1206 | 418 | # The process won't delete it, so we do | ||
1207 | 419 | return proc, path | ||
1208 | 420 | |||
1209 | 421 | def stop_service(self): | ||
1210 | 422 | if self.service_process is None: | ||
1211 | 423 | # Already stopped | ||
1212 | 424 | return | ||
1213 | 425 | # First, try to stop the service gracefully, by sending a 'quit' | ||
1214 | 426 | # message | ||
1215 | 427 | try: | ||
1216 | 428 | response = self.send_message_to_service('quit\n') | ||
1217 | 429 | except socket.error, e: | ||
1218 | 430 | # Ignore a failure to connect, the service must be stopping/stopped | ||
1219 | 431 | # already | ||
1220 | 432 | response = None | ||
1221 | 433 | tend = time.time() + 10.0 | ||
1222 | 434 | while self.service_process.poll() is None: | ||
1223 | 435 | if time.time() > tend: | ||
1224 | 436 | self.finish_bzr_subprocess(process=self.service_process, | ||
1225 | 437 | send_signal=signal.SIGINT, retcode=3) | ||
1226 | 438 | self.fail('Failed to quit gracefully after 10.0 seconds') | ||
1227 | 439 | time.sleep(0.1) | ||
1228 | 440 | if response is not None: | ||
1229 | 441 | self.assertEqual('ok\nquit command requested... exiting\n', | ||
1230 | 442 | response) | ||
1231 | 443 | |||
1232 | 444 | def _get_fork_handles(self, path): | ||
1233 | 445 | trace.mutter('getting handles for: %s' % (path,)) | ||
1234 | 446 | stdin_path = os.path.join(path, 'stdin') | ||
1235 | 447 | stdout_path = os.path.join(path, 'stdout') | ||
1236 | 448 | stderr_path = os.path.join(path, 'stderr') | ||
1237 | 449 | # The ordering must match the ordering of the service or we get a | ||
1238 | 450 | # deadlock. | ||
1239 | 451 | child_stdin = open(stdin_path, 'wb') | ||
1240 | 452 | child_stdout = open(stdout_path, 'rb') | ||
1241 | 453 | child_stderr = open(stderr_path, 'rb') | ||
1242 | 454 | return child_stdin, child_stdout, child_stderr | ||
1243 | 455 | |||
1244 | 456 | def communicate_with_fork(self, path, stdin=None): | ||
1245 | 457 | child_stdin, child_stdout, child_stderr = self._get_fork_handles(path) | ||
1246 | 458 | if stdin is not None: | ||
1247 | 459 | child_stdin.write(stdin) | ||
1248 | 460 | child_stdin.close() | ||
1249 | 461 | stdout_content = child_stdout.read() | ||
1250 | 462 | stderr_content = child_stderr.read() | ||
1251 | 463 | return stdout_content, stderr_content | ||
1252 | 464 | |||
1253 | 465 | def assertReturnCode(self, expected_code, sock): | ||
1254 | 466 | """Assert that we get the expected return code as a message.""" | ||
1255 | 467 | response = sock.recv(1024) | ||
1256 | 468 | self.assertStartsWith(response, 'exited\n') | ||
1257 | 469 | code = int(response.split('\n', 1)[1]) | ||
1258 | 470 | self.assertEqual(expected_code, code) | ||
1259 | 471 | |||
1260 | 472 | def test_fork_lp_serve_hello(self): | ||
1261 | 473 | path, _, sock = self.send_fork_request('lp-serve --inet 2') | ||
1262 | 474 | stdout_content, stderr_content = self.communicate_with_fork(path, | ||
1263 | 475 | 'hello\n') | ||
1264 | 476 | self.assertEqual('ok\x012\n', stdout_content) | ||
1265 | 477 | self.assertEqual('', stderr_content) | ||
1266 | 478 | self.assertReturnCode(0, sock) | ||
1267 | 479 | |||
1268 | 480 | def test_fork_replay(self): | ||
1269 | 481 | path, _, sock = self.send_fork_request('launchpad-replay') | ||
1270 | 482 | stdout_content, stderr_content = self.communicate_with_fork(path, | ||
1271 | 483 | '1 hello\n2 goodbye\n1 maybe\n') | ||
1272 | 484 | self.assertEqualDiff('hello\nmaybe\n', stdout_content) | ||
1273 | 485 | self.assertEqualDiff('goodbye\n', stderr_content) | ||
1274 | 486 | self.assertReturnCode(0, sock) | ||
1275 | 487 | |||
1276 | 488 | def test_just_run_service(self): | ||
1277 | 489 | # Start and stop are defined in setUp() | ||
1278 | 490 | pass | ||
1279 | 491 | |||
1280 | 492 | def test_fork_multiple_children(self): | ||
1281 | 493 | paths = [] | ||
1282 | 494 | for idx in range(4): | ||
1283 | 495 | paths.append(self.send_fork_request('launchpad-replay')) | ||
1284 | 496 | # Do them out of order, as order shouldn't matter. | ||
1285 | 497 | for idx in [3, 2, 0, 1]: | ||
1286 | 498 | p, pid, sock = paths[idx] | ||
1287 | 499 | stdout_msg = 'hello %d\n' % (idx,) | ||
1288 | 500 | stderr_msg = 'goodbye %d\n' % (idx+1,) | ||
1289 | 501 | stdout, stderr = self.communicate_with_fork(p, | ||
1290 | 502 | '1 %s2 %s' % (stdout_msg, stderr_msg)) | ||
1291 | 503 | self.assertEqualDiff(stdout_msg, stdout) | ||
1292 | 504 | self.assertEqualDiff(stderr_msg, stderr) | ||
1293 | 505 | self.assertReturnCode(0, sock) | ||
1294 | 506 | |||
1295 | 507 | def test_fork_respects_env_vars(self): | ||
1296 | 508 | path, pid, sock = self.send_fork_request('whoami', | ||
1297 | 509 | env={'BZR_EMAIL': 'this_test@example.com'}) | ||
1298 | 510 | stdout_content, stderr_content = self.communicate_with_fork(path) | ||
1299 | 511 | self.assertEqual('', stderr_content) | ||
1300 | 512 | self.assertEqual('this_test@example.com\n', stdout_content) | ||
1301 | 513 | |||
1302 | 514 | def _check_exits_nicely(self, sig_id): | ||
1303 | 515 | path, _, sock = self.send_fork_request('rocks') | ||
1304 | 516 | self.assertEqual(None, self.service_process.poll()) | ||
1305 | 517 | # Now when we send SIGTERM, it should wait for the child to exit, | ||
1306 | 518 | # before it tries to exit itself. | ||
1307 | 519 | # In python2.6+ we could use self.service_process.terminate() | ||
1308 | 520 | os.kill(self.service_process.pid, sig_id) | ||
1309 | 521 | self.assertEqual(None, self.service_process.poll()) | ||
1310 | 522 | # Now talk to the child, so the service can close | ||
1311 | 523 | stdout_content, stderr_content = self.communicate_with_fork(path) | ||
1312 | 524 | self.assertEqual('It sure does!\n', stdout_content) | ||
1313 | 525 | self.assertEqual('', stderr_content) | ||
1314 | 526 | self.assertReturnCode(0, sock) | ||
1315 | 527 | # And the process should exit cleanly | ||
1316 | 528 | self.assertEqual(0, self.service_process.wait()) | ||
1317 | 529 | |||
1318 | 530 | def test_sigterm_exits_nicely(self): | ||
1319 | 531 | self._check_exits_nicely(signal.SIGTERM) | ||
1320 | 532 | |||
1321 | 533 | def test_sigint_exits_nicely(self): | ||
1322 | 534 | self._check_exits_nicely(signal.SIGINT) | ||
1323 | 0 | 535 | ||
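The tests above exercise the service's wire protocol: a `fork <command>` line, or a `fork-env <command>` line followed by `KEY: VALUE` environment lines and a terminating `end\n`. A rough client-side sketch of building and parsing that format — these helper names are invented for illustration and are not part of lpserve itself:

```python
def build_fork_request(command, env=None):
    """Build a 'fork' or 'fork-env' request string for the forking service."""
    if not env:
        return 'fork %s\n' % (command,)
    lines = ['fork-env %s\n' % (command,)]
    for key, value in sorted(env.items()):
        lines.append('%s: %s\n' % (key, value))
    lines.append('end\n')
    return ''.join(lines)


def parse_env(env_str):
    """Parse the 'KEY: VALUE ... end' block, rejecting malformed input.

    Matches the behaviour the TestLPForkingServiceParseEnv tests expect:
    a missing 'end' terminator or a line without ': ' raises ValueError.
    """
    env = {}
    lines = env_str.splitlines(True)
    if not lines or lines[-1] != 'end\n':
        raise ValueError('Missing "end" terminator')
    for line in lines[:-1]:
        key, sep, value = line.rstrip('\n').partition(': ')
        if not sep:
            raise ValueError('Invalid env line: %r' % (line,))
        env[key] = value
    return env
```

For example, `build_fork_request('rocks', {'BZR_EMAIL': 'joe@foo.com'})` yields the same byte sequence that `test_send_fork_env_with_env` sends over the socket.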
1324 | === modified file 'configs/development/launchpad-lazr.conf' | |||
1325 | --- configs/development/launchpad-lazr.conf 2010-10-08 06:22:10 +0000 | |||
1326 | +++ configs/development/launchpad-lazr.conf 2010-10-19 22:56:30 +0000 | |||
1327 | @@ -76,6 +76,7 @@ | |||
1328 | 76 | lp_url_hosts: dev | 76 | lp_url_hosts: dev |
1329 | 77 | access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log | 77 | access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log |
1330 | 78 | blacklisted_hostnames: | 78 | blacklisted_hostnames: |
1331 | 79 | use_forking_daemon: True | ||
1332 | 79 | 80 | ||
1333 | 80 | [codeimport] | 81 | [codeimport] |
1334 | 81 | bazaar_branch_store: file:///tmp/bazaar-branches | 82 | bazaar_branch_store: file:///tmp/bazaar-branches |
1335 | 82 | 83 | ||
1336 | === modified file 'lib/canonical/config/schema-lazr.conf' | |||
1337 | --- lib/canonical/config/schema-lazr.conf 2010-10-15 17:04:51 +0000 | |||
1338 | +++ lib/canonical/config/schema-lazr.conf 2010-10-19 22:56:30 +0000 | |||
1339 | @@ -301,6 +301,18 @@ | |||
1340 | 301 | # datatype: string | 301 | # datatype: string |
1341 | 302 | logfile: - | 302 | logfile: - |
1342 | 303 | 303 | ||
1343 | 304 | # The location of the log file used by the LaunchpadForkingService | ||
1344 | 305 | # datatype: string | ||
1345 | 306 | forker_logfile: - | ||
1346 | 307 | |||
1347 | 308 | # Should we be using the forking daemon? Or should we be calling spawnProcess | ||
1348 | 309 | # instead? | ||
1349 | 310 | # datatype: boolean | ||
1350 | 311 | use_forking_daemon: False | ||
1351 | 312 | # What disk path does the daemon listen on? | ||
1352 | 313 | # datatype: string | ||
1353 | 314 | forking_daemon_socket: /var/tmp/launchpad_forking_service.sock | ||
1354 | 315 | |||
1355 | 304 | # The prefix of the web URL for all public branches. This should end with a | 316 | # The prefix of the web URL for all public branches. This should end with a |
1356 | 305 | # slash. | 317 | # slash. |
1357 | 306 | # | 318 | # |
1358 | 307 | 319 | ||
1359 | === modified file 'lib/canonical/launchpad/scripts/runlaunchpad.py' | |||
1360 | --- lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-11 04:07:36 +0000 | |||
1361 | +++ lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-19 22:56:30 +0000 | |||
1362 | @@ -174,6 +174,52 @@ | |||
1363 | 174 | process.stdin.close() | 174 | process.stdin.close() |
1364 | 175 | 175 | ||
1365 | 176 | 176 | ||
1366 | 177 | class ForkingSessionService(Service): | ||
1367 | 178 | """A lp-forking-service for handling codehosting access.""" | ||
1368 | 179 | |||
1369 | 180 | # TODO: The "sftp" (aka codehosting) server depends fairly heavily on this | ||
1370 | 181 | # service. It would seem reasonable to make one always start if the | ||
1371 | 182 | # other one is started. Though this might be a way to "FeatureFlag" | ||
1372 | 183 | # whether this is active or not. | ||
1373 | 184 | @property | ||
1374 | 185 | def should_launch(self): | ||
1375 | 186 | return (config.codehosting.launch and | ||
1376 | 187 | config.codehosting.use_forking_daemon) | ||
1377 | 188 | |||
1378 | 189 | @property | ||
1379 | 190 | def logfile(self): | ||
1380 | 191 | """Return the log file to use. | ||
1381 | 192 | |||
1382 | 193 | Default to the value of the configuration key logfile. | ||
1383 | 194 | """ | ||
1384 | 195 | return config.codehosting.forker_logfile | ||
1385 | 196 | |||
1386 | 197 | def launch(self): | ||
1387 | 198 | # Following the logic in TacFile: if you configure sftp (and thus | ||
1388 | 199 | # bzr+ssh) not to run, then we don't want to run the forking | ||
1389 | 200 | # service. | ||
1390 | 201 | if not self.should_launch: | ||
1391 | 202 | return | ||
1392 | 203 | from lp.codehosting import get_bzr_path | ||
1393 | 204 | command = [config.root + '/bin/py', get_bzr_path(), | ||
1394 | 205 | 'launchpad-forking-service', | ||
1395 | 206 | '--path', config.codehosting.forking_daemon_socket, | ||
1396 | 207 | ] | ||
1397 | 208 | env = dict(os.environ) | ||
1398 | 209 | env['BZR_PLUGIN_PATH'] = config.root + '/bzrplugins' | ||
1399 | 210 | logfile = self.logfile | ||
1400 | 211 | if logfile == '-': | ||
1401 | 212 | # This process uses a different logging infrastructure from the | ||
1402 | 213 | # rest of the Launchpad code. As such, it cannot trivially use '-' | ||
1403 | 214 | # as the logfile. So we just ignore this setting. | ||
1404 | 215 | pass | ||
1405 | 216 | else: | ||
1406 | 217 | env['BZR_LOG'] = logfile | ||
1407 | 218 | process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE) | ||
1408 | 219 | self.addCleanup(stop_process, process) | ||
1409 | 220 | process.stdin.close() | ||
1410 | 221 | |||
1411 | 222 | |||
1412 | 177 | def stop_process(process): | 223 | def stop_process(process): |
1413 | 178 | """kill process and BLOCK until process dies. | 224 | """kill process and BLOCK until process dies. |
1414 | 179 | 225 | ||
1415 | @@ -193,6 +239,7 @@ | |||
1416 | 193 | 'librarian': TacFile('librarian', 'daemons/librarian.tac', | 239 | 'librarian': TacFile('librarian', 'daemons/librarian.tac', |
1417 | 194 | 'librarian_server', prepare_for_librarian), | 240 | 'librarian_server', prepare_for_librarian), |
1418 | 195 | 'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'), | 241 | 'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'), |
1419 | 242 | 'forker': ForkingSessionService(), | ||
1420 | 196 | 'mailman': MailmanService(), | 243 | 'mailman': MailmanService(), |
1421 | 197 | 'codebrowse': CodebrowseService(), | 244 | 'codebrowse': CodebrowseService(), |
1422 | 198 | 'google-webservice': GoogleWebService(), | 245 | 'google-webservice': GoogleWebService(), |
1423 | 199 | 246 | ||
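The `should_launch` property above gates the daemon on two settings at once: codehosting must itself be configured to launch, and the `use_forking_daemon` flag must be set. A minimal sketch of that double gate (the `Config` stand-in is hypothetical; only the attribute names mirror the branch):

```python
class Config:
    """Hypothetical stand-in for the [codehosting] config section."""

    def __init__(self, launch, use_forking_daemon):
        self.launch = launch
        self.use_forking_daemon = use_forking_daemon


def should_launch(codehosting):
    # The forking service only starts when the sftp/codehosting service
    # starts AND the forking feature is explicitly enabled.
    return codehosting.launch and codehosting.use_forking_daemon
```

This mirrors the note in the TODO: the flag doubles as a crude feature switch, since turning either setting off keeps the daemon down.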
1424 | === modified file 'lib/lp/codehosting/sshserver/session.py' | |||
1425 | --- lib/lp/codehosting/sshserver/session.py 2010-08-20 20:31:18 +0000 | |||
1426 | +++ lib/lp/codehosting/sshserver/session.py 2010-10-19 22:56:30 +0000 | |||
1427 | @@ -9,11 +9,19 @@ | |||
1428 | 9 | ] | 9 | ] |
1429 | 10 | 10 | ||
1430 | 11 | import os | 11 | import os |
1431 | 12 | import signal | ||
1432 | 13 | import socket | ||
1433 | 12 | import urlparse | 14 | import urlparse |
1434 | 13 | 15 | ||
1436 | 14 | from twisted.internet.process import ProcessExitedAlready | 16 | from zope.event import notify |
1437 | 17 | from zope.interface import implements | ||
1438 | 18 | |||
1439 | 19 | from twisted.internet import ( | ||
1440 | 20 | error, | ||
1441 | 21 | interfaces, | ||
1442 | 22 | process, | ||
1443 | 23 | ) | ||
1444 | 15 | from twisted.python import log | 24 | from twisted.python import log |
1445 | 16 | from zope.event import notify | ||
1446 | 17 | 25 | ||
1447 | 18 | from canonical.config import config | 26 | from canonical.config import config |
1448 | 19 | from lp.codehosting import get_bzr_path | 27 | from lp.codehosting import get_bzr_path |
1449 | @@ -35,6 +43,238 @@ | |||
1450 | 35 | """Raised when a session is asked to execute a forbidden command.""" | 43 | """Raised when a session is asked to execute a forbidden command.""" |
1451 | 36 | 44 | ||
1452 | 37 | 45 | ||
1453 | 46 | class _WaitForExit(process.ProcessReader): | ||
1454 | 47 | """Wait on a socket for the exit status.""" | ||
1455 | 48 | |||
1456 | 49 | def __init__(self, reactor, proc, sock): | ||
1457 | 50 | super(_WaitForExit, self).__init__(reactor, proc, 'exit', | ||
1458 | 51 | sock.fileno()) | ||
1459 | 52 | self._sock = sock | ||
1460 | 53 | self.connected = 1 | ||
1461 | 54 | |||
1462 | 55 | def close(self): | ||
1463 | 56 | self._sock.close() | ||
1464 | 57 | |||
1465 | 58 | def dataReceived(self, data): | ||
1466 | 59 | # TODO: how do we handle getting only *some* of the content?, Maybe we | ||
1467 | 60 | # need to read more bytes first... | ||
1468 | 61 | |||
1469 | 62 | # This is the only thing we do differently from the standard | ||
1470 | 63 | # ProcessReader. When we get data on this socket, we need to treat it | ||
1471 | 64 | # as a return code, or a failure. | ||
1472 | 65 | if not data.startswith('exited'): | ||
1473 | 66 | # Bad data, we want to signal that we are closing the connection | ||
1474 | 67 | # TODO: How? | ||
1475 | 68 | self.proc.childConnectionLost(self.name, "invalid data") | ||
1476 | 69 | self.close() | ||
1477 | 70 | # I don't know what to put here if we get bogus data, but I *do* | ||
1478 | 71 | # want to say that the process is now considered dead to me | ||
1479 | 72 | log.err('Got invalid exit information: %r' % (data,)) | ||
1480 | 73 | exit_status = (255 << 8) | ||
1481 | 74 | else: | ||
1482 | 75 | exit_status = int(data.split('\n')[1]) | ||
1483 | 76 | self.proc.processEnded(exit_status) | ||
1484 | 77 | |||
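The exit handshake read by `_WaitForExit.dataReceived` is a simple two-line text message: `"exited\n<raw wait status>\n"`, with anything else treated as a failure and mapped to `255 << 8`. A sketch of that framing (`parse_exit_message` is a hypothetical helper, not part of the branch):

```python
def parse_exit_message(data):
    """Return the raw wait status encoded in an 'exited' message.

    Malformed input maps to (255 << 8), mirroring the fallback in
    _WaitForExit.dataReceived when the daemon sends bogus data.
    """
    if not data.startswith('exited'):
        return 255 << 8
    # Second line of the message carries the integer wait status.
    return int(data.split('\n')[1])
```

Note that, as the TODO in the class observes, this framing assumes the whole message arrives in one `recv`; a partial read would raise on the `int()` conversion.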
1485 | 78 | |||
1486 | 79 | class ForkedProcessTransport(process.BaseProcess): | ||
1487 | 80 | """Wrap the forked process in a ProcessTransport so we can talk to it. | ||
1488 | 81 | |||
1489 | 82 | Note that instantiating the class creates the fork and sets it up in the | ||
1490 | 83 | reactor. | ||
1491 | 84 | """ | ||
1492 | 85 | |||
1493 | 86 | implements(interfaces.IProcessTransport) | ||
1494 | 87 | |||
1495 | 88 | # Design decisions | ||
1496 | 89 | # [Decision #a] | ||
1497 | 90 | # Inherit from process.BaseProcess | ||
1498 | 91 | # This seems slightly risky, as process.BaseProcess is actually | ||
1499 | 92 | # imported from twisted.internet._baseprocess.BaseProcess. The | ||
1500 | 93 | # real-world Process then actually inherits from process._BaseProcess | ||
1501 | 94 | # I've also had to copy a fair amount from the actual Process | ||
1502 | 95 | # command. | ||
1503 | 96 | # One option would be to inherit from process.Process, and just | ||
1504 | 97 | # override stuff like __init__ and reapProcess which I don't want to | ||
1505 | 98 | # do in the same way. (Is it ok not to call your Base classes | ||
1506 | 99 | # __init__ if you don't want to do that exact work?) | ||
1507 | 100 | def __init__(self, reactor, executable, args, environment, proto): | ||
1508 | 101 | process.BaseProcess.__init__(self, proto) | ||
1509 | 102 | # Map from standard file descriptor to the associated pipe | ||
1510 | 103 | self.pipes = {} | ||
1511 | 104 | pid, path, sock = self._spawn(executable, args, environment) | ||
1512 | 105 | self._fifo_path = path | ||
1513 | 106 | self.pid = pid | ||
1514 | 107 | self.process_sock = sock | ||
1515 | 108 | self._fifo_path = path | ||
1516 | 109 | self._connectSpawnToReactor(reactor) | ||
1517 | 110 | if self.proto is not None: | ||
1518 | 111 | self.proto.makeConnection(self) | ||
1519 | 112 | |||
1520 | 113 | def _sendMessageToService(self, message): | ||
1521 | 114 | """Send a message to the Forking service and get the response""" | ||
1522 | 115 | path = config.codehosting.forking_daemon_socket | ||
1523 | 116 | client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||
1524 | 117 | log.msg('Connecting to Forking Service @ socket: %s for %r' | ||
1525 | 118 | % (path, message)) | ||
1526 | 119 | try: | ||
1527 | 120 | client_sock.connect(path) | ||
1528 | 121 | client_sock.sendall(message) | ||
1529 | 122 | # We define the requests to be no bigger than 1kB. (For now) | ||
1530 | 123 | response = client_sock.recv(1024) | ||
1531 | 124 | except socket.error, e: | ||
1532 | 125 | # TODO: What exceptions should be raised? | ||
1533 | 126 | # Raising the raw exception seems to kill the twisted reactor | ||
1534 | 127 | # Note that if the connection is refused, we *could* just | ||
1535 | 128 | # fall back on a regular 'spawnProcess' call. | ||
1536 | 129 | log.err('Connection failed: %s' % (e,)) | ||
1537 | 130 | raise | ||
1538 | 131 | if response.startswith("FAILURE"): | ||
1539 | 132 | raise RuntimeError('Failed to send message: %r' % (response,)) | ||
1540 | 133 | return response, client_sock | ||
1541 | 134 | |||
1542 | 135 | def _spawn(self, executable, args, environment): | ||
1543 | 136 | """Start the new process. | ||
1544 | 137 | |||
1545 | 138 | This talks to the ForkingSessionService and requests a new process be | ||
1546 | 139 | started. Similar to what Process.__init__/_fork would do. | ||
1547 | 140 | |||
1548 | 141 | :return: The pid, communication directory, and request socket. | ||
1549 | 142 | """ | ||
1550 | 143 | assert executable == 'bzr', executable # Maybe .endswith() | ||
1551 | 144 | assert args[0] == 'bzr', args[0] | ||
1552 | 145 | command_str = ' '.join(args[1:]) | ||
1553 | 146 | message = ['fork-env %s\n' % (' '.join(args[1:]),)] | ||
1554 | 147 | for key, value in environment.iteritems(): | ||
1555 | 148 | # XXX: Currently we only pass BZR_EMAIL, should we be passing | ||
1556 | 149 | # everything else? Note that many won't be handled properly, | ||
1557 | 150 | # since the process is already running. | ||
1558 | 151 | if key != 'BZR_EMAIL': | ||
1559 | 152 | continue | ||
1560 | 153 | message.append('%s: %s\n' % (key, value)) | ||
1561 | 154 | message.append('end\n') | ||
1562 | 155 | message = ''.join(message) | ||
1563 | 156 | response, sock = self._sendMessageToService(message) | ||
1564 | 157 | if response.startswith('FAILURE'): | ||
1565 | 158 | # TODO: Is there a better error to raise? | ||
1566 | 159 | raise RuntimeError("Failed while sending message to forking " | ||
1567 | 160 | "service. message: %r, failure: %r" | ||
1568 | 161 | % (message, response)) | ||
1569 | 162 | ok, pid, path, tail = response.split('\n') | ||
1570 | 163 | assert ok == 'ok' | ||
1571 | 164 | assert tail == '' | ||
1572 | 165 | pid = int(pid) | ||
1573 | 166 | log.msg('Forking returned pid: %d, path: %s' % (pid, path)) | ||
1574 | 167 | return pid, path, sock | ||
1575 | 168 | |||
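The request that `_spawn` sends over the Unix socket is a line-oriented message: a `fork-env <command>` line, zero or more `KEY: value` environment lines (only `BZR_EMAIL` is forwarded, per the XXX above), and a terminating `end` line. A sketch of the message construction (`build_fork_env_message` is a hypothetical helper for illustration):

```python
def build_fork_env_message(args, environment, allowed=('BZR_EMAIL',)):
    """Build the fork-env request sent to the forking daemon.

    args[0] is the executable name ('bzr') and is dropped, matching the
    branch; only whitelisted environment variables are forwarded.
    """
    parts = ['fork-env %s\n' % ' '.join(args[1:])]
    for key, value in sorted(environment.items()):
        if key not in allowed:
            continue
        parts.append('%s: %s\n' % (key, value))
    parts.append('end\n')
    return ''.join(parts)
```

The daemon's success reply is the three-line `ok\n<pid>\n<path>\n` form that `_spawn` unpacks; anything starting with `FAILURE` is raised as an error.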
1576 | 169 | def _connectSpawnToReactor(self, reactor): | ||
1577 | 170 | stdin_path = os.path.join(self._fifo_path, 'stdin') | ||
1578 | 171 | stdout_path = os.path.join(self._fifo_path, 'stdout') | ||
1579 | 172 | stderr_path = os.path.join(self._fifo_path, 'stderr') | ||
1580 | 173 | child_stdin_fd = os.open(stdin_path, os.O_WRONLY) | ||
1581 | 174 | self.pipes[0] = process.ProcessWriter(reactor, self, 0, | ||
1582 | 175 | child_stdin_fd) | ||
1583 | 176 | child_stdout_fd = os.open(stdout_path, os.O_RDONLY) | ||
1584 | 177 | # forceReadHack=True ? Used in process.py doesn't seem to be needed | ||
1585 | 178 | # here | ||
1586 | 179 | self.pipes[1] = process.ProcessReader(reactor, self, 1, | ||
1587 | 180 | child_stdout_fd) | ||
1588 | 181 | child_stderr_fd = os.open(stderr_path, os.O_RDONLY) | ||
1589 | 182 | self.pipes[2] = process.ProcessReader(reactor, self, 2, | ||
1590 | 183 | child_stderr_fd) | ||
1591 | 184 | # Note: _exiter forms a GC cycle, since it points to us, and we hold a | ||
1592 | 185 | # reference to it | ||
1593 | 186 | self._exiter = _WaitForExit(reactor, self, self.process_sock) | ||
1594 | 187 | self.pipes['exit'] = self._exiter | ||
1595 | 188 | |||
1596 | 189 | def _getReason(self, status): | ||
1597 | 190 | # Copied from twisted.internet.process._BaseProcess | ||
1598 | 191 | exitCode = sig = None | ||
1599 | 192 | if os.WIFEXITED(status): | ||
1600 | 193 | exitCode = os.WEXITSTATUS(status) | ||
1601 | 194 | else: | ||
1602 | 195 | sig = os.WTERMSIG(status) | ||
1603 | 196 | if exitCode or sig: | ||
1604 | 197 | return error.ProcessTerminated(exitCode, sig, status) | ||
1605 | 198 | return error.ProcessDone(status) | ||
1606 | 199 | |||
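`_getReason` decodes a raw POSIX wait status: `os.WIFEXITED` distinguishes a normal exit (code in the high byte) from death by signal (signal number in the low bits). A sketch of that decoding on POSIX (`decode_status` is a hypothetical helper):

```python
import os


def decode_status(status):
    """Return (exit_code, signal) for a raw wait status, POSIX semantics."""
    if os.WIFEXITED(status):
        # Normal exit: the exit code lives in the high byte.
        return os.WEXITSTATUS(status), None
    # Otherwise the low bits name the terminating signal.
    return None, os.WTERMSIG(status)
```

This is why `_WaitForExit` uses `255 << 8` as its error fallback: it reads as "exited with code 255" rather than "killed by a signal".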
1607 | 200 | def signalProcess(self, signalID): | ||
1608 | 201 | """ | ||
1609 | 202 | Send the given signal C{signalID} to the process. It'll translate a | ||
1610 | 203 | few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string | ||
1611 | 204 | representation to its int value, otherwise it'll pass directly the | ||
1612 | 205 | value provided | ||
1613 | 206 | |||
1614 | 207 | @type signalID: C{str} or C{int} | ||
1615 | 208 | """ | ||
1616 | 209 | # Copied from twisted.internet.process._BaseProcess | ||
1617 | 210 | if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'): | ||
1618 | 211 | signalID = getattr(signal, 'SIG%s' % (signalID,)) | ||
1619 | 212 | if self.pid is None: | ||
1620 | 213 | raise process.ProcessExitedAlready() | ||
1621 | 214 | os.kill(self.pid, signalID) | ||
1622 | 215 | |||
1623 | 216 | # Implemented because conch.ssh.session uses it, the Process implementation | ||
1624 | 217 | # ignores writes if channel '0' is not available | ||
1625 | 218 | def write(self, data): | ||
1626 | 219 | self.pipes[0].write(data) | ||
1627 | 220 | |||
1628 | 221 | def writeToChild(self, childFD, data): | ||
1629 | 222 | # Copied from twisted.internet.process.Process | ||
1630 | 223 | self.pipes[childFD].write(data) | ||
1631 | 224 | |||
1632 | 225 | def closeChildFD(self, childFD): | ||
1633 | 226 | if childFD in self.pipes: | ||
1634 | 227 | self.pipes[childFD].loseConnection() | ||
1635 | 228 | |||
1636 | 229 | def closeStdin(self): | ||
1637 | 230 | self.closeChildFD(0) | ||
1638 | 231 | |||
1639 | 232 | def closeStdout(self): | ||
1640 | 233 | self.closeChildFD(1) | ||
1641 | 234 | |||
1642 | 235 | def closeStderr(self): | ||
1643 | 236 | self.closeChildFD(2) | ||
1644 | 237 | |||
1645 | 238 | def loseConnection(self): | ||
1646 | 239 | self.closeStdin() | ||
1647 | 240 | self.closeStdout() | ||
1648 | 241 | self.closeStderr() | ||
1649 | 242 | |||
1650 | 243 | # Implemented because ProcessWriter/ProcessReader want to call it | ||
1651 | 244 | # Copied from twisted.internet.Process | ||
1652 | 245 | def childDataReceived(self, name, data): | ||
1653 | 246 | self.proto.childDataReceived(name, data) | ||
1654 | 247 | |||
1655 | 248 | # Implemented because ProcessWriter/ProcessReader want to call it | ||
1656 | 249 | # Copied from twisted.internet.Process | ||
1657 | 250 | def childConnectionLost(self, childFD, reason): | ||
1658 | 251 | close = getattr(self.pipes[childFD], 'close', None) | ||
1659 | 252 | if close is not None: | ||
1660 | 253 | close() | ||
1661 | 254 | else: | ||
1662 | 255 | os.close(self.pipes[childFD].fileno()) | ||
1663 | 256 | del self.pipes[childFD] | ||
1664 | 257 | try: | ||
1665 | 258 | self.proto.childConnectionLost(childFD) | ||
1666 | 259 | except: | ||
1667 | 260 | log.err() | ||
1668 | 261 | self.maybeCallProcessEnded() | ||
1669 | 262 | |||
1670 | 263 | # Implemented because of childConnectionLost | ||
1671 | 264 | # Adapted from twisted.internet.Process | ||
1672 | 265 | # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this | ||
1673 | 266 | # point, but the daemon will be doing the reaping for us (we can't | ||
1674 | 267 | # because the process isn't a direct child.) | ||
1675 | 268 | def maybeCallProcessEnded(self): | ||
1676 | 269 | if self.pipes: | ||
1677 | 270 | # Not done if we still have open pipes | ||
1678 | 271 | return | ||
1679 | 272 | if not self.lostProcess: | ||
1680 | 273 | return | ||
1681 | 274 | process.BaseProcess.maybeCallProcessEnded(self) | ||
1682 | 275 | # pauseProducing, present in process.py, not a IProcessTransport interface | ||
1683 | 276 | |||
1684 | 277 | |||
1685 | 38 | class ExecOnlySession(DoNothingSession): | 278 | class ExecOnlySession(DoNothingSession): |
1686 | 39 | """Conch session that only allows executing commands.""" | 279 | """Conch session that only allows executing commands.""" |
1687 | 40 | 280 | ||
1688 | @@ -58,7 +298,7 @@ | |||
1689 | 58 | notify(BazaarSSHClosed(self.avatar)) | 298 | notify(BazaarSSHClosed(self.avatar)) |
1690 | 59 | try: | 299 | try: |
1691 | 60 | self._transport.signalProcess('HUP') | 300 | self._transport.signalProcess('HUP') |
1693 | 61 | except (OSError, ProcessExitedAlready): | 301 | except (OSError, process.ProcessExitedAlready): |
1694 | 62 | pass | 302 | pass |
1695 | 63 | self._transport.loseConnection() | 303 | self._transport.loseConnection() |
1696 | 64 | 304 | ||
1697 | @@ -81,8 +321,7 @@ | |||
1698 | 81 | except ForbiddenCommand, e: | 321 | except ForbiddenCommand, e: |
1699 | 82 | self.errorWithMessage(protocol, str(e) + '\r\n') | 322 | self.errorWithMessage(protocol, str(e) + '\r\n') |
1700 | 83 | return | 323 | return |
1703 | 84 | log.msg('Running: %r, %r, %r' | 324 | log.msg('Running: %r, %r' % (executable, arguments)) |
1702 | 85 | % (executable, arguments, self.environment)) | ||
1704 | 86 | if self._transport is not None: | 325 | if self._transport is not None: |
1705 | 87 | log.err( | 326 | log.err( |
1706 | 88 | "ERROR: %r already running a command on transport %r" | 327 | "ERROR: %r already running a command on transport %r" |
1707 | @@ -91,8 +330,12 @@ | |||
1708 | 91 | # violation. Apart from this line and its twin, this class knows | 330 | # violation. Apart from this line and its twin, this class knows |
1709 | 92 | # nothing about Bazaar. | 331 | # nothing about Bazaar. |
1710 | 93 | notify(BazaarSSHStarted(self.avatar)) | 332 | notify(BazaarSSHStarted(self.avatar)) |
1713 | 94 | self._transport = self.reactor.spawnProcess( | 333 | self._transport = self._spawn(protocol, executable, arguments, |
1714 | 95 | protocol, executable, arguments, env=self.environment) | 334 | env=self.environment) |
1715 | 335 | |||
1716 | 336 | def _spawn(self, protocol, executable, arguments, env): | ||
1717 | 337 | return self.reactor.spawnProcess(protocol, executable, arguments, | ||
1718 | 338 | env=env) | ||
1719 | 96 | 339 | ||
1720 | 97 | def getCommandToRun(self, command): | 340 | def getCommandToRun(self, command): |
1721 | 98 | """Return the command that will actually be run given `command`. | 341 | """Return the command that will actually be run given `command`. |
1722 | @@ -144,21 +387,65 @@ | |||
1723 | 144 | % {'user_id': self.avatar.user_id}) | 387 | % {'user_id': self.avatar.user_id}) |
1724 | 145 | 388 | ||
1725 | 146 | 389 | ||
1726 | 390 | class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession): | ||
1727 | 391 | """Use the Forking Service instead of spawnProcess.""" | ||
1728 | 392 | |||
1729 | 393 | def _simplifyEnvironment(self, env): | ||
1730 | 394 | """Pull out the bits of the environment we want to pass along.""" | ||
1731 | 395 | env = {} | ||
1732 | 396 | for env_var in ['BZR_EMAIL']: | ||
1733 | 397 | if env_var in self.environment: | ||
1734 | 398 | env[env_var] = self.environment[env_var] | ||
1735 | 399 | return env | ||
1736 | 400 | |||
1737 | 401 | def getCommandToFork(self, executable, arguments, env): | ||
1738 | 402 | assert executable.endswith('/bin/py') | ||
1739 | 403 | assert arguments[0] == executable | ||
1740 | 404 | assert arguments[1].endswith('/bzr') | ||
1741 | 405 | executable = 'bzr' | ||
1742 | 406 | arguments = arguments[1:] | ||
1743 | 407 | arguments[0] = 'bzr' | ||
1744 | 408 | env = self._simplifyEnvironment(env) | ||
1745 | 409 | return executable, arguments, env | ||
1746 | 410 | |||
1747 | 411 | def _spawn(self, protocol, executable, arguments, env): | ||
1748 | 412 | # When spawning, adapt the idea of "bin/py .../bzr" to just using "bzr" | ||
1749 | 413 | # and the executable | ||
1750 | 414 | executable, arguments, env = self.getCommandToFork(executable, | ||
1751 | 415 | arguments, env) | ||
1752 | 416 | return ForkedProcessTransport(self.reactor, executable, | ||
1753 | 417 | arguments, env, protocol) | ||
1754 | 418 | |||
1755 | 419 | |||
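`getCommandToFork` rewrites the interpreter-prefixed invocation (`.../bin/py .../bzr <args>`) into the bare `bzr <args>` form the daemon understands, and trims the environment down to the whitelist. A sketch of the transform as a standalone function (names and paths are illustrative, not from the branch):

```python
def command_to_fork(executable, arguments, env, allowed=('BZR_EMAIL',)):
    """Rewrite 'bin/py .../bzr <args>' into 'bzr <args>' plus trimmed env."""
    assert executable.endswith('/bin/py')
    assert arguments[0] == executable
    assert arguments[1].endswith('/bzr')
    # Drop the interpreter and replace the script path with plain 'bzr'.
    arguments = list(arguments[1:])
    arguments[0] = 'bzr'
    env = dict((k, v) for k, v in env.items() if k in allowed)
    return 'bzr', arguments, env
```

The daemon already has bzr loaded, so it only needs the logical command, not the interpreter invocation; that is the source of the startup-time win described in the proposal.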
1756 | 147 | def launch_smart_server(avatar): | 420 | def launch_smart_server(avatar): |
1757 | 148 | from twisted.internet import reactor | 421 | from twisted.internet import reactor |
1758 | 149 | 422 | ||
1762 | 150 | command = ( | 423 | python_command = "%(root)s/bin/py %(bzr)s" % { |
1763 | 151 | "%(root)s/bin/py %(bzr)s lp-serve --inet " | 424 | 'root': config.root, |
1764 | 152 | % {'root': config.root, 'bzr': get_bzr_path()}) | 425 | 'bzr': get_bzr_path(), |
1765 | 426 | } | ||
1766 | 427 | args = " lp-serve --inet %(user_id)s" | ||
1767 | 428 | command = python_command + args | ||
1768 | 429 | forking_command = "bzr" + args | ||
1769 | 153 | 430 | ||
1770 | 154 | environment = dict(os.environ) | 431 | environment = dict(os.environ) |
1771 | 155 | 432 | ||
1772 | 156 | # Extract the hostname from the supermirror root config. | 433 | # Extract the hostname from the supermirror root config. |
1773 | 157 | hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1] | 434 | hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1] |
1774 | 158 | environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname) | 435 | environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname) |
1776 | 159 | return RestrictedExecOnlySession( | 436 | klass = RestrictedExecOnlySession |
1777 | 437 | # TODO: Use a FeatureFlag to enable this in a more fine-grained approach. | ||
1778 | 438 | # If the forking daemon has been spawned, then we can use it if the | ||
1779 | 439 | # feature is set to true for the given user, etc. | ||
1780 | 440 | # A global config is a good first step, but does require restarting | ||
1781 | 441 | # the service to change the flag. 'config' doesn't support SIGHUP. | ||
1782 | 442 | # For now, restarting the service is necessary to enabled/disable the | ||
1783 | 443 | # forking daemon. | ||
1784 | 444 | if config.codehosting.use_forking_daemon: | ||
1785 | 445 | klass = ForkingRestrictedExecOnlySession | ||
1786 | 446 | return klass( | ||
1787 | 160 | avatar, | 447 | avatar, |
1788 | 161 | reactor, | 448 | reactor, |
1789 | 162 | 'bzr serve --inet --directory=/ --allow-writes', | 449 | 'bzr serve --inet --directory=/ --allow-writes', |
1791 | 163 | command + ' %(user_id)s', | 450 | command, |
1792 | 164 | environment=environment) | 451 | environment=environment) |
1793 | 165 | 452 | ||
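`launch_smart_server` now builds two parallel command templates from the same `%(user_id)s`-bearing argument string: the full interpreter form for the plain session class, and a short `bzr ...` form suitable for the forking daemon. A sketch of that construction (paths here are illustrative placeholders):

```python
def build_commands(root, bzr_path):
    """Return (command, forking_command) templates for a smart server.

    Both keep the %(user_id)s placeholder, which the session class
    substitutes when the avatar's command is matched.
    """
    python_command = "%(root)s/bin/py %(bzr)s" % {
        'root': root, 'bzr': bzr_path}
    args = " lp-serve --inet %(user_id)s"
    return python_command + args, "bzr" + args
```

Only the first template is passed to the session constructor above; the forking session class derives the short form itself via `getCommandToFork`.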
1794 | === modified file 'lib/lp/codehosting/sshserver/tests/test_session.py' | |||
1795 | --- lib/lp/codehosting/sshserver/tests/test_session.py 2010-08-20 20:31:18 +0000 | |||
1796 | +++ lib/lp/codehosting/sshserver/tests/test_session.py 2010-10-19 22:56:30 +0000 | |||
1797 | @@ -5,6 +5,7 @@ | |||
1798 | 5 | 5 | ||
1799 | 6 | __metaclass__ = type | 6 | __metaclass__ = type |
1800 | 7 | 7 | ||
1801 | 8 | import socket | ||
1802 | 8 | import unittest | 9 | import unittest |
1803 | 9 | 10 | ||
1804 | 10 | from twisted.conch.interfaces import ISession | 11 | from twisted.conch.interfaces import ISession |
1805 | @@ -21,9 +22,12 @@ | |||
1806 | 21 | from lp.codehosting.sshserver.session import ( | 22 | from lp.codehosting.sshserver.session import ( |
1807 | 22 | ExecOnlySession, | 23 | ExecOnlySession, |
1808 | 23 | ForbiddenCommand, | 24 | ForbiddenCommand, |
1809 | 25 | ForkingRestrictedExecOnlySession, | ||
1810 | 24 | RestrictedExecOnlySession, | 26 | RestrictedExecOnlySession, |
1811 | 27 | _WaitForExit, | ||
1812 | 25 | ) | 28 | ) |
1813 | 26 | from lp.codehosting.tests.helpers import AvatarTestCase | 29 | from lp.codehosting.tests.helpers import AvatarTestCase |
1814 | 30 | from lp.testing import TestCase | ||
1815 | 27 | 31 | ||
1816 | 28 | 32 | ||
1817 | 29 | class MockReactor: | 33 | class MockReactor: |
1818 | @@ -40,6 +44,9 @@ | |||
1819 | 40 | usePTY, childFDs)) | 44 | usePTY, childFDs)) |
1820 | 41 | return MockProcessTransport(executable) | 45 | return MockProcessTransport(executable) |
1821 | 42 | 46 | ||
1822 | 47 | def addReader(self, reader): | ||
1823 | 48 | self.log.append(('addReader', reader)) | ||
1824 | 49 | |||
1825 | 43 | 50 | ||
1826 | 44 | class MockSSHSession: | 51 | class MockSSHSession: |
1827 | 45 | """Just enough of SSHSession to allow checking of reporting to stderr.""" | 52 | """Just enough of SSHSession to allow checking of reporting to stderr.""" |
1828 | @@ -60,6 +67,7 @@ | |||
1829 | 60 | self._executable = executable | 67 | self._executable = executable |
1830 | 61 | self.log = [] | 68 | self.log = [] |
1831 | 62 | self.session = MockSSHSession(self.log) | 69 | self.session = MockSSHSession(self.log) |
1832 | 70 | self.status = None | ||
1833 | 63 | 71 | ||
1834 | 64 | def closeStdin(self): | 72 | def closeStdin(self): |
1835 | 65 | self.log.append(('closeStdin',)) | 73 | self.log.append(('closeStdin',)) |
1836 | @@ -67,6 +75,9 @@ | |||
1837 | 67 | def loseConnection(self): | 75 | def loseConnection(self): |
1838 | 68 | self.log.append(('loseConnection',)) | 76 | self.log.append(('loseConnection',)) |
1839 | 69 | 77 | ||
1840 | 78 | def childConnectionLost(self, childFD, reason=None): | ||
1841 | 79 | self.log.append(('childConnectionLost', childFD, reason)) | ||
1842 | 80 | |||
1843 | 70 | def signalProcess(self, signal): | 81 | def signalProcess(self, signal): |
1844 | 71 | if self._executable == 'raise-os-error': | 82 | if self._executable == 'raise-os-error': |
1845 | 72 | raise OSError() | 83 | raise OSError() |
1846 | @@ -77,6 +88,39 @@ | |||
1847 | 77 | def write(self, data): | 88 | def write(self, data): |
1848 | 78 | self.log.append(('write', data)) | 89 | self.log.append(('write', data)) |
1849 | 79 | 90 | ||
1850 | 91 | def processEnded(self, status): | ||
1851 | 92 | self.log.append(('processEnded', status)) | ||
1852 | 93 | |||
1853 | 94 | |||
1854 | 95 | class Test_WaitForExit(TestCase): | ||
1855 | 96 | |||
1856 | 97 | def setUp(self): | ||
1857 | 98 | TestCase.setUp(self) | ||
1858 | 99 | self.reactor = MockReactor() | ||
1859 | 100 | self.proc = MockProcessTransport('executable') | ||
1860 | 101 | sock = socket.socket() | ||
1861 | 102 | self.exiter = _WaitForExit(self.reactor, self.proc, sock) | ||
1862 | 103 | |||
1863 | 104 | def test__init__starts_reading(self): | ||
1864 | 105 | self.assertEqual([('addReader', self.exiter)], self.reactor.log) | ||
1865 | 106 | |||
1866 | 107 | def test_dataReceived_ends_cleanly(self): | ||
1867 | 108 | self.exiter.dataReceived('exited\n0\n') | ||
1868 | 109 | self.assertEqual([('processEnded', 0)], self.proc.log) | ||
1869 | 110 | |||
1870 | 111 | def test_dataReceived_ends_with_errno(self): | ||
1871 | 112 | self.exiter.dataReceived('exited\n256\n') | ||
1872 | 113 | self.assertEqual([('processEnded', 256)], self.proc.log) | ||
1873 | 114 | |||
1874 | 115 | def test_dataReceived_bad_data(self): | ||
1875 | 116 | # Note: The dataReceived code calls 'log.err' which ends up getting | ||
1876 | 117 | # printed during the test run. How do I suppress that or even | ||
1877 | 118 | # better, check that it does so? | ||
1878 | 119 | # self.flushLoggedErrors() doesn't seem to do anything. | ||
1879 | 120 | self.exiter.dataReceived('bogus\n') | ||
1880 | 121 | self.assertEqual([('childConnectionLost', 'exit', 'invalid data'), | ||
1881 | 122 | ('processEnded', (255 << 8))], self.proc.log) | ||
1882 | 123 | |||
1883 | 80 | 124 | ||
1884 | 81 | class TestExecOnlySession(AvatarTestCase): | 125 | class TestExecOnlySession(AvatarTestCase): |
1885 | 82 | """Tests for ExecOnlySession. | 126 | """Tests for ExecOnlySession. |
1886 | @@ -340,6 +384,35 @@ | |||
1887 | 340 | self.assertRaises( | 384 | self.assertRaises( |
1888 | 341 | ForbiddenCommand, session.getCommandToRun, 'rm -rf /') | 385 | ForbiddenCommand, session.getCommandToRun, 'rm -rf /') |
1889 | 342 | 386 | ||
1890 | 387 | def test_avatarAdaptsToOnlyRestrictedSession(self): | ||
1891 | 388 | config.push('codehosting-no-forking', | ||
1892 | 389 | "[codehosting]\nuse_forking_daemon: False\n") | ||
1893 | 390 | self.addCleanup(config.pop, 'codehosting-no-forking') | ||
1894 | 391 | session = ISession(self.avatar) | ||
1895 | 392 | self.failIf(isinstance(session, ForkingRestrictedExecOnlySession), | ||
1896 | 393 | "ISession(avatar) shouldn't adapt to " | ||
1897 | 394 | " ForkingRestrictedExecOnlySession when forking is disabled. ") | ||
1898 | 395 | |||
1899 | 396 | def test_avatarAdaptsToForkingRestrictedExecOnlySession(self): | ||
1900 | 397 | config.push('codehosting-forking', | ||
1901 | 398 | "[codehosting]\nuse_forking_daemon: True\n") | ||
1902 | 399 | self.addCleanup(config.pop, 'codehosting-forking') | ||
1903 | 400 | session = ISession(self.avatar) | ||
1904 | 401 | self.failUnless( | ||
1905 | 402 | isinstance(session, ForkingRestrictedExecOnlySession), | ||
1906 | 403 | "ISession(avatar) doesn't adapt to " | ||
1907 | 404 | " ForkingRestrictedExecOnlySession. " | ||
1908 | 405 | "Got %r instead." % (session,)) | ||
1909 | 406 | executable, arguments = session.getCommandToRun( | ||
1910 | 407 | 'bzr serve --inet --directory=/ --allow-writes') | ||
1911 | 408 | executable, arguments, env = session.getCommandToFork( | ||
1912 | 409 | executable, arguments, session.environment) | ||
1913 | 410 | self.assertEqual('bzr', executable) | ||
1914 | 411 | self.assertEqual( | ||
1915 | 412 | ['bzr', 'lp-serve', | ||
1916 | 413 | '--inet', str(self.avatar.user_id)], | ||
1917 | 414 | list(arguments)) | ||
1918 | 415 | |||
1919 | 343 | 416 | ||
1920 | 344 | def test_suite(): | 417 | def test_suite(): |
1921 | 345 | return unittest.TestLoader().loadTestsFromName(__name__) | 418 | return unittest.TestLoader().loadTestsFromName(__name__) |
1922 | 346 | 419 | ||
1923 | === modified file 'lib/lp/codehosting/tests/test_acceptance.py' | |||
1924 | --- lib/lp/codehosting/tests/test_acceptance.py 2010-10-17 01:26:56 +0000 | |||
1925 | +++ lib/lp/codehosting/tests/test_acceptance.py 2010-10-19 22:56:30 +0000 | |||
1926 | @@ -8,6 +8,9 @@ | |||
1927 | 8 | import atexit | 8 | import atexit |
1928 | 9 | import os | 9 | import os |
1929 | 10 | import re | 10 | import re |
1930 | 11 | import signal | ||
1931 | 12 | import subprocess | ||
1932 | 13 | import sys | ||
1933 | 11 | import unittest | 14 | import unittest |
1934 | 12 | import xmlrpclib | 15 | import xmlrpclib |
1935 | 13 | 16 | ||
1936 | @@ -51,9 +54,58 @@ | |||
1937 | 51 | from lp.testing import TestCaseWithFactory | 54 | from lp.testing import TestCaseWithFactory |
1938 | 52 | 55 | ||
1939 | 53 | 56 | ||
1940 | 57 | class ForkingServerForTests(object): | ||
1941 | 58 | """Map starting/stopping a LPForkingService with setUp() and tearDown().""" | ||
1942 | 59 | |||
1943 | 60 | def __init__(self): | ||
1944 | 61 | self.process = None | ||
1945 | 62 | self.socket_path = None | ||
1946 | 63 | |||
1947 | 64 | def setUp(self): | ||
1948 | 65 | bzr_path = get_bzr_path() | ||
+        BZR_PLUGIN_PATH = get_BZR_PLUGIN_PATH_for_subprocess()
+        env = os.environ.copy()
+        env['BZR_PLUGIN_PATH'] = BZR_PLUGIN_PATH
+        # TODO: We probably want to use a random disk path for
+        # forking_daemon_socket, but we need to update config so that the
+        # CodeHosting service can find it.
+        # The main problem is that CodeHostingTac seems to start a tac
+        # server directly from the disk configs, and doesn't use the
+        # in-memory config. So we can't just override the memory
+        # settings, we have to somehow pass it a new config-on-disk to
+        # use.
+        self.socket_path = config.codehosting.forking_daemon_socket
+        process = subprocess.Popen(
+            [sys.executable, bzr_path, 'launchpad-forking-service',
+             '--path', self.socket_path,
+            ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
+        self.process = process
+        # Wait for it to indicate it is running
+        # The first line should be "Preloading" indicating it is ready
+        preloading_line = process.stderr.readline()
+        # The next line is the "Listening on socket" line
+        socket_line = process.stderr.readline()
+        # Now it is ready
+
+    def tearDown(self):
+        # SIGTERM is the graceful exit request, potentially we could wait a bit
+        # and send something stronger?
+        if self.process is not None and self.process.poll() is None:
+            os.kill(self.process.pid, signal.SIGTERM)
+            self.process.wait()
+            self.process = None
+        # We want to make sure the socket path has been cleaned up, so that
+        # future runs can work correctly
+        if os.path.exists(self.socket_path):
+            # Should there be a warning/error here?
+            os.remove(self.socket_path)
+
+
 class SSHServerLayer(ZopelessAppServerLayer):
 
     _tac_handler = None
+    _forker_service = None
 
     @classmethod
     def getTacHandler(cls):
@@ -64,18 +116,27 @@
         return cls._tac_handler
 
     @classmethod
+    def getForker(cls):
+        if cls._forker_service is None:
+            cls._forker_service = ForkingServerForTests()
+        return cls._forker_service
+
+    @classmethod
     @profiled
     def setUp(cls):
         tac_handler = SSHServerLayer.getTacHandler()
         tac_handler.setUp()
         SSHServerLayer._reset()
         atexit.register(tac_handler.tearDown)
+        forker = SSHServerLayer.getForker()
+        forker.setUp()
 
     @classmethod
     @profiled
     def tearDown(cls):
         SSHServerLayer._reset()
         SSHServerLayer.getTacHandler().tearDown()
+        SSHServerLayer.getForker().tearDown()
 
     @classmethod
     @profiled

=== modified file 'lib/lp/codehosting/tests/test_lpserve.py'
--- lib/lp/codehosting/tests/test_lpserve.py	2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/tests/test_lpserve.py	2010-10-19 22:56:30 +0000
@@ -1,32 +1,21 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the lp-serve plugin."""
 
 __metaclass__ = type
 
-import os
-import re
-from subprocess import PIPE
-import unittest
-
 from bzrlib import (
     errors,
-    osutils,
     )
 from bzrlib.smart import medium
-from bzrlib.tests import TestCaseWithTransport
 from bzrlib.transport import remote
+from bzrlib.plugins.lpserve.test_lpserve import TestCaseWithSubprocess
 
-from canonical.config import config
-from lp.codehosting import (
-    get_bzr_path,
-    get_BZR_PLUGIN_PATH_for_subprocess,
-    )
 from lp.codehosting.bzrutils import make_error_utility
 
 
-class TestLaunchpadServe(TestCaseWithTransport):
+class TestLaunchpadServe(TestCaseWithSubprocess):
     """Tests for the lp-serve plugin.
 
     Most of the helper methods here are copied from bzrlib.tests and
@@ -38,59 +27,6 @@
         """Assert that a server process finished cleanly."""
         self.assertEqual((0, '', ''), tuple(result))
 
-    def get_python_path(self):
-        """Return the path to the Python interpreter."""
-        return '%s/bin/py' % config.root
-
-    def start_bzr_subprocess(self, process_args, env_changes=None,
-                             working_dir=None):
-        """Start bzr in a subprocess for testing.
-
-        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
-        This version removes some of the skipping stuff, some of the
-        irrelevant comments (e.g. about win32) and uses Launchpad's own
-        mechanisms for getting the path to 'bzr'.
-
-        Comments starting with 'LAUNCHPAD' are comments about our
-        modifications.
-        """
-        if env_changes is None:
-            env_changes = {}
-        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
-        old_env = {}
-
-        def cleanup_environment():
-            for env_var, value in env_changes.iteritems():
-                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
-
-        def restore_environment():
-            for env_var, value in old_env.iteritems():
-                osutils.set_or_unset_env(env_var, value)
-
-        cwd = None
-        if working_dir is not None:
-            cwd = osutils.getcwd()
-            os.chdir(working_dir)
-
-        # LAUNCHPAD: Because of buildout, we need to get a custom Python
-        # binary, not sys.executable.
-        python_path = self.get_python_path()
-        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
-        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
-        bzr_path = get_bzr_path()
-        try:
-            cleanup_environment()
-            command = [python_path, bzr_path]
-            command.extend(process_args)
-            process = self._popen(
-                command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-        finally:
-            restore_environment()
-            if cwd is not None:
-                os.chdir(cwd)
-
-        return process
-
     def finish_lpserve_subprocess(self, process):
         """Shut down the server process.
 
@@ -169,4 +105,10 @@
 
 
 def test_suite():
-    return unittest.TestLoader().loadTestsFromName(__name__)
+    from bzrlib import tests
+    from bzrlib.plugins import lpserve
+
+    loader = tests.TestLoader()
+    suite = loader.loadTestsFromName(__name__)
+    suite = lpserve.load_tests(suite, lpserve, loader)
+    return suite
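The fixture lifecycle that ForkingServerForTests implements above — spawn the service, block on its stderr readiness lines, send SIGTERM as the graceful exit request, then remove the stale socket path — can be sketched in isolation. This is a minimal sketch, not the real `bzr launchpad-forking-service`: the child process here is a hypothetical stand-in that touches a file where the real service would bind its unix socket.

```python
import os
import signal
import subprocess
import sys
import tempfile

# Hypothetical stand-in for the forking service: prints the readiness
# lines to stderr, creates the socket path, then sleeps until killed.
child_code = """
import os, sys, time
sys.stderr.write('Preloading\\n')
sys.stderr.flush()
open(sys.argv[1], 'w').close()   # stands in for binding the unix socket
sys.stderr.write('Listening on socket\\n')
sys.stderr.flush()
time.sleep(60)
"""

socket_path = os.path.join(tempfile.mkdtemp(), 'forking.sock')
process = subprocess.Popen(
    [sys.executable, '-c', child_code, socket_path],
    stderr=subprocess.PIPE)

# setUp: block until the service reports readiness, one line at a time,
# so tests never race against a half-started daemon.
assert process.stderr.readline().strip() == b'Preloading'
assert process.stderr.readline().startswith(b'Listening')

# tearDown: SIGTERM is the graceful exit request; wait for the process,
# then clean up the socket path so future runs can bind it again.
if process.poll() is None:
    os.kill(process.pid, signal.SIGTERM)
    process.wait()
if os.path.exists(socket_path):
    os.remove(socket_path)
```

Reading stderr line by line (rather than polling the socket path) is what lets setUp return only once the daemon is actually listening, which is why the branch has the service print "Preloading" and "Listening on socket" in that order.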
Hi,
This basically looks great and works in local testing. I don't have time for a line-by-line review right now though :(
Cheers,
mwh