Merge lp:~cmiller/desktopcouch/replication-daemon into lp:desktopcouch

Proposed by Chad Miller
Status: Superseded
Proposed branch: lp:~cmiller/desktopcouch/replication-daemon
Merge into: lp:desktopcouch
Diff against target: None lines
To merge this branch: bzr merge lp:~cmiller/desktopcouch/replication-daemon
Reviewer: Eric Casteleijn (community)
Review status: Needs Fixing
Review via email: mp+10732@code.launchpad.net

This proposal has been superseded by a proposal from 2009-08-26.

Commit message

Add a daemon that handles replication in our dynamic style of internettin', where
hosts may have the same name, change addresses often, and be offline for
periods. If we've paired with another host, then we know our unique ID
and the ID of the remote end. We advertise our ID via zeroconf, and look
for the remote end to appear on our local network. When it does, we send
all our non-excluded desktopcouch databases to it.

This daemon should probably be merged into the desktopcouch-start program
somewhere.
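The selection step described in the commit message can be sketched in plain Python. This is a hypothetical helper, not code from the branch. It assumes discovered peers are kept in a dict mapping each advertised UUID to an `(address, port)` pair (as `nearby_desktop_couch_instances` does in the diff), that the paired UUIDs and local database names are already known, and that `management` is always excluded, as `get_database_names_replicatable` does.

```python
def plan_replication(nearby, paired_uuids, local_databases, excluded=()):
    """Yield (database, address, port) for each paired host currently seen.

    nearby          -- dict mapping an advertised UUID to (address, port)
    paired_uuids    -- UUIDs of the hosts we have paired with
    local_databases -- names of the databases on this host
    excluded        -- database names that must never be sent
    """
    # The management database holds pairing records, so it always stays home.
    skip = {"management"} | set(excluded)
    databases = sorted(set(local_databases) - skip)
    paired = set(paired_uuids)
    # sorted() gives a deterministic host order.
    for uuid, (address, port) in sorted(nearby.items()):
        if uuid not in paired:
            continue  # Visible on the network, but not one of our peers.
        for database in databases:
            yield database, address, port


nearby = {
    "uuid-a": ("192.168.1.10", 5984),
    "uuid-b": ("192.168.1.11", 5984),
}
jobs = list(plan_replication(nearby, ["uuid-a"],
                             ["contacts", "notes", "management"]))
```

Keying the lookup on the UUID rather than the hostname is what lets this tolerate hosts that share a name or change addresses: only the advertised identifier has to match a pairing record.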

Chad Miller (cmiller) wrote :

Buggy first try at replication.

Stuart Langridge (sil) wrote :

We decided to not use relative imports, I think.

I reckon that this code should actually be integrated into startup before it's approved?

Stuart Langridge (sil) wrote :

Also, Xget_replication_list ?

49. By Chad Miller

Added TODO notes and set the replication period to a sane rate.
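The manager drives replication from a periodic timer (a Twisted `LoopingCall` started with a 6-second interval in the diff) and uses a global `already_replicating` flag to skip a tick while a previous run is still going; the code comment calls that check "fuzzy". A sketch of the same guard built on a non-blocking `threading.Lock`, which removes the check-then-set race. `NonOverlappingRunner` is a hypothetical name, and Twisted is left out so the example runs with the standard library alone; wiring it in would presumably look like `task.LoopingCall(runner.tick).start(6)`.

```python
import threading


class NonOverlappingRunner:
    """Start `job` in a background thread unless the previous run is busy."""

    def __init__(self, job):
        self._job = job
        self._lock = threading.Lock()
        self.last_thread = None

    def tick(self):
        """Call on every timer tick; returns False when the tick is skipped."""
        # A non-blocking acquire fails at once if a run still holds the lock.
        if not self._lock.acquire(blocking=False):
            return False
        self.last_thread = threading.Thread(target=self._run)
        self.last_thread.start()
        return True

    def _run(self):
        try:
            self._job()
        finally:
            # Release even if the job raises, so later ticks can start again.
            self._lock.release()


# Usage: the first run blocks until `release` is set, so a second tick
# arriving in the meantime is skipped instead of starting a parallel run.
release = threading.Event()
runner = NonOverlappingRunner(release.wait)
first = runner.tick()
second = runner.tick()
release.set()
runner.last_thread.join()
```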

Eric Casteleijn (thisfred) wrote :

Some long lines in bin/desktopcouch-pair

I'd put the record type in a constant at the top of the file:

PAIRED_SERVER_TYPE = \
    "http://www.freedesktop.org/wiki/Specifications/desktopcouch/paired_server"

couchdb_io has 4 unused imports:

import urllib2
import json
import tempfile
import os

and relative imports, which should be made absolute.

Also, long lines, which I would fix like this:

base_url = "http://www.freedesktop.org/wiki/Specifications/desktopcouch/"
PAIRED_SERVER_RECORD_TYPE = base_url + "paired_server"
MY_ID_RECORD_TYPE = base_url + "server_identity"

(there are more. It pays to have an editor that shows them. If you're using emacs, I can send you a thingy that does that)

This is broken (calls a method on self outside a class, ViewDefinition is undefined)

def Xget_replication_list(db):
    map_js = """function(doc) { emit(doc.managed_by, doc) }"""
    view_name = "u1_replicators_by_manager"
    design_document = "ubuntuone_replication"
    view = ViewDefinition(design_document, view_name, map_js, None)
    view.sync(db)
    results = self.execute_view(view_name, design_document)
    return results

in dbus_io.py:

long lines, and a lot of *args, **kwargs magic (perhaps that cannot be helped)

review: Needs Fixing
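One way to drop the `*args, **kwargs` pass-through in `LocationAdvertisement` and `PairAdvertisement` is to give each subclass a fixed service type as a class attribute that the base constructor reads. This also sidesteps a bug in the diff, where `kwargs.pop(stype)` refers to an undefined name instead of the string `"stype"`. In the sketch below the Avahi/D-Bus publishing is stubbed out, and the `STYPE` attribute is a hypothetical name, not something in the branch.

```python
class Advertisement:
    """Stand-in for dbus_io.Advertisement; publishing via Avahi is omitted."""

    STYPE = ""  # Subclasses override this with their zeroconf service type.

    def __init__(self, port, name, domain="", host="", text=None):
        self.name = name
        self.stype = self.STYPE
        self.domain = domain
        self.host = host
        self.port = int(port)
        # Avoid a shared mutable default: each instance gets its own dict.
        self.text = dict(text or {})


class LocationAdvertisement(Advertisement):
    """An advertised couchdb location."""
    STYPE = "_couchdb_location._tcp"


class PairAdvertisement(Advertisement):
    """An advertised couchdb pairing opportunity."""
    STYPE = "_couchdb_pairing_invitations._tcp"


beacon = LocationAdvertisement("5984", "desktopcouch some-uuid")
```

Because the subclasses no longer accept `stype` at all, a caller cannot accidentally advertise a location beacon under the pairing-invitation service type.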
50. By Chad Miller

Fixed most lint problems. Removed unused function.

51. By Chad Miller

Use XDG module to find log directory.
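The XDG Base Directory specification puts per-user cache files under `$XDG_CACHE_HOME`, falling back to `~/.cache` when the variable is unset or empty, and says relative paths there should be ignored. The revision uses an XDG module for this (pyxdg exposes the value as `xdg.BaseDirectory.xdg_cache_home`). Below is a standard-library sketch of the same lookup. The `ubuntuone/log` subdirectory matches the path hard-coded in the diff, and `replication_log_directory` is a hypothetical name.

```python
import os


def replication_log_directory(create=True):
    """Return the directory for replication logs, following XDG rules."""
    cache_home = os.environ.get("XDG_CACHE_HOME", "")
    # Fall back to ~/.cache when the variable is unset or empty; relative
    # paths are treated as invalid and ignored as well.
    if not os.path.isabs(cache_home):
        cache_home = os.path.join(os.path.expanduser("~"), ".cache")
    log_directory = os.path.join(cache_home, "ubuntuone", "log")
    if create:
        # exist_ok replaces a bare try/except around os.makedirs().
        os.makedirs(log_directory, exist_ok=True)
    return log_directory
```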

52. By Chad Miller

Since we're defined in desktopcouch module now, it's okay to use direct
functions instead of DBus proxies of those functions.

53. By Chad Miller

Explain about local pairing being disabled because of couchdb auth
not working.

54. By Chad Miller

Fix method name typo.

55. By Chad Miller

Also stop listening end, before user gets far.

56. By Chad Miller

Since python-couchdb lacks a replicate() function, implement one
internally.

Fix a few bugs with reversed logic, lack of a port number, and one
change-while-iterating bug.
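CouchDB starts a one-off replication when a JSON object with `source` and `target` fields is POSTed to `/_replicate`. Each field is either a local database name or a full database URL. A Python 3 standard-library sketch of building that request, including the port only when one is given, which is the branch the reversed-logic and missing-port bugs concern. The function names are hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.parse
import urllib.request


def database_url(database, host=None, port=None):
    """Return a bare local database name, or an http URL for a remote one."""
    if not host:
        return database
    # Quote the name so characters such as "/" survive inside the URL path.
    quoted = urllib.parse.quote(database, safe="")
    if port:
        return "http://%s:%d/%s" % (host, int(port), quoted)
    return "http://%s/%s" % (host, quoted)


def replication_request(local_port, source, target, target_host=None,
                        target_port=None, source_host=None, source_port=None):
    """Build (but do not send) a POST to the local CouchDB's /_replicate."""
    body = {
        "source": database_url(source, source_host, source_port),
        "target": database_url(target, target_host, target_port),
    }
    return urllib.request.Request(
        "http://localhost:%d/_replicate" % local_port,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = replication_request(5984, "notes", "notes",
                          target_host="192.168.1.10", target_port=5984)
# Sending it would be: urllib.request.urlopen(req)
```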

Remove an unused import.

Log more useful information.
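A change-while-iterating problem can arise when discovery callbacks add and remove entries in `nearby_desktop_couch_instances` while the replication thread walks it through a lazy generator. The sketch below shows one common remedy, not necessarily the fix this revision made: guard the dict with a lock and hand the replication thread a list snapshot instead of a live view. `PeerTable` is a hypothetical name.

```python
import threading


class PeerTable:
    """Thread-safe map of peer UUID -> (address, port)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._peers = {}

    def found(self, uuid, address, port):
        with self._lock:
            self._peers[uuid] = (str(address), int(port))

    def lost(self, uuid):
        with self._lock:
            self._peers.pop(uuid, None)  # Ignore peers we never saw.

    def seen_paired_hosts(self, paired_uuids):
        """Return a list (not a generator) so callers iterate over a copy."""
        paired = set(paired_uuids)
        with self._lock:
            return [(uuid, address, port)
                    for uuid, (address, port) in self._peers.items()
                    if uuid in paired]


peers = PeerTable()
peers.found("uuid-a", "192.168.1.10", "5984")
peers.found("uuid-b", "192.168.1.11", 5984)
for uuid, address, port in peers.seen_paired_hosts(["uuid-a", "uuid-b"]):
    peers.lost(uuid)  # Safe: the loop walks a snapshot, not the dict.
```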

Unmerged revisions

Preview Diff

=== modified file 'bin/desktopcouch-pair'
--- bin/desktopcouch-pair 2009-08-24 19:06:43 +0000
+++ bin/desktopcouch-pair 2009-08-26 12:30:49 +0000
@@ -329,10 +329,11 @@
 
         hostname, domainname = dbus_io.get_local_hostname()
         username = getpass.getuser()
-        self.advertisement = dbus_io.Advertisement(port=listen_port,
+        self.advertisement = dbus_io.PairAdvertisement(port=listen_port,
                 name="%s-%s-%d" % (hostname, username, listen_port),
                 text=dict(version=str(discovery_tool_version),
                     description=get_host_info()))
+        self.advertisement.publish()
         return hostname, username, listen_port
 
     def __init__(self, couchdb_instance):
@@ -725,4 +726,5 @@
 
 if __name__ == "__main__":
     import sys
+    desktopcouch_port = dbus_io.get_desktopcouch_listening_port()
     main(sys.argv)

=== added file 'bin/desktopcouch-paired-replication-manager'
--- bin/desktopcouch-paired-replication-manager 1970-01-01 00:00:00 +0000
+++ bin/desktopcouch-paired-replication-manager 2009-08-26 12:30:49 +0000
@@ -0,0 +1,113 @@
+#!/bin/sh
+""""exec ${PYTHON:-python} -t $0 "$@";" """
+# vim: filetype=python expandtab smarttab
+
+# Copyright 2009 Canonical Ltd.
+#
+# This file is part of desktopcouch.
+#
+# desktopcouch is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3
+# as published by the Free Software Foundation.
+#
+# desktopcouch is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors: Chad Miller <chad.miller@canonical.com>
+
+import os
+import json
+import threading
+import logging
+import logging.handlers
+log = logging.getLogger("main")
+
+from twisted.internet import gtk2reactor
+gtk2reactor.install()
+from twisted.internet import reactor, task
+
+import desktopcouch
+from desktopcouch.pair.couchdb_pairing import couchdb_io
+from desktopcouch.pair.couchdb_pairing import dbus_io
+
+already_replicating = False
+
+class ReplicatorThread(threading.Thread):
+    def __init__(self):
+        log.debug("starting up replication thread")
+        super(ReplicatorThread, self).__init__()
+
+    def run(self):
+        global already_replicating  # Fuzzy, as not really critical,
+        already_replicating = True  # just trying to be polite.
+        try:
+            for uuid, addr, port in dbus_io.get_seen_paired_hosts():
+                log.debug("host %s is seen, and I want to replicate to it", uuid)
+                for database_name in couchdb_io.get_database_names_replicatable():
+                    couchdb_io.replicate(database_name, database_name,
+                            target_host=addr, target_port=port)
+        finally:
+            already_replicating = False
+            log.debug("finished replicating")
+
+
+def replicate_local_databases_to_paired_hosts():
+    if already_replicating:
+        log.warn("haven't finished replicating before next time to start.")
+        return False
+
+    r = ReplicatorThread()
+    r.start()
+
+def main(args):
+    log_directory = os.path.expanduser("~/.cache/ubuntuone/log")
+    try:
+        os.makedirs(log_directory)
+    except:
+        pass
+    rotating_log = logging.handlers.TimedRotatingFileHandler(
+            os.path.join(log_directory, "desktop-couch-replication.log"),
+            "midnight", 1, 14)
+    rotating_log.setLevel(logging.DEBUG)
+    formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
+    rotating_log.setFormatter(formatter)
+    logging.getLogger('').addHandler(rotating_log)
+    logging.getLogger('').setLevel(logging.DEBUG)
+
+    try:
+        log.info("Starting.")
+
+        unique_identifiers = couchdb_io.get_my_host_unique_id()
+        if unique_identifiers is None:
+            log.warn("No unique hostaccount id is set, so pairing not enabled.")
+            sys.exit(2)
+
+        port = desktopcouch.find_port()
+        beacons = [dbus_io.LocationAdvertisement(port, "desktopcouch " + i)
+                for i in unique_identifiers]
+        for b in beacons:
+            b.publish()
+
+        dbus_io.discover_services(None, None, True)
+
+        try:
+            dbus_io.maintain_discovered_servers()
+            t = task.LoopingCall(replicate_local_databases_to_paired_hosts)
+            t.start(6)
+            reactor.run()
+        finally:
+            for b in beacons:
+                b.unpublish()
+
+    finally:
+        log.info("Quitting.")
+
+if __name__ == "__main__":
+    import sys
+
+    main(sys.argv)

=== modified file 'desktopcouch/pair/couchdb_pairing/couchdb_io.py'
--- desktopcouch/pair/couchdb_pairing/couchdb_io.py 2009-07-08 17:48:11 +0000
+++ desktopcouch/pair/couchdb_pairing/couchdb_io.py 2009-08-26 12:30:49 +0000
@@ -2,7 +2,7 @@
 #
 # This file is part of desktopcouch.
 #
-# desktopcouch is free software: you can redistribute it and/or modify
+# desktopcouch is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License version 3
 # as published by the Free Software Foundation.
 #
@@ -13,27 +13,127 @@
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors: Chad Miller <chad.miller@canonical.com>
 """Communicate with CouchDB."""
 
 import urllib2
 import json
 import logging
-
-def replicate_to(port, src_name, dst_host, dst_port, dst_name):
-    """A simple easiest-possible replication instruction for couchdb. It's
-    almost certainly wrong for us."""
-
-    dst_url = u"http://%(dst_host)s):%(dst_port)d/%(dst_name)s" % locals()
-    doc_data = dict(source_database=src_name.encode("utf8"),
-            target_database=dst_url.encode("utf8"))
-
-    document = json.dumps(doc_data)  # json doesn't mention Unicode.
-
-    req = urllib2.Request("http://localhost:%d/_replicate" % (port,), document)
-
+import tempfile
+import os
+
+from ... import find_port as desktopcouch_find_port
+from ...records import server
+
+PAIRED_SERVER_RECORD_TYPE = "http://www.freedesktop.org/wiki/Specifications/desktopcouch/paired_server"
+MY_ID_RECORD_TYPE = "http://www.freedesktop.org/wiki/Specifications/desktopcouch/server_identity"
+
+def _get_db(name, create=True):
+    port = desktopcouch_find_port()  # make sure d-c is running.
+    return server.CouchDatabase(name, create=create)
+
+def get_database_names_replicatable():
+    """Find a list of local databases, minus dbs that we do not want to
+    replicate (explicitly or implicitly)."""
+
+    port = int(desktopcouch_find_port())
+    couchdb_server = server.Server("http://localhost:%(port)d/" % locals())
+    all = set([db_name for db_name in couchdb_server])
+
+    excluded = set()
+    excluded.add("management")
+    excluded_msets = _get_management_data(PAIRED_SERVER_RECORD_TYPE, "excluded_names")
+    for excluded_mset in excluded_msets:
+        excluded.update(excluded_mset)
+
+    return all - excluded
+
+def get_my_host_unique_id():
+    """Returns a list of ids we call ourselves. We complain in the log if it's
+    more than one, but it's really no error. If there are zero (id est, we've
+    never paired with anyone), then returns None."""
+
+    db = _get_db("management")
+    ids = _get_management_data(MY_ID_RECORD_TYPE, "self_identity")
+    ids = list(set(ids))  # uniqify
+    if len(ids) > 1:
+        logging.error("DANGER! We have more than one record claiming to be this host's unique identifier. Which is right? We will try to use them all, but this smells really funny.")
+        return ids
+    if len(ids) == 1:
+        return ids
+    return None
+
+def get_local_paired_uuids():
+    results = _get_management_data(PAIRED_SERVER_RECORD_TYPE, "pairing_identifier")
+    return results
+
+def _get_management_data(record_type, key):
+    db = _get_db("management")
+    results = db.get_records(create_view=True)
+    values = list()
+    for record in results[record_type]:
+        if key in record.value:  # EAFP, rather than LBYL? Nones default?
+            value = record.value[key]
+            if value is not None:
+                values.append(value)
+            else:
+                logging.debug("skipping record empty %s", key)
+        else:
+            logging.debug("skipping record with no %s", key)
+    logging.debug("found %d %s records", len(values), key)
+    return values
+
+def create_remote_database(dst_host, dst_port, dst_name):
+    dst_url = u"http://%(dst_host)s:%(dst_port)d/" % locals()
+    return server.CouchDatabase(dst_name, dst_url, create=True)
+
+def replicate(source_database, target_database, target_host=None,
+        target_port=None, source_host=None, source_port=None):
+    """This replication is instant and blocking, and does not persist. """
+
+    data = {}
+
+    if source_host:
+        if source_port:
+            source = "http://%(source_host)s/%(source_database)s" % locals()
+        else:
+            source = "http://%(source_host)s:%(source_port)d/%(source_database)s" % locals()
+    else:
+        source = source_database
+
+    if target_host:
+        if target_port:
+            target = "http://%(target_host)s/%(target_database)s" % locals()
+        else:
+            target = "http://%(target_host)s:%(target_port)d/%(target_database)s" % locals()
+    else:
+        target = target_database
+
+    record = dict(source=source, target=target)
     try:
-        conn = urllib2.urlopen(req)
-        logging.info("couchdb request resulted in %r", conn.read())
+        if target_host:
+            # Remote databases must exist before replicating to them.
+            create_remote_database(target_host, target_port, target_database)
+
+        port = int(desktopcouch_find_port())
+        url = "http://localhost:%d/" % (port,)
+
+        import couchdb
+        server = couchdb.client.Server(url)
+        db = server["_replicate"]
+        db.create(record)
+
+        logging.info("successfully replicated %r", record)
     except:
         logging.exception("can't talk to couchdb.")
         raise
+
+def Xget_replication_list(db):
+    map_js = """function(doc) { emit(doc.managed_by, doc) }"""
+    view_name = "u1_replicators_by_manager"
+    design_document = "ubuntuone_replication"
+    view = ViewDefinition(design_document, view_name, map_js, None)
+    view.sync(db)
+    results = self.execute_view(view_name, design_document)
+    return results

=== modified file 'desktopcouch/pair/couchdb_pairing/dbus_io.py'
--- desktopcouch/pair/couchdb_pairing/dbus_io.py 2009-07-20 14:34:08 +0000
+++ desktopcouch/pair/couchdb_pairing/dbus_io.py 2009-08-26 12:30:49 +0000
@@ -2,7 +2,7 @@
 #
 # This file is part of desktopcouch.
 #
-# desktopcouch is free software: you can redistribute it and/or modify
+# desktopcouch is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License version 3
 # as published by the Free Software Foundation.
 #
@@ -13,6 +13,8 @@
 #
 # You should have received a copy of the GNU Lesser General Public License
 # along with desktopcouch. If not, see <http://www.gnu.org/licenses/>.
+#
+# Authors: Chad Miller <chad.miller@canonical.com>
 """Communicate with DBUS and also the APIs it proxies, like Zeroconf."""
 
 import logging
@@ -22,8 +24,11 @@
 import avahi
 DBusGMainLoop(set_as_default=True)
 
-discovery_service_type = "_couchdb_pairing_invitations._tcp"
+from . import couchdb_io
 
+invitations_discovery_service_type = "_couchdb_pairing_invitations._tcp"
+location_discovery_service_type = "_couchdb_location._tcp"
+desktopcouch_dbus_interface = "org.desktopcouch.CouchDB"
 
 def get_local_hostname():
     """Get the name of this host, as Unicode host and domain parts."""
@@ -43,36 +48,34 @@
 
     return hostname
 
-def get_dbus_bus_server():
+def get_desktopcouch_listening_port():
+    bus, server = get_dbus_bus_server(desktopcouch_dbus_interface)
+    return server.desktopCouch.getPort()
+
+def get_dbus_bus_server(interface="root"):
     """Common sequence of steps to get a Bus and Server object from DBUS."""
     bus = dbus.SystemBus()
     root_name = bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
     server = dbus.Interface(root_name, avahi.DBUS_INTERFACE_SERVER)
     return bus, server
 
-
 class Advertisement(object):
     """Represents an advertised service that exists on this host."""
-
-    def __init__(self, port, name="rlx!", stype=discovery_service_type,
-            domain="", host="", text="(unknown)"):
+    def __init__(self, port, name, stype="", domain="", host="", text={}):
+        super(Advertisement, self).__init__()
 
         self.logging = logging.getLogger(self.__class__.__name__)
-
-        super(Advertisement, self).__init__()
         self.name = name
         self.stype = stype
         self.domain = domain
         self.host = host
-        self.port = port
+        self.port = int(port)
         if hasattr(text, "keys"):
             self.text = avahi.dict_to_txt_array(text)
         else:
             self.text = text
 
         self.group = None
-
-        self.publish()
 
     def publish(self):
         """Start the advertisement."""
@@ -87,20 +90,125 @@
 
         g.Commit()
         self.logging.info("starting advertising %s on port %d",
-                discovery_service_type, self.port)
+                self.stype, self.port)
         self.group = g
 
     def unpublish(self):
         """End the advertisement."""
         self.group.Reset()
         self.logging.info("ending advertising %s on port %d",
-                discovery_service_type, self.port)
+                self.stype, self.port)
         self.group = None
 
     def die(self):
         """Quit."""
         self.unpublish()
 
+class LocationAdvertisement(Advertisement):
+    """An advertised couchdb location. See Advertisement class."""
+    def __init__(self, *args, **kwargs):
+        if "stype" in kwargs:
+            kwargs.pop(stype)
+        super(LocationAdvertisement, self).__init__(stype=location_discovery_service_type, *args, **kwargs)
+
+class PairAdvertisement(Advertisement):
+    """An advertised couchdb pairing opportunity. See Advertisement class."""
+    def __init__(self, *args, **kwargs):
+        if "stype" in kwargs:
+            kwargs.pop(stype)
+        super(PairAdvertisement, self).__init__(stype=invitations_discovery_service_type, *args, **kwargs)
+
+def avahitext_to_dict(avahitext):
+    text = {}
+    for l in avahitext:
+        try:
+            k, v = "".join(chr(i) for i in l).split("=", 1)
+            text[k] = v
+        except ValueError, e:
+            logging.error("k/v field could not be decoded. %s", e)
+    return text
+
+
+nearby_desktop_couch_instances = dict()  # k=uuid, v=(addr, port)
+
+def cb_found_desktopcouch_server(uuid, host_address, port):
+    nearby_desktop_couch_instances[uuid] = (unicode(host_address), int(port))
+
+def cb_lost_desktopcouch_server(uuid):
+    try:
+        del nearby_desktop_couch_instances[uuid]
+    except KeyError:
+        pass
+
+def get_seen_paired_hosts():
+    paired_uuids = couchdb_io.get_local_paired_uuids()
+    return (
+            (uuid, addr, port)
+            for uuid, (addr, port)
+            in nearby_desktop_couch_instances.iteritems()
+            if uuid in paired_uuids)
+
+def maintain_discovered_servers(add_cb=cb_found_desktopcouch_server,
+        del_cb=cb_lost_desktopcouch_server):
+
+    def remove_item_handler(interface, protocol, name, stype, domain, flags):
+        """A service disappeared."""
+
+        def handle_error(*args):
+            """An error in resolving a new service."""
+            logging.error("zeroconf ItemNew error for services, %s", args)
+
+        def handle_resolved(*args):
+            """Successfully resolved a new service, which we decode and send
+            back to our calling environment with the callback function."""
+
+            name, host, port = args[2], args[5], args[8]
+            if name.startswith("desktopcouch "):
+                del_cb(name[13:], host, port)
+            else:
+                logging.error("no UUID in zeroconf message, %r", args)
+
+        del_cb(uuid)
+
+        server.ResolveService(interface, protocol, name, stype,
+                domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
+                reply_handler=handle_resolved, error_handler=handle_error)
+
+    def new_item_handler(interface, protocol, name, stype, domain, flags):
+        """A service appeared."""
+
+        def handle_error(*args):
+            """An error in resolving a new service."""
+            logging.error("zeroconf ItemNew error for services, %s", args)
+
+        def handle_resolved(*args):
+            """Successfully resolved a new service, which we decode and send
+            back to our calling environment with the callback function."""
+
+            name, host, port = args[2], args[5], args[8]
+            # FIXME strip off "desktopcouch "
+            if name.startswith("desktopcouch "):
+                add_cb(name[13:], host, port)
+            else:
+                logging.error("no UUID in zeroconf message, %r", name)
+            return True
+
+        server.ResolveService(interface, protocol, name, stype,
+                domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
+                reply_handler=handle_resolved, error_handler=handle_error)
+
+    bus, server = get_dbus_bus_server()
+    domain_name = get_local_hostname()[1]
+    browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC,
+            location_discovery_service_type, domain_name, dbus.UInt32(0))
+    browser_name = bus.get_object(avahi.DBUS_NAME, browser)
+
+    sbrowser = dbus.Interface(browser_name,
+            avahi.DBUS_INTERFACE_SERVICE_BROWSER)
+    sbrowser.connect_to_signal("ItemNew", new_item_handler)
+    sbrowser.connect_to_signal("ItemRemove", remove_item_handler)
+    sbrowser.connect_to_signal("Failure", lambda *a: logging.error("avahi error %r", a))
+
 
 def discover_services(add_commport_name_cb, del_commport_name_cb,
         show_local=False):
@@ -125,13 +233,10 @@
         def handle_resolved(*args):
             """Successfully resolved a new service, which we decode and send
             back to our calling environment with the callback function."""
-            text = {}
-            for l in args[9]:
-                k, v = "".join(chr(i) for i in l).split("=", 1)
-                text[k] = v
-
-            add_commport_name_cb(args[2], text.get("description", "?"),
-                    args[5], args[8], text.get("version", None))
+            text = avahitext_to_dict(args[9])
+            name, host, port = args[2], args[5], args[8]
+            add_commport_name_cb(name, text.get("description", "?"),
+                    host, port, text.get("version", None))
 
         if not show_local and flags & avahi.LOOKUP_RESULT_LOCAL:
             return
@@ -142,14 +247,14 @@
 
 
     bus, server = get_dbus_bus_server()
-
     domain_name = get_local_hostname()[1]
 
     browser = server.ServiceBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC,
-            discovery_service_type, domain_name, dbus.UInt32(0))
+            invitations_discovery_service_type, domain_name, dbus.UInt32(0))
     browser_name = bus.get_object(avahi.DBUS_NAME, browser)
 
     sbrowser = dbus.Interface(browser_name,
             avahi.DBUS_INTERFACE_SERVICE_BROWSER)
     sbrowser.connect_to_signal("ItemNew", new_item_handler)
     sbrowser.connect_to_signal("ItemRemove", remove_item_handler)
+    sbrowser.connect_to_signal("Failure", lambda *a: logging.error("avahi error %r", a))
