From 869c7c394447ada05fa3f60b2a40c4891c1a428d Mon Sep 17 00:00:00 2001 From: Josh Godsiff Date: Tue, 30 May 2017 09:48:00 +1000 Subject: [PATCH 1/6] Added quick and dirty BulkUpsert type Added a BulkUpsert type to the other Bulk* types, so we can carry the metadata we need through to the call. Currently just passes K-V pairs straight through. Also replaced infixed `mappened` with `<>`, and removed some trailing spaces. --- src/Database/V5/Bloodhound/Client.hs | 55 +++++++++++++++++----------- src/Database/V5/Bloodhound/Types.hs | 7 +++- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs index ba7796e1..cb8c3129 100644 --- a/src/Database/V5/Bloodhound/Client.hs +++ b/src/Database/V5/Bloodhound/Client.hs @@ -100,6 +100,7 @@ import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.Aeson +import Data.Aeson.Types (Pair) import Data.ByteString.Lazy.Builder import qualified Data.ByteString.Lazy.Char8 as L import Data.Foldable (toList) @@ -305,7 +306,7 @@ instance FromJSON GSRs where parseJSON = withObject "Collection of GenericSnapshotRepo" parse where parse = fmap GSRs . mapM (uncurry go) . HM.toList - go rawName = withObject "GenericSnapshotRepo" $ \o -> do + go rawName = withObject "GenericSnapshotRepo" $ \o -> GenericSnapshotRepo (SnapshotRepoName rawName) <$> o .: "type" <*> o .: "settings" @@ -532,13 +533,12 @@ updateIndexSettings updates (IndexName indexName) = getIndexSettings :: (MonadBH m, MonadThrow m) => IndexName -> m (Either EsError IndexSettingsSummary) -getIndexSettings (IndexName indexName) = do - parseEsResponse =<< get =<< url +getIndexSettings (IndexName indexName) = parseEsResponse =<< get =<< url where url = joinPath [indexName, "_settings"] --- | 'forceMergeIndex' --- +-- | 'forceMergeIndex' +-- -- The force merge API allows to force merging of one or more indices through -- an API. The merge relates to the number of segments a Lucene index holds -- within each shard. The force merge operation allows to reduce the number of @@ -837,14 +837,18 @@ encodeBulkOperations stream = collapsed where collapsed = toLazyByteString $ mappend mashedTaters (byteString "\n") mash :: Builder -> V.Vector L.ByteString -> Builder -mash = V.foldl' (\b x -> b `mappend` (byteString "\n") `mappend` (lazyByteString x)) +mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x) mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value -mkBulkStreamValue operation indexName mappingName docId = - object [operation .= - object [ "_index" .= indexName - , "_type" .= mappingName - , "_id" .= docId]] +mkBulkStreamValue = mkBulkStreamValueWithMeta [] + +mkBulkStreamValueWithMeta :: [Pair] -> Text -> Text -> Text -> Text -> Value +mkBulkStreamValueWithMeta meta operation indexName mappingName docId = + object [ operation .= + object ([ "_index" .= indexName + , "_type" .= mappingName + , "_id" .= docId] + <> meta)] -- | 'encodeBulkOperation' is a convenience function for dumping a single 'BulkOperation' -- into an 'L.ByteString' @@ -857,13 +861,13 @@ encodeBulkOperation (BulkIndex (IndexName indexName) (MappingName mappingName) (DocId docId) value) = blob where metadata = mkBulkStreamValue "index" indexName mappingName docId - blob = encode metadata `mappend` "\n" `mappend` encode value + blob = encode metadata <> "\n" <> encode value encodeBulkOperation (BulkCreate (IndexName indexName) (MappingName mappingName) (DocId docId) value) = blob where metadata = mkBulkStreamValue "create" indexName mappingName docId - blob = encode metadata `mappend` "\n" `mappend` encode value + blob = encode metadata <> "\n" <> encode value encodeBulkOperation (BulkDelete (IndexName indexName) (MappingName mappingName) @@ -876,7 +880,14 @@ encodeBulkOperation (BulkUpdate (IndexName indexName) (DocId docId) value) = blob where metadata = mkBulkStreamValue "update" indexName mappingName docId doc = object ["doc" .= value] - blob = encode metadata `mappend` "\n" `mappend` encode doc + blob = encode metadata <> "\n" <> encode doc + +encodeBulkOperation (BulkUpsert (IndexName indexName) + (MappingName mappingName) + (DocId docId) value (UpsertMetadata moreMeta)) = blob + where metadata = mkBulkStreamValueWithMeta moreMeta "update" indexName mappingName docId + doc = object ["doc" .= value] + blob = encode metadata <> "\n" <> encode doc -- | 'getDocument' is a straight-forward way to fetch a single document from -- Elasticsearch using a 'Server', 'IndexName', 'MappingName', and a 'DocId'. @@ -943,10 +954,10 @@ searchByType (IndexName indexName) -- search results. Note that the search is put into 'SearchTypeScan' -- mode and thus results will not be sorted. Combine this with -- 'advanceScroll' to efficiently stream through the full result set -getInitialScroll :: - (FromJSON a, MonadThrow m, MonadBH m) => IndexName -> - MappingName -> - Search -> +getInitialScroll :: + (FromJSON a, MonadThrow m, MonadBH m) => IndexName -> + MappingName -> + Search -> m (Either EsError (SearchResult a)) getInitialScroll (IndexName indexName) (MappingName mappingName) search' = do let url = addQuery params <$> joinPath [indexName, mappingName, "_search"] @@ -971,7 +982,7 @@ getInitialSortedScroll (IndexName indexName) (MappingName mappingName) search = resp' <- bindM2 dispatchSearch url (return search) parseEsResponse resp' -scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> +scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId) scroll' Nothing = return ([], Nothing) scroll' (Just sid) = do @@ -998,13 +1009,13 @@ advanceScroll (ScrollId sid) scroll = do where scrollTime = showText secs <> "s" secs :: Integer secs = round scroll - + scrollObject = object [ "scroll" .= scrollTime , "scroll_id" .= sid ] -simpleAccumulator :: - (FromJSON a, MonadBH m, MonadThrow m) => +simpleAccumulator :: + (FromJSON a, MonadBH m, MonadThrow m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId) diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index 20abd6b7..ebb429fc 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -161,6 +161,7 @@ module Database.V5.Bloodhound.Types , TemplatePattern(..) , MappingName(..) , DocId(..) + , UpsertMetadata(..) , CacheName(..) , CacheKey(..) , BulkOperation(..) @@ -689,6 +690,8 @@ data MappingField = data Mapping = Mapping { typeName :: TypeName , mappingFields :: [MappingField] } deriving (Eq, Read, Show, Generic, Typeable) +newtype UpsertMetadata = UpsertMetadata [Pair] deriving (Eq, Read, Show, Generic, Typeable) + {-| 'BulkOperation' is a sum type for expressing the four kinds of bulk operation index, create, delete, and update. 'BulkIndex' behaves like an "upsert", 'BulkCreate' will fail if a document already exists at the DocId. @@ -699,7 +702,9 @@ data BulkOperation = BulkIndex IndexName MappingName DocId Value | BulkCreate IndexName MappingName DocId Value | BulkDelete IndexName MappingName DocId - | BulkUpdate IndexName MappingName DocId Value deriving (Eq, Read, Show, Generic, Typeable) + | BulkUpdate IndexName MappingName DocId Value + | BulkUpsert IndexName MappingName DocId Value UpsertMetadata + deriving (Eq, Read, Show, Generic, Typeable) {-| 'EsResult' describes the standard wrapper JSON document that you see in successful Elasticsearch lookups or lookups that couldn't find the document. From b3b7a787f8d7180abb92818345cfa93a5a2a7100 Mon Sep 17 00:00:00 2001 From: Josh Godsiff Date: Tue, 30 May 2017 09:58:08 +1000 Subject: [PATCH 2/6] Added more metadata to BulkUpsert to allow setting flags adjacent to doc. --- src/Database/V5/Bloodhound/Client.hs | 9 ++++++--- src/Database/V5/Bloodhound/Types.hs | 8 +++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs index cb8c3129..9379a082 100644 --- a/src/Database/V5/Bloodhound/Client.hs +++ b/src/Database/V5/Bloodhound/Client.hs @@ -884,9 +884,12 @@ encodeBulkOperation (BulkUpdate (IndexName indexName) encodeBulkOperation (BulkUpsert (IndexName indexName) (MappingName mappingName) - (DocId docId) value (UpsertMetadata moreMeta)) = blob - where metadata = mkBulkStreamValueWithMeta moreMeta "update" indexName mappingName docId - doc = object ["doc" .= value] + (DocId docId) + value + (UpsertActionMetadata actionMeta) + (UpsertDocMetadata docMeta)) = blob + where metadata = mkBulkStreamValueWithMeta actionMeta "update" indexName mappingName docId + doc = object $ ["doc" .= value] <> docMeta blob = encode metadata <> "\n" <> encode doc -- | 'getDocument' is a straight-forward way to fetch a single document from diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index ebb429fc..9ceadade 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -161,7 +161,8 @@ module Database.V5.Bloodhound.Types , TemplatePattern(..) , MappingName(..) , DocId(..) - , UpsertMetadata(..) + , UpsertActionMetadata(..) + , UpsertDocMetadata(..) , CacheName(..) , CacheKey(..) , BulkOperation(..) @@ -690,7 +691,8 @@ data MappingField = data Mapping = Mapping { typeName :: TypeName , mappingFields :: [MappingField] } deriving (Eq, Read, Show, Generic, Typeable) -newtype UpsertMetadata = UpsertMetadata [Pair] deriving (Eq, Read, Show, Generic, Typeable) +newtype UpsertActionMetadata = UpsertActionMetadata [Pair] deriving (Eq, Read, Show, Generic, Typeable) +newtype UpsertDocMetadata = UpsertDocMetadata [Pair] deriving (Eq, Read, Show, Generic, Typeable) {-| 'BulkOperation' is a sum type for expressing the four kinds of bulk operation index, create, delete, and update. 'BulkIndex' behaves like an @@ -703,7 +705,7 @@ data BulkOperation = | BulkCreate IndexName MappingName DocId Value | BulkDelete IndexName MappingName DocId | BulkUpdate IndexName MappingName DocId Value - | BulkUpsert IndexName MappingName DocId Value UpsertMetadata + | BulkUpsert IndexName MappingName DocId Value UpsertActionMetadata UpsertDocMetadata deriving (Eq, Read, Show, Generic, Typeable) {-| 'EsResult' describes the standard wrapper JSON document that you see in From fed39af72a190d8a4d238db97cf9d28848992dde Mon Sep 17 00:00:00 2001 From: Josh Godsiff Date: Tue, 30 May 2017 15:22:07 +1000 Subject: [PATCH 3/6] Added explicit types for Upsert Action and Payload metadata No longer just passing arbitrary JSON straight through. --- src/Database/V5/Bloodhound/Client.hs | 11 ++++----- src/Database/V5/Bloodhound/Types.hs | 35 ++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs index 9379a082..f5900e9d 100644 --- a/src/Database/V5/Bloodhound/Client.hs +++ b/src/Database/V5/Bloodhound/Client.hs @@ -100,7 +100,6 @@ import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.Aeson -import Data.Aeson.Types (Pair) import Data.ByteString.Lazy.Builder import qualified Data.ByteString.Lazy.Char8 as L import Data.Foldable (toList) @@ -842,13 +841,13 @@ mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x) mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value mkBulkStreamValue = mkBulkStreamValueWithMeta [] -mkBulkStreamValueWithMeta :: [Pair] -> Text -> Text -> Text -> Text -> Value +mkBulkStreamValueWithMeta :: [UpsertActionMetadata] -> Text -> Text -> Text -> Text -> Value mkBulkStreamValueWithMeta meta operation indexName mappingName docId = object [ operation .= object ([ "_index" .= indexName , "_type" .= mappingName , "_id" .= docId] - <> meta)] + <> (buildUpsertActionMetadata <$> meta))] -- | 'encodeBulkOperation' is a convenience function for dumping a single 'BulkOperation' -- into an 'L.ByteString' @@ -886,10 +885,10 @@ encodeBulkOperation (BulkUpsert (IndexName indexName) (MappingName mappingName) (DocId docId) value - (UpsertActionMetadata actionMeta) - (UpsertDocMetadata docMeta)) = blob + actionMeta + docMeta) = blob where metadata = mkBulkStreamValueWithMeta actionMeta "update" indexName mappingName docId - doc = object $ ["doc" .= value] <> docMeta + doc = object $ ["doc" .= value] <> (buildUpsertPayloadMetadata <$> docMeta) blob = encode metadata <> "\n" <> encode doc -- | 'getDocument' is a straight-forward way to fetch a single document from diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index 9ceadade..0b0e27e0 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -162,7 +162,9 @@ module Database.V5.Bloodhound.Types , MappingName(..) , DocId(..) , UpsertActionMetadata(..) - , UpsertDocMetadata(..) + , buildUpsertActionMetadata + , UpsertPayloadMetadata(..) + , buildUpsertPayloadMetadata , CacheName(..) , CacheKey(..) , BulkOperation(..) @@ -691,8 +693,33 @@ data MappingField = data Mapping = Mapping { typeName :: TypeName , mappingFields :: [MappingField] } deriving (Eq, Read, Show, Generic, Typeable) -newtype UpsertActionMetadata = UpsertActionMetadata [Pair] deriving (Eq, Read, Show, Generic, Typeable) -newtype UpsertDocMetadata = UpsertDocMetadata [Pair] deriving (Eq, Read, Show, Generic, Typeable) +data UpsertActionMetadata + = UA_RetryOnConflict Int + | UA_Source Bool + | UA_Version Int + deriving (Eq, Read, Show, Generic, Typeable) + +buildUpsertActionMetadata :: UpsertActionMetadata -> Pair +buildUpsertActionMetadata (UA_RetryOnConflict i) = "_retry_on_conflict" .= i +buildUpsertActionMetadata (UA_Source b) = "_source" .= b +buildUpsertActionMetadata (UA_Version i) = "_version" .= i + +data UpsertPayloadMetadata + = UP_DocAsUpsert Bool + | UP_Script Value + | UP_Lang Text + | UP_Params Value + | UP_Upsert Value + | UP_Source Bool + deriving (Eq, Read, Show, Generic, Typeable) + +buildUpsertPayloadMetadata :: UpsertPayloadMetadata -> Pair +buildUpsertPayloadMetadata (UP_DocAsUpsert a) = "doc_as_upsert" .= a +buildUpsertPayloadMetadata (UP_Script a) = "script" .= a +buildUpsertPayloadMetadata (UP_Lang a) = "lang" .= a +buildUpsertPayloadMetadata (UP_Params a) = "params" .= a +buildUpsertPayloadMetadata (UP_Upsert a) = "upsert" .= a +buildUpsertPayloadMetadata (UP_Source a) = "_source" .= a {-| 'BulkOperation' is a sum type for expressing the four kinds of bulk operation index, create, delete, and update. 'BulkIndex' behaves like an @@ -705,7 +732,7 @@ data BulkOperation = | BulkCreate IndexName MappingName DocId Value | BulkDelete IndexName MappingName DocId | BulkUpdate IndexName MappingName DocId Value - | BulkUpsert IndexName MappingName DocId Value UpsertActionMetadata UpsertDocMetadata + | BulkUpsert IndexName MappingName DocId Value [UpsertActionMetadata] [UpsertPayloadMetadata] deriving (Eq, Read, Show, Generic, Typeable) {-| 'EsResult' describes the standard wrapper JSON document that you see in From 5cfda7564a5d294c34862e8e95275c0b8686172e Mon Sep 17 00:00:00 2001 From: Josh Godsiff Date: Tue, 30 May 2017 09:48:00 +1000 Subject: [PATCH 4/6] Added quick and dirty BulkUpsert type Added a BulkUpsert type to the other Bulk* types, so we can carry the metadata we need through to the call. Currently just passes K-V pairs straight through. Also replaced infixed `mappened` with `<>`, and removed some trailing spaces. --- src/Database/V5/Bloodhound/Client.hs | 82 +++++++++++-------- src/Database/V5/Bloodhound/Internal/Client.hs | 26 +++--- src/Database/V5/Bloodhound/Types.hs | 5 +- 3 files changed, 65 insertions(+), 48 deletions(-) diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs index 3a1994e0..1f2ba626 100644 --- a/src/Database/V5/Bloodhound/Client.hs +++ b/src/Database/V5/Bloodhound/Client.hs @@ -104,6 +104,7 @@ import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.Aeson +import Data.Aeson.Types (Pair) import Data.ByteString.Lazy.Builder import qualified Data.ByteString.Lazy.Char8 as L import Data.Foldable (toList) @@ -409,7 +410,7 @@ getSnapshots (SnapshotRepoName repoName) sel = where url = joinPath ["_snapshot", repoName, snapPath] snapPath = case sel of - AllSnapshots -> "_all" + AllSnapshots -> "_all" SnapshotList (s :| ss) -> T.intercalate "," (renderPath <$> (s:ss)) renderPath (SnapPattern t) = t renderPath (ExactSnap (SnapshotName t)) = t @@ -477,9 +478,9 @@ getNodesInfo sel = parseEsResponse =<< get =<< url where url = joinPath ["_nodes", selectionSeg] selectionSeg = case sel of - LocalNode -> "_local" + LocalNode -> "_local" NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls)) - AllNodes -> "_all" + AllNodes -> "_all" selToSeg (NodeByName (NodeName n)) = n selToSeg (NodeByFullNodeId (FullNodeId i)) = i selToSeg (NodeByHost (Server s)) = s @@ -495,9 +496,9 @@ getNodesStats sel = parseEsResponse =<< get =<< url where url = joinPath ["_nodes", selectionSeg, "stats"] selectionSeg = case sel of - LocalNode -> "_local" + LocalNode -> "_local" NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls)) - AllNodes -> "_all" + AllNodes -> "_all" selToSeg (NodeByName (NodeName n)) = n selToSeg (NodeByFullNodeId (FullNodeId i)) = i selToSeg (NodeByHost (Server s)) = s @@ -523,13 +524,13 @@ createIndex indexSettings (IndexName indexName) = createIndexWith :: MonadBH m => [UpdatableIndexSetting] -> Int -- ^ shard count - -> IndexName + -> IndexName -> m Reply createIndexWith updates shards (IndexName indexName) = bindM2 put url (return (Just body)) where url = joinPath [indexName] body = encode $ object - ["settings" .= deepMerge + ["settings" .= deepMerge ( HM.singleton "index.number_of_shards" (toJSON shards) : [u | Object u <- toJSON <$> updates] ) @@ -575,8 +576,8 @@ getIndexSettings (IndexName indexName) = where url = joinPath [indexName, "_settings"] --- | 'forceMergeIndex' --- +-- | 'forceMergeIndex' +-- -- The force merge API allows to force merging of one or more indices through -- an API. The merge relates to the number of segments a Lucene index holds -- within each shard. The force merge operation allows to reduce the number of @@ -618,7 +619,7 @@ deepMerge = LS.foldl' go mempty where go acc = LS.foldl' go' acc . HM.toList go' acc (k, v) = HM.insertWith merge k v acc merge (Object a) (Object b) = Object (deepMerge [a, b]) - merge _ b = b + merge _ b = b statusCodeIs :: (Int, Int) -> Reply -> Bool @@ -644,13 +645,13 @@ parseEsResponse :: (MonadThrow m, FromJSON a) => Reply parseEsResponse reply | respIsTwoHunna reply = case eitherDecode body of Right a -> return (Right a) - Left _ -> tryParseError + Left _ -> tryParseError | otherwise = tryParseError where body = responseBody reply tryParseError = case eitherDecode body of Right e -> return (Left e) -- this case should not be possible - Left _ -> explode + Left _ -> explode explode = throwM (EsProtocolException body) -- | 'indexExists' enables you to check if an index exists. Returns 'Bool' @@ -719,7 +720,7 @@ listIndices = indexVal <- HM.lookup "index" obj case indexVal of String txt -> Just (IndexName txt) - _ -> Nothing + _ -> Nothing _ -> Nothing -- | 'updateIndexAliases' updates the server's index alias @@ -805,10 +806,10 @@ putMapping (IndexName indexName) (MappingName mappingName) mapping = versionCtlParams :: IndexDocumentSettings -> [(Text, Maybe Text)] versionCtlParams cfg = case idsVersionControl cfg of - NoVersionControl -> [] - InternalVersion v -> versionParams v "internal" - ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt" - ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte" + NoVersionControl -> [] + InternalVersion v -> versionParams v "internal" + ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt" + ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte" ForceVersion (ExternalDocVersion v) -> versionParams v "force" where vt = showText . docVersionNumber @@ -834,7 +835,7 @@ indexDocument (IndexName indexName) bindM2 put url (return body) where url = addQuery params <$> joinPath [indexName, mappingName, docId] parentParams = case idsParent cfg of - Nothing -> [] + Nothing -> [] Just (DocumentParent (DocId p)) -> [ ("parent", Just p) ] params = versionCtlParams cfg ++ parentParams body = Just (encode document) @@ -895,11 +896,15 @@ mash :: Builder -> V.Vector L.ByteString -> Builder mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x) mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value -mkBulkStreamValue operation indexName mappingName docId = - object [operation .= - object [ "_index" .= indexName - , "_type" .= mappingName - , "_id" .= docId]] +mkBulkStreamValue = mkBulkStreamValueWithMeta [] + +mkBulkStreamValueWithMeta :: [Pair] -> Text -> Text -> Text -> Text -> Value +mkBulkStreamValueWithMeta meta operation indexName mappingName docId = + object [ operation .= + object ([ "_index" .= indexName + , "_type" .= mappingName + , "_id" .= docId] + <> meta)] mkBulkStreamValueAuto :: Text -> Text -> Text -> Value mkBulkStreamValueAuto operation indexName mappingName = @@ -918,7 +923,7 @@ encodeBulkOperation (BulkIndex (IndexName indexName) (MappingName mappingName) (DocId docId) value) = blob where metadata = mkBulkStreamValue "index" indexName mappingName docId - blob = encode metadata `mappend` "\n" `mappend` encode value + blob = encode metadata <> "\n" <> encode value encodeBulkOperation (BulkIndexAuto (IndexName indexName) (MappingName mappingName) @@ -936,7 +941,7 @@ encodeBulkOperation (BulkCreate (IndexName indexName) (MappingName mappingName) (DocId docId) value) = blob where metadata = mkBulkStreamValue "create" indexName mappingName docId - blob = encode metadata `mappend` "\n" `mappend` encode value + blob = encode metadata <> "\n" <> encode value encodeBulkOperation (BulkDelete (IndexName indexName) (MappingName mappingName) @@ -949,7 +954,14 @@ encodeBulkOperation (BulkUpdate (IndexName indexName) (DocId docId) value) = blob where metadata = mkBulkStreamValue "update" indexName mappingName docId doc = object ["doc" .= value] - blob = encode metadata `mappend` "\n" `mappend` encode doc + blob = encode metadata <> "\n" <> encode doc + +encodeBulkOperation (BulkUpsert (IndexName indexName) + (MappingName mappingName) + (DocId docId) value (UpsertMetadata moreMeta)) = blob + where metadata = mkBulkStreamValueWithMeta moreMeta "update" indexName mappingName docId + doc = object ["doc" .= value] + blob = encode metadata <> "\n" <> encode doc encodeBulkOperation (BulkCreateEncoding (IndexName indexName) (MappingName mappingName) @@ -1031,10 +1043,10 @@ searchByType (IndexName indexName) -- search results. Note that the search is put into 'SearchTypeScan' -- mode and thus results will not be sorted. Combine this with -- 'advanceScroll' to efficiently stream through the full result set -getInitialScroll :: - (FromJSON a, MonadThrow m, MonadBH m) => IndexName -> - MappingName -> - Search -> +getInitialScroll :: + (FromJSON a, MonadThrow m, MonadBH m) => IndexName -> + MappingName -> + Search -> m (Either EsError (SearchResult a)) getInitialScroll (IndexName indexName) (MappingName mappingName) search' = do let url = addQuery params <$> joinPath [indexName, mappingName, "_search"] @@ -1059,14 +1071,14 @@ getInitialSortedScroll (IndexName indexName) (MappingName mappingName) search = resp' <- bindM2 dispatchSearch url (return search) parseEsResponse resp' -scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> +scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> m ([Hit a], Maybe ScrollId) scroll' Nothing = return ([], Nothing) scroll' (Just sid) = do res <- advanceScroll sid 60 case res of Right SearchResult {..} -> return (hits searchHits, scrollId) - Left _ -> return ([], Nothing) + Left _ -> return ([], Nothing) -- | Use the given scroll to fetch the next page of documents. If there are no -- further pages, 'SearchResult.searchHits.hits' will be '[]'. @@ -1086,13 +1098,13 @@ advanceScroll (ScrollId sid) scroll = do where scrollTime = showText secs <> "s" secs :: Integer secs = round scroll - + scrollObject = object [ "scroll" .= scrollTime , "scroll_id" .= sid ] -simpleAccumulator :: - (FromJSON a, MonadBH m, MonadThrow m) => +simpleAccumulator :: + (FromJSON a, MonadBH m, MonadThrow m) => [Hit a] -> ([Hit a], Maybe ScrollId) -> m ([Hit a], Maybe ScrollId) diff --git a/src/Database/V5/Bloodhound/Internal/Client.hs b/src/Database/V5/Bloodhound/Internal/Client.hs index 0e955b76..73c7bde3 100644 --- a/src/Database/V5/Bloodhound/Internal/Client.hs +++ b/src/Database/V5/Bloodhound/Internal/Client.hs @@ -9,16 +9,16 @@ module Database.V5.Bloodhound.Internal.Client where import Bloodhound.Import -import qualified Data.Text as T -import qualified Data.Traversable as DT -import qualified Data.HashMap.Strict as HM -import qualified Data.Vector as V -import qualified Data.Version as Vers +import qualified Data.HashMap.Strict as HM +import qualified Data.Text as T +import qualified Data.Traversable as DT +import qualified Data.Vector as V +import qualified Data.Version as Vers import GHC.Enum import Network.HTTP.Client -import qualified Text.ParserCombinators.ReadP as RP -import Text.Read (Read(..)) -import qualified Text.Read as TR +import qualified Text.ParserCombinators.ReadP as RP +import Text.Read (Read (..)) +import qualified Text.Read as TR import Database.V5.Bloodhound.Internal.Analysis import Database.V5.Bloodhound.Internal.Newtypes @@ -388,13 +388,13 @@ data Compression instance ToJSON Compression where toJSON x = case x of CompressionDefault -> toJSON ("default" :: Text) - CompressionBest -> toJSON ("best_compression" :: Text) + CompressionBest -> toJSON ("best_compression" :: Text) instance FromJSON Compression where parseJSON = withText "Compression" $ \t -> case t of - "default" -> return CompressionDefault + "default" -> return CompressionDefault "best_compression" -> return CompressionBest - _ -> fail "invalid compression codec" + _ -> fail "invalid compression codec" -- | A measure of bytes used for various configurations. You may want -- to use smart constructors like 'gigabytes' for larger values. @@ -582,6 +582,8 @@ data Mapping = , mappingFields :: [MappingField] } deriving (Eq, Show) +newtype UpsertMetadata = UpsertMetadata [Pair] deriving (Eq, Show) + data AllocationPolicy = AllocAll -- ^ Allows shard allocation for all shards. | AllocPrimaries @@ -634,6 +636,8 @@ data BulkOperation = -- ^ Delete the document | BulkUpdate IndexName MappingName DocId Value -- ^ Update the document, merging the new value with the existing one. + | BulkUpsert IndexName MappingName DocId Value UpsertMetadata + -- ^ Update the document, or insert it if there is no existing one. deriving (Eq, Show) {-| 'EsResult' describes the standard wrapper JSON document that you see in diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index 403f7713..95aeccb7 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -169,6 +169,7 @@ module Database.V5.Bloodhound.Types , TemplatePattern(..) , MappingName(..) , DocId(..) + , UpsertMetadata(..) , CacheName(..) , CacheKey(..) , BulkOperation(..) @@ -416,15 +417,15 @@ module Database.V5.Bloodhound.Types import Bloodhound.Import -import Database.V5.Bloodhound.Types.Class -import Database.V5.Bloodhound.Internal.Analysis import Database.V5.Bloodhound.Internal.Aggregation +import Database.V5.Bloodhound.Internal.Analysis import Database.V5.Bloodhound.Internal.Client import Database.V5.Bloodhound.Internal.Highlight import Database.V5.Bloodhound.Internal.Newtypes import Database.V5.Bloodhound.Internal.Query import Database.V5.Bloodhound.Internal.Sort import Database.V5.Bloodhound.Internal.Suggest +import Database.V5.Bloodhound.Types.Class {-| 'unpackId' is a silly convenience function that gets used once. -} From 11c74daffa9c9e3b52a0946af9896d78534757b9 Mon Sep 17 00:00:00 2001 From: Josh Godsiff Date: Tue, 30 May 2017 09:58:08 +1000 Subject: [PATCH 5/6] Added more metadata to BulkUpsert to allow setting flags adjacent to doc. --- src/Database/V5/Bloodhound/Client.hs | 9 ++++++--- src/Database/V5/Bloodhound/Internal/Client.hs | 5 +++-- src/Database/V5/Bloodhound/Types.hs | 3 ++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs index 1f2ba626..c246a6ca 100644 --- a/src/Database/V5/Bloodhound/Client.hs +++ b/src/Database/V5/Bloodhound/Client.hs @@ -958,9 +958,12 @@ encodeBulkOperation (BulkUpdate (IndexName indexName) encodeBulkOperation (BulkUpsert (IndexName indexName) (MappingName mappingName) - (DocId docId) value (UpsertMetadata moreMeta)) = blob - where metadata = mkBulkStreamValueWithMeta moreMeta "update" indexName mappingName docId - doc = object ["doc" .= value] + (DocId docId) + value + (UpsertActionMetadata actionMeta) + (UpsertDocMetadata docMeta)) = blob + where metadata = mkBulkStreamValueWithMeta actionMeta "update" indexName mappingName docId + doc = object $ ["doc" .= value] <> docMeta blob = encode metadata <> "\n" <> encode doc encodeBulkOperation (BulkCreateEncoding (IndexName indexName) diff --git a/src/Database/V5/Bloodhound/Internal/Client.hs b/src/Database/V5/Bloodhound/Internal/Client.hs index 73c7bde3..5293a1bc 100644 --- a/src/Database/V5/Bloodhound/Internal/Client.hs +++ b/src/Database/V5/Bloodhound/Internal/Client.hs @@ -582,7 +582,8 @@ data Mapping = , mappingFields :: [MappingField] } deriving (Eq, Show) -newtype UpsertMetadata = UpsertMetadata [Pair] deriving (Eq, Show) +newtype UpsertActionMetadata = UpsertActionMetadata [Pair] deriving (Eq, Show) +newtype UpsertDocMetadata = UpsertDocMetadata [Pair] deriving (Eq, Show) data AllocationPolicy = AllocAll -- ^ Allows shard allocation for all shards. @@ -636,7 +637,7 @@ data BulkOperation = -- ^ Delete the document | BulkUpdate IndexName MappingName DocId Value -- ^ Update the document, merging the new value with the existing one. - | BulkUpsert IndexName MappingName DocId Value UpsertMetadata + | BulkUpsert IndexName MappingName DocId Value UpsertActionMetadata UpsertDocMetadata -- ^ Update the document, or insert it if there is no existing one. deriving (Eq, Show) diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index 95aeccb7..087e53e6 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -169,7 +169,8 @@ module Database.V5.Bloodhound.Types , TemplatePattern(..) , MappingName(..) , DocId(..) - , UpsertMetadata(..) + , UpsertActionMetadata(..) + , UpsertDocMetadata(..) , CacheName(..) , CacheKey(..) , BulkOperation(..) From 026da692c67932944ff83741b7f30cca72aa7f95 Mon Sep 17 00:00:00 2001 From: Josh Godsiff Date: Tue, 30 May 2017 15:22:07 +1000 Subject: [PATCH 6/6] Added explicit types for Upsert Action and Payload metadata No longer just passing arbitrary JSON straight through. --- src/Database/V5/Bloodhound/Client.hs | 11 +++---- src/Database/V5/Bloodhound/Internal/Client.hs | 31 +++++++++++++++++-- src/Database/V5/Bloodhound/Types.hs | 4 ++- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs index c246a6ca..381f2aa4 100644 --- a/src/Database/V5/Bloodhound/Client.hs +++ b/src/Database/V5/Bloodhound/Client.hs @@ -104,7 +104,6 @@ import Control.Monad import Control.Monad.Catch import Control.Monad.IO.Class import Data.Aeson -import Data.Aeson.Types (Pair) import Data.ByteString.Lazy.Builder import qualified Data.ByteString.Lazy.Char8 as L import Data.Foldable (toList) @@ -898,13 +897,13 @@ mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x) mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value mkBulkStreamValue = mkBulkStreamValueWithMeta [] -mkBulkStreamValueWithMeta :: [Pair] -> Text -> Text -> Text -> Text -> Value +mkBulkStreamValueWithMeta :: [UpsertActionMetadata] -> Text -> Text -> Text -> Text -> Value mkBulkStreamValueWithMeta meta operation indexName mappingName docId = object [ operation .= object ([ "_index" .= indexName , "_type" .= mappingName , "_id" .= docId] - <> meta)] + <> (buildUpsertActionMetadata <$> meta))] mkBulkStreamValueAuto :: Text -> Text -> Text -> Value mkBulkStreamValueAuto operation indexName mappingName = @@ -960,10 +959,10 @@ encodeBulkOperation (BulkUpsert (IndexName indexName) (MappingName mappingName) (DocId docId) value - (UpsertActionMetadata actionMeta) - (UpsertDocMetadata docMeta)) = blob + actionMeta + docMeta) = blob where metadata = mkBulkStreamValueWithMeta actionMeta "update" indexName mappingName docId - doc = object $ ["doc" .= value] <> docMeta + doc = object $ ["doc" .= value] <> (buildUpsertPayloadMetadata <$> docMeta) blob = encode metadata <> "\n" <> encode doc encodeBulkOperation (BulkCreateEncoding (IndexName indexName) diff --git a/src/Database/V5/Bloodhound/Internal/Client.hs b/src/Database/V5/Bloodhound/Internal/Client.hs index 5293a1bc..ec5f951a 100644 --- a/src/Database/V5/Bloodhound/Internal/Client.hs +++ b/src/Database/V5/Bloodhound/Internal/Client.hs @@ -582,8 +582,33 @@ data Mapping = , mappingFields :: [MappingField] } deriving (Eq, Show) -newtype UpsertActionMetadata = UpsertActionMetadata [Pair] deriving (Eq, Show) -newtype UpsertDocMetadata = UpsertDocMetadata [Pair] deriving (Eq, Show) +data UpsertActionMetadata + = UA_RetryOnConflict Int + | UA_Source Bool + | UA_Version Int + deriving (Eq, Show) + +buildUpsertActionMetadata :: UpsertActionMetadata -> Pair +buildUpsertActionMetadata (UA_RetryOnConflict i) = "_retry_on_conflict" .= i +buildUpsertActionMetadata (UA_Source b) = "_source" .= b +buildUpsertActionMetadata (UA_Version i) = "_version" .= i + +data UpsertPayloadMetadata + = UP_DocAsUpsert Bool + | UP_Script Value + | UP_Lang Text + | UP_Params Value + | UP_Upsert Value + | UP_Source Bool + deriving (Eq, Show) + +buildUpsertPayloadMetadata :: UpsertPayloadMetadata -> Pair +buildUpsertPayloadMetadata (UP_DocAsUpsert a) = "doc_as_upsert" .= a +buildUpsertPayloadMetadata (UP_Script a) = "script" .= a +buildUpsertPayloadMetadata (UP_Lang a) = "lang" .= a +buildUpsertPayloadMetadata (UP_Params a) = "params" .= a +buildUpsertPayloadMetadata (UP_Upsert a) = "upsert" .= a +buildUpsertPayloadMetadata (UP_Source a) = "_source" .= a data AllocationPolicy = AllocAll -- ^ Allows shard allocation for all shards. @@ -637,7 +662,7 @@ data BulkOperation = -- ^ Delete the document | BulkUpdate IndexName MappingName DocId Value -- ^ Update the document, merging the new value with the existing one. - | BulkUpsert IndexName MappingName DocId Value UpsertActionMetadata UpsertDocMetadata + | BulkUpsert IndexName MappingName DocId Value [UpsertActionMetadata] [UpsertPayloadMetadata] -- ^ Update the document, or insert it if there is no existing one. deriving (Eq, Show) diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index 087e53e6..9fa03b29 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -170,7 +170,9 @@ module Database.V5.Bloodhound.Types , MappingName(..) , DocId(..) , UpsertActionMetadata(..) - , UpsertDocMetadata(..) + , buildUpsertActionMetadata + , UpsertPayloadMetadata(..) + , buildUpsertPayloadMetadata , CacheName(..) , CacheKey(..) , BulkOperation(..)