Merge lp:~jameinel/bzr/2.3-gcb-peak-mem into lp:bzr

Proposed by John A Meinel
Status: Merged
Approved by: Andrew Bennetts
Approved revision: no longer in the revision history of the source branch.
Merged at revision: 5451
Proposed branch: lp:~jameinel/bzr/2.3-gcb-peak-mem
Merge into: lp:bzr
Diff against target: 289 lines (+99/-42)
4 files modified
NEWS (+3/-0)
bzrlib/groupcompress.py (+69/-39)
bzrlib/tests/per_lock/test_lock.py (+1/-1)
bzrlib/tests/test_groupcompress.py (+26/-2)
To merge this branch: bzr merge lp:~jameinel/bzr/2.3-gcb-peak-mem
Reviewer: Andrew Bennetts
Status: Approve
Review via email: mp+36200@code.launchpad.net

Commit message

Add GroupCompressBlock.to_chunks()

Description of the change

This is a small push towards one of my pet peeves: "bzr commit" has a peak memory of 1 fulltext + 2 copies of the compressed text. This happens at several layers in the stack, and this branch only fixes one of them, but it is a push in the right direction.

Basically, we have a pattern of:

header = xyz
content = (from some_source)
return ''.join([header, content])

That occurs in a few places. This changes GroupCompressBlock to just return:

return [header, content, lines]

So that higher up the stack we can then do:

outer_header = abc
chunks = [outer_header]
chunks.extend(content_from_elsewhere)

This only extends a list of chunks, rather than copying all of the content into a new string.
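
For concreteness, here is a minimal sketch of the two patterns. The function names are invented for illustration and are not bzrlib APIs; the real change is GroupCompressBlock.to_chunks() in the diff below.

def build_joined(header, content_chunks):
    # Old pattern: joining forces an extra full copy of the content.
    return header + ''.join(content_chunks)

def build_chunked(header, content_chunks):
    # New pattern: hand back the pieces plus the total length, so the
    # caller can emit a length prefix without joining anything.
    chunks = [header]
    chunks.extend(content_chunks)
    return sum(map(len, chunks)), chunks

def wrap_outer(outer_header, inner_chunks):
    # Higher up the stack we only extend a list; no big string copy yet.
    chunks = [outer_header]
    chunks.extend(inner_chunks)
    return chunks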

To actually get this copying removed we need to:

1) Change _DirectPackAccess to have an API that can be passed the chunks of the content.
2) Change Serializer.bytes_record() to take chunks and produce chunks.
3) Change how we write, so that we can issue multiple write requests. ATM I'm thinking of using a
   size check: if len(content) > 1MB we issue multiple writes, otherwise we just
   ''.join([header, content]) (see the sketch below).
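
As a rough sketch of what (3) could look like (write_record and the exact 1MB threshold are hypothetical, not existing bzrlib code):

_ONE_MB = 1024 * 1024

def write_record(write, header, content_chunks):
    # 'write' is any callable taking a byte string, e.g. a file's write
    # method or a transport's append function.
    total = sum(map(len, content_chunks))
    if total > _ONE_MB:
        # Large content: issue several writes instead of building one
        # big joined string, avoiding a second full copy in memory.
        write(header)
        for chunk in content_chunks:
            write(chunk)
    else:
        # Small content: a single join keeps the number of write calls down.
        write(''.join([header] + content_chunks))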

Anyway, I'm pretty sure this is a step in the right direction, so I'd like to get it merged.

lp:~jameinel/bzr/2.3-gcb-peak-mem updated
5440. By Canonical.com Patch Queue Manager <email address hidden>

(spiv) Merge lp:bzr/2.2. (Andrew Bennetts)

Revision history for this message
Andrew Bennetts (spiv) wrote :

56 + @property
57 + def _z_content(self):

I guess the plan is to eventually remove this property? It would be good to have an explicit comment saying so.
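
For example, something along these lines (the TODO wording is just a suggestion, not what actually landed):

    @property
    def _z_content(self):
        """Return z_content_chunks as a simple string.

        Meant only to be used by the test suite. TODO: remove this
        property (and update the tests) once everything works in terms
        of _z_content_chunks directly.
        """
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None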

Otherwise this looks good to me. I agree it makes sense to move to a chunks-based API, rather than one based on monolithic strs.

review: Approve
Revision history for this message
Andrew Bennetts (spiv) wrote :

Oh, and it might be nice to add a NEWS entry under Internals for this, but it's not a big deal either way.

lp:~jameinel/bzr/2.3-gcb-peak-mem updated
5441. By Canonical.com Patch Queue Manager <email address hidden>

(spiv) Change links to user-reference/bzr_man.html to corresponding pages
 everywhere in the User Guide (Alexander Belchenko)

5442. By Canonical.com Patch Queue Manager <email address hidden>

(spiv) Mention applyDeprecated in the Testing Guide. (#602996) (John C
 Barstow)

5443. By Canonical.com Patch Queue Manager <email address hidden>

(spiv) Add 'mainline' and 'annotate' revision specs. (Aaron Bentley) (Andrew
 Bennetts)

5444. By Canonical.com Patch Queue Manager <email address hidden>

(parthm) 'bzr status' now displays shelve summary (#403687). (Parth
 Malwankar)

5445. By Canonical.com Patch Queue Manager <email address hidden>

(jameinel) Remove 'log' information from "successful" tests (John A Meinel)

Revision history for this message
John A Meinel (jameinel) wrote :

sent to pqm by email

lp:~jameinel/bzr/2.3-gcb-peak-mem updated
5446. By Canonical.com Patch Queue Manager <email address hidden>

(vila) SRU bug nomination clarifications (Vincent Ladeuil)

5447. By Canonical.com Patch Queue Manager <email address hidden>

(jameinel) Update bug url for squid (Martin Pool)

5448. By Canonical.com Patch Queue Manager <email address hidden>

(vila) Merge lp:bzr/2.2 into trunk including fixes for #644855,
 #646133, #632387 (Vincent Ladeuil)

5449. By Canonical.com Patch Queue Manager <email address hidden>

(vila) Use TestCaseInTempDir for TestGlobalConfig tests requiring disk
 resources (Vincent Ladeuil)

5450. By Canonical.com Patch Queue Manager <email address hidden>

(gz) Correct some use of "it's" and "its" in bzrlib (Martin [gz])

Revision history for this message
John A Meinel (jameinel) wrote :

sent to pqm by email

lp:~jameinel/bzr/2.3-gcb-peak-mem updated
5451. By Canonical.com Patch Queue Manager <email address hidden>

(jam) Reduce the peak memory in GroupCompressBlock

Preview Diff

1=== modified file 'NEWS'
2--- NEWS 2010-09-29 09:56:23 +0000
3+++ NEWS 2010-09-29 20:07:49 +0000
4@@ -48,6 +48,9 @@
5 Internals
6 *********
7
8+* Small change to GroupCompressBlock to work more in terms of 'chunks'
9+ rather than 'content' for its compressed storage. (John Arbash Meinel)
10+
11 * When running ``bzr selftest --subunit`` the subunit stream will no
12 longer include the "log" information for tests which are considered to
13 be 'successes' (success, xfail, skip, etc) (John Arbash Meinel)
14
15=== modified file 'bzrlib/groupcompress.py'
16--- bzrlib/groupcompress.py 2010-08-06 18:14:22 +0000
17+++ bzrlib/groupcompress.py 2010-09-29 20:07:49 +0000
18@@ -101,7 +101,7 @@
19 def __init__(self):
20 # map by key? or just order in file?
21 self._compressor_name = None
22- self._z_content = None
23+ self._z_content_chunks = None
24 self._z_content_decompressor = None
25 self._z_content_length = None
26 self._content_length = None
27@@ -135,26 +135,30 @@
28 self._content = ''.join(self._content_chunks)
29 self._content_chunks = None
30 if self._content is None:
31- if self._z_content is None:
32+ # We join self._z_content_chunks here, because if we are
33+ # decompressing, then it is *very* likely that we have a single
34+ # chunk
35+ if self._z_content_chunks is None:
36 raise AssertionError('No content to decompress')
37- if self._z_content == '':
38+ z_content = ''.join(self._z_content_chunks)
39+ if z_content == '':
40 self._content = ''
41 elif self._compressor_name == 'lzma':
42 # We don't do partial lzma decomp yet
43- self._content = pylzma.decompress(self._z_content)
44+ self._content = pylzma.decompress(z_content)
45 elif self._compressor_name == 'zlib':
46 # Start a zlib decompressor
47 if num_bytes * 4 > self._content_length * 3:
48 # If we are requesting more that 3/4ths of the content,
49 # just extract the whole thing in a single pass
50 num_bytes = self._content_length
51- self._content = zlib.decompress(self._z_content)
52+ self._content = zlib.decompress(z_content)
53 else:
54 self._z_content_decompressor = zlib.decompressobj()
55 # Seed the decompressor with the uncompressed bytes, so
56 # that the rest of the code is simplified
57 self._content = self._z_content_decompressor.decompress(
58- self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
59+ z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
60 if not self._z_content_decompressor.unconsumed_tail:
61 self._z_content_decompressor = None
62 else:
63@@ -207,7 +211,17 @@
64 # XXX: Define some GCCorrupt error ?
65 raise AssertionError('Invalid bytes: (%d) != %d + %d' %
66 (len(bytes), pos, self._z_content_length))
67- self._z_content = bytes[pos:]
68+ self._z_content_chunks = (bytes[pos:],)
69+
70+ @property
71+ def _z_content(self):
72+ """Return z_content_chunks as a simple string.
73+
74+ Meant only to be used by the test suite.
75+ """
76+ if self._z_content_chunks is not None:
77+ return ''.join(self._z_content_chunks)
78+ return None
79
80 @classmethod
81 def from_bytes(cls, bytes):
82@@ -269,13 +283,13 @@
83 self._content_length = length
84 self._content_chunks = content_chunks
85 self._content = None
86- self._z_content = None
87+ self._z_content_chunks = None
88
89 def set_content(self, content):
90 """Set the content of this block."""
91 self._content_length = len(content)
92 self._content = content
93- self._z_content = None
94+ self._z_content_chunks = None
95
96 def _create_z_content_using_lzma(self):
97 if self._content_chunks is not None:
98@@ -283,39 +297,49 @@
99 self._content_chunks = None
100 if self._content is None:
101 raise AssertionError('Nothing to compress')
102- self._z_content = pylzma.compress(self._content)
103- self._z_content_length = len(self._z_content)
104+ z_content = pylzma.compress(self._content)
105+ self._z_content_chunks = (z_content,)
106+ self._z_content_length = len(z_content)
107
108- def _create_z_content_from_chunks(self):
109+ def _create_z_content_from_chunks(self, chunks):
110 compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
111- compressed_chunks = map(compressor.compress, self._content_chunks)
112+ # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
113+ # (measured peak is maybe 30MB over the above...)
114+ compressed_chunks = map(compressor.compress, chunks)
115 compressed_chunks.append(compressor.flush())
116- self._z_content = ''.join(compressed_chunks)
117- self._z_content_length = len(self._z_content)
118+ # Ignore empty chunks
119+ self._z_content_chunks = [c for c in compressed_chunks if c]
120+ self._z_content_length = sum(map(len, self._z_content_chunks))
121
122 def _create_z_content(self):
123- if self._z_content is not None:
124+ if self._z_content_chunks is not None:
125 return
126 if _USE_LZMA:
127 self._create_z_content_using_lzma()
128 return
129 if self._content_chunks is not None:
130- self._create_z_content_from_chunks()
131- return
132- self._z_content = zlib.compress(self._content)
133- self._z_content_length = len(self._z_content)
134+ chunks = self._content_chunks
135+ else:
136+ chunks = (self._content,)
137+ self._create_z_content_from_chunks(chunks)
138
139- def to_bytes(self):
140- """Encode the information into a byte stream."""
141+ def to_chunks(self):
142+ """Create the byte stream as a series of 'chunks'"""
143 self._create_z_content()
144 if _USE_LZMA:
145 header = self.GCB_LZ_HEADER
146 else:
147 header = self.GCB_HEADER
148- chunks = [header,
149- '%d\n%d\n' % (self._z_content_length, self._content_length),
150- self._z_content,
151+ chunks = ['%s%d\n%d\n'
152+ % (header, self._z_content_length, self._content_length),
153 ]
154+ chunks.extend(self._z_content_chunks)
155+ total_len = sum(map(len, chunks))
156+ return total_len, chunks
157+
158+ def to_bytes(self):
159+ """Encode the information into a byte stream."""
160+ total_len, chunks = self.to_chunks()
161 return ''.join(chunks)
162
163 def _dump(self, include_text=False):
164@@ -679,18 +703,21 @@
165 z_header_bytes = zlib.compress(header_bytes)
166 del header_bytes
167 z_header_bytes_len = len(z_header_bytes)
168- block_bytes = self._block.to_bytes()
169+ block_bytes_len, block_chunks = self._block.to_chunks()
170 lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
171- len(block_bytes)))
172+ block_bytes_len))
173 lines.append(z_header_bytes)
174- lines.append(block_bytes)
175- del z_header_bytes, block_bytes
176+ lines.extend(block_chunks)
177+ del z_header_bytes, block_chunks
178+ # TODO: This is a point where we will double the memory consumption. To
179+ # avoid this, we probably have to switch to a 'chunked' api
180 return ''.join(lines)
181
182 @classmethod
183 def from_bytes(cls, bytes):
184 # TODO: This does extra string copying, probably better to do it a
185- # different way
186+ # different way. At a minimum this creates 2 copies of the
187+ # compressed content
188 (storage_kind, z_header_len, header_len,
189 block_len, rest) = bytes.split('\n', 4)
190 del bytes
191@@ -854,14 +881,6 @@
192
193 After calling this, the compressor should no longer be used
194 """
195- # TODO: this causes us to 'bloat' to 2x the size of content in the
196- # group. This has an impact for 'commit' of large objects.
197- # One possibility is to use self._content_chunks, and be lazy and
198- # only fill out self._content as a full string when we actually
199- # need it. That would at least drop the peak memory consumption
200- # for 'commit' down to ~1x the size of the largest file, at a
201- # cost of increased complexity within this code. 2x is still <<
202- # 3x the size of the largest file, so we are doing ok.
203 self._block.set_chunked_content(self.chunks, self.endpoint)
204 self.chunks = None
205 self._delta_index = None
206@@ -1630,8 +1649,19 @@
207 self._unadded_refs = {}
208 keys_to_add = []
209 def flush():
210- bytes = self._compressor.flush().to_bytes()
211+ bytes_len, chunks = self._compressor.flush().to_chunks()
212 self._compressor = GroupCompressor()
213+ # Note: At this point we still have 1 copy of the fulltext (in
214+ # record and the var 'bytes'), and this generates 2 copies of
215+ # the compressed text (one for bytes, one in chunks)
216+ # TODO: Push 'chunks' down into the _access api, so that we don't
217+ # have to double compressed memory here
218+ # TODO: Figure out how to indicate that we would be happy to free
219+ # the fulltext content at this point. Note that sometimes we
220+ # will want it later (streaming CHK pages), but most of the
221+ # time we won't (everything else)
222+ bytes = ''.join(chunks)
223+ del chunks
224 index, start, length = self._access.add_raw_records(
225 [(None, len(bytes))], bytes)[0]
226 nodes = []
227
228=== modified file 'bzrlib/tests/per_lock/test_lock.py'
229--- bzrlib/tests/per_lock/test_lock.py 2010-09-23 16:37:27 +0000
230+++ bzrlib/tests/per_lock/test_lock.py 2010-09-29 20:07:49 +0000
231@@ -1,4 +1,4 @@
232-# Copyright (C) 2007 Canonical Ltd
233+# Copyright (C) 2007, 2009, 2010 Canonical Ltd
234 #
235 # This program is free software; you can redistribute it and/or modify
236 # it under the terms of the GNU General Public License as published by
237
238=== modified file 'bzrlib/tests/test_groupcompress.py'
239--- bzrlib/tests/test_groupcompress.py 2010-08-05 16:27:35 +0000
240+++ bzrlib/tests/test_groupcompress.py 2010-09-29 20:07:49 +0000
241@@ -347,6 +347,30 @@
242 self.assertEqual(z_content, block._z_content)
243 self.assertEqual(content, block._content)
244
245+ def test_to_chunks(self):
246+ content_chunks = ['this is some content\n',
247+ 'this content will be compressed\n']
248+ content_len = sum(map(len, content_chunks))
249+ content = ''.join(content_chunks)
250+ gcb = groupcompress.GroupCompressBlock()
251+ gcb.set_chunked_content(content_chunks, content_len)
252+ total_len, block_chunks = gcb.to_chunks()
253+ block_bytes = ''.join(block_chunks)
254+ self.assertEqual(gcb._z_content_length, len(gcb._z_content))
255+ self.assertEqual(total_len, len(block_bytes))
256+ self.assertEqual(gcb._content_length, content_len)
257+ expected_header =('gcb1z\n' # group compress block v1 zlib
258+ '%d\n' # Length of compressed content
259+ '%d\n' # Length of uncompressed content
260+ ) % (gcb._z_content_length, gcb._content_length)
261+ # The first chunk should be the header chunk. It is small, fixed size,
262+ # and there is no compelling reason to split it up
263+ self.assertEqual(expected_header, block_chunks[0])
264+ self.assertStartsWith(block_bytes, expected_header)
265+ remaining_bytes = block_bytes[len(expected_header):]
266+ raw_bytes = zlib.decompress(remaining_bytes)
267+ self.assertEqual(content, raw_bytes)
268+
269 def test_to_bytes(self):
270 content = ('this is some content\n'
271 'this content will be compressed\n')
272@@ -389,7 +413,7 @@
273 z_content = zlib.compress(content)
274 self.assertEqual(57182, len(z_content))
275 block = groupcompress.GroupCompressBlock()
276- block._z_content = z_content
277+ block._z_content_chunks = (z_content,)
278 block._z_content_length = len(z_content)
279 block._compressor_name = 'zlib'
280 block._content_length = 158634
281@@ -434,7 +458,7 @@
282 z_content = zlib.compress(content)
283 self.assertEqual(57182, len(z_content))
284 block = groupcompress.GroupCompressBlock()
285- block._z_content = z_content
286+ block._z_content_chunks = (z_content,)
287 block._z_content_length = len(z_content)
288 block._compressor_name = 'zlib'
289 block._content_length = 158634