From f775e38f6aff012fc1d796e923510d21a3fd3aa1 Mon Sep 17 00:00:00 2001 From: Chris Eccles Date: Fri, 20 Jan 2017 09:14:44 +0000 Subject: [PATCH] Chnaging the Concurrent Hash Maps to be keyed by publicaiton isolating the different publicaiton item lists from each other --- src/main/java/org/si4t/solr/SolrIndexer.java | 106 ++++++++++++++----- 1 file changed, 81 insertions(+), 25 deletions(-) diff --git a/src/main/java/org/si4t/solr/SolrIndexer.java b/src/main/java/org/si4t/solr/SolrIndexer.java index d3db8af..f616c25 100644 --- a/src/main/java/org/si4t/solr/SolrIndexer.java +++ b/src/main/java/org/si4t/solr/SolrIndexer.java @@ -55,10 +55,15 @@ public class SolrIndexer implements SearchIndex private List solrCores = null; private List solrUrls = null; private String defaultCoreUrl = null; - private ConcurrentHashMap itemRemovals = new ConcurrentHashMap<>(); - private ConcurrentHashMap itemAdds = new ConcurrentHashMap<>(); - private ConcurrentHashMap binaryAdds = new ConcurrentHashMap<>(); - private ConcurrentHashMap itemUpdates = new ConcurrentHashMap<>(); + + private ConcurrentHashMap> publicationItemRemovals = new ConcurrentHashMap>(); + private ConcurrentHashMap> publicationItemAdds = new ConcurrentHashMap>(); + private ConcurrentHashMap> publicationBinaryAdds = new ConcurrentHashMap>(); + private ConcurrentHashMap> publicationItemUpdates = new ConcurrentHashMap>(); + + public SolrIndexer(){ + + } private void setSolrUrl(String pubId) throws ConfigurationException { @@ -189,7 +194,18 @@ public void addBinaryToIndex(BinaryIndexData data) throws IndexingException LOG.error("Addition failed. Unique ID is empty"); return; } - this.binaryAdds.put(data.getUniqueIndexId(), data); + + ConcurrentHashMap binaryAdds = this.publicationBinaryAdds.get(data.getPublicationItemId()); + + //If no Concurrent Hash map for the publication yet make one + if(binaryAdds == null){ + binaryAdds = new ConcurrentHashMap(); + } + + binaryAdds.put(data.getUniqueIndexId(), data); + + //Update the cached items for the publication + this.publicationBinaryAdds.put(data.getPublicationItemId(), binaryAdds); } /* @@ -213,10 +229,19 @@ public void addItemToIndex(SearchIndexData data) throws IndexingException LOG.warn("Item is: " + data.toString()); } - if (!this.itemAdds.containsKey(data.getUniqueIndexId())) + ConcurrentHashMap itemAdds = this.publicationItemAdds.get(data.getPublicationItemId()); + + //If no Concurrent Hash map for the publication yet make one + if(itemAdds == null){ + itemAdds = new ConcurrentHashMap(); + } + + if (!itemAdds.containsKey(data.getUniqueIndexId())) { - this.itemAdds.put(data.getUniqueIndexId(), data); + itemAdds.put(data.getUniqueIndexId(), data); } + + publicationItemAdds.put(data.getPublicationItemId(), itemAdds); } /* @@ -233,7 +258,17 @@ public void removeBinaryFromIndex(BaseIndexData data) throws IndexingException LOG.error("Removal addition failed. Unique ID empty"); return; } - this.itemRemovals.put(data.getUniqueIndexId(), data); + + ConcurrentHashMap itemRemovals = this.publicationItemRemovals.get(data.getPublicationItemId()); + + //If no Concurrent Hash map for the publication yet make one + if(itemRemovals == null){ + itemRemovals = new ConcurrentHashMap(); + } + + itemRemovals.put(data.getUniqueIndexId(), data); + + this.publicationItemRemovals.put(data.getPublicationItemId(), itemRemovals); } /* @@ -252,7 +287,16 @@ public void removeItemFromIndex(BaseIndexData data) throws IndexingException return; } - this.itemRemovals.put(data.getUniqueIndexId(), data); + ConcurrentHashMap itemRemovals = this.publicationItemRemovals.get(data.getPublicationItemId()); + + //If no Concurrent Hash map for the publication yet make one + if(itemRemovals == null){ + itemRemovals = new ConcurrentHashMap(); + } + + itemRemovals.put(data.getUniqueIndexId(), data); + + this.publicationItemRemovals.put(data.getPublicationItemId(), itemRemovals); } /* @@ -269,8 +313,17 @@ public void updateItemInIndex(SearchIndexData data) throws IndexingException LOG.error("Adding update item failed. Unique ID empty"); return; } - this.itemUpdates.put(data.getUniqueIndexId(), data); + ConcurrentHashMap itemUpdates = this.publicationItemUpdates.get(data.getPublicationItemId()); + + //If no Concurrent Hash map for the publication yet make one + if(itemUpdates == null){ + itemUpdates = new ConcurrentHashMap(); + } + + itemUpdates.put(data.getUniqueIndexId(), data); + + this.publicationItemUpdates.put(data.getPublicationItemId(), itemUpdates); } /* @@ -285,10 +338,10 @@ public void commit(String publicationId) throws IndexingException this.setSolrUrl(publicationId); - this.commitAddContentToSolr(this.itemAdds); - this.commitAddBinariesToSolr(); - this.removeItemsFromSolr(); - this.processItemUpdates(); + this.commitAddContentToSolr(this.publicationItemAdds.get(publicationId)); + this.commitAddBinariesToSolr(this.publicationBinaryAdds.get(publicationId)); + this.removeItemsFromSolr(this.publicationItemRemovals.get(publicationId)); + this.processItemUpdates(this.publicationItemUpdates.get(publicationId)); } catch (SolrServerException e) @@ -325,10 +378,11 @@ public void commit(String publicationId) throws IndexingException private void clearRegisters() { - itemAdds.clear(); - binaryAdds.clear(); - itemRemovals.clear(); - itemUpdates.clear(); + //Need to loop through each publication ones + this.publicationItemAdds.clear(); + this.publicationBinaryAdds.clear(); + this.publicationItemRemovals.clear(); + this.publicationItemUpdates.clear(); } /** @@ -343,14 +397,16 @@ private void clearRegisters() * @throws IOException * @throws SAXException * @throws SolrServerException + * @param itemUpdates */ - private void processItemUpdates() throws ParserConfigurationException, IOException, SAXException, SolrServerException + private void processItemUpdates(ConcurrentHashMap itemUpdates) throws ParserConfigurationException, IOException, SAXException, SolrServerException { - this.commitAddContentToSolr(this.itemUpdates); + this.commitAddContentToSolr(itemUpdates); } - private void commitAddBinariesToSolr() throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException { - if (this.binaryAdds.size() > 0) + private void commitAddBinariesToSolr(ConcurrentHashMap binaryAdds) throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException + { + if (binaryAdds.size() > 0) { LOG.info("Adding binaries to Solr."); @@ -443,13 +499,13 @@ private static SolrInputDocument constructInputDocument(SearchIndexData data, Lo return doc; } - private void removeItemsFromSolr() throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException { - if (this.itemRemovals.size() > 0) + private void removeItemsFromSolr(ConcurrentHashMap itemRemovals) throws SolrServerException, IOException, ParserConfigurationException, SAXException, IndexingException { + if (itemRemovals.size() > 0) { LOG.info ( SolrIndexDispatcher.INSTANCE.removeFromSolr( - this.itemRemovals.keySet(), + itemRemovals.keySet(), new SolrClientRequest ( this.solrHome + "-" + this.coreName,