Merge lp:~verterok/ubuntuone-client/transactions-for-sync into lp:ubuntuone-client

Proposed by Guillermo Gonzalez
Status: Merged
Approved by: dobey
Approved revision: 131
Merged at revision: not available
Proposed branch: lp:~verterok/ubuntuone-client/transactions-for-sync
Merge into: lp:ubuntuone-client
Diff against target: None lines
To merge this branch: bzr merge lp:~verterok/ubuntuone-client/transactions-for-sync
Reviewer Review Type Date Requested Status
dobey (community) Approve
Elliot Murphy (community) Approve
Review via email: mp+9974@code.launchpad.net

Commit message

Add 'transactions' to Sync (FSKey) regarding FileSystemManager changes

To post a comment you must log in.
Revision history for this message
Guillermo Gonzalez (verterok) wrote :

This branch adds "transactions" to the Syncdaemon Sync class: basically, almost all FileSystemManager operations are queued and executed when FSKey.sync() is called. This allows us to update the metadata only once instead of doing multiple writes. (Scanning a ~9k-file tree is ~15 sec faster.)

Revision history for this message
Elliot Murphy (statik) :
review: Approve
Revision history for this message
dobey (dobey) wrote :

Looks ok to me.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_fsm.py'
2--- tests/syncdaemon/test_fsm.py 2009-07-31 18:24:54 +0000
3+++ tests/syncdaemon/test_fsm.py 2009-08-11 12:22:34 +0000
4@@ -160,6 +160,28 @@
5 now = time.time()
6 self.assertTrue(now-3 <= when <= now) # 3 seconds test range
7
8+ def test_with_node_id(self):
9+ '''Test creation with node_id'''
10+ # create, but not twice
11+ path = os.path.join(self.share.path, 'path')
12+ self.fsm.create(path, "share", node_id='a_node_id')
13+ self.assertRaises(ValueError, self.fsm.create, path, "share")
14+ self.assertRaises(ValueError, self.fsm.create, path, "other")
15+ mdobj = self.fsm.get_by_path(path)
16+ self.assertEqual(mdobj.path, "path")
17+ self.assertEqual(mdobj.share_id, "share")
18+ self.assertEqual(mdobj.node_id, "a_node_id")
19+ when = mdobj.info.created
20+ now = time.time()
21+ self.assertTrue(now-3 <= when <= now) # 3 seconds test range
22+
23+ # set uuid using valid path, but not twice
24+ self.assertRaises(ValueError, self.fsm.set_node_id, path, "whatever")
25+ mdobj = self.fsm.get_by_path(path)
26+ when = mdobj.info.node_id_assigned
27+ now = time.time()
28+ self.assertTrue(now-3 <= when <= now) # 3 seconds test range
29+
30 def test_invalid_args(self):
31 '''Test using invalid args in set_node_id.'''
32 path = os.path.join(self.share.path, 'path')
33@@ -180,7 +202,7 @@
34 self.fsm.set_node_id(path, "uuid")
35
36 # opening another FSM
37- fsm = FileSystemManager(self.fsmdir, self.fsm.vm)
38+ FileSystemManager(self.fsmdir, self.fsm.vm)
39 self.fsm.set_node_id(path, "uuid")
40
41 def test_twice_different_bad(self):
42@@ -519,12 +541,6 @@
43 self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, info="-")
44 self.assertRaises(ValueError, self.fsm.set_by_path, path, info="-")
45
46- # test with forbidden stat
47- self.assertRaises(ValueError, self.fsm.set_by_node_id, "uuid", "share",
48- stat="-")
49- self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, stat="-")
50- self.assertRaises(ValueError, self.fsm.set_by_path, path, stat="-")
51-
52 # test with forbidden share
53 self.assertRaises(ValueError, self.fsm.set_by_mdid, mdid, share_id="-")
54 self.assertRaises(ValueError, self.fsm.set_by_path, path, share_id="-")
55@@ -666,6 +682,31 @@
56 self.assertTrue(mdid2 in all)
57 self.assertTrue(mdid3 in all)
58
59+ def test_internal_set_node_id(self):
60+ """Test _set_node_id"""
61+ path = os.path.join(self.share.path, 'path')
62+ mdid = self.fsm.create(path, "share")
63+ mdobj = self.fsm.fs[mdid]
64+ # yes, it's a unit test, I access protected members.
65+ # pylint: disable-msg=W0212
66+ self.fsm._set_node_id(mdobj, "uuid", path)
67+
68+ self.assertEquals('uuid', mdobj['node_id'])
69+ self.fsm.set_node_id(path, "uuid")
70+ new_mdobj = self.fsm.get_by_node_id('share', 'uuid')
71+ for k, v in mdobj.items():
72+ if k == 'info':
73+ for k1, v1 in v.items():
74+ self.assertEquals(v1, getattr(new_mdobj.info, k1))
75+ else:
76+ self.assertEquals(v, getattr(new_mdobj, k))
77+
78+ # test using bad uuid
79+ mdobj = self.fsm.fs[mdid]
80+ self.assertEquals('uuid', mdobj['node_id'])
81+ self.assertRaises(ValueError,
82+ self.fsm._set_node_id, mdobj, 'bad-uuid', path)
83+
84
85 class StatTests(FSMTestCase):
86 '''Test all the behaviour regarding the stats.'''
87@@ -772,7 +813,7 @@
88 mdobj2 = self.fsm.get_by_path(path2)
89 self.assertEqual(mdobj2.stat, os.stat(path2))
90
91- def test_update_stat(self):
92+ def test_set_stat_by_mdid(self):
93 '''Test that update_stat works.'''
94 path = os.path.join(self.share.path, "thisfile")
95 open(path, "w").close()
96@@ -787,7 +828,7 @@
97 self.assertEqual(mdobj.stat, oldstat)
98
99 # it's updated when asked, even if it's an old stat
100- self.fsm.update_stat(mdid, oldstat)
101+ self.fsm.set_by_mdid(mdid, stat=oldstat)
102 mdobj = self.fsm.get_by_mdid(mdid)
103 self.assertEqual(mdobj.stat, oldstat)
104
105
106=== modified file 'ubuntuone/syncdaemon/filesystem_manager.py'
107--- ubuntuone/syncdaemon/filesystem_manager.py 2009-08-10 13:26:21 +0000
108+++ ubuntuone/syncdaemon/filesystem_manager.py 2009-08-11 12:22:34 +0000
109@@ -124,7 +124,7 @@
110 logger = functools.partial(fsm_logger.log, logging.INFO)
111 log_warning = functools.partial(fsm_logger.log, logging.WARNING)
112
113-is_forbidden = set("info path node_id share_id is_dir stat".split()
114+is_forbidden = set("info path node_id share_id is_dir".split()
115 ).intersection
116
117 class InconsistencyError(Exception):
118@@ -323,7 +323,7 @@
119 if mdobj["node_id"] is not None:
120 self._idx_node_id[(mdobj["share_id"], mdobj["node_id"])] = mdid
121
122- def create(self, path, share_id, is_dir=False):
123+ def create(self, path, share_id, node_id=None, is_dir=False):
124 '''Creates a new md object.'''
125 if not path.strip():
126 raise ValueError("Empty paths are not allowed (got %r)" % path)
127@@ -341,6 +341,8 @@
128 newobj["info"] = dict(created=time.time(), is_partial=False)
129 # only one stat, (instead of os.path.exists & os.stat)
130 newobj["stat"] = get_stat(path)
131+ if node_id is not None:
132+ self._set_node_id(newobj, node_id, path)
133
134 logger("create: path=%r mdid=%r share_id=%r node_id=%r is_dir=%r" % (
135 path, mdid, share_id, None, is_dir))
136@@ -356,26 +358,29 @@
137 path = os.path.normpath(path)
138 mdid = self._idx_path[path]
139 mdobj = self.fs[mdid]
140+ self._set_node_id(mdobj, node_id, path)
141+ self.fs[mdid] = mdobj
142+
143+ def _set_node_id(self, mdobj, node_id, path):
144+ """Set the node_id to the mdobj, but don't 'save' the mdobj"""
145 if mdobj["node_id"] is not None:
146 # the object is already there! it's ok if it has the same id
147 if mdobj["node_id"] == node_id:
148 logger("set_node_id (repeated!): path=%r mdid=%r node_id=%r"
149- % (path, mdid, node_id))
150+ % (path, mdobj['mdid'], node_id))
151 return
152 msg = "The path %r already has node_id (%r)" % (path, node_id)
153 raise ValueError(msg)
154-
155 # adjust the index
156 share_id = mdobj["share_id"]
157- self._idx_node_id[(share_id, node_id)] = mdid
158+ self._idx_node_id[(share_id, node_id)] = mdobj['mdid']
159
160 # set the node_id
161 mdobj["node_id"] = node_id
162 mdobj["info"]["node_id_assigned"] = time.time()
163- self.fs[mdid] = mdobj
164
165 logger("set_node_id: path=%r mdid=%r share_id=%r node_id=%r" % (
166- path, mdid, share_id, node_id))
167+ path, mdobj['mdid'], share_id, node_id))
168
169 def get_mdobjs_by_share_id(self, share_id, base_path=None):
170 """Returns all the mdobj that belongs to a share and it path
171@@ -456,13 +461,6 @@
172 mdobj["stat"] = get_stat(path)
173 self.fs[mdid] = mdobj
174
175- def update_stat(self, mdid, stat):
176- '''Updates the stat of a md object.'''
177- logger("update stat of mdid=%r", mdid)
178- mdobj = self.fs[mdid]
179- mdobj["stat"] = stat
180- self.fs[mdid] = mdobj
181-
182 def move_file(self, new_share_id, path_from, path_to):
183 '''Moves a file/dir from one point to the other.'''
184 path_from = os.path.normpath(path_from)
185
186=== modified file 'ubuntuone/syncdaemon/sync.py'
187--- ubuntuone/syncdaemon/sync.py 2009-08-04 20:00:44 +0000
188+++ ubuntuone/syncdaemon/sync.py 2009-08-11 00:36:15 +0000
189@@ -42,9 +42,13 @@
190 """create"""
191 self.fs = fs
192 self.keys = keys
193+ self.mdid = None
194+ self._changes = {}
195
196 def get_mdid(self):
197 """Get the metadata id."""
198+ if self.mdid is not None:
199+ return self.mdid
200 if len(self.keys) == 1 and "path" in self.keys:
201 # pylint: disable-msg=W0212
202 mdid = self.fs._idx_path[self.keys["path"]]
203@@ -59,6 +63,7 @@
204 raise KeyError("Incorrect keys: %s" % self.keys)
205 if mdid is None:
206 raise KeyError("cant find mdid")
207+ self.mdid = mdid
208 return mdid
209
210 def get(self, key):
211@@ -88,8 +93,12 @@
212
213 def set(self, **kwargs):
214 """Set the values for kwargs."""
215- mdid = self.get_mdid()
216- self.fs.set_by_mdid(mdid, **kwargs)
217+ self._changes.update(kwargs)
218+
219+ def sync(self):
220+ """sync the changes back to FSM"""
221+ if self._changes:
222+ self.fs.set_by_mdid(self.get_mdid(), **self._changes)
223
224 def has_metadata(self):
225 """The State Machine value version of has_metadata."""
226@@ -192,8 +201,6 @@
227 self.fs.create_file(self.get_mdid())
228
229
230-
231-
232 def loglevel(lvl):
233 """Make a function that logs at lvl log level."""
234 def level_log(self, message, *args, **kwargs):
235@@ -328,8 +335,8 @@
236 """create a local file."""
237 mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
238 path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
239- self.m.fs.create(path=path, share_id=share_id, is_dir=True)
240- self.m.fs.set_node_id(path, node_id)
241+ self.m.fs.create(path=path, share_id=share_id, node_id=node_id,
242+ is_dir=True)
243 self.m.action_q.query([(share_id, node_id, "")])
244 # pylint: disable-msg=W0704
245 # this should be provided by FSM, fix!!
246@@ -370,10 +377,12 @@
247 self.key['node_id'],
248 self.key['local_hash'] or "")])
249 self.key.set(server_hash=self.key['local_hash'])
250+ self.key.sync()
251
252 def get_dir(self, event, params, hash):
253 """Get the directory."""
254 self.key.set(server_hash=hash)
255+ self.key.sync()
256 self.m.fs.create_partial(node_id=self.key['node_id'],
257 share_id=self.key['share_id'])
258 self.m.action_q.listdir(
259@@ -410,6 +419,7 @@
260 except InconsistencyError:
261 self.key.remove_partial()
262 self.key.set(server_hash=self.key['local_hash'])
263+ self.key.sync()
264 self.m.action_q.query([
265 (self.key["share_id"], self.key["node_id"], "")])
266 # we dont perform the merge, we try to re get it
267@@ -478,15 +488,17 @@
268
269 self.key.remove_partial()
270 self.key.set(local_hash=hash)
271+ self.key.sync()
272
273 def new_file(self, event, params, share_id, node_id, parent_id, name):
274 """create a local file."""
275 mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
276 path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
277- self.m.fs.create(path=path, share_id=share_id, is_dir=False)
278- self.m.fs.set_node_id(path, node_id)
279+ self.m.fs.create(path=path, share_id=share_id, node_id=node_id,
280+ is_dir=False)
281 self.key.set(server_hash="")
282 self.key.set(local_hash="")
283+ self.key.sync()
284 self.key.make_file()
285 self.m.action_q.query([(share_id, node_id, "")])
286
287@@ -500,6 +512,7 @@
288 def get_file(self, event, params, hash):
289 """Get the contents for the file."""
290 self.key.set(server_hash=hash)
291+ self.key.sync()
292 self.m.fs.create_partial(node_id=self.key['node_id'],
293 share_id=self.key['share_id'])
294 self.m.action_q.download(
295@@ -513,6 +526,7 @@
296 def reget_file(self, event, params, hash):
297 """cancel and reget this download."""
298 self.key.set(server_hash=hash)
299+ self.key.sync()
300 self.m.action_q.cancel_download(share_id=self.key['share_id'],
301 node_id=self.key['node_id'])
302 self.key.remove_partial()
303@@ -547,6 +561,7 @@
304 def server_file_changed_back(self, event, params, hash):
305 """cancel and dont reget this download."""
306 self.key.set(server_hash=hash)
307+ self.key.sync()
308 self.m.action_q.cancel_download(share_id=self.key['share_id'],
309 node_id=self.key['node_id'])
310 self.key.remove_partial()
311@@ -561,6 +576,7 @@
312 # start work to go to a good state
313 self.key.remove_partial()
314 self.key.set(server_hash=self.key['local_hash'])
315+ self.key.sync()
316 self.m.action_q.query([
317 (self.key["share_id"], self.key["node_id"], "")])
318
319@@ -574,6 +590,7 @@
320 self.m.fs.create(path=path, share_id=share_id, is_dir=False)
321 self.key.set(local_hash=empty_hash)
322 self.key.set(server_hash=empty_hash)
323+ self.key.sync()
324 name = os.path.basename(path)
325 marker = MDMarker(self.key.get_mdid())
326 self.m.action_q.make_file(share_id, parent_id, name, marker)
327@@ -611,8 +628,9 @@
328 def put_file(self, event, params, hash, crc32, size, stat):
329 """upload the file to the server."""
330 previous_hash = self.key['server_hash']
331- self.key.set(local_hash=hash)
332- self.m.fs.update_stat(self.key.get_mdid(), stat)
333+ self.key.set(local_hash=hash, stat=stat)
334+ self.key.sync()
335+
336 self.m.action_q.upload(share_id=self.key['share_id'],
337 node_id=self.key['node_id'], previous_hash=previous_hash,
338 hash=hash, crc32=crc32, size=size,
339@@ -623,8 +641,8 @@
340 self.m.action_q.cancel_download(share_id=self.key['share_id'],
341 node_id=self.key['node_id'])
342 self.key.remove_partial()
343- self.key.set(local_hash=hash)
344- self.m.fs.update_stat(self.key.get_mdid(), stat)
345+ self.key.set(local_hash=hash, stat=stat)
346+ self.key.sync()
347
348 def reput_file_from_ok(self, event, param, hash):
349 """put the file again, mark upload as ok"""
350@@ -632,6 +650,7 @@
351 node_id=self.key['node_id'])
352 self.key.set(local_hash=hash)
353 self.key.set(server_hash=hash)
354+ self.key.sync()
355 self.m.hash_q.insert(self.key['path'])
356
357
358@@ -641,8 +660,8 @@
359 node_id=self.key['node_id'])
360 previous_hash = self.key['server_hash']
361
362- self.key.set(local_hash=hash)
363- self.m.fs.update_stat(self.key.get_mdid(), stat)
364+ self.key.set(local_hash=hash, stat=stat)
365+ self.key.sync()
366 self.m.action_q.upload(share_id=self.key['share_id'],
367 node_id=self.key['node_id'], previous_hash=previous_hash,
368 hash=hash, crc32=crc32, size=size,
369@@ -653,6 +672,7 @@
370 self.m.action_q.cancel_upload(share_id=self.key['share_id'],
371 node_id=self.key['node_id'])
372 self.key.set(server_hash=hash)
373+ self.key.sync()
374
375 def commit_upload(self, event, params, hash):
376 """Finish an upload."""
377@@ -754,6 +774,7 @@
378 self.m.action_q.cancel_upload(share_id=self.key['share_id'],
379 node_id=self.key['node_id'])
380 self.key.set(local_hash=self.key['server_hash'])
381+ self.key.sync()
382 self.client_moved(event, params, path_from, path_to)
383 self.m.hash_q.insert(self.key['path'])
384
385@@ -766,6 +787,7 @@
386 node_id=self.key['node_id'])
387 self.key.remove_partial()
388 self.key.set(server_hash=self.key['local_hash'])
389+ self.key.sync()
390 self.m.action_q.query([(self.key['share_id'],
391 self.key['node_id'],
392 self.key['local_hash'] or "")])
393@@ -778,7 +800,8 @@
394
395 def save_stat(self, event, params, hash, crc32, size, stat):
396 """Save the stat"""
397- self.m.fs.update_stat(self.key.get_mdid(), stat)
398+ self.key.set(stat=stat)
399+ self.key.sync()
400
401
402 class Sync(object):

Subscribers

People subscribed via source and target branches