Merge lp:~cmiller/desktopcouch/replication-daemon into lp:desktopcouch
Status: | Merged |
---|---|
Approved by: | Eric Casteleijn |
Approved revision: | 51 |
Merged at revision: | not available |
Proposed branch: | lp:~cmiller/desktopcouch/replication-daemon |
Merge into: | lp:desktopcouch |
Diff against target: | None lines |
To merge this branch: | bzr merge lp:~cmiller/desktopcouch/replication-daemon |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Stuart Langridge (community) | Approve | ||
Eric Casteleijn (community) | Approve | ||
Review via email: mp+10740@code.launchpad.net |
This proposal supersedes a proposal from 2009-08-26.
Commit message
Add local-db replication daemon, and fix a few typos.
Since couchdb auth is not yet supported, stop the pairing tool from giving the user the impression that local replication is supported. Once couchdb is no longer a security risk and we can bind it to an external interface, replication will work for everyone.
Description of the change
Chad Miller (cmiller) wrote : Posted in a previous version of this proposal | # |
Stuart Langridge (sil) wrote : Posted in a previous version of this proposal | # |
We decided to not use relative imports, I think.
I reckon that this code should actually be integrated into startup before it's approved?
Stuart Langridge (sil) wrote : Posted in a previous version of this proposal | # |
Also, Xget_replicatio
Eric Casteleijn (thisfred) wrote : Posted in a previous version of this proposal | # |
Some long lines in bin/desktopcouc
I'd put the recordtype as a constant at the top of the file:
PAIRED_SERVER_TYPE = \
"http://
couchdb_io has 4 unused imports:
import urllib2
import json
import tempfile
import os
and relative imports which should be made absolute.
also long lines which I would solve:
base_url = "http://
PAIRED_
MY_ID_RECORD_TYPE = base_url + "server_identity"
(there are more. It pays to have an editor that shows them. If you're using emacs, I can send you a thingy that does that)
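The shared-prefix suggestion can be sketched as follows. The URL is the one that appears as RECTYPE_BASE in the couchdb_io.py diff in this proposal; `base_url` follows the reviewer's naming and `PAIRED_SERVER_RECORD_TYPE` follows the diff. This is a minimal sketch of the idea, not necessarily the layout that was merged.

```python
# Shared prefix for desktopcouch record types (value taken from the diff).
base_url = "http://www.freedesktop.org/wiki/Specifications/desktopcouch/"

# Each record type is the prefix plus a short suffix, so no line is long.
PAIRED_SERVER_RECORD_TYPE = base_url + "paired_server"
MY_ID_RECORD_TYPE = base_url + "server_identity"
```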
This is broken (calls a method on self outside a class, ViewDefinition is undefined)
def Xget_replicatio
map_js = """function(doc) { emit(doc.
view_name = "u1_replicators
design_document = "ubuntuone_
view = ViewDefinition(
view.sync(db)
results = self.execute_
return results
in dbus_io.py:
long lines, and a lot of *args, **kwargs magic (perhaps that cannot be helped)
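One way to avoid the *args, **kwargs pass-through is to spell out the subclass signature and leave out the one parameter the subclass fixes. The sketch below is an assumption-laden illustration: `Advertisement` here is a simplified stand-in that only stores its arguments (no Avahi calls), and `LOCATION_STYPE` is a hypothetical name for the service type string shown in the diff.

```python
LOCATION_STYPE = "_couchdb_location._tcp"  # value taken from the diff


class Advertisement(object):
    """Simplified stand-in: stores its arguments, talks to no Avahi."""

    def __init__(self, port, name, stype="", domain="", host="", text=None):
        self.port = int(port)
        self.name = name
        self.stype = stype
        self.domain = domain
        self.host = host
        # Create the dict per call rather than sharing a mutable default.
        self.text = {} if text is None else text


class LocationAdvertisement(Advertisement):
    """Fixes the service type; every other parameter stays visible."""

    def __init__(self, port, name, domain="", host="", text=None):
        super(LocationAdvertisement, self).__init__(
            port, name, stype=LOCATION_STYPE,
            domain=domain, host=host, text=text)


beacon = LocationAdvertisement("5984", "desktopcouch example-id")
```

With this shape, a caller that passes stype to the subclass gets a TypeError immediately, so there is no keyword to pop.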
Stuart Langridge (sil) wrote : | # |
Code looks good. I can't actually test the functionality because I only have one machine here.
We should use xdg.BaseDirectory to find the cache folder rather than hardcoding ~/.cache, but that's a small point.
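For illustration, the lookup that xdg.BaseDirectory performs can be approximated with the standard library alone. pyxdg exposes the value as `xdg.BaseDirectory.xdg_cache_home`; the helper names `cache_home` and `log_directory` below are hypothetical, and the "ubuntuone/log" suffix is taken from the hardcoded path in the diff. A sketch under those assumptions:

```python
import os


def cache_home(environ=None):
    """Return the XDG cache directory for the given environment mapping."""
    if environ is None:
        environ = os.environ
    # The XDG Base Directory spec falls back to ~/.cache when the
    # variable is unset or empty.
    return environ.get("XDG_CACHE_HOME") or os.path.expanduser("~/.cache")


def log_directory(environ=None):
    """Directory for the replication log, under the XDG cache home."""
    return os.path.join(cache_home(environ), "ubuntuone", "log")
```

Passing the environment as a parameter keeps the function easy to test; real code would more likely import the value from pyxdg directly.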
- 51. By Chad Miller
-
Use XDG module to find log directory.
Eric Casteleijn (thisfred) wrote : | # |
Looks good.
Most of the long lines are still in there, but we can fix them at a less pressing date.
- 52. By Chad Miller
-
Since we're defined in desktopcouch module now, it's okay to use direct
functions instead of DBus proxies of those functions.
- 53. By Chad Miller
-
Explain about local pairing being disabled because of couchdb auth
not working.
- 54. By Chad Miller
-
Fix method name typo.
- 55. By Chad Miller
-
Also stop listening end, before user gets far.
Eric Casteleijn (thisfred) wrote : | # |
I'm probably missing some gtk thing or an implicit import, but it looks like pick_or_listen is not defined in the following:
Eric Casteleijn (thisfred) wrote : | # |
also:
in desktopcouch-
import json is unused, and import logging.handler seems unnecessary.
- 56. By Chad Miller
-
Since a replicate() function is missing from python-couchdb, make it
internally.
Fix a few bugs with reversed logic, lack of a port number, and one
change-while-iterating bug. Remove an unused import.
Log more useful information.
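The "reversed logic" and "lack of a port number" fixes concern how the source and target URLs are built: the port belongs in the URL only when one is given. The revision 56 code itself is not shown on this page, so the following is only a sketch of the intended behaviour; `database_url` and `replication_record` are hypothetical helper names.

```python
def database_url(database, host=None, port=None):
    """Return a bare name for a local database, or a URL for a remote one."""
    if not host:
        return database
    if port:
        # A port was supplied, so it goes into the URL.
        return "http://%s:%d/%s" % (host, int(port), database)
    return "http://%s/%s" % (host, database)


def replication_record(source_database, target_database, target_host=None,
                       target_port=None, source_host=None, source_port=None):
    """Build the source/target pair that is handed to CouchDB."""
    return dict(
        source=database_url(source_database, source_host, source_port),
        target=database_url(target_database, target_host, target_port))
```

Factoring the URL construction into one helper means the source and target branches cannot drift apart.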
Stuart Langridge (sil) wrote : | # |
r51-56 look good to me!
Preview Diff
1 | === modified file 'bin/desktopcouch-pair' | |||
2 | --- bin/desktopcouch-pair 2009-08-24 19:06:43 +0000 | |||
3 | +++ bin/desktopcouch-pair 2009-08-26 12:30:49 +0000 | |||
4 | @@ -329,10 +329,11 @@ | |||
5 | 329 | 329 | ||
6 | 330 | hostname, domainname = dbus_io.get_local_hostname() | 330 | hostname, domainname = dbus_io.get_local_hostname() |
7 | 331 | username = getpass.getuser() | 331 | username = getpass.getuser() |
9 | 332 | self.advertisement = dbus_io.Advertisement(port=listen_port, | 332 | self.advertisement = dbus_io.PairAdvertisement(port=listen_port, |
10 | 333 | name="%s-%s-%d" % (hostname, username, listen_port), | 333 | name="%s-%s-%d" % (hostname, username, listen_port), |
11 | 334 | text=dict(version=str(discovery_tool_version), | 334 | text=dict(version=str(discovery_tool_version), |
12 | 335 | description=get_host_info())) | 335 | description=get_host_info())) |
13 | 336 | self.advertisement.publish() | ||
14 | 336 | return hostname, username, listen_port | 337 | return hostname, username, listen_port |
15 | 337 | 338 | ||
16 | 338 | def __init__(self, couchdb_instance): | 339 | def __init__(self, couchdb_instance): |
17 | @@ -725,4 +726,5 @@ | |||
18 | 725 | 726 | ||
19 | 726 | if __name__ == "__main__": | 727 | if __name__ == "__main__": |
20 | 727 | import sys | 728 | import sys |
21 | 729 | desktopcouch_port = dbus_io.get_desktopcouch_listening_port() | ||
22 | 728 | main(sys.argv) | 730 | main(sys.argv) |
23 | 729 | 731 | ||
24 | === added file 'bin/desktopcouch-paired-replication-manager' | |||
25 | --- bin/desktopcouch-paired-replication-manager 1970-01-01 00:00:00 +0000 | |||
26 | +++ bin/desktopcouch-paired-replication-manager 2009-08-26 13:59:05 +0000 | |||
27 | @@ -0,0 +1,122 @@ | |||
28 | 1 | #!/bin/sh | ||
29 | 2 | """"exec ${PYTHON:-python} -t $0 "$@";" """ | ||
30 | 3 | # vim: filetype=python expandtab smarttab | ||
31 | 4 | |||
32 | 5 | # Copyright 2009 Canonical Ltd. | ||
33 | 6 | # | ||
34 | 7 | # This file is part of desktopcouch. | ||
35 | 8 | # | ||
36 | 9 | # desktopcouch is free software: you can redistribute it and/or modify | ||
37 | 10 | # it under the terms of the GNU Lesser General Public License version 3 | ||
38 | 11 | # as published by the Free Software Foundation. | ||
39 | 12 | # | ||
40 | 13 | # desktopcouch is distributed in the hope that it will be useful, | ||
41 | 14 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
42 | 15 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
43 | 16 | # GNU Lesser General Public License for more details. | ||
44 | 17 | # | ||
45 | 18 | # You should have received a copy of the GNU Lesser General Public License | ||
46 | 19 | # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>. | ||
47 | 20 | # | ||
48 | 21 | # Authors: Chad Miller <chad.miller@canonical.com> | ||
49 | 22 | |||
50 | 23 | import os | ||
51 | 24 | import json | ||
52 | 25 | import threading | ||
53 | 26 | import logging | ||
54 | 27 | import logging.handlers | ||
55 | 28 | log = logging.getLogger("main") | ||
56 | 29 | |||
57 | 30 | from twisted.internet import gtk2reactor | ||
58 | 31 | gtk2reactor.install() | ||
59 | 32 | from twisted.internet import reactor, task | ||
60 | 33 | |||
61 | 34 | import desktopcouch | ||
62 | 35 | from desktopcouch.pair.couchdb_pairing import couchdb_io | ||
63 | 36 | from desktopcouch.pair.couchdb_pairing import dbus_io | ||
64 | 37 | |||
65 | 38 | already_replicating = False | ||
66 | 39 | |||
67 | 40 | class ReplicatorThread(threading.Thread): | ||
68 | 41 | def __init__(self): | ||
69 | 42 | log.debug("starting up replication thread") | ||
70 | 43 | super(ReplicatorThread, self).__init__() | ||
71 | 44 | |||
72 | 45 | def run(self): | ||
73 | 46 | global already_replicating # Fuzzy, as not really critical, | ||
74 | 47 | already_replicating = True # just trying to be polite. | ||
75 | 48 | try: | ||
76 | 49 | for uuid, addr, port in dbus_io.get_seen_paired_hosts(): | ||
77 | 50 | log.debug("host %s is seen; want to replicate to it", uuid) | ||
78 | 51 | for db_name in couchdb_io.get_database_names_replicatable(): | ||
79 | 52 | couchdb_io.replicate(db_name, db_name, | ||
80 | 53 | target_host=addr, target_port=port) | ||
81 | 54 | |||
82 | 55 | |||
83 | 56 | # TODO: get static addressed paired hosts and replicate to | ||
84 | 57 | # those too. | ||
85 | 58 | |||
86 | 59 | finally: | ||
87 | 60 | already_replicating = False | ||
88 | 61 | log.debug("finished replicating") | ||
89 | 62 | |||
90 | 63 | |||
91 | 64 | def replicate_local_databases_to_paired_hosts(): | ||
92 | 65 | if already_replicating: | ||
93 | 66 | log.warn("haven't finished replicating before next time to start.") | ||
94 | 67 | return False | ||
95 | 68 | |||
96 | 69 | r = ReplicatorThread() | ||
97 | 70 | r.start() | ||
98 | 71 | |||
99 | 72 | def main(args): | ||
100 | 73 | log_directory = os.path.expanduser("~/.cache/ubuntuone/log") | ||
101 | 74 | try: | ||
102 | 75 | os.makedirs(log_directory) | ||
103 | 76 | except: | ||
104 | 77 | pass | ||
105 | 78 | rotating_log = logging.handlers.TimedRotatingFileHandler( | ||
106 | 79 | os.path.join(log_directory, "desktop-couch-replication.log"), | ||
107 | 80 | "midnight", 1, 14) | ||
108 | 81 | rotating_log.setLevel(logging.DEBUG) | ||
109 | 82 | formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s') | ||
110 | 83 | rotating_log.setFormatter(formatter) | ||
111 | 84 | logging.getLogger('').addHandler(rotating_log) | ||
112 | 85 | logging.getLogger('').setLevel(logging.DEBUG) | ||
113 | 86 | |||
114 | 87 | try: | ||
115 | 88 | log.info("Starting.") | ||
116 | 89 | |||
117 | 90 | unique_identifiers = couchdb_io.get_my_host_unique_id() | ||
118 | 91 | if unique_identifiers is None: | ||
119 | 92 | log.warn("No unique hostaccount id is set, so pairing not enabled.") | ||
120 | 93 | sys.exit(2) | ||
121 | 94 | |||
122 | 95 | port = desktopcouch.find_port() | ||
123 | 96 | beacons = [dbus_io.LocationAdvertisement(port, "desktopcouch " + i) | ||
124 | 97 | for i in unique_identifiers] | ||
125 | 98 | for b in beacons: | ||
126 | 99 | b.publish() | ||
127 | 100 | |||
128 | 101 | dbus_io.discover_services(None, None, True) | ||
129 | 102 | |||
130 | 103 | try: | ||
131 | 104 | dbus_io.maintain_discovered_servers() | ||
132 | 105 | t = task.LoopingCall(replicate_local_databases_to_paired_hosts) | ||
133 | 106 | t.start(600) | ||
134 | 107 | |||
135 | 108 | # TODO: port may change, so every so often, check it and | ||
136 | 109 | # perhaps refresh the beacons. | ||
137 | 110 | |||
138 | 111 | reactor.run() | ||
139 | 112 | finally: | ||
140 | 113 | for b in beacons: | ||
141 | 114 | b.unpublish() | ||
142 | 115 | |||
143 | 116 | finally: | ||
144 | 117 | log.info("Quitting.") | ||
145 | 118 | |||
146 | 119 | if __name__ == "__main__": | ||
147 | 120 | import sys | ||
148 | 121 | |||
149 | 122 | main(sys.argv) | ||
150 | 0 | 123 | ||
151 | === modified file 'desktopcouch/pair/couchdb_pairing/couchdb_io.py' | |||
152 | --- desktopcouch/pair/couchdb_pairing/couchdb_io.py 2009-07-08 17:48:11 +0000 | |||
153 | +++ desktopcouch/pair/couchdb_pairing/couchdb_io.py 2009-08-26 13:59:05 +0000 | |||
154 | @@ -2,7 +2,7 @@ | |||
155 | 2 | # | 2 | # |
156 | 3 | # This file is part of desktopcouch. | 3 | # This file is part of desktopcouch. |
157 | 4 | # | 4 | # |
159 | 5 | # desktopcouch is free software: you can redistribute it and/or modify | 5 | # desktopcouch is free software: you can redistribute it and/or modify |
160 | 6 | # it under the terms of the GNU Lesser General Public License version 3 | 6 | # it under the terms of the GNU Lesser General Public License version 3 |
161 | 7 | # as published by the Free Software Foundation. | 7 | # as published by the Free Software Foundation. |
162 | 8 | # | 8 | # |
163 | @@ -13,27 +13,118 @@ | |||
164 | 13 | # | 13 | # |
165 | 14 | # You should have received a copy of the GNU Lesser General Public License | 14 | # You should have received a copy of the GNU Lesser General Public License |
166 | 15 | # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>. | 15 | # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>. |
167 | 16 | # | ||
168 | 17 | # Authors: Chad Miller <chad.miller@canonical.com> | ||
169 | 16 | """Communicate with CouchDB.""" | 18 | """Communicate with CouchDB.""" |
170 | 17 | 19 | ||
171 | 18 | import urllib2 | ||
172 | 19 | import json | ||
173 | 20 | import logging | 20 | import logging |
174 | 21 | 21 | ||
187 | 22 | def replicate_to(port, src_name, dst_host, dst_port, dst_name): | 22 | from desktopcouch import find_port as desktopcouch_find_port |
188 | 23 | """A simple easiest-possible replication instruction for couchdb. It's | 23 | from desktopcouch.records import server |
189 | 24 | almost certainly wrong for us.""" | 24 | |
190 | 25 | 25 | RECTYPE_BASE = "http://www.freedesktop.org/wiki/Specifications/desktopcouch/" | |
191 | 26 | dst_url = u"http://%(dst_host)s):%(dst_port)d/%(dst_name)s" % locals() | 26 | PAIRED_SERVER_RECORD_TYPE = RECTYPE_BASE + "paired_server" |
192 | 27 | doc_data = dict(source_database=src_name.encode("utf8"), | 27 | MY_ID_RECORD_TYPE = RECTYPE_BASE + "server_identity" |
193 | 28 | target_database=dst_url.encode("utf8")) | 28 | |
194 | 29 | 29 | def _get_db(name, create=True): | |
195 | 30 | document = json.dumps(doc_data) # json doesn't mention Unicode. | 30 | port = desktopcouch_find_port() # make sure d-c is running. |
196 | 31 | 31 | return server.CouchDatabase(name, create=create) | |
197 | 32 | req = urllib2.Request("http://localhost:%d/_replicate" % (port,), document) | 32 | |
198 | 33 | 33 | def get_database_names_replicatable(): | |
199 | 34 | """Find a list of local databases, minus dbs that we do not want to | ||
200 | 35 | replicate (explicitly or implicitly).""" | ||
201 | 36 | |||
202 | 37 | port = int(desktopcouch_find_port()) | ||
203 | 38 | couchdb_server = server.Server("http://localhost:%(port)d/" % locals()) | ||
204 | 39 | all = set([db_name for db_name in couchdb_server]) | ||
205 | 40 | |||
206 | 41 | excluded = set() | ||
207 | 42 | excluded.add("management") | ||
208 | 43 | excluded_msets = _get_management_data(PAIRED_SERVER_RECORD_TYPE, | ||
209 | 44 | "excluded_names") | ||
210 | 45 | for excluded_mset in excluded_msets: | ||
211 | 46 | excluded.update(excluded_mset) | ||
212 | 47 | |||
213 | 48 | return all - excluded | ||
214 | 49 | |||
215 | 50 | def get_my_host_unique_id(): | ||
216 | 51 | """Returns a list of ids we call ourselves. We complain in the log if it's | ||
217 | 52 | more than one, but it's really no error. If there are zero (id est, we've | ||
218 | 53 | never paired with anyone), then returns None.""" | ||
219 | 54 | |||
220 | 55 | db = _get_db("management") | ||
221 | 56 | ids = _get_management_data(MY_ID_RECORD_TYPE, "self_identity") | ||
222 | 57 | ids = list(set(ids)) # uniqify | ||
223 | 58 | if len(ids) > 1: | ||
224 | 59 | logging.error("DANGER! We have more than one record claiming to be this host's unique identifier. Which is right? We will try to use them all, but this smells really funny.") | ||
225 | 60 | return ids | ||
226 | 61 | if len(ids) == 1: | ||
227 | 62 | return ids | ||
228 | 63 | return None | ||
229 | 64 | |||
230 | 65 | def get_local_paired_uuids(): | ||
231 | 66 | results = _get_management_data(PAIRED_SERVER_RECORD_TYPE, "pairing_identifier") | ||
232 | 67 | return results | ||
233 | 68 | |||
234 | 69 | def _get_management_data(record_type, key): | ||
235 | 70 | db = _get_db("management") | ||
236 | 71 | results = db.get_records(create_view=True) | ||
237 | 72 | values = list() | ||
238 | 73 | for record in results[record_type]: | ||
239 | 74 | if key in record.value: # EAFP, rather than LBYL? Nones default? | ||
240 | 75 | value = record.value[key] | ||
241 | 76 | if value is not None: | ||
242 | 77 | values.append(value) | ||
243 | 78 | else: | ||
244 | 79 | logging.debug("skipping record empty %s", key) | ||
245 | 80 | else: | ||
246 | 81 | logging.debug("skipping record with no %s", key) | ||
247 | 82 | logging.debug("found %d %s records", len(values), key) | ||
248 | 83 | return values | ||
249 | 84 | |||
250 | 85 | def create_remote_database(dst_host, dst_port, dst_name): | ||
251 | 86 | dst_url = u"http://%(dst_host)s:%(dst_port)d/" % locals() | ||
252 | 87 | return server.CouchDatabase(dst_name, dst_url, create=True) | ||
253 | 88 | |||
254 | 89 | def replicate(source_database, target_database, target_host=None, | ||
255 | 90 | target_port=None, source_host=None, source_port=None): | ||
256 | 91 | """This replication is instant and blocking, and does not persist. """ | ||
257 | 92 | |||
258 | 93 | data = {} | ||
259 | 94 | |||
260 | 95 | if source_host: | ||
261 | 96 | if source_port: | ||
262 | 97 | source = "http://%(source_host)s/%(source_database)s" % locals() | ||
263 | 98 | else: | ||
264 | 99 | source = "http://%(source_host)s:%(source_port)d/%(source_database)s" % locals() | ||
265 | 100 | else: | ||
266 | 101 | source = source_database | ||
267 | 102 | |||
268 | 103 | if target_host: | ||
269 | 104 | if target_port: | ||
270 | 105 | target = "http://%(target_host)s/%(target_database)s" % locals() | ||
271 | 106 | else: | ||
272 | 107 | target = "http://%(target_host)s:%(target_port)d/%(target_database)s" % locals() | ||
273 | 108 | else: | ||
274 | 109 | target = target_database | ||
275 | 110 | |||
276 | 111 | record = dict(source=source, target=target) | ||
277 | 34 | try: | 112 | try: |
280 | 35 | conn = urllib2.urlopen(req) | 113 | if target_host: |
281 | 36 | logging.info("couchdb request resulted in %r", conn.read()) | 114 | # Remote databases must exist before replicating to them. |
282 | 115 | create_remote_database(target_host, target_port, target_database) | ||
283 | 116 | |||
284 | 117 | # TODO: Get admin username and password from keyring. Populate URL. | ||
285 | 118 | |||
286 | 119 | port = int(desktopcouch_find_port()) | ||
287 | 120 | url = "http://localhost:%d/" % (port,) | ||
288 | 121 | |||
289 | 122 | import couchdb | ||
290 | 123 | server = couchdb.client.Server(url) | ||
291 | 124 | db = server["_replicate"] | ||
292 | 125 | db.create(record) | ||
293 | 126 | |||
294 | 127 | logging.info("successfully replicated %r", record) | ||
295 | 37 | except: | 128 | except: |
296 | 38 | logging.exception("can't talk to couchdb.") | 129 | logging.exception("can't talk to couchdb.") |
297 | 39 | raise | 130 | raise |
298 | 40 | 131 | ||
299 | === modified file 'desktopcouch/pair/couchdb_pairing/dbus_io.py' | |||
300 | --- desktopcouch/pair/couchdb_pairing/dbus_io.py 2009-07-20 14:34:08 +0000 | |||
301 | +++ desktopcouch/pair/couchdb_pairing/dbus_io.py 2009-08-26 13:59:05 +0000 | |||
302 | @@ -2,7 +2,7 @@ | |||
303 | 2 | # | 2 | # |
304 | 3 | # This file is part of desktopcouch. | 3 | # This file is part of desktopcouch. |
305 | 4 | # | 4 | # |
307 | 5 | # desktopcouch is free software: you can redistribute it and/or modify | 5 | # desktopcouch is free software: you can redistribute it and/or modify |
308 | 6 | # it under the terms of the GNU Lesser General Public License version 3 | 6 | # it under the terms of the GNU Lesser General Public License version 3 |
309 | 7 | # as published by the Free Software Foundation. | 7 | # as published by the Free Software Foundation. |
310 | 8 | # | 8 | # |
311 | @@ -13,6 +13,8 @@ | |||
312 | 13 | # | 13 | # |
313 | 14 | # You should have received a copy of the GNU Lesser General Public License | 14 | # You should have received a copy of the GNU Lesser General Public License |
314 | 15 | # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>. | 15 | # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>. |
315 | 16 | # | ||
316 | 17 | # Authors: Chad Miller <chad.miller@canonical.com> | ||
317 | 16 | """Communicate with DBUS and also the APIs it proxies, like Zeroconf.""" | 18 | """Communicate with DBUS and also the APIs it proxies, like Zeroconf.""" |
318 | 17 | 19 | ||
319 | 18 | import logging | 20 | import logging |
320 | @@ -22,8 +24,11 @@ | |||
321 | 22 | import avahi | 24 | import avahi |
322 | 23 | DBusGMainLoop(set_as_default=True) | 25 | DBusGMainLoop(set_as_default=True) |
323 | 24 | 26 | ||
325 | 25 | discovery_service_type = "_couchdb_pairing_invitations._tcp" | 27 | from desktopcouch.pair.couchdb_pairing import couchdb_io |
326 | 26 | 28 | ||
327 | 29 | invitations_discovery_service_type = "_couchdb_pairing_invitations._tcp" | ||
328 | 30 | location_discovery_service_type = "_couchdb_location._tcp" | ||
329 | 31 | desktopcouch_dbus_interface = "org.desktopcouch.CouchDB" | ||
330 | 27 | 32 | ||
331 | 28 | def get_local_hostname(): | 33 | def get_local_hostname(): |
332 | 29 | """Get the name of this host, as Unicode host and domain parts.""" | 34 | """Get the name of this host, as Unicode host and domain parts.""" |
333 | @@ -43,36 +48,34 @@ | |||
334 | 43 | 48 | ||
335 | 44 | return hostname | 49 | return hostname |
336 | 45 | 50 | ||
338 | 46 | def get_dbus_bus_server(): | 51 | def get_desktopcouch_listening_port(): |
339 | 52 | bus, server = get_dbus_bus_server(desktopcouch_dbus_interface) | ||
340 | 53 | return server.desktopCouch.getPort() | ||
341 | 54 | |||
342 | 55 | def get_dbus_bus_server(interface="root"): | ||
343 | 47 | """Common sequence of steps to get a Bus and Server object from DBUS.""" | 56 | """Common sequence of steps to get a Bus and Server object from DBUS.""" |
344 | 48 | bus = dbus.SystemBus() | 57 | bus = dbus.SystemBus() |
345 | 49 | root_name = bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER) | 58 | root_name = bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER) |
346 | 50 | server = dbus.Interface(root_name, avahi.DBUS_INTERFACE_SERVER) | 59 | server = dbus.Interface(root_name, avahi.DBUS_INTERFACE_SERVER) |
347 | 51 | return bus, server | 60 | return bus, server |
348 | 52 | 61 | ||
349 | 53 | |||
350 | 54 | class Advertisement(object): | 62 | class Advertisement(object): |
351 | 55 | """Represents an advertised service that exists on this host.""" | 63 | """Represents an advertised service that exists on this host.""" |
355 | 56 | 64 | def __init__(self, port, name, stype="", domain="", host="", text={}): | |
356 | 57 | def __init__(self, port, name="rlx!", stype=discovery_service_type, | 65 | super(Advertisement, self).__init__() |
354 | 58 | domain="", host="", text="(unknown)"): | ||
357 | 59 | 66 | ||
358 | 60 | self.logging = logging.getLogger(self.__class__.__name__) | 67 | self.logging = logging.getLogger(self.__class__.__name__) |
359 | 61 | |||
360 | 62 | super(Advertisement, self).__init__() | ||
361 | 63 | self.name = name | 68 | self.name = name |
362 | 64 | self.stype = stype | 69 | self.stype = stype |
363 | 65 | self.domain = domain | 70 | self.domain = domain |
364 | 66 | self.host = host | 71 | self.host = host |
366 | 67 | self.port = port | 72 | self.port = int(port) |
367 | 68 | if hasattr(text, "keys"): | 73 | if hasattr(text, "keys"): |
368 | 69 | self.text = avahi.dict_to_txt_array(text) | 74 | self.text = avahi.dict_to_txt_array(text) |
369 | 70 | else: | 75 | else: |
370 | 71 | self.text = text | 76 | self.text = text |
371 | 72 | 77 | ||
372 | 73 | self.group = None | 78 | self.group = None |
373 | 74 | |||
374 | 75 | self.publish() | ||
375 | 76 | 79 | ||
376 | 77 | def publish(self): | 80 | def publish(self): |
377 | 78 | """Start the advertisement.""" | 81 | """Start the advertisement.""" |
378 | @@ -87,20 +90,128 @@ | |||
379 | 87 | 90 | ||
380 | 88 | g.Commit() | 91 | g.Commit() |
381 | 89 | self.logging.info("starting advertising %s on port %d", | 92 | self.logging.info("starting advertising %s on port %d", |
383 | 90 | discovery_service_type, self.port) | 93 | self.stype, self.port) |
384 | 91 | self.group = g | 94 | self.group = g |
385 | 92 | 95 | ||
386 | 93 | def unpublish(self): | 96 | def unpublish(self): |
387 | 94 | """End the advertisement.""" | 97 | """End the advertisement.""" |
388 | 95 | self.group.Reset() | 98 | self.group.Reset() |
389 | 96 | self.logging.info("ending advertising %s on port %d", | 99 | self.logging.info("ending advertising %s on port %d", |
391 | 97 | discovery_service_type, self.port) | 100 | self.stype, self.port) |
392 | 98 | self.group = None | 101 | self.group = None |
393 | 99 | 102 | ||
394 | 100 | def die(self): | 103 | def die(self): |
395 | 101 | """Quit.""" | 104 | """Quit.""" |
396 | 102 | self.unpublish() | 105 | self.unpublish() |
397 | 103 | 106 | ||
398 | 107 | class LocationAdvertisement(Advertisement): | ||
399 | 108 | """An advertised couchdb location. See Advertisement class.""" | ||
400 | 109 | def __init__(self, *args, **kwargs): | ||
401 | 110 | if "stype" in kwargs: | ||
402 | 111 | kwargs.pop(stype) | ||
403 | 112 | super(LocationAdvertisement, self).__init__( | ||
404 | 113 | stype=location_discovery_service_type, *args, **kwargs) | ||
405 | 114 | |||
406 | 115 | class PairAdvertisement(Advertisement): | ||
407 | 116 | """An advertised couchdb pairing opportunity. See Advertisement class.""" | ||
408 | 117 | def __init__(self, *args, **kwargs): | ||
409 | 118 | if "stype" in kwargs: | ||
410 | 119 | kwargs.pop(stype) | ||
411 | 120 | super(PairAdvertisement, self).__init__( | ||
412 | 121 | stype=invitations_discovery_service_type, *args, **kwargs) | ||
413 | 122 | |||
414 | 123 | def avahitext_to_dict(avahitext): | ||
415 | 124 | text = {} | ||
416 | 125 | for l in avahitext: | ||
417 | 126 | try: | ||
418 | 127 | k, v = "".join(chr(i) for i in l).split("=", 1) | ||
419 | 128 | text[k] = v | ||
420 | 129 | except ValueError, e: | ||
421 | 130 | logging.error("k/v field could not be decoded. %s", e) | ||
422 | 131 | return text | ||
423 | 132 | |||
424 | 133 | |||
425 | 134 | nearby_desktop_couch_instances = dict() # k=uuid, v=(addr, port) | ||
426 | 135 | |||
427 | 136 | def cb_found_desktopcouch_server(uuid, host_address, port): | ||
428 | 137 | nearby_desktop_couch_instances[uuid] = (unicode(host_address), int(port)) | ||
429 | 138 | |||
430 | 139 | def cb_lost_desktopcouch_server(uuid): | ||
431 | 140 | try: | ||
432 | 141 | del nearby_desktop_couch_instances[uuid] | ||
433 | 142 | except KeyError: | ||
434 | 143 | pass | ||
435 | 144 | |||
436 | 145 | def get_seen_paired_hosts(): | ||
437 | 146 | paired_uuids = couchdb_io.get_local_paired_uuids() | ||
438 | 147 | return ( | ||
439 | 148 | (uuid, addr, port) | ||
440 | 149 | for uuid, (addr, port) | ||
441 | 150 | in nearby_desktop_couch_instances.iteritems() | ||
442 | 151 | if uuid in paired_uuids) | ||
443 | 152 | |||
444 | 153 | def maintain_discovered_servers(add_cb=cb_found_desktopcouch_server, | ||
445 | 154 | del_cb=cb_lost_desktopcouch_server): | ||
446 | 155 | |||
447 | 156 | def remove_item_handler(interface, protocol, name, stype, domain, flags): | ||
448 | 157 | """A service disappeared.""" | ||
449 | 158 | |||
450 | 159 | def handle_error(*args): | ||
451 | 160 | """An error in resolving a new service.""" | ||
452 | 161 | logging.error("zeroconf ItemNew error for services, %s", args) | ||
453 | 162 | |||
454 | 163 | def handle_resolved(*args): | ||
455 | 164 | """Successfully resolved a new service, which we decode and send | ||
456 | 165 | back to our calling environment with the callback function.""" | ||
457 | 166 | |||
458 | 167 | name, host, port = args[2], args[5], args[8] | ||
459 | 168 | if name.startswith("desktopcouch "): | ||
460 | 169 | del_cb(name[13:], host, port) | ||
461 | 170 | else: | ||
462 | 171 | logging.error("no UUID in zeroconf message, %r", args) | ||
463 | 172 | |||
464 | 173 | del_cb(uuid) | ||
465 | 174 | |||
466 | 175 | server.ResolveService(interface, protocol, name, stype, | ||
467 | 176 | domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), | ||
468 | 177 | reply_handler=handle_resolved, error_handler=handle_error) | ||
469 | 178 | |||
470 | 179 | def new_item_handler(interface, protocol, name, stype, domain, flags): | ||
471 | 180 | """A service appeared.""" | ||
472 | 181 | |||
473 | 182 | def handle_error(*args): | ||
474 | 183 | """An error in resolving a new service.""" | ||
475 | 184 | logging.error("zeroconf ItemNew error for services, %s", args) | ||
476 | 185 | |||
477 | 186 | def handle_resolved(*args): | ||
478 | 187 | """Successfully resolved a new service, which we decode and send | ||
479 | 188 | back to our calling environment with the callback function.""" | ||
480 | 189 | |||
481 | 190 | name, host, port = args[2], args[5], args[8] | ||
482 | 191 | # FIXME strip off "desktopcouch " | ||
483 | 192 | if name.startswith("desktopcouch "): | ||
484 | 193 | add_cb(name[13:], host, port) | ||
485 | 194 | else: | ||
486 | 195 | logging.error("no UUID in zeroconf message, %r", name) | ||
487 | 196 | return True | ||
488 | 197 | |||
489 | 198 | server.ResolveService(interface, protocol, name, stype, | ||
490 | 199 | domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), | ||
491 | 200 | reply_handler=handle_resolved, error_handler=handle_error) | ||
492 | 201 | |||
493 | 202 | bus, server = get_dbus_bus_server() | ||
494 | 203 | domain_name = get_local_hostname()[1] | ||
495 | 204 | browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, | ||
496 | 205 | location_discovery_service_type, domain_name, dbus.UInt32(0)) | ||
497 | 206 | browser_name = bus.get_object(avahi.DBUS_NAME, browser) | ||
498 | 207 | |||
499 | 208 | sbrowser = dbus.Interface(browser_name, | ||
500 | 209 | avahi.DBUS_INTERFACE_SERVICE_BROWSER) | ||
501 | 210 | sbrowser.connect_to_signal("ItemNew", new_item_handler) | ||
502 | 211 | sbrowser.connect_to_signal("ItemRemove", remove_item_handler) | ||
503 | 212 | sbrowser.connect_to_signal("Failure", | ||
504 | 213 | lambda *a: logging.error("avahi error %r", a)) | ||
505 | 214 | |||
506 | 104 | 215 | ||
507 | 105 | def discover_services(add_commport_name_cb, del_commport_name_cb, | 216 | def discover_services(add_commport_name_cb, del_commport_name_cb, |
508 | 106 | show_local=False): | 217 | show_local=False): |
509 | @@ -125,13 +236,10 @@ | |||
510 | 125 | def handle_resolved(*args): | 236 | def handle_resolved(*args): |
511 | 126 | """Successfully resolved a new service, which we decode and send | 237 | """Successfully resolved a new service, which we decode and send |
512 | 127 | back to our calling environment with the callback function.""" | 238 | back to our calling environment with the callback function.""" |
520 | 128 | text = {} | 239 | text = avahitext_to_dict(args[9]) |
521 | 129 | for l in args[9]: | 240 | name, host, port = args[2], args[5], args[8] |
522 | 130 | k, v = "".join(chr(i) for i in l).split("=", 1) | 241 | add_commport_name_cb(name, text.get("description", "?"), |
523 | 131 | text[k] = v | 242 | host, port, text.get("version", None)) |
517 | 132 | |||
518 | 133 | add_commport_name_cb(args[2], text.get("description", "?"), | ||
519 | 134 | args[5], args[8], text.get("version", None)) | ||
524 | 135 | 243 | ||
525 | 136 | if not show_local and flags & avahi.LOOKUP_RESULT_LOCAL: | 244 | if not show_local and flags & avahi.LOOKUP_RESULT_LOCAL: |
526 | 137 | return | 245 | return |
527 | @@ -142,14 +250,15 @@ | |||
528 | 142 | 250 | ||
529 | 143 | 251 | ||
530 | 144 | bus, server = get_dbus_bus_server() | 252 | bus, server = get_dbus_bus_server() |
531 | 145 | |||
532 | 146 | domain_name = get_local_hostname()[1] | 253 | domain_name = get_local_hostname()[1] |
533 | 147 | 254 | ||
534 | 148 | browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, | 255 | browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, |
536 | 149 | discovery_service_type, domain_name, dbus.UInt32(0)) | 256 | invitations_discovery_service_type, domain_name, dbus.UInt32(0)) |
537 | 150 | browser_name = bus.get_object(avahi.DBUS_NAME, browser) | 257 | browser_name = bus.get_object(avahi.DBUS_NAME, browser) |
538 | 151 | 258 | ||
539 | 152 | sbrowser = dbus.Interface(browser_name, | 259 | sbrowser = dbus.Interface(browser_name, |
540 | 153 | avahi.DBUS_INTERFACE_SERVICE_BROWSER) | 260 | avahi.DBUS_INTERFACE_SERVICE_BROWSER) |
541 | 154 | sbrowser.connect_to_signal("ItemNew", new_item_handler) | 261 | sbrowser.connect_to_signal("ItemNew", new_item_handler) |
542 | 155 | sbrowser.connect_to_signal("ItemRemove", remove_item_handler) | 262 | sbrowser.connect_to_signal("ItemRemove", remove_item_handler) |
543 | 263 | sbrowser.connect_to_signal("Failure", | ||
544 | 264 | lambda *a: logging.error("avahi error %r", a)) |
Buggy first try at replication.