diff --git a/src/main/java/net/killa/kept/KeptConcurrentMap.java b/src/main/java/net/killa/kept/KeptConcurrentMap.java index b76d651..82aef3c 100644 --- a/src/main/java/net/killa/kept/KeptConcurrentMap.java +++ b/src/main/java/net/killa/kept/KeptConcurrentMap.java @@ -17,6 +17,7 @@ */ package net.killa.kept; +import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -36,8 +37,8 @@ * the change. * */ -public class KeptConcurrentMap extends KeptMap implements - ConcurrentMap, Synchronizable { +public class KeptConcurrentMap extends KeptMap implements + ConcurrentMap, Synchronizable { private final ZooKeeper keeper; private final String znode; private final List acl; @@ -64,10 +65,11 @@ public class KeptConcurrentMap extends KeptMap implements * @throws KeeperException * @throws InterruptedException */ - public KeptConcurrentMap(final ZooKeeper keeper, final String znode, + public KeptConcurrentMap(final Class elementClass, + final ZooKeeper keeper, final String znode, final List acl, final CreateMode createMode) throws KeeperException, InterruptedException { - super(keeper, znode, acl, createMode); + super(elementClass, keeper, znode, acl, createMode); this.keeper = keeper; this.znode = znode; @@ -76,10 +78,11 @@ public KeptConcurrentMap(final ZooKeeper keeper, final String znode, } @Override - public String putIfAbsent(final String key, final String value) { + public V putIfAbsent(final String key, final V value) { synchronized (this.map) { try { - this.keeper.create(this.znode + '/' + key, value.getBytes(), + this.keeper.create(this.znode + '/' + key, + Transformer.objectToBytes(value, this.elementClass), this.acl, this.createMode); return null; @@ -91,7 +94,10 @@ public String putIfAbsent(final String key, final String value) { } catch (final InterruptedException e) { throw new RuntimeException(e.getClass().getSimpleName() + " caught", e); - } + } catch (IOException e) { + throw new RuntimeException(e.getClass().getSimpleName() + + " caught", e); + } } } @@ -126,13 +132,15 @@ public boolean remove(final Object key, final Object value) { } @Override - public String replace(final String key, final String value) { + public V replace(final String key, final V value) { synchronized (this.map) { try { - final String data = new String(this.keeper.getData(this.znode - + '/' + key, true, null)); + @SuppressWarnings("unchecked") + final V data = (V) Transformer.bytesToObject(this.keeper.getData(this.znode + + '/' + key, true, null), this.elementClass); - this.keeper.setData(this.znode + '/' + key, value.getBytes(), + this.keeper.setData(this.znode + '/' + key, + Transformer.objectToBytes(value, this.elementClass), -1); return data; @@ -144,23 +152,31 @@ public String replace(final String key, final String value) { } catch (final InterruptedException e) { throw new RuntimeException(e.getClass().getSimpleName() + " caught", e); - } + } catch (IOException e) { + throw new RuntimeException(e.getClass().getSimpleName() + + " caught", e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getClass().getSimpleName() + + " caught", e); + } } } @Override - public boolean replace(final String key, final String oldValue, - final String newValue) { + public boolean replace(final String key, final V oldValue, + final V newValue) { synchronized (this.map) { try { final Stat stat = new Stat(); - final String data = new String(this.keeper.getData(this.znode - + '/' + key, true, stat)); + @SuppressWarnings("unchecked") + final V data = (V) Transformer.bytesToObject(this.keeper.getData(this.znode + + '/' + key, true, stat), this.elementClass); if (data.equals(oldValue.toString())) { this.keeper.setData(this.znode + '/' + key, - newValue.getBytes(), stat.getVersion()); + Transformer.objectToBytes(newValue, this.elementClass), + stat.getVersion()); return true; } else @@ -175,7 +191,10 @@ public boolean replace(final String key, final String oldValue, } catch (final InterruptedException e) { throw new RuntimeException(e.getClass().getSimpleName() + " caught", e); - } + } catch (final Exception e) { + throw new RuntimeException(e.getClass().getSimpleName() + + " caught", e); + } } } } diff --git a/src/main/java/net/killa/kept/KeptMap.java b/src/main/java/net/killa/kept/KeptMap.java index c22d905..7025d8b 100644 --- a/src/main/java/net/killa/kept/KeptMap.java +++ b/src/main/java/net/killa/kept/KeptMap.java @@ -41,11 +41,12 @@ * the change. * */ -public class KeptMap implements Map, Synchronizable { +public class KeptMap implements Map, Synchronizable { private static final Logger LOG = Logger.getLogger(KeptMap.class); private final SynchronizingWatcher watcher; - protected final Map map; + protected final Map map; + protected final Class elementClass; private final ZooKeeper keeper; private final String znode; @@ -73,10 +74,12 @@ public class KeptMap implements Map, Synchronizable { * @throws KeeperException * @throws InterruptedException */ - public KeptMap(final ZooKeeper keeper, final String znode, + public KeptMap(final Class elementClass, + final ZooKeeper keeper, final String znode, final List acl, final CreateMode createMode) throws KeeperException, InterruptedException { - this.map = new HashMap(); + this.elementClass = elementClass; + this.map = new HashMap(); this.keeper = keeper; @@ -111,6 +114,7 @@ else if (createMode == CreateMode.EPHEMERAL_SEQUENTIAL) } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void synchronize() throws KeeperException, InterruptedException { synchronized (this.map) { @@ -123,15 +127,17 @@ public void synchronize() throws KeeperException, InterruptedException { this.watcher)) this.map.put( s, - new String(this.keeper.getData( - this.znode + '/' + s, this.watcher, null))); - } catch (final KeeperException.SessionExpiredException e) { - // ignore it + (V) Transformer.bytesToObject(this.keeper.getData( + this.znode + '/' + s, this.watcher, null),this.elementClass)); + } catch (final Exception e) { + throw new RuntimeException(e.getClass().getSimpleName() + + " caught", e); } } } - private String putUnsynchronized(final String key, final String value) + @SuppressWarnings("unchecked") + private V putUnsynchronized(final String key, final V value) throws KeeperException, InterruptedException { // FIXME: support slashes in keys if (key.indexOf('/') >= 0) @@ -141,7 +147,9 @@ private String putUnsynchronized(final String key, final String value) final String path = this.znode + '/' + key; try { - this.keeper.create(path, value.getBytes(), this.acl, + this.keeper.create(path, + Transformer.objectToBytes(value, this.elementClass), + this.acl, this.createMode); return null; @@ -158,11 +166,12 @@ private String putUnsynchronized(final String key, final String value) try { // set the new value - this.keeper.setData(path, value.getBytes(), - stat.getVersion()); + this.keeper.setData(path, + Transformer.objectToBytes(value, this.elementClass), + stat.getVersion()); // return the old value - return new String(oldval); + return (V) Transformer.bytesToObject(oldval, this.elementClass); } catch (final KeeperException.BadVersionException f) { if (j++ > 9) throw f; @@ -178,10 +187,14 @@ private String putUnsynchronized(final String key, final String value) throw new RuntimeException(f.getClass().getSimpleName() + " caught", f); } + } catch (final Exception f) { + throw new RuntimeException(f.getClass().getSimpleName() + + " caught", f); } } - private String removeUnsynchronized(final Object key) + @SuppressWarnings("unchecked") + protected V removeUnsynchronized(final Object key) throws InterruptedException, KeeperException { final String path = this.znode + '/' + key; @@ -195,8 +208,8 @@ private String removeUnsynchronized(final Object key) .getData(path, false, stat); this.keeper.delete(path, stat.getVersion()); - - return new String(oldval); + + return (V) Transformer.bytesToObject(oldval,this.elementClass); } catch (final KeeperException.BadVersionException e) { i++; if (i > 10) @@ -207,10 +220,13 @@ private String removeUnsynchronized(final Object key) .debug("caught bad version attempting to update, retrying"); Thread.sleep(50); - } + } } catch (final KeeperException.NoNodeException e) { return null; - } + } catch (final Exception f) { + throw new RuntimeException(f.getClass().getSimpleName() + + " caught", f); + } } /** @@ -247,13 +263,13 @@ public boolean containsValue(final Object value) { /** {@inheritDoc} */ @Override - public Set> entrySet() { + public Set> entrySet() { return this.map.entrySet(); } /** {@inheritDoc} */ @Override - public String get(final Object key) { + public V get(final Object key) { return this.map.get(key); } @@ -276,7 +292,7 @@ public Set keySet() { * containsKey() will return true for the added key. */ @Override - public String put(final String key, final String value) { + public V put(final String key, final V value) { synchronized (this.map) { try { return this.putUnsynchronized(key, value); @@ -294,10 +310,10 @@ public String put(final String key, final String value) { * containsKey() will return true for the added key. */ @Override - public void putAll(final Map m) { + public void putAll(final Map m) { synchronized (this.map) { try { - for (final Entry entry : m + for (final Entry entry : m .entrySet()) this.putUnsynchronized(entry.getKey(), entry.getValue()); } catch (final Exception e) { @@ -314,7 +330,7 @@ public void putAll(final Map m) { * before containsKey() will return false for the removed key. */ @Override - public String remove(final Object key) { + public V remove(final Object key) { synchronized (this.map) { try { return this.removeUnsynchronized(key); @@ -333,7 +349,7 @@ public int size() { /** {@inheritDoc} */ @Override - public Collection values() { + public Collection values() { return this.map.values(); } } diff --git a/src/test/java/net/killa/kept/KeptConcurrentMapTest.java b/src/test/java/net/killa/kept/KeptConcurrentMapTest.java index ab123d7..00da468 100644 --- a/src/test/java/net/killa/kept/KeptConcurrentMapTest.java +++ b/src/test/java/net/killa/kept/KeptConcurrentMapTest.java @@ -29,7 +29,7 @@ public class KeptConcurrentMapTest extends KeptTestBase { @Test public void testKeptConcurrentMap() throws IOException, KeeperException, InterruptedException { - final KeptConcurrentMap kcm = new KeptConcurrentMap(this.keeper, + final KeptConcurrentMap kcm = new KeptConcurrentMap(String.class, this.keeper, this.getParent(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); final String payload1 = Long.toString(System.currentTimeMillis()); diff --git a/src/test/java/net/killa/kept/KeptMapTest.java b/src/test/java/net/killa/kept/KeptMapTest.java index 9ac40de..5d3e354 100644 --- a/src/test/java/net/killa/kept/KeptMapTest.java +++ b/src/test/java/net/killa/kept/KeptMapTest.java @@ -31,7 +31,7 @@ public void testKeptMap() throws IOException, KeeperException, InterruptedException { final String parent = this.getParent(); - final KeptMap s = new KeptMap(this.keeper, parent, Ids.OPEN_ACL_UNSAFE, + final KeptMap s = new KeptMap(String.class, this.keeper, parent, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // check to see that changes made to the map are reflected in the znode @@ -73,7 +73,7 @@ public void testKeptMap() throws IOException, KeeperException, @Test public void testKeptMapClear() throws IOException, KeeperException, InterruptedException { - final KeptMap km = new KeptMap(this.keeper, this.getParent(), + final KeptMap km = new KeptMap(String.class, this.keeper, this.getParent(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); km.put("one", "value"); @@ -100,7 +100,7 @@ public void testKeptMapClear() throws IOException, KeeperException, @Test public void testKeptMapOverwrite() throws KeeperException, InterruptedException { - final KeptMap km = new KeptMap(this.keeper, this.getParent(), + final KeptMap km = new KeptMap(String.class, this.keeper, this.getParent(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Assert.assertNull("value not previously null", km.put("one", "value"));