Merge ~twom/launchpad:oci-try-the-lock-for-the-race into launchpad:master

Proposed by Tom Wardill
Status: Merged
Approved by: Tom Wardill
Approved revision: 8ae73843a7eec5938bfafcc5c07a8487ad38bdb9
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~twom/launchpad:oci-try-the-lock-for-the-race
Merge into: launchpad:master
Diff against target: 371 lines (+113/-144)
3 files modified
lib/lp/oci/model/ocirecipebuildjob.py (+33/-44)
lib/lp/oci/tests/test_ocirecipebuildjob.py (+74/-100)
lib/lp/services/database/locking.py (+6/-0)
Reviewer Review Type Date Requested Status
Thiago F. Pappacena (community) Approve
Review via email: mp+400820@code.launchpad.net

Commit message

Use an AdvisoryLock to prevent simultaneous Registry Upload jobs

Description of the change

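The SELECT ... FOR UPDATE mechanism did not prevent an earlier job from committing while it was still holding an earlier version of the metadata.
Instead, take an advisory lock keyed on the OCI recipe, so that only one registry upload job per recipe runs at a time; a job that finds the lock already held is retried later.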

To post a comment you must log in.
8ae7384... by Tom Wardill

Add backoff to retry delay

Revision history for this message
Thiago F. Pappacena (pappacena) wrote :

LGTM

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
2index 20337c1..8f55850 100644
3--- a/lib/lp/oci/model/ocirecipebuildjob.py
4+++ b/lib/lp/oci/model/ocirecipebuildjob.py
5@@ -24,6 +24,7 @@ from storm.databases.postgres import JSON
6 from storm.locals import (
7 Int,
8 Reference,
9+ Store,
10 )
11 import transaction
12 from zope.component import getUtility
13@@ -48,6 +49,11 @@ from lp.services.database.interfaces import (
14 IMasterStore,
15 IStore,
16 )
17+from lp.services.database.locking import (
18+ AdvisoryLockHeld,
19+ LockType,
20+ try_advisory_lock,
21+ )
22 from lp.services.database.stormbase import StormBase
23 from lp.services.job.interfaces.job import JobStatus
24 from lp.services.job.model.job import (
25@@ -186,7 +192,7 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
26 class ManifestListUploadError(Exception):
27 pass
28
29- retry_error_types = (ManifestListUploadError, )
30+ retry_error_types = (ManifestListUploadError, AdvisoryLockHeld,)
31 max_retries = 5
32
33 config = config.IOCIRegistryUploadJobSource
34@@ -212,10 +218,13 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
35 @property
36 def retry_delay(self):
37 dithering_secs = int(random.random() * 60)
38- # Adds some random seconds between 0 and 60 to minimize the
39- # likelihood of synchronized retries holding locks on the database
40- # at the same time.
41- return timedelta(minutes=10, seconds=dithering_secs)
42+ delays = (10, 15, 20, 30)
43+ try:
44+ return timedelta(
45+ minutes=delays[self.attempt_count - 1],
46+ seconds=dithering_secs)
47+ except IndexError:
48+ return timedelta(minutes=10, seconds=dithering_secs)
49
50 # Ideally we'd just override Job._set_status or similar, but
51 # lazr.delegates makes that difficult, so we use this to override all
52@@ -290,32 +299,11 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
53 """
54 builds = list(build_request.builds)
55 uploads_per_build = {i: list(i.registry_upload_jobs) for i in builds}
56- upload_jobs = sum(uploads_per_build.values(), [])
57-
58- # Lock the Job rows, so no other job updates its status until the
59- # end of this job's transaction. This is done to avoid race conditions,
60- # where 2 upload jobs could be running simultaneously and end up
61- # uploading an incomplete manifest list at the same time.
62- # Note also that new upload jobs might be created between the
63- # transaction begin and this lock takes place, but in this case the
64- # new upload is either a retry from a failed upload, or the first
65- # upload for one of the existing builds. Either way, we will succeed
66- # to execute our manifest list upload, and the new job will wait
67- # until this job finishes to upload their version of the manifest
68- # list (which will override our version, including both manifests).
69- store = IMasterStore(builds[0])
70- placeholders = ', '.join('?' for _ in upload_jobs)
71- sql = (
72- "SELECT id, status FROM job WHERE id IN (%s) FOR UPDATE"
73- % placeholders)
74- job_status = {
75- job_id: JobStatus.items[status] for job_id, status in
76- store.execute(sql, [i.job_id for i in upload_jobs])}
77
78 builds = set()
79 for build, upload_jobs in uploads_per_build.items():
80 has_finished_upload = any(
81- job_status[i.job_id] == JobStatus.COMPLETED
82+ i.status == JobStatus.COMPLETED
83 or i.job_id == self.job_id
84 for i in upload_jobs)
85 if has_finished_upload:
86@@ -342,24 +330,25 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
87 def run(self):
88 """See `IRunnableJob`."""
89 client = getUtility(IOCIRegistryClient)
90- # XXX twom 2020-04-16 This is taken from SnapStoreUploadJob
91- # it will need to gain retry support.
92 try:
93- try:
94- if not self.build_uploaded:
95- client.upload(self.build)
96- self.build_uploaded = True
97-
98- self.uploadManifestList(client)
99-
100- except OCIRegistryError as e:
101- self.error_summary = str(e)
102- self.errors = e.errors
103- raise
104- except Exception as e:
105- self.error_summary = str(e)
106- self.errors = None
107- raise
108+ with try_advisory_lock(LockType.REGISTRY_UPLOAD,
109+ self.build.recipe.id,
110+ Store.of(self.build.recipe)):
111+ try:
112+ if not self.build_uploaded:
113+ client.upload(self.build)
114+ self.build_uploaded = True
115+
116+ self.uploadManifestList(client)
117+
118+ except OCIRegistryError as e:
119+ self.error_summary = str(e)
120+ self.errors = e.errors
121+ raise
122+ except Exception as e:
123+ self.error_summary = str(e)
124+ self.errors = None
125+ raise
126 except Exception:
127 transaction.commit()
128 raise
129diff --git a/lib/lp/oci/tests/test_ocirecipebuildjob.py b/lib/lp/oci/tests/test_ocirecipebuildjob.py
130index 9388a68..0ff2938 100644
131--- a/lib/lp/oci/tests/test_ocirecipebuildjob.py
132+++ b/lib/lp/oci/tests/test_ocirecipebuildjob.py
133@@ -11,10 +11,13 @@ from datetime import (
134 datetime,
135 timedelta,
136 )
137+import os
138+import signal
139 import threading
140 import time
141
142 from fixtures import FakeLogger
143+from storm.locals import Store
144 from testtools.matchers import (
145 Equals,
146 MatchesDict,
147@@ -50,6 +53,11 @@ from lp.oci.model.ocirecipebuildjob import (
148 )
149 from lp.services.compat import mock
150 from lp.services.config import config
151+from lp.services.database.locking import (
152+ AdvisoryLockHeld,
153+ LockType,
154+ try_advisory_lock,
155+ )
156 from lp.services.features.testing import FeatureFixture
157 from lp.services.job.interfaces.job import JobStatus
158 from lp.services.job.runner import JobRunner
159@@ -64,7 +72,10 @@ from lp.testing import (
160 admin_logged_in,
161 TestCaseWithFactory,
162 )
163-from lp.testing.dbuser import dbuser
164+from lp.testing.dbuser import (
165+ dbuser,
166+ switch_dbuser,
167+ )
168 from lp.testing.fakemethod import FakeMethod
169 from lp.testing.fixture import ZopeUtilityFixture
170 from lp.testing.layers import (
171@@ -396,114 +407,30 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin,
172 self.assertEqual(JobStatus.COMPLETED, upload_job.status)
173 self.assertTrue(upload_job.build_uploaded)
174
175- def test_getUploadedBuilds_lock_between_two_jobs(self):
176- """Simple test to ensure that getUploadedBuilds method locks
177- rows in the database and make concurrent calls wait for that.
178-
179- This is not a 100% reliable way to check that concurrent calls to
180- getUploadedBuilds will queue up since it relies on the
181- execution time, but it's a "good enough" approach: this test might
182- pass if the machine running it is *really, really* slow, but a failure
183- here will indicate that something is for sure wrong.
184- """
185-
186- class AllBuildsUploadedChecker(threading.Thread):
187- """Thread to run upload_job.getUploadedBuilds tracking the time."""
188- def __init__(self, build_request):
189- super(AllBuildsUploadedChecker, self).__init__()
190- self.build_request = build_request
191- self.upload_job = None
192- # Locks the measurement start until we finished running the
193- # bootstrap code. Parent thread should call waitBootstrap
194- # after self.start().
195- self.bootstrap_lock = threading.Lock()
196- self.bootstrap_lock.acquire()
197- self.result = None
198- self.error = None
199- self.start_date = None
200- self.end_date = None
201-
202- @property
203- def lock_duration(self):
204- return self.end_date - self.start_date
205-
206- def waitBootstrap(self):
207- """Wait until self.bootstrap finishes running."""
208- self.bootstrap_lock.acquire()
209- # We don't actually need the lock... just wanted to wait
210- # for it. let's release it then.
211- self.bootstrap_lock.release()
212-
213- def bootstrap(self):
214- try:
215- build = self.build_request.builds[1]
216- self.upload_job = OCIRegistryUploadJob.create(build)
217- finally:
218- self.bootstrap_lock.release()
219-
220- def run(self):
221- with admin_logged_in():
222- self.bootstrap()
223- self.start_date = datetime.now()
224- try:
225- self.result = self.upload_job.getUploadedBuilds(
226- self.build_request)
227- except Exception as e:
228- self.error = e
229- self.end_date = datetime.now()
230-
231- # Create a build request with 2 builds.
232+ def test_getUploadedBuilds(self):
233+ # Create a build request with 3 builds.
234 build_request = self.makeBuildRequest(
235 include_i386=True, include_amd64=True, include_hppa=True)
236 builds = build_request.builds
237 self.assertEqual(3, builds.count())
238
239- # Fail one of the builds, to make sure we are ignoring it.
240- removeSecurityProxy(builds[2]).status = BuildStatus.FAILEDTOBUILD
241-
242 # Create the upload job for the first build.
243 upload_job1 = OCIRegistryUploadJob.create(builds[0])
244 upload_job1 = removeSecurityProxy(upload_job1)
245
246- # How long the lock will be held by the first job, in seconds.
247- # Adjust to minimize false positives: a number too small here might
248- # make the test pass even if the lock is not correctly implemented.
249- # A number too big will slow down the test execution...
250- waiting_time = 2
251- # Start a clean transaction and lock the rows at database level.
252- transaction.commit()
253- self.assertEqual(
254- {builds[0]}, upload_job1.getUploadedBuilds(build_request))
255-
256- # Start, in parallel, another upload job to run `getUploadedBuilds`.
257- concurrent_checker = AllBuildsUploadedChecker(build_request)
258- concurrent_checker.start()
259- # Wait until concurrent_checker is ready to measure the time waiting
260- # for the database lock.
261- concurrent_checker.waitBootstrap()
262-
263- # Wait a bit and release the database lock by committing current
264- # transaction.
265- time.sleep(waiting_time)
266- # Let's force the first job to be finished, just to make sure the
267- # second job will realise it's the last one running.
268- upload_job1.start()
269- upload_job1.complete()
270- transaction.commit()
271-
272- # Now, the concurrent checker should have already finished running,
273- # without any error and it should have taken at least the
274- # waiting_time to finish running (since it was waiting).
275- concurrent_checker.join()
276- self.assertIsNone(concurrent_checker.error)
277- self.assertGreaterEqual(
278- concurrent_checker.lock_duration, timedelta(seconds=waiting_time))
279- # Should have noticed that both builds are ready to upload.
280- self.assertEqual(2, len(concurrent_checker.result))
281- thread_build1, thread_build2 = concurrent_checker.result
282- self.assertThat(set(builds[:2]), MatchesSetwise(
283- MatchesStructure(id=Equals(thread_build1.id)),
284- MatchesStructure(id=Equals(thread_build2.id))))
285+ upload_job2 = OCIRegistryUploadJob.create(builds[1])
286+ upload_job2 = removeSecurityProxy(upload_job2)
287+
288+ client = FakeRegistryClient()
289+ self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
290+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
291+ run_isolated_jobs([upload_job1])
292+
293+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
294+ run_isolated_jobs([upload_job2])
295+
296+ result = upload_job1.getUploadedBuilds(build_request)
297+ self.assertEqual({builds[0], builds[1]}, result)
298
299 def test_run_failed_registry_error(self):
300 # A run that fails with a registry error sets the registry upload
301@@ -586,6 +513,53 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory, MultiArchRecipeMixin,
302
303 self.assertEqual(0, len(self.oopses))
304
305+ def test_advisorylock_on_run(self):
306+ # The job should take an advisory lock and any attempted
307+ # simultaneous jobs should retry
308+ logger = self.useFixture(FakeLogger())
309+ build_request = self.makeBuildRequest(include_i386=False)
310+ recipe = build_request.recipe
311+
312+ self.assertEqual(1, build_request.builds.count())
313+ ocibuild = build_request.builds[0]
314+ ocibuild.updateStatus(BuildStatus.FULLYBUILT)
315+ self.makeWebhook(recipe)
316+
317+ self.assertContentEqual([], ocibuild.registry_upload_jobs)
318+ job = OCIRegistryUploadJob.create(ocibuild)
319+ client = FakeRegistryClient()
320+ switch_dbuser(config.IOCIRegistryUploadJobSource.dbuser)
321+ # Fork so that we can take an advisory lock from a different
322+ # PostgreSQL session.
323+ read, write = os.pipe()
324+ pid = os.fork()
325+ if pid == 0: # child
326+ os.close(read)
327+ with try_advisory_lock(
328+ LockType.REGISTRY_UPLOAD,
329+ ocibuild.recipe.id,
330+ Store.of(ocibuild.recipe)):
331+ os.write(write, b"1")
332+ try:
333+ signal.pause()
334+ except KeyboardInterrupt:
335+ pass
336+ os._exit(0)
337+ else: # parent
338+ try:
339+ os.close(write)
340+ os.read(read, 1)
341+ runner = JobRunner([job])
342+ runner.runAll()
343+ self.assertEqual(JobStatus.WAITING, job.status)
344+ self.assertEqual([], runner.oops_ids)
345+ self.assertIn(
346+ "Scheduling retry due to AdvisoryLockHeld", logger.output)
347+ finally:
348+ os.kill(pid, signal.SIGINT)
349+
350+
351+
352
353 class TestOCIRegistryUploadJobViaCelery(TestCaseWithFactory,
354 MultiArchRecipeMixin):
355diff --git a/lib/lp/services/database/locking.py b/lib/lp/services/database/locking.py
356index 266f971..3f1c478 100644
357--- a/lib/lp/services/database/locking.py
358+++ b/lib/lp/services/database/locking.py
359@@ -44,6 +44,12 @@ class LockType(DBEnumeratedType):
360 Package copy.
361 """)
362
363+ REGISTRY_UPLOAD = DBItem(3, """OCI Registry upload.
364+
365+ OCI Registry upload.
366+ """
367+ )
368+
369
370 @contextmanager
371 def try_advisory_lock(lock_type, lock_id, store):

Subscribers

People subscribed via source and target branches

to status/vote changes: