Skip to content

Commit 75b57b8

Browse files
author
kdgregory
committed
add ReadThroughCache
1 parent 5729f14 commit 75b57b8

File tree

2 files changed

+745
-0
lines changed

2 files changed

+745
-0
lines changed
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
// Copyright Keith D Gregory
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package net.sf.kdgcommons.util;
16+
17+
import java.util.HashMap;
18+
import java.util.LinkedHashMap;
19+
import java.util.Map;
20+
import java.util.concurrent.locks.ReentrantLock;
21+
22+
23+
/**
24+
* A size-limited LRU cache that uses a retriever function to load values. Instances are
25+
* thread-safe (provided the retriever is thread-safe) and provide a variety of blocking
26+
* options for retrieval.
27+
* <p>
28+
* Note that the cache itself implements the {@link #Retriever} interface; caches may be
29+
* stacked.
30+
*
31+
* @since 1.0.15
32+
*/
33+
public class ReadThroughCache<K,V>
34+
{
35+
/**
36+
* This interface defines the retrieval operation: given a key, it will return a value.
37+
* By contract, the retriever will wait forever; specific implementations may abandon
38+
* retrieval after a timeout.
39+
*/
40+
public interface Retriever<KK,VV>
41+
{
42+
/**
43+
* Retrieves the value corresponding to the passed key. May return <code>null</code>.
44+
*/
45+
VV retrieve(KK key) throws InterruptedException;
46+
}
47+
48+
49+
/**
50+
* Options for controlling concurrent retrieval.
51+
*/
52+
public enum Synchronization
53+
{
54+
/**
55+
* No synchronization: concurrent requests for the same key will invoke concurrent
56+
* retrievals. The first value returned is cached, subsequent values are discarded.
57+
*/
58+
NONE,
59+
60+
/**
61+
* Per-key synchronization: each retrieval request establishes a lock that is cleared
62+
* when the retrieval completes. Subsequent retrieves for the same key will block until
63+
* the first returns. This is the default behavior.
64+
*/
65+
BY_KEY,
66+
67+
/**
68+
* Single-threaded: only one invocation of the retriever will take place at a time; all
69+
* subsequent requests will block until it completes. This should only be used if the
70+
* retriever is not thread-safe.
71+
*/
72+
SINGLE_THREADED
73+
}
74+
75+
76+
//----------------------------------------------------------------------------
77+
// Public methods
78+
//----------------------------------------------------------------------------
79+
80+
public V retrieve(K key) throws InterruptedException
81+
{
82+
// all the intelligence happens in the retriever decorators
83+
return retriever.retrieve(key);
84+
}
85+
86+
87+
/**
88+
* Returns the count of mappings currently in the cache.
89+
*/
90+
public int size()
91+
{
92+
synchronized (cacheLock)
93+
{
94+
return cache.size();
95+
}
96+
}
97+
98+
99+
/**
100+
* Removes all cached values.
101+
*/
102+
public void clear()
103+
{
104+
synchronized (cacheLock)
105+
{
106+
cache.clear();
107+
}
108+
}
109+
110+
//----------------------------------------------------------------------------
111+
// Constructors and instance variables
112+
//----------------------------------------------------------------------------
113+
114+
private Retriever<K,V> retriever;
115+
private Object cacheLock = new Object();
116+
private Map<K,V> cache = null;
117+
118+
/**
119+
* Base constructor.
120+
*
121+
* @param size Maximum number of items in the cache; the least recently used
122+
* item will be evicted if retrieval would exceed this limit. To
123+
* prevent resizing the underlying hash table, this value is also
124+
* used as the map's capacity.
125+
* @param retriever The function to retrieve items.
126+
* @param syncOpt The synchronization strategy.
127+
*/
128+
public ReadThroughCache(final int size, Retriever<K,V> retriever, Synchronization syncOpt)
129+
{
130+
switch (syncOpt)
131+
{
132+
case NONE :
133+
this.retriever = new UnsynchronizedRetriever(retriever);
134+
break;
135+
case BY_KEY :
136+
this.retriever = new ByKeyRetriever(retriever);
137+
break;
138+
case SINGLE_THREADED :
139+
// this.retriever = new UnsynchronizedRetriever(retriever);
140+
this.retriever = new SingleThreadedRetriever(retriever);
141+
break;
142+
default :
143+
throw new IllegalArgumentException("invalid synchronization option: " + syncOpt);
144+
}
145+
146+
cache = new LinkedHashMap<K,V>(size, 0.75f, true)
147+
{
148+
private static final long serialVersionUID = 1L;
149+
150+
@Override
151+
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
152+
return size() > size;
153+
}
154+
};
155+
}
156+
157+
158+
/**
159+
* Convenience constructor: creates an instance with specified size and retriever,
160+
* using per-key synchronization.
161+
*/
162+
public ReadThroughCache(int size, Retriever<K,V> retriever)
163+
{
164+
this(size, retriever, Synchronization.BY_KEY);
165+
}
166+
167+
168+
//----------------------------------------------------------------------------
169+
// Internals
170+
//----------------------------------------------------------------------------
171+
172+
private abstract class AbstractDelegatingRetriever
173+
implements Retriever<K,V>
174+
{
175+
protected Retriever<K,V> delegate;
176+
177+
protected AbstractDelegatingRetriever(Retriever<K,V> delegate)
178+
{
179+
this.delegate = delegate;
180+
}
181+
182+
public V retrieve(K key) throws InterruptedException
183+
{
184+
synchronized (cacheLock)
185+
{
186+
if (cache.containsKey(key))
187+
return cache.get(key);
188+
}
189+
return retrieve0(key);
190+
}
191+
192+
public abstract V retrieve0(K key) throws InterruptedException;
193+
}
194+
195+
196+
private class UnsynchronizedRetriever
197+
extends AbstractDelegatingRetriever
198+
{
199+
public UnsynchronizedRetriever(Retriever<K,V> delegate)
200+
{
201+
super(delegate);
202+
}
203+
204+
@Override
205+
public V retrieve0(K key) throws InterruptedException
206+
{
207+
V value = delegate.retrieve(key);
208+
209+
synchronized (cacheLock)
210+
{
211+
if (cache.containsKey(key))
212+
return cache.get(key);
213+
214+
cache.put(key, value);
215+
return value;
216+
}
217+
}
218+
}
219+
220+
221+
private class ByKeyRetriever
222+
extends AbstractDelegatingRetriever
223+
{
224+
private Object internalLock = new Object();
225+
private Map<K,ReentrantLock> keyLocks = new HashMap<K,ReentrantLock>();
226+
227+
public ByKeyRetriever(Retriever<K,V> delegate)
228+
{
229+
super(delegate);
230+
}
231+
232+
@Override
233+
public V retrieve0(K key) throws InterruptedException
234+
{
235+
ReentrantLock keyLock = getOrCreateLock(key);
236+
if (keyLock == null)
237+
{
238+
return doRetrieve(key);
239+
}
240+
else
241+
{
242+
keyLock.lockInterruptibly();
243+
keyLock.unlock();
244+
synchronized (cacheLock)
245+
{
246+
V value = cache.get(key);
247+
if ((value != null) || cache.containsKey(key))
248+
return value;
249+
}
250+
251+
// if we fall through to here, it means that the blocking process has
252+
// thrown an exception and it's up to us to retrieve the data; we'll
253+
// assume that *someone* will retrieve the value before we blow stack
254+
return retrieve0(key);
255+
}
256+
}
257+
258+
private ReentrantLock getOrCreateLock(K key) throws InterruptedException
259+
{
260+
synchronized (internalLock)
261+
{
262+
ReentrantLock lock = keyLocks.get(key);
263+
if (lock != null)
264+
return lock;
265+
266+
lock = new ReentrantLock();
267+
lock.lockInterruptibly();
268+
keyLocks.put(key, lock);
269+
return null;
270+
}
271+
}
272+
273+
private void removeLock(K key)
274+
{
275+
ReentrantLock lock = null;
276+
synchronized (internalLock)
277+
{
278+
lock = keyLocks.remove(key);
279+
}
280+
lock.unlock();
281+
}
282+
283+
private V doRetrieve(K key) throws InterruptedException
284+
{
285+
try
286+
{
287+
V value = delegate.retrieve(key);
288+
synchronized (cacheLock)
289+
{
290+
cache.put(key, value);
291+
}
292+
return value;
293+
}
294+
finally
295+
{
296+
removeLock(key);
297+
}
298+
}
299+
}
300+
301+
302+
private class SingleThreadedRetriever
303+
extends AbstractDelegatingRetriever
304+
{
305+
public SingleThreadedRetriever(Retriever<K,V> delegate)
306+
{
307+
super(delegate);
308+
}
309+
310+
@Override
311+
public synchronized V retrieve0(K key) throws InterruptedException
312+
{
313+
V value = delegate.retrieve(key);
314+
315+
synchronized (cacheLock)
316+
{
317+
if (cache.containsKey(key))
318+
return cache.get(key);
319+
320+
cache.put(key, value);
321+
return value;
322+
}
323+
}
324+
}
325+
}

0 commit comments

Comments
 (0)