Merge lp:~facundo/ubuntuone-client/second-inotify-processor into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~facundo/ubuntuone-client/second-inotify-processor
Merge into: lp:ubuntuone-client
Diff against target: 468 lines (+229/-77)
4 files modified
tests/syncdaemon/test_eq_inotify.py (+108/-21)
tests/syncdaemon/test_eventqueue.py (+3/-1)
ubuntuone/syncdaemon/event_queue.py (+115/-55)
ubuntuone/syncdaemon/volume_manager.py (+3/-0)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/second-inotify-processor
Reviewer Review Type Date Requested Status
Guillermo Gonzalez Approve
John O'Brien (community) Approve
Review via email: mp+18027@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Facundo Batista (facundo) wrote :

Added a separate inotify processor to handle UDF ancestors.

There's no change in the EQ API: the inotify_{has|rm|add}_watch(dirpath) methods remain the same, and they know which watch manager handles the dirpath.

Note that, of course, I removed the UDF ancestor handling from the legacy processor.

Revision history for this message
John O'Brien (jdobrien) :
review: Approve
333. By Facundo Batista

Merged trunk in

Revision history for this message
Guillermo Gonzalez (verterok) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_eq_inotify.py'
2--- tests/syncdaemon/test_eq_inotify.py 2010-01-26 19:15:10 +0000
3+++ tests/syncdaemon/test_eq_inotify.py 2010-01-26 19:53:12 +0000
4@@ -33,31 +33,104 @@
5 class WatchTests(BaseEQTestCase):
6 """Test the EQ API to add and remove watchs."""
7
8- def test_add_watch(self):
9- """Test that watchs can be added."""
10+ def _create_udf(self, path):
11+ """Create an UDF and returns it and the volume"""
12+ os.makedirs(path)
13+ udf = volume_manager.UDF("vol_id", "node_id", path, path, True)
14+ self.vm.add_udf(udf)
15+
16+ def test_add_general_watch(self):
17+ """Test that general watchs can be added."""
18 # we should have what we asked for
19 self.eq.inotify_add_watch(self.root_dir)
20- # pylint: disable-msg=W0212
21- self.assertTrue(self.root_dir in self.eq._watchs)
22-
23- # we shouldn't have other stuff
24- self.assertTrue("not-added-dir" not in self.eq._watchs)
25-
26- def test_rm_watch(self):
27- """Test that watchs can be removed."""
28- # remove what we added
29+
30+ # check only added dir in watchs
31+ # pylint: disable-msg=W0212
32+ self.assertTrue(self.root_dir in self.eq._general_watchs)
33+ self.assertTrue("not-added-dir" not in self.eq._general_watchs)
34+
35+ # nothing in the udf ancestors watch
36+ self.assertEqual(self.eq._ancestors_watchs, {})
37+
38+ def test_add_watch_on_udf_ancestor(self):
39+ """Test that ancestors watchs can be added."""
40+ # create the udf and add the watch
41+ path_udf = os.path.join(self.home_dir, "path/to/UDF")
42+ self._create_udf(path_udf)
43+ path_ancestor = os.path.join(self.home_dir, "path")
44+ self.eq.inotify_add_watch(path_ancestor)
45+
46+ # check only added dir in watchs
47+ # pylint: disable-msg=W0212
48+ self.assertTrue(path_ancestor in self.eq._ancestors_watchs)
49+ self.assertTrue("not-added-dir" not in self.eq._ancestors_watchs)
50+
51+ # nothing in the general watch
52+ self.assertEqual(self.eq._general_watchs, {})
53+
54+ def test_add_watch_on_udf_exact(self):
55+ """Test adding the watch exactly on UDF."""
56+ # create the udf and add the watch
57+ path_udf = os.path.join(self.home_dir, "path/to/UDF")
58+ self._create_udf(path_udf)
59+ self.eq.inotify_add_watch(path_udf)
60+
61+ # pylint: disable-msg=W0212
62+ self.assertTrue(path_udf in self.eq._general_watchs)
63+ self.assertEqual(self.eq._ancestors_watchs, {})
64+
65+ def test_add_watch_on_udf_child(self):
66+ """Test adding the watch inside UDF."""
67+ # create the udf and add the watch
68+ path_udf = os.path.join(self.home_dir, "path/to/UDF")
69+ self._create_udf(path_udf)
70+ path_ancestor = os.path.join(self.home_dir, "path/to/UDF/inside")
71+ os.mkdir(path_ancestor)
72+ self.eq.inotify_add_watch(path_ancestor)
73+
74+ # pylint: disable-msg=W0212
75+ self.assertTrue(path_ancestor in self.eq._general_watchs)
76+ self.assertEqual(self.eq._ancestors_watchs, {})
77+
78+ def test_rm_watch_wrong(self):
79+ """Test that general watchs can be removed."""
80+ # add two types of watchs
81 self.eq.inotify_add_watch(self.root_dir)
82- self.eq.inotify_rm_watch(self.root_dir)
83- # pylint: disable-msg=W0212
84- self.assertTrue(self.root_dir not in self.eq._watchs)
85+ path_udf = os.path.join(self.home_dir, "path/to/UDF")
86+ self._create_udf(path_udf)
87+ path_ancestor = os.path.join(self.home_dir, "path")
88+ self.eq.inotify_add_watch(path_ancestor)
89
90 # remove different stuff
91- self.eq.inotify_add_watch(self.root_dir)
92 self.assertRaises(ValueError,
93 self.eq.inotify_rm_watch, "not-added-dir")
94
95- def test_has_watch(self):
96- """Test that a path is watched."""
97+ def test_rm_watch_general(self):
98+ """Test that general watchs can be removed."""
99+ # remove what we added
100+ self.eq.inotify_add_watch(self.root_dir)
101+ self.eq.inotify_rm_watch(self.root_dir)
102+
103+ # pylint: disable-msg=W0212
104+ self.assertEqual(self.eq._general_watchs, {})
105+ self.assertEqual(self.eq._ancestors_watchs, {})
106+
107+ def test_rm_watch_ancestor(self):
108+ """Test that ancestor watchs can be removed."""
109+ # create the udf and add the watch
110+ path_udf = os.path.join(self.home_dir, "path/to/UDF")
111+ self._create_udf(path_udf)
112+ path_ancestor = os.path.join(self.home_dir, "path")
113+ self.eq.inotify_add_watch(path_ancestor)
114+
115+ # remove what we added
116+ self.eq.inotify_rm_watch(path_ancestor)
117+ # pylint: disable-msg=W0212
118+ self.assertEqual(self.eq._general_watchs, {})
119+ self.assertEqual(self.eq._ancestors_watchs, {})
120+
121+ def test_has_watch_general(self):
122+ """Test that a general path is watched."""
123 self.assertFalse(self.eq.inotify_has_watch(self.root_dir))
124
125 # add
126@@ -68,6 +141,24 @@
127 self.eq.inotify_rm_watch(self.root_dir)
128 self.assertFalse(self.eq.inotify_has_watch(self.root_dir))
129
130+ def test_has_watch_ancestor(self):
131+ """Test that an ancestor path is watched."""
132+ path_udf = os.path.join(self.home_dir, "path/to/UDF")
133+ self._create_udf(path_udf)
134+ path_ancestor = os.path.join(self.home_dir, "path")
135+
136+ self.assertFalse(self.eq.inotify_has_watch(path_ancestor))
137+
138+ # add
139+ # create the udf and add the watch
140+ self.eq.inotify_add_watch(path_ancestor)
141+ self.assertTrue(self.eq.inotify_has_watch(path_ancestor))
142+
143+ # remove
144+ self.eq.inotify_rm_watch(path_ancestor)
145+ self.assertFalse(self.eq.inotify_has_watch(path_ancestor))
146+
147+
148 class DynamicHitMe(object):
149 """Helper class to test a sequence of signals."""
150
151@@ -1157,7 +1248,6 @@
152
153 @param msg: A string describing the failure that's included in the
154 exception.
155-
156 """
157 if not first == second:
158 if msg is None:
159@@ -1246,9 +1336,6 @@
160 suggested_path, path, True)
161 other_ancestors = other_udf.ancestors
162
163- # pylint: disable-msg=W0212
164- assert not self.eq._processor._is_udf_ancestor(path)
165-
166 os.makedirs(path)
167 # every ancestor has a watch already, added by LocalRescan. Copy that.
168 self.eq.inotify_add_watch(other_udf.path)
169
170=== modified file 'tests/syncdaemon/test_eventqueue.py'
171--- tests/syncdaemon/test_eventqueue.py 2009-11-20 22:00:25 +0000
172+++ tests/syncdaemon/test_eventqueue.py 2010-01-26 19:53:12 +0000
173@@ -39,9 +39,11 @@
174 self.fsmdir = self.mktemp('fsmdir')
175 self.partials_dir = self.mktemp('partials_dir')
176 self.root_dir = self.mktemp('root_dir')
177+ self.home_dir = self.mktemp('home_dir')
178+ self.vm = testcase.FakeVolumeManager(self.root_dir)
179 self.fs = filesystem_manager.FileSystemManager(self.fsmdir,
180 self.partials_dir,
181- testcase.FakeVolumeManager(self.root_dir))
182+ self.vm)
183 self.fs.create(path=self.root_dir,
184 share_id='', is_dir=True)
185 self.fs.set_by_path(path=self.root_dir,
186
187=== modified file 'ubuntuone/syncdaemon/event_queue.py'
188--- ubuntuone/syncdaemon/event_queue.py 2010-01-26 15:54:29 +0000
189+++ ubuntuone/syncdaemon/event_queue.py 2010-01-26 19:53:12 +0000
190@@ -159,7 +159,7 @@
191 }
192
193 # these are the events that will listen from inotify
194-INOTIFY_EVENTS = (
195+INOTIFY_EVENTS_GENERAL = (
196 evtcodes.IN_OPEN |
197 evtcodes.IN_CLOSE_NOWRITE |
198 evtcodes.IN_CLOSE_WRITE |
199@@ -169,6 +169,12 @@
200 evtcodes.IN_MOVED_TO |
201 evtcodes.IN_MOVE_SELF
202 )
203+INOTIFY_EVENTS_ANCESTORS = (
204+ evtcodes.IN_DELETE |
205+ evtcodes.IN_MOVED_FROM |
206+ evtcodes.IN_MOVED_TO |
207+ evtcodes.IN_MOVE_SELF
208+)
209
210 DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
211
212@@ -201,14 +207,58 @@
213 return True
214
215
216-class _INotifyProcessor(pyinotify.ProcessEvent):
217- """Helper class that is called from inpotify when an event happens.
218+class _AncestorsINotifyProcessor(pyinotify.ProcessEvent):
219+ """inotify's processor when an event happens on an UDFs ancestor."""
220+ def __init__(self, eq):
221+ self.log = logging.getLogger('ubuntuone.SyncDaemon.AncestorsINotProc')
222+ self.eq = eq
223+
224+ def _get_udf(self, path):
225+ """Get the udf for a specific path.
226+
227+ It can return None in case the UDF was deleted in the meantime.
228+ """
229+ for udf in self.eq.fs.vm.udfs.itervalues():
230+ parent = os.path.dirname(udf.path) + os.path.sep
231+ if parent.startswith(path + os.path.sep):
232+ return udf
233+ return None
234+
235+ def process_IN_MOVE_SELF(self, event):
236+ """Don't do anything here.
237+
238+ We just turned this event on because pyinotify does some
239+ path-fixing in its internal processing when this happens.
240+ """
241+ process_IN_MOVED_TO = process_IN_MOVE_SELF
242+
243+ def process_IN_MOVED_FROM(self, event):
244+ """Getting it out or renaming means unsuscribe."""
245+ if event.mask & evtcodes.IN_ISDIR:
246+ udf = self._get_udf(event.path)
247+ if udf is not None:
248+ self.log.info("Got MOVED_FROM on path %r, unsubscribing "
249+ "udf %s", event.path, udf)
250+ self.eq.fs.vm.unsubscribe_udf(udf.volume_id)
251+
252+ def process_IN_DELETE(self, event):
253+ """Check to see if the UDF was deleted."""
254+ if event.mask & evtcodes.IN_ISDIR:
255+ udf = self._get_udf(event.path)
256+ if udf is not None and udf.path == event.pathname:
257+ self.log.info("Got DELETE on path %r, deleting udf %s",
258+ event.path, udf)
259+ self.eq.fs.vm.delete_volume(udf.volume_id)
260+
261+
262+class _GeneralINotifyProcessor(pyinotify.ProcessEvent):
263+ """inotify's processor when a general event happens.
264
265 This class also catchs the MOVEs events, and synthetises a new
266 FS_(DIR|FILE)_MOVE event when possible.
267 """
268 def __init__(self, eq):
269- self.log = logging.getLogger('ubuntuone.SyncDaemon.INotifyProcessor')
270+ self.log = logging.getLogger('ubuntuone.SyncDaemon.GeneralINotProc')
271 self.eq = eq
272 self.held_event = None
273 self.timer = None
274@@ -216,16 +266,6 @@
275 self.frozen_evts = False
276 self._to_mute = MuteFilter()
277
278- def _is_udf_ancestor(self, path):
279- """Decide if path is an UDF ancestor or not."""
280- result = None
281- for udf in self.eq.fs.vm.udfs.itervalues():
282- parent = os.path.dirname(udf.path) + os.path.sep
283- if parent.startswith(path + os.path.sep):
284- return udf
285-
286- return result
287-
288 def add_to_mute_filter(self, event, *paths):
289 """Add an event and path(s) to the mute filter."""
290 # all events have one path except the MOVEs
291@@ -261,14 +301,12 @@
292
293 def process_IN_OPEN(self, event):
294 """Filter IN_OPEN to make it happen only in files."""
295- if not (event.mask & evtcodes.IN_ISDIR) and \
296- not self._is_udf_ancestor(event.path):
297+ if not (event.mask & evtcodes.IN_ISDIR):
298 self.push_event(event)
299
300 def process_IN_CLOSE_NOWRITE(self, event):
301 """Filter IN_CLOSE_NOWRITE to make it happen only in files."""
302- if not (event.mask & evtcodes.IN_ISDIR) and \
303- not self._is_udf_ancestor(event.path):
304+ if not (event.mask & evtcodes.IN_ISDIR):
305 self.push_event(event)
306
307 def process_IN_MOVE_SELF(self, event):
308@@ -281,11 +319,6 @@
309
310 def process_IN_MOVED_FROM(self, event):
311 """Capture the MOVED_FROM to maybe syntethize FILE_MOVED."""
312- udf = self._is_udf_ancestor(event.path)
313- if udf is not None:
314- self.eq.fs.vm.unsubscribe_udf(udf.volume_id)
315- return
316-
317 if self.held_event is not None:
318 self.release_held_event()
319
320@@ -317,9 +350,6 @@
321
322 def process_IN_MOVED_TO(self, event):
323 """Capture the MOVED_TO to maybe syntethize FILE_MOVED."""
324- if self._is_udf_ancestor(event.path):
325- return
326-
327 if self.held_event is not None:
328 if event.cookie == self.held_event.cookie:
329 try:
330@@ -381,16 +411,6 @@
331
332 def process_default(self, event):
333 """Push the event into the EventQueue."""
334- udf = self._is_udf_ancestor(event.path)
335- if udf is not None:
336- # if event is the deletion of the UDF per se,
337- # call delete_volume on VolumeManager for that UDF.
338- ename = NAME_TRANSLATIONS.get(event.mask, None)
339- is_dir_delete = ename is not None and ename == 'FS_DIR_DELETE'
340- if udf.path == event.pathname and is_dir_delete:
341- self.eq.fs.vm.delete_volume(udf.volume_id)
342- return
343-
344 if self.held_event is not None:
345 self.release_held_event()
346 self.push_event(event)
347@@ -482,13 +502,23 @@
348
349 self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ')
350 self.fs = fs
351- # hook inotify
352- self._inotify_reader = None
353- self._inotify_wm = wm = pyinotify.WatchManager()
354- self._processor = _INotifyProcessor(self)
355- self._inotify_notifier = pyinotify.Notifier(wm, self._processor)
356- self._hook_inotify_to_twisted(wm, self._inotify_notifier)
357- self._watchs = {}
358+
359+ # general inotify
360+ self._inotify_general_wm = wm = pyinotify.WatchManager()
361+ self._processor = _GeneralINotifyProcessor(self)
362+ self._inotify_notifier_gral = pyinotify.Notifier(wm, self._processor)
363+ self._inotify_reader_gral = self._hook_inotify_to_twisted(
364+ wm, self._inotify_notifier_gral)
365+ self._general_watchs = {}
366+
367+ # ancestors inotify
368+ self._inotify_ancestors_wm = wm = pyinotify.WatchManager()
369+ antr_processor = _AncestorsINotifyProcessor(self)
370+ self._inotify_notifier_antr = pyinotify.Notifier(wm, antr_processor)
371+ self._inotify_reader_antr = self._hook_inotify_to_twisted(
372+ wm, self._inotify_notifier_antr)
373+ self._ancestors_watchs = {}
374+
375 self.dispatching = False
376 self.dispatch_queue = Queue()
377 self.empty_event_queue_callbacks = set()
378@@ -526,34 +556,64 @@
379 notifier.process_events()
380
381 reader = MyReader()
382- self._inotify_reader = reader
383 reactor.addReader(reader)
384+ return reader
385
386 def shutdown(self):
387 """Prepares the EQ to be closed."""
388- self._inotify_notifier.stop()
389- reactor.removeReader(self._inotify_reader)
390+ self._inotify_notifier_gral.stop()
391+ self._inotify_notifier_antr.stop()
392+ reactor.removeReader(self._inotify_reader_gral)
393+ reactor.removeReader(self._inotify_reader_antr)
394
395 def inotify_rm_watch(self, dirpath):
396 """Remove watch from a dir."""
397- try:
398- wd = self._watchs[dirpath]
399- except KeyError:
400+ if dirpath in self._general_watchs:
401+ w_dict = self._general_watchs
402+ w_manager = self._inotify_general_wm
403+ elif dirpath in self._ancestors_watchs:
404+ w_dict = self._ancestors_watchs
405+ w_manager = self._inotify_ancestors_wm
406+ else:
407 raise ValueError("The path %r is not watched right now!" % dirpath)
408- result = self._inotify_wm.rm_watch(wd)
409+
410+ wd = w_dict[dirpath]
411+ result = w_manager.rm_watch(wd)
412 if not result[wd]:
413 raise RuntimeError("The path %r couldn't be removed!" % dirpath)
414- del self._watchs[dirpath]
415+ del w_dict[dirpath]
416
417 def inotify_add_watch(self, dirpath):
418 """Add watch to a dir."""
419- self.log.debug("Adding inotify watch to %r", dirpath)
420- result = self._inotify_wm.add_watch(dirpath, INOTIFY_EVENTS)
421- self._watchs[dirpath] = result[dirpath]
422+ # see where to add it
423+ if self._is_udf_ancestor(dirpath):
424+ w_type = "ancestors"
425+ w_manager = self._inotify_ancestors_wm
426+ w_dict = self._ancestors_watchs
427+ events = INOTIFY_EVENTS_ANCESTORS
428+ else:
429+ w_type = "general"
430+ w_manager = self._inotify_general_wm
431+ w_dict = self._general_watchs
432+ events = INOTIFY_EVENTS_GENERAL
433+
434+ # add the watch!
435+ self.log.debug("Adding %s inotify watch to %r", w_type, dirpath)
436+ result = w_manager.add_watch(dirpath, events)
437+ w_dict[dirpath] = result[dirpath]
438
439 def inotify_has_watch(self, dirpath):
440 """Check if a dirpath is watched."""
441- return dirpath in self._watchs
442+ return (dirpath in self._general_watchs or
443+ dirpath in self._ancestors_watchs)
444+
445+ def _is_udf_ancestor(self, path):
446+ """Decide if path is an UDF ancestor or not."""
447+ for udf in self.fs.vm.udfs.itervalues():
448+ parent = os.path.dirname(udf.path) + os.path.sep
449+ if parent.startswith(path + os.path.sep):
450+ return True
451+ return False
452
453 def unsubscribe(self, obj):
454 """Removes the callback object from the listener queue.
455
456=== modified file 'ubuntuone/syncdaemon/volume_manager.py'
457--- ubuntuone/syncdaemon/volume_manager.py 2010-01-26 19:40:05 +0000
458+++ ubuntuone/syncdaemon/volume_manager.py 2010-01-26 19:53:12 +0000
459@@ -143,6 +143,9 @@
460 self.path = path
461 self.subscribed = subscribed
462
463+ def __repr__(self):
464+ return "<UDF id %r, real path %r>" % (self.id, self.path)
465+
466 @property
467 def ancestors(self):
468 """Calculate all the ancestors for this UDF's path."""

Subscribers

People subscribed via source and target branches