The AtomicReference and other Atomic… classes in the java.util.concurrent.atomic package allow multiple threads to read and update values safely.
Yet AtomicReference alone is of limited use in writing thread-safe code.
This article describes an approach to using AtomicReference that can resolve race conditions.
This approach can be used for thread-safe state transitions without dedicated threads.
These transitions could include lazy initialization or resource clean-up tasks.
Something is rotten in the state transition
Consider the updateAndGet() method.
This allows many threads to contend to update the value.
For example, one might write something like this:
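private final AtomicReference<Connection> ref = new AtomicReference<>();

public Connection getConnection() {
    // Create the connection on first use; later calls reuse the existing one
    return ref.updateAndGet(conn -> conn == null ? new Connection() : conn);
}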
When getConnection() is first invoked, it will create a new Connection, but later calls will find the existing one.
But there is a trap here for the unwary developer.
The contract of the updateAndGet() method describes some restrictions on the update function:
"The function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads."
Although the update only happens once, many threads may call the update function at the same time. If the update function is fast and side-effect free, there is no cause for concern. But if creating a new connection is slow, or has side-effects, this approach is flawed.
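The re-application is easy to reproduce. The following sketch is illustrative only: the RetryDemo class and its fields are hypothetical, and contention is simulated by having the update function itself overwrite the reference during its first invocation, standing in for a competing thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class RetryDemo {
    /** Counts how many times the update function was applied. */
    static final AtomicInteger calls = new AtomicInteger();

    /** Runs one updateAndGet() under simulated contention and returns the final value. */
    static String run() {
        calls.set(0);
        AtomicReference<String> ref = new AtomicReference<>("a");
        return ref.updateAndGet(value -> {
            // On the first invocation only, simulate a competing thread
            // changing the reference, so the pending compare-and-set fails.
            if (calls.incrementAndGet() == 1) ref.set("b");
            return value + "!";
        });
    }

    public static void main(String[] args) {
        String result = run();
        System.out.println(result + " after " + calls.get() + " calls");
    }
}
```

The function runs twice for a single successful update: once against the stale value, and again against the value that displaced it. Had the function opened a connection, the first one would have been created and then silently discarded.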
Let’s build an alternative that works well with any kind of update function, even if it is slow, non-thread-safe, or throws exceptions.
To do or not to do
The update processing — e.g. a lazy initialization step — should happen exactly once. Thread contention must be resolved before the work is undertaken:
- Exactly one thread must win the race to perform the update.
- Other threads need only wait for the result.
Let’s build a new utility.
We’ll call it AcidReference.
Though this be madness, yet there is method in’t
We need a more expressive interface for this behaviour.
Rather than an update function, we accept a function that inspects the current value and returns a supplier of the new value.
If that function returns null then no update is necessary.
Our method signature looks like this:
public boolean update(Function<T, Supplier<T>> updaterSupplier) {
    // ...
}
It could be used to lazily initialize our Connection object like this:
// create the connection on demand
boolean created = ref.update(conn -> null == conn ? Connection::new : null);
if (created)
    System.out.println("Created the connection");
else
    System.out.println("Connection already created");
Also note that we dispensed with the update-and-get approach.
In our use case, we were not using the return value of updateAndGet().
We wanted to know whether a particular call had performed an update, so we return a boolean to show whether the update happened.
It should be easy to adapt the code to implement updateAndGet() or getAndUpdate().
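The two-stage shape of the parameter can be seen in isolation, without any concurrency. In this minimal sketch the UpdaterSupplierDemo class, the LAZY_INIT constant and the string values are all hypothetical names chosen for illustration. The outer function is the cheap decision that may run many times; the supplier it returns is the potentially expensive work that should run once.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class UpdaterSupplierDemo {
    /** Hypothetical updater: request initialization only when no value exists yet. */
    static final Function<String, Supplier<String>> LAZY_INIT =
            current -> current == null ? () -> "fresh value" : null;

    public static void main(String[] args) {
        // Stage 1 (cheap, may be repeated): decide whether an update is needed.
        Supplier<String> needed = LAZY_INIT.apply(null);
        // Stage 2 (possibly slow, should happen once): compute the new value.
        System.out.println(needed.get());
        // With a value already present, the function declines by returning null.
        System.out.println(LAZY_INIT.apply("existing"));
    }
}
```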
To assume a pleasing shape
If we add a couple of constructors and a getter method, we have the outline for our utility:
public class AcidReference<T> {
    public AcidReference(T initial) { ... }
    public AcidReference() { ... }
    public T get() { ... }
    public boolean update(Function<T, Supplier<T>> updaterSupplier) { ... }
}
Suit the action to the word
So let’s see how AcidReference is implemented.
After all, that’s the most interesting part.
By indirections, find directions out
"We can solve any problem by introducing an extra level of indirection." -- David J. Wheeler
We define a Ref interface that tells us whether it is in transition.
@FunctionalInterface
private interface Ref<T> extends Supplier<T> {
    default boolean isReady() { return true; }
}
While the update is being applied, we want a special Ref that makes other threads wait for the result:
/** A placeholder ref to allow contending threads to wait for an in-flight update. */
private static class TransitionalRef<T> implements Ref<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Thread updaterThread = Thread.currentThread();

    @Override
    public boolean isReady() {
        // It is not valid to try to retrieve the value
        // from the updating thread while it is still updating
        if (Thread.currentThread() == updaterThread)
            throw new IllegalStateException("Attempt to retrieve value during update process");
        try {
            latch.await();
        } catch (InterruptedException ignored) {
            // Interruption is swallowed here; we return false and the caller re-checks the reference
        }
        return false;
    }

    void markAsReady() { latch.countDown(); }

    @Override
    public T get() { throw new UnsupportedOperationException(); }
}
We hold one of these Ref objects inside an AtomicReference.
Instead of AtomicReference<Widget> we now use AtomicReference<Ref<Widget>>.
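The latch inside TransitionalRef acts as a one-shot gate. The sketch below shows only that gating behaviour, using a plain CountDownLatch; the LatchGateDemo class and its variable names are hypothetical. Because countDown() happens-before a successful return from await(), a waiter that gets through the gate is guaranteed to see everything the releasing thread wrote beforehand.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class LatchGateDemo {
    /** Returns true if the waiter observed the finished "update" once it was released. */
    static boolean run() {
        CountDownLatch gate = new CountDownLatch(1);   // plays the role of TransitionalRef's latch
        AtomicBoolean updateFinished = new AtomicBoolean(false);
        AtomicBoolean waiterSawUpdate = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                gate.await();                              // blocks until countDown() is called
                waiterSawUpdate.set(updateFinished.get()); // record what was visible on release
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        updateFinished.set(true); // the "update" completes first...
        gate.countDown();         // ...and only then are waiters released, as in markAsReady()
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return waiterSawUpdate.get();
    }

    public static void main(String[] args) {
        System.out.println("waiter saw finished update: " + run());
    }
}
```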
And waits upon the judgement
Waiting for a transition is very simple:
private Ref<T> getWithWait() {
    for (;;) {
        Ref<T> ref = atomicReference.get();
        if (ref.isReady()) return ref;
    }
}
Usually, this loop will run only once, because ref.isReady() returns true.
If ref is a TransitionalRef, the isReady() method will block until the transition completes.
Even then, the loop would usually repeat once more and then retrieve the updated value.
An unlucky thread could loop around many times — once for each of many observed transitions.
The get() method is trivial; it gets the reference and dereferences it:
public T get() { return getWithWait().get(); }
What judgement would step from this to this?
The update() implementation is more involved.
Only one thread may enter the critical section where the update happens.
Other threads must wait for the transition to complete before they can access the result.
Recall that the method takes a function supplier as a parameter:
public boolean update(Function<T, Supplier<T>> updaterSupplier) {
    TransitionalRef<T> tranRef = null; // created lazily later
    Ref<T> ref;
    Supplier<T> neededUpdate;
    do {
        // Step 1: Retrieve the current reference, waiting for any in-flight update to complete.
        ref = getWithWait();
        // Step 2: Use the function supplier to check whether this value needs to be updated.
        neededUpdate = updaterSupplier.apply(ref.get());
        // Step 3: If the supplier returned null, no update is needed, so return false.
        if (null == neededUpdate) return false;
        // Step 4: Now that we know an update might be needed, we create a TransitionalRef (at most once per call).
        if (null == tranRef) tranRef = new TransitionalRef<>();
        // Step 5: Atomically compare and swap the original ref for the transitional one;
        // if another thread got there first, go back to step 1.
    } while (false == atomicReference.compareAndSet(ref, tranRef));
    // Step 6: If we reach here, we start the update. Start a try-finally block to ensure
    // that a usable reference is always restored and waiting threads are always released.
    try {
        // Step 7: Compute the new value.
        final T newValue = neededUpdate.get();
        // Step 8: Create a new non-transitional reference. (At last, the lambda we hinted at earlier.)
        ref = () -> newValue;
        // Step 9: Flag up to the caller that this update has succeeded.
        return true;
    } finally {
        // Step 10: Apply the new value, or the original value if an exception occurred.
        atomicReference.set(ref);
        // Step 11: Unblock any threads waiting in step 1.
        tranRef.markAsReady();
    }
}
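Assembling the fragments above gives something that can be compiled and exercised. Treat this as a sketch under stated assumptions, not the maintained implementation: the constructor bodies are our guess (the outline earlier elides them), and the AcidReferenceDemo harness with its counters is purely illustrative. The code in the OpenLiberty repository may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

class AcidReferenceDemo {
    /** Races threadCount threads; returns {supplier invocations, calls that returned true}. */
    static int[] run(int threadCount) {
        AcidReference<String> ref = new AcidReference<>();
        AtomicInteger creations = new AtomicInteger();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                try { start.await(); } catch (InterruptedException e) { return; }
                // Lazy initialization: only request an update while the value is null
                boolean created = ref.update(
                        v -> v == null ? () -> "value-" + creations.incrementAndGet() : null);
                if (created) winners.incrementAndGet();
            });
            threads[i].start();
        }
        start.countDown(); // release all threads together to provoke contention
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { creations.get(), winners.get() };
    }

    public static void main(String[] args) {
        int[] counts = run(8);
        System.out.println("creations=" + counts[0] + ", winners=" + counts[1]);
    }
}

class AcidReference<T> {
    @FunctionalInterface
    private interface Ref<T> extends Supplier<T> {
        default boolean isReady() { return true; }
    }

    private static class TransitionalRef<T> implements Ref<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Thread updaterThread = Thread.currentThread();
        @Override public boolean isReady() {
            if (Thread.currentThread() == updaterThread)
                throw new IllegalStateException("Attempt to retrieve value during update process");
            try { latch.await(); } catch (InterruptedException ignored) {}
            return false;
        }
        void markAsReady() { latch.countDown(); }
        @Override public T get() { throw new UnsupportedOperationException(); }
    }

    private final AtomicReference<Ref<T>> atomicReference;

    // ASSUMPTION: the article elides the constructor bodies; this is one plausible version.
    AcidReference(T initial) {
        Ref<T> initialRef = () -> initial;
        atomicReference = new AtomicReference<>(initialRef);
    }
    AcidReference() { this(null); }

    private Ref<T> getWithWait() {
        for (;;) {
            Ref<T> ref = atomicReference.get();
            if (ref.isReady()) return ref;
        }
    }

    public T get() { return getWithWait().get(); }

    public boolean update(Function<T, Supplier<T>> updaterSupplier) {
        TransitionalRef<T> tranRef = null;
        Ref<T> ref;
        Supplier<T> neededUpdate;
        do {
            ref = getWithWait();
            neededUpdate = updaterSupplier.apply(ref.get());
            if (null == neededUpdate) return false;
            if (null == tranRef) tranRef = new TransitionalRef<>();
        } while (!atomicReference.compareAndSet(ref, tranRef));
        try {
            final T newValue = neededUpdate.get();
            ref = () -> newValue;
            return true;
        } finally {
            atomicReference.set(ref);  // new value, or the original if the supplier threw
            tranRef.markAsReady();     // release any waiting threads
        }
    }
}
```

However many threads race, the supplier should run once and exactly one call should report that it performed the update. Losing threads either wait on the latch or find the new value already in place, and in both cases their function then returns null.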
There are more things in heaven and earth than are dreamt of in our philosophy
This is our best and most generic solution yet to a common problem. We improved it further while writing this post, and we probably aren’t finished. You can search for the latest code in the OpenLiberty source repository.
The undiscover’d country, from whose bourn no traveller returns
There is a trap for the unwary in AcidReference too. Observe the line in the update() method that calls neededUpdate.get().
This calls out to some external code, provided by the caller, while effectively holding a lock.
Whenever this happens, there is a risk of the called code simply not returning, and the lock never being released.
There is a more insidious risk that the called code proceeds to obtain another lock in an inconsistent order with this effective lock.
That is to say, another thread might obtain the other lock first before calling our update() method, giving rise to deadlock.
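One possible hedge against an updater that never returns is for waiters to give up after a bounded time. This is not something AcidReference does as described here; it is a sketch of the idea using the timed form of CountDownLatch.await(). The BoundedWaitDemo class, the awaitWithTimeout helper and the 50 ms timeout are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedWaitDemo {
    /** Waits for the latch but gives up after the timeout; returns true if released in time. */
    static boolean awaitWithTimeout(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Never counted down: models an update whose supplier never returns
        CountDownLatch stuck = new CountDownLatch(1);
        System.out.println("stuck update released in time: " + awaitWithTimeout(stuck, 50));

        // Already counted down: models an update that completed normally
        CountDownLatch finished = new CountDownLatch(1);
        finished.countDown();
        System.out.println("finished update released in time: " + awaitWithTimeout(finished, 50));
    }
}
```

A waiter that times out still has to decide what to do next, such as failing the operation or reporting the stuck update, so this limits the damage without removing the underlying risk.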
AtomicReference insisted that update functions be completely side-effect free, and implied that they should not be expensive to invoke. AcidReference allows update functions that modify state, and it guarantees that they are only called once. Care and attention are still required.
Concurrent programming in Java is hard. Time for a coffee, and maybe a Danish.
The rest is silence.