Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 37 additions & 18 deletions src/main/java/net/killa/kept/KeptConcurrentMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package net.killa.kept;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -36,8 +37,8 @@
* the change.
*
*/
public class KeptConcurrentMap extends KeptMap implements
ConcurrentMap<String, String>, Synchronizable {
public class KeptConcurrentMap<V> extends KeptMap<V> implements
ConcurrentMap<String, V>, Synchronizable {
private final ZooKeeper keeper;
private final String znode;
private final List<ACL> acl;
Expand All @@ -64,10 +65,11 @@ public class KeptConcurrentMap extends KeptMap implements
* @throws KeeperException
* @throws InterruptedException
*/
public KeptConcurrentMap(final ZooKeeper keeper, final String znode,
public KeptConcurrentMap(final Class<? extends V> elementClass,
final ZooKeeper keeper, final String znode,
final List<ACL> acl, final CreateMode createMode)
throws KeeperException, InterruptedException {
super(keeper, znode, acl, createMode);
super(elementClass, keeper, znode, acl, createMode);

this.keeper = keeper;
this.znode = znode;
Expand All @@ -76,10 +78,11 @@ public KeptConcurrentMap(final ZooKeeper keeper, final String znode,
}

@Override
public String putIfAbsent(final String key, final String value) {
public V putIfAbsent(final String key, final V value) {
synchronized (this.map) {
try {
this.keeper.create(this.znode + '/' + key, value.getBytes(),
this.keeper.create(this.znode + '/' + key,
Transformer.objectToBytes(value, this.elementClass),
this.acl, this.createMode);

return null;
Expand All @@ -91,7 +94,10 @@ public String putIfAbsent(final String key, final String value) {
} catch (final InterruptedException e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
} catch (IOException e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
}
}

Expand Down Expand Up @@ -126,13 +132,15 @@ public boolean remove(final Object key, final Object value) {
}

@Override
public String replace(final String key, final String value) {
public V replace(final String key, final V value) {
synchronized (this.map) {
try {
final String data = new String(this.keeper.getData(this.znode
+ '/' + key, true, null));
@SuppressWarnings("unchecked")
final V data = (V) Transformer.bytesToObject(this.keeper.getData(this.znode
+ '/' + key, true, null), this.elementClass);

this.keeper.setData(this.znode + '/' + key, value.getBytes(),
this.keeper.setData(this.znode + '/' + key,
Transformer.objectToBytes(value, this.elementClass),
-1);

return data;
Expand All @@ -144,23 +152,31 @@ public String replace(final String key, final String value) {
} catch (final InterruptedException e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
} catch (IOException e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
}
}

@Override
public boolean replace(final String key, final String oldValue,
final String newValue) {
public boolean replace(final String key, final V oldValue,
final V newValue) {
synchronized (this.map) {
try {
final Stat stat = new Stat();

final String data = new String(this.keeper.getData(this.znode
+ '/' + key, true, stat));
@SuppressWarnings("unchecked")
final V data = (V) Transformer.bytesToObject(this.keeper.getData(this.znode
+ '/' + key, true, stat), this.elementClass);

if (data.equals(oldValue.toString())) {
this.keeper.setData(this.znode + '/' + key,
newValue.getBytes(), stat.getVersion());
Transformer.objectToBytes(newValue, this.elementClass),
stat.getVersion());

return true;
} else
Expand All @@ -175,7 +191,10 @@ public boolean replace(final String key, final String oldValue,
} catch (final InterruptedException e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
} catch (final Exception e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
}
}
}
66 changes: 41 additions & 25 deletions src/main/java/net/killa/kept/KeptMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@
* the change.
*
*/
public class KeptMap implements Map<String, String>, Synchronizable {
public class KeptMap<V> implements Map<String, V>, Synchronizable {
private static final Logger LOG = Logger.getLogger(KeptMap.class);

private final SynchronizingWatcher watcher;
protected final Map<String, String> map;
protected final Map<String, V> map;
protected final Class<? extends V> elementClass;

private final ZooKeeper keeper;
private final String znode;
Expand Down Expand Up @@ -73,10 +74,12 @@ public class KeptMap implements Map<String, String>, Synchronizable {
* @throws KeeperException
* @throws InterruptedException
*/
public KeptMap(final ZooKeeper keeper, final String znode,
public KeptMap(final Class<? extends V> elementClass,
final ZooKeeper keeper, final String znode,
final List<ACL> acl, final CreateMode createMode)
throws KeeperException, InterruptedException {
this.map = new HashMap<String, String>();
this.elementClass = elementClass;
this.map = new HashMap<String, V>();

this.keeper = keeper;

Expand Down Expand Up @@ -111,6 +114,7 @@ else if (createMode == CreateMode.EPHEMERAL_SEQUENTIAL)
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override
public void synchronize() throws KeeperException, InterruptedException {
synchronized (this.map) {
Expand All @@ -123,15 +127,17 @@ public void synchronize() throws KeeperException, InterruptedException {
this.watcher))
this.map.put(
s,
new String(this.keeper.getData(
this.znode + '/' + s, this.watcher, null)));
} catch (final KeeperException.SessionExpiredException e) {
// ignore it
(V) Transformer.bytesToObject(this.keeper.getData(
this.znode + '/' + s, this.watcher, null),this.elementClass));
} catch (final Exception e) {
throw new RuntimeException(e.getClass().getSimpleName()
+ " caught", e);
}
}
}

private String putUnsynchronized(final String key, final String value)
@SuppressWarnings("unchecked")
private V putUnsynchronized(final String key, final V value)
throws KeeperException, InterruptedException {
// FIXME: support slashes in keys
if (key.indexOf('/') >= 0)
Expand All @@ -141,7 +147,9 @@ private String putUnsynchronized(final String key, final String value)
final String path = this.znode + '/' + key;

try {
this.keeper.create(path, value.getBytes(), this.acl,
this.keeper.create(path,
Transformer.objectToBytes(value, this.elementClass),
this.acl,
this.createMode);

return null;
Expand All @@ -158,11 +166,12 @@ private String putUnsynchronized(final String key, final String value)

try {
// set the new value
this.keeper.setData(path, value.getBytes(),
stat.getVersion());
this.keeper.setData(path,
Transformer.objectToBytes(value, this.elementClass),
stat.getVersion());

// return the old value
return new String(oldval);
return (V) Transformer.bytesToObject(oldval, this.elementClass);
} catch (final KeeperException.BadVersionException f) {
if (j++ > 9)
throw f;
Expand All @@ -178,10 +187,14 @@ private String putUnsynchronized(final String key, final String value)
throw new RuntimeException(f.getClass().getSimpleName()
+ " caught", f);
}
} catch (final Exception f) {
throw new RuntimeException(f.getClass().getSimpleName()
+ " caught", f);
}
}

private String removeUnsynchronized(final Object key)
@SuppressWarnings("unchecked")
protected V removeUnsynchronized(final Object key)
throws InterruptedException, KeeperException {
final String path = this.znode + '/' + key;

Expand All @@ -195,8 +208,8 @@ private String removeUnsynchronized(final Object key)
.getData(path, false, stat);

this.keeper.delete(path, stat.getVersion());

return new String(oldval);
return (V) Transformer.bytesToObject(oldval,this.elementClass);
} catch (final KeeperException.BadVersionException e) {
i++;
if (i > 10)
Expand All @@ -207,10 +220,13 @@ private String removeUnsynchronized(final Object key)
.debug("caught bad version attempting to update, retrying");

Thread.sleep(50);
}
}
} catch (final KeeperException.NoNodeException e) {
return null;
}
} catch (final Exception f) {
throw new RuntimeException(f.getClass().getSimpleName()
+ " caught", f);
}
}

/**
Expand Down Expand Up @@ -247,13 +263,13 @@ public boolean containsValue(final Object value) {

/** {@inheritDoc} */
@Override
public Set<java.util.Map.Entry<String, String>> entrySet() {
public Set<java.util.Map.Entry<String, V>> entrySet() {
return this.map.entrySet();
}

/** {@inheritDoc} */
@Override
public String get(final Object key) {
public V get(final Object key) {
return this.map.get(key);
}

Expand All @@ -276,7 +292,7 @@ public Set<String> keySet() {
* containsKey() will return true for the added key.
*/
@Override
public String put(final String key, final String value) {
public V put(final String key, final V value) {
synchronized (this.map) {
try {
return this.putUnsynchronized(key, value);
Expand All @@ -294,10 +310,10 @@ public String put(final String key, final String value) {
* containsKey() will return true for the added key.
*/
@Override
public void putAll(final Map<? extends String, ? extends String> m) {
public void putAll(final Map<? extends String, ? extends V> m) {
synchronized (this.map) {
try {
for (final Entry<? extends String, ? extends String> entry : m
for (final Entry<? extends String, ? extends V> entry : m
.entrySet())
this.putUnsynchronized(entry.getKey(), entry.getValue());
} catch (final Exception e) {
Expand All @@ -314,7 +330,7 @@ public void putAll(final Map<? extends String, ? extends String> m) {
* before containsKey() will return false for the removed key.
*/
@Override
public String remove(final Object key) {
public V remove(final Object key) {
synchronized (this.map) {
try {
return this.removeUnsynchronized(key);
Expand All @@ -333,7 +349,7 @@ public int size() {

/** {@inheritDoc} */
@Override
public Collection<String> values() {
public Collection<V> values() {
return this.map.values();
}
}
2 changes: 1 addition & 1 deletion src/test/java/net/killa/kept/KeptConcurrentMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class KeptConcurrentMapTest extends KeptTestBase {
@Test
public void testKeptConcurrentMap() throws IOException, KeeperException,
InterruptedException {
final KeptConcurrentMap kcm = new KeptConcurrentMap(this.keeper,
final KeptConcurrentMap kcm = new KeptConcurrentMap(String.class, this.keeper,
this.getParent(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

final String payload1 = Long.toString(System.currentTimeMillis());
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/net/killa/kept/KeptMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void testKeptMap() throws IOException, KeeperException,
InterruptedException {
final String parent = this.getParent();

final KeptMap s = new KeptMap(this.keeper, parent, Ids.OPEN_ACL_UNSAFE,
final KeptMap s = new KeptMap(String.class, this.keeper, parent, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);

// check to see that changes made to the map are reflected in the znode
Expand Down Expand Up @@ -73,7 +73,7 @@ public void testKeptMap() throws IOException, KeeperException,
@Test
public void testKeptMapClear() throws IOException, KeeperException,
InterruptedException {
final KeptMap km = new KeptMap(this.keeper, this.getParent(),
final KeptMap km = new KeptMap(String.class, this.keeper, this.getParent(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

km.put("one", "value");
Expand All @@ -100,7 +100,7 @@ public void testKeptMapClear() throws IOException, KeeperException,
@Test
public void testKeptMapOverwrite() throws KeeperException,
InterruptedException {
final KeptMap km = new KeptMap(this.keeper, this.getParent(),
final KeptMap km = new KeptMap(String.class, this.keeper, this.getParent(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

Assert.assertNull("value not previously null", km.put("one", "value"));
Expand Down