Merge lp:~jameinel/launchpad/lp-service into lp:launchpad/db-devel

Proposed by John A Meinel
Status: Merged
Approved by: Michael Hudson-Doyle
Approved revision: no longer in the source branch.
Merged at revision: 9914
Proposed branch: lp:~jameinel/launchpad/lp-service
Merge into: lp:launchpad/db-devel
Diff against target: 2134 lines (+1776/-86)
10 files modified
Makefile (+2/-2)
bzrplugins/lpserve/__init__.py (+737/-4)
bzrplugins/lpserve/test_lpserve.py (+534/-0)
configs/development/launchpad-lazr.conf (+1/-0)
lib/canonical/config/schema-lazr.conf (+12/-0)
lib/canonical/launchpad/scripts/runlaunchpad.py (+47/-0)
lib/lp/codehosting/sshserver/session.py (+299/-12)
lib/lp/codehosting/sshserver/tests/test_session.py (+73/-0)
lib/lp/codehosting/tests/test_acceptance.py (+61/-0)
lib/lp/codehosting/tests/test_lpserve.py (+10/-68)
To merge this branch: bzr merge lp:~jameinel/launchpad/lp-service
Reviewer Review Type Date Requested Status
Michael Hudson-Doyle Approve
Gavin Panella (community) Abstain
Jonathan Lange Pending
Review via email: mp+37531@code.launchpad.net

Commit message

Implement LaunchpadForkingService to speed up bzr+ssh connection times.

Description of the change

Retargeted from https://code.edge.launchpad.net/~jameinel/launchpad/lp-service/+merge/35877
because I accidentally started from db-devel originally. This patch includes all suggestions from the original submission.

The goal of this submission is to reduce the time it takes for a "bzr+ssh" connection to become usable. The new code path must be activated by setting:
  [codehosting]
  use_forking_server = True

It is enabled in the "development" config, but the default is still disabled. I can't give a recommendation for the production config, because that branch is private.

This implements a new service (LaunchpadForkingService). It sits on a socket and waits for a request to 'fork <command>'. When one is received, it creates stdin/stdout/stderr fifos on disk and forks itself, running 'run_bzr_*(command)' directly rather than using 'exec()', which would have to bootstrap a new Python process.

The benefit is seen with: time echo hello | ssh localhost bzr serve --inet ...

Without this patch, it takes 2.5s to start serving on the loopback; with this patch, 0.25s.
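
The fork-and-fifo mechanism described above can be sketched in a few lines. This is an illustrative outline only; the helper name and layout are invented for this sketch and are not the actual LPForkingService code:

```python
import os
import tempfile

def handle_fork_request(command_argv):
    """Create stdin/stdout/stderr fifos in a fresh directory and fork.

    Returns (path, pid). The client opens the three named pipes under
    `path` to talk to the child; in the real service the forked child
    runs the bzr command in-process via run_bzr_*() instead of
    exec()ing a new interpreter, so already-imported modules are reused.
    """
    path = tempfile.mkdtemp(prefix='lpforking-')
    for name in ('stdin', 'stdout', 'stderr'):
        os.mkfifo(os.path.join(path, name))
    pid = os.fork()
    if pid == 0:
        # Child: the real service would open the fifos as fds 0/1/2 and
        # then run the bzr command directly. This sketch exits at once.
        os._exit(0)
    return path, pid
```

Because the fifos exist before the path is sent back, the client can open them immediately and the open() calls synchronize client and child.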

I'm very happy to work with someone to smooth out the finer points of this submission. I tried to be explicit about the places in the code where I had a decision to make, and why I chose the approach I did.

I didn't use the FeatureFlag system, because setting it up via the config file was much more straightforward for globally enabling/disabling the functionality.
As near as I can tell, rolling this code out to production should have no impact as long as use_forking_daemon is False.

(The make run_codehosting target will not actually spawn the daemon, and the Twisted Conch code just gets an 'if' check that keeps it on the old code path.)

I haven't run the full test suite, but I have:
 1) Run all of the locally relevant tests: 'bzr selftest -s bt.lp_serve' and
    'bin/test lp.codehosting.sshserver'.
 2) Manually started the sftp service and run commands through the local instance, both
    with and without the forking service enabled. (I confirmed that it isn't used when
    disabled, is used when enabled, etc.)

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Hi,

This basically looks great and works in local testing. I don't have time for a line-by-line review right now though :(

Cheers,
mwh

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Wow, what a branch. I've been quite picky in this, that's not because
I hate you <wink> but because it's subtle code and has potential to
cause issues, so I wanted to make sure all the comments make sense and
there's no redundant code, etc.

I worry slightly about what would happen if the forking service fell
over -- but clearly, we rely on the sftp server itself staying up, so
this probably isn't such a big deal. I guess we also need to make
sure the forking service is running in production.

All that said, I have no real problems with the code and look forward
to getting my branches onto and off of Launchpad that bit faster!

I'm marking this needs information because I want to see a response to my
comments. But I don't think any of them are deep.

> === modified file 'Makefile'
> --- Makefile 2010-09-07 18:15:01 +0000
> +++ Makefile 2010-10-06 00:19:31 +0000
> @@ -253,7 +253,7 @@
>
> run_codehosting: check_schema inplace stop
> $(RM) thread*.request
> - bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
> + bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>
>
> start_librarian: compile

You should add the forker service to the run_all target too.

> === added directory 'bzrplugins/lpserve'
> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
> --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
> +++ bzrplugins/lpserve/__init__.py 2010-10-06 00:19:31 +0000
> @@ -8,15 +8,33 @@
>
> __metaclass__ = type
>
> -__all__ = ['cmd_launchpad_server']
> -
> -
> +__all__ = ['cmd_launchpad_server',
> + 'cmd_launchpad_forking_service',
> + ]

This should be formatted like this:

__all__ = [
    'cmd_launchpad_server',
    'cmd_launchpad_forking_service',
    ]

> +
> +
> +import errno
> +import os
> import resource
> +import shlex
> +import shutil
> +import signal
> +import socket
> import sys
> +import tempfile
> +import threading
> +import time
>
> from bzrlib.commands import Command, register_command
> from bzrlib.option import Option
> -from bzrlib import lockdir, ui
> +from bzrlib import (
> + commands,
> + errors,
> + lockdir,
> + osutils,
> + trace,
> + ui,
> + )
>
> from bzrlib.smart import medium, server
> from bzrlib.transport import get_transport
> @@ -110,3 +128,724 @@
>
>
> register_command(cmd_launchpad_server)
> +
> +
> +class LPForkingService(object):
> + """A service that can be asked to start a new bzr subprocess via fork.
> +
> + The basic idea is that python startup is very expensive. For example, the
> + original 'lp-serve' command could take 2.5s just to start up, before any
> + actual actions could be performed.

Well, it's not really Python startup, it's more the importing that's
slow. Maybe phrase this as "starting Python and importing Launchpad
is very...".

> + This class provides a service sitting on a socket, which can then be
> + requested to fork and run a given bzr command.
> +
> + Clients connect to the socket and make a simple request, which then
> + receives a response. The possible requests are:
> +
> + "hello\n": Trigger a heartbeat to report that the program is still
> + runnin...

review: Needs Information
Revision history for this message
Martin Pool (mbp) wrote :

notes from talking this over with spm:

 * at the moment you can see in the process listing which branch is
being accessed; it would be very nice to keep that; but at least we
should log the pid/user/branch
 * can run this on staging, then edge, then lpnet
 * propose changes into lp:lp-production-configs to gradually turn it on
 * might want an rt describing how to monitor the new daemon process,
which just needs to mention the socket location and the hello command;
they can write the script from there
 * francis should formally tell tom and charlie we'll be making this change
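
The monitoring script mentioned above could be as small as the following sketch. The socket path and the exact reply format are assumptions here; the only protocol fact relied on is that the service answers the "hello" command:

```python
import socket

def forking_service_alive(socket_path, timeout=5.0):
    """Probe the forking service: send "hello", treat any reply as alive."""
    s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        s.connect(socket_path)
        s.sendall(b'hello\n')
        # Any heartbeat reply counts as proof of life; an empty read,
        # a missing socket, a refused connection, or a timeout does not.
        return bool(s.recv(1024))
    except OSError:
        return False
    finally:
        s.close()
```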

Revision history for this message
Robert Collins (lifeless) wrote :

On Wed, Oct 6, 2010 at 9:43 PM, Martin Pool <email address hidden> wrote:

>  * can run this on staging, then edge, then lpnet

There is no edge.

>  * francis should formally tell tom and charlie we'll be making this change

The rt describing the monitoring should mention this; it will get enough airplay ;)

We make architectural changes -all- the time, there's no need to get
particularly cautious about this one.

Revision history for this message
Jonathan Lange (jml) wrote :

On Wed, Oct 6, 2010 at 9:52 AM, Robert Collins
<email address hidden> wrote:
> On Wed, Oct 6, 2010 at 9:43 PM, Martin Pool <email address hidden> wrote:
>
>>  * can run this on staging, then edge, then lpnet
>
> There is no edge.

Yeah there is.

$ ssh bazaar.edge.launchpad.net
No shells on this server.
Connection to bazaar.edge.launchpad.net closed.

jml

Revision history for this message
Robert Collins (lifeless) wrote :

It will be gone extremely soon. I want people to stop thinking of
'edge' as a way to test things.

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
> notes from talking this over with spm:
>
> * at the moment you can see in the process listing which branch is
> being accessed; it would be very nice to keep that; but at least we
> should log the pid/user/branch

This isn't affected by this branch, although the process name ends up
being a bit unwieldy:

21266 pts/1 S+ 0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch

Maybe inserting a call to setproctitle in become_child would be useful,
especially as we have the dependency already -- one piece of information
we've lost is the user who has connected, although we can probably
figure that out from logs still.

> * can run this on staging, then edge, then lpnet
> * propose changes into lp:lp-production-configs to gradually turn it on
> * might want an rt describing how to monitor the new daemon process,
> which just needs to mention the socket location and the hello command;
> they can write the script from there
> * francis should formally tell tom and charlie we'll be making this change

I still vaguely wonder about controlling this with a feature flag. The
thing is, the codehosting systems don't connect to the database
currently, and that's actually kind of nice in some ways. We could add
an XML-RPC method to query a flag, and have the ssh daemon check that
every 30s or so -- I don't think we want an XML-RPC call delaying the
launch of the lp-serve process on every connection.

Cheers,
mwh

Revision history for this message
John A Meinel (jameinel) wrote :

For starters, thanks for the thorough review. It is really nice to get feedback on it, and you have certainly thought a lot about the code.

On 10/6/2010 12:02 AM, Michael Hudson-Doyle wrote:
> Review: Needs Information
> Wow, what a branch. I've been quite picky in this, that's not because
> I hate you <wink> but because it's subtle code and has potential to
> cause issues, so I wanted to make sure all the comments make sense and
> there's no redundant code, etc.
>
> I worry slightly about what would happen if the forking service fell
> over -- but clearly, we rely on the sftp server itself staying up, so
> this probably isn't such a big deal. I guess we also need to make
> sure the forking service is running in production.

We could trap failures to connect to the forking server and fall back to regular spawnProcess. The code is already a simple if/else block. However, I wouldn't really want silent degradation of connection. If we want it, it could probably be done with:

transport = None

if config.codehosting.use_forking_server:
    try:
        transport = ...
    except ???:
        pass  # We failed; fall back to spawnProcess below.

if transport is None:
    transport = reactor.spawnProcess(...)

>
> All that said, I have no real problems with the code and look forward
> to getting my branches onto and off of Launchpad that bit faster!
>
> I'm marking this needs information because I want to see a response to my
> comments. But I don't think any of them are deep.
>
>> === modified file 'Makefile'
>> --- Makefile 2010-09-07 18:15:01 +0000
>> +++ Makefile 2010-10-06 00:19:31 +0000
>> @@ -253,7 +253,7 @@
>>
>> run_codehosting: check_schema inplace stop
>> $(RM) thread*.request
>> - bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
>> + bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
>>
>>
>> start_librarian: compile
>
> You should add the forker service to the run_all target too.

Done.

>
>> === added directory 'bzrplugins/lpserve'
>> === renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
>> --- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
>> +++ bzrplugins/lpserve/__init__.py 2010-10-06 00:19:31 +0000
>> @@ -8,15 +8,33 @@
>>
>> __metaclass__ = type
>>
>> -__all__ = ['cmd_launchpad_server']
>> -
>> -
>> +__all__ = ['cmd_launchpad_server',
>> + 'cmd_launchpad_forking_service',
>> + ]
>
> This should be formatted like this:
>
> __all__ = [
> 'cmd_launchpad_server',
> 'cmd_launchpad_forking_service',
> ]

check

...

>> +class LPForkingService(object):
>> + """A service that can be asked to start a new bzr subprocess via fork.
>> +
>> + The basic idea is that python startup is very expensive. For example, the
>> + original 'lp-serve' command could take 2.5s just to start up, before any
>> + actual actions could be performed.
>
> Well, it's not really Python startup, it's more the importing that's
> slow. Maybe phrase this as "starting Python and importing Launchpad
> is very...".
>
>> + This class provides a service sitting on a socket, which can then be
>> + requested to fork and run a given bzr command.
>> +
>> + Clients connect to the socket and make a simp...

Revision history for this message
Martin Pool (mbp) wrote :

On 7 October 2010 07:15, Michael Hudson-Doyle <email address hidden> wrote:
> On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
>> notes from talking this over with spm:
>>
>>  * at the moment you can see in the process listing which branch is
>> being accessed; it would be very nice to keep that; but at least we
>> should log the pid/user/branch
>
> This isn't affected by this branch, although the process name ends up
> being a bit unwieldy:
>
> 21266 pts/1    S+     0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch
>
> Maybe inserting a call to setproctitle in become_child would be useful,
> especially as we have the dependency already -- one piece of information
> we've lost is the user who has connected, although we can probably
> figure that out from logs still.

Without the call to setproctitle, we would be losing this thing spm
said he liked, which is to immediately show which user and branch it
corresponds to.

To my mind it's more important to log the username, branch and pid, so
that you can see who it was both while the process is running and
afterwards. It's also nice that jam logs the rusage. But putting
them into the argv is more immediately accessible.

>>  * can run this on staging, then edge, then lpnet
>>  * propose changes into lp:lp-production-configs to gradually turn it on
>>  * might want an rt describing how to monitor the new daemon process,
>> which just needs to mention the socket location and the hello command;
>> they can write the script from there
>>  * francis should formally tell tom and charlie we'll be making this change
>
> I still vaguely wonder about controlling this with a feature flag.  The
> thing is, the codehosting systems don't connect to the database
> currently, and that's actually kind of nice in some ways.  We could add
> an XML-RPC method to query a flag, and have the ssh daemon check that
> every 30s or so -- I don't think we want an XML-RPC call delaying the
> launch of the lp-serve process on every connection.

Actually I was thinking of just having an http URL, accessible only
within the DC, that just presents all the feature flag rules in a
machine readable form: maybe json by default, perhaps others. There's
no need for xmlrpc when we just want to read, and avoiding startup
dependencies is good. Then we can have a FeatureRuleSource that gets
and parses this rather than talking to storm, and it will be fast to
start up for this type of thing; probably fast enough to run on every
request if we want to. bug 656031.

--
Martin
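
On the consuming side, that scheme could look something like the following sketch. The rule format, the flag name, and the refresh policy are all assumptions for illustration, not an existing Launchpad API:

```python
import json

def flag_enabled(rules_json, flag, default=False):
    """Evaluate `flag` against a JSON dump of feature-flag rules.

    The dump is assumed to be a priority-ordered list of objects like
    {"flag": "codehosting.use_forking_server", "value": true}. The ssh
    daemon would re-fetch the dump every 30s or so and consult the
    cached copy, rather than touching the database per connection.
    """
    for rule in json.loads(rules_json):
        if rule['flag'] == flag:
            return rule['value']
    return default
```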

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Wed, 06 Oct 2010 22:34:40 -0000, Martin Pool <email address hidden> wrote:
> On 7 October 2010 07:15, Michael Hudson-Doyle <email address hidden> wrote:
> > On Wed, 06 Oct 2010 08:43:44 -0000, Martin Pool <email address hidden> wrote:
> >> notes from talking this over with spm:
> >>
> >>  * at the moment you can see in the process listing which branch is
> >> being accessed; it would be very nice to keep that; but at least we
> >> should log the pid/user/branch
> >
> > This isn't affected by this branch, although the process name ends up
> > being a bit unwieldy:
> >
> > 21266 pts/1    S+     0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch
> >
> > Maybe inserting a call to setproctitle in become_child would be useful,
> > especially as we have the dependency already -- one piece of information
> > we've lost is the user who has connected, although we can probably
> > figure that out from logs still.
>
> Without the call to setproctitle, we would be losing this thing spm
> said he liked, which is to immediately show which user and branch it
> corresponds to.

Uh? No, that's the bit we retain. The ps output I showed above was
from running with the branch being reviewed.

> To my mind it's more important to log the username, branch and pid, so
> that you can see who it was both while the process is running and
> afterwards. It's also nice that jam logs the rusage. But putting
> them into the argv is more immediately accessible.

I'm not sure if you're recommending this branch be changed before
landing here.

> >>  * can run this on staging, then edge, then lpnet
> >>  * propose changes into lp:lp-production-configs to gradually turn it on
> >>  * might want an rt describing how to monitor the new daemon process,
> >> which just needs to mention the socket location and the hello command;
> >> they can write the script from there
> >>  * francis should formally tell tom and charlie we'll be making this change
> >
> > I still vaguely wonder about controlling this with a feature flag.  The
> > thing is, the codehosting systems don't connect to the database
> > currently, and that's actually kind of nice in some ways.  We could add
> > an XML-RPC method to query a flag, and have the ssh daemon check that
> > every 30s or so -- I don't think we want an XML-RPC call delaying the
> > launch of the lp-servce process on every connection.
>
> Actually I was thinking of just having an http URL, accessible only
> within the DC, that just presents all the feature flag rules in a
> machine readable form: maybe json by default, perhaps others. There's
> no need for xmlrpc when we just want to read, and avoiding startup
> dependencies is good. Then we can have a FeatureRuleSource that gets
> and parses this rather than talking to storm, and it will be fast to
> start up for this type of thing; probably fast enough to run on every
> request if we want to. bug 656031.

That would be interesting. The sftp server being a twisted app, access
to this URL should really be nonblo...


Revision history for this message
John A Meinel (jameinel) wrote :

I do log the username and pid; I don't (at that time) have the branch, because we haven't opened it yet. If we changed "bzrlib.trace.mutter()" to always include the pid, then I think we'd have everything you were asking for.

Revision history for this message
Martin Pool (mbp) wrote :

On 7 October 2010 10:09, Michael Hudson-Doyle <email address hidden> wrote:
>> > 21266 pts/1    S+     0:00 /usr/bin/python /home/mwh/canonical/checkouts/trunk/eggs/bzr-2.2.0-py2.6-linux-x86_64.egg/EGG-INFO/scripts/bzr launchpad-forking-service --path /var/tmp/launchpad_forking_service.sock BRANCH:~mark/upstart/branch
>> >
>> > Maybe inserting a call to setproctitle in become_child would be useful,
>> > especially as we have the dependency already -- one piece of information
>> > we've lost is the user who has connected, although we can probably
>> > figure that out from logs still.
>>
>> Without the call to setproctitle, we would be losing this thing spm
>> said he liked, which is to immediately show which user and branch it
>> corresponds to.
>
> Uh?  No, that's the bit we retain.  The ps output I showed above was
> from running with the branch being reviewed.

The username is not shown. I don't know if it's a big deal; this
branch is old enough already.

>
>> To my mind it's more important to log the username, branch and pid, so
>> that you can see who it was both while the process is running and
>> afterwards.  It's also nice that jam logs the rusage.  But putting
>> them into the argv is more immediately accessible.
>
> I'm not sure if you're recommending this branch be changed before
> landing here.

What John said.

> That would be interesting.  The sftp server being a twisted app, access
> to this URL should really be nonblocking which always adds a wrinkle.
> Maybe that would be fine though, or maybe we can just cheat.

That's actually kind of a feature...

--
Martin

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Wed, 06 Oct 2010 20:39:01 -0000, John A Meinel <email address hidden> wrote:
> For starters, thanks for the thorough review. It is really nice to get feedback on it, and you have certainly thought a lot about the code.
>
> On 10/6/2010 12:02 AM, Michael Hudson-Doyle wrote:
> > Review: Needs Information
> > Wow, what a branch. I've been quite picky in this, that's not because
> > I hate you <wink> but because it's subtle code and has potential to
> > cause issues, so I wanted to make sure all the comments make sense and
> > there's no redundant code, etc.
> >
> > I worry slightly about what would happen if the forking service fell
> > over -- but clearly, we rely on the sftp server itself staying up, so
> > this probably isn't such a big deal. I guess we also need to make
> > sure the forking service is running in production.
>
>
> We could trap failures to connect to the forking server and fall back
> to regular spawnProcess. The code is already a simple if/else
> block. However, I wouldn't really want silent degradation of
> connection.

Yeah, I don't really like that idea either.

> >> + "fork <command>\n": Request a new subprocess to be started.
> >> + <command> is the bzr command to be run, such as "rocks" or
> >> + "lp-serve --inet 12".
> >> + The immediate response will be the path-on-disk to a directory full
> >> + of named pipes (fifos) that will be the stdout/stderr/stdin of the
> >> + new process.
> >
> > This doesn't quite make it clear what the names of the files will be.
> > The obvious guess is correct, but I'd rather be certain.
> >
> >> + If a client holds the socket open, when the child process exits,
> >> + the exit status (as given by 'wait()') will be written to the
> >> + socket.
> >
> > This appears to be out of date -- there's also a fork-env command now.
> > Is the fork command still useful?
>
> It has been useful in testing, it isn't used in the 'production' code.

If the fork command is still implemented, I think it should still be
documented...

> ...
>
>
> >
> >> + DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
> >
> > I'm not sure of the value of providing a default here, as it seems
> > that its always in practice going to be overridden by the config. If
> > it makes testing easier, it's worth it I guess.
>
> Well, we override it for testing as well. I wanted to support "bzr
> lp-forking-service" as something you could run. Also, that seemed to
> be the 'best' place to put it, which gives hints as to where you would
> want to put it.

OK, let's leave it.

> >> + self._children_spawned = 0
> >> +
> >> + def _create_master_socket(self):
> >> + self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
> >> + self._server_socket.bind(self.master_socket_path)
> >> + if self._perms is not None:
> >> + os.chmod(self.master_socket_path, self._perms)
> >
> > The pedant in me thinks this is a bit racy. But I don't think it's
> > important (in fact, I wonder if the permission stuff is necessary
> > really).
>
> I'm sure it isn't necessary ...

Revision history for this message
Robert Collins (lifeless) wrote :

On feature flags... I'd suggest an xmlrpc/API for evaluating flags.

evaluating flags requires DB access for group membership of a user and
potentially other things like time limited trials, access to opt-in
features and so forth.

The exact API would need to take a user, possibly feature names, and
return a list of the rules whose scopes need to be locally resolved.

Whether this is json or not is immaterial to me - at least for this
patch, the feature evaluating code would load once.

It would be very interesting to do this though, because we could phase
this in per-user.

-Rob

Revision history for this message
Martin Pool (mbp) wrote :

rt 41791 for adding the monitoring service <https://rt.admin.canonical.com/Ticket/Display.html?id=41791>

Revision history for this message
Gavin Panella (allenap) wrote :

Looks like this is taken care of, so abstaining on behalf of the Launchpad code reviewers.

review: Approve
Revision history for this message
Gavin Panella (allenap) :
review: Abstain
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

So I would like to see the code that communicates with the service more twisted-ized, but until it turns out to be an actual problem it's not worth the bother. Let's land this.

review: Approve
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

I think your latest changes are basically OK. Some of it feels a bit cargo-culted though -- I don't see the reason for the lazy getForker method; just stash the forker on the instance normally? And the atexit calls aren't needed IMHO. If you inherited the forker helper class from Fixture, you'd be able to use addCleanup, which would probably be a bit cleaner. I'm not sure the big TODO comment about paths is warranted -- it's all true, but much of Launchpad suffers this problem. If you can de-cargo-cult a bit, I'll land this asap.

review: Approve
Revision history for this message
Robert Collins (lifeless) wrote :

Let's just land and iterate.

-Rob

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

On Mon, 18 Oct 2010 22:22:39 -0000, Robert Collins <email address hidden> wrote:
> Let's just land and iterate.

Yeah fair enough. Throwing it at ec2 now.

Cheers,
mwh

Preview Diff

=== modified file 'Makefile'
--- Makefile	2010-10-18 21:53:28 +0000
+++ Makefile	2010-10-19 22:56:30 +0000
@@ -239,7 +239,7 @@
 
 run_all: check_schema inplace stop
 	$(RM) thread*.request
-	bin/run -r librarian,sftp,mailman,codebrowse,google-webservice,memcached \
+	bin/run -r librarian,sftp,forker,mailman,codebrowse,google-webservice,memcached \
 		-i $(LPCONFIG)
 
 run_codebrowse: build
@@ -253,7 +253,7 @@
 
 run_codehosting: check_schema inplace stop
 	$(RM) thread*.request
-	bin/run -r librarian,sftp,codebrowse -i $(LPCONFIG)
+	bin/run -r librarian,sftp,forker,codebrowse -i $(LPCONFIG)
 
 start_librarian: compile
 	bin/start_librarian
=== added directory 'bzrplugins/lpserve'
=== renamed file 'bzrplugins/lpserve.py' => 'bzrplugins/lpserve/__init__.py'
--- bzrplugins/lpserve.py 2010-04-19 06:35:23 +0000
+++ bzrplugins/lpserve/__init__.py 2010-10-19 22:56:30 +0000
@@ -8,15 +8,34 @@
88
9__metaclass__ = type9__metaclass__ = type
1010
11__all__ = ['cmd_launchpad_server']11__all__ = [
1212 'cmd_launchpad_server',
1313 'cmd_launchpad_forking_service',
14 ]
15
16
17import errno
18import logging
19import os
14import resource20import resource
21import shlex
22import shutil
23import signal
24import socket
15import sys25import sys
26import tempfile
27import threading
28import time
1629
17from bzrlib.commands import Command, register_command30from bzrlib.commands import Command, register_command
18from bzrlib.option import Option31from bzrlib.option import Option
19from bzrlib import lockdir, ui32from bzrlib import (
33 commands,
34 lockdir,
35 osutils,
36 trace,
37 ui,
38 )
2039
21from bzrlib.smart import medium, server40from bzrlib.smart import medium, server
22from bzrlib.transport import get_transport41from bzrlib.transport import get_transport
@@ -110,3 +129,717 @@
110129
111130
112register_command(cmd_launchpad_server)131register_command(cmd_launchpad_server)
132
133
134class LPForkingService(object):
135 """A service that can be asked to start a new bzr subprocess via fork.
136
137 The basic idea is that bootstrapping time is long. Most of this is time
138 spent during import of all needed libraries (lp.*). For example, the
139 original 'lp-serve' command could take 2.5s just to start up, before any
140 actual actions could be performed.
141
142 This class provides a service sitting on a socket, which can then be
143 requested to fork and run a given bzr command.
144
145 Clients connect to the socket and make a single request, which then
146 receives a response. The possible requests are:
147
148 "hello\n": Trigger a heartbeat to report that the program is still
149 running, and write status information to the log file.
150 "quit\n": Stop the service, but do so 'nicely', waiting for children
151 to exit, etc. Once this is received the service will stop
152 taking new requests on the port.
153 "fork-env <command>\n<env>\nend\n": Request a new subprocess to be
154 started. <command> is the bzr command to be run, such as "rocks"
155 or "lp-serve --inet 12".
156 The immediate response will be the path-on-disk to a directory full
157 of named pipes (fifos) that will be the stdout/stderr/stdin (named
158 accordingly) of the new process.
159 If a client holds the socket open, when the child process exits,
160 the exit status (as given by 'wait()') will be written to the
161 socket.
162
163 Note that one of the key bits is that the client will not be
164 started with exec*, we just call 'commands.run_bzr*()' directly.
165 This way, any modules that are already loaded will not need to be
166 loaded again. However, care must be taken with any global-state
167 that should be reset.
168
169 fork-env allows you to supply environment variables such as
170 "BZR_EMAIL: joe@foo.com" which will be set in os.environ before the
171 command is run.
172 """
173
174 # Design decisions. These are bits where we could have chosen a different
175 # method/implementation and weren't sure what would be best. Documenting
176 # the current decision, and the alternatives.
177 #
178 # [Decision #1]
179 # Serve on a named AF_UNIX socket.
180 # 1) It doesn't make sense to serve to arbitrary hosts, we only want
181 # the local host to make requests. (Since the client needs to
182 # access the named fifos on the current filesystem.)
183 # 2) You can set security parameters on a filesystem path (g+rw,
184 # a-rw).
185 # [Decision #2]
186 # SIGCHLD
187 # We want to quickly detect that children have exited so that we can
188 # inform the client process quickly. At the moment, we register a
189 # SIGCHLD handler that doesn't do anything. However, it means that
190 # when we get the signal, if we are currently blocked in something
191 # like '.accept()', we will jump out temporarily. At that point the
192 # main loop will check if any children have exited. We could have
193 # done this work as part of the signal handler, but doing any
194 # serious work in a signal handler felt too racy.
195 # If we just used socket.timeout as the indicator to go poll for
196 # children exiting, it slows the disconnect by as much as the full
197 # timeout. (So a timeout of 1.0s will cause the process to hang by
198 # that long until it determines that a child has exited, and can
199 # close the connection.)
200 # The current flow means that we'll notice exited children whenever
201 # we finish the current work.
202 # [Decision #3]
203 # Child vs Parent actions.
204 # There are several actions that are done when we get a new request.
205 # We have to create the fifos on disk, fork a new child, connect the
206 # child to those handles, and inform the client of the new path (not
207 # necessarily in that order.) It makes sense to wait to send the path
208 # message until after the fifos have been created. That way the
209 # client can just try to open them immediately, and the
210 # client-and-child will be synchronized by the open() calls.
211 # However, should the client be the one doing the mkfifo, or should
212 # the server? Who should be sending the message? Should we fork after
213 # the mkfifo or before?
214 # The current thoughts:
215 # 1) Try to do work in the child when possible. This should allow
216 # for 'scaling' because the server is single-threaded.
217 # 2) We create the directory itself in the server, because that
218 # allows the server to monitor whether the client failed to
219 # clean up after itself or not.
220 # 3) Otherwise we create the fifos in the child, and then send
221 # the message back.
222 # [Decision #4]
223 # Exit information
224 # Inform the client that the child has exited on the socket they used
225 # to request the fork.
226 # 1) Arguably they could see that stdout and stderr have been closed,
227 # and thus stop reading. In testing, I wrote a client which uses
228 # select.poll() over stdin/stdout/stderr and used that to ferry
229 # the content to the appropriate local handle. However for the
230 # FIFOs, when the remote end closed, I wouldn't see any
231 # corresponding information on the local end. There obviously
232 # wasn't any data to be read, so they wouldn't show up as
233 # 'readable' (for me to try to read, and get 0 bytes, indicating
234 # it was closed). I also wasn't seeing POLLHUP, which seemed to be
235 # the correct indicator. As such, we decided to inform the client
236 # on the socket that they originally made the fork request, rather
237 # than just closing the socket immediately.
238 # 2) We could have had the forking server close the socket, and only
239 # the child hold the socket open. When the child exits, then the
240 # OS naturally closes the socket.
241 # If we want the returncode, then we should put that as bytes on
242 # the socket before we exit. Having the child do the work means
243 # that in error conditions, it could easily die before being able
244 # to write anything (think SEGFAULT, etc). The forking server is
245 # already wait()ing on its children, so that we don't get
246 # zombies, and with wait3() we can get the rusage (user time,
247 # memory consumption, etc.).
248 # As such, it seems reasonable that the server can then also
249 # report back when a child is seen as exiting.
250 # [Decision #5]
251 # cleanup once connected
252 # The child process blocks during 'open()' waiting for the client to
253 # connect to its fifos. Once the client has connected, the child then
254 # deletes the temporary directory and the fifos from disk. This means
255 # that there isn't much left for diagnosis, but it also means that
256 # the client won't leave garbage around if it crashes, etc.
257 # Note that the forking service itself still monitors the paths
258 # created, and will delete garbage if it sees that a child failed to
259 # do so.
260 # [Decision #6]
261 # os._exit(retcode) in the child
262 # Calling sys.exit(retcode) raises an exception, which then bubbles
263 # up the stack and runs exit functions (and finally statements). When
264 # I tried using it originally, I would see the current child bubble
265 # all the way up the stack (through the server code that it
266 # fork()ed through), and then get to main() returning code 0. The process
267 # would still exit nonzero. My guess is that something in the atexit
268 # functions was failing, but that it was happening after logging, etc
269 # had been shut down.
270 # Any global state from the child process should be flushed before
271 # run_bzr_* has exited (which we *do* wait for), and any other global
272 # state is probably a remnant from the service process. Which will be
273 # cleaned up by the service itself, rather than the child.
274 # There is some concern that log files may not get flushed, so we
275 # currently call sys.exitfunc() first. The main problem is that I
276 # don't know any way to *remove* a function registered via 'atexit()'
277 # so if the forking service has some state, we may try to clean it up
278 # incorrectly.
279 # Note that the bzr script itself uses sys.exitfunc(); os._exit() in
280 # the 'bzr' main script, as the teardown time of all the python state
281 # was quite noticeable in real-world runtime. As such, bzrlib should
282 # be pretty safe, or it would have been failing for people already.
283 # [Decision #7]
284 # prefork vs max children vs ?
285 # For simplicity it seemed easiest to just fork when requested. Over
286 # time, I realized it would be easy to allow running an arbitrary
287 # command (no harder than just running one command), so it seemed
288 # reasonable to switch over. If we go the prefork route, then we'll
289 # need a way to tell the pre-forked children what command to run.
290 # This could be as easy as just adding one more fifo that they wait
291 # on in the same directory.
292 # For now, I've chosen not to limit the number of forked children. I
293 # don't know what a reasonable value is, and probably there are
294 # already limitations at play. (If Conch limits connections, then it
295 # will already be doing all the work, etc.)
296 # [Decision #8]
297 # nicer errors on the request socket
298 # This service is meant to be run only on the local system. As such,
299 # we don't try to be extra defensive about leaking information to
300 # the one connecting to the socket. (We should still watch out what
301 # we send across the per-child fifos, since those are connected to
302 # remote clients.) Instead we try to be helpful, and tell them as
303 # much as we know about what went wrong.
304
305 DEFAULT_PATH = '/var/run/launchpad_forking_service.sock'
306 DEFAULT_PERMISSIONS = 00660 # Permissions on the master socket (rw-rw----)
307 WAIT_FOR_CHILDREN_TIMEOUT = 5*60 # Wait no more than 5 min for children
308 SOCKET_TIMEOUT = 1.0
309 SLEEP_FOR_CHILDREN_TIMEOUT = 1.0
310 WAIT_FOR_REQUEST_TIMEOUT = 1.0 # No request should take longer than this to
311 # be read
312
313 _fork_function = os.fork
314
315 def __init__(self, path=DEFAULT_PATH, perms=DEFAULT_PERMISSIONS):
316 self.master_socket_path = path
317 self._perms = perms
318 self._start_time = None
319 self._should_terminate = threading.Event()
320 # We hold these locally because, during shutdown, the socket module
321 # may be gc'd before we are.
322 self._socket_timeout = socket.timeout
323 self._socket_error = socket.error
324 # Map from pid => (temp_path_for_handles, request_socket)
325 self._child_processes = {}
326 self._children_spawned = 0
327
328 def _create_master_socket(self):
329 self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
330 self._server_socket.bind(self.master_socket_path)
331 if self._perms is not None:
332 os.chmod(self.master_socket_path, self._perms)
333 self._server_socket.listen(5)
334 self._server_socket.settimeout(self.SOCKET_TIMEOUT)
335 trace.mutter('set socket timeout to: %s' % (self.SOCKET_TIMEOUT,))
336
337 def _cleanup_master_socket(self):
338 self._server_socket.close()
339 try:
340 os.remove(self.master_socket_path)
341 except (OSError, IOError), e:
342 # If we don't delete it, then we get 'address already in
343 # use' failures
344 trace.mutter('failed to cleanup: %s'
345 % (self.master_socket_path,))
346
347 def _handle_sigchld(self, signum, frm):
348 # We don't actually do anything here, we just want an interrupt (EINTR)
349 # on socket.accept() when SIGCHLD occurs.
350 pass
351
352 def _handle_sigterm(self, signum, frm):
353 # Unregister this as the default handler, 2 SIGTERMs will exit us.
354 signal.signal(signal.SIGTERM, signal.SIG_DFL)
355 # SIGTERM should also generate EINTR on our wait loop, so this should
356 # be enough
357 self._should_terminate.set()
358
359 def _register_signals(self):
360 """Register a SIGCHILD and SIGTERM handler.
361
362 If we have a trigger for SIGCHILD then we can quickly respond to
363 clients when their process exits. The main risk is getting more EAGAIN
364 errors elsewhere.
365
366 SIGTERM allows us to cleanup nicely before we exit.
367 """
368 signal.signal(signal.SIGCHLD, self._handle_sigchld)
369 signal.signal(signal.SIGTERM, self._handle_sigterm)
370
371 def _unregister_signals(self):
372 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
373 signal.signal(signal.SIGTERM, signal.SIG_DFL)
374
375 def _create_child_file_descriptors(self, base_path):
376 stdin_path = os.path.join(base_path, 'stdin')
377 stdout_path = os.path.join(base_path, 'stdout')
378 stderr_path = os.path.join(base_path, 'stderr')
379 os.mkfifo(stdin_path)
380 os.mkfifo(stdout_path)
381 os.mkfifo(stderr_path)
382
383 def _bind_child_file_descriptors(self, base_path):
384 stdin_path = os.path.join(base_path, 'stdin')
385 stdout_path = os.path.join(base_path, 'stdout')
386 stderr_path = os.path.join(base_path, 'stderr')
387 # These open calls will block until another process connects (which
388 # must connect in the same order)
389 stdin_fid = os.open(stdin_path, os.O_RDONLY)
390 stdout_fid = os.open(stdout_path, os.O_WRONLY)
391 stderr_fid = os.open(stderr_path, os.O_WRONLY)
392 # Note: by this point bzrlib has opened stderr for logging
393 # (as part of starting the service process in the first place).
394 # As such, it has a stream handler that writes to stderr. logging
395 # tries to flush and close that, but the file is already closed.
396 # This just suppresses that exception.
397 logging.raiseExceptions = False
398 sys.stdin.close()
399 sys.stdout.close()
400 sys.stderr.close()
401 os.dup2(stdin_fid, 0)
402 os.dup2(stdout_fid, 1)
403 os.dup2(stderr_fid, 2)
404 sys.stdin = os.fdopen(stdin_fid, 'rb')
405 sys.stdout = os.fdopen(stdout_fid, 'wb')
406 sys.stderr = os.fdopen(stderr_fid, 'wb')
407 ui.ui_factory.stdin = sys.stdin
408 ui.ui_factory.stdout = sys.stdout
409 ui.ui_factory.stderr = sys.stderr
410 # Now that we've opened the handles, delete everything so that we don't
411 # leave garbage around. Because the open() is done in blocking mode, we
412 # know that someone has already connected to them, and we don't want
413 # anyone else getting confused and connecting.
414 # See [Decision #5]
415 os.remove(stderr_path)
416 os.remove(stdout_path)
417 os.remove(stdin_path)
418 os.rmdir(base_path)
419
420 def _close_child_file_descriptors(self):
421 sys.stdin.close()
422 sys.stderr.close()
423 sys.stdout.close()
424
425 def become_child(self, command_argv, path):
426 """We are in the spawned child code, do our magic voodoo."""
427 # Stop tracking new signals
428 self._unregister_signals()
429 # Reset the start time
430 trace._bzr_log_start_time = time.time()
431 trace.mutter('%d starting %r'
432 % (os.getpid(), command_argv))
433 self._bind_child_file_descriptors(path)
434 self._run_child_command(command_argv)
435
436 def _run_child_command(self, command_argv):
437 # This is the point where we would actually want to do something with
438 # our life
439 # TODO: We may want to consider special-casing the 'lp-serve' command.
440 # As that is the primary use-case for this service, it might be
441 # interesting to have an already-instantiated instance, where we
442 # can just pop on an extra argument and be ready to go. However,
443 # that would probably only really be measurable if we prefork. As
444 # it looks like ~200ms is 'fork()' time, but only 50ms is
445 # run-the-command time.
446 retcode = commands.run_bzr_catch_errors(command_argv)
447 self._close_child_file_descriptors()
448 trace.mutter('%d finished %r'
449 % (os.getpid(), command_argv))
450 # We force os._exit() here, because we don't want to unwind the stack,
451 # which has complex results. (We can get it to unwind back to the
452 # cmd_launchpad_forking_service code, and even back to main() reporting
453 # the return code, but after that, suddenly the return code
454 # changes from a '0' to a '1', with no logging of info.)
455 # TODO: Should we call sys.exitfunc() here? it allows atexit functions
456 # to fire, however, some of those may be still around from the
457 # parent process, which we don't really want.
458 sys.exitfunc()
459 # See [Decision #6]
460 os._exit(retcode)
461
462 @staticmethod
463 def command_to_argv(command_str):
464 """Convert a 'foo bar' style command to [u'foo', u'bar']"""
465 # command_str must be a utf-8 string
466 return [s.decode('utf-8') for s in shlex.split(command_str)]
467
468 @staticmethod
469 def parse_env(env_str):
470 """Convert the environment information into a dict.
471
472 :param env_str: A string full of environment variable declarations.
473 Each key is simple ascii "key: value\n"
474 The string must end with "end\n".
475 :return: A dict of environment variables
476 """
477 env = {}
478 if not env_str.endswith('end\n'):
479 raise ValueError('Invalid env-str: %r' % (env_str,))
480 env_str = env_str[:-5]
481 if not env_str:
482 return env
483 env_entries = env_str.split('\n')
484 for entry in env_entries:
485 key, value = entry.split(': ', 1)
486 env[key] = value
487 return env
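For illustration, the two parsers above can be combined into a standalone sketch that splits a full 'fork-env' request into argv and environment. The helper name is hypothetical; it mirrors the command_to_argv/parse_env pair without depending on bzrlib.

```python
import shlex

def parse_fork_env(request):
    # Split a complete 'fork-env' request into (argv, env). Mirrors the
    # command_to_argv/parse_env pair above; standalone for illustration.
    if not request.startswith('fork-env ') or not request.endswith('end\n'):
        raise ValueError('Invalid request: %r' % (request,))
    command, env_str = request[len('fork-env '):].split('\n', 1)
    env = {}
    for entry in env_str[:-len('end\n')].splitlines():
        key, value = entry.split(': ', 1)
        env[key] = value
    return shlex.split(command), env
```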
488
489 def fork_one_request(self, conn, client_addr, command_argv, env):
490 """Fork myself and serve a request."""
491 temp_name = tempfile.mkdtemp(prefix='lp-forking-service-child-')
492 # Now that we've set everything up, send the response to the client.
493 # We create the fifos first, so that the client can start trying to
494 # connect to them while we fork and have the child do the same.
495 self._children_spawned += 1
496 pid = self._fork_function()
497 if pid == 0:
498 pid = os.getpid()
499 trace.mutter('%d spawned' % (pid,))
500 self._server_socket.close()
501 for env_var, value in env.iteritems():
502 osutils.set_or_unset_env(env_var, value)
503 # See [Decision #3]
504 self._create_child_file_descriptors(temp_name)
505 conn.sendall('ok\n%d\n%s\n' % (pid, temp_name))
506 conn.close()
507 self.become_child(command_argv, temp_name)
508 trace.warning('become_child returned!!!')
509 sys.exit(1)
510 else:
511 self._child_processes[pid] = (temp_name, conn)
512 self.log(client_addr, 'Spawned process %s for %r: %s'
513 % (pid, command_argv, temp_name))
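From the client's side, the reply sent by the child above is 'ok\n<pid>\n<path>\n', after which the client must open the fifos in the same order the child does. A minimal sketch with hypothetical helper names:

```python
import os

def parse_fork_reply(reply):
    # Parse the service's reply to a fork request: 'ok\n<pid>\n<path>\n'.
    ok, pid, path = reply.rstrip('\n').split('\n')
    if ok != 'ok':
        raise RuntimeError('fork request failed: %r' % (reply,))
    return int(pid), path

def open_child_fifos(path):
    # The child open()s its fifos blocking, in the order stdin, stdout,
    # stderr; the client must open the same names in the same order,
    # with the complementary modes, or both sides deadlock.
    stdin_fd = os.open(os.path.join(path, 'stdin'), os.O_WRONLY)
    stdout_fd = os.open(os.path.join(path, 'stdout'), os.O_RDONLY)
    stderr_fd = os.open(os.path.join(path, 'stderr'), os.O_RDONLY)
    return stdin_fd, stdout_fd, stderr_fd
```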
514
515 def main_loop(self):
516 self._start_time = time.time()
517 self._should_terminate.clear()
518 self._register_signals()
519 self._create_master_socket()
520 trace.note('Listening on socket: %s' % (self.master_socket_path,))
521 try:
522 try:
523 self._do_loop()
524 finally:
525 # Stop talking to others, we are shutting down
526 self._cleanup_master_socket()
527 except KeyboardInterrupt:
528 # SIGINT received, try to shutdown cleanly
529 pass
530 trace.note('Shutting down. Waiting up to %.0fs for %d child processes'
531 % (self.WAIT_FOR_CHILDREN_TIMEOUT,
532 len(self._child_processes)))
533 self._shutdown_children()
534 trace.note('Exiting')
535
536 def _do_loop(self):
537 while not self._should_terminate.isSet():
538 try:
539 conn, client_addr = self._server_socket.accept()
540 except self._socket_timeout:
541 pass # run shutdown and children checks
542 except self._socket_error, e:
543 if e.args[0] == errno.EINTR:
544 pass # run shutdown and children checks
545 elif e.args[0] == errno.EBADF:
546 # We can get EBADF here while we are shutting down,
547 # so we just ignore it for now.
548 pass
549 else:
550 # Log any other failure mode
551 trace.warning("listening socket error: %s", e)
552 else:
553 self.log(client_addr, 'connected')
554 # TODO: We should probably trap exceptions coming out of this
555 # and log them, so that we don't kill the service because
556 # of an unhandled error
557 # Note: settimeout is used so that a malformed request doesn't
558 # cause us to hang forever. Note that the particular
559 # implementation means that a malicious client could
560 # probably send us one byte every Xms, and we would just
561 # keep trying to read it. However, as a local service, we
562 # aren't worrying about it.
563 conn.settimeout(self.WAIT_FOR_REQUEST_TIMEOUT)
564 try:
565 self.serve_one_connection(conn, client_addr)
566 except self._socket_timeout, e:
567 trace.log_exception_quietly()
568 self.log(client_addr, 'request timeout failure: %s' % (e,))
569 conn.sendall('FAILURE\nrequest timed out\n')
570 conn.close()
571 self._poll_children()
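The accept-loop behaviour above (and the EINTR reasoning in Decision #2) can be sketched as a pure function. This is a simplified, hypothetical helper: a timeout or EINTR/EBADF falls through to child polling, while any other socket error propagates here rather than being logged.

```python
import errno
import socket

def accept_or_idle(accept_fn, poll_children_fn):
    # One pass of the accept loop: socket.timeout and EINTR/EBADF just
    # fall through to polling for exited children; other errors raise
    # in this simplified sketch.
    try:
        conn_info = accept_fn()
    except socket.timeout:
        conn_info = None
    except socket.error as e:
        if e.args[0] in (errno.EINTR, errno.EBADF):
            conn_info = None
        else:
            raise
    poll_children_fn()
    return conn_info
```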
572
573 def log(self, client_addr, message):
574 """Log a message to the trace log.
575
576 Include the information about what connection is being served.
577 """
578 if client_addr is not None:
579 # Note, we don't use conn.getpeername() because if a client
580 # disconnects before we get here, that raises an exception
581 conn_info = '[%s] ' % (client_addr,)
582 else:
583 conn_info = ''
584 trace.mutter('%s%s' % (conn_info, message))
585
586 def log_information(self):
587 """Log the status information.
588
589 This includes stuff like number of children, and ... ?
590 """
591 self._poll_children()
592 self.log(None, 'Running for %.3fs' % (time.time() - self._start_time))
593 self.log(None, '%d children currently running (spawned %d total)'
594 % (len(self._child_processes), self._children_spawned))
595 # Read the current information about memory consumption, etc.
596 self.log(None, 'Self: %s'
597 % (resource.getrusage(resource.RUSAGE_SELF),))
598 # This seems to be the sum of all rusage for all children that have
599 # been collected (not for currently running children, or ones we
600 # haven't "wait"ed on.) We may want to read /proc/PID/status, since
601 # 'live' information is probably more useful.
602 self.log(None, 'Finished children: %s'
603 % (resource.getrusage(resource.RUSAGE_CHILDREN),))
604
605 def _poll_children(self):
606 """See if children are still running, etc.
607
608 One interesting hook here would be to track memory consumption, etc.
609 """
610 while self._child_processes:
611 try:
612 c_id, exit_code, rusage = os.wait3(os.WNOHANG)
613 except OSError, e:
614 if e.errno == errno.ECHILD:
615 # TODO: We handle this right now because the test suite
616 # fakes a child, since we wanted to test some code
617 # without actually forking anything
618 trace.mutter('_poll_children() called, and'
619 ' self._child_processes indicates there are'
620 ' children, but os.wait3() says there are not.'
621 ' current_children: %s' % (self._child_processes,))
622 return
623 if c_id == 0:
624 # No more children stopped right now
625 return
626 c_path, sock = self._child_processes.pop(c_id)
627 trace.mutter('%s exited %s and usage: %s'
628 % (c_id, exit_code, rusage))
629 # See [Decision #4]
630 try:
631 sock.sendall('exited\n%s\n' % (exit_code,))
632 except (self._socket_timeout, self._socket_error), e:
633 # The client disconnected before we wanted them to,
634 # no big deal
635 trace.mutter('%s\'s socket already closed: %s' % (c_id, e))
636 else:
637 sock.close()
638 if os.path.exists(c_path):
639 # The child failed to cleanup after itself, do the work here
640 trace.warning('Had to clean up after child %d: %s\n'
641 % (c_id, c_path))
642 shutil.rmtree(c_path, ignore_errors=True)
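The exit_code written back on the socket is the raw wait3() status, not a plain return code. A hypothetical helper showing how a reader of the 'exited\n<status>\n' message could decode it (POSIX-only):

```python
import os

def describe_exit_status(status):
    # Decode the raw wait3() status that the service writes back on the
    # request socket as 'exited\n<status>\n'.
    if os.WIFSIGNALED(status):
        return 'killed by signal %d' % (os.WTERMSIG(status),)
    return 'exited with code %d' % (os.WEXITSTATUS(status),)
```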
643
644 def _wait_for_children(self, secs):
645 start = time.time()
646 end = start + secs
647 while self._child_processes:
648 self._poll_children()
649 if secs > 0 and time.time() > end:
650 break
651 time.sleep(self.SLEEP_FOR_CHILDREN_TIMEOUT)
652
653 def _shutdown_children(self):
654 self._wait_for_children(self.WAIT_FOR_CHILDREN_TIMEOUT)
655 if self._child_processes:
656 trace.warning('Children still running: %s'
657 % ', '.join(map(str, self._child_processes)))
658 for c_id in self._child_processes:
659 trace.warning('sending SIGINT to %d' % (c_id,))
660 os.kill(c_id, signal.SIGINT)
661 # We sent the SIGINT signal, see if they exited
662 self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
663 if self._child_processes:
664 # No? Then maybe something more powerful
665 for c_id in self._child_processes:
666 trace.warning('sending SIGKILL to %d' % (c_id,))
667 os.kill(c_id, signal.SIGKILL)
668 # We sent the SIGKILL signal, see if they exited
669 self._wait_for_children(self.SLEEP_FOR_CHILDREN_TIMEOUT)
670 if self._child_processes:
671 for c_id, (c_path, sock) in self._child_processes.iteritems():
672 # TODO: We should probably put something into this message?
673 # However, the likelihood is very small that this isn't
674 # already closed because of SIGKILL + _wait_for_children
675 # And I don't really know what to say...
676 sock.close()
677 if os.path.exists(c_path):
678 trace.warning('Cleaning up after immortal child %d: %s\n'
679 % (c_id, c_path))
680 shutil.rmtree(c_path)
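The escalation above (grace period, then SIGINT, then SIGKILL) can be exercised without real processes by injecting the wait/kill/alive callables. This is a sketch with a hypothetical signature, not the method's actual interface:

```python
import signal

def shutdown_children(alive_fn, kill_fn, wait_fn):
    # Mirror _shutdown_children's escalation: give children a grace
    # period, then SIGINT survivors, then SIGKILL. Returns True if all
    # children were reaped.
    wait_fn()
    for sig in (signal.SIGINT, signal.SIGKILL):
        if not alive_fn():
            return True
        for pid in alive_fn():
            kill_fn(pid, sig)
        wait_fn()
    return not alive_fn()
```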
681
682 def _parse_fork_request(self, conn, client_addr, request):
683 if request.startswith('fork-env '):
684 while not request.endswith('end\n'):
685 request += osutils.read_bytes_from_socket(conn)
686 command, env = request[9:].split('\n', 1)
687 else:
688 command = request[5:].strip()
689 env = 'end\n' # No env set
690 try:
691 command_argv = self.command_to_argv(command)
692 env = self.parse_env(env)
693 except Exception, e:
694 # TODO: Log the traceback?
695 self.log(client_addr, 'command or env parsing failed: %r'
696 % (str(e),))
697 conn.sendall('FAILURE\ncommand or env parsing failed: %r'
698 % (str(e),))
699 else:
700 return command_argv, env
701 return None, None
702
703 def serve_one_connection(self, conn, client_addr):
704 request = ''
705 while '\n' not in request:
706 request += osutils.read_bytes_from_socket(conn)
707 # telnet likes to use '\r\n' rather than '\n', and it is nice to have
708 # an easy way to debug.
709 request = request.replace('\r\n', '\n')
710 self.log(client_addr, 'request: %r' % (request,))
711 if request == 'hello\n':
712 conn.sendall('ok\nyep, still alive\n')
713 self.log_information()
714 conn.close()
715 elif request == 'quit\n':
716 self._should_terminate.set()
717 conn.sendall('ok\nquit command requested... exiting\n')
718 conn.close()
719 elif request.startswith('fork ') or request.startswith('fork-env '):
720 command_argv, env = self._parse_fork_request(conn, client_addr,
721 request)
722 if command_argv is not None:
723 # See [Decision #7]
724 # TODO: Do we want to limit the number of children? And/or
725 # prefork additional instances? (the design will need to
726 # change if we prefork and run arbitrary commands.)
727 self.fork_one_request(conn, client_addr, command_argv, env)
728 # We don't close the conn like other code paths, since we use
729 # it again later.
730 else:
731 conn.close()
732 else:
733 self.log(client_addr, 'FAILURE: unknown request: %r' % (request,))
734 # See [Decision #8]
735 conn.sendall('FAILURE\nunknown request: %r\n' % (request,))
736 conn.close()
737
738
739class cmd_launchpad_forking_service(Command):
740 """Launch a long-running process, where you can ask for new processes.
741
742 The process will block on a given AF_UNIX socket waiting for requests to be
743 made. When a request is made, it will fork itself and redirect
744 stdout/in/err to fifos on the filesystem, and start running the requested
745 command. The caller will be informed where those file handles can be found.
746 Thus it only makes sense that the process connecting to the port must be on
747 the same system.
748 """
749
750 aliases = ['lp-service']
751
752 takes_options = [Option('path',
753 help='Listen for connections at PATH',
754 type=str),
755 Option('perms',
756 help='Set the mode bits for the socket, interpreted'
757 ' as an octal integer (same as chmod)'),
758 Option('preload',
759 help="Do/don't preload libraries before startup."),
760 Option('children-timeout', type=int, argname='SEC',
761 help="Only wait SEC seconds for children to exit"),
762 ]
763
764 def _preload_libraries(self):
765 for pyname in libraries_to_preload:
766 try:
767 __import__(pyname)
768 except ImportError, e:
769 trace.mutter('failed to preload %s: %s' % (pyname, e))
770
771 def run(self, path=None, perms=None, preload=True,
772 children_timeout=LPForkingService.WAIT_FOR_CHILDREN_TIMEOUT):
773 if path is None:
774 path = LPForkingService.DEFAULT_PATH
775 if perms is None:
776 perms = LPForkingService.DEFAULT_PERMISSIONS
777 if preload:
778 # We 'note' this because it often takes a fair amount of time.
779 trace.note('Preloading %d modules' % (len(libraries_to_preload),))
780 self._preload_libraries()
781 service = LPForkingService(path, perms)
782 service.WAIT_FOR_CHILDREN_TIMEOUT = children_timeout
783 service.main_loop()
784
785register_command(cmd_launchpad_forking_service)
786
787
788class cmd_launchpad_replay(Command):
789 """Write input from stdin back to stdout or stderr.
790
791 This is a hidden command, primarily available for testing
792 cmd_launchpad_forking_service.
793 """
794
795 hidden = True
796
797 def run(self):
798 # Just read line-by-line from stdin, and write out to stdout or stderr
799 # depending on the prefix
800 for line in sys.stdin:
801 channel, contents = line.split(' ', 1)
802 channel = int(channel)
803 if channel == 1:
804 sys.stdout.write(contents)
805 sys.stdout.flush()
806 elif channel == 2:
807 sys.stderr.write(contents)
808 sys.stderr.flush()
809 else:
810 raise RuntimeError('Invalid channel request.')
811 return 0
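The channel format consumed by the replay command above can be demonstrated with a standalone demultiplexer that collects into strings instead of writing to real streams (hypothetical helper, same dispatch logic):

```python
def demux_replay_lines(lines):
    # Split 'N contents' lines the way cmd_launchpad_replay does:
    # channel 1 goes to stdout, channel 2 to stderr.
    out, err = [], []
    for line in lines:
        channel, contents = line.split(' ', 1)
        if channel == '1':
            out.append(contents)
        elif channel == '2':
            err.append(contents)
        else:
            raise RuntimeError('Invalid channel request.')
    return ''.join(out), ''.join(err)
```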
812
813register_command(cmd_launchpad_replay)
814
815# This list was generated by lsprofiling a spawned child and looking for
816# <module ...> times, which indicate an import occurred. Other possibilities
817# are to just run "bzr lp-serve --profile-imports" manually, and observe what
818# was expensive to import. It doesn't seem very easy to get this right
819# automatically.
820libraries_to_preload = [
821 'bzrlib.errors',
822 'bzrlib.repofmt.groupcompress_repo',
823 'bzrlib.repository',
824 'bzrlib.smart',
825 'bzrlib.smart.protocol',
826 'bzrlib.smart.request',
827 'bzrlib.smart.server',
828 'bzrlib.smart.vfs',
829 'bzrlib.transport.local',
830 'bzrlib.transport.readonly',
831 'lp.codehosting.bzrutils',
832 'lp.codehosting.vfs',
833 'lp.codehosting.vfs.branchfs',
834 'lp.codehosting.vfs.branchfsclient',
835 'lp.codehosting.vfs.hooks',
836 'lp.codehosting.vfs.transport',
837 ]
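The preloading done by _preload_libraries can be sketched as a standalone function that returns the names that failed to import instead of logging them (illustrative only, using stdlib importlib rather than bare __import__):

```python
import importlib

def preload_libraries(module_names):
    # Import each module up front so forked children inherit the loaded
    # state; collect the names that failed instead of logging them.
    failed = []
    for name in module_names:
        try:
            importlib.import_module(name)
        except ImportError:
            failed.append(name)
    return failed
```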
838
839
840def load_tests(standard_tests, module, loader):
841 standard_tests.addTests(loader.loadTestsFromModuleNames(
842 [__name__ + '.' + x for x in [
843 'test_lpserve',
844 ]]))
845 return standard_tests
846
=== added file 'bzrplugins/lpserve/test_lpserve.py'
--- bzrplugins/lpserve/test_lpserve.py 1970-01-01 00:00:00 +0000
+++ bzrplugins/lpserve/test_lpserve.py 2010-10-19 22:56:30 +0000
@@ -0,0 +1,534 @@
1# Copyright 2010 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).
3
4import os
5import signal
6import socket
7import subprocess
8import tempfile
9import threading
10import time
11
12from testtools import content
13
14from bzrlib import (
15 osutils,
16 tests,
17 trace,
18 )
19from bzrlib.plugins import lpserve
20
21from canonical.config import config
22from lp.codehosting import get_bzr_path, get_BZR_PLUGIN_PATH_for_subprocess
23
24
25class TestingLPForkingServiceInAThread(lpserve.LPForkingService):
26 """A test-double to run a "forking service" in a thread.
27
28 Note that we don't actually allow forking, but it does let us interact
29 with the service for other operations.
30 """
31
32 # For testing, we set the timeouts much lower, because we want the tests to
33 # run quickly
34 WAIT_FOR_CHILDREN_TIMEOUT = 0.5
35 SOCKET_TIMEOUT = 0.01
36 SLEEP_FOR_CHILDREN_TIMEOUT = 0.01
37 WAIT_FOR_REQUEST_TIMEOUT = 0.1
38
39 # We're running in a thread as part of the test suite, blow up if we try to
40 # fork
41 _fork_function = None
42
43 def __init__(self, path, perms=None):
44 self.service_started = threading.Event()
45 self.service_stopped = threading.Event()
46 self.this_thread = None
47 self.fork_log = []
48 super(TestingLPForkingServiceInAThread, self).__init__(
49 path=path, perms=None)
50
51 def _register_signals(self):
52 pass # Don't register it for the test suite
53
54 def _unregister_signals(self):
55 pass # We don't fork, and didn't register, so don't unregister
56
57 def _create_master_socket(self):
58 super(TestingLPForkingServiceInAThread, self)._create_master_socket()
59 self.service_started.set()
60
61 def main_loop(self):
62 self.service_stopped.clear()
63 super(TestingLPForkingServiceInAThread, self).main_loop()
64 self.service_stopped.set()
65
66 def fork_one_request(self, conn, client_addr, command, env):
67 # We intentionally don't allow the test suite to request a fork, as
68 # threads + forks and everything else don't exactly play well together
69 self.fork_log.append((command, env))
70 conn.sendall('ok\nfake forking\n')
71 conn.close()
72
73 @staticmethod
74 def start_service(test):
75 """Start a new LPForkingService in a thread at a random path.
76
77 This will block until the service has created its socket, and is ready
78 to communicate.
79
80 :return: A new TestingLPForkingServiceInAThread instance
81 """
82 fd, path = tempfile.mkstemp(prefix='tmp-lp-forking-service-',
83 suffix='.sock')
84 # We don't want a temp file, we want a temp socket
85 os.close(fd)
86 os.remove(path)
87 new_service = TestingLPForkingServiceInAThread(path=path)
88 thread = threading.Thread(target=new_service.main_loop,
            name='TestingLPForkingServiceInAThread')
        new_service.this_thread = thread
        # should we be doing thread.setDaemon(True) ?
        thread.start()
        new_service.service_started.wait(10.0)
        if not new_service.service_started.isSet():
            raise RuntimeError(
                'Failed to start the TestingLPForkingServiceInAThread')
        test.addCleanup(new_service.stop_service)
        # what about returning new_service._sockname ?
        return new_service

    def stop_service(self):
        """Stop the test-server thread. This can be called multiple times."""
        if self.this_thread is None:
            # We already stopped the process
            return
        self._should_terminate.set()
        self.service_stopped.wait(10.0)
        if not self.service_stopped.isSet():
            raise RuntimeError(
                'Failed to stop the TestingLPForkingServiceInAThread')
        self.this_thread.join()
        # Break any refcycles
        self.this_thread = None


class TestTestingLPForkingServiceInAThread(tests.TestCaseWithTransport):

    def test_start_and_stop_service(self):
        service = TestingLPForkingServiceInAThread.start_service(self)
        service.stop_service()

    def test_multiple_stops(self):
        service = TestingLPForkingServiceInAThread.start_service(self)
        service.stop_service()
        # Calling stop_service repeatedly is a no-op (and not an error).
        service.stop_service()

    def test_autostop(self):
        # We shouldn't leak a thread here, as it should be part of the test
        # case teardown.
        service = TestingLPForkingServiceInAThread.start_service(self)


class TestCaseWithLPForkingService(tests.TestCaseWithTransport):

    def setUp(self):
        super(TestCaseWithLPForkingService, self).setUp()
        self.service = TestingLPForkingServiceInAThread.start_service(self)

    def send_message_to_service(self, message, one_byte_at_a_time=False):
        client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        client_sock.connect(self.service.master_socket_path)
        if one_byte_at_a_time:
            for byte in message:
                client_sock.send(byte)
        else:
            client_sock.sendall(message)
        response = client_sock.recv(1024)
        return response


class TestLPForkingServiceCommandToArgv(tests.TestCase):

    def assertAsArgv(self, argv, command_str):
        self.assertEqual(
            argv, lpserve.LPForkingService.command_to_argv(command_str))

    def test_simple(self):
        self.assertAsArgv([u'foo'], 'foo')
        self.assertAsArgv([u'foo', u'bar'], 'foo bar')

    def test_quoted(self):
        self.assertAsArgv([u'foo'], 'foo')
        self.assertAsArgv([u'foo bar'], '"foo bar"')

    def test_unicode(self):
        self.assertAsArgv([u'command', u'\xe5'], 'command \xc3\xa5')

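The behaviour these tests pin down amounts to: decode the UTF-8 command bytes and split into argv entries, honouring double quotes. Whether the real `command_to_argv` in `bzrplugins/lpserve/__init__.py` uses `shlex` is not shown here; this standalone sketch merely matches the assertions above:

```python
import shlex


def command_to_argv(command_str):
    """Split a command string into argv entries, honouring double quotes."""
    if isinstance(command_str, bytes):
        # The service receives raw bytes off the socket; decode as UTF-8.
        command_str = command_str.decode('utf-8')
    return shlex.split(command_str)
```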

class TestLPForkingServiceParseEnv(tests.TestCase):

    def assertEnv(self, env, env_str):
        self.assertEqual(env, lpserve.LPForkingService.parse_env(env_str))

    def assertInvalid(self, env_str):
        self.assertRaises(
            ValueError, lpserve.LPForkingService.parse_env, env_str)

    def test_no_entries(self):
        self.assertEnv({}, 'end\n')

    def test_one_entry(self):
        self.assertEnv({'BZR_EMAIL': 'joe@foo.com'},
                       'BZR_EMAIL: joe@foo.com\n'
                       'end\n')

    def test_two_entries(self):
        self.assertEnv({'BZR_EMAIL': 'joe@foo.com', 'BAR': 'foo'},
                       'BZR_EMAIL: joe@foo.com\n'
                       'BAR: foo\n'
                       'end\n')

    def test_invalid_empty(self):
        self.assertInvalid('')

    def test_invalid_end(self):
        self.assertInvalid("BZR_EMAIL: joe@foo.com\n")

    def test_invalid_entry(self):
        self.assertInvalid("BZR_EMAIL joe@foo.com\nend\n")

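These tests fully specify the little env-block format: zero or more `KEY: value` lines terminated by a literal `end` line, with anything else raising ValueError. A minimal conforming parser might look like this (illustrative only; the real one lives in `lpserve.LPForkingService`):

```python
def parse_env(env_str):
    """Parse a 'KEY: value' block terminated by 'end\\n' into a dict.

    Raises ValueError for an empty string, a missing 'end' terminator,
    or a line without a ': ' separator.
    """
    if not env_str.endswith('end\n'):
        raise ValueError('env block not terminated by "end"')
    env = {}
    for line in env_str[:-len('end\n')].splitlines():
        if ': ' not in line:
            raise ValueError('invalid env line: %r' % (line,))
        key, value = line.split(': ', 1)
        env[key] = value
    return env
```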

class TestLPForkingService(TestCaseWithLPForkingService):

    def test_send_quit_message(self):
        response = self.send_message_to_service('quit\n')
        self.assertEqual('ok\nquit command requested... exiting\n', response)
        self.service.service_stopped.wait(10.0)
        self.assertTrue(self.service.service_stopped.isSet())

    def test_send_invalid_message_fails(self):
        response = self.send_message_to_service('unknown\n')
        self.assertStartsWith(response, 'FAILURE')

    def test_send_hello_heartbeat(self):
        response = self.send_message_to_service('hello\n')
        self.assertEqual('ok\nyep, still alive\n', response)

    def test_send_simple_fork(self):
        response = self.send_message_to_service('fork rocks\n')
        self.assertEqual('ok\nfake forking\n', response)
        self.assertEqual([(['rocks'], {})], self.service.fork_log)

    def test_send_fork_env_with_empty_env(self):
        response = self.send_message_to_service(
            'fork-env rocks\n'
            'end\n')
        self.assertEqual('ok\nfake forking\n', response)
        self.assertEqual([(['rocks'], {})], self.service.fork_log)

    def test_send_fork_env_with_env(self):
        response = self.send_message_to_service(
            'fork-env rocks\n'
            'BZR_EMAIL: joe@example.com\n'
            'end\n')
        self.assertEqual('ok\nfake forking\n', response)
        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@example.com'})],
                         self.service.fork_log)

    def test_send_fork_env_slowly(self):
        response = self.send_message_to_service(
            'fork-env rocks\n'
            'BZR_EMAIL: joe@example.com\n'
            'end\n', one_byte_at_a_time=True)
        self.assertEqual('ok\nfake forking\n', response)
        self.assertEqual([(['rocks'], {'BZR_EMAIL': 'joe@example.com'})],
                         self.service.fork_log)

    def test_send_incomplete_fork_env_timeout(self):
        # We should get a failure message if we can't quickly read the whole
        # content.
        response = self.send_message_to_service(
            'fork-env rocks\n'
            'BZR_EMAIL: joe@example.com\n',
            one_byte_at_a_time=True)
        # Note that we *don't* send a final 'end\n'.
        self.assertStartsWith(response, 'FAILURE\n')

    def test_send_incomplete_request_timeout(self):
        # Requests end with '\n'; send one without it.
        response = self.send_message_to_service(
            'hello', one_byte_at_a_time=True)
        self.assertStartsWith(response, 'FAILURE\n')

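Taken together, these tests document the request side of the wire protocol: a single `fork <command>` line, or a `fork-env <command>` header followed by env lines and `end`. A helper that builds such requests could be sketched as follows (`build_fork_request` is a hypothetical name; the tests above inline the same strings):

```python
def build_fork_request(command, env=None):
    """Return the message a client sends to ask the service to fork.

    With no env, this is the one-line 'fork' form; otherwise the
    'fork-env' form with 'KEY: value' lines terminated by 'end'.
    """
    if env is None:
        return 'fork %s\n' % (command,)
    lines = ['fork-env %s\n' % (command,)]
    for key in sorted(env):
        lines.append('%s: %s\n' % (key, env[key]))
    lines.append('end\n')
    return ''.join(lines)
```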

class TestCaseWithSubprocess(tests.TestCaseWithTransport):
    """Override the bzr start_bzr_subprocess command.

    The Launchpad infrastructure requires a fair amount of configuration to
    get paths, etc. correct. This provides a "start_bzr_subprocess" command
    that has all of those paths appropriately set, but otherwise functions
    the same as the bzrlib.tests.TestCase version.
    """

    def get_python_path(self):
        """Return the path to the Python interpreter."""
        return '%s/bin/py' % config.root

    def start_bzr_subprocess(self, process_args, env_changes=None,
                             working_dir=None):
        """Start bzr in a subprocess for testing.

        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
        This version removes some of the skipping stuff, some of the
        irrelevant comments (e.g. about win32) and uses Launchpad's own
        mechanisms for getting the path to 'bzr'.

        Comments starting with 'LAUNCHPAD' are comments about our
        modifications.
        """
        if env_changes is None:
            env_changes = {}
        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
        old_env = {}

        def cleanup_environment():
            for env_var, value in env_changes.iteritems():
                old_env[env_var] = osutils.set_or_unset_env(env_var, value)

        def restore_environment():
            for env_var, value in old_env.iteritems():
                osutils.set_or_unset_env(env_var, value)

        cwd = None
        if working_dir is not None:
            cwd = osutils.getcwd()
            os.chdir(working_dir)

        # LAUNCHPAD: Because of buildout, we need to get a custom Python
        # binary, not sys.executable.
        python_path = self.get_python_path()
        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
        bzr_path = get_bzr_path()
        try:
            cleanup_environment()
            command = [python_path, bzr_path]
            command.extend(process_args)
            process = self._popen(
                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        finally:
            restore_environment()
            if cwd is not None:
                os.chdir(cwd)

        return process


class TestCaseWithLPForkingServiceSubprocess(TestCaseWithSubprocess):
    """Tests will get a separate process to communicate to.

    The number of these tests should be small, because it is expensive to
    start and stop the daemon.

    TODO: This should probably use testresources, or layers somehow...
    """

    def setUp(self):
        super(TestCaseWithLPForkingServiceSubprocess, self).setUp()
        (self.service_process,
         self.service_path) = self.start_service_subprocess()
        self.addCleanup(self.stop_service)

    def start_conversation(self, message, one_byte_at_a_time=False):
        """Start talking to the service, and get the initial response."""
        client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        trace.mutter('sending %r to socket %s' % (message, self.service_path))
        client_sock.connect(self.service_path)
        if one_byte_at_a_time:
            for byte in message:
                client_sock.send(byte)
        else:
            client_sock.sendall(message)
        response = client_sock.recv(1024)
        trace.mutter('response: %r' % (response,))
        if response.startswith("FAILURE"):
            raise RuntimeError('Failed to send message: %r' % (response,))
        return response, client_sock

    def send_message_to_service(self, message, one_byte_at_a_time=False):
        response, client_sock = self.start_conversation(
            message, one_byte_at_a_time=one_byte_at_a_time)
        client_sock.close()
        return response

    def send_fork_request(self, command, env=None):
        if env is not None:
            request_lines = ['fork-env %s\n' % (command,)]
            for key, value in env.iteritems():
                request_lines.append('%s: %s\n' % (key, value))
            request_lines.append('end\n')
            request = ''.join(request_lines)
        else:
            request = 'fork %s\n' % (command,)
        response, sock = self.start_conversation(request)
        ok, pid, path, tail = response.split('\n')
        self.assertEqual('ok', ok)
        self.assertEqual('', tail)
        # We don't really care what the pid is, but it should be an integer.
        pid = int(pid)
        path = path.strip()
        self.assertContainsRe(path, '/lp-forking-service-child-')
        return path, pid, sock

    def start_service_subprocess(self):
        # Make sure this plugin is exposed to the subprocess.
        # SLOOWWW (~2 seconds, which is why we are doing the work anyway)
        fd, tempname = tempfile.mkstemp(prefix='tmp-log-bzr-lp-forking-')
        # I'm not 100% sure about when cleanup runs versus addDetail, but I
        # think this will work.
        self.addCleanup(os.remove, tempname)

        def read_log():
            f = os.fdopen(fd)
            f.seek(0)
            content = f.read()
            f.close()
            return [content]
        self.addDetail('server-log', content.Content(
            content.ContentType('text', 'plain', {"charset": "utf8"}),
            read_log))
        service_fd, path = tempfile.mkstemp(prefix='tmp-lp-service-',
                                            suffix='.sock')
        os.close(service_fd)
        os.remove(path)  # The service wants to create it as a socket.
        env_changes = {'BZR_PLUGIN_PATH': lpserve.__path__[0],
                       'BZR_LOG': tempname}
        proc = self.start_bzr_subprocess(
            ['lp-service', '--path', path, '--no-preload',
             '--children-timeout=1'],
            env_changes=env_changes)
        trace.mutter('started lp-service subprocess')
        expected = 'Listening on socket: %s\n' % (path,)
        path_line = proc.stderr.readline()
        trace.mutter(path_line)
        self.assertEqual(expected, path_line)
        # The process won't delete the socket file, so we do.
        return proc, path

    def stop_service(self):
        if self.service_process is None:
            # Already stopped.
            return
        # First, try to stop the service gracefully, by sending a 'quit'
        # message.
        try:
            response = self.send_message_to_service('quit\n')
        except socket.error, e:
            # Ignore a failure to connect; the service must be
            # stopping/stopped already.
            response = None
        tend = time.time() + 10.0
        while self.service_process.poll() is None:
            if time.time() > tend:
                self.finish_bzr_subprocess(process=self.service_process,
                                           send_signal=signal.SIGINT,
                                           retcode=3)
                self.fail('Failed to quit gracefully after 10.0 seconds')
            time.sleep(0.1)
        if response is not None:
            self.assertEqual('ok\nquit command requested... exiting\n',
                             response)

    def _get_fork_handles(self, path):
        trace.mutter('getting handles for: %s' % (path,))
        stdin_path = os.path.join(path, 'stdin')
        stdout_path = os.path.join(path, 'stdout')
        stderr_path = os.path.join(path, 'stderr')
        # The ordering must match the ordering of the service or we get a
        # deadlock.
        child_stdin = open(stdin_path, 'wb')
        child_stdout = open(stdout_path, 'rb')
        child_stderr = open(stderr_path, 'rb')
        return child_stdin, child_stdout, child_stderr

    def communicate_with_fork(self, path, stdin=None):
        child_stdin, child_stdout, child_stderr = self._get_fork_handles(path)
        if stdin is not None:
            child_stdin.write(stdin)
        child_stdin.close()
        stdout_content = child_stdout.read()
        stderr_content = child_stderr.read()
        return stdout_content, stderr_content

    def assertReturnCode(self, expected_code, sock):
        """Assert that we get the expected return code as a message."""
        response = sock.recv(1024)
        self.assertStartsWith(response, 'exited\n')
        code = int(response.split('\n', 1)[1])
        self.assertEqual(expected_code, code)

    def test_fork_lp_serve_hello(self):
        path, _, sock = self.send_fork_request('lp-serve --inet 2')
        stdout_content, stderr_content = self.communicate_with_fork(
            path, 'hello\n')
        self.assertEqual('ok\x012\n', stdout_content)
        self.assertEqual('', stderr_content)
        self.assertReturnCode(0, sock)

    def test_fork_replay(self):
        path, _, sock = self.send_fork_request('launchpad-replay')
        stdout_content, stderr_content = self.communicate_with_fork(
            path, '1 hello\n2 goodbye\n1 maybe\n')
        self.assertEqualDiff('hello\nmaybe\n', stdout_content)
        self.assertEqualDiff('goodbye\n', stderr_content)
        self.assertReturnCode(0, sock)

    def test_just_run_service(self):
        # Start and stop are defined in setUp().
        pass

    def test_fork_multiple_children(self):
        paths = []
        for idx in range(4):
            paths.append(self.send_fork_request('launchpad-replay'))
        # Do them out of order, as order shouldn't matter.
        for idx in [3, 2, 0, 1]:
            p, pid, sock = paths[idx]
            stdout_msg = 'hello %d\n' % (idx,)
            stderr_msg = 'goodbye %d\n' % (idx + 1,)
            stdout, stderr = self.communicate_with_fork(
                p, '1 %s2 %s' % (stdout_msg, stderr_msg))
            self.assertEqualDiff(stdout_msg, stdout)
            self.assertEqualDiff(stderr_msg, stderr)
            self.assertReturnCode(0, sock)

    def test_fork_respects_env_vars(self):
        path, pid, sock = self.send_fork_request(
            'whoami', env={'BZR_EMAIL': 'this_test@example.com'})
        stdout_content, stderr_content = self.communicate_with_fork(path)
        self.assertEqual('', stderr_content)
        self.assertEqual('this_test@example.com\n', stdout_content)

    def _check_exits_nicely(self, sig_id):
        path, _, sock = self.send_fork_request('rocks')
        self.assertEqual(None, self.service_process.poll())
        # Now when we send SIGTERM, it should wait for the child to exit
        # before it tries to exit itself.
        # In python2.6+ we could use self.service_process.terminate().
        os.kill(self.service_process.pid, sig_id)
        self.assertEqual(None, self.service_process.poll())
        # Now talk to the child, so the service can close.
        stdout_content, stderr_content = self.communicate_with_fork(path)
        self.assertEqual('It sure does!\n', stdout_content)
        self.assertEqual('', stderr_content)
        self.assertReturnCode(0, sock)
        # And the process should exit cleanly.
        self.assertEqual(0, self.service_process.wait())

    def test_sigterm_exits_nicely(self):
        self._check_exits_nicely(signal.SIGTERM)

    def test_sigint_exits_nicely(self):
        self._check_exits_nicely(signal.SIGINT)

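For reference, the response exercised by `send_fork_request` above has a fixed shape: a successful fork answers `ok\n<pid>\n<path>\n` on the request socket. A parsing sketch (`parse_fork_response` is a hypothetical helper; the tests do this inline):

```python
def parse_fork_response(response):
    """Parse the service's 'ok\\n<pid>\\n<path>\\n' reply to a fork request."""
    ok, pid, path, tail = response.split('\n')
    if ok != 'ok' or tail != '':
        raise ValueError('malformed fork response: %r' % (response,))
    return int(pid), path.strip()
```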
=== modified file 'configs/development/launchpad-lazr.conf'
--- configs/development/launchpad-lazr.conf 2010-10-08 06:22:10 +0000
+++ configs/development/launchpad-lazr.conf 2010-10-19 22:56:30 +0000
@@ -76,6 +76,7 @@
 lp_url_hosts: dev
 access_log: /var/tmp/bazaar.launchpad.dev/codehosting-access.log
 blacklisted_hostnames:
+use_forking_daemon: True
 
 [codeimport]
 bazaar_branch_store: file:///tmp/bazaar-branches
 
=== modified file 'lib/canonical/config/schema-lazr.conf'
--- lib/canonical/config/schema-lazr.conf 2010-10-15 17:04:51 +0000
+++ lib/canonical/config/schema-lazr.conf 2010-10-19 22:56:30 +0000
@@ -301,6 +301,18 @@
 # datatype: string
 logfile: -
 
+# The location of the log file used by the LaunchpadForkingService.
+# datatype: string
+forker_logfile: -
+
+# Should we be using the forking daemon? Or should we be calling spawnProcess
+# instead?
+# datatype: boolean
+use_forking_daemon: False
+# What disk path will the daemon listen on?
+# datatype: string
+forking_daemon_socket: /var/tmp/launchpad_forking_service.sock
+
 # The prefix of the web URL for all public branches. This should end with a
 # slash.
 #
 
=== modified file 'lib/canonical/launchpad/scripts/runlaunchpad.py'
--- lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-11 04:07:36 +0000
+++ lib/canonical/launchpad/scripts/runlaunchpad.py 2010-10-19 22:56:30 +0000
@@ -174,6 +174,52 @@
     process.stdin.close()
 
 
+class ForkingSessionService(Service):
+    """A lp-forking-service for handling codehosting access."""
+
+    # TODO: The "sftp" (aka codehosting) server depends fairly heavily on
+    #       this service. It would seem reasonable to make one always start
+    #       if the other one is started. Though this might be a way to
+    #       "FeatureFlag" whether this is active or not.
+    @property
+    def should_launch(self):
+        return (config.codehosting.launch and
+                config.codehosting.use_forking_daemon)
+
+    @property
+    def logfile(self):
+        """Return the log file to use.
+
+        Default to the value of the configuration key logfile.
+        """
+        return config.codehosting.forker_logfile
+
+    def launch(self):
+        # Following the logic in TacFile. Specifically, if you configure sftp
+        # to not run (and thus bzr+ssh) then we don't want to run the forking
+        # service.
+        if not self.should_launch:
+            return
+        from lp.codehosting import get_bzr_path
+        command = [config.root + '/bin/py', get_bzr_path(),
+                   'launchpad-forking-service',
+                   '--path', config.codehosting.forking_daemon_socket,
+                   ]
+        env = dict(os.environ)
+        env['BZR_PLUGIN_PATH'] = config.root + '/bzrplugins'
+        logfile = self.logfile
+        if logfile == '-':
+            # This process uses a different logging infrastructure from the
+            # rest of the Launchpad code. As such, it cannot trivially use
+            # '-' as the logfile. So we just ignore this setting.
+            pass
+        else:
+            env['BZR_LOG'] = logfile
+        process = subprocess.Popen(command, env=env, stdin=subprocess.PIPE)
+        self.addCleanup(stop_process, process)
+        process.stdin.close()
+
+
 def stop_process(process):
     """kill process and BLOCK until process dies.
 
@@ -193,6 +239,7 @@
     'librarian': TacFile('librarian', 'daemons/librarian.tac',
                          'librarian_server', prepare_for_librarian),
     'sftp': TacFile('sftp', 'daemons/sftp.tac', 'codehosting'),
+    'forker': ForkingSessionService(),
     'mailman': MailmanService(),
     'codebrowse': CodebrowseService(),
     'google-webservice': GoogleWebService(),
 
=== modified file 'lib/lp/codehosting/sshserver/session.py'
--- lib/lp/codehosting/sshserver/session.py 2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/sshserver/session.py 2010-10-19 22:56:30 +0000
@@ -9,11 +9,19 @@
     ]
 
 import os
+import signal
+import socket
 import urlparse
 
-from twisted.internet.process import ProcessExitedAlready
+from zope.event import notify
+from zope.interface import implements
+
+from twisted.internet import (
+    error,
+    interfaces,
+    process,
+    )
 from twisted.python import log
-from zope.event import notify
 
 from canonical.config import config
 from lp.codehosting import get_bzr_path
@@ -35,6 +43,238 @@
     """Raised when a session is asked to execute a forbidden command."""
 
 
+class _WaitForExit(process.ProcessReader):
+    """Wait on a socket for the exit status."""
+
+    def __init__(self, reactor, proc, sock):
+        super(_WaitForExit, self).__init__(reactor, proc, 'exit',
+                                           sock.fileno())
+        self._sock = sock
+        self.connected = 1
+
+    def close(self):
+        self._sock.close()
+
+    def dataReceived(self, data):
+        # TODO: how do we handle getting only *some* of the content? Maybe
+        #       we need to read more bytes first...
+
+        # This is the only thing we do differently from the standard
+        # ProcessReader. When we get data on this socket, we need to treat
+        # it as a return code, or a failure.
+        if not data.startswith('exited'):
+            # Bad data; we want to signal that we are closing the
+            # connection. TODO: How?
+            self.proc.childConnectionLost(self.name, "invalid data")
+            self.close()
+            # I don't know what to put here if we get bogus data, but I *do*
+            # want to say that the process is now considered dead to me.
+            log.err('Got invalid exit information: %r' % (data,))
+            exit_status = (255 << 8)
+        else:
+            exit_status = int(data.split('\n')[1])
+        self.proc.processEnded(exit_status)
+
+
+class ForkedProcessTransport(process.BaseProcess):
+    """Wrap the forked process in a ProcessTransport so we can talk to it.
+
+    Note that instantiating the class creates the fork and sets it up in
+    the reactor.
+    """
+
+    implements(interfaces.IProcessTransport)
+
+    # Design decisions
+    # [Decision #a]
+    #   Inherit from process.BaseProcess
+    #   This seems slightly risky, as process.BaseProcess is actually
+    #   imported from twisted.internet._baseprocess.BaseProcess. The
+    #   real-world Process then actually inherits from
+    #   process._BaseProcess. I've also had to copy a fair amount from the
+    #   actual Process class. One option would be to inherit from
+    #   process.Process, and just override stuff like __init__ and
+    #   reapProcess, which I don't want to do in the same way. (Is it ok
+    #   not to call your base class's __init__ if you don't want to do that
+    #   exact work?)
+    def __init__(self, reactor, executable, args, environment, proto):
+        process.BaseProcess.__init__(self, proto)
+        # Map from standard file descriptor to the associated pipe.
+        self.pipes = {}
+        pid, path, sock = self._spawn(executable, args, environment)
+        self._fifo_path = path
+        self.pid = pid
+        self.process_sock = sock
+        self._connectSpawnToReactor(reactor)
+        if self.proto is not None:
+            self.proto.makeConnection(self)
+
+    def _sendMessageToService(self, message):
+        """Send a message to the forking service and get the response."""
+        path = config.codehosting.forking_daemon_socket
+        client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        log.msg('Connecting to Forking Service @ socket: %s for %r'
+                % (path, message))
+        try:
+            client_sock.connect(path)
+            client_sock.sendall(message)
+            # We define the requests to be no bigger than 1kB. (For now)
+            response = client_sock.recv(1024)
+        except socket.error, e:
+            # TODO: What exceptions should be raised?
+            #       Raising the raw exception seems to kill the twisted
+            #       reactor. Note that if the connection is refused, we
+            #       *could* just fall back on a regular 'spawnProcess' call.
+            log.err('Connection failed: %s' % (e,))
+            raise
+        if response.startswith("FAILURE"):
+            raise RuntimeError('Failed to send message: %r' % (response,))
+        return response, client_sock
+
+    def _spawn(self, executable, args, environment):
+        """Start the new process.
+
+        This talks to the ForkingSessionService and requests a new process
+        be started. Similar to what Process.__init__/_fork would do.
+
+        :return: The pid, communication directory, and request socket.
+        """
+        assert executable == 'bzr', executable  # Maybe .endswith()
+        assert args[0] == 'bzr', args[0]
+        message = ['fork-env %s\n' % (' '.join(args[1:]),)]
+        for key, value in environment.iteritems():
+            # XXX: Currently we only pass BZR_EMAIL, should we be passing
+            #      everything else? Note that many won't be handled
+            #      properly, since the process is already running.
+            if key != 'BZR_EMAIL':
+                continue
+            message.append('%s: %s\n' % (key, value))
+        message.append('end\n')
+        message = ''.join(message)
+        response, sock = self._sendMessageToService(message)
+        if response.startswith('FAILURE'):
+            # TODO: Is there a better error to raise?
+            raise RuntimeError("Failed while sending message to forking "
+                               "service. message: %r, failure: %r"
+                               % (message, response))
+        ok, pid, path, tail = response.split('\n')
+        assert ok == 'ok'
+        assert tail == ''
+        pid = int(pid)
+        log.msg('Forking returned pid: %d, path: %s' % (pid, path))
+        return pid, path, sock
+
+    def _connectSpawnToReactor(self, reactor):
+        stdin_path = os.path.join(self._fifo_path, 'stdin')
+        stdout_path = os.path.join(self._fifo_path, 'stdout')
+        stderr_path = os.path.join(self._fifo_path, 'stderr')
+        child_stdin_fd = os.open(stdin_path, os.O_WRONLY)
+        self.pipes[0] = process.ProcessWriter(reactor, self, 0,
+                                              child_stdin_fd)
+        child_stdout_fd = os.open(stdout_path, os.O_RDONLY)
+        # forceReadHack=True? Used in process.py, but it doesn't seem to be
+        # needed here.
+        self.pipes[1] = process.ProcessReader(reactor, self, 1,
+                                              child_stdout_fd)
+        child_stderr_fd = os.open(stderr_path, os.O_RDONLY)
+        self.pipes[2] = process.ProcessReader(reactor, self, 2,
+                                              child_stderr_fd)
+        # Note: _exiter forms a GC cycle, since it points to us, and we
+        # hold a reference to it.
+        self._exiter = _WaitForExit(reactor, self, self.process_sock)
+        self.pipes['exit'] = self._exiter
+
+    def _getReason(self, status):
+        # Copied from twisted.internet.process._BaseProcess
+        exitCode = sig = None
+        if os.WIFEXITED(status):
+            exitCode = os.WEXITSTATUS(status)
+        else:
+            sig = os.WTERMSIG(status)
+        if exitCode or sig:
+            return error.ProcessTerminated(exitCode, sig, status)
+        return error.ProcessDone(status)
+
+    def signalProcess(self, signalID):
+        """
+        Send the given signal C{signalID} to the process. It'll translate a
+        few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string
+        representation to its int value, otherwise it'll pass directly the
+        value provided.
+
+        @type signalID: C{str} or C{int}
+        """
+        # Copied from twisted.internet.process._BaseProcess
+        if signalID in ('HUP', 'STOP', 'INT', 'KILL', 'TERM'):
+            signalID = getattr(signal, 'SIG%s' % (signalID,))
+        if self.pid is None:
+            raise process.ProcessExitedAlready()
+        os.kill(self.pid, signalID)
+
+    # Implemented because conch.ssh.session uses it; the Process
+    # implementation ignores writes if channel '0' is not available.
+    def write(self, data):
+        self.pipes[0].write(data)
+
+    def writeToChild(self, childFD, data):
+        # Copied from twisted.internet.process.Process
+        self.pipes[childFD].write(data)
+
+    def closeChildFD(self, childFD):
+        if childFD in self.pipes:
+            self.pipes[childFD].loseConnection()
+
+    def closeStdin(self):
+        self.closeChildFD(0)
+
+    def closeStdout(self):
+        self.closeChildFD(1)
+
+    def closeStderr(self):
+        self.closeChildFD(2)
+
+    def loseConnection(self):
+        self.closeStdin()
+        self.closeStdout()
+        self.closeStderr()
+
+    # Implemented because ProcessWriter/ProcessReader want to call it.
+    # Copied from twisted.internet.Process
+    def childDataReceived(self, name, data):
+        self.proto.childDataReceived(name, data)
+
+    # Implemented because ProcessWriter/ProcessReader want to call it.
+    # Copied from twisted.internet.Process
+    def childConnectionLost(self, childFD, reason):
+        close = getattr(self.pipes[childFD], 'close', None)
+        if close is not None:
+            close()
+        else:
+            os.close(self.pipes[childFD].fileno())
+        del self.pipes[childFD]
+        try:
+            self.proto.childConnectionLost(childFD)
+        except:
+            log.err()
+        self.maybeCallProcessEnded()
+
+    # Implemented because of childConnectionLost.
+    # Adapted from twisted.internet.Process
+    # Note: Process.maybeCallProcessEnded() tries to reapProcess() at this
+    #       point, but the daemon will be doing the reaping for us (we
+    #       can't, because the process isn't a direct child).
+    def maybeCallProcessEnded(self):
+        if self.pipes:
+            # Not done if we still have open pipes.
+            return
+        if not self.lostProcess:
+            return
+        process.BaseProcess.maybeCallProcessEnded(self)
+
+    # pauseProducing, present in process.py, is not part of the
+    # IProcessTransport interface.
+
+
 class ExecOnlySession(DoNothingSession):
     """Conch session that only allows executing commands."""
 
@@ -58,7 +298,7 @@
         notify(BazaarSSHClosed(self.avatar))
         try:
             self._transport.signalProcess('HUP')
-        except (OSError, ProcessExitedAlready):
+        except (OSError, process.ProcessExitedAlready):
             pass
         self._transport.loseConnection()
 
@@ -81,8 +321,7 @@
         except ForbiddenCommand, e:
             self.errorWithMessage(protocol, str(e) + '\r\n')
             return
-        log.msg('Running: %r, %r, %r'
-                % (executable, arguments, self.environment))
+        log.msg('Running: %r, %r' % (executable, arguments))
         if self._transport is not None:
             log.err(
                 "ERROR: %r already running a command on transport %r"
@@ -91,8 +330,12 @@
         # violation. Apart from this line and its twin, this class knows
         # nothing about Bazaar.
         notify(BazaarSSHStarted(self.avatar))
-        self._transport = self.reactor.spawnProcess(
-            protocol, executable, arguments, env=self.environment)
+        self._transport = self._spawn(protocol, executable, arguments,
+                                      env=self.environment)
+
+    def _spawn(self, protocol, executable, arguments, env):
+        return self.reactor.spawnProcess(protocol, executable, arguments,
+                                         env=env)
 
97 def getCommandToRun(self, command):340 def getCommandToRun(self, command):
98 """Return the command that will actually be run given `command`.341 """Return the command that will actually be run given `command`.
@@ -144,21 +387,65 @@
144 % {'user_id': self.avatar.user_id})387 % {'user_id': self.avatar.user_id})
145388
146389
390class ForkingRestrictedExecOnlySession(RestrictedExecOnlySession):
391 """Use the Forking Service instead of spawnProcess."""
392
393 def _simplifyEnvironment(self, env):
394 """Pull out the bits of the environment we want to pass along."""
395 env = {}
396 for env_var in ['BZR_EMAIL']:
397 if env_var in self.environment:
398 env[env_var] = self.environment[env_var]
399 return env
400
401 def getCommandToFork(self, executable, arguments, env):
402 assert executable.endswith('/bin/py')
403 assert arguments[0] == executable
404 assert arguments[1].endswith('/bzr')
405 executable = 'bzr'
406 arguments = arguments[1:]
407 arguments[0] = 'bzr'
408 env = self._simplifyEnvironment(env)
409 return executable, arguments, env
410
411 def _spawn(self, protocol, executable, arguments, env):
412 # When spawning, adapt the idea of "bin/py .../bzr" to just using "bzr"
413 # and the executable
414 executable, arguments, env = self.getCommandToFork(executable,
415 arguments, env)
416 return ForkedProcessTransport(self.reactor, executable,
417 arguments, env, protocol)
418
419
 def launch_smart_server(avatar):
     from twisted.internet import reactor
 
-    command = (
-        "%(root)s/bin/py %(bzr)s lp-serve --inet "
-        % {'root': config.root, 'bzr': get_bzr_path()})
+    python_command = "%(root)s/bin/py %(bzr)s" % {
+        'root': config.root,
+        'bzr': get_bzr_path(),
+        }
+    args = " lp-serve --inet %(user_id)s"
+    command = python_command + args
+    forking_command = "bzr" + args
 
     environment = dict(os.environ)
 
     # Extract the hostname from the supermirror root config.
     hostname = urlparse.urlparse(config.codehosting.supermirror_root)[1]
     environment['BZR_EMAIL'] = '%s@%s' % (avatar.username, hostname)
-    return RestrictedExecOnlySession(
+    klass = RestrictedExecOnlySession
+    # TODO: Use a FeatureFlag to enable this in a more fine-grained approach.
+    #       If the forking daemon has been spawned, then we can use it if the
+    #       feature is set to true for the given user, etc.
+    #       A global config is a good first step, but does require restarting
+    #       the service to change the flag. 'config' doesn't support SIGHUP.
+    #       For now, restarting the service is necessary to enable/disable
+    #       the forking daemon.
+    if config.codehosting.use_forking_daemon:
+        klass = ForkingRestrictedExecOnlySession
+    return klass(
         avatar,
         reactor,
         'bzr serve --inet --directory=/ --allow-writes',
-        command + ' %(user_id)s',
+        command,
         environment=environment)
 
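The rewrite that `getCommandToFork` performs can be expressed as a standalone sketch (illustrative names only, not the Launchpad code itself): the "`<root>/bin/py <tree>/bzr lp-serve --inet <user_id>`" invocation is reduced to a bare "`bzr lp-serve --inet <user_id>`" request for the forking service, with only a whitelisted slice of the environment forwarded.

```python
# Hedged sketch of ForkingRestrictedExecOnlySession.getCommandToFork:
# drop the python wrapper, address the bzr script by name only, and
# forward just the environment variables the child actually needs.
def get_command_to_fork(executable, arguments, environment):
    assert executable.endswith('/bin/py')
    assert arguments[0] == executable
    assert arguments[1].endswith('/bzr')
    # Strip the "bin/py" wrapper; the first remaining argument becomes "bzr".
    arguments = list(arguments[1:])
    arguments[0] = 'bzr'
    # Whitelist the environment rather than passing it wholesale.
    env = {}
    for env_var in ['BZR_EMAIL']:
        if env_var in environment:
            env[env_var] = environment[env_var]
    return 'bzr', arguments, env
```

The forking daemon bootstraps bzr itself, so the full interpreter path is no longer needed; only the logical command and a minimal environment cross the socket.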
=== modified file 'lib/lp/codehosting/sshserver/tests/test_session.py'
--- lib/lp/codehosting/sshserver/tests/test_session.py 2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/sshserver/tests/test_session.py 2010-10-19 22:56:30 +0000
@@ -5,6 +5,7 @@
 
 __metaclass__ = type
 
+import socket
 import unittest
 
 from twisted.conch.interfaces import ISession
@@ -21,9 +22,12 @@
 from lp.codehosting.sshserver.session import (
     ExecOnlySession,
     ForbiddenCommand,
+    ForkingRestrictedExecOnlySession,
     RestrictedExecOnlySession,
+    _WaitForExit,
     )
 from lp.codehosting.tests.helpers import AvatarTestCase
+from lp.testing import TestCase
 
 
 class MockReactor:
@@ -40,6 +44,9 @@
                          usePTY, childFDs))
         return MockProcessTransport(executable)
 
+    def addReader(self, reader):
+        self.log.append(('addReader', reader))
+
 
 class MockSSHSession:
     """Just enough of SSHSession to allow checking of reporting to stderr."""
@@ -60,6 +67,7 @@
         self._executable = executable
         self.log = []
         self.session = MockSSHSession(self.log)
+        self.status = None
 
     def closeStdin(self):
         self.log.append(('closeStdin',))
@@ -67,6 +75,9 @@
     def loseConnection(self):
         self.log.append(('loseConnection',))
 
+    def childConnectionLost(self, childFD, reason=None):
+        self.log.append(('childConnectionLost', childFD, reason))
+
     def signalProcess(self, signal):
         if self._executable == 'raise-os-error':
             raise OSError()
@@ -77,6 +88,39 @@
     def write(self, data):
         self.log.append(('write', data))
 
+    def processEnded(self, status):
+        self.log.append(('processEnded', status))
+
+
+class Test_WaitForExit(TestCase):
+
+    def setUp(self):
+        TestCase.setUp(self)
+        self.reactor = MockReactor()
+        self.proc = MockProcessTransport('executable')
+        sock = socket.socket()
+        self.exiter = _WaitForExit(self.reactor, self.proc, sock)
+
+    def test__init__starts_reading(self):
+        self.assertEqual([('addReader', self.exiter)], self.reactor.log)
+
+    def test_dataReceived_ends_cleanly(self):
+        self.exiter.dataReceived('exited\n0\n')
+        self.assertEqual([('processEnded', 0)], self.proc.log)
+
+    def test_dataReceived_ends_with_errno(self):
+        self.exiter.dataReceived('exited\n256\n')
+        self.assertEqual([('processEnded', 256)], self.proc.log)
+
+    def test_dataReceived_bad_data(self):
+        # Note: The dataReceived code calls 'log.err' which ends up getting
+        #       printed during the test run. How do I suppress that or, even
+        #       better, check that it does so?
+        #       self.flushLoggedErrors() doesn't seem to do anything.
+        self.exiter.dataReceived('bogus\n')
+        self.assertEqual([('childConnectionLost', 'exit', 'invalid data'),
+                          ('processEnded', (255 << 8))], self.proc.log)
+
 
 class TestExecOnlySession(AvatarTestCase):
     """Tests for ExecOnlySession.
@@ -340,6 +384,35 @@
         self.assertRaises(
             ForbiddenCommand, session.getCommandToRun, 'rm -rf /')
 
+    def test_avatarAdaptsToOnlyRestrictedSession(self):
+        config.push('codehosting-no-forking',
+            "[codehosting]\nuse_forking_daemon: False\n")
+        self.addCleanup(config.pop, 'codehosting-no-forking')
+        session = ISession(self.avatar)
+        self.failIf(isinstance(session, ForkingRestrictedExecOnlySession),
+            "ISession(avatar) shouldn't adapt to"
+            " ForkingRestrictedExecOnlySession when forking is disabled.")
+
+    def test_avatarAdaptsToForkingRestrictedExecOnlySession(self):
+        config.push('codehosting-forking',
+            "[codehosting]\nuse_forking_daemon: True\n")
+        self.addCleanup(config.pop, 'codehosting-forking')
+        session = ISession(self.avatar)
+        self.failUnless(
+            isinstance(session, ForkingRestrictedExecOnlySession),
+            "ISession(avatar) doesn't adapt to"
+            " ForkingRestrictedExecOnlySession."
+            " Got %r instead." % (session,))
+        executable, arguments = session.getCommandToRun(
+            'bzr serve --inet --directory=/ --allow-writes')
+        executable, arguments, env = session.getCommandToFork(
+            executable, arguments, session.environment)
+        self.assertEqual('bzr', executable)
+        self.assertEqual(
+            ['bzr', 'lp-serve',
+             '--inet', str(self.avatar.user_id)],
+            list(arguments))
+
 
 def test_suite():
     return unittest.TestLoader().loadTestsFromName(__name__)
 
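The `Test_WaitForExit` cases above pin down the wire format the exit-status reader consumes: the forking service reports `'exited\n<status>\n'` on the control socket, and anything malformed is treated as an exit with status `255 << 8`. A minimal sketch of that parsing (illustrative names, not the actual `_WaitForExit` implementation):

```python
# Parse the "exited\n<status>\n" message written on the control socket when
# a forked child finishes. Malformed data maps to the same fallback status
# the tests above expect: (255 << 8).
def parse_exit_message(data, default_status=(255 << 8)):
    lines = data.split('\n')
    if lines[0] == 'exited' and len(lines) > 1:
        try:
            return int(lines[1])
        except ValueError:
            pass
    return default_status
```

The status here is a raw wait-style value, which is why `256` (exit code 1, i.e. `1 << 8`) appears in the tests rather than a plain return code.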
=== modified file 'lib/lp/codehosting/tests/test_acceptance.py'
--- lib/lp/codehosting/tests/test_acceptance.py 2010-10-17 01:26:56 +0000
+++ lib/lp/codehosting/tests/test_acceptance.py 2010-10-19 22:56:30 +0000
@@ -8,6 +8,9 @@
 import atexit
 import os
 import re
+import signal
+import subprocess
+import sys
 import unittest
 import xmlrpclib
 
@@ -51,9 +54,58 @@
 from lp.testing import TestCaseWithFactory
 
 
+class ForkingServerForTests(object):
+    """Map starting/stopping a LPForkingService to setUp() and tearDown()."""
+
+    def __init__(self):
+        self.process = None
+        self.socket_path = None
+
+    def setUp(self):
+        bzr_path = get_bzr_path()
+        BZR_PLUGIN_PATH = get_BZR_PLUGIN_PATH_for_subprocess()
+        env = os.environ.copy()
+        env['BZR_PLUGIN_PATH'] = BZR_PLUGIN_PATH
+        # TODO: We probably want to use a random disk path for
+        #       forking_daemon_socket, but we need to update config so that
+        #       the CodeHosting service can find it.
+        #       The main problem is that CodeHostingTac seems to start a tac
+        #       server directly from the disk configs, and doesn't use the
+        #       in-memory config. So we can't just override the memory
+        #       settings; we have to somehow pass it a new config-on-disk to
+        #       use.
+        self.socket_path = config.codehosting.forking_daemon_socket
+        process = subprocess.Popen(
+            [sys.executable, bzr_path, 'launchpad-forking-service',
+             '--path', self.socket_path,
+            ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
+        self.process = process
+        # Wait for it to indicate it is running.
+        # The first line should be "Preloading", indicating it is ready.
+        preloading_line = process.stderr.readline()
+        # The next line is the "Listening on socket" line.
+        socket_line = process.stderr.readline()
+        # Now it is ready.
+
+    def tearDown(self):
+        # SIGTERM is the graceful exit request; potentially we could wait a
+        # bit and send something stronger?
+        if self.process is not None and self.process.poll() is None:
+            os.kill(self.process.pid, signal.SIGTERM)
+            self.process.wait()
+        self.process = None
+        # We want to make sure the socket path has been cleaned up, so that
+        # future runs can work correctly.
+        if os.path.exists(self.socket_path):
+            # Should there be a warning/error here?
+            os.remove(self.socket_path)
+
+
 class SSHServerLayer(ZopelessAppServerLayer):
 
     _tac_handler = None
+    _forker_service = None
 
     @classmethod
     def getTacHandler(cls):
@@ -64,18 +116,27 @@
         return cls._tac_handler
 
     @classmethod
+    def getForker(cls):
+        if cls._forker_service is None:
+            cls._forker_service = ForkingServerForTests()
+        return cls._forker_service
+
+    @classmethod
     @profiled
     def setUp(cls):
        tac_handler = SSHServerLayer.getTacHandler()
        tac_handler.setUp()
        SSHServerLayer._reset()
        atexit.register(tac_handler.tearDown)
+       forker = SSHServerLayer.getForker()
+       forker.setUp()
 
     @classmethod
     @profiled
     def tearDown(cls):
         SSHServerLayer._reset()
         SSHServerLayer.getTacHandler().tearDown()
+        SSHServerLayer.getForker().tearDown()
 
     @classmethod
     @profiled
 
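The readiness handshake in `ForkingServerForTests.setUp` (reading two banner lines from the daemon's stderr before proceeding) can be sketched generically. The helper below is hypothetical; the real invocation is `bzr launchpad-forking-service --path <socket>`:

```python
import subprocess
import sys

# Generic form of the readiness wait used in ForkingServerForTests.setUp():
# start a daemon and block until it has printed its banner lines
# ("Preloading", then "Listening on socket") to stderr.
def start_and_wait_ready(cmd, banner_lines=2):
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # readline() blocks, so returning here guarantees the daemon has
    # reached the point where it emitted its banner.
    banner = [process.stderr.readline() for _ in range(banner_lines)]
    return process, banner
```

Blocking on stderr rather than polling the socket path avoids a race where the test connects before the daemon has bound its listening socket.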
=== modified file 'lib/lp/codehosting/tests/test_lpserve.py'
--- lib/lp/codehosting/tests/test_lpserve.py 2010-08-20 20:31:18 +0000
+++ lib/lp/codehosting/tests/test_lpserve.py 2010-10-19 22:56:30 +0000
@@ -1,32 +1,21 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for the lp-serve plugin."""
 
 __metaclass__ = type
 
-import os
-import re
-from subprocess import PIPE
-import unittest
-
 from bzrlib import (
     errors,
-    osutils,
     )
 from bzrlib.smart import medium
-from bzrlib.tests import TestCaseWithTransport
 from bzrlib.transport import remote
+from bzrlib.plugins.lpserve.test_lpserve import TestCaseWithSubprocess
 
-from canonical.config import config
-from lp.codehosting import (
-    get_bzr_path,
-    get_BZR_PLUGIN_PATH_for_subprocess,
-    )
 from lp.codehosting.bzrutils import make_error_utility
 
 
-class TestLaunchpadServe(TestCaseWithTransport):
+class TestLaunchpadServe(TestCaseWithSubprocess):
     """Tests for the lp-serve plugin.
 
     Most of the helper methods here are copied from bzrlib.tests and
@@ -38,59 +27,6 @@
         """Assert that a server process finished cleanly."""
         self.assertEqual((0, '', ''), tuple(result))
 
-    def get_python_path(self):
-        """Return the path to the Python interpreter."""
-        return '%s/bin/py' % config.root
-
-    def start_bzr_subprocess(self, process_args, env_changes=None,
-                             working_dir=None):
-        """Start bzr in a subprocess for testing.
-
-        Copied and modified from `bzrlib.tests.TestCase.start_bzr_subprocess`.
-        This version removes some of the skipping stuff, some of the
-        irrelevant comments (e.g. about win32) and uses Launchpad's own
-        mechanisms for getting the path to 'bzr'.
-
-        Comments starting with 'LAUNCHPAD' are comments about our
-        modifications.
-        """
-        if env_changes is None:
-            env_changes = {}
-        env_changes['BZR_PLUGIN_PATH'] = get_BZR_PLUGIN_PATH_for_subprocess()
-        old_env = {}
-
-        def cleanup_environment():
-            for env_var, value in env_changes.iteritems():
-                old_env[env_var] = osutils.set_or_unset_env(env_var, value)
-
-        def restore_environment():
-            for env_var, value in old_env.iteritems():
-                osutils.set_or_unset_env(env_var, value)
-
-        cwd = None
-        if working_dir is not None:
-            cwd = osutils.getcwd()
-            os.chdir(working_dir)
-
-        # LAUNCHPAD: Because of buildout, we need to get a custom Python
-        # binary, not sys.executable.
-        python_path = self.get_python_path()
-        # LAUNCHPAD: We can't use self.get_bzr_path(), since it'll find
-        # lib/bzrlib, rather than the path to sourcecode/bzr/bzr.
-        bzr_path = get_bzr_path()
-        try:
-            cleanup_environment()
-            command = [python_path, bzr_path]
-            command.extend(process_args)
-            process = self._popen(
-                command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
-        finally:
-            restore_environment()
-            if cwd is not None:
-                os.chdir(cwd)
-
-        return process
-
     def finish_lpserve_subprocess(self, process):
         """Shut down the server process.
 
@@ -169,4 +105,10 @@
 
 
 def test_suite():
-    return unittest.TestLoader().loadTestsFromName(__name__)
+    from bzrlib import tests
+    from bzrlib.plugins import lpserve
+
+    loader = tests.TestLoader()
+    suite = loader.loadTestsFromName(__name__)
+    suite = lpserve.load_tests(suite, lpserve, loader)
+    return suite