diff --git a/src/Database/V5/Bloodhound/Client.hs b/src/Database/V5/Bloodhound/Client.hs
index 3a1994e0..1287f0b4 100644
--- a/src/Database/V5/Bloodhound/Client.hs
+++ b/src/Database/V5/Bloodhound/Client.hs
@@ -409,7 +409,7 @@ getSnapshots (SnapshotRepoName repoName) sel =
   where
     url = joinPath ["_snapshot", repoName, snapPath]
     snapPath = case sel of
-      AllSnapshots -> "_all"
+      AllSnapshots           -> "_all"
       SnapshotList (s :| ss) -> T.intercalate "," (renderPath <$> (s:ss))
     renderPath (SnapPattern t)              = t
     renderPath (ExactSnap (SnapshotName t)) = t
@@ -477,9 +477,9 @@ getNodesInfo sel = parseEsResponse =<< get =<< url
   where
     url = joinPath ["_nodes", selectionSeg]
     selectionSeg = case sel of
-      LocalNode -> "_local"
+      LocalNode          -> "_local"
       NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
-      AllNodes -> "_all"
+      AllNodes           -> "_all"
     selToSeg (NodeByName (NodeName n))         = n
     selToSeg (NodeByFullNodeId (FullNodeId i)) = i
     selToSeg (NodeByHost (Server s))           = s
@@ -495,9 +495,9 @@ getNodesStats sel = parseEsResponse =<< get =<< url
   where
     url = joinPath ["_nodes", selectionSeg, "stats"]
     selectionSeg = case sel of
-      LocalNode -> "_local"
+      LocalNode          -> "_local"
       NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
-      AllNodes -> "_all"
+      AllNodes           -> "_all"
     selToSeg (NodeByName (NodeName n))         = n
     selToSeg (NodeByFullNodeId (FullNodeId i)) = i
     selToSeg (NodeByHost (Server s))           = s
@@ -523,13 +523,13 @@ createIndex indexSettings (IndexName indexName) =
 createIndexWith :: MonadBH m
                 => [UpdatableIndexSetting]
                 -> Int -- ^ shard count
-                -> IndexName 
+                -> IndexName
                 -> m Reply
 createIndexWith updates shards (IndexName indexName) =
   bindM2 put url (return (Just body))
   where url = joinPath [indexName]
         body = encode $ object
-          ["settings" .= deepMerge 
+          ["settings" .= deepMerge
            ( HM.singleton "index.number_of_shards" (toJSON shards) :
              [u | Object u <- toJSON <$> updates]
            )
@@ -570,13 +570,14 @@ updateIndexSettings updates (IndexName indexName) =
 
 getIndexSettings :: (MonadBH m, MonadThrow m) => IndexName
                  -> m (Either EsError IndexSettingsSummary)
+
 getIndexSettings (IndexName indexName) =
   parseEsResponse =<< get =<< url
   where
     url = joinPath [indexName, "_settings"]
 
--- | 'forceMergeIndex' 
--- 
+-- | 'forceMergeIndex'
+--
 -- The force merge API allows to force merging of one or more indices through
 -- an API. The merge relates to the number of segments a Lucene index holds
 -- within each shard. The force merge operation allows to reduce the number of
@@ -618,7 +619,7 @@ deepMerge = LS.foldl' go mempty
   where
     go acc = LS.foldl' go' acc . HM.toList
     go' acc (k, v) = HM.insertWith merge k v acc
     merge (Object a) (Object b) = Object (deepMerge [a, b])
-    merge _ b = b
+    merge _ b                   = b
 
 statusCodeIs :: (Int, Int) -> Reply -> Bool
@@ -644,13 +645,13 @@ parseEsResponse :: (MonadThrow m, FromJSON a) => Reply
 parseEsResponse reply
   | respIsTwoHunna reply = case eitherDecode body of
     Right a -> return (Right a)
-    Left _ -> tryParseError
+    Left _  -> tryParseError
   | otherwise = tryParseError
   where body = responseBody reply
         tryParseError = case eitherDecode body of
           Right e -> return (Left e)
           -- this case should not be possible
-          Left _ -> explode
+          Left _  -> explode
         explode = throwM (EsProtocolException body)
 
 -- | 'indexExists' enables you to check if an index exists. Returns 'Bool'
@@ -719,7 +720,7 @@ listIndices =
       indexVal <- HM.lookup "index" obj
       case indexVal of
         String txt -> Just (IndexName txt)
-        _ -> Nothing
+        _          -> Nothing
     _ -> Nothing
 
 -- | 'updateIndexAliases' updates the server's index alias
@@ -805,10 +806,10 @@ putMapping (IndexName indexName) (MappingName mappingName) mapping =
 versionCtlParams :: IndexDocumentSettings -> [(Text, Maybe Text)]
 versionCtlParams cfg =
   case idsVersionControl cfg of
-    NoVersionControl -> []
-    InternalVersion v -> versionParams v "internal"
-    ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt"
-    ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte"
+    NoVersionControl                    -> []
+    InternalVersion v                   -> versionParams v "internal"
+    ExternalGT (ExternalDocVersion v)   -> versionParams v "external_gt"
+    ExternalGTE (ExternalDocVersion v)  -> versionParams v "external_gte"
     ForceVersion (ExternalDocVersion v) -> versionParams v "force"
   where
     vt = showText . docVersionNumber
@@ -834,7 +835,7 @@ indexDocument (IndexName indexName)
   bindM2 put url (return body)
   where url = addQuery params <$> joinPath [indexName, mappingName, docId]
         parentParams = case idsParent cfg of
-          Nothing -> []
+          Nothing                         -> []
           Just (DocumentParent (DocId p)) -> [ ("parent", Just p) ]
         params = versionCtlParams cfg ++ parentParams
         body = Just (encode document)
@@ -895,11 +896,15 @@ mash :: Builder -> V.Vector L.ByteString -> Builder
 mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x)
 
 mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value
-mkBulkStreamValue operation indexName mappingName docId =
-  object [operation .=
-          object [ "_index" .= indexName
-                 , "_type" .= mappingName
-                 , "_id" .= docId]]
+mkBulkStreamValue = mkBulkStreamValueWithMeta []
+
+mkBulkStreamValueWithMeta :: [UpsertActionMetadata] -> Text -> Text -> Text -> Text -> Value
+mkBulkStreamValueWithMeta meta operation indexName mappingName docId =
+  object [ operation .=
+           object ([ "_index" .= indexName
+                   , "_type" .= mappingName
+                   , "_id" .= docId]
+                   <> (buildUpsertActionMetadata <$> meta))]
 
 mkBulkStreamValueAuto :: Text -> Text -> Text -> Value
 mkBulkStreamValueAuto operation indexName mappingName =
@@ -918,7 +923,7 @@ encodeBulkOperation (BulkIndex (IndexName indexName)
                     (MappingName mappingName)
                     (DocId docId) value) = blob
     where metadata = mkBulkStreamValue "index" indexName mappingName docId
-          blob = encode metadata `mappend` "\n" `mappend` encode value
+          blob = encode metadata <> "\n" <> encode value
 
 encodeBulkOperation (BulkIndexAuto (IndexName indexName)
                     (MappingName mappingName)
@@ -936,7 +941,7 @@ encodeBulkOperation (BulkCreate (IndexName indexName)
                     (MappingName mappingName)
                     (DocId docId) value) = blob
     where metadata = mkBulkStreamValue "create" indexName mappingName docId
-          blob = encode metadata `mappend` "\n" `mappend` encode value
+          blob = encode metadata <> "\n" <> encode value
 
 encodeBulkOperation (BulkDelete (IndexName indexName)
                     (MappingName mappingName)
@@ -949,7 +954,17 @@ encodeBulkOperation (BulkUpdate (IndexName indexName)
                     (DocId docId) value) = blob
     where metadata = mkBulkStreamValue "update" indexName mappingName docId
           doc = object ["doc" .= value]
-          blob = encode metadata `mappend` "\n" `mappend` encode doc
+          blob = encode metadata <> "\n" <> encode doc
+
+encodeBulkOperation (BulkUpsert (IndexName indexName)
+                    (MappingName mappingName)
+                    (DocId docId)
+                    value
+                    actionMeta
+                    docMeta) = blob
+    where metadata = mkBulkStreamValueWithMeta actionMeta "update" indexName mappingName docId
+          doc = object $ ["doc" .= value] <> (buildUpsertPayloadMetadata <$> docMeta)
+          blob = encode metadata <> "\n" <> encode doc
 
 encodeBulkOperation (BulkCreateEncoding (IndexName indexName)
                     (MappingName mappingName)
@@ -1031,10 +1046,10 @@ searchByType (IndexName indexName)
 -- search results. Note that the search is put into 'SearchTypeScan'
 -- mode and thus results will not be sorted. Combine this with
 -- 'advanceScroll' to efficiently stream through the full result set
-getInitialScroll :: 
-  (FromJSON a, MonadThrow m, MonadBH m) => IndexName -> 
-  MappingName -> 
-  Search -> 
+getInitialScroll ::
+  (FromJSON a, MonadThrow m, MonadBH m) => IndexName ->
+  MappingName ->
+  Search ->
   m (Either EsError (SearchResult a))
 getInitialScroll (IndexName indexName) (MappingName mappingName) search' = do
     let url = addQuery params <$> joinPath [indexName, mappingName, "_search"]
@@ -1059,14 +1074,14 @@ getInitialSortedScroll (IndexName indexName) (MappingName mappingName) search =
   resp' <- bindM2 dispatchSearch url (return search)
   parseEsResponse resp'
 
-scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId -> 
+scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId ->
            m ([Hit a], Maybe ScrollId)
 scroll' Nothing = return ([], Nothing)
 scroll' (Just sid) = do
     res <- advanceScroll sid 60
     case res of
       Right SearchResult {..} -> return (hits searchHits, scrollId)
-      Left _ -> return ([], Nothing)
+      Left _                  -> return ([], Nothing)
 
 -- | Use the given scroll to fetch the next page of documents. If there are no
 -- further pages, 'SearchResult.searchHits.hits' will be '[]'.
@@ -1086,13 +1101,13 @@ advanceScroll (ScrollId sid) scroll = do
   where scrollTime = showText secs <> "s"
         secs :: Integer
         secs = round scroll
-        
+
        scrollObject = object [ "scroll" .= scrollTime
                              , "scroll_id" .= sid
                              ]
 
-simpleAccumulator :: 
-  (FromJSON a, MonadBH m, MonadThrow m) => 
+simpleAccumulator ::
+  (FromJSON a, MonadBH m, MonadThrow m) =>
   [Hit a] ->
   ([Hit a], Maybe ScrollId) ->
   m ([Hit a], Maybe ScrollId)
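
A note on the new encoding before the next file: the sketch below shows what `encodeBulkOperation` produces for the new `BulkUpsert` constructor. It is illustrative only (the index, mapping, and id values are hypothetical), and aeson does not guarantee key order within objects.

upsertExample :: L.ByteString
upsertExample = encodeBulkOperation $
  BulkUpsert (IndexName "tweets")                      -- hypothetical index
             (MappingName "tweet")                     -- hypothetical mapping
             (DocId "1")
             (object ["user" .= ("kimchy" :: Text)])
             [UA_RetryOnConflict 3]
             [UP_DocAsUpsert True]

-- Evaluates to two NDJSON lines, ready for the /_bulk endpoint:
--   {"update":{"_index":"tweets","_type":"tweet","_id":"1","_retry_on_conflict":3}}
--   {"doc":{"user":"kimchy"},"doc_as_upsert":true}
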
"invalid compression codec" + _ -> fail "invalid compression codec" -- | A measure of bytes used for various configurations. You may want -- to use smart constructors like 'gigabytes' for larger values. @@ -582,6 +582,34 @@ data Mapping = , mappingFields :: [MappingField] } deriving (Eq, Show) +data UpsertActionMetadata + = UA_RetryOnConflict Int + | UA_Source Bool + | UA_Version Int + deriving (Eq, Show) + +buildUpsertActionMetadata :: UpsertActionMetadata -> Pair +buildUpsertActionMetadata (UA_RetryOnConflict i) = "_retry_on_conflict" .= i +buildUpsertActionMetadata (UA_Source b) = "_source" .= b +buildUpsertActionMetadata (UA_Version i) = "_version" .= i + +data UpsertPayloadMetadata + = UP_DocAsUpsert Bool + | UP_Script Value + | UP_Lang Text + | UP_Params Value + | UP_Upsert Value + | UP_Source Bool + deriving (Eq, Show) + +buildUpsertPayloadMetadata :: UpsertPayloadMetadata -> Pair +buildUpsertPayloadMetadata (UP_DocAsUpsert a) = "doc_as_upsert" .= a +buildUpsertPayloadMetadata (UP_Script a) = "script" .= a +buildUpsertPayloadMetadata (UP_Lang a) = "lang" .= a +buildUpsertPayloadMetadata (UP_Params a) = "params" .= a +buildUpsertPayloadMetadata (UP_Upsert a) = "upsert" .= a +buildUpsertPayloadMetadata (UP_Source a) = "_source" .= a + data AllocationPolicy = AllocAll -- ^ Allows shard allocation for all shards. | AllocPrimaries @@ -634,6 +662,8 @@ data BulkOperation = -- ^ Delete the document | BulkUpdate IndexName MappingName DocId Value -- ^ Update the document, merging the new value with the existing one. + | BulkUpsert IndexName MappingName DocId Value [UpsertActionMetadata] [UpsertPayloadMetadata] + -- ^ Update the document, or insert it if there is no existing one. deriving (Eq, Show) {-| 'EsResult' describes the standard wrapper JSON document that you see in diff --git a/src/Database/V5/Bloodhound/Types.hs b/src/Database/V5/Bloodhound/Types.hs index 403f7713..a2a3f396 100644 --- a/src/Database/V5/Bloodhound/Types.hs +++ b/src/Database/V5/Bloodhound/Types.hs @@ -169,6 +169,10 @@ module Database.V5.Bloodhound.Types , TemplatePattern(..) , MappingName(..) , DocId(..) + , UpsertActionMetadata(..) + , buildUpsertActionMetadata + , UpsertPayloadMetadata(..) + , buildUpsertPayloadMetadata , CacheName(..) , CacheKey(..) , BulkOperation(..) @@ -416,15 +420,15 @@ module Database.V5.Bloodhound.Types import Bloodhound.Import -import Database.V5.Bloodhound.Types.Class -import Database.V5.Bloodhound.Internal.Analysis import Database.V5.Bloodhound.Internal.Aggregation +import Database.V5.Bloodhound.Internal.Analysis import Database.V5.Bloodhound.Internal.Client import Database.V5.Bloodhound.Internal.Highlight import Database.V5.Bloodhound.Internal.Newtypes import Database.V5.Bloodhound.Internal.Query import Database.V5.Bloodhound.Internal.Sort import Database.V5.Bloodhound.Internal.Suggest +import Database.V5.Bloodhound.Types.Class {-| 'unpackId' is a silly convenience function that gets used once. -} @@ -536,4 +540,4 @@ instance (FromJSON a) => FromJSON (SearchResult a) where newtype ScrollId = ScrollId Text - deriving (Eq, Show, Ord, ToJSON, FromJSON) + deriving (Eq, Show, Ord, ToJSON, FromJSON) \ No newline at end of file