Merge lp:~facundo/ubuntuone-client/second-inotify-processor into lp:ubuntuone-client
- second-inotify-processor
- Merge into trunk
Proposed by
Facundo Batista
Status: | Merged |
---|---|
Approved by: | Guillermo Gonzalez |
Approved revision: | not available |
Merged at revision: | not available |
Proposed branch: | lp:~facundo/ubuntuone-client/second-inotify-processor |
Merge into: | lp:ubuntuone-client |
Diff against target: |
468 lines (+229/-77) 4 files modified
tests/syncdaemon/test_eq_inotify.py (+108/-21) tests/syncdaemon/test_eventqueue.py (+3/-1) ubuntuone/syncdaemon/event_queue.py (+115/-55) ubuntuone/syncdaemon/volume_manager.py (+3/-0) |
To merge this branch: | bzr merge lp:~facundo/ubuntuone-client/second-inotify-processor |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Guillermo Gonzalez | Approve | ||
John O'Brien (community) | Approve | ||
Review via email: mp+18027@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
Facundo Batista (facundo) wrote : | # |
Revision history for this message
John O'Brien (jdobrien) : | # |
review:
Approve
- 333. By Facundo Batista
-
Merged trunk iMerged trunk inn
Revision history for this message
Guillermo Gonzalez (verterok) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'tests/syncdaemon/test_eq_inotify.py' |
2 | --- tests/syncdaemon/test_eq_inotify.py 2010-01-26 19:15:10 +0000 |
3 | +++ tests/syncdaemon/test_eq_inotify.py 2010-01-26 19:53:12 +0000 |
4 | @@ -33,31 +33,104 @@ |
5 | class WatchTests(BaseEQTestCase): |
6 | """Test the EQ API to add and remove watchs.""" |
7 | |
8 | - def test_add_watch(self): |
9 | - """Test that watchs can be added.""" |
10 | + def _create_udf(self, path): |
11 | + """Create an UDF and returns it and the volume""" |
12 | + os.makedirs(path) |
13 | + udf = volume_manager.UDF("vol_id", "node_id", path, path, True) |
14 | + self.vm.add_udf(udf) |
15 | + |
16 | + def test_add_general_watch(self): |
17 | + """Test that general watchs can be added.""" |
18 | # we should have what we asked for |
19 | self.eq.inotify_add_watch(self.root_dir) |
20 | - # pylint: disable-msg=W0212 |
21 | - self.assertTrue(self.root_dir in self.eq._watchs) |
22 | - |
23 | - # we shouldn't have other stuff |
24 | - self.assertTrue("not-added-dir" not in self.eq._watchs) |
25 | - |
26 | - def test_rm_watch(self): |
27 | - """Test that watchs can be removed.""" |
28 | - # remove what we added |
29 | + |
30 | + # check only added dir in watchs |
31 | + # pylint: disable-msg=W0212 |
32 | + self.assertTrue(self.root_dir in self.eq._general_watchs) |
33 | + self.assertTrue("not-added-dir" not in self.eq._general_watchs) |
34 | + |
35 | + # nothing in the udf ancestors watch |
36 | + self.assertEqual(self.eq._ancestors_watchs, {}) |
37 | + |
38 | + def test_add_watch_on_udf_ancestor(self): |
39 | + """Test that ancestors watchs can be added.""" |
40 | + # create the udf and add the watch |
41 | + path_udf = os.path.join(self.home_dir, "path/to/UDF") |
42 | + self._create_udf(path_udf) |
43 | + path_ancestor = os.path.join(self.home_dir, "path") |
44 | + self.eq.inotify_add_watch(path_ancestor) |
45 | + |
46 | + # check only added dir in watchs |
47 | + # pylint: disable-msg=W0212 |
48 | + self.assertTrue(path_ancestor in self.eq._ancestors_watchs) |
49 | + self.assertTrue("not-added-dir" not in self.eq._ancestors_watchs) |
50 | + |
51 | + # nothing in the general watch |
52 | + self.assertEqual(self.eq._general_watchs, {}) |
53 | + |
54 | + def test_add_watch_on_udf_exact(self): |
55 | + """Test adding the watch exactly on UDF.""" |
56 | + # create the udf and add the watch |
57 | + path_udf = os.path.join(self.home_dir, "path/to/UDF") |
58 | + self._create_udf(path_udf) |
59 | + self.eq.inotify_add_watch(path_udf) |
60 | + |
61 | + # pylint: disable-msg=W0212 |
62 | + self.assertTrue(path_udf in self.eq._general_watchs) |
63 | + self.assertEqual(self.eq._ancestors_watchs, {}) |
64 | + |
65 | + def test_add_watch_on_udf_child(self): |
66 | + """Test adding the watch inside UDF.""" |
67 | + # create the udf and add the watch |
68 | + path_udf = os.path.join(self.home_dir, "path/to/UDF") |
69 | + self._create_udf(path_udf) |
70 | + path_ancestor = os.path.join(self.home_dir, "path/to/UDF/inside") |
71 | + os.mkdir(path_ancestor) |
72 | + self.eq.inotify_add_watch(path_ancestor) |
73 | + |
74 | + # pylint: disable-msg=W0212 |
75 | + self.assertTrue(path_ancestor in self.eq._general_watchs) |
76 | + self.assertEqual(self.eq._ancestors_watchs, {}) |
77 | + |
78 | + def test_rm_watch_wrong(self): |
79 | + """Test that general watchs can be removed.""" |
80 | + # add two types of watchs |
81 | self.eq.inotify_add_watch(self.root_dir) |
82 | - self.eq.inotify_rm_watch(self.root_dir) |
83 | - # pylint: disable-msg=W0212 |
84 | - self.assertTrue(self.root_dir not in self.eq._watchs) |
85 | + path_udf = os.path.join(self.home_dir, "path/to/UDF") |
86 | + self._create_udf(path_udf) |
87 | + path_ancestor = os.path.join(self.home_dir, "path") |
88 | + self.eq.inotify_add_watch(path_ancestor) |
89 | |
90 | # remove different stuff |
91 | - self.eq.inotify_add_watch(self.root_dir) |
92 | self.assertRaises(ValueError, |
93 | self.eq.inotify_rm_watch, "not-added-dir") |
94 | |
95 | - def test_has_watch(self): |
96 | - """Test that a path is watched.""" |
97 | + def test_rm_watch_general(self): |
98 | + """Test that general watchs can be removed.""" |
99 | + # remove what we added |
100 | + self.eq.inotify_add_watch(self.root_dir) |
101 | + self.eq.inotify_rm_watch(self.root_dir) |
102 | + |
103 | + # pylint: disable-msg=W0212 |
104 | + self.assertEqual(self.eq._general_watchs, {}) |
105 | + self.assertEqual(self.eq._ancestors_watchs, {}) |
106 | + |
107 | + def test_rm_watch_ancestor(self): |
108 | + """Test that ancestor watchs can be removed.""" |
109 | + # create the udf and add the watch |
110 | + path_udf = os.path.join(self.home_dir, "path/to/UDF") |
111 | + self._create_udf(path_udf) |
112 | + path_ancestor = os.path.join(self.home_dir, "path") |
113 | + self.eq.inotify_add_watch(path_ancestor) |
114 | + |
115 | + # remove what we added |
116 | + self.eq.inotify_rm_watch(path_ancestor) |
117 | + # pylint: disable-msg=W0212 |
118 | + self.assertEqual(self.eq._general_watchs, {}) |
119 | + self.assertEqual(self.eq._ancestors_watchs, {}) |
120 | + |
121 | + def test_has_watch_general(self): |
122 | + """Test that a general path is watched.""" |
123 | self.assertFalse(self.eq.inotify_has_watch(self.root_dir)) |
124 | |
125 | # add |
126 | @@ -68,6 +141,24 @@ |
127 | self.eq.inotify_rm_watch(self.root_dir) |
128 | self.assertFalse(self.eq.inotify_has_watch(self.root_dir)) |
129 | |
130 | + def test_has_watch_ancestor(self): |
131 | + """Test that an ancestor path is watched.""" |
132 | + path_udf = os.path.join(self.home_dir, "path/to/UDF") |
133 | + self._create_udf(path_udf) |
134 | + path_ancestor = os.path.join(self.home_dir, "path") |
135 | + |
136 | + self.assertFalse(self.eq.inotify_has_watch(path_ancestor)) |
137 | + |
138 | + # add |
139 | + # create the udf and add the watch |
140 | + self.eq.inotify_add_watch(path_ancestor) |
141 | + self.assertTrue(self.eq.inotify_has_watch(path_ancestor)) |
142 | + |
143 | + # remove |
144 | + self.eq.inotify_rm_watch(path_ancestor) |
145 | + self.assertFalse(self.eq.inotify_has_watch(path_ancestor)) |
146 | + |
147 | + |
148 | class DynamicHitMe(object): |
149 | """Helper class to test a sequence of signals.""" |
150 | |
151 | @@ -1157,7 +1248,6 @@ |
152 | |
153 | @param msg: A string describing the failure that's included in the |
154 | exception. |
155 | - |
156 | """ |
157 | if not first == second: |
158 | if msg is None: |
159 | @@ -1246,9 +1336,6 @@ |
160 | suggested_path, path, True) |
161 | other_ancestors = other_udf.ancestors |
162 | |
163 | - # pylint: disable-msg=W0212 |
164 | - assert not self.eq._processor._is_udf_ancestor(path) |
165 | - |
166 | os.makedirs(path) |
167 | # every ancestor has a watch already, added by LocalRescan. Copy that. |
168 | self.eq.inotify_add_watch(other_udf.path) |
169 | |
170 | === modified file 'tests/syncdaemon/test_eventqueue.py' |
171 | --- tests/syncdaemon/test_eventqueue.py 2009-11-20 22:00:25 +0000 |
172 | +++ tests/syncdaemon/test_eventqueue.py 2010-01-26 19:53:12 +0000 |
173 | @@ -39,9 +39,11 @@ |
174 | self.fsmdir = self.mktemp('fsmdir') |
175 | self.partials_dir = self.mktemp('partials_dir') |
176 | self.root_dir = self.mktemp('root_dir') |
177 | + self.home_dir = self.mktemp('home_dir') |
178 | + self.vm = testcase.FakeVolumeManager(self.root_dir) |
179 | self.fs = filesystem_manager.FileSystemManager(self.fsmdir, |
180 | self.partials_dir, |
181 | - testcase.FakeVolumeManager(self.root_dir)) |
182 | + self.vm) |
183 | self.fs.create(path=self.root_dir, |
184 | share_id='', is_dir=True) |
185 | self.fs.set_by_path(path=self.root_dir, |
186 | |
187 | === modified file 'ubuntuone/syncdaemon/event_queue.py' |
188 | --- ubuntuone/syncdaemon/event_queue.py 2010-01-26 15:54:29 +0000 |
189 | +++ ubuntuone/syncdaemon/event_queue.py 2010-01-26 19:53:12 +0000 |
190 | @@ -159,7 +159,7 @@ |
191 | } |
192 | |
193 | # these are the events that will listen from inotify |
194 | -INOTIFY_EVENTS = ( |
195 | +INOTIFY_EVENTS_GENERAL = ( |
196 | evtcodes.IN_OPEN | |
197 | evtcodes.IN_CLOSE_NOWRITE | |
198 | evtcodes.IN_CLOSE_WRITE | |
199 | @@ -169,6 +169,12 @@ |
200 | evtcodes.IN_MOVED_TO | |
201 | evtcodes.IN_MOVE_SELF |
202 | ) |
203 | +INOTIFY_EVENTS_ANCESTORS = ( |
204 | + evtcodes.IN_DELETE | |
205 | + evtcodes.IN_MOVED_FROM | |
206 | + evtcodes.IN_MOVED_TO | |
207 | + evtcodes.IN_MOVE_SELF |
208 | +) |
209 | |
210 | DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs) |
211 | |
212 | @@ -201,14 +207,58 @@ |
213 | return True |
214 | |
215 | |
216 | -class _INotifyProcessor(pyinotify.ProcessEvent): |
217 | - """Helper class that is called from inpotify when an event happens. |
218 | +class _AncestorsINotifyProcessor(pyinotify.ProcessEvent): |
219 | + """inotify's processor when an event happens on an UDFs ancestor.""" |
220 | + def __init__(self, eq): |
221 | + self.log = logging.getLogger('ubuntuone.SyncDaemon.AncestorsINotProc') |
222 | + self.eq = eq |
223 | + |
224 | + def _get_udf(self, path): |
225 | + """Get the udf for a specific path. |
226 | + |
227 | + It can return None in case the UDF was deleted in the meantime. |
228 | + """ |
229 | + for udf in self.eq.fs.vm.udfs.itervalues(): |
230 | + parent = os.path.dirname(udf.path) + os.path.sep |
231 | + if parent.startswith(path + os.path.sep): |
232 | + return udf |
233 | + return None |
234 | + |
235 | + def process_IN_MOVE_SELF(self, event): |
236 | + """Don't do anything here. |
237 | + |
238 | + We just turned this event on because pyinotify does some |
239 | + path-fixing in its internal processing when this happens. |
240 | + """ |
241 | + process_IN_MOVED_TO = process_IN_MOVE_SELF |
242 | + |
243 | + def process_IN_MOVED_FROM(self, event): |
244 | + """Getting it out or renaming means unsuscribe.""" |
245 | + if event.mask & evtcodes.IN_ISDIR: |
246 | + udf = self._get_udf(event.path) |
247 | + if udf is not None: |
248 | + self.log.info("Got MOVED_FROM on path %r, unsubscribing " |
249 | + "udf %s", event.path, udf) |
250 | + self.eq.fs.vm.unsubscribe_udf(udf.volume_id) |
251 | + |
252 | + def process_IN_DELETE(self, event): |
253 | + """Check to see if the UDF was deleted.""" |
254 | + if event.mask & evtcodes.IN_ISDIR: |
255 | + udf = self._get_udf(event.path) |
256 | + if udf is not None and udf.path == event.pathname: |
257 | + self.log.info("Got DELETE on path %r, deleting udf %s", |
258 | + event.path, udf) |
259 | + self.eq.fs.vm.delete_volume(udf.volume_id) |
260 | + |
261 | + |
262 | +class _GeneralINotifyProcessor(pyinotify.ProcessEvent): |
263 | + """inotify's processor when a general event happens. |
264 | |
265 | This class also catchs the MOVEs events, and synthetises a new |
266 | FS_(DIR|FILE)_MOVE event when possible. |
267 | """ |
268 | def __init__(self, eq): |
269 | - self.log = logging.getLogger('ubuntuone.SyncDaemon.INotifyProcessor') |
270 | + self.log = logging.getLogger('ubuntuone.SyncDaemon.GeneralINotProc') |
271 | self.eq = eq |
272 | self.held_event = None |
273 | self.timer = None |
274 | @@ -216,16 +266,6 @@ |
275 | self.frozen_evts = False |
276 | self._to_mute = MuteFilter() |
277 | |
278 | - def _is_udf_ancestor(self, path): |
279 | - """Decide if path is an UDF ancestor or not.""" |
280 | - result = None |
281 | - for udf in self.eq.fs.vm.udfs.itervalues(): |
282 | - parent = os.path.dirname(udf.path) + os.path.sep |
283 | - if parent.startswith(path + os.path.sep): |
284 | - return udf |
285 | - |
286 | - return result |
287 | - |
288 | def add_to_mute_filter(self, event, *paths): |
289 | """Add an event and path(s) to the mute filter.""" |
290 | # all events have one path except the MOVEs |
291 | @@ -261,14 +301,12 @@ |
292 | |
293 | def process_IN_OPEN(self, event): |
294 | """Filter IN_OPEN to make it happen only in files.""" |
295 | - if not (event.mask & evtcodes.IN_ISDIR) and \ |
296 | - not self._is_udf_ancestor(event.path): |
297 | + if not (event.mask & evtcodes.IN_ISDIR): |
298 | self.push_event(event) |
299 | |
300 | def process_IN_CLOSE_NOWRITE(self, event): |
301 | """Filter IN_CLOSE_NOWRITE to make it happen only in files.""" |
302 | - if not (event.mask & evtcodes.IN_ISDIR) and \ |
303 | - not self._is_udf_ancestor(event.path): |
304 | + if not (event.mask & evtcodes.IN_ISDIR): |
305 | self.push_event(event) |
306 | |
307 | def process_IN_MOVE_SELF(self, event): |
308 | @@ -281,11 +319,6 @@ |
309 | |
310 | def process_IN_MOVED_FROM(self, event): |
311 | """Capture the MOVED_FROM to maybe syntethize FILE_MOVED.""" |
312 | - udf = self._is_udf_ancestor(event.path) |
313 | - if udf is not None: |
314 | - self.eq.fs.vm.unsubscribe_udf(udf.volume_id) |
315 | - return |
316 | - |
317 | if self.held_event is not None: |
318 | self.release_held_event() |
319 | |
320 | @@ -317,9 +350,6 @@ |
321 | |
322 | def process_IN_MOVED_TO(self, event): |
323 | """Capture the MOVED_TO to maybe syntethize FILE_MOVED.""" |
324 | - if self._is_udf_ancestor(event.path): |
325 | - return |
326 | - |
327 | if self.held_event is not None: |
328 | if event.cookie == self.held_event.cookie: |
329 | try: |
330 | @@ -381,16 +411,6 @@ |
331 | |
332 | def process_default(self, event): |
333 | """Push the event into the EventQueue.""" |
334 | - udf = self._is_udf_ancestor(event.path) |
335 | - if udf is not None: |
336 | - # if event is the deletion of the UDF per se, |
337 | - # call delete_volume on VolumeManager for that UDF. |
338 | - ename = NAME_TRANSLATIONS.get(event.mask, None) |
339 | - is_dir_delete = ename is not None and ename == 'FS_DIR_DELETE' |
340 | - if udf.path == event.pathname and is_dir_delete: |
341 | - self.eq.fs.vm.delete_volume(udf.volume_id) |
342 | - return |
343 | - |
344 | if self.held_event is not None: |
345 | self.release_held_event() |
346 | self.push_event(event) |
347 | @@ -482,13 +502,23 @@ |
348 | |
349 | self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ') |
350 | self.fs = fs |
351 | - # hook inotify |
352 | - self._inotify_reader = None |
353 | - self._inotify_wm = wm = pyinotify.WatchManager() |
354 | - self._processor = _INotifyProcessor(self) |
355 | - self._inotify_notifier = pyinotify.Notifier(wm, self._processor) |
356 | - self._hook_inotify_to_twisted(wm, self._inotify_notifier) |
357 | - self._watchs = {} |
358 | + |
359 | + # general inotify |
360 | + self._inotify_general_wm = wm = pyinotify.WatchManager() |
361 | + self._processor = _GeneralINotifyProcessor(self) |
362 | + self._inotify_notifier_gral = pyinotify.Notifier(wm, self._processor) |
363 | + self._inotify_reader_gral = self._hook_inotify_to_twisted( |
364 | + wm, self._inotify_notifier_gral) |
365 | + self._general_watchs = {} |
366 | + |
367 | + # ancestors inotify |
368 | + self._inotify_ancestors_wm = wm = pyinotify.WatchManager() |
369 | + antr_processor = _AncestorsINotifyProcessor(self) |
370 | + self._inotify_notifier_antr = pyinotify.Notifier(wm, antr_processor) |
371 | + self._inotify_reader_antr = self._hook_inotify_to_twisted( |
372 | + wm, self._inotify_notifier_antr) |
373 | + self._ancestors_watchs = {} |
374 | + |
375 | self.dispatching = False |
376 | self.dispatch_queue = Queue() |
377 | self.empty_event_queue_callbacks = set() |
378 | @@ -526,34 +556,64 @@ |
379 | notifier.process_events() |
380 | |
381 | reader = MyReader() |
382 | - self._inotify_reader = reader |
383 | reactor.addReader(reader) |
384 | + return reader |
385 | |
386 | def shutdown(self): |
387 | """Prepares the EQ to be closed.""" |
388 | - self._inotify_notifier.stop() |
389 | - reactor.removeReader(self._inotify_reader) |
390 | + self._inotify_notifier_gral.stop() |
391 | + self._inotify_notifier_antr.stop() |
392 | + reactor.removeReader(self._inotify_reader_gral) |
393 | + reactor.removeReader(self._inotify_reader_antr) |
394 | |
395 | def inotify_rm_watch(self, dirpath): |
396 | """Remove watch from a dir.""" |
397 | - try: |
398 | - wd = self._watchs[dirpath] |
399 | - except KeyError: |
400 | + if dirpath in self._general_watchs: |
401 | + w_dict = self._general_watchs |
402 | + w_manager = self._inotify_general_wm |
403 | + elif dirpath in self._ancestors_watchs: |
404 | + w_dict = self._ancestors_watchs |
405 | + w_manager = self._inotify_ancestors_wm |
406 | + else: |
407 | raise ValueError("The path %r is not watched right now!" % dirpath) |
408 | - result = self._inotify_wm.rm_watch(wd) |
409 | + |
410 | + wd = w_dict[dirpath] |
411 | + result = w_manager.rm_watch(wd) |
412 | if not result[wd]: |
413 | raise RuntimeError("The path %r couldn't be removed!" % dirpath) |
414 | - del self._watchs[dirpath] |
415 | + del w_dict[dirpath] |
416 | |
417 | def inotify_add_watch(self, dirpath): |
418 | """Add watch to a dir.""" |
419 | - self.log.debug("Adding inotify watch to %r", dirpath) |
420 | - result = self._inotify_wm.add_watch(dirpath, INOTIFY_EVENTS) |
421 | - self._watchs[dirpath] = result[dirpath] |
422 | + # see where to add it |
423 | + if self._is_udf_ancestor(dirpath): |
424 | + w_type = "ancestors" |
425 | + w_manager = self._inotify_ancestors_wm |
426 | + w_dict = self._ancestors_watchs |
427 | + events = INOTIFY_EVENTS_ANCESTORS |
428 | + else: |
429 | + w_type = "general" |
430 | + w_manager = self._inotify_general_wm |
431 | + w_dict = self._general_watchs |
432 | + events = INOTIFY_EVENTS_GENERAL |
433 | + |
434 | + # add the watch! |
435 | + self.log.debug("Adding %s inotify watch to %r", w_type, dirpath) |
436 | + result = w_manager.add_watch(dirpath, events) |
437 | + w_dict[dirpath] = result[dirpath] |
438 | |
439 | def inotify_has_watch(self, dirpath): |
440 | """Check if a dirpath is watched.""" |
441 | - return dirpath in self._watchs |
442 | + return (dirpath in self._general_watchs or |
443 | + dirpath in self._ancestors_watchs) |
444 | + |
445 | + def _is_udf_ancestor(self, path): |
446 | + """Decide if path is an UDF ancestor or not.""" |
447 | + for udf in self.fs.vm.udfs.itervalues(): |
448 | + parent = os.path.dirname(udf.path) + os.path.sep |
449 | + if parent.startswith(path + os.path.sep): |
450 | + return True |
451 | + return False |
452 | |
453 | def unsubscribe(self, obj): |
454 | """Removes the callback object from the listener queue. |
455 | |
456 | === modified file 'ubuntuone/syncdaemon/volume_manager.py' |
457 | --- ubuntuone/syncdaemon/volume_manager.py 2010-01-26 19:40:05 +0000 |
458 | +++ ubuntuone/syncdaemon/volume_manager.py 2010-01-26 19:53:12 +0000 |
459 | @@ -143,6 +143,9 @@ |
460 | self.path = path |
461 | self.subscribed = subscribed |
462 | |
463 | + def __repr__(self): |
464 | + return "<UDF id %r, real path %r>" % (self.id, self.path) |
465 | + |
466 | @property |
467 | def ancestors(self): |
468 | """Calculate all the ancestors for this UDF's path.""" |
Added a separate inotify processor to handle UDF ancestors.
There's no change in EQ API, the inotify_ {has|rm| add}_watch( dirpath) methods remain the same, they know in which watch manager handle the dirpath.
Note that of course I removed the UDF ancestor handling of the legacy processor.