Merge lp:~spiv/bzr/gc-batching into lp:bzr/2.0

Proposed by Andrew Bennetts
Status: Merged
Merge reported by: Andrew Bennetts
Merged at revision: not available
Proposed branch: lp:~spiv/bzr/gc-batching
Merge into: lp:bzr/2.0
Diff against target: 18 lines
To merge this branch: bzr merge lp:~spiv/bzr/gc-batching
Reviewer         Review Type    Date Requested    Status
John A Meinel                                      Approve
Martin Pool                                        Approve
Review via email: mp+10643@code.launchpad.net
Revision history for this message
Andrew Bennetts (spiv) wrote :

Fixes #402657, at least for the originally reported case of "bzr branch http://bazaar.launchpad.net/~launchpad-pqm/launchpad/devel".

It changes the implementation of GroupCompressVersionedFiles._get_remaining_record_stream to batch up block fetching a bit more. It used to call _get_block once per key, which would do a single IO request for each block, and many of the blocks were very small (a few hundred bytes) for some reason. Now instead it essentially accumulates a list of blocks to fetch and only performs the IO when the expected fetch size is reasonably large (256kB currently, basically an arbitrary choice), or if the batch needs to be flushed before returning keys from a different source (such as self._unadded_refs).
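
For illustration, here is a minimal sketch of that accumulate-then-flush pattern. It is not the actual bzrlib code: the fetch_blocks callable and the (index, start, length) memo shape are just assumptions for the sketch.

    BATCH_SIZE = 256 * 1024  # flush threshold in bytes, mirroring the arbitrary choice above

    def batched_fetch(read_memos, fetch_blocks, batch_size=BATCH_SIZE):
        """Yield (read_memo, block) pairs, grouping many small reads per IO.

        read_memos are assumed to be (index, start, length) tuples, and
        fetch_blocks(memos) is assumed to return blocks in the same order.
        """
        pending = []
        pending_bytes = 0
        for memo in read_memos:
            pending.append(memo)
            pending_bytes += memo[2]  # estimated byte length of this block
            if pending_bytes > batch_size:
                # One IO request for the whole batch instead of one per key.
                for pair in zip(pending, fetch_blocks(pending)):
                    yield pair
                pending = []
                pending_bytes = 0
        if pending:
            # Final flush for whatever is left over.
            for pair in zip(pending, fetch_blocks(pending)):
                yield pair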

The code is still somewhat at the mercy of the ordering asked for by the caller; the "bzr branch ..." case does 'unordered' IO, and this patch helps significantly there (based on log+http traces). Cases that trigger other orderings might not be helped as much.

_get_block is now gone, replaced by _get_blocks. There's a new class, _BatchingBlockFetcher, which has most of the batching logic, although the decision about when to actually fetch the batch is still in _get_remaining_record_stream. _get_remaining_record_stream is shorter and clearer now, which is nice.

I haven't managed to write any convincing automated tests for this improvement, so there are no test changes in this patch :( . I have tested it quite a bit manually, and I'm confident it is as correct as the old code, and I think the new code is pretty clear. So I'm pretty happy with it despite that. Suggestions for good tests are welcome, of course!
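
For what it's worth, one possible shape for such a test (sketched here with a hypothetical wrapper, not an existing bzrlib test helper) would be to count the IO requests made while fetching many small records:

    class CountingAccess(object):
        """Hypothetical test double: records each get_raw_records() call."""

        def __init__(self, real_access):
            self._real_access = real_access
            self.requests = []

        def get_raw_records(self, memos):
            memos = list(memos)
            self.requests.append(memos)
            return self._real_access.get_raw_records(memos)

A test could substitute something like this for the object providing get_raw_records, stream a few hundred small texts, and assert that the number of recorded requests stays well below the number of keys.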

At one point yesterday I observed a small (~5%), reproducible, improvement in "co ." time for Launchpad, but I cannot reproduce a significant difference at the moment; seemingly something to do with throwing away and refetching my launchpad/devel branch? (I'm actually seeing 50% worse times than yesterday, even with bzr.dev!) Anyway, it's definitely no worse and still perhaps a little better even for local IO, so that's good.

Revision history for this message
John A Meinel (jameinel) wrote :

I suspect yet another lp:mad bug here, as I did not get an email to review this proposal. (I just double checked my inbox.)

Anyway, I'm working on a review, but between lp:mad getting the content of the diff wrong and it not sending me an email, it will be a bit... :)

Revision history for this message
John A Meinel (jameinel) wrote :

To start with, I think your layering here is pretty nice. Adding an object does make managing the state a bit clearer. get_record_stream() is complex enough that it probably should have started out as an object interface rather than just a simple generator. Oh well.

In short, I think we have a race condition because of how LRUSizeCache (or just LRUCache) interacts with _get_blocks() being called in arbitrary order. We need a different caching algorithm that ensures requested blocks are never flushed from the cache while they are still needed.

The comment here is no longer correct:
+        for read_memo in read_memos:
+            try:
+                yield cached[read_memo]
+            except KeyError:
+                # read the group
+                zdata = raw_records.next()
+                # decompress - whole thing - this is not a bug, as it
+                # permits caching. We might want to store the partially
+                # decompresed group and decompress object, so that recent
+                # texts are not penalised by big groups.
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield block

^- We are caching the 'block' and not the raw content anymore. And the block may only partially evaluate the compressed content.

However, I'm more concerned that in this loop:
+        for read_memo in read_memos:
+            if read_memo in cached:
+                # Don't fetch what we already have
+                continue
+            if read_memo in not_cached_seen:
+                # Don't try to fetch the same data twice
+                continue
+            not_cached.append(read_memo)
+            not_cached_seen.add(read_memo)
+        raw_records = self._access.get_raw_records(not_cached)
+        for read_memo in read_memos:
+            try:
+                yield cached[read_memo]
+            except KeyError:
+                # read the group
+                zdata = raw_records.next()
+                # decompress - whole thing - this is not a bug, as it
+                # permits caching. We might want to store the partially
+                # decompresed group and decompress object, so that recent
+                # texts are not penalised by big groups.
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield block

^- There is an assumption that raw_records is in the exact ordering of 'read_memos'.

And I think there is another small assumption in here: the code that filters out duplicates requires that groups always fit in the cache and aren't expired before you get to the duplicate. Perhaps an example:

Assume we have the groups G1 through G4, and that texts are referred to by [group, text] pairs, e.g. [G1,T1].

If the request ends up being for:

[G1,T1], [G2, T1-T100], [G3, T1-T100], [G1,T2]

The code above will not request G1 twice.
However, it will put G2 and G3 into the cache in the meantime, which gives G1 time to be flushed from the cache before its second use.
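
To make that concrete, here is a standalone toy of the eviction, using a hypothetical two-entry LRU in place of LRUSizeCache; the second reference to G1 misses because G2 and G3 pushed it out:

    from collections import OrderedDict

    class TinyLRU(object):
        """Toy stand-in for an LRU cache that keeps at most max_items entries."""

        def __init__(self, max_items):
            self.max_items = max_items
            self._data = OrderedDict()

        def __contains__(self, key):
            return key in self._data

        def __setitem__(self, key, value):
            self._data[key] = value
            while len(self._data) > self.max_items:
                self._data.popitem(last=False)  # evict the oldest entry

    cache = TinyLRU(max_items=2)
    cache['G1'] = 'block 1'   # first reference to G1 caches it...
    cache['G2'] = 'block 2'
    cache['G3'] = 'block 3'   # ...but inserting G3 evicts G1 again
    assert 'G1' not in cache  # so the later [G1,T2] lookup would miss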

Even more worrisome...


review: Needs Fixing
Revision history for this message
Andrew Bennetts (spiv) wrote :

John A Meinel wrote:
> Review: Needs Fixing
> To start with, I think your layering here is pretty nice. Adding an object
> does make managing the state a bit clearer. get_record_stream() is complex
> enough that it probably should have started out as an object interface rather
> than just a simple generator. Oh well.

Yes. I'm glad I structured it this way, it feels quite nice to me once I
figured out which bits to move out of _get_remaining_record_stream.

> In short, I think we have a race condition because of how LRUSizeCache (or
> just LRUCache) interacts with _get_blocks() being called in arbitrary order.
> We need a different caching algorithm that ensures requested blocks are never
> flushed from the cache while they are still needed.

Ok, I see that. I've fixed it by making add_key grab and keep cached data
immediately; we know we're about to use it so it would be perverse to allow it
to fall out of LRUCache and then not be used. This shifts some of the "plan
what to get" logic out of yield_factories to add_key, which I was considering
doing anyway.

> The comment here is no longer correct:
> +        for read_memo in read_memos:
> +            try:
> +                yield cached[read_memo]
> +            except KeyError:
> +                # read the group
> +                zdata = raw_records.next()
> +                # decompress - whole thing - this is not a bug, as it
> +                # permits caching. We might want to store the partially
> +                # decompresed group and decompress object, so that recent
> +                # texts are not penalised by big groups.
> +                block = GroupCompressBlock.from_bytes(zdata)
> +                self._group_cache[read_memo] = block
> +                cached[read_memo] = block
> +                yield block
>
> ^- We are caching the 'block' and not the raw content anymore. And the block may only partially evaluate the compressed content.

Ok. I've deleted that large, wrong comment and just put this at the top of the
except block:

                # Read the block, and cache it.

> However, I'm more concerned that in this loop:
[...]
>
> ^- There is an assumption that raw_records is in the exact ordering of 'read_memos'.

I've added the assertion you suggest below (without an 'assert' statement, of
course).

> And I think there is another small assumption in here, which is that the code
> that filters out duplicates is going to require that groups always perfectly
> fit in cache and aren't expired before you get to the duplicate. Perhaps an
> example:
[...]
> Even more worrisome, is "large groups" which may never get put into the cache
> in the first place. (LRUSizeCache says "if a request is larger than my size,
> don't cache it".)
>
> I think we'll never run into these bugs in test data, but we'll see them 'in
> the wild' once we have data that may not fit well in the cache.

Yeah, I think you're right. I wish that we could make the test data encounter
these cases as much as wild data...

> So I think what we really need is a different caching logic. Namely that
> "_get_blocks()" could keep a counter for how many times it needs a given
> block, and d...


=== modified file 'bzrlib/groupcompress.py'
--- bzrlib/groupcompress.py 2009-08-25 07:36:53 +0000
+++ bzrlib/groupcompress.py 2009-08-26 05:41:49 +0000
@@ -44,12 +44,15 @@
     VersionedFiles,
     )
 
+# Minimum number of uncompressed bytes to try fetch at once when retrieving
+# groupcompress blocks.
+BATCH_SIZE = 2**16
+
 _USE_LZMA = False and (pylzma is not None)
 
 # osutils.sha_string('')
 _null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
 
-
 def sort_gc_optimal(parent_map):
     """Sort and group the keys in parent_map into groupcompress order.
 
@@ -984,25 +987,45 @@
         self.gcvf = gcvf
         self.locations = locations
         self.keys = []
+        self.batch_memos = {}
+        self.memos_to_get = []
         self.total_bytes = 0
         self.last_read_memo = None
         self.manager = None
 
     def add_key(self, key):
-        """Add another to key to fetch."""
+        """Add another to key to fetch.
+
+        :return: The estimated number of bytes needed to fetch the batch so
+            far.
+        """
         self.keys.append(key)
         index_memo, _, _, _ = self.locations[key]
         read_memo = index_memo[0:3]
-        # This looks a bit dangerous, but it's ok: we're assuming that memos in
-        # _group_cache now will still be there when yield_factories is called
-        # (and that uncached memos don't become cached). This ought to be
-        # true. But if it isn't that's ok, yield_factories will still work.
-        # The only negative effect is that the estimated 'total_bytes' value
-        # here will be wrong, so we might fetch bigger/smaller batches than
-        # intended.
-        if read_memo not in self.gcvf._group_cache:
+        # Three possibilities for this read_memo:
+        #  - it's already part of this batch; or
+        #  - it's not yet part of this batch, but is already cached; or
+        #  - it's not yet part of this batch and will need to be fetched.
+        if read_memo in self.batch_memos:
+            # This read memo is already in this batch.
+            return self.total_bytes
+        try:
+            cached_block = self.gcvf._group_cache[read_memo]
+        except KeyError:
+            # This read memo is new to this batch, and the data isn't cached
+            # either.
+            self.batch_memos[read_memo] = None
+            self.memos_to_get.append(read_memo)
             byte_length = read_memo[2]
             self.total_bytes += byte_length
+        else:
+            # This read memo is new to this batch, but cached.
+            # Keep a reference to the cached block in batch_memos because it's
+            # certain that we'll use it when this batch is processed, but
+            # there's a risk that it would fall out of _group_cache between now
+            # and then.
+            self.batch_memos[read_memo] = cached_block
+        return self.total_bytes
 
     def _flush_manager(self):
         if self.manager is not None:
@@ -1021,18 +1044,11 @@
         """
         if self.manager is None and not self.keys:
             return
-        # First, determine the list of memos to get.
-        memos_to_get = []
-        last_read_memo = self.last_read_memo
-        for key in self.keys:
-            index_memo = self.locations[key][0]
-            read_memo = index_memo[:3]
-            if last_read_memo != read_memo:
-                memos_to_get.append(read_memo)
-                last_read_memo = read_memo
-        # Second, we fetch all those memos in one batch.
-        blocks = self.gcvf._get_blocks(memos_to_get)
-        # Finally, we turn blocks into factories and yield them.
+        # Fetch all memos in this batch.
+        blocks = self.gcvf._get_blocks(self.memos_to_get)
+        # Turn blocks into factories and yield them.
+        memos_to_get_stack = list(self.memos_to_get)
+        memos_to_get_stack.reverse()
         for key in self.keys:
             index_memo, _, parents, _ = self.locations[key]
             read_memo = index_memo[:3]
@@ -1042,9 +1058,19 @@
                 # now, so yield records
                 for factory in self._flush_manager():
                     yield factory
-                # Now start a new manager. The next block from _get_blocks
-                # will be the block we need.
-                block = blocks.next()
+                # Now start a new manager.
+                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
+                    # The next block from _get_blocks will be the block we
+                    # need.
+                    block_read_memo, block = blocks.next()
+                    if block_read_memo != read_memo:
+                        raise AssertionError(
+                            "block_read_memo out of sync with read_memo"
+                            "(%r != %r)" % (block_read_memo, read_memo))
+                    self.batch_memos[read_memo] = block
+                    memos_to_get_stack.pop()
+                else:
+                    block = self.batch_memos[read_memo]
                 self.manager = _LazyGroupContentManager(block)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
@@ -1053,6 +1079,8 @@
             for factory in self._flush_manager():
                 yield factory
         del self.keys[:]
+        self.batch_memos.clear()
+        del self.memos_to_get[:]
         self.total_bytes = 0
 
 
@@ -1222,7 +1250,8 @@
     def _get_blocks(self, read_memos):
         """Get GroupCompressBlocks for the given read_memos.
 
-        Blocks are returned in the order specified in read_memos.
+        :returns: a series of (read_memo, block) pairs, in the order they were
+            originally passed.
         """
         cached = {}
         for read_memo in read_memos:
@@ -1246,18 +1275,14 @@
         raw_records = self._access.get_raw_records(not_cached)
         for read_memo in read_memos:
             try:
-                yield cached[read_memo]
+                yield read_memo, cached[read_memo]
             except KeyError:
-                # read the group
+                # Read the block, and cache it.
                 zdata = raw_records.next()
-                # decompress - whole thing - this is not a bug, as it
-                # permits caching. We might want to store the partially
-                # decompresed group and decompress object, so that recent
-                # texts are not penalised by big groups.
                 block = GroupCompressBlock.from_bytes(zdata)
                 self._group_cache[read_memo] = block
                 cached[read_memo] = block
-                yield block
+                yield read_memo, block
 
     def get_missing_compression_parent_keys(self):
         """Return the keys of missing compression parents.
@@ -1432,34 +1457,32 @@
         # Batch up as many keys as we can until either:
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
-        #  - the total bytes to retrieve for this batch > 256k
+        #  - the total bytes to retrieve for this batch > BATCH_SIZE
         batcher = _BatchingBlockFetcher(self, locations)
-        BATCH_SIZE = 2**18
        for source, keys in source_keys:
             if source is self:
                 for key in keys:
                     if key in self._unadded_refs:
                         # Flush batch, then yield unadded ref from
                         # self._compressor.
-                        for _ in batcher.yield_factories(full_flush=True):
-                            yield _
+                        for factory in batcher.yield_factories(full_flush=True):
+                            yield factory
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
                         continue
-                    batcher.add_key(key)
-                    if batcher.total_bytes > BATCH_SIZE:
+                    if batcher.add_key(key) > BATCH_SIZE:
                         # Ok, this batch is big enough. Yield some results.
-                        for _ in batcher.yield_factories():
-                            yield _
+                        for factory in batcher.yield_factories():
+                            yield factory
             else:
-                for _ in batcher.yield_factories(full_flush=True):
-                    yield _
+                for factory in batcher.yield_factories(full_flush=True):
+                    yield factory
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
-        for _ in batcher.yield_factories(full_flush=True):
-            yield _
+        for factory in batcher.yield_factories(full_flush=True):
+            yield factory
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""
Revision history for this message
Martin Pool (mbp) wrote :

I looked over this with spiv, and it looks like it's a reasonable change to merge to 2.0. It needs to be cherrypicked because the current change is based off trunk.

It would still be nice if John could check that this addresses all his comments.

As a follow-on it would be good to add some specific tests for the _BatchingBlockFetcher.

review: Approve
Revision history for this message
John A Meinel (jameinel) wrote :

...

>> So I think what we really need is a different caching logic. Namely that
>> "_get_blocks()" could keep a counter for how many times it needs a given
>> block, and doesn't flush the block out of the cache until that value is
>> satisfied. In addition, it is probably reasonable to continue to cache things
>> in gcvf._cache. I'm not sure about the LRU effects, and whether we should
>> always make a request on the _cache to keep it in sync with the actual
>> requests...
>
> What I've done as mentioned above is actually retrieve and keep the block on
> _BatchingBlockFetcher at add_key time. As new blocks are received from
> _get_blocks they are also stored on _BatchingBlockFetcher. So each block, once
> retrieved, whether via cache or _get_blocks, will then be present for the rest
> of the batch (i.e. until the end of yield_factories). This fixes that issue, I
> think.
>
> An improvement to this scheme would be to then forget blocks from the batch once
> we know nothing else in the batch will use them (e.g. by tracking how many keys
> in the batch need a particular block/read_memo), but that's probably not very
> important.

It is something I'm a little bit concerned about. Perhaps your block
fetcher is limited enough in scope (since you do try to keep an upper
bound on the request size). Still, it is something I think we should keep
an eye on, as we've really been running into problems lately with
consuming far too much memory.
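
A rough sketch of the "forget blocks early" idea quoted above, with made-up names rather than the real bzrlib API: count how many keys in the batch still need each read_memo, and drop the block once that count reaches zero.

    class BatchBlockHolder(object):
        """Hold fetched blocks only while something in the batch still needs them."""

        def __init__(self, read_memos_for_keys):
            # read_memos_for_keys: one read_memo per key in the batch, in order.
            self._remaining = {}
            for memo in read_memos_for_keys:
                self._remaining[memo] = self._remaining.get(memo, 0) + 1
            self._blocks = {}

        def add_block(self, memo, block):
            self._blocks[memo] = block

        def use_block(self, memo):
            block = self._blocks[memo]
            self._remaining[memo] -= 1
            if self._remaining[memo] == 0:
                del self._blocks[memo]  # no later key in this batch needs it
            return block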

...
>
...

>> ^- I personally don't like to see "_" as a variable that ever gets used on the right hand side.
>> I'd prefer:
>> for factory in batcher.yield_factories():
>> yield factory
>
> Ok, changed. (Although I find the longer lines with the word 'factory' repeated
> negates the readability benefit for me, so it's much of a muchness. So I'm
> happy to go with your preference.)
>

Well, there is always:

for f in batcher.yield_factories():
  yield f

So from what I can see the diff looks fine. As Martin mentions:

1) We are still missing some sort of unit testing here.

2) We should evaluate if memory consumption remains reasonable with the
current batching scheme.

But these are all things that can happen later. (Which is good, given
that this has landed :).

John
=:->


Revision history for this message
John A Meinel (jameinel) wrote :

I don't seem to be able to set the status of this branch to "merged" in the submission request to get it into the 2.0 branch. I assume it was cherrypicked by Martin, since I see the code present in the 2.0rc1 release.

Perhaps Martin can set the status? (So it doesn't show up as something left to be done.)

review: Approve

Preview Diff

=== modified file 'doc/developers/bug-handling.txt'
--- doc/developers/bug-handling.txt 2009-08-24 00:29:31 +0000
+++ doc/developers/bug-handling.txt 2009-08-26 18:35:24 +0000
@@ -142,12 +142,8 @@
     it's not a good idea for a developer to spend time reproducing the bug
     until they're going to work on it.)
 Triaged
-    This is an odd state - one we consider a bug in launchpad, as it really
-    means "Importance has been set". We use this to mean the same thing
-    as confirmed, and set no preference on whether Confirmed or Triaged are
-    used. Please do not change a "Confirmed" bug to "Triaged" or vice verca -
-    any reports we create or use will always search for both "Confirmed" and
-    "Triaged" or neither "Confirmed" nor "Triaged".
+    We don't use this status. If it is set, it means the same as
+    Confirmed.
 In Progress
     Someone has started working on this.
 Won't Fix
