Merge lp:~cmiller/desktopcouch/changes-notifications-to-clients into lp:desktopcouch

Proposed by Chad Miller
Status: Merged
Approved by: Stuart Langridge
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~cmiller/desktopcouch/changes-notifications-to-clients
Merge into: lp:desktopcouch
Diff against target: 184 lines (+148/-1)
2 files modified
desktopcouch/records/server_base.py (+74/-1)
desktopcouch/records/tests/test_server.py (+74/-0)
To merge this branch: bzr merge lp:~cmiller/desktopcouch/changes-notifications-to-clients
Reviewer Review Type Date Requested Status
Stuart Langridge (community) Approve
Eric Casteleijn (community) Approve
Review via email: mp+14954@code.launchpad.net

Commit message

Add two CouchDatabase methods: 1) report_changes(cb), which calls a function for each of the db changes since the last time it was run or since the Database object was created; and 2) get_changes(), which returns a list of dictionaries with the changes. They do not recheck the database if the last time they were run is less than a (configurable) number of seconds ago. If there is an exception in the callback, then the event is not consumed, and the last-time-check is not updated.

'report_changes(f)' is not a call-this-function-on-any-change function, since that requires some assertion about your execution model that this library does not really want to enforce.

A common use of this might be to call this function whenever your main loop is idle, e.g., via glib.mainloop.idle_add or twisted.task.looping_call.

To post a comment you must log in.
98. By Chad Miller

Make a simpler function that returns changes directly.

99. By Chad Miller

Let the user disable niceness entirely. None, False, and 0 all mean "do not be
nice to the database."

Revision history for this message
Eric Casteleijn (thisfred) wrote :

Looks great and tests pass, thanks for adding get_changes!

review: Approve
Revision history for this message
Stuart Langridge (sil) wrote :

If multiple changes happen to one document in between polls, then only the most recent change is reported through the API. This is unfortunate, but it appears to be a CouchDB limitation rather than a problem with this API.

Approve. Still wish we could do proper async things, but it Just Ain't Possible.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'desktopcouch/records/server_base.py'
2--- desktopcouch/records/server_base.py 2009-10-14 17:28:43 +0000
3+++ desktopcouch/records/server_base.py 2009-11-17 17:43:09 +0000
4@@ -22,13 +22,15 @@
5 """The Desktop Couch Records API."""
6
7 from couchdb import Server
8-from couchdb.client import ResourceNotFound, ResourceConflict
9+from couchdb.client import ResourceNotFound, ResourceConflict, uri as couchdburi
10 from couchdb.design import ViewDefinition
11 from record import Record
12 import httplib2
13 from oauth import oauth
14 import urlparse
15 import cgi
16+from time import time
17+import json
18
19 #DEFAULT_DESIGN_DOCUMENT = "design"
20 DEFAULT_DESIGN_DOCUMENT = None # each view in its own eponymous design doc.
21@@ -125,6 +127,10 @@
22 raise NoSuchDatabase(database)
23 self.db = self._server[database]
24 self.record_factory = record_factory or Record
25+ self._changes_since = 0
26+ self._changes_last_used = 0
27+ self.report_changes(lambda **_: None) # Update _changes_since.
28+ self._changes_last_used = 0 # Immediate run works.
29
30 def _temporary_query(self, map_fun, reduce_fun=None, language='javascript',
31 wrapper=None, **options):
32@@ -332,3 +338,70 @@
33 return viewdata
34 else:
35 return viewdata[record_type]
36+
37+ def get_changes(self, niceness=10):
38+ """Get a list of database changes. This is the sister function of
39+ report_changes that returns a list instead of calling a function for
40+ each."""
41+ l = list()
42+ self.report_changes(lambda **k: l.append(k), niceness=niceness)
43+ return l
44+
45+ def report_changes(self, function_to_send_changes_to, niceness=10):
46+ """Check to see if there are any changes on this database since last
47+ call (or since this object was instantiated), call a function for each,
48+ and return the number of changes reported.
49+
50+ The callback function is called for every single change, with the
51+ keyword parameters of the dictionary of values returned from couchdb.
52+
53+ >>> def f(seq=None, id=None, changes=None):
54+ ... pass
55+
56+ >>> db_foo.report_changes(f)
57+ >>> time.sleep(30)
58+ >>> db_foo.report_changes(f)
59+
60+ or
61+
62+ >>> cb_id = glib.mainloop.idle_add(db_foo.report_changes, f)
63+
64+ or
65+
66+ >>> task_id = twisted.task.looping_call(db_foo.report_changes, f)
67+
68+ (
69+ {
70+ 'status': '200',
71+ 'content-location': 'http://localhost:39535/test_view_add_and_delete/_changes',
72+ 'transfer-encoding': 'chunked',
73+ 'server': 'CouchDB/0.10.0 (Erlang OTP/R13B)',
74+ 'cache-control': 'must-revalidate',
75+ 'date': 'Tue, 17 Nov 2009 04:26:34 GMT',
76+ 'content-type': 'application/json'
77+ },
78+ {
79+ 'last_seq': 1,
80+ 'results': [
81+ {'changes': [{'rev': '1-6dfcf1344f23fe838db0bac6f319e67b'}], 'id': 'b5f39be14e161d46dc69e7120c6b36fa', 'seq': 1}
82+ ]
83+ }
84+ )
85+ """
86+ now = time()
87+ call_count = 0
88+ if not niceness or now > self._changes_last_used + niceness:
89+
90+ # Can't use self._server.resource.get() directly because it encodes "/".
91+ uri = couchdburi(self._server.resource.uri, self.db.name, "_changes",
92+ since=self._changes_since)
93+ resp, data = self._server.resource.http.request(uri, "GET", "", {})
94+ structure = json.loads(data)
95+ for change in structure.get("results"):
96+ # kw-args can't have unicode keys
97+ change_encoded_keys = dict((k.encode("utf8"), v) for k, v in change.iteritems())
98+ function_to_send_changes_to(**change_encoded_keys)
99+ self._changes_since = change["seq"] # Not s.last_seq. Exceptions!
100+ call_count += 1
101+ self._changes_last_used = now # If exception in cb, we never update governor.
102+ return call_count
103
104=== modified file 'desktopcouch/records/tests/test_server.py'
105--- desktopcouch/records/tests/test_server.py 2009-10-01 09:06:57 +0000
106+++ desktopcouch/records/tests/test_server.py 2009-11-17 17:43:09 +0000
107@@ -200,3 +200,77 @@
108 def test_get_view_by_type_createxcl_fail(self):
109 self.database.get_records(create_view=True)
110 self.assertRaises(KeyError, self.database.get_records, create_view=None)
111+
112+ def test_get_changes(self):
113+ self.test_put_record()
114+ self.test_update_fields()
115+ self.test_delete_record()
116+ self.test_view_add_and_delete()
117+ self.test_func_get_records()
118+
119+ changes = self.database.get_changes()
120+ self.assertTrue(len(changes) > 4)
121+ for change in changes:
122+ self.assertTrue(isinstance(change, dict))
123+ self.failUnless("changes" in change)
124+ self.failUnless("id" in change)
125+
126+ def test_report_changes_polite(self):
127+ def rep(**kwargs):
128+ self.failUnless("changes" in kwargs)
129+ self.failUnless("id" in kwargs)
130+
131+ # First try all operations.
132+ self.test_put_record()
133+ self.test_update_fields()
134+ self.test_delete_record()
135+ self.test_view_add_and_delete()
136+ self.test_func_get_records()
137+
138+ self.database.report_changes(rep) # Make sure nothing horks.
139+
140+ count = self.database.report_changes(lambda **kw: self.fail()) # Too soon to try again.
141+ self.assertEqual(0, count)
142+
143+ def test_report_changes_exceptions(self):
144+ def rep(**kwargs):
145+ self.failUnless("changes" in kwargs)
146+ self.failUnless("id" in kwargs)
147+
148+ self.database.report_changes(rep) # Consume pending.
149+ self.database._changes_last_used = 0
150+
151+ saved_time = self.database._changes_last_used # Store time.
152+ saved_position = self.database._changes_since # Store position.
153+ self.test_put_record() # Queue new changes. This is 1 event!
154+
155+ # Exceptions in our callbacks do not consume an event.
156+ self.assertRaises(ZeroDivisionError, self.database.report_changes, lambda **kw: 1/0)
157+
158+ self.assertEqual(saved_position, self.database._changes_since) # Ensure pos'n is same.
159+ self.assertEqual(saved_time, self.database._changes_last_used) # Ensure time is same.
160+
161+ # Next time we run, we get the same event again.
162+ count = self.database.report_changes(rep) # Consume queued changes.
163+ self.assertEquals(1, count) # Ensure position different.
164+ self.assertEqual(saved_position + 1, self.database._changes_since) # Ensure position different.
165+
166+ def test_report_changes_all_ops_give_known_keys(self):
167+ def rep(**kwargs):
168+ self.failUnless("changes" in kwargs)
169+ self.failUnless("id" in kwargs)
170+
171+ self.database._changes_last_used = 0 # Permit immediate run.
172+ count = self.database.report_changes(rep) # Test expected kw args.
173+
174+ def test_report_changes_nochanges(self):
175+ def rep(**kwargs):
176+ self.failUnless("changes" in kwargs)
177+ self.failUnless("id" in kwargs)
178+
179+ count = self.database.report_changes(rep) # Consume queue.
180+ self.database._changes_last_used = 0 # Permit immediate run.
181+ saved_position = self.database._changes_since # Store position.
182+ count = self.database.report_changes(rep)
183+ self.assertEquals(0, count) # Ensure event count is zero.
184+ self.assertEqual(saved_position, self.database._changes_since) # Pos'n is same.

Subscribers

People subscribed via source and target branches