Merge lp:~jameinel/launchpad/lp-service into lp:launchpad/db-devel

Proposed by John A Meinel
Status: Merged
Approved by: Michael Hudson-Doyle
Approved revision: no longer in the source branch.
Merged at revision: 9914
Proposed branch: lp:~jameinel/launchpad/lp-service
Merge into: lp:launchpad/db-devel
Diff against target: 2134 lines (+1776/-86)
10 files modified
Makefile (+2/-2)
bzrplugins/lpserve/__init__.py (+737/-4)
bzrplugins/lpserve/test_lpserve.py (+534/-0)
configs/development/launchpad-lazr.conf (+1/-0)
lib/canonical/config/schema-lazr.conf (+12/-0)
lib/canonical/launchpad/scripts/runlaunchpad.py (+47/-0)
lib/lp/codehosting/sshserver/session.py (+299/-12)
lib/lp/codehosting/sshserver/tests/test_session.py (+73/-0)
lib/lp/codehosting/tests/test_acceptance.py (+61/-0)
lib/lp/codehosting/tests/test_lpserve.py (+10/-68)
To merge this branch: bzr merge lp:~jameinel/launchpad/lp-service
Reviewer Review Type Date Requested Status
Michael Hudson-Doyle Approve
Gavin Panella (community) Abstain
Jonathan Lange Pending
Review via email: mp+37531@code.launchpad.net

Commit message

Implement LaunchpadForkingService to speed up bzr+ssh connection times.

Description of the change

Retargeted from https://code.edge.launchpad.net/~jameinel/launchpad/lp-service/+merge/35877
because I accidentally started the branch from db-devel. This submission incorporates all of the suggestions from the original review.

The goal of this submission is to reduce the time it takes for a "bzr+ssh" connection to become useful. The new method must be activated by setting:
  [codehosting]
  use_forking_server = True

It is enabled in the "development" config, but the default is still disabled. I can't give a recommendation for the production config, because that branch is private.

This implements a new service (LaunchpadForkingService). It listens on a socket and waits for a request of the form 'fork <command>'. When one is received, it creates stdin/stdout/stderr fifos on disk and forks itself, running 'run_bzr_*(command)' in the child, rather than using 'exec()', which would require bootstrapping a new python process.
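As a rough illustration, a client-side interaction with the service could look like the sketch below. The socket path and helper names are assumptions for illustration; the request format follows the service's documented protocol, but treat the details as a sketch, not code from this branch:

```python
import socket

# Assumed socket path; the real location comes from the lazr config.
FORKER_SOCKET = '/var/tmp/launchpad_forking_service.sock'


def build_fork_request(command, env=None):
    """Build a request for the forking service.

    With env, use the 'fork-env' form ('key: value' lines, terminated by
    'end'); otherwise the plain 'fork' form.
    """
    if env:
        lines = ['fork-env %s' % command]
        lines.extend('%s: %s' % (k, v) for k, v in sorted(env.items()))
        lines.append('end')
        return '\n'.join(lines) + '\n'
    return 'fork %s\n' % command


def spawn_via_forker(command, env=None):
    """Ask the service to fork, returning (socket, fifo_directory).

    The response is expected to be the path to a directory containing
    'stdin', 'stdout' and 'stderr' fifos; the caller opens those and,
    by keeping the socket open, eventually reads the child's exit status.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(FORKER_SOCKET)
    sock.sendall(build_fork_request(command, env).encode('ascii'))
    fifo_dir = sock.recv(1024).strip()
    return sock, fifo_dir
```

The point of the fifo handshake is that the client and the forked child synchronize on the open() calls, so no extra coordination message is needed.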

The benefit can be seen with: time echo hello | ssh localhost bzr serve --inet ...

Without this patch, it takes 2.5s to start serving on the loopback. With this patch, it takes 0.25s.

I'm very happy to work with someone to smooth out the finer points of this submission. I tried to be explicit about places in the code that I had a decision to make, and why I chose the method I did.

I didn't use the FeatureFlag system, because setting this up via the config file was a lot more straightforward (it globally enables/disables the functionality).
As near as I can tell, rolling this code out to production should have no impact as long as use_forking_daemon is False.

(The make run_codehosting target will not actually spawn the daemon, and the twisted Conch code just gets an 'if' check to decide that it should use the old code path.)

I haven't run the full test suite, but I have:
 1) Run all of the locally relevant tests: "bzr selftest -s bt.lp_serve" and "bin/test
    lp.codehosting.sshserver"
 2) Manually started the sftp service and run commands through the local instance, both with
    and without the forking service enabled. (And I have confirmed that it isn't used when
    disabled, and is used when enabled, etc.)

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Hi,

This basically looks great and works in local testing. I don't have time for a line-by-line review right now though :(

Cheers,
mwh

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Wow, what a branch. I've been quite picky in this, that's not because
I hate you <wink> but because it's subtle code and has potential to
cause issues, so I wanted to make sure all the comments make sense and
there's no redundant code, etc.

I worry slightly about what would happen if the forking service fell
over -- but clearly, we rely on the sftp server itself staying up, so
this probably isn't such a big deal. I guess we also need to make
sure the forking service is running in production.

All that said, I have no real problems with the code and look forward
to getting my branches onto and off of Launchpad that bit faster!

I'm marking this needs information because I want to see a response to my
comments. But I don't think any of them are deep.

> === modified file 'Makefile'
> --- Makefile 2010-09-07 18:15:01 +0000
> +++ Makefile 2010-10-06 00:19:31 +0000
> @@ -253,7 +253,7 @@
>
> run_codehosting: check_schema inplace stop
> $(RM) thread*.request
> - bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
> + bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>
>
> start_librarian: compile

You should add the forker service to the run_all target too.

> === added directory 'bzrplugins/lpserve'
> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
> --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
> +++ bzrplugins/lpserve/__init__.py 2010-10-06 00:19:31 +0000
> @@ -8,15 +8,33 @@
>
> __metaclass__ = type
>
> -__all__ = ['cmd_launchpad_server']
> -
> -
> +__all__ = ['cmd_launchpad_server',
> + 'cmd_launchpad_forking_service',
> + ]

This should be formatted like this:

__all__ = [
    'cmd_launchpad_server',
    'cmd_launchpad_forking_service',
    ]

> +
> +
> +import errno
> +import os
> import resource
> +import shlex
> +import shutil
> +import signal
> +import socket
> import sys
> +import tempfile
> +import threading
> +import time
>
> from bzrlib.commands import Command, register_command
> from bzrlib.option import Option
> -from bzrlib import lockdir, ui
> +from bzrlib import (
> + commands,
> + errors,
> + lockdir,
> + osutils,
> + trace,
> + ui,
> + )
>
> from bzrlib.smart import medium, server
> from bzrlib.transport import get_transport
> @@ -110,3 +128,724 @@
>
>
> register_command(cmd_launchpad_server)
> +
> +
> +class LPForkingService(object):
> + """A service that can be asked to start a new bzr subprocess via fork.
> +
> + The basic idea is that python startup is very expensive. For example, the
> + original 'lp-serve' command could take 2.5s just to start up, before any
> + actual actions could be performed.

Well, it's not really Python startup, it's more the importing that's
slow. Maybe phrase this as "starting Python and importing Launchpad
is very...".

> + This class provides a service sitting on a socket, which can then be
> + requested to fork and run a given bzr command.
> +
> + Clients connect to the socket and make a simple request, which then
> + receives a response. The possible requests are:
> +
> + "hello\n": Trigger a heartbeat to report that the program is still
> + runnin...

review: Needs Information
Revision history for this message
Martin Pool (mbp) wrote :

notes from talking this over with spm:

 * at the moment you can see in the process listing which branch is
being accessed; it would be very nice to keep that; but at least we
should log the pid/user/branch
 * can run this on staging, then edge, then lpnet
 * propose changes into lp:lp-production-configs to gradually turn it on
 * might want an rt describing how to monitor the new daemon process,
which just needs to mention the socket location and the hello command;
they can write the script from there
 * francis should formally tell tom and charlie we'll be making this change
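The monitoring script mentioned above only needs the socket location and the hello command. A sketch of such a check (the socket path and the shape of the reply are assumptions; the service is only expected to answer something on "hello\n"):

```python
import socket


def forker_is_alive(socket_path, timeout=5.0):
    """Send the 'hello' heartbeat and report whether any reply arrives.

    Returns False if the socket is missing, refuses connections, or
    times out -- all of which a monitor should treat as 'service down'.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect(socket_path)
        sock.sendall(b'hello\n')
        return bool(sock.recv(1024))
    except (socket.error, socket.timeout):
        return False
    finally:
        sock.close()
```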

Revision history for this message
Robert Collins (lifeless) wrote :

On Wed, Oct 6, 2010 at 9:43 PM, Martin Pool <email address hidden> wrote:

>  * can run this on staging, then edge, then lpnet

There is no edge.

>  * francis should formally tell tom and charlie we'll be making this change

The rt describing should mention this; it will get enough airplay ;)

We make architectural changes -all- the time, there's no need to get
particularly cautious about this one.

Revision history for this message
Jonathan Lange (jml) wrote :

On Wed, Oct 6, 2010 at 9:52 AM, Robert Collins
<email address hidden> wrote:
> On Wed, Oct 6, 2010 at 9:43 PM, Martin Pool <email address hidden> wrote:
>
>>  * can run this on staging, then edge, then lpnet
>
> There is no edge.

Yeah there is.

$ ssh bazaar.edge.launchpad.net
No shells on this server.
Connection to bazaar.edge.launchpad.net closed.

jml

Revision history for this message
Robert Collins (lifeless) wrote :

It will be gone extremely soon. I want people to stop thinking of
'edge' as a way to test things.

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
> notes from talking this over with spm:
>
> * at the moment you can see in the process listing which branch is
> being accessed; it would be very nice to keep that; but at least we
> should log the pid/user/branch

This isn't affected by this branch, although the process name ends up
being a bit unwieldy:

21266 pts/1 S+ 0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch

Maybe inserting a call to setproctitle in become_child would be useful,
especially as we have the dependency already -- one piece of information
we've lost is the user who has connected, although we can probably
figure that out from logs still.
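A call along these lines in become_child could restore that information. This is only a sketch: the title format and the helper name are invented here, and it assumes the third-party setproctitle package (which provides setproctitle()):

```python
try:
    from setproctitle import setproctitle
except ImportError:
    # The package may not be installed; degrade to a no-op so the
    # child still runs, just with the unwieldy default title.
    def setproctitle(title):
        pass


def retitle_child(command, username):
    """Rewrite the forked child's process title so `ps` shows the bzr
    command being run and the connected user (hypothetical helper)."""
    title = 'bzr lp-forking-service child: %s (%s)' % (command, username)
    setproctitle(title)
    return title
```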

> * can run this on staging, then edge, then lpnet
> * propose changes into lp:lp-production-configs to gradually turn it on
> * might want an rt describing how to monitor the new daemon process,
> which just needs to mention the socket location and the hello command;
> they can write the script from there
> * francis should formally tell tom and charlie we'll be making this change

I still vaguely wonder about controlling this with a feature flag. The
thing is, the codehosting systems don't connect to the database
currently, and that's actually kind of nice in some ways. We could add
an XML-RPC method to query a flag, and have the ssh daemon check that
every 30s or so -- I don't think we want an XML-RPC call delaying the
launch of the lp-serve process on every connection.

Cheers,
mwh

Revision history for this message
John A Meinel (jameinel) wrote :

For starters, thanks for the thorough review. It is really nice to get feedback on it, and you have certainly thought a lot about the code.

On 10/6/2010 12:02 AM, Michael Hudson-Doyle wrote:
> Review: Needs Information
> Wow, what a branch. I've been quite picky in this, that's not because
> I hate you <wink> but because it's subtle code and has potential to
> cause issues, so I wanted to make sure all the comments make sense and
> there's no redundant code, etc.
>
> I worry slightly about what would happen if the forking service fell
> over -- but clearly, we rely on the sftp server itself staying up, so
> this probably isn't such a big deal. I guess we also need to make
> sure the forking service is running in production.

We could trap failures to connect to the forking server and fall back to regular spawnProcess. The code is already a simple if/else block. However, I wouldn't really want silent degradation of connection. If we want it, it could probably be done with:

transport = None

if config.codehosting.use_forking_server:
  try:
    transport = ...
  except ???:
     pass # We failed, we'll try again

if transport is None:
  transport = reactor.spawnProcess(...)
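Fleshed out a little (hypothetically -- the callables and the exception caught here are stand-ins, not code from this branch), that fallback could look like:

```python
import socket


def start_transport(use_forking_server, connect_to_forker, spawn_process):
    """Prefer the forking service, but fall back to spawnProcess if
    connecting to it fails (e.g. the daemon is down).

    connect_to_forker and spawn_process are injected callables standing
    in for the real twisted plumbing.
    """
    if use_forking_server:
        try:
            return connect_to_forker()
        except socket.error:
            pass  # Forker unreachable; fall back to the old code path.
    return spawn_process()
```

The objection in the surrounding discussion still applies: this makes degradation silent, so a monitoring check on the daemon would be needed either way.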

>
> All that said, I have no real problems with the code and look forward
> to getting my branches onto and off of Launchpad that bit faster!
>
> I'm marking this needs information because I want to see a response to my
> comments. But I don't think any of them are deep.
>
>> === modified file 'Makefile'
>> --- Makefile 2010-09-07 18:15:01 +0000
>> +++ Makefile 2010-10-06 00:19:31 +0000
>> @@ -253,7 +253,7 @@
>>
>> run_codehosting: check_schema inplace stop
>> $(RM) thread*.request
>> - bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
>> + bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>>
>>
>> start_librarian: compile
>
> You should add the forker service to the run_all target too.

Done.

>
>> === added directory 'bzrplugins/lpserve'
>> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
>> --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
>> +++ bzrplugins/lpserve/__init__.py 2010-10-06 00:19:31 +0000
>> @@ -8,15 +8,33 @@
>>
>> __metaclass__ = type
>>
>> -__all__ = ['cmd_launchpad_server']
>> -
>> -
>> +__all__ = ['cmd_launchpad_server',
>> + 'cmd_launchpad_forking_service',
>> + ]
>
> This shoudl be formatted like this:
>
> __all__ = [
> 'cmd_launchpad_server',
> 'cmd_launchpad_forking_service',
> ]

check

...

>> +class LPForkingService(object):
>> + """A service that can be asked to start a new bzr subprocess via fork.
>> +
>> + The basic idea is that python startup is very expensive. For example, the
>> + original 'lp-serve' command could take 2.5s just to start up, before any
>> + actual actions could be performed.
>
> Well, it's not really Python startup, its more the importing thats
> slow. Maybe phrase this as "starting Python and importing Launchpad
> is very...".
>
>> + This class provides a service sitting on a socket, which can then be
>> + requested to fork and run a given bzr command.
>> +
>> + Clients connect to the socket and make a simp...

Revision history for this message
Martin Pool (mbp) wrote :

On 7 October 2010 07:15, Michael Hudson-Doyle <email address hidden> wrote:
> On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
>> notes from talking this over with spm:
>>
>>  * at the moment you can see in the process listing which branch is
>> being accessed; it would be very nice to keep that; but at least we
>> should log the pid/user/branch
>
> This isn't affected by this branch, although the process name ends up
> being a bit unwieldy:
>
> 21266 pts/1    S+     0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch
>
> Maybe inserting a call to setproctitle in become_child would be useful,
> especially as we have the dependency already -- one piece of information
> we've lost is the user who has connected, although we can probably
> figure that out from logs still.

Without the call to setproctitle, we would be losing this thing spm
said he liked, which is to immediately show which user and branch it
corresponds to.

To my mind it's more important to log the username, branch and pid, so
that you can see who it was both while the process is running and
afterwards. It's also nice that jam logs the rusage. But putting
them into the argv is more immediately accessible.

>>  * can run this on staging, then edge, then lpnet
>>  * propose changes into lp:lp-production-configs to gradually turn it on
>>  * might want an rt describing how to monitor the new daemon process,
>> which just needs to mention the socket location and the hello command;
>> they can write the script from there
>>  * francis should formally tell tom and charlie we'll be making this change
>
> I still vaguely wonder about controlling this with a feature flag.  The
> thing is, the codehosting systems don't connect to the database
> currently, and that's actually kind of nice in some ways.  We could add
> an XML-RPC method to query a flag, and have the ssh daemon check that
> every 30s or so -- I don't think we want an XML-RPC call delaying the
> launch of the lp-servce process on every connection.

Actually I was thinking of just having an http URL, accessible only
within the DC, that just presents all the feature flag rules in a
machine readable form: maybe json by default, perhaps others. There's
no need for xmlrpc when we just want to read, and avoiding startup
dependencies is good. Then we can have a FeatureRuleSource that gets
and parses this rather than talking to storm, and it will be fast to
start up for this type of thing; probably fast enough to run on every
request if we want to. bug 656031.
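A FeatureRuleSource consuming such an endpoint might parse the payload along these lines. The payload shape and field names here are guesses for illustration, not an existing Launchpad API:

```python
import json


def parse_flag_rules(payload):
    """Parse a JSON list of rule objects into a dict mapping flag name
    to [(scope, priority, value)], sorted by descending priority, so
    the first locally-matching scope wins (assumed rule semantics)."""
    rules = {}
    for rule in json.loads(payload):
        rules.setdefault(rule['flag'], []).append(
            (rule['scope'], rule['priority'], rule['value']))
    for flag_rules in rules.values():
        flag_rules.sort(key=lambda r: -r[1])
    return rules
```

A client could fetch and re-parse this every 30s or so, keeping flag evaluation off the per-connection path.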

--
Martin

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Wed, 06 Oct 2010 22:34:40 -0000, Martin Pool <email address hidden> wrote:
> On 7 October 2010 07:15, Michael Hudson-Doyle <email address hidden> wrote:
> > On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
> >> notes from talking this over with spm:
> >>
> >>  * at the moment you can see in the process listing which branch is
> >> being accessed; it would be very nice to keep that; but at least we
> >> should log the pid/user/branch
> >
> > This isn't affected by this branch, although the process name ends up
> > being a bit unwieldy:
> >
> > 21266 pts/1    S+     0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch
> >
> > Maybe inserting a call to setproctitle in become_child would be useful,
> > especially as we have the dependency already -- one piece of information
> > we've lost is the user who has connected, although we can probably
> > figure that out from logs still.
>
> Without the call to setproctitle, we would be losing this thing spm
> said he liked, which is to immediately show which user and branch it
> corresponds to.

Uh? No, that's the bit we retain. The ps output I showed above was
from running with the branch being reviewed.

> To my mind it's more important to log the username, branch and pid, so
> that you can see who it was both while the process is running and
> afterwards. It's also nice that jam logs the rusage. But putting
> them into the argv is more immediately accessible.

I'm not sure if you're recommending this branch be changed before
landing here.

> >>  * can run this on staging, then edge, then lpnet
> >>  * propose changes into lp:lp-production-configs to gradually turn it on
> >>  * might want an rt describing how to monitor the new daemon process,
> >> which just needs to mention the socket location and the hello command;
> >> they can write the script from there
> >>  * francis should formally tell tom and charlie we'll be making this change
> >
> > I still vaguely wonder about controlling this with a feature flag.  The
> > thing is, the codehosting systems don't connect to the database
> > currently, and that's actually kind of nice in some ways.  We could add
> > an XML-RPC method to query a flag, and have the ssh daemon check that
> > every 30s or so -- I don't think we want an XML-RPC call delaying the
> > launch of the lp-servce process on every connection.
>
> Actually I was thinking of just having an http URL, accessible only
> within the DC, that just presents all the feature flag rules in a
> machine readable form: maybe json by default, perhaps others. There's
> no need for xmlrpc when we just want to read, and avoiding startup
> dependencies is good. Then we can have a FeatureRuleSource that gets
> and parses this rather than talking to storm, and it will be fast to
> start up for this type of thing; probably fast enough to run on every
> request if we want to. bug 656031.

That would be interesting. The sftp server being a twisted app, access
to this URL should really be nonblo...


Revision history for this message
John A Meinel (jameinel) wrote :

I do log the username and pid; I don't (at that time) have the branch, because we haven't opened it yet. If we changed "bzrlib.trace.mutter()" to always include the pid, then I think we'd have everything you were asking for.
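As a sketch of that idea (a hypothetical helper, not the bzrlib API -- the real bzrlib.trace.mutter() takes printf-style arguments and writes to ~/.bzr.log):

```python
import os


def format_with_pid(fmt, *args):
    """Format a trace message with the pid prefixed, the way a
    pid-aware mutter() might render it."""
    return '[pid %d] %s' % (os.getpid(), fmt % args)
```

With that, every trace line would carry the pid alongside the username that is already logged, which covers the process-listing request once the process has exited.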

Revision history for this message
Martin Pool (mbp) wrote :

On 7 October 2010 10:09, Michael Hudson-Doyle <email address hidden> wrote:
>> > 21266 pts/1    S+     0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch
>> >
>> > Maybe inserting a call to setproctitle in become_child would be useful,
>> > especially as we have the dependency already -- one piece of information
>> > we've lost is the user who has connected, although we can probably
>> > figure that out from logs still.
>>
>> Without the call to setproctitle, we would be losing this thing spm
>> said he liked, which is to immediately show which user and branch it
>> corresponds to.
>
> Uh?  No, that's the bit we retain.  The ps output I showed above was
> from running with the branch being reviewed.

The username is not shown. I don't know if it's a big deal; this
branch is old enough already.

>
>> To my mind it's more important to log the username, branch and pid, so
>> that you can see who it was both while the process is running and
>> afterwards.  It's also nice that jam logs the rusage.  But putting
>> them into the argv is more immediately accessible.
>
> I'm not sure if you're recommending this branch be changed before
> landing here.

what john said.

> That would be interesting.  The sftp server being a twisted app, access
> to this URL should really be nonblocking which always adds a wrinkle.
> Maybe that would be fine though, or maybe we can just cheat.

That's actually kind of a feature...

--
Martin

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Wed, 06 Oct 2010 20:39:01 -0000, John A Meinel <email address hidden> wrote:
> For starters, thanks for the thorough review. It is really nice to get feedback on it, and you have certainly thought a lot about the code.
>
> On 10/6/2010 12:02 AM, Michael Hudson-Doyle wrote:
> > Review: Needs Information
> > Wow, what a branch. I've been quite picky in this, that's not because
> > I hate you <wink> but because it's subtle code and has potential to
> > cause issues, so I wanted to make sure all the comments make sense and
> > there's no redundant code, etc.
> >
> > I worry slightly about what would happen if the forking service fell
> > over -- but clearly, we rely on the sftp server itself staying up, so
> > this probably isn't such a big deal. I guess we also need to make
> > sure the forking service is running in production.
>
>
> We could trap failures to connect to the forking server and fall back
> to regular spawnProcess. The code is already a simple if/else
> block. However, I wouldn't really want silent degradation of
> connection.

Yeah, I don't really like that idea either.

> >> + "fork <command>\n": Request a new subprocess to be started.
> >> + <command> is the bzr command to be run, such as "rocks" or
> >> + "lp-serve --inet 12".
> >> + The immediate response will be the path-on-disk to a directory full
> >> + of named pipes (fifos) that will be the stdout/stderr/stdin of the
> >> + new process.
> >
> > This doesn't quite make it clear what the names of the files will be.
> > The obvious guess is correct, but I'd rather be certain.
> >
> >> + If a client holds the socket open, when the child process exits,
> >> + the exit status (as given by 'wait()') will be written to the
> >> + socket.
> >
> > This appears to be out of date -- there's also a fork-env command now.
> > Is the fork command still useful?
>
> It has been useful in testing, it isn't used in the 'production' code.

If the fork command is still implemented, I think it should still be
documented...

> ...
>
>
> >
> >> + DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
> >
> > I'm not sure of the value of providing a default here, as it seems
> > that its always in practice going to be overridden by the config. If
> > it makes testing easier, it's worth it I guess.
>
> Well, we override it for testing as well. I wanted to support "bzr
> lp-forking-service" as something you could run. Also, that seemed to
> be the 'best' place to put it, which gives hints as to where you would
> want to put it.

OK, let's leave it.

> >> + self._children_spawned = 0
> >> +
> >> + def _create_master_socket(self):
> >> + self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> >> + self._server_socket.bind(self.master_socket_path)
> >> + if self._perms is not None:
> >> + os.chmod(self.master_socket_path, self._perms)
> >
> > The pedant in me thinks this is a bit racy. But I don't think it's
> > important (in fact, I wonder if the permission stuff is necessary
> > really).
>
> I'm sure it isn't necessary ...

Revision history for this message
Robert Collins (lifeless) wrote :

On feature flags... I'd suggest an xmlrpc/API for evaluating flags.

evaluating flags requires DB access for group membership of a user and
potentially other things like time limited trials, access to opt-in
features and so forth.

The exact API would need to take a user, possibly feature names, and
return a list of the rules whose scopes need to be locally resolved.

Whether this is json or not is immaterial to me - at least for this
patch, the feature evaluating code would load once.

It would be very interesting to do this though, because we could phase
this in per-user.

-Rob

Revision history for this message
Martin Pool (mbp) wrote :

rt 41791 for adding the monitoring service <https://rt.admin.canonical.com/Ticket/Display.html?id=41791>

Revision history for this message
Gavin Panella (allenap) wrote :

Looks like this is taken care of, so abstaining on behalf of the Launchpad code reviewers.

review: Approve
Revision history for this message
Gavin Panella (allenap) :
review: Abstain
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

So I would like to see the code that communicates with the service more twisted-ized, but until it turns out to be an actual problem it's not worth the bother. Let's land this.

review: Approve
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

I think your latest changes are basically OK. Some of it feels a bit cargo-culted, though -- I don't see the reason for the lazy getForker method; just stash the forker on the instance normally? And the atexit calls aren't needed, imho. If you inherited the forker helper class from Fixture, you'd be able to use addCleanup, which would probably be a bit cleaner. I'm not sure the big TODO comment about paths is warranted -- it's all true, but much of Launchpad suffers from this problem. If you can de-cargo-cult a bit, I'll land this asap.

review: Approve
Revision history for this message
Robert Collins (lifeless) wrote :

Let's just land and iterate.

-Rob

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Mon, 18 Oct 2010 22:22:39 -0000, Robert Collins <email address hidden> wrote:
> lets just land and iterate.

Yeah fair enough. Throwing it at ec2 now.

Cheers,
mwh

Preview Diff

1=== modified file 'Makefile'
2--- Makefile 2010-10-18 21:53:28 +0000
3+++ Makefile 2010-10-19 22:56:30 +0000
4@@ -239,7 +239,7 @@
5
6 run_all: check_schema inplace stop
7 $(RM) thread*.request
8- bin/run -r librarian,sftp,mailman,codebrowse,google-webservice,memcached \
9+ bin/run -r librarian,sftp,forker,mailman,codebrowse,google-webservice,memcached \
10 -i $(LPCONFIG)
11
12 run_codebrowse: build
13@@ -253,7 +253,7 @@
14
15 run_codehosting: check_schema inplace stop
16 $(RM) thread*.request
17- bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
18+ bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
19
20 start_librarian: compile
21 bin/start_librarian
22
23=== added directory 'bzrplugins/lpserve'
24=== renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
25--- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
26+++ bzrplugins/lpserve/__init__.py 2010-10-19 22:56:30 +0000
27@@ -8,15 +8,34 @@
28
29 __metaclass__ = type
30
31-__all__ = ['cmd_launchpad_server']
32-
33-
34+__all__ = [
35+ 'cmd_launchpad_server',
36+ 'cmd_launchpad_forking_service',
37+ ]
38+
39+
40+import errno
41+import logging
42+import os
43 import resource
44+import shlex
45+import shutil
46+import signal
47+import socket
48 import sys
49+import tempfile
50+import threading
51+import time
52
53 from bzrlib.commands import Command, register_command
54 from bzrlib.option import Option
55-from bzrlib import lockdir, ui
56+from bzrlib import (
57+ commands,
58+ lockdir,
59+ osutils,
60+ trace,
61+ ui,
62+ )
63
64 from bzrlib.smart import medium, server
65 from bzrlib.transport import get_transport
66@@ -110,3 +129,717 @@
67
68
69 register_command(cmd_launchpad_server)
70+
71+
72+class LPForkingService(object):
73+ """A service that can be asked to start a new bzr subprocess via fork.
74+
75+ The basic idea is that bootstrapping time is long. Most of this is time
76+ spent during import of all needed libraries (lp.*). For example, the
77+ original 'lp-serve' command could take 2.5s just to start up, before any
78+ actual actions could be performed.
79+
80+ This class provides a service sitting on a socket, which can then be
81+ requested to fork and run a given bzr command.
82+
83+ Clients connect to the socket and make a single request, which then
84+ receives a response. The possible requests are:
85+
86+ "hello\n": Trigger a heartbeat to report that the program is still
87+ running, and write status information to the log file.
88+ "quit\n": Stop the service, but do so 'nicely', waiting for children
89+ to exit, etc. Once this is received the service will stop
90+ taking new requests on the port.
91+ "fork-env <command>\n<env>\nend\n": Request a new subprocess to be
92+ started. <command> is the bzr command to be run, such as "rocks"
93+ or "lp-serve --inet 12".
94+ The immediate response will be the path-on-disk to a directory full
95+ of named pipes (fifos) that will be the stdout/stderr/stdin (named
96+ accordingly) of the new process.
97+ If a client holds the socket open, when the child process exits,
98+ the exit status (as given by 'wait()') will be written to the
99+ socket.
100+
101+ Note that one of the key bits is that the client will not be
102+ started with exec*, we just call 'commands.run_bzr*()' directly.
103+ This way, any modules that are already loaded will not need to be
104+ loaded again. However, care must be taken with any global-state
105+ that should be reset.
106+
107+ fork-env allows you to supply environment variables such as
108+ "BZR_EMAIL: joe@foo.com" which will be set in os.environ before the
109+ command is run.
110+ """
111+
112+ # Design decisions. These are bits where we could have chosen a different
113+ # method/implementation and weren't sure what would be best. Documenting
114+ # the current decision, and the alternatives.
115+ #
116+ # [Decision #1]
117+ # Serve on a named AF_UNIX socket.
118+ # 1) It doesn't make sense to serve to arbitrary hosts, we only want
119+ # the local host to make requests. (Since the client needs to
120+ # access the named fifos on the current filesystem.)
121+ # 2) You can set security parameters on a filesystem path (g+rw,
122+ # a-rw).
123+ # [Decision #2]
124+ # SIGCHLD
125+ # We want to quickly detect that children have exited so that we can
126+ # inform the client process quickly. At the moment, we register a
127+ # SIGCHLD handler that doesn't do anything. However, it means that
128+ # when we get the signal, if we are currently blocked in something
129+ # like '.accept()', we will jump out temporarily. At that point the
130+ # main loop will check if any children have exited. We could have
131+ # done this work as part of the signal handler, but that felt 'racy'
132+ # doing any serious work in a signal handler.
133+ # If we just used socket.timeout as the indicator to go poll for
134+ # children exiting, it slows the disconnect by as much as the full
135+ # timeout. (So a timeout of 1.0s will cause the process to hang by
136+ # that long until it determines that a child has exited, and can
137+ # close the connection.)
138+ # The current flow means that we'll notice exited children whenever
139+ # we finish the current work.
140+ # [Decision #3]
141+ # Child vs Parent actions.
142+ # There are several actions that are done when we get a new request.
143+ # We have to create the fifos on disk, fork a new child, connect the
144+ # child to those handles, and inform the client of the new path (not
145+ # necessarily in that order.) It makes sense to wait to send the path
146+ # message until after the fifos have been created. That way the
147+ # client can just try to open them immediately, and the
148+ # client-and-child will be synchronized by the open() calls.
149+    #       However, should the client be the one doing the mkfifo, or
150+    #       should the server? Who should send the message? Should we fork
151+    #       before the mkfifo, or after?
152+ # The current thoughts:
153+ # 1) Try to do work in the child when possible. This should allow
154+ # for 'scaling' because the server is single-threaded.
155+ # 2) We create the directory itself in the server, because that
156+ # allows the server to monitor whether the client failed to
157+ # clean up after itself or not.
158+    #       3) Otherwise we create the fifos in the child, and then send
159+    #          the message back.
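The synchronization property relied on here (a blocking open() on a fifo does not return until the other side connects) can be demonstrated with a small standalone sketch, independent of the service itself:

```python
import os
import tempfile
import threading


def demo_fifo_sync():
    # Create a fifo the way the service does for a child's stdin.
    base = tempfile.mkdtemp(prefix='fifo-demo-')
    path = os.path.join(base, 'stdin')
    os.mkfifo(path)

    def writer():
        # This open() blocks until the reader end is opened.
        fd = os.open(path, os.O_WRONLY)
        os.write(fd, b'hello')
        os.close(fd)

    t = threading.Thread(target=writer)
    t.start()
    # This open() blocks until the writer end is opened, so both sides
    # proceed together; that is the synchronization Decision #3 relies on.
    fd = os.open(path, os.O_RDONLY)
    data = os.read(fd, 5)
    os.close(fd)
    t.join()
    os.remove(path)
    os.rmdir(base)
    return data
```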
160+ # [Decision #4]
161+ # Exit information
162+ # Inform the client that the child has exited on the socket they used
163+ # to request the fork.
164+ # 1) Arguably they could see that stdout and stderr have been closed,
165+ # and thus stop reading. In testing, I wrote a client which uses
166+ # select.poll() over stdin/stdout/stderr and used that to ferry
167+ # the content to the appropriate local handle. However for the
168+ # FIFOs, when the remote end closed, I wouldn't see any
169+ # corresponding information on the local end. There obviously
170+ # wasn't any data to be read, so they wouldn't show up as
171+ # 'readable' (for me to try to read, and get 0 bytes, indicating
172+ # it was closed). I also wasn't seeing POLLHUP, which seemed to be
173+ # the correct indicator. As such, we decided to inform the client
174+ # on the socket that they originally made the fork request, rather
175+ # than just closing the socket immediately.
176+ # 2) We could have had the forking server close the socket, and only
177+ # the child hold the socket open. When the child exits, then the
178+ # OS naturally closes the socket.
179+ # If we want the returncode, then we should put that as bytes on
180+ # the socket before we exit. Having the child do the work means
181+ # that in error conditions, it could easily die before being able
182+    #          to write anything (think SEGFAULT, etc). The forking server
183+    #          already wait()s on its children so that we don't get zombies,
184+    #          and with wait3() we can also collect the rusage (user time,
185+    #          memory consumption, etc.)
186+ # As such, it seems reasonable that the server can then also
187+ # report back when a child is seen as exiting.
188+ # [Decision #5]
189+ # cleanup once connected
190+ # The child process blocks during 'open()' waiting for the client to
191+ # connect to its fifos. Once the client has connected, the child then
192+ # deletes the temporary directory and the fifos from disk. This means
193+    #       that there isn't much left for diagnosis, but it also means
194+    #       that a crashed session won't leave garbage around on disk.
195+ # Note that the forking service itself still monitors the paths
196+ # created, and will delete garbage if it sees that a child failed to
197+ # do so.
198+ # [Decision #6]
199+ # os._exit(retcode) in the child
200+ # Calling sys.exit(retcode) raises an exception, which then bubbles
201+ # up the stack and runs exit functions (and finally statements). When
202+ # I tried using it originally, I would see the current child bubble
203+ # all the way up the stack (through the server code that it fork()
204+ # through), and then get to main() returning code 0. The process
205+ # would still exit nonzero. My guess is that something in the atexit
206+ # functions was failing, but that it was happening after logging, etc
207+ # had been shut down.
208+ # Any global state from the child process should be flushed before
209+ # run_bzr_* has exited (which we *do* wait for), and any other global
210+    #       state is probably a remnant from the service process, which
211+    #       will be cleaned up by the service itself, rather than the child.
212+ # There is some concern that log files may not get flushed, so we
213+ # currently call sys.exitfunc() first. The main problem is that I
214+ # don't know any way to *remove* a function registered via 'atexit()'
215+    #       so if the forking service has some state, we may try to clean it
216+ # incorrectly.
217+ # Note that the bzr script itself uses sys.exitfunc(); os._exit() in
218+ # the 'bzr' main script, as the teardown time of all the python state
219+ # was quite noticeable in real-world runtime. As such, bzrlib should
220+ # be pretty safe, or it would have been failing for people already.
221+ # [Decision #7]
222+ # prefork vs max children vs ?
223+ # For simplicity it seemed easiest to just fork when requested. Over
224+ # time, I realized it would be easy to allow running an arbitrary
225+ # command (no harder than just running one command), so it seemed
226+ # reasonable to switch over. If we go the prefork route, then we'll
227+ # need a way to tell the pre-forked children what command to run.
228+ # This could be as easy as just adding one more fifo that they wait
229+ # on in the same directory.
230+ # For now, I've chosen not to limit the number of forked children. I
231+ # don't know what a reasonable value is, and probably there are
232+ # already limitations at play. (If Conch limits connections, then it
233+ # will already be doing all the work, etc.)
234+ # [Decision #8]
235+ # nicer errors on the request socket
236+ # This service is meant to be run only on the local system. As such,
237+ # we don't try to be extra defensive about leaking information to
238+ # the one connecting to the socket. (We should still watch out what
239+ # we send across the per-child fifos, since those are connected to
240+ # remote clients.) Instead we try to be helpful, and tell them as
241+ # much as we know about what went wrong.
242+
243+ DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
244+ DEFAULT_PERMISSIONS = 00660 # Permissions on the master socket (rw-rw----)
245+ WAIT_FOR_CHILDREN_TIMEOUT = 5*60 # Wait no more than 5 min for children
246+ SOCKET_TIMEOUT = 1.0
247+ SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
248+ WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
249+ # be read
250+
251+ _fork_function = os.fork
252+
253+ def __init__(self, path=DEFAULT_PATH, perms=DEFAULT_PERMISSIONS):
254+ self.master_socket_path = path
255+ self._perms = perms
256+ self._start_time = None
257+ self._should_terminate = threading.Event()
258+        # We keep local references to these, since during shutdown the
259+        # socket module may be gc'd before we are.
260+ self._socket_timeout = socket.timeout
261+ self._socket_error = socket.error
262+ # Map from pid => (temp_path_for_handles, request_socket)
263+ self._child_processes = {}
264+ self._children_spawned = 0
265+
266+ def _create_master_socket(self):
267+ self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268+ self._server_socket.bind(self.master_socket_path)
269+ if self._perms is not None:
270+ os.chmod(self.master_socket_path, self._perms)
271+ self._server_socket.listen(5)
272+ self._server_socket.settimeout(self.SOCKET_TIMEOUT)
273+ trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))
274+
275+ def _cleanup_master_socket(self):
276+ self._server_socket.close()
277+ try:
278+ os.remove(self.master_socket_path)
279+ except (OSError, IOError), e:
280+ # If we don't delete it, then we get 'address already in
281+ # use' failures
282+            trace.mutter('failed to cleanup: %s: %s'
283+                         % (self.master_socket_path, e))
284+
285+ def _handle_sigchld(self, signum, frm):
286+ # We don't actually do anything here, we just want an interrupt (EINTR)
287+ # on socket.accept() when SIGCHLD occurs.
288+ pass
289+
290+ def _handle_sigterm(self, signum, frm):
291+ # Unregister this as the default handler, 2 SIGTERMs will exit us.
292+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
293+ # SIGTERM should also generate EINTR on our wait loop, so this should
294+ # be enough
295+ self._should_terminate.set()
296+
297+ def _register_signals(self):
298+        """Register SIGCHLD and SIGTERM handlers.
299+
300+        If we have a trigger for SIGCHLD then we can quickly respond to
301+        clients when their process exits. The main risk is getting more
302+        EAGAIN errors elsewhere.
303+
304+        SIGTERM allows us to clean up nicely before we exit.
305+ """
306+ signal.signal(signal.SIGCHLD, self._handle_sigchld)
307+ signal.signal(signal.SIGTERM, self._handle_sigterm)
308+
309+ def _unregister_signals(self):
310+ signal.signal(signal.SIGCHLD, signal.SIG_DFL)
311+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
312+
313+ def _create_child_file_descriptors(self, base_path):
314+ stdin_path = os.path.join(base_path, 'stdin')
315+ stdout_path = os.path.join(base_path, 'stdout')
316+ stderr_path = os.path.join(base_path, 'stderr')
317+ os.mkfifo(stdin_path)
318+ os.mkfifo(stdout_path)
319+ os.mkfifo(stderr_path)
320+
321+ def _bind_child_file_descriptors(self, base_path):
322+ stdin_path = os.path.join(base_path, 'stdin')
323+ stdout_path = os.path.join(base_path, 'stdout')
324+ stderr_path = os.path.join(base_path, 'stderr')
325+ # These open calls will block until another process connects (which
326+ # must connect in the same order)
327+ stdin_fid = os.open(stdin_path, os.O_RDONLY)
328+ stdout_fid = os.open(stdout_path, os.O_WRONLY)
329+ stderr_fid = os.open(stderr_path, os.O_WRONLY)
330+ # Note: by this point bzrlib has opened stderr for logging
331+ # (as part of starting the service process in the first place).
332+ # As such, it has a stream handler that writes to stderr. logging
333+ # tries to flush and close that, but the file is already closed.
334+        # This just suppresses that exception.
335+ logging.raiseExceptions = False
336+ sys.stdin.close()
337+ sys.stdout.close()
338+ sys.stderr.close()
339+ os.dup2(stdin_fid, 0)
340+ os.dup2(stdout_fid, 1)
341+ os.dup2(stderr_fid, 2)
342+ sys.stdin = os.fdopen(stdin_fid, 'rb')
343+ sys.stdout = os.fdopen(stdout_fid, 'wb')
344+ sys.stderr = os.fdopen(stderr_fid, 'wb')
345+ ui.ui_factory.stdin = sys.stdin
346+ ui.ui_factory.stdout = sys.stdout
347+ ui.ui_factory.stderr = sys.stderr
348+ # Now that we've opened the handles, delete everything so that we don't
349+ # leave garbage around. Because the open() is done in blocking mode, we
350+ # know that someone has already connected to them, and we don't want
351+ # anyone else getting confused and connecting.
352+ # See [Decision #5]
353+ os.remove(stderr_path)
354+ os.remove(stdout_path)
355+ os.remove(stdin_path)
356+ os.rmdir(base_path)
357+
358+ def _close_child_file_descriptors(self):
359+ sys.stdin.close()
360+ sys.stderr.close()
361+ sys.stdout.close()
362+
363+ def become_child(self, command_argv, path):
364+ """We are in the spawned child code, do our magic voodoo."""
365+ # Stop tracking new signals
366+ self._unregister_signals()
367+ # Reset the start time
368+ trace._bzr_log_start_time = time.time()
369+ trace.mutter('%d starting %r'
370+ % (os.getpid(), command_argv))
371+ self._bind_child_file_descriptors(path)
372+ self._run_child_command(command_argv)
373+
374+ def _run_child_command(self, command_argv):
375+ # This is the point where we would actually want to do something with
376+ # our life
377+ # TODO: We may want to consider special-casing the 'lp-serve' command.
378+ # As that is the primary use-case for this service, it might be
379+ # interesting to have an already-instantiated instance, where we
380+ # can just pop on an extra argument and be ready to go. However,
381+ # that would probably only really be measurable if we prefork. As
382+ # it looks like ~200ms is 'fork()' time, but only 50ms is
383+ # run-the-command time.
384+ retcode = commands.run_bzr_catch_errors(command_argv)
385+ self._close_child_file_descriptors()
386+ trace.mutter('%d finished %r'
387+ % (os.getpid(), command_argv))
388+ # We force os._exit() here, because we don't want to unwind the stack,
389+ # which has complex results. (We can get it to unwind back to the
390+ # cmd_launchpad_forking_service code, and even back to main() reporting
391+        # the return code, but after that, suddenly the return code changes
392+        # from a '0' to a '1', with no logging of info.)
393+ # TODO: Should we call sys.exitfunc() here? it allows atexit functions
394+ # to fire, however, some of those may be still around from the
395+ # parent process, which we don't really want.
396+ sys.exitfunc()
397+ # See [Decision #6]
398+ os._exit(retcode)
399+
400+ @staticmethod
401+ def command_to_argv(command_str):
402+ """Convert a 'foo bar' style command to [u'foo', u'bar']"""
403+ # command_str must be a utf-8 string
404+ return [s.decode('utf-8') for s in shlex.split(command_str)]
405+
406+ @staticmethod
407+ def parse_env(env_str):
408+ """Convert the environment information into a dict.
409+
410+ :param env_str: A string full of environment variable declarations.
411+ Each key is simple ascii "key: value\n"
412+ The string must end with "end\n".
413+ :return: A dict of environment variables
414+ """
415+ env = {}
416+ if not env_str.endswith('end\n'):
417+ raise ValueError('Invalid env-str: %r' % (env_str,))
418+ env_str = env_str[:-5]
419+ if not env_str:
420+ return env
421+ env_entries = env_str.split('\n')
422+ for entry in env_entries:
423+ key, value = entry.split(': ', 1)
424+ env[key] = value
425+ return env
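As a standalone illustration of the format parse_env accepts (this mirrors the method above rather than importing it):

```python
def parse_env(env_str):
    # Mirror of LPForkingService.parse_env: "key: value\n" lines, then "end\n".
    env = {}
    if not env_str.endswith('end\n'):
        raise ValueError('Invalid env-str: %r' % (env_str,))
    env_str = env_str[:-5]  # strip 'end\n' and the preceding newline
    if not env_str:
        return env
    for entry in env_str.split('\n'):
        key, value = entry.split(': ', 1)
        env[key] = value
    return env
```

So parse_env('BZR_EMAIL: joe@foo.com\nend\n') yields {'BZR_EMAIL': 'joe@foo.com'}, while a string not terminated by 'end\n' raises ValueError.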
426+
427+ def fork_one_request(self, conn, client_addr, command_argv, env):
428+ """Fork myself and serve a request."""
429+ temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
430+        # The fifos are created (in the child) before the response is sent,
431+        # so the client can start trying to connect to them while we fork
432+        # and have the child do the same.
433+ self._children_spawned += 1
434+ pid = self._fork_function()
435+ if pid == 0:
436+ pid = os.getpid()
437+ trace.mutter('%d spawned' % (pid,))
438+ self._server_socket.close()
439+ for env_var, value in env.iteritems():
440+ osutils.set_or_unset_env(env_var, value)
441+ # See [Decision #3]
442+ self._create_child_file_descriptors(temp_name)
443+ conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
444+ conn.close()
445+ self.become_child(command_argv, temp_name)
446+ trace.warning('become_child returned!!!')
447+ sys.exit(1)
448+ else:
449+ self._child_processes[pid] = (temp_name, conn)
450+ self.log(client_addr, 'Spawned process %s for %r: %s'
451+ % (pid, command_argv, temp_name))
452+
453+ def main_loop(self):
454+ self._start_time = time.time()
455+ self._should_terminate.clear()
456+ self._register_signals()
457+ self._create_master_socket()
458+ trace.note('Listening on socket: %s' % (self.master_socket_path,))
459+ try:
460+ try:
461+ self._do_loop()
462+ finally:
463+ # Stop talking to others, we are shutting down
464+ self._cleanup_master_socket()
465+ except KeyboardInterrupt:
466+ # SIGINT received, try to shutdown cleanly
467+ pass
468+ trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
469+ % (self.WAIT_FOR_CHILDREN_TIMEOUT,
470+ len(self._child_processes)))
471+ self._shutdown_children()
472+ trace.note('Exiting')
473+
474+ def _do_loop(self):
475+ while not self._should_terminate.isSet():
476+ try:
477+ conn, client_addr = self._server_socket.accept()
478+ except self._socket_timeout:
479+ pass # run shutdown and children checks
480+ except self._socket_error, e:
481+ if e.args[0] == errno.EINTR:
482+ pass # run shutdown and children checks
483+ elif e.args[0] != errno.EBADF:
484+ # We can get EBADF here while we are shutting down
485+ # So we just ignore it for now
486+ pass
487+ else:
488+ # Log any other failure mode
489+ trace.warning("listening socket error: %s", e)
490+ else:
491+ self.log(client_addr, 'connected')
492+ # TODO: We should probably trap exceptions coming out of this
493+ # and log them, so that we don't kill the service because
494+ # of an unhandled error
495+ # Note: settimeout is used so that a malformed request doesn't
496+ # cause us to hang forever. Note that the particular
497+ # implementation means that a malicious client could
498+ # probably send us one byte every Xms, and we would just
499+ # keep trying to read it. However, as a local service, we
500+ # aren't worrying about it.
501+ conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
502+ try:
503+ self.serve_one_connection(conn, client_addr)
504+ except self._socket_timeout, e:
505+ trace.log_exception_quietly()
506+ self.log(client_addr, 'request timeout failure: %s' % (e,))
507+ conn.sendall('FAILURE\nrequest timed out\n')
508+ conn.close()
509+ self._poll_children()
510+
511+ def log(self, client_addr, message):
512+ """Log a message to the trace log.
513+
514+ Include the information about what connection is being served.
515+ """
516+ if client_addr is not None:
517+ # Note, we don't use conn.getpeername() because if a client
518+ # disconnects before we get here, that raises an exception
519+ conn_info = '[%s] ' % (client_addr,)
520+ else:
521+ conn_info = ''
522+ trace.mutter('%s%s' % (conn_info, message))
523+
524+ def log_information(self):
525+ """Log the status information.
526+
527+ This includes stuff like number of children, and ... ?
528+ """
529+ self._poll_children()
530+ self.log(None, 'Running for %.3fs' % (time.time() - self._start_time))
531+ self.log(None, '%d children currently running (spawned %d total)'
532+ % (len(self._child_processes), self._children_spawned))
533+ # Read the current information about memory consumption, etc.
534+ self.log(None, 'Self: %s'
535+ % (resource.getrusage(resource.RUSAGE_SELF),))
536+ # This seems to be the sum of all rusage for all children that have
537+ # been collected (not for currently running children, or ones we
538+ # haven't "wait"ed on.) We may want to read /proc/PID/status, since
539+ # 'live' information is probably more useful.
540+ self.log(None, 'Finished children: %s'
541+ % (resource.getrusage(resource.RUSAGE_CHILDREN),))
542+
543+ def _poll_children(self):
544+ """See if children are still running, etc.
545+
546+ One interesting hook here would be to track memory consumption, etc.
547+ """
548+ while self._child_processes:
549+ try:
550+ c_id, exit_code, rusage = os.wait3(os.WNOHANG)
551+ except OSError, e:
552+ if e.errno == errno.ECHILD:
553+ # TODO: We handle this right now because the test suite
554+ # fakes a child, since we wanted to test some code
555+ # without actually forking anything
556+ trace.mutter('_poll_children() called, and'
557+ ' self._child_processes indicates there are'
558+ ' children, but os.wait3() says there are not.'
559+ ' current_children: %s' % (self._child_processes,))
560+                    return
561+                # Any other OSError is unexpected; re-raise so we don't
562+                # fall through with c_id unbound.
563+                raise
561+ if c_id == 0:
562+ # No more children stopped right now
563+ return
564+ c_path, sock = self._child_processes.pop(c_id)
565+ trace.mutter('%s exited %s and usage: %s'
566+ % (c_id, exit_code, rusage))
567+ # See [Decision #4]
568+ try:
569+ sock.sendall('exited\n%s\n' % (exit_code,))
570+ except (self._socket_timeout, self._socket_error), e:
571+ # The client disconnected before we wanted them to,
572+ # no big deal
573+ trace.mutter('%s\'s socket already closed: %s' % (c_id, e))
574+ else:
575+ sock.close()
576+ if os.path.exists(c_path):
577+ # The child failed to cleanup after itself, do the work here
578+ trace.warning('Had to clean up after child %d: %s\n'
579+ % (c_id, c_path))
580+ shutil.rmtree(c_path, ignore_errors=True)
581+
582+ def _wait_for_children(self, secs):
583+ start = time.time()
584+ end = start + secs
585+ while self._child_processes:
586+ self._poll_children()
587+ if secs > 0 and time.time() > end:
588+ break
589+ time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)
590+
591+ def _shutdown_children(self):
592+ self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
593+ if self._child_processes:
594+ trace.warning('Children still running: %s'
595+ % ', '.join(map(str, self._child_processes)))
596+ for c_id in self._child_processes:
597+ trace.warning('sending SIGINT to %d' % (c_id,))
598+ os.kill(c_id, signal.SIGINT)
599+ # We sent the SIGINT signal, see if they exited
600+ self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
601+ if self._child_processes:
602+ # No? Then maybe something more powerful
603+ for c_id in self._child_processes:
604+ trace.warning('sending SIGKILL to %d' % (c_id,))
605+ os.kill(c_id, signal.SIGKILL)
606+ # We sent the SIGKILL signal, see if they exited
607+ self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
608+ if self._child_processes:
609+ for c_id, (c_path, sock) in self._child_processes.iteritems():
610+ # TODO: We should probably put something into this message?
611+                #       However, the likelihood is very small that this isn't
612+ # already closed because of SIGKILL + _wait_for_children
613+ # And I don't really know what to say...
614+ sock.close()
615+ if os.path.exists(c_path):
616+ trace.warning('Cleaning up after immortal child %d: %s\n'
617+ % (c_id, c_path))
618+ shutil.rmtree(c_path)
619+
620+ def _parse_fork_request(self, conn, client_addr, request):
621+ if request.startswith('fork-env '):
622+ while not request.endswith('end\n'):
623+ request += osutils.read_bytes_from_socket(conn)
624+ command, env = request[9:].split('\n', 1)
625+ else:
626+ command = request[5:].strip()
627+ env = 'end\n' # No env set
628+ try:
629+ command_argv = self.command_to_argv(command)
630+ env = self.parse_env(env)
631+ except Exception, e:
632+ # TODO: Log the traceback?
633+ self.log(client_addr, 'command or env parsing failed: %r'
634+ % (str(e),))
635+ conn.sendall('FAILURE\ncommand or env parsing failed: %r'
636+ % (str(e),))
637+ else:
638+ return command_argv, env
639+ return None, None
640+
641+ def serve_one_connection(self, conn, client_addr):
642+ request = ''
643+ while '\n' not in request:
644+ request += osutils.read_bytes_from_socket(conn)
645+ # telnet likes to use '\r\n' rather than '\n', and it is nice to have
646+ # an easy way to debug.
647+ request = request.replace('\r\n', '\n')
648+ self.log(client_addr, 'request: %r' % (request,))
649+ if request == 'hello\n':
650+ conn.sendall('ok\nyep, still alive\n')
651+ self.log_information()
652+ conn.close()
653+ elif request == 'quit\n':
654+ self._should_terminate.set()
655+ conn.sendall('ok\nquit command requested... exiting\n')
656+ conn.close()
657+ elif request.startswith('fork ') or request.startswith('fork-env '):
658+ command_argv, env = self._parse_fork_request(conn, client_addr,
659+ request)
660+ if command_argv is not None:
661+ # See [Decision #7]
662+ # TODO: Do we want to limit the number of children? And/or
663+ # prefork additional instances? (the design will need to
664+ # change if we prefork and run arbitrary commands.)
665+ self.fork_one_request(conn, client_addr, command_argv, env)
666+ # We don't close the conn like other code paths, since we use
667+ # it again later.
668+ else:
669+ conn.close()
670+ else:
671+ self.log(client_addr, 'FAILURE: unknown request: %r' % (request,))
672+ # See [Decision #8]
673+ conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
674+ conn.close()
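From the client's perspective, every code path above starts its reply with a status line. A hypothetical helper for splitting the fork replies ('ok\n&lt;pid&gt;\n&lt;path&gt;\n' on success, 'FAILURE\n&lt;message&gt;' otherwise; not part of this branch) could look like:

```python
def parse_fork_response(response):
    # Hypothetical client-side helper; the wire shapes come from the
    # service code above: 'ok\n<pid>\n<path>\n' or 'FAILURE\n<message>'.
    status, rest = response.split('\n', 1)
    if status == 'ok':
        pid_str, path = rest.rstrip('\n').split('\n', 1)
        return True, (int(pid_str), path)
    return False, rest
```

After a successful fork, the same socket is kept open so the client can later receive the 'exited\n&lt;code&gt;\n' notification described in Decision #4.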
675+
676+
677+class cmd_launchpad_forking_service(Command):
678+ """Launch a long-running process, where you can ask for new processes.
679+
680+ The process will block on a given AF_UNIX socket waiting for requests to be
681+ made. When a request is made, it will fork itself and redirect
682+ stdout/in/err to fifos on the filesystem, and start running the requested
683+ command. The caller will be informed where those file handles can be found.
684+ Thus it only makes sense that the process connecting to the port must be on
685+ the same system.
686+ """
687+
688+ aliases = ['lp-service']
689+
690+ takes_options = [Option('path',
691+ help='Listen for connections at PATH',
692+ type=str),
693+ Option('perms',
694+ help='Set the mode bits for the socket, interpreted'
695+ ' as an octal integer (same as chmod)'),
696+ Option('preload',
697+ help="Do/don't preload libraries before startup."),
698+ Option('children-timeout', type=int, argname='SEC',
699+ help="Only wait SEC seconds for children to exit"),
700+ ]
701+
702+ def _preload_libraries(self):
703+ for pyname in libraries_to_preload:
704+ try:
705+ __import__(pyname)
706+ except ImportError, e:
707+ trace.mutter('failed to preload %s: %s' % (pyname, e))
708+
709+ def run(self, path=None, perms=None, preload=True,
710+ children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT):
711+ if path is None:
712+ path = LPForkingService.DEFAULT_PATH
713+ if perms is None:
714+ perms = LPForkingService.DEFAULT_PERMISSIONS
715+ if preload:
716+ # We 'note' this because it often takes a fair amount of time.
717+ trace.note('Preloading %d modules' % (len(libraries_to_preload),))
718+ self._preload_libraries()
719+ service = LPForkingService(path, perms)
720+ service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
721+ service.main_loop()
722+
723+register_command(cmd_launchpad_forking_service)
724+
725+
726+class cmd_launchpad_replay(Command):
727+ """Write input from stdin back to stdout or stderr.
728+
729+ This is a hidden command, primarily available for testing
730+ cmd_launchpad_forking_service.
731+ """
732+
733+ hidden = True
734+
735+ def run(self):
736+ # Just read line-by-line from stdin, and write out to stdout or stderr
737+ # depending on the prefix
738+ for line in sys.stdin:
739+ channel, contents = line.split(' ', 1)
740+ channel = int(channel)
741+ if channel == 1:
742+ sys.stdout.write(contents)
743+ sys.stdout.flush()
744+ elif channel == 2:
745+ sys.stderr.write(contents)
746+ sys.stderr.flush()
747+ else:
748+ raise RuntimeError('Invalid channel request.')
749+ return 0
750+
751+register_command(cmd_launchpad_replay)
752+
753+# This list was generated by lsprofiling a spawned child and looking for
754+# <module ...> times, which indicate an import occurred. Another option is
755+# to just run "bzr lp-serve --profile-imports" manually, and observe what was
756+# expensive to import. It doesn't seem very easy to get this right
757+# automatically.
758+libraries_to_preload = [
759+ 'bzrlib.errors',
760+ 'bzrlib.repofmt.groupcompress_repo',
761+ 'bzrlib.repository',
762+ 'bzrlib.smart',
763+ 'bzrlib.smart.protocol',
764+ 'bzrlib.smart.request',
765+ 'bzrlib.smart.server',
766+ 'bzrlib.smart.vfs',
767+ 'bzrlib.transport.local',
768+ 'bzrlib.transport.readonly',
769+ 'lp.codehosting.bzrutils',
770+ 'lp.codehosting.vfs',
771+ 'lp.codehosting.vfs.branchfs',
772+ 'lp.codehosting.vfs.branchfsclient',
773+ 'lp.codehosting.vfs.hooks',
774+ 'lp.codehosting.vfs.transport',
775+ ]
776+
777+
778+def load_tests(standard_tests, module, loader):
779+ standard_tests.addTests(loader.loadTestsFromModuleNames(
780+ [__name__ + '.' + x for x in [
781+ 'test_lpserve',
782+ ]]))
783+ return standard_tests
784
785=== added file 'bzrplugins/lpserve/test_lpserve.py'
786--- bzrplugins/lpserve/test_lpserve.py 1970-01-01 00:00:00 +0000
787+++ bzrplugins/lpserve/test_lpserve.py 2010-10-19 22:56:30 +0000
788@@ -0,0 +1,534 @@
789+# Copyright 2010 Canonical Ltd. This software is licensed under the
790+# GNU Affero General Public License version 3 (see the file LICENSE).
791+
792+import os
793+import signal
794+import socket
795+import subprocess
796+import tempfile
797+import threading
798+import time
799+
800+from testtools import content
801+
802+from bzrlib import (
803+ osutils,
804+ tests,
805+ trace,
806+ )
807+from bzrlib.plugins import lpserve
808+
809+from canonical.config import config
810+from lp.codehosting import get_bzr_path, get_BZR_PLUGIN_PATH_for_subprocess
811+
812+
813+class TestingLPForkingServiceInAThread(lpserve.LPForkingService):
814+ """A test-double to run a "forking service" in a thread.
815+
816+    Note that it does not allow actual forking, but it does let us exercise
817+    the service's other operations.
818+ """
819+
820+ # For testing, we set the timeouts much lower, because we want the tests to
821+ # run quickly
822+ WAIT_FOR_CHILDREN_TIMEOUT = 0.5
823+ SOCKET_TIMEOUT = 0.01
824+ SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
825+ WAIT_FOR_REQUEST_TIMEOUT = 0.1
826+
827+ # We're running in a thread as part of the test suite, blow up if we try to
828+ # fork
829+ _fork_function = None
830+
831+ def __init__(self, path, perms=None):
832+ self.service_started = threading.Event()
833+ self.service_stopped = threading.Event()
834+ self.this_thread = None
835+ self.fork_log = []
836+ super(TestingLPForkingServiceInAThread, self).__init__(
837+ path=path, perms=None)
838+
839+ def _register_signals(self):
840+ pass # Don't register it for the test suite
841+
842+ def _unregister_signals(self):
843+ pass # We don't fork, and didn't register, so don't unregister
844+
845+ def _create_master_socket(self):
846+ super(TestingLPForkingServiceInAThread, self)._create_master_socket()
847+ self.service_started.set()
848+
849+ def main_loop(self):
850+ self.service_stopped.clear()
851+ super(TestingLPForkingServiceInAThread, self).main_loop()
852+ self.service_stopped.set()
853+
854+ def fork_one_request(self, conn, client_addr, command, env):
855+ # We intentionally don't allow the test suite to request a fork, as
856+ # threads + forks and everything else don't exactly play well together
857+ self.fork_log.append((command, env))
858+ conn.sendall('ok\nfake forking\n')
859+ conn.close()
860+
861+ @staticmethod
862+ def start_service(test):
863+ """Start a new LPForkingService in a thread at a random path.
864+
865+ This will block until the service has created its socket, and is ready
866+ to communicate.
867+
868+ :return: A new TestingLPForkingServiceInAThread instance
869+ """
870+ fd, path = tempfile.mkstemp(prefix='tmp-lp-forking-service-',
871+ suffix='.sock')
872+ # We don't want a temp file, we want a temp socket
873+ os.close(fd)
874+ os.remove(path)
875+ new_service = TestingLPForkingServiceInAThread(path=path)
876+ thread = threading.Thread(target=new_service.main_loop,
877+ name='TestingLPForkingServiceInAThread')
878+ new_service.this_thread = thread
879+ # should we be doing thread.setDaemon(True) ?
880+ thread.start()
881+ new_service.service_started.wait(10.0)
882+ if not new_service.service_started.isSet():
883+ raise RuntimeError(
884+ 'Failed to start the TestingLPForkingServiceInAThread')
885+ test.addCleanup(new_service.stop_service)
886+ # what about returning new_service._sockname ?
887+ return new_service
888+
889+ def stop_service(self):
890+ """Stop the test-server thread. This can be called multiple times."""
891+ if self.this_thread is None:
892+ # We already stopped the process
893+ return
894+ self._should_terminate.set()
895+ self.service_stopped.wait(10.0)
896+ if not self.service_stopped.isSet():
897+ raise RuntimeError(
898+ 'Failed to stop the TestingLPForkingServiceInAThread')
899+ self.this_thread.join()
900+ # Break any refcycles
901+ self.this_thread = None
902+
903+
904+class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport):
905+
906+ def test_start_and_stop_service(self):
907+ service = TestingLPForkingServiceInAThread.start_service(self)
908+ service.stop_service()
909+
910+ def test_multiple_stops(self):
911+ service = TestingLPForkingServiceInAThread.start_service(self)
912+ service.stop_service()
913+ # calling stop_service repeatedly is a no-op (and not an error)
914+ service.stop_service()
915+
916+ def test_autostop(self):
917+ # We shouldn't leak a thread here, as it should be part of the test
918+ # case teardown.
919+ service = TestingLPForkingServiceInAThread.start_service(self)
920+
921+
922+class TestCaseWithLPForkingService(tests.TestCaseWithTransport):
923+
924+ def setUp(self):
925+ super(TestCaseWithLPForkingService, self).setUp()
926+ self.service = TestingLPForkingServiceInAThread.start_service(self)
927+
928+ def send_message_to_service(self, message, one_byte_at_a_time=False):
929+ client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
930+ client_sock.connect(self.service.master_socket_path)
931+ if one_byte_at_a_time:
932+ for byte in message:
933+ client_sock.send(byte)
934+ else:
935+ client_sock.sendall(message)
936+ response = client_sock.recv(1024)
937+ return response
938+
939+
940+class TestLPForkingServiceCommandToArgv(tests.TestCase):
941+
942+ def assertAsArgv(self, argv, command_str):
943+ self.assertEqual(argv,
944+ lpserve.LPForkingService.command_to_argv(command_str))
945+
946+ def test_simple(self):
947+ self.assertAsArgv([u'foo'], 'foo')
948+ self.assertAsArgv([u'foo', u'bar'], 'foo bar')
949+
950+ def test_quoted(self):
951+ self.assertAsArgv([u'foo'], 'foo')
952+ self.assertAsArgv([u'foo bar'], '"foo bar"')
953+
954+ def test_unicode(self):
955+ self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5')
956+
957+
958+class TestLPForkingServiceParseEnv(tests.TestCase):
959+
960+ def assertEnv(self, env, env_str):
961+ self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str))
962+
963+ def assertInvalid(self, env_str):
964+ self.assertRaises(ValueError, lpserve.LPForkingService.parse_env,
965+ env_str)
966+
967+ def test_no_entries(self):
968+ self.assertEnv({}, 'end\n')
969+
970+    def test_one_entry(self):
971+ self.assertEnv({'BZR_EMAIL': 'joe@foo.com'},
972+ 'BZR_EMAIL: joe@foo.com\n'
973+ 'end\n')
974+
975+ def test_two_entries(self):
976+ self.assertEnv({'BZR_EMAIL': 'joe@foo.com', 'BAR': 'foo'},
977+ 'BZR_EMAIL: joe@foo.com\n'
978+ 'BAR: foo\n'
979+ 'end\n')
980+
981+ def test_invalid_empty(self):
982+ self.assertInvalid('')
983+
984+ def test_invalid_end(self):
985+ self.assertInvalid("BZR_EMAIL: joe@foo.com\n")
986+
987+ def test_invalid_entry(self):
988+ self.assertInvalid("BZR_EMAIL joe@foo.com\nend\n")
989+
990+
991+class TestLPForkingService(TestCaseWithLPForkingService):
992+
993+ def test_send_quit_message(self):
994+ response = self.send_message_to_service('quit\n')
995+ self.assertEqual('ok\nquit command requested... exiting\n', response)
996+ self.service.service_stopped.wait(10.0)
997+ self.assertTrue(self.service.service_stopped.isSet())
998+
999+ def test_send_invalid_message_fails(self):
1000+ response = self.send_message_to_service('unknown\n')
1001+ self.assertStartsWith(response, 'FAILURE')
1002+
1003+ def test_send_hello_heartbeat(self):
1004+ response = self.send_message_to_service('hello\n')
1005+ self.assertEqual('ok\nyep, still alive\n', response)
1006+
1007+ def test_send_simple_fork(self):
1008+ response = self.send_message_to_service('fork rocks\n')
1009+ self.assertEqual('ok\nfake forking\n', response)
1010+ self.assertEqual([(['rocks'], {})], self.service.fork_log)
1011+
1012+ def test_send_fork_env_with_empty_env(self):
1013+ response = self.send_message_to_service(
1014+ 'fork-env rocks\n'
1015+ 'end\n')
1016+ self.assertEqual('ok\nfake forking\n', response)
1017+ self.assertEqual([(['rocks'], {})], self.service.fork_log)
1018+
1019+ def test_send_fork_env_with_env(self):
1020+ response = self.send_message_to_service(
1021+ 'fork-env rocks\n'
1022+ 'BZR_EMAIL: joe@example.com\n'
1023+ 'end\n')
1024+ self.assertEqual('ok\nfake forking\n', response)
1025+ self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@example.com'})],
1026+ self.service.fork_log)
1027+
1028+ def test_send_fork_env_slowly(self):
1029+ response = self.send_message_to_service(
1030+ 'fork-env rocks\n'
1031+ 'BZR_EMAIL: joe@example.com\n'
1032+ 'end\n', one_byte_at_a_time=True)
1033+ self.assertEqual('ok\nfake forking\n', response)
1034+ self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@example.com'})],
1035+ self.service.fork_log)
1036+
1037+ def test_send_incomplete_fork_env_timeout(self):
1038+ # We should get a failure message if we can't quickly read the whole
1039+ # content
1040+ response = self.send_message_to_service(
1041+ 'fork-env rocks\n'
1042+ 'BZR_EMAIL: joe@example.com\n',
1043+ one_byte_at_a_time=True)
1044+ # Note that we *don't* send a final 'end\n'
1045+ self.assertStartsWith(response, 'FAILURE\n')
1046+
1047+ def test_send_incomplete_request_timeout(self):
1048+ # Requests end with '\n', send one without it
1049+ response = self.send_message_to_service('hello',
1050+ one_byte_at_a_time=True)
1051+ self.assertStartsWith(response, 'FAILURE\n')
1052+
1053+
1054+class TestCaseWithSubprocess(tests.TestCaseWithTransport):
1055+ """Override the bzr start_bzr_subprocess command.
1056+
1057+ The launchpad infrastructure requires a fair amount of configuration to get
1058+ paths, etc correct. This provides a "start_bzr_subprocess" command that
1059+ has all of those paths appropriately set, but otherwise functions the same
1060+ as the bzrlib.tests.TestCase version.
1061+ """
1062+
1063+ def get_python_path(self):
1064+ """Return the path to the Python interpreter."""
1065+ return '%s/bin/py' % config.root
1066+
1067+ def start_bzr_subprocess(self, process_args, env_changes=None,
1068+ working_dir=None):
1069+ """Start bzr in a subprocess for testing.
1070+
1071+ Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
1072+ This version removes some of the skipping stuff, some of the
1073+ irrelevant comments (e.g. about win32) and uses Launchpad's own
1074+ mechanisms for getting the path to 'bzr'.
1075+
1076+ Comments starting with 'LAUNCHPAD' are comments about our
1077+ modifications.
1078+ """
1079+ if env_changes is None:
1080+ env_changes = {}
1081+ env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
1082+ old_env = {}
1083+
1084+ def cleanup_environment():
1085+ for env_var, value in env_changes.iteritems():
1086+ old_env[env_var] = osutils.set_or_unset_env(env_var, value)
1087+
1088+ def restore_environment():
1089+ for env_var, value in old_env.iteritems():
1090+ osutils.set_or_unset_env(env_var, value)
1091+
1092+ cwd = None
1093+ if working_dir is not None:
1094+ cwd = osutils.getcwd()
1095+ os.chdir(working_dir)
1096+
1097+ # LAUNCHPAD: Because of buildout, we need to get a custom Python
1098+ # binary, not sys.executable.
1099+ python_path = self.get_python_path()
1100+ # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
1101+ # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
1102+ bzr_path = get_bzr_path()
1103+ try:
1104+ cleanup_environment()
1105+ command = [python_path, bzr_path]
1106+ command.extend(process_args)
1107+ process = self._popen(
1108+ command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1109+ stderr=subprocess.PIPE)
1110+ finally:
1111+ restore_environment()
1112+ if cwd is not None:
1113+ os.chdir(cwd)
1114+
1115+ return process
1116+
1117+
1118+class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess):
1119+ """Tests will get a separate process to communicate to.
1120+
1121+ The number of these tests should be small, because it is expensive to start
1122+ and stop the daemon.
1123+
1124+ TODO: This should probably use testresources, or layers somehow...
1125+ """
1126+
1127+ def setUp(self):
1128+ super(TestCaseWithLPForkingServiceSubprocess, self).setUp()
1129+ (self.service_process,
1130+ self.service_path) = self.start_service_subprocess()
1131+ self.addCleanup(self.stop_service)
1132+
1133+ def start_conversation(self, message, one_byte_at_a_time=False):
1134+ """Start talking to the service, and get the initial response."""
1135+ client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1136+ trace.mutter('sending %r to socket %s' % (message, self.service_path))
1137+ client_sock.connect(self.service_path)
1138+ if one_byte_at_a_time:
1139+ for byte in message:
1140+ client_sock.send(byte)
1141+ else:
1142+ client_sock.sendall(message)
1143+ response = client_sock.recv(1024)
1144+ trace.mutter('response: %r' % (response,))
1145+ if response.startswith("FAILURE"):
1146+ raise RuntimeError('Failed to send message: %r' % (response,))
1147+ return response, client_sock
1148+
1149+ def send_message_to_service(self, message, one_byte_at_a_time=False):
1150+ response, client_sock = self.start_conversation(message,
1151+ one_byte_at_a_time=one_byte_at_a_time)
1152+ client_sock.close()
1153+ return response
1154+
1155+ def send_fork_request(self, command, env=None):
1156+ if env is not None:
1157+ request_lines = ['fork-env %s\n' % (command,)]
1158+ for key, value in env.iteritems():
1159+ request_lines.append('%s: %s\n' % (key, value))
1160+ request_lines.append('end\n')
1161+ request = ''.join(request_lines)
1162+ else:
1163+ request = 'fork %s\n' % (command,)
1164+ response, sock = self.start_conversation(request)
1165+ ok, pid, path, tail = response.split('\n')
1166+ self.assertEqual('ok', ok)
1167+ self.assertEqual('', tail)
1168+ # Don't really care what it is, but should be an integer
1169+ pid = int(pid)
1170+ path = path.strip()
1171+ self.assertContainsRe(path, '/lp-forking-service-child-')
1172+ return path, pid, sock
1173+
1174+ def start_service_subprocess(self):
1175+ # Make sure this plugin is exposed to the subprocess
1176+ # SLOOWWW (~2 seconds, which is why we are doing the work anyway)
1177+ fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-')
1178+ # I'm not 100% sure about when cleanup runs versus addDetail, but I
1179+ # think this will work.
1180+ self.addCleanup(os.remove, tempname)
1181+
1182+ def read_log():
1183+ f = os.fdopen(fd)
1184+ f.seek(0)
1185+ content = f.read()
1186+ f.close()
1187+ return [content]
1188+ self.addDetail('server-log', content.Content(
1189+ content.ContentType('text', 'plain', {"charset": "utf8"}),
1190+ read_log))
1191+ service_fd, path = tempfile.mkstemp(prefix='tmp-lp-service-',
1192+ suffix='.sock')
1193+ os.close(service_fd)
1194+        os.remove(path) # the service wants to create it as a socket
1195+ env_changes = {'BZR_PLUGIN_PATH': lpserve.__path__[0],
1196+ 'BZR_LOG': tempname}
1197+ proc = self.start_bzr_subprocess(
1198+ ['lp-service', '--path', path, '--no-preload',
1199+ '--children-timeout=1'],
1200+ env_changes=env_changes)
1201+ trace.mutter('started lp-service subprocess')
1202+ expected = 'Listening on socket: %s\n' % (path,)
1203+ path_line = proc.stderr.readline()
1204+ trace.mutter(path_line)
1205+ self.assertEqual(expected, path_line)
1206+ # The process won't delete it, so we do
1207+ return proc, path
1208+
1209+ def stop_service(self):
1210+ if self.service_process is None:
1211+ # Already stopped
1212+ return
1213+ # First, try to stop the service gracefully, by sending a 'quit'
1214+ # message
1215+ try:
1216+ response = self.send_message_to_service('quit\n')
1217+ except socket.error, e:
1218+ # Ignore a failure to connect, the service must be stopping/stopped
1219+ # already
1220+ response = None
1221+ tend = time.time() + 10.0
1222+ while self.service_process.poll() is None:
1223+ if time.time() > tend:
1224+ self.finish_bzr_subprocess(process=self.service_process,
1225+ send_signal=signal.SIGINT, retcode=3)
1226+ self.fail('Failed to quit gracefully after 10.0 seconds')
1227+ time.sleep(0.1)
1228+ if response is not None:
1229+ self.assertEqual('ok\nquit command requested... exiting\n',
1230+ response)
1231+
1232+ def _get_fork_handles(self, path):
1233+ trace.mutter('getting handles for: %s' % (path,))
1234+ stdin_path = os.path.join(path, 'stdin')
1235+ stdout_path = os.path.join(path, 'stdout')
1236+ stderr_path = os.path.join(path, 'stderr')
1237+ # The ordering must match the ordering of the service or we get a
1238+ # deadlock.
1239+ child_stdin = open(stdin_path, 'wb')
1240+ child_stdout = open(stdout_path, 'rb')
1241+ child_stderr = open(stderr_path, 'rb')
1242+ return child_stdin, child_stdout, child_stderr
1243+
1244+ def communicate_with_fork(self, path, stdin=None):
1245+ child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
1246+ if stdin is not None:
1247+ child_stdin.write(stdin)
1248+ child_stdin.close()
1249+ stdout_content = child_stdout.read()
1250+ stderr_content = child_stderr.read()
1251+ return stdout_content, stderr_content
1252+
1253+ def assertReturnCode(self, expected_code, sock):
1254+ """Assert that we get the expected return code as a message."""
1255+ response = sock.recv(1024)
1256+ self.assertStartsWith(response, 'exited\n')
1257+ code = int(response.split('\n', 1)[1])
1258+ self.assertEqual(expected_code, code)
1259+
1260+ def test_fork_lp_serve_hello(self):
1261+ path, _, sock = self.send_fork_request('lp-serve --inet 2')
1262+ stdout_content, stderr_content = self.communicate_with_fork(path,
1263+ 'hello\n')
1264+ self.assertEqual('ok\x012\n', stdout_content)
1265+ self.assertEqual('', stderr_content)
1266+ self.assertReturnCode(0, sock)
1267+
1268+ def test_fork_replay(self):
1269+ path, _, sock = self.send_fork_request('launchpad-replay')
1270+ stdout_content, stderr_content = self.communicate_with_fork(path,
1271+ '1 hello\n2 goodbye\n1 maybe\n')
1272+ self.assertEqualDiff('hello\nmaybe\n', stdout_content)
1273+ self.assertEqualDiff('goodbye\n', stderr_content)
1274+ self.assertReturnCode(0, sock)
1275+
1276+ def test_just_run_service(self):
1277+ # Start and stop are defined in setUp()
1278+ pass
1279+
1280+ def test_fork_multiple_children(self):
1281+ paths = []
1282+ for idx in range(4):
1283+ paths.append(self.send_fork_request('launchpad-replay'))
1284+ # Do them out of order, as order shouldn't matter.
1285+ for idx in [3, 2, 0, 1]:
1286+ p, pid, sock = paths[idx]
1287+ stdout_msg = 'hello %d\n' % (idx,)
1288+ stderr_msg = 'goodbye %d\n' % (idx+1,)
1289+ stdout, stderr = self.communicate_with_fork(p,
1290+ '1 %s2 %s' % (stdout_msg, stderr_msg))
1291+ self.assertEqualDiff(stdout_msg, stdout)
1292+ self.assertEqualDiff(stderr_msg, stderr)
1293+ self.assertReturnCode(0, sock)
1294+
1295+ def test_fork_respects_env_vars(self):
1296+ path, pid, sock = self.send_fork_request('whoami',
1297+ env={'BZR_EMAIL': 'this_test@example.com'})
1298+ stdout_content, stderr_content = self.communicate_with_fork(path)
1299+ self.assertEqual('', stderr_content)
1300+ self.assertEqual('this_test@example.com\n', stdout_content)
1301+
1302+ def _check_exits_nicely(self, sig_id):
1303+ path, _, sock = self.send_fork_request('rocks')
1304+ self.assertEqual(None, self.service_process.poll())
1305+ # Now when we send SIGTERM, it should wait for the child to exit,
1306+ # before it tries to exit itself.
1307+ # In python2.6+ we could use self.service_process.terminate()
1308+ os.kill(self.service_process.pid, sig_id)
1309+ self.assertEqual(None, self.service_process.poll())
1310+ # Now talk to the child, so the service can close
1311+ stdout_content, stderr_content = self.communicate_with_fork(path)
1312+ self.assertEqual('It sure does!\n', stdout_content)
1313+ self.assertEqual('', stderr_content)
1314+ self.assertReturnCode(0, sock)
1315+ # And the process should exit cleanly
1316+ self.assertEqual(0, self.service_process.wait())
1317+
1318+ def test_sigterm_exits_nicely(self):
1319+ self._check_exits_nicely(signal.SIGTERM)
1320+
1321+ def test_sigint_exits_nicely(self):
1322+ self._check_exits_nicely(signal.SIGINT)
1323
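The request grammar pinned down by the TestLPForkingServiceCommandToArgv and TestLPForkingServiceParseEnv cases above can be sketched as two standalone helpers. This is a hedged re-implementation for illustration only; the real methods live on lpserve.LPForkingService in bzrplugins/lpserve/__init__.py and may differ in detail:

```python
import shlex


def command_to_argv(command_str):
    # Split a command line into argv, honouring double quotes, as the
    # test_quoted case expects ('"foo bar"' -> ['foo bar']).
    return shlex.split(command_str)


def parse_env(env_str):
    # Parse 'KEY: value' lines terminated by a literal 'end' line into a
    # dict, raising ValueError for anything malformed (a missing terminator,
    # or a line without ': '), matching the assertInvalid cases above.
    if not env_str.endswith('end\n'):
        raise ValueError('Incomplete env block: %r' % (env_str,))
    env = {}
    for line in env_str.splitlines()[:-1]:  # drop the trailing 'end'
        if ': ' not in line:
            raise ValueError('Invalid env line: %r' % (line,))
        key, value = line.split(': ', 1)
        env[key] = value
    return env
```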
1324=== modified file 'configs/development/launchpad-lazr.conf'
1325--- configs/development/launchpad-lazr.conf 2010-10-08 06:22:10 +0000
1326+++ configs/development/launchpad-lazr.conf 2010-10-19 22:56:30 +0000
1327@@ -76,6 +76,7 @@
1328 lp_url_hosts: dev
1329 access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log
1330 blacklisted_hostnames:
1331+use_forking_daemon: True
1332
1333 [codeimport]
1334 bazaar_branch_store: file:///tmp/bazaar-branches
1335
1336=== modified file 'lib/canonical/config/schema-lazr.conf'
1337--- lib/canonical/config/schema-lazr.conf 2010-10-15 17:04:51 +0000
1338+++ lib/canonical/config/schema-lazr.conf 2010-10-19 22:56:30 +0000
1339@@ -301,6 +301,18 @@
1340 # datatype: string
1341 logfile: -
1342
1343+# The location of the log file used by the LaunchpadForkingService
1344+# datatype: string
1345+forker_logfile: -
1346+
1347+# Should we be using the forking daemon? Or should we be calling spawnProcess
1348+# instead?
1349+# datatype: boolean
1350+use_forking_daemon: False
1351+# What disk path will the daemon listen on?
1352+# datatype: string
1353+forking_daemon_socket: /var/tmp/launchpad_forking_service.sock
1354+
1355 # The prefix of the web URL for all public branches. This should end with a
1356 # slash.
1357 #
1358
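Taken together with the development config above, a deployment that wants the daemon enabled would override these schema defaults in its `[codehosting]` section, e.g. (the logfile path is illustrative, not a real production value):

```
[codehosting]
use_forking_daemon: True
forking_daemon_socket: /var/tmp/launchpad_forking_service.sock
forker_logfile: /var/log/launchpad/forker.log
```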
1359=== modified file 'lib/canonical/launchpad/scripts/runlaunchpad.py'
1360--- lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-11 04:07:36 +0000
1361+++ lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-19 22:56:30 +0000
1362@@ -174,6 +174,52 @@
1363 process.stdin.close()
1364
1365
1366+class ForkingSessionService(Service):
1367+ """A lp-forking-service for handling codehosting access."""
1368+
1369+ # TODO: The "sftp" (aka codehosting) server depends fairly heavily on this
1370+ # service. It would seem reasonable to make one always start if the
1371+ # other one is started. Though this might be a way to "FeatureFlag"
1372+ # whether this is active or not.
1373+ @property
1374+ def should_launch(self):
1375+ return (config.codehosting.launch and
1376+ config.codehosting.use_forking_daemon)
1377+
1378+ @property
1379+ def logfile(self):
1380+ """Return the log file to use.
1381+
1382+ Default to the value of the configuration key logfile.
1383+ """
1384+ return config.codehosting.forker_logfile
1385+
1386+ def launch(self):
1387+ # Following the logic in TacFile. Specifically, if you configure sftp
1388+ # to not run (and thus bzr+ssh) then we don't want to run the forking
1389+ # service.
1390+ if not self.should_launch:
1391+ return
1392+ from lp.codehosting import get_bzr_path
1393+ command = [config.root + '/bin/py', get_bzr_path(),
1394+ 'launchpad-forking-service',
1395+ '--path', config.codehosting.forking_daemon_socket,
1396+ ]
1397+ env = dict(os.environ)
1398+ env['BZR_PLUGIN_PATH'] = config.root + '/bzrplugins'
1399+ logfile = self.logfile
1400+ if logfile == '-':
1401+ # This process uses a different logging infrastructure from the
1402+ # rest of the Launchpad code. As such, it cannot trivially use '-'
1403+ # as the logfile. So we just ignore this setting.
1404+ pass
1405+ else:
1406+ env['BZR_LOG'] = logfile
1407+ process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE)
1408+ self.addCleanup(stop_process, process)
1409+ process.stdin.close()
1410+
1411+
1412 def stop_process(process):
1413 """kill process and BLOCK until process dies.
1414
1415@@ -193,6 +239,7 @@
1416 'librarian': TacFile('librarian', 'daemons/librarian.tac',
1417 'librarian_server', prepare_for_librarian),
1418 'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'),
1419+ 'forker': ForkingSessionService(),
1420 'mailman': MailmanService(),
1421 'codebrowse': CodebrowseService(),
1422 'google-webservice': GoogleWebService(),
1423
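ForkingSessionService.launch above builds the daemon's command line and environment; the decision logic can be sketched standalone. This is a hedged illustration: LAUNCHPAD_ROOT and BZR_PATH are placeholders for the real config.root and get_bzr_path() values:

```python
# Hypothetical values standing in for config.root and get_bzr_path().
LAUNCHPAD_ROOT = '/srv/launchpad'
BZR_PATH = LAUNCHPAD_ROOT + '/sourcecode/bzr/bzr'


def forking_daemon_command(socket_path, logfile='-'):
    # Mirror ForkingSessionService.launch: run bzr under Launchpad's own
    # python, expose the lpserve plugin, and only set BZR_LOG when a real
    # logfile was configured ('-' is ignored, since the daemon cannot use
    # the rest of Launchpad's logging infrastructure).
    command = [LAUNCHPAD_ROOT + '/bin/py', BZR_PATH,
               'launchpad-forking-service', '--path', socket_path]
    env = {'BZR_PLUGIN_PATH': LAUNCHPAD_ROOT + '/bzrplugins'}
    if logfile != '-':
        env['BZR_LOG'] = logfile
    return command, env
```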
1424=== modified file 'lib/lp/codehosting/sshserver/session.py'
1425--- lib/lp/codehosting/sshserver/session.py 2010-08-20 20:31:18 +0000
1426+++ lib/lp/codehosting/sshserver/session.py 2010-10-19 22:56:30 +0000
1427@@ -9,11 +9,19 @@
1428 ]
1429
1430 import os
1431+import signal
1432+import socket
1433 import urlparse
1434
1435-from twisted.internet.process import ProcessExitedAlready
1436+from zope.event import notify
1437+from zope.interface import implements
1438+
1439+from twisted.internet import (
1440+ error,
1441+ interfaces,
1442+ process,
1443+ )
1444 from twisted.python import log
1445-from zope.event import notify
1446
1447 from canonical.config import config
1448 from lp.codehosting import get_bzr_path
1449@@ -35,6 +43,238 @@
1450 """Raised when a session is asked to execute a forbidden command."""
1451
1452
1453+class _WaitForExit(process.ProcessReader):
1454+ """Wait on a socket for the exit status."""
1455+
1456+ def __init__(self, reactor, proc, sock):
1457+ super(_WaitForExit, self).__init__(reactor, proc, 'exit',
1458+ sock.fileno())
1459+ self._sock = sock
1460+ self.connected = 1
1461+
1462+ def close(self):
1463+ self._sock.close()
1464+
1465+ def dataReceived(self, data):
1466+        # TODO: how do we handle getting only *some* of the content? Maybe
1467+        # we need to read more bytes first...
1468+
1469+ # This is the only thing we do differently from the standard
1470+ # ProcessReader. When we get data on this socket, we need to treat it
1471+ # as a return code, or a failure.
1472+ if not data.startswith('exited'):
1473+            # Bad data; we want to signal that we are closing the connection.
1474+ # TODO: How?
1475+ self.proc.childConnectionLost(self.name, "invalid data")
1476+ self.close()
1477+ # I don't know what to put here if we get bogus data, but I *do*
1478+ # want to say that the process is now considered dead to me
1479+ log.err('Got invalid exit information: %r' % (data,))
1480+ exit_status = (255 << 8)
1481+ else:
1482+ exit_status = int(data.split('\n')[1])
1483+ self.proc.processEnded(exit_status)
1484+
1485+
1486+class ForkedProcessTransport(process.BaseProcess):
1487+ """Wrap the forked process in a ProcessTransport so we can talk to it.
1488+
1489+ Note that instantiating the class creates the fork and sets it up in the
1490+ reactor.
1491+ """
1492+
1493+ implements(interfaces.IProcessTransport)
1494+
1495+ # Design decisions
1496+ # [Decision #a]
1497+ # Inherit from process.BaseProcess
1498+ # This seems slightly risky, as process.BaseProcess is actually
1499+ # imported from twisted.internet._baseprocess.BaseProcess. The
1500+ # real-world Process then actually inherits from process._BaseProcess
1501+ # I've also had to copy a fair amount from the actual Process
1502+    #       class.
1503+ # One option would be to inherit from process.Process, and just
1504+ # override stuff like __init__ and reapProcess which I don't want to
1505+ # do in the same way. (Is it ok not to call your Base classes
1506+ # __init__ if you don't want to do that exact work?)
1507+ def __init__(self, reactor, executable, args, environment, proto):
1508+ process.BaseProcess.__init__(self, proto)
1509+ # Map from standard file descriptor to the associated pipe
1510+ self.pipes = {}
1511+ pid, path, sock = self._spawn(executable, args, environment)
1512+ self._fifo_path = path
1513+ self.pid = pid
1514+ self.process_sock = sock
1516+ self._connectSpawnToReactor(reactor)
1517+ if self.proto is not None:
1518+ self.proto.makeConnection(self)
1519+
1520+ def _sendMessageToService(self, message):
1521+ """Send a message to the Forking service and get the response"""
1522+ path = config.codehosting.forking_daemon_socket
1523+ client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1524+ log.msg('Connecting to Forking Service @ socket: %s for %r'
1525+ % (path, message))
1526+ try:
1527+ client_sock.connect(path)
1528+ client_sock.sendall(message)
1529+ # We define the requests to be no bigger than 1kB. (For now)
1530+ response = client_sock.recv(1024)
1531+ except socket.error, e:
1532+ # TODO: What exceptions should be raised?
1533+ # Raising the raw exception seems to kill the twisted reactor
1534+ # Note that if the connection is refused, we *could* just
1535+ # fall back on a regular 'spawnProcess' call.
1536+ log.err('Connection failed: %s' % (e,))
1537+ raise
1538+ if response.startswith("FAILURE"):
1539+ raise RuntimeError('Failed to send message: %r' % (response,))
1540+ return response, client_sock
1541+
1542+ def _spawn(self, executable, args, environment):
1543+ """Start the new process.
1544+
1545+ This talks to the ForkingSessionService and requests a new process be
1546+ started. Similar to what Process.__init__/_fork would do.
1547+
1548+ :return: The pid, communication directory, and request socket.
1549+ """
1550+ assert executable == 'bzr', executable # Maybe .endswith()
1551+ assert args[0] == 'bzr', args[0]
1552+ command_str = ' '.join(args[1:])
1553+        message = ['fork-env %s\n' % (command_str,)]
1554+ for key, value in environment.iteritems():
1555+ # XXX: Currently we only pass BZR_EMAIL, should we be passing
1556+ # everything else? Note that many won't be handled properly,
1557+ # since the process is already running.
1558+ if key != 'BZR_EMAIL':
1559+ continue
1560+ message.append('%s: %s\n' % (key, value))
1561+ message.append('end\n')
1562+ message = ''.join(message)
1563+ response, sock = self._sendMessageToService(message)
1564+ if response.startswith('FAILURE'):
1565+ # TODO: Is there a better error to raise?
1566+ raise RuntimeError("Failed while sending message to forking "
1567+ "service. message: %r, failure: %r"
1568+ % (message, response))
1569+ ok, pid, path, tail = response.split('\n')
1570+ assert ok == 'ok'
1571+ assert tail == ''
1572+ pid = int(pid)
1573+ log.msg('Forking returned pid: %d, path: %s' % (pid, path))
1574+ return pid, path, sock
1575+
1576+ def _connectSpawnToReactor(self, reactor):
1577+ stdin_path = os.path.join(self._fifo_path, 'stdin')
1578+ stdout_path = os.path.join(self._fifo_path, 'stdout')
1579+ stderr_path = os.path.join(self._fifo_path, 'stderr')
1580+ child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
1581+ self.pipes[0] = process.ProcessWriter(reactor, self, 0,
1582+ child_stdin_fd)
1583+ child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
1584+        # forceReadHack=True is used in process.py, but it doesn't seem to
1585+        # be needed here.
1586+ self.pipes[1] = process.ProcessReader(reactor, self, 1,
1587+ child_stdout_fd)
1588+ child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
1589+ self.pipes[2] = process.ProcessReader(reactor, self, 2,
1590+ child_stderr_fd)
1591+ # Note: _exiter forms a GC cycle, since it points to us, and we hold a
1592+ # reference to it
1593+ self._exiter = _WaitForExit(reactor, self, self.process_sock)
1594+ self.pipes['exit'] = self._exiter
1595+
1596+ def _getReason(self, status):
1597+ # Copied from twisted.internet.process._BaseProcess
1598+ exitCode = sig = None
1599+ if os.WIFEXITED(status):
1600+ exitCode = os.WEXITSTATUS(status)
1601+ else:
1602+ sig = os.WTERMSIG(status)
1603+ if exitCode or sig:
1604+ return error.ProcessTerminated(exitCode, sig, status)
1605+ return error.ProcessDone(status)
1606+
1607+ def signalProcess(self, signalID):
1608+ """
1609+        Send the given signal C{signalID} to the process. A few signals
1610+        ('HUP', 'STOP', 'INT', 'KILL', 'TERM') are translated from their
1611+        string representation to the corresponding int value; any other
1612+        value is passed through unchanged.
1613+
1614+ @type signalID: C{str} or C{int}
1615+ """
1616+ # Copied from twisted.internet.process._BaseProcess
1617+ if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
1618+ signalID = getattr(signal, 'SIG%s' % (signalID,))
1619+ if self.pid is None:
1620+ raise process.ProcessExitedAlready()
1621+ os.kill(self.pid, signalID)
1622+
1623+ # Implemented because conch.ssh.session uses it, the Process implementation
1624+ # ignores writes if channel '0' is not available
1625+ def write(self, data):
1626+ self.pipes[0].write(data)
1627+
1628+ def writeToChild(self, childFD, data):
1629+ # Copied from twisted.internet.process.Process
1630+ self.pipes[childFD].write(data)
1631+
1632+ def closeChildFD(self, childFD):
1633+ if childFD in self.pipes:
1634+ self.pipes[childFD].loseConnection()
1635+
1636+ def closeStdin(self):
1637+ self.closeChildFD(0)
1638+
1639+ def closeStdout(self):
1640+ self.closeChildFD(1)
1641+
1642+ def closeStderr(self):
1643+ self.closeChildFD(2)
1644+
1645+ def loseConnection(self):
1646+ self.closeStdin()
1647+ self.closeStdout()
1648+ self.closeStderr()
1649+
1650+ # Implemented because ProcessWriter/ProcessReader want to call it
1651+ # Copied from twisted.internet.Process
1652+ def childDataReceived(self, name, data):
1653+ self.proto.childDataReceived(name, data)
1654+
1655+ # Implemented because ProcessWriter/ProcessReader want to call it
1656+ # Copied from twisted.internet.Process
1657+ def childConnectionLost(self, childFD, reason):
1658+ close = getattr(self.pipes[childFD], 'close', None)
1659+ if close is not None:
1660+ close()
1661+ else:
1662+ os.close(self.pipes[childFD].fileno())
1663+ del self.pipes[childFD]
1664+ try:
1665+ self.proto.childConnectionLost(childFD)
1666+ except:
1667+ log.err()
1668+ self.maybeCallProcessEnded()
1669+
1670+ # Implemented because of childConnectionLost
1671+ # Adapted from twisted.internet.Process
1672+ # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this
1673+ # point, but the daemon will be doing the reaping for us (we can't
1674+ # because the process isn't a direct child.)
1675+ def maybeCallProcessEnded(self):
1676+ if self.pipes:
1677+ # Not done if we still have open pipes
1678+ return
1679+ if not self.lostProcess:
1680+ return
1681+ process.BaseProcess.maybeCallProcessEnded(self)
1682+    # pauseProducing, present in process.py, is not part of IProcessTransport
1683+
1684+
1685 class ExecOnlySession(DoNothingSession):
1686 """Conch session that only allows executing commands."""
1687
1688@@ -58,7 +298,7 @@
1689 notify(BazaarSSHClosed(self.avatar))
1690 try:
1691 self._transport.signalProcess('HUP')
1692- except (OSError, ProcessExitedAlready):
1693+ except (OSError, process.ProcessExitedAlready):
1694 pass
1695 self._transport.loseConnection()
1696
1697@@ -81,8 +321,7 @@
1698 except ForbiddenCommand, e:
1699 self.errorWithMessage(protocol, str(e) + '\r\n')
1700 return
1701- log.msg('Running: %r, %r, %r'
1702- % (executable, arguments, self.environment))
1703+ log.msg('Running: %r, %r' % (executable, arguments))
1704 if self._transport is not None:
1705 log.err(
1706 "ERROR: %r already running a command on transport %r"
1707@@ -91,8 +330,12 @@
1708 # violation. Apart from this line and its twin, this class knows
1709 # nothing about Bazaar.
1710 notify(BazaarSSHStarted(self.avatar))
1711- self._transport = self.reactor.spawnProcess(
1712- protocol, executable, arguments, env=self.environment)
1713+ self._transport = self._spawn(protocol, executable, arguments,
1714+ env=self.environment)
1715+
1716+ def _spawn(self, protocol, executable, arguments, env):
1717+ return self.reactor.spawnProcess(protocol, executable, arguments,
1718+ env=env)
1719
1720 def getCommandToRun(self, command):
1721 """Return the command that will actually be run given `command`.
1722@@ -144,21 +387,65 @@
1723 % {'user_id': self.avatar.user_id})
1724
1725
1726+class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession):
1727+ """Use the Forking Service instead of spawnProcess."""
1728+
1729+ def _simplifyEnvironment(self, env):
1730+ """Pull out the bits of the environment we want to pass along."""
1731+ env = {}
1732+ for env_var in ['BZR_EMAIL']:
1733+ if env_var in self.environment:
1734+ env[env_var] = self.environment[env_var]
1735+ return env
1736+
1737+ def getCommandToFork(self, executable, arguments, env):
1738+ assert executable.endswith('/bin/py')
1739+ assert arguments[0] == executable
1740+ assert arguments[1].endswith('/bzr')
1741+ executable = 'bzr'
1742+ arguments = arguments[1:]
1743+ arguments[0] = 'bzr'
1744+ env = self._simplifyEnvironment(env)
1745+ return executable, arguments, env
1746+
1747+ def _spawn(self, protocol, executable, arguments, env):
1748+ # When spawning, adapt the idea of "bin/py .../bzr" to just using "bzr"
1749+ # and the executable
1750+ executable, arguments, env = self.getCommandToFork(executable,
1751+ arguments, env)
1752+ return ForkedProcessTransport(self.reactor, executable,
1753+ arguments, env, protocol)
1754+
1755+
1756 def launch_smart_server(avatar):
1757 from twisted.internet import reactor
1758
1759- command = (
1760- "%(root)s/bin/py %(bzr)s lp-serve --inet "
1761- % {'root': config.root, 'bzr': get_bzr_path()})
1762+ python_command = "%(root)s/bin/py %(bzr)s" % {
1763+ 'root': config.root,
1764+ 'bzr': get_bzr_path(),
1765+ }
1766+ args = " lp-serve --inet %(user_id)s"
1767+ command = python_command + args
1768+ forking_command = "bzr" + args
1769
1770 environment = dict(os.environ)
1771
1772 # Extract the hostname from the supermirror root config.
1773 hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1]
1774 environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname)
1775- return RestrictedExecOnlySession(
1776+ klass = RestrictedExecOnlySession
1777+ # TODO: Use a FeatureFlag to enable this in a more fine-grained way.
1778+ # If the forking daemon has been spawned, then we can use it if the
1779+ # feature is set to true for the given user, etc.
1780+ # A global config is a good first step, but does require restarting
1781+ # the service to change the flag. 'config' doesn't support SIGHUP.
1782+ # For now, restarting the service is necessary to enable/disable the
1783+ # forking daemon.
1784+ if config.codehosting.use_forking_daemon:
1785+ klass = ForkingRestrictedExecOnlySession
1786+ return klass(
1787 avatar,
1788 reactor,
1789 'bzr serve --inet --directory=/ --allow-writes',
1790- command + ' %(user_id)s',
1791+ command,
1792 environment=environment)
1793
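
The `_WaitForExit` reader added earlier in this file's diff turns the forking service's exit notification into a `processEnded` call on the process transport. Inferred from the expectations in the new `test_session.py` tests, the wire format is two newline-terminated fields: the literal `exited` followed by a wait()-style status, with anything else reported as an abnormal exit. A minimal sketch of that parsing (the function name is illustrative, not the branch's actual helper):

```python
def parse_exit_status(data):
    """Parse an exit notification from the forking service.

    Expected payload: the literal 'exited' followed by a wait()-style
    status integer, each newline-terminated. Malformed data maps to an
    abnormal status of 255 << 8, matching the test for bogus input.
    """
    fields = data.split('\n')
    if fields and fields[0] == 'exited':
        try:
            return int(fields[1])
        except (IndexError, ValueError):
            pass
    return 255 << 8
```

So `parse_exit_status('exited\n0\n')` yields 0 (clean exit), while `parse_exit_status('bogus\n')` yields 65280, i.e. 255 << 8.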
1794=== modified file 'lib/lp/codehosting/sshserver/tests/test_session.py'
1795--- lib/lp/codehosting/sshserver/tests/test_session.py 2010-08-20 20:31:18 +0000
1796+++ lib/lp/codehosting/sshserver/tests/test_session.py 2010-10-19 22:56:30 +0000
1797@@ -5,6 +5,7 @@
1798
1799 __metaclass__ = type
1800
1801+import socket
1802 import unittest
1803
1804 from twisted.conch.interfaces import ISession
1805@@ -21,9 +22,12 @@
1806 from lp.codehosting.sshserver.session import (
1807 ExecOnlySession,
1808 ForbiddenCommand,
1809+ ForkingRestrictedExecOnlySession,
1810 RestrictedExecOnlySession,
1811+ _WaitForExit,
1812 )
1813 from lp.codehosting.tests.helpers import AvatarTestCase
1814+from lp.testing import TestCase
1815
1816
1817 class MockReactor:
1818@@ -40,6 +44,9 @@
1819 usePTY, childFDs))
1820 return MockProcessTransport(executable)
1821
1822+ def addReader(self, reader):
1823+ self.log.append(('addReader', reader))
1824+
1825
1826 class MockSSHSession:
1827 """Just enough of SSHSession to allow checking of reporting to stderr."""
1828@@ -60,6 +67,7 @@
1829 self._executable = executable
1830 self.log = []
1831 self.session = MockSSHSession(self.log)
1832+ self.status = None
1833
1834 def closeStdin(self):
1835 self.log.append(('closeStdin',))
1836@@ -67,6 +75,9 @@
1837 def loseConnection(self):
1838 self.log.append(('loseConnection',))
1839
1840+ def childConnectionLost(self, childFD, reason=None):
1841+ self.log.append(('childConnectionLost', childFD, reason))
1842+
1843 def signalProcess(self, signal):
1844 if self._executable == 'raise-os-error':
1845 raise OSError()
1846@@ -77,6 +88,39 @@
1847 def write(self, data):
1848 self.log.append(('write', data))
1849
1850+ def processEnded(self, status):
1851+ self.log.append(('processEnded', status))
1852+
1853+
1854+class Test_WaitForExit(TestCase):
1855+
1856+ def setUp(self):
1857+ TestCase.setUp(self)
1858+ self.reactor = MockReactor()
1859+ self.proc = MockProcessTransport('executable')
1860+ sock = socket.socket()
1861+ self.exiter = _WaitForExit(self.reactor, self.proc, sock)
1862+
1863+ def test__init__starts_reading(self):
1864+ self.assertEqual([('addReader', self.exiter)], self.reactor.log)
1865+
1866+ def test_dataReceived_ends_cleanly(self):
1867+ self.exiter.dataReceived('exited\n0\n')
1868+ self.assertEqual([('processEnded', 0)], self.proc.log)
1869+
1870+ def test_dataReceived_ends_with_errno(self):
1871+ self.exiter.dataReceived('exited\n256\n')
1872+ self.assertEqual([('processEnded', 256)], self.proc.log)
1873+
1874+ def test_dataReceived_bad_data(self):
1875+ # Note: The dataReceived code calls 'log.err', which ends up getting
1876+ # printed during the test run. How do I suppress that, or even
1877+ # better, assert that it happens?
1878+ # self.flushLoggedErrors() doesn't seem to do anything.
1879+ self.exiter.dataReceived('bogus\n')
1880+ self.assertEqual([('childConnectionLost', 'exit', 'invalid data'),
1881+ ('processEnded', (255 << 8))], self.proc.log)
1882+
1883
1884 class TestExecOnlySession(AvatarTestCase):
1885 """Tests for ExecOnlySession.
1886@@ -340,6 +384,35 @@
1887 self.assertRaises(
1888 ForbiddenCommand, session.getCommandToRun, 'rm -rf /')
1889
1890+ def test_avatarAdaptsToOnlyRestrictedSession(self):
1891+ config.push('codehosting-no-forking',
1892+ "[codehosting]\nuse_forking_daemon: False\n")
1893+ self.addCleanup(config.pop, 'codehosting-no-forking')
1894+ session = ISession(self.avatar)
1895+ self.failIf(isinstance(session, ForkingRestrictedExecOnlySession),
1896+ "ISession(avatar) shouldn't adapt to "
1897+ "ForkingRestrictedExecOnlySession when forking is disabled.")
1898+
1899+ def test_avatarAdaptsToForkingRestrictedExecOnlySession(self):
1900+ config.push('codehosting-forking',
1901+ "[codehosting]\nuse_forking_daemon: True\n")
1902+ self.addCleanup(config.pop, 'codehosting-forking')
1903+ session = ISession(self.avatar)
1904+ self.failUnless(
1905+ isinstance(session, ForkingRestrictedExecOnlySession),
1906+ "ISession(avatar) doesn't adapt to "
1907+ "ForkingRestrictedExecOnlySession. "
1908+ "Got %r instead." % (session,))
1909+ executable, arguments = session.getCommandToRun(
1910+ 'bzr serve --inet --directory=/ --allow-writes')
1911+ executable, arguments, env = session.getCommandToFork(
1912+ executable, arguments, session.environment)
1913+ self.assertEqual('bzr', executable)
1914+ self.assertEqual(
1915+ ['bzr', 'lp-serve',
1916+ '--inet', str(self.avatar.user_id)],
1917+ list(arguments))
1918+
1919
1920 def test_suite():
1921 return unittest.TestLoader().loadTestsFromName(__name__)
1922
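
The adaptation test above pins down the command rewrite that `getCommandToFork` performs: drop the `bin/py` wrapper, substitute `bzr` for the script path, and forward only the environment the child needs. As a standalone sketch of that transformation (assuming the same `/bin/py` wrapper layout; names here are illustrative):

```python
def command_to_fork(executable, arguments, environment):
    """Rewrite a "<root>/bin/py <...>/bzr lp-serve ..." invocation into
    the plain "bzr ..." form sent to the forking service, forwarding
    only the environment variables the child needs (BZR_EMAIL here).
    """
    assert executable.endswith('/bin/py')
    assert arguments[0] == executable
    assert arguments[1].endswith('/bzr')
    # Drop the python wrapper; the forking service runs bzr in-process.
    arguments = list(arguments[1:])
    arguments[0] = 'bzr'
    env = dict(
        (name, environment[name])
        for name in ['BZR_EMAIL'] if name in environment)
    return 'bzr', arguments, env
```

For example, `command_to_fork('/lp/bin/py', ['/lp/bin/py', '/src/bzr/bzr', 'lp-serve', '--inet', '1234'], {'BZR_EMAIL': 'a@b', 'PATH': '/bin'})` returns `('bzr', ['bzr', 'lp-serve', '--inet', '1234'], {'BZR_EMAIL': 'a@b'})`.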
1923=== modified file 'lib/lp/codehosting/tests/test_acceptance.py'
1924--- lib/lp/codehosting/tests/test_acceptance.py 2010-10-17 01:26:56 +0000
1925+++ lib/lp/codehosting/tests/test_acceptance.py 2010-10-19 22:56:30 +0000
1926@@ -8,6 +8,9 @@
1927 import atexit
1928 import os
1929 import re
1930+import signal
1931+import subprocess
1932+import sys
1933 import unittest
1934 import xmlrpclib
1935
1936@@ -51,9 +54,58 @@
1937 from lp.testing import TestCaseWithFactory
1938
1939
1940+class ForkingServerForTests(object):
1941+ """Start and stop an LPForkingService via setUp() and tearDown()."""
1942+
1943+ def __init__(self):
1944+ self.process = None
1945+ self.socket_path = None
1946+
1947+ def setUp(self):
1948+ bzr_path = get_bzr_path()
1949+ BZR_PLUGIN_PATH = get_BZR_PLUGIN_PATH_for_subprocess()
1950+ env = os.environ.copy()
1951+ env['BZR_PLUGIN_PATH'] = BZR_PLUGIN_PATH
1952+ # TODO: We probably want to use a random disk path for
1953+ # forking_daemon_socket, but we need to update config so that the
1954+ # CodeHosting service can find it.
1955+ # The main problem is that CodeHostingTac seems to start a tac
1956+ # server directly from the disk configs, and doesn't use the
1957+ # in-memory config. So we can't just override the memory
1958+ # settings, we have to somehow pass it a new config-on-disk to
1959+ # use.
1960+ self.socket_path = config.codehosting.forking_daemon_socket
1961+ process = subprocess.Popen(
1962+ [sys.executable, bzr_path, 'launchpad-forking-service',
1963+ '--path', self.socket_path,
1964+ ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
1965+ self.process = process
1966+ # Wait for the service to indicate that it is running.
1967+ # The first stderr line should be "Preloading", indicating it is ready
1968+ preloading_line = process.stderr.readline()
1969+ # The next line is the "Listening on socket" line
1970+ socket_line = process.stderr.readline()
1971+ # Now it is ready
1972+
1973+ def tearDown(self):
1974+ # SIGTERM is the graceful exit request; potentially we could wait a
1975+ # bit and then send something stronger.
1976+ if self.process is not None and self.process.poll() is None:
1977+ os.kill(self.process.pid, signal.SIGTERM)
1978+ self.process.wait()
1979+ self.process = None
1980+ # We want to make sure the socket path has been cleaned up, so that
1981+ # future runs can work correctly
1982+ if os.path.exists(self.socket_path):
1983+ # Should there be a warning/error here?
1984+ os.remove(self.socket_path)
1985+
1986+
1987+
1988 class SSHServerLayer(ZopelessAppServerLayer):
1989
1990 _tac_handler = None
1991+ _forker_service = None
1992
1993 @classmethod
1994 def getTacHandler(cls):
1995@@ -64,18 +116,27 @@
1996 return cls._tac_handler
1997
1998 @classmethod
1999+ def getForker(cls):
2000+ if cls._forker_service is None:
2001+ cls._forker_service = ForkingServerForTests()
2002+ return cls._forker_service
2003+
2004+ @classmethod
2005 @profiled
2006 def setUp(cls):
2007 tac_handler = SSHServerLayer.getTacHandler()
2008 tac_handler.setUp()
2009 SSHServerLayer._reset()
2010 atexit.register(tac_handler.tearDown)
2011+ forker = SSHServerLayer.getForker()
2012+ forker.setUp()
2013
2014 @classmethod
2015 @profiled
2016 def tearDown(cls):
2017 SSHServerLayer._reset()
2018 SSHServerLayer.getTacHandler().tearDown()
2019+ SSHServerLayer.getForker().tearDown()
2020
2021 @classmethod
2022 @profiled
2023
2024=== modified file 'lib/lp/codehosting/tests/test_lpserve.py'
2025--- lib/lp/codehosting/tests/test_lpserve.py 2010-08-20 20:31:18 +0000
2026+++ lib/lp/codehosting/tests/test_lpserve.py 2010-10-19 22:56:30 +0000
2027@@ -1,32 +1,21 @@
2028-# Copyright 2009 Canonical Ltd. This software is licensed under the
2029+# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
2030 # GNU Affero General Public License version 3 (see the file LICENSE).
2031
2032 """Tests for the lp-serve plugin."""
2033
2034 __metaclass__ = type
2035
2036-import os
2037-import re
2038-from subprocess import PIPE
2039-import unittest
2040-
2041 from bzrlib import (
2042 errors,
2043- osutils,
2044 )
2045 from bzrlib.smart import medium
2046-from bzrlib.tests import TestCaseWithTransport
2047 from bzrlib.transport import remote
2048+from bzrlib.plugins.lpserve.test_lpserve import TestCaseWithSubprocess
2049
2050-from canonical.config import config
2051-from lp.codehosting import (
2052- get_bzr_path,
2053- get_BZR_PLUGIN_PATH_for_subprocess,
2054- )
2055 from lp.codehosting.bzrutils import make_error_utility
2056
2057
2058-class TestLaunchpadServe(TestCaseWithTransport):
2059+class TestLaunchpadServe(TestCaseWithSubprocess):
2060 """Tests for the lp-serve plugin.
2061
2062 Most of the helper methods here are copied from bzrlib.tests and
2063@@ -38,59 +27,6 @@
2064 """Assert that a server process finished cleanly."""
2065 self.assertEqual((0, '', ''), tuple(result))
2066
2067- def get_python_path(self):
2068- """Return the path to the Python interpreter."""
2069- return '%s/bin/py' % config.root
2070-
2071- def start_bzr_subprocess(self, process_args, env_changes=None,
2072- working_dir=None):
2073- """Start bzr in a subprocess for testing.
2074-
2075- Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
2076- This version removes some of the skipping stuff, some of the
2077- irrelevant comments (e.g. about win32) and uses Launchpad's own
2078- mechanisms for getting the path to 'bzr'.
2079-
2080- Comments starting with 'LAUNCHPAD' are comments about our
2081- modifications.
2082- """
2083- if env_changes is None:
2084- env_changes = {}
2085- env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
2086- old_env = {}
2087-
2088- def cleanup_environment():
2089- for env_var, value in env_changes.iteritems():
2090- old_env[env_var] = osutils.set_or_unset_env(env_var, value)
2091-
2092- def restore_environment():
2093- for env_var, value in old_env.iteritems():
2094- osutils.set_or_unset_env(env_var, value)
2095-
2096- cwd = None
2097- if working_dir is not None:
2098- cwd = osutils.getcwd()
2099- os.chdir(working_dir)
2100-
2101- # LAUNCHPAD: Because of buildout, we need to get a custom Python
2102- # binary, not sys.executable.
2103- python_path = self.get_python_path()
2104- # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
2105- # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
2106- bzr_path = get_bzr_path()
2107- try:
2108- cleanup_environment()
2109- command = [python_path, bzr_path]
2110- command.extend(process_args)
2111- process = self._popen(
2112- command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
2113- finally:
2114- restore_environment()
2115- if cwd is not None:
2116- os.chdir(cwd)
2117-
2118- return process
2119-
2120 def finish_lpserve_subprocess(self, process):
2121 """Shut down the server process.
2122
2123@@ -169,4 +105,10 @@
2124
2125
2126 def test_suite():
2127- return unittest.TestLoader().loadTestsFromName(__name__)
2128+ from bzrlib import tests
2129+ from bzrlib.plugins import lpserve
2130+
2131+ loader = tests.TestLoader()
2132+ suite = loader.loadTestsFromName(__name__)
2133+ suite = lpserve.load_tests(suite, lpserve, loader)
2134+ return suite
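
For context, the `launchpad-forking-service` spawned by `ForkingServerForTests` in the acceptance tests follows the classic preloading fork-server pattern: pay the interpreter and bzrlib import cost once in a long-lived parent, then answer each fork request with a cheap `os.fork()`. A heavily simplified sketch of the pattern (the exact request syntax is an assumption for illustration; the real LPForkingService also sets up stdin/stdout/stderr fifos and a management protocol, which this omits):

```python
import os
import socket


def parse_fork_request(line):
    """Split a 'fork <command>' request line into an argv list.

    Only the general shape (one request line naming the command to run)
    is taken from the branch; the exact wire syntax is illustrative.
    """
    if not line.startswith('fork '):
        raise ValueError('not a fork request: %r' % (line,))
    return line[len('fork '):].split()


def serve(socket_path, run_command):
    """Preloading fork-server loop.

    All heavy imports happen once in this parent process, so each
    accepted request only costs a fork() rather than bootstrapping a
    fresh python process via exec().
    """
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(socket_path)
    server.listen(5)
    while True:
        conn, _ = server.accept()
        line = conn.makefile('r').readline().rstrip('\n')
        argv = parse_fork_request(line)
        pid = os.fork()
        if pid == 0:
            # Child: modules are already imported, start serving at once.
            run_command(argv)
            os._exit(0)
        conn.sendall(('forked %d\n' % pid).encode('utf-8'))
        conn.close()
```

The tearDown half mirrors `ForkingServerForTests.tearDown` above: send SIGTERM to the parent and remove the stale socket path so future runs can bind it again.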