Merge lp:~spiv/bzr/gc-batching into lp:bzr/2.0
Status: Merged
Merge reported by: Andrew Bennetts
Merged at revision: not available
Proposed branch: lp:~spiv/bzr/gc-batching
Merge into: lp:bzr/2.0
Diff against target: 18 lines
To merge this branch: bzr merge lp:~spiv/bzr/gc-batching
Related bugs: (none listed)
Reviewers: John A Meinel (Approve), Martin Pool (Approve)
Review via email: mp+10643@code.launchpad.net
Andrew Bennetts (spiv) wrote:
John A Meinel (jameinel) wrote:
John A Meinel (jameinel) wrote:
Review: Needs Fixing
To start with, I think your layering here is pretty nice. Adding an object does make managing the state a bit clearer. get_record_stream() is complex enough that it probably should have started out as an object interface rather than just a simple generator. Oh well.
In short, I think we have a race condition because of how LRUSizeCache (or just LRUCache) interacts with _get_blocks() being in arbitrary ordering. And we need a different caching algorithm that can ensure requested blocks are never flushed from the cache before they are no longer needed.
The comment here is no longer correct:
+ for read_memo in read_memos:
+ try:
+ yield cached[read_memo]
+ except KeyError:
+ # read the group
+ zdata = raw_records.next()
+ # decompress - whole thing - this is not a bug, as it
+ # permits caching. We might want to store the partially
+ # decompresed group and decompress object, so that recent
+ # texts are not penalised by big groups.
+ block = GroupCompressBlock.from_bytes(zdata)
+ self._group_cache[read_memo] = block
+ cached[read_memo] = block
+ yield block
^- We are caching the 'block' and not the raw content anymore. And the block may only partially evaluate the compressed content.
However, I'm more concerned that in this loop:
+ for read_memo in read_memos:
+ if read_memo in cached:
+ # Don't fetch what we already have
+ continue
+ if read_memo in not_cached_seen:
+ # Don't try to fetch the same data twice
+ continue
+ not_cached.append(read_memo)
+ not_cached_seen.add(read_memo)
+ raw_records = self._access.get_raw_records(not_cached)
+ for read_memo in read_memos:
+ try:
+ yield cached[read_memo]
+ except KeyError:
+ # read the group
+ zdata = raw_records.next()
+ # decompress - whole thing - this is not a bug, as it
+ # permits caching. We might want to store the partially
+ # decompresed group and decompress object, so that recent
+ # texts are not penalised by big groups.
+ block = GroupCompressBlock.from_bytes(zdata)
+ self._group_cache[read_memo] = block
+ cached[read_memo] = block
+ yield block
^- There is an assumption that raw_records is in the exact ordering of 'read_memos'.
And I think there is another small assumption here: the code that filters out duplicates requires that groups always fit in the cache and aren't expired before you get to the duplicate. Perhaps an example:
Assume we have groups G1 through G4, and texts within them (so [G1,T1] means text T1 in group G1, etc.).
If the request ends up being for:
[G1,T1], [G2, T1-T100], [G3, T1-T100], [G1,T2]
The code above will not request G1 twice.
However, it will cache G2 and G3, which gives G1 time to be flushed from the cache before the duplicate [G1,T2] is reached.
Even more worrisome, is "large groups" which may never get put into the cache in the first place. (LRUSizeCache says "if a request is larger than my size, don't cache it".)
I think we'll never run into these bugs in test data, but we'll see them 'in the wild' once we have data that may not fit well in the cache.
So I think what we really need is a different caching logic. Namely that "_get_blocks()" could keep a counter for how many times it needs a given block, and doesn't flush the block out of the cache until that value is satisfied. In addition, it is probably reasonable to continue to cache things in gcvf._cache. I'm not sure about the LRU effects, and whether we should always make a request on the _cache to keep it in sync with the actual requests...
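John's example can be made concrete with a toy LRU cache. This is a simplified sketch: the SimpleLRU class and the G1..G3 values are illustrative stand-ins, not bzrlib's actual LRUSizeCache.

```python
# Hypothetical, simplified LRU cache with a small capacity, illustrating how
# the duplicate-filtering code can be starved: G1 is evicted before the
# duplicate request for it is processed.
from collections import OrderedDict

class SimpleLRU:
    def __init__(self, max_items):
        self.max_items = max_items
        self._dict = OrderedDict()

    def __setitem__(self, key, value):
        self._dict.pop(key, None)
        self._dict[key] = value
        while len(self._dict) > self.max_items:
            self._dict.popitem(last=False)  # evict the oldest entry

    def __contains__(self, key):
        return key in self._dict

cache = SimpleLRU(max_items=2)
cache['G1'] = 'block 1'   # request [G1, T1]: G1 is now cached
cache['G2'] = 'block 2'   # request [G2, T1-T100]
cache['G3'] = 'block 3'   # request [G3, T1-T100]: G1 is evicted here
# The duplicate filter skipped re-requesting G1, but by the time [G1, T2]
# is processed the block is gone:
print('G1' in cache)  # False
```

The fix discussed below (holding a direct reference to each needed block for the lifetime of the batch) sidesteps exactly this eviction window.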
Andrew Bennetts (spiv) wrote:
John A Meinel wrote:
> Review: Needs Fixing
> To start with, I think your layering here is pretty nice. Adding an object
> does make managing the state a bit clearer. get_record_stream() is complex
> enough that it probably should have started out as an object interface rather
> than just a simple generator. Oh well.
Yes. I'm glad I structured it this way; it feels quite nice to me once I
figured out which bits to move out of _get_remaining_record_stream.
> In short, I think we have a race condition because of how LRUSizeCache (or
> just LRUCache) interacts with _get_blocks() being in arbitrary ordering. And
> we need a different caching algorithm that can ensure requested blocks are
> never flushed from the cache before they are no longer needed.
Ok, I see that. I've fixed it by making add_key grab and keep cached data
immediately; we know we're about to use it so it would be perverse to allow it
to fall out of LRUCache and then not be used. This shifts some of the "plan
what to get" logic out of yield_factories to add_key, which I was considering
doing anyway.
> The comment here is no longer correct:
> + for read_memo in read_memos:
> + try:
> + yield cached[read_memo]
> + except KeyError:
> + # read the group
> + zdata = raw_records.next()
> + # decompress - whole thing - this is not a bug, as it
> + # permits caching. We might want to store the partially
> + # decompresed group and decompress object, so that recent
> + # texts are not penalised by big groups.
> + block = GroupCompressBlock.from_bytes(zdata)
> + self._group_cache[read_memo] = block
> + cached[read_memo] = block
> + yield block
>
> ^- We are caching the 'block' and not the raw content anymore. And the block may only partially evaluate the compressed content.
Ok. I've deleted that large, wrong comment and just put this at the top of the
except block:
# Read the block, and cache it.
> However, I'm more concerned that in this loop:
[...]
>
> ^- There is an assumption that raw_records is in the exact ordering of 'read_memos'.
I've added the assertion you suggest below (without an 'assert' statement, of
course.)
> And I think there is another small assumption in here, which is that the code
> that filters out duplicates is going to require that groups always perfectly
> fit in cache and aren't expired before you get to the duplicate. Perhaps an
> example:
[...]
> Even more worrisome, is "large groups" which may never get put into the cache
> in the first place. (LRUSizeCache says "if a request is larger than my size,
> don't cache it".)
>
> I think we'll never run into these bugs in test data, but we'll see them 'in
> the wild' once we have data that may not fit well in the cache.
Yeah, I think you're right. I wish that we could make the test data encounter
these cases as much as wild data...
> So I think what we really need is a different caching logic. Namely that
> "_get_blocks()" could keep a counter for how many times it needs a given
> block, and doesn't flush the block out of the cache until that value is
> satisfied. In addition, it is probably reasonable to continue to cache things
> in gcvf._cache. I'm not sure about the LRU effects, and whether we should
> always make a request on the _cache to keep it in sync with the actual
> requests...
=== modified file 'bzrlib/groupcompress.py'
--- bzrlib/groupcompress.py	2009-08-25 07:36:53 +0000
+++ bzrlib/groupcompress.py	2009-08-26 05:41:49 +0000
@@ -44,12 +44,15 @@
     VersionedFiles,
     )

+# Minimum number of uncompressed bytes to try fetch at once when retrieving
+# groupcompress blocks.
+BATCH_SIZE = 2**16
+
 _USE_LZMA = False and (pylzma is not None)

 # osutils.sha_string('')
 _null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'

-
 def sort_gc_optimal(parent_map):
     """Sort and group the keys in parent_map into groupcompress order.

@@ -984,25 +987,45 @@
         self.gcvf = gcvf
         self.locations = locations
         self.keys = []
+        self.batch_memos = {}
+        self.memos_to_get = []
         self.total_bytes = 0
         self.last_read_memo = None
         self.manager = None

     def add_key(self, key):
-        """Add another to key to fetch."""
+        """Add another to key to fetch.
+
+        :return: The estimated number of bytes needed to fetch the batch so
+            far.
+        """
         self.keys.append(key)
         index_memo, _, _, _ = self.locations[key]
         read_memo = index_memo[0:3]
-        # This looks a bit dangerous, but it's ok: we're assuming that memos in
-        # _group_cache now will still be there when yield_factories is called
-        # (and that uncached memos don't become cached). This ought to be
-        # true. But if it isn't that's ok, yield_factories will still work.
-        # The only negative effect is that the estimated 'total_bytes' value
-        # here will be wrong, so we might fetch bigger/smaller batches than
-        # intended.
-        if read_memo not in self.gcvf._group_cache:
+        # Three possibilities for this read_memo:
+        #  - it's already part of this batch; or
+        #  - it's not yet part of this batch, but is already cached; or
+        #  - it's not yet part of this batch and will need to be fetched.
+        if read_memo in self.batch_memos:
+            # This read memo is already in this batch.
+            return self.total_bytes
+        try:
+            cached_block = self.gcvf._group_cache[read_memo]
+        except KeyError:
+            # This read memo is new to this batch, and the data isn't cached
+            # either.
+            self.batch_memos[read_memo] = None
+            self.memos_to_get.append(read_memo)
             byte_length = read_memo[2]
             self.total_bytes += byte_length
+        else:
+            # This read memo is new to this batch, but cached.
+            # Keep a reference to the cached block in batch_memos because it's
+            # certain that we'll use it when this batch is processed, but
+            # there's a risk that it would fall out of _group_cache between now
+            # and then.
+            self.batch_memos[read_memo] = cached_block
+        return self.total_bytes

     def _flush_manager(self):
         if self.manager is not None:
@@ -1021,18 +1044,11 @@
         """
         if self.manager is None and not self.keys:
             return
-        # First, determine the list of memos to get.
-        memos_to_get = []
-        last_read_memo = self.last_read_memo
-        for key in self.keys:
-            index_memo = self.locations[key][0]
-            read_memo = index_memo[:3]
-            if last_read_memo != read_memo:
-                memos_to_get.append(read_memo)
-                last_read_memo = read_memo
-        # Second, we fetch all those memos in one batch.
-        blocks = self.gcvf._get_blocks(memos_to_get)
-        # Finally, we turn blocks into factories and yield them.
+        # Fetch all memos in this batch.
+        blocks = self.gcvf._get_blocks(self.memos_to_get)
+        # Turn blocks into factories and yield them.
+        memos_to_get_stack = list(self.memos_to_get)
+        memos_to_get_stack.reverse()
         for key in self.keys:
             index_memo, _, parents, _ = self.locations[key]
             read_memo = index_memo[:3]
@@ -1042,9 +1058,19 @@
                 # now, so yield records
                 for factory in self._flush_manager():
                     yield factory
-                # Now start a new manager. The next block from _get_blocks
-                # will be the block we need.
-                block = blocks.next()
+                # Now start a new manager.
+                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
+                    # The next block from _get_blocks will be the block we
+                    # need.
+                    block_read_memo, block = blocks.next()
+                    if block_read_memo != read_memo:
+                        raise AssertionError(
+                            "block_read_memo out of sync with read_memo"
+                            "(%r != %r)" % (block_read_memo, read_memo))
+                    self.batch_memos[read_memo] = block
+                    memos_to_get_stack.pop()
+                else:
+                    block = self.batch_memos[read_memo]
                 self.manager = _LazyGroupContentManager(block)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
@@ -1053,6 +1079,8 @@
             for factory in self._flush_manager():
                 yield factory
         del self.keys[:]
+        self.batch_memos.clear()
+        del self.memos_to_get[:]
         self.total_bytes = 0


@@ -1222,7 +1250,8 @@
     def _get_blocks(self, read_memos):
         """Get GroupCompressBlocks for the given read_memos.

-        Blocks are returned in the order specified in read_memos.
+        :returns: a series of (read_memo, block) pairs, in the order they were
+            originally passed.
         """
         cached = {}
         for read_memo in read_memos:
@@ -1246,18 +1275,14 @@
         raw_records = self._access.get_raw_records(not_cached)
         for read_memo in read_memos:
             try:
-                yield cached[read_memo]
+                yield read_memo, cached[read_memo]
             except KeyError:
-                # read the group
+                # Read the block, and cache it.
                 zdata = raw_records.next()
-                # decompress - whole thing - this is not a bug, as it
-                # permits caching. We might want to store the partially
-                # decompresed group and decompress object, so that recent
-                # texts are not penalised by big groups.
                 block = GroupCompressBlock.from_bytes(zdata)
                 self._group_cache[read_memo] = block
                 cached[read_memo] = block
-                yield block
+                yield read_memo, block

     def get_missing_compression_parent_keys(self):
         """Return the keys of missing compression parents.
@@ -1432,34 +1457,32 @@
         # Batch up as many keys as we can until either:
         #  - we encounter an unadded ref, or
         #  - we run out of keys, or
-        #  - the total bytes to retrieve for this batch > 256k
+        #  - the total bytes to retrieve for this batch > BATCH_SIZE
         batcher = _BatchingBlockFetcher(self, locations)
-        BATCH_SIZE = 2**18
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
                     if key in self._unadded_refs:
                         # Flush batch, then yield unadded ref from
                         # self._compressor.
-                        for _ in batcher.yield_factories(full_flush=True):
-                            yield _
+                        for factory in batcher.yield_factories(full_flush=True):
+                            yield factory
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
                         continue
-                    batcher.add_key(key)
-                    if batcher.total_bytes > BATCH_SIZE:
+                    if batcher.add_key(key) > BATCH_SIZE:
                         # Ok, this batch is big enough. Yield some results.
-                        for _ in batcher.yield_factories():
-                            yield _
+                        for factory in batcher.yield_factories():
+                            yield factory
             else:
-                for _ in batcher.yield_factories(full_flush=True):
-                    yield _
+                for factory in batcher.yield_factories(full_flush=True):
+                    yield factory
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
-        for _ in batcher.yield_factories(full_flush=True):
-            yield _
+        for factory in batcher.yield_factories(full_flush=True):
+            yield factory

     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""
Martin Pool (mbp) wrote:
I looked over this with spiv, and it looks like it's a reasonable change to merge to 2.0. It needs to be cherrypicked because the current change is based off trunk.
It would still be nice if John could check that this includes all his comments.
As a follow-on it would be good to add some specific tests for the _BatchingBlockFetcher.
John A Meinel (jameinel) wrote:
...
>> So I think what we really need is a different caching logic. Namely that
>> "_get_blocks()" could keep a counter for how many times it needs a given
>> block, and doesn't flush the block out of the cache until that value is
>> satisfied. In addition, it is probably reasonable to continue to cache things
>> in gcvf._cache. I'm not sure about the LRU effects, and whether we should
>> always make a request on the _cache to keep it in sync with the actual
>> requests...
>
> What I've done as mentioned above is actually retrieve and keep the block on
> _BatchingBlockFetcher when add_key is called. As blocks are fetched via
> _get_blocks they are also stored on _BatchingBlockFetcher. So any block
> retrieved, whether via cache or _get_blocks, will then be present for the rest
> of the batch (i.e. until the end of yield_factories). This fixes that issue, I
> think.
>
> An improvement to this scheme would be to then forget blocks from the batch once
> we know nothing else in the batch will use them (e.g. by tracking how many keys
> in the batch need a particular block/read_memo), but that's probably not very
> important.
It is something I'm a little bit concerned about. Perhaps your block
fetcher is limited enough in scope (since you do try to keep an upper
bound on the request size). It is something I think we should keep an
eye on, as we've really been running into problems lately with consuming
far too much memory.
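The bookkeeping Andrew describes (counting how many keys in the batch need each block, and forgetting the block after its last use) might look roughly like this. The names here are hypothetical illustrations, not part of the merged change:

```python
# Sketch of per-block reference counting for one batch: a block is dropped as
# soon as the last key that needs it has been processed, bounding memory use.
class BlockRefCounter:
    def __init__(self):
        self._needed = {}   # read_memo -> number of keys still needing it
        self._blocks = {}   # read_memo -> block, kept only while needed

    def add_key(self, read_memo):
        # Called once per key while the batch is being planned.
        self._needed[read_memo] = self._needed.get(read_memo, 0) + 1

    def store(self, read_memo, block):
        self._blocks[read_memo] = block

    def release(self, read_memo):
        # Called after yielding factories for one key; forgets the block as
        # soon as no remaining key in the batch needs it.
        self._needed[read_memo] -= 1
        if self._needed[read_memo] == 0:
            del self._needed[read_memo]
            self._blocks.pop(read_memo, None)

counter = BlockRefCounter()
for memo in ['G1', 'G2', 'G1']:   # two keys need G1, one needs G2
    counter.add_key(memo)
counter.store('G1', object())
counter.release('G1')             # one more key still needs G1
print('G1' in counter._blocks)    # True
counter.release('G1')             # last user done: block is forgotten
print('G1' in counter._blocks)    # False
```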
...
>
...
>> ^- I personally don't like to see "_" as a variable that ever gets used on the right hand side.
>> I'd prefer:
>> for factory in batcher.yield_factories():
>>     yield factory
>
> Ok, changed. (Although I find the longer lines with the word 'factory' repeated
> negates the readability benefit for me, so it's much of a muchness. So I'm
> happy to go with your preference.)
>
Well
  for f in batcher.yield_factories():
      yield f
So from what I can see the diff looks fine. As Martin mentions:
1) We are still missing some sort of unit testing here.
2) We should evaluate if memory consumption remains reasonable with the
current batching scheme.
But these are all things that can happen later. (Which is good, given
that this has landed :).
John
=:->
John A Meinel (jameinel) wrote:
I don't seem to be able to set the status of this branch to "merged" in the submission request to get it into the 2.0 branch. I assume it was cherrypicked by Martin, since I see the code present in the 2.0rc1 release.
Perhaps Martin can set the status? (So it doesn't show up as something left to be done.)
Preview Diff
=== modified file 'doc/developers/bug-handling.txt'
--- doc/developers/bug-handling.txt	2009-08-24 00:29:31 +0000
+++ doc/developers/bug-handling.txt	2009-08-26 18:35:24 +0000
@@ -142,12 +142,8 @@
   it's not a good idea for a developer to spend time reproducing the bug
   until they're going to work on it.)
 Triaged
-  This is an odd state - one we consider a bug in launchpad, as it really
-  means "Importance has been set". We use this to mean the same thing
-  as confirmed, and set no preference on whether Confirmed or Triaged are
-  used. Please do not change a "Confirmed" bug to "Triaged" or vice verca -
-  any reports we create or use will always search for both "Confirmed" and
-  "Triaged" or neither "Confirmed" nor "Triaged".
+  We don't use this status. If it is set, it means the same as
+  Confirmed.
 In Progress
   Someone has started working on this.
 Won't Fix
Fixes #402657, at least for the originally reported case of "bzr branch http://bazaar.launchpad.net/~launchpad-pqm/launchpad/devel".
It changes the implementation of GroupCompressVersionedFiles._get_remaining_record_stream to batch up block fetching a bit more. It used to call _get_block once per key, which would do a single IO request for each block, and many of the blocks were very small (a few hundred bytes) for some reason. Now it essentially accumulates a list of blocks to fetch and only performs the IO when the expected fetch size is reasonably large (256kB currently, basically an arbitrary choice), or when the batch needs to be flushed before returning keys from a different source (such as self._unadded_refs).
The code is still somewhat at the mercy of the ordering asked for by the caller; the "bzr branch ..." case does 'unordered' IO, and this patch helps significantly there (based on log+http traces). Cases that trigger other orderings might not be helped as much.
_get_block is now gone, replaced by _get_blocks. There's a new class _BatchingBlockFetcher which has most of the batching logic, although the decision about when to actually fetch the batch is still in _get_remaining_record_stream. _get_remaining_record_stream is shorter and clearer now, which is nice.
I haven't managed to write any convincing automated tests for this improvement, so there are no test changes in this patch :(. I have tested it quite a bit manually, and I'm confident it is as correct as the old code, and I think the new code is pretty clear. So I'm pretty happy with it despite that. Suggestions for good tests are welcome, of course!
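The batching scheme described above can be sketched in miniature. This is a simplified illustration under assumed names (Batcher, fetch_many); the real _BatchingBlockFetcher also handles caching and content-manager state:

```python
# Minimal sketch of size-thresholded batching: accumulate requested keys and
# fire one combined fetch when the estimated size crosses the threshold, or
# when the batch is explicitly flushed. fetch_many stands in for the raw
# record IO; it is not bzrlib's API.
BATCH_SIZE = 256 * 1024

class Batcher:
    def __init__(self, fetch_many):
        self._fetch_many = fetch_many
        self._pending = []
        self._total_bytes = 0

    def add(self, key, byte_length):
        self._pending.append(key)
        self._total_bytes += byte_length
        if self._total_bytes > BATCH_SIZE:
            return self.flush()
        return []

    def flush(self):
        if not self._pending:
            return []
        results = self._fetch_many(self._pending)  # one IO for many keys
        del self._pending[:]
        self._total_bytes = 0
        return results

calls = []
def fetch_many(keys):
    calls.append(list(keys))
    return keys

b = Batcher(fetch_many)
b.add('a', 200 * 1024)   # below threshold: no IO yet
b.add('b', 100 * 1024)   # 300kB > 256kB: fetches ['a', 'b'] in one call
b.add('c', 10)
b.flush()                # explicit flush: fetches ['c']
print(calls)  # [['a', 'b'], ['c']]
```

Compared with one IO per key, the same four operations here cost two IO round trips, which is the whole point of the change.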
At one point yesterday I observed a small (~5%), reproducible, improvement in "co ." time for Launchpad, but I cannot reproduce a significant difference at the moment; seemingly something to do with throwing away and refetching my launchpad/devel branch? (I'm actually seeing 50% worse times than yesterday, even with bzr.dev!) Anyway, it's definitely no worse and still perhaps a little better even for local IO, so that's good.