Commit f8e8fcc

zhengruifeng and JoshRosen authored and committed
[SPARK-50705][SQL] Make QueryPlan lock-free
### What changes were proposed in this pull request?

Replace a group of `lazy val`s in `QueryPlan` with new lock-free helper classes. Not all `lazy val`s are replaced in this PR; the remaining `lazy val`s need to be handled together with `QueryPlan`'s subclasses for the change to take full effect.

### Why are the changes needed?

To fix deadlock issues on query plan nodes:
- Sometimes we want plan node methods to use a coarse lock (just lock the plan node itself), because these methods (`expressions`, `output`, `references`, `deterministic`, `schema`, `canonicalized`, etc.) may call each other, and a coarse lock prevents deadlocks among them.
- Sometimes we want these methods to use fine-grained locks, because they may also call the corresponding methods of a parent or child plan node. If two threads traverse the tree in different directions at the same time, a coarse lock is likely to deadlock.

The only solution is to avoid locks altogether where possible.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests. Manually tested against a deadlock case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49212 from zhengruifeng/query_plan_atom_refs.

Lead-authored-by: Ruifeng Zheng <ruifengz@apache.org>
Co-authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
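The two-direction traversal deadlock described above can be sketched in a few lines. This is a hedged illustration with an invented `Node` class, not Spark's actual `QueryPlan` API: in Scala 2, forcing a `lazy val` synchronizes on the owning object, so two threads walking a tree in opposite directions can each hold one monitor while waiting for the other's.

```scala
// Illustrative sketch of the lock-ordering hazard (hypothetical Node class,
// not Spark code). In Scala 2, a `lazy val` initializer locks `this`.
class Node(@volatile var parent: Node, @volatile var child: Node) {
  lazy val down: Int = if (child != null) child.down + 1 else 0 // locks this, then child
  lazy val up: Int = if (parent != null) parent.up + 1 else 0   // locks this, then parent
}

object DeadlockShape {
  def main(args: Array[String]): Unit = {
    val root = new Node(null, null)
    val leaf = new Node(root, null)
    root.child = leaf
    // If thread A forces `root.down` while thread B forces `leaf.up`,
    // A holds root's monitor and waits for leaf's, while B holds leaf's
    // and waits for root's: a classic lock-ordering deadlock. Evaluated
    // single-threaded here, both succeed:
    println(root.down) // 1
    println(leaf.up)   // 1
  }
}
```

The lock-free helpers in this commit sidestep the hazard by replacing the monitor with an atomic compare-and-set, at the cost of possibly invoking the compute function more than once.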
1 parent 0123a5e commit f8e8fcc

File tree: 7 files changed, +402 −83 lines

core/src/main/scala/org/apache/spark/util/TransientLazy.scala renamed to core/src/main/scala/org/apache/spark/util/BestEffortLazyVal.scala

Lines changed: 30 additions & 12 deletions

```diff
@@ -16,28 +16,46 @@
  */
 package org.apache.spark.util
 
+import java.util.concurrent.atomic.AtomicReference
+
 /**
- * Construct to lazily initialize a variable.
- * This may be helpful for avoiding deadlocks in certain scenarios. For example,
- * a) Thread 1 entered a synchronized method, grabbing a coarse lock on the parent object.
- * b) Thread 2 gets spawned off, and tries to initialize a lazy value on the same parent object
- *    (in our case, this was the logger). This causes scala to also try to grab a coarse lock on
- *    the parent object.
- * c) If thread 1 waits for thread 2 to join, a deadlock occurs.
- * The main difference between this and [[LazyTry]] is that this does not cache failures.
+ * A lock-free implementation of a lazily-initialized variable.
+ * If there are concurrent initializations then the `compute()` function may be invoked
+ * multiple times. However, only a single `compute()` result will be stored and all readers
+ * will receive the same result object instance.
+ *
+ * This may be helpful for avoiding deadlocks in certain scenarios where exactly-once
+ * value computation is not a hard requirement.
+ *
+ * @note
+ *   This helper class has additional requirements on the compute function:
+ *   1) The compute function MUST not return null;
+ *   2) The computation failure is not cached.
  *
  * @note
  *   Scala 3 uses a different implementation of lazy vals which doesn't have this problem.
  *   Please refer to <a
  *   href="https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html">Lazy
  *   Vals Initialization</a> for more details.
  */
-private[spark] class TransientLazy[T](initializer: => T) extends Serializable {
+private[spark] class BestEffortLazyVal[T <: AnyRef](
+    @volatile private[this] var compute: () => T) extends Serializable {
 
-  @transient
-  private[this] lazy val value: T = initializer
+  private[this] val cached: AtomicReference[T] = new AtomicReference(null.asInstanceOf[T])
 
   def apply(): T = {
-    value
+    val value = cached.get()
+    if (value != null) {
+      value
+    } else {
+      val f = compute
+      if (f != null) {
+        val newValue = f()
+        assert(newValue != null, "compute function cannot return null.")
+        cached.compareAndSet(null.asInstanceOf[T], newValue)
+        compute = null // allow closure to be GC'd
+      }
+      cached.get()
+    }
   }
 }
```
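The semantics of the new helper can be demonstrated with a minimal standalone re-implementation. The real `BestEffortLazyVal` is `private[spark]`, so this sketch inlines the same pattern under an invented name, `LazyDemo`:

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified re-implementation of the BestEffortLazyVal pattern for
// illustration (the real class is private[spark]).
class LazyDemo[T <: AnyRef](@volatile private var compute: () => T) {
  private val cached = new AtomicReference[T](null.asInstanceOf[T])

  def apply(): T = {
    val v = cached.get()
    if (v != null) v
    else {
      val f = compute
      if (f != null) {
        val nv = f()
        // Only the first successful CAS wins; racing computations are discarded.
        cached.compareAndSet(null.asInstanceOf[T], nv)
        compute = null // allow the closure to be GC'd
      }
      cached.get()
    }
  }
}

object LazyDemoApp {
  def main(args: Array[String]): Unit = {
    var calls = 0
    val lv = new LazyDemo(() => { calls += 1; new Object() })
    val a = lv()
    val b = lv()
    assert(a eq b)     // every reader sees the same instance
    assert(calls == 1) // single-threaded: computed exactly once
    println("ok")
  }
}
```

Under contention two threads may both run the compute function, but the `compareAndSet` guarantees that all callers of `apply()` observe a single winning instance, which is the "best effort" trade-off named in the class.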
Lines changed: 69 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.util

import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.atomic.AtomicReference

/**
 * A lock-free implementation of a lazily-initialized variable.
 * If there are concurrent initializations then the `compute()` function may be invoked
 * multiple times. However, only a single `compute()` result will be stored and all readers
 * will receive the same result object instance.
 *
 * This may be helpful for avoiding deadlocks in certain scenarios where exactly-once
 * value computation is not a hard requirement.
 *
 * The main difference between this and [[BestEffortLazyVal]] is that:
 * [[BestEffortLazyVal]] serializes the cached value after computation, while
 * [[TransientBestEffortLazyVal]] always serializes the compute function.
 *
 * @note
 *   This helper class has additional requirements on the compute function:
 *   1) The compute function MUST not return null;
 *   2) The computation failure is not cached.
 *
 * @note
 *   Scala 3 uses a different implementation of lazy vals which doesn't have this problem.
 *   Please refer to <a
 *   href="https://docs.scala-lang.org/scala3/reference/changed-features/lazy-vals-init.html">Lazy
 *   Vals Initialization</a> for more details.
 */
private[spark] class TransientBestEffortLazyVal[T <: AnyRef](
    private[this] val compute: () => T) extends Serializable {

  @transient
  private[this] var cached: AtomicReference[T] = new AtomicReference(null.asInstanceOf[T])

  def apply(): T = {
    val value = cached.get()
    if (value != null) {
      value
    } else {
      val newValue = compute()
      assert(newValue != null, "compute function cannot return null.")
      cached.compareAndSet(null.asInstanceOf[T], newValue)
      cached.get()
    }
  }

  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    cached = new AtomicReference(null.asInstanceOf[T])
  }
}
```
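The `@transient`-cache-plus-`readObject`-reset idiom above can be sketched standalone. `TransientLazyDemo` is an invented demo class mirroring the idiom; the real class additionally wraps its `readObject` body in `Utils.tryOrIOException`:

```scala
import java.io._
import java.util.concurrent.atomic.AtomicReference

// Simplified sketch of the @transient-cache + readObject-reset idiom
// (hypothetical demo class, not Spark's TransientBestEffortLazyVal).
class TransientLazyDemo[T <: AnyRef](compute: () => T) extends Serializable {
  @transient private var cached = new AtomicReference[T](null.asInstanceOf[T])

  def apply(): T = {
    val v = cached.get()
    if (v != null) v
    else {
      cached.compareAndSet(null.asInstanceOf[T], compute())
      cached.get()
    }
  }

  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream): Unit = {
    ois.defaultReadObject()
    // The cache is transient, so re-create it: the value is recomputed
    // by re-running the serialized compute function after deserialization.
    cached = new AtomicReference[T](null.asInstanceOf[T])
  }
}

object TransientDemoApp {
  def main(args: Array[String]): Unit = {
    val lv = new TransientLazyDemo(() => "computed")
    val first = lv()
    // Round-trip through Java serialization: the cache does not travel,
    // so the deserialized copy recomputes on first access.
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)
    oos.writeObject(lv)
    oos.close()
    val copy = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
      .readObject().asInstanceOf[TransientLazyDemo[String]]
    assert(copy() == first)
    println("ok")
  }
}
```

Because only the compute function is serialized, the cached value itself never has to be serializable; this is the inverse trade-off of `BestEffortLazyVal`, which serializes the cached value instead.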
Lines changed: 44 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

trait SerializerTestUtils {

  protected def roundtripSerialize[T](obj: T): T = {
    deserializeFromBytes(serializeToBytes(obj))
  }

  protected def serializeToBytes[T](o: T): Array[Byte] = {
    val baos = new ByteArrayOutputStream
    val oos = new ObjectOutputStream(baos)
    try {
      oos.writeObject(o)
      baos.toByteArray
    } finally {
      oos.close()
    }
  }

  protected def deserializeFromBytes[T](bytes: Array[Byte]): T = {
    val bais = new ByteArrayInputStream(bytes)
    val ois = new ObjectInputStream(bais)
    ois.readObject().asInstanceOf[T]
  }
}
```
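For quick experiments outside Spark's test hierarchy, the same round-trip idiom can be inlined into a standalone object (the `RoundTrip` name here is invented for the sketch):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Standalone version of the round-trip helper above, for quick experiments.
object RoundTrip {
  def roundtripSerialize[T](obj: T): T = {
    val baos = new ByteArrayOutputStream
    val oos = new ObjectOutputStream(baos)
    try oos.writeObject(obj) finally oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
    ois.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // A deserialized copy is equal to, but not the same object as, the input.
    assert(RoundTrip.roundtripSerialize("hello") == "hello")
    assert(RoundTrip.roundtripSerialize(Vector(1, 2, 3)) == Vector(1, 2, 3))
    println("ok")
  }
}
```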
Lines changed: 120 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.util

import java.io.NotSerializableException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.{SerializerTestUtils, SparkFunSuite}

class BestEffortLazyValSuite extends SparkFunSuite with SerializerTestUtils {

  test("BestEffortLazy works") {
    val numInitializerCalls = new AtomicInteger(0)
    // Simulate a race condition where two threads concurrently
    // initialize the lazy value:
    val latch = new CountDownLatch(2)
    val lazyval = new BestEffortLazyVal(() => {
      numInitializerCalls.incrementAndGet()
      latch.countDown()
      latch.await()
      new Object()
    })

    // Ensure no initialization happened before the lazy value was invoked
    assert(numInitializerCalls.get() === 0)

    // Two threads concurrently invoke the lazy value
    implicit val ec: ExecutionContext = ExecutionContext.global
    val future1 = Future { lazyval() }
    val future2 = Future { lazyval() }
    val value1 = ThreadUtils.awaitResult(future1, 10.seconds)
    val value2 = ThreadUtils.awaitResult(future2, 10.seconds)

    // The initializer should have been invoked twice (due to how we set up the
    // race condition via the latch):
    assert(numInitializerCalls.get() === 2)

    // But the value should only have been computed once:
    assert(value1 eq value2)

    // Ensure the subsequent invocation serves the same object
    assert(lazyval() eq value1)
    assert(numInitializerCalls.get() === 2)
  }

  test("BestEffortLazyVal is serializable") {
    val lazyval = new BestEffortLazyVal(() => "test")

    // serialize and deserialize before first invocation
    val lazyval2 = roundtripSerialize(lazyval)
    assert(lazyval2() === "test")

    // first invocation
    assert(lazyval() === "test")

    // serialize and deserialize after first invocation
    val lazyval3 = roundtripSerialize(lazyval)
    assert(lazyval3() === "test")
  }

  test("BestEffortLazyVal is serializable: unserializable value") {
    val lazyval = new BestEffortLazyVal(() => new Object())

    // serialize and deserialize before first invocation
    val lazyval2 = roundtripSerialize(lazyval)
    assert(lazyval2() != null)

    // first invocation
    assert(lazyval() != null)

    // serialize and deserialize after first invocation
    // try to serialize the cached value and cause NotSerializableException
    val e = intercept[NotSerializableException] {
      val lazyval3 = roundtripSerialize(lazyval)
    }
    assert(e.getMessage.contains("java.lang.Object"))
  }

  test("BestEffortLazyVal is serializable: initialization failure") {
    val lazyval = new BestEffortLazyVal[String](() => throw new RuntimeException("test"))

    // serialize and deserialize before first invocation
    val lazyval2 = roundtripSerialize(lazyval)
    val e2 = intercept[RuntimeException] {
      val v = lazyval2()
    }
    assert(e2.getMessage.contains("test"))

    // initialization failure
    val e = intercept[RuntimeException] {
      val v = lazyval()
    }
    assert(e.getMessage.contains("test"))

    // serialize and deserialize after initialization failure
    val lazyval3 = roundtripSerialize(lazyval)
    val e3 = intercept[RuntimeException] {
      val v = lazyval3()
    }
    assert(e3.getMessage.contains("test"))
  }
}
```
