Skip to content
85 changes: 50 additions & 35 deletions src/Database/V5/Bloodhound/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ getSnapshots (SnapshotRepoName repoName) sel =
where
url = joinPath ["_snapshot", repoName, snapPath]
snapPath = case sel of
AllSnapshots -> "_all"
AllSnapshots -> "_all"
SnapshotList (s :| ss) -> T.intercalate "," (renderPath <$> (s:ss))
renderPath (SnapPattern t) = t
renderPath (ExactSnap (SnapshotName t)) = t
Expand Down Expand Up @@ -477,9 +477,9 @@ getNodesInfo sel = parseEsResponse =<< get =<< url
where
url = joinPath ["_nodes", selectionSeg]
selectionSeg = case sel of
LocalNode -> "_local"
LocalNode -> "_local"
NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
AllNodes -> "_all"
AllNodes -> "_all"
selToSeg (NodeByName (NodeName n)) = n
selToSeg (NodeByFullNodeId (FullNodeId i)) = i
selToSeg (NodeByHost (Server s)) = s
Expand All @@ -495,9 +495,9 @@ getNodesStats sel = parseEsResponse =<< get =<< url
where
url = joinPath ["_nodes", selectionSeg, "stats"]
selectionSeg = case sel of
LocalNode -> "_local"
LocalNode -> "_local"
NodeList (l :| ls) -> T.intercalate "," (selToSeg <$> (l:ls))
AllNodes -> "_all"
AllNodes -> "_all"
selToSeg (NodeByName (NodeName n)) = n
selToSeg (NodeByFullNodeId (FullNodeId i)) = i
selToSeg (NodeByHost (Server s)) = s
Expand All @@ -523,13 +523,13 @@ createIndex indexSettings (IndexName indexName) =
createIndexWith :: MonadBH m
=> [UpdatableIndexSetting]
-> Int -- ^ shard count
-> IndexName
-> IndexName
-> m Reply
createIndexWith updates shards (IndexName indexName) =
bindM2 put url (return (Just body))
where url = joinPath [indexName]
body = encode $ object
["settings" .= deepMerge
["settings" .= deepMerge
( HM.singleton "index.number_of_shards" (toJSON shards) :
[u | Object u <- toJSON <$> updates]
)
Expand Down Expand Up @@ -570,13 +570,14 @@ updateIndexSettings updates (IndexName indexName) =

-- | 'getIndexSettings' fetches the settings of the given index from the
-- server's @\/\<index\>\/_settings@ endpoint and parses the reply into an
-- 'IndexSettingsSummary', or an 'EsError' on a non-success response.
getIndexSettings :: (MonadBH m, MonadThrow m) => IndexName
                 -> m (Either EsError IndexSettingsSummary)
getIndexSettings (IndexName indexName) = do
  settingsUrl <- joinPath [indexName, "_settings"]
  response    <- get settingsUrl
  parseEsResponse response

-- | 'forceMergeIndex'
--
-- | 'forceMergeIndex'
--
-- The force merge API allows to force merging of one or more indices through
-- an API. The merge relates to the number of segments a Lucene index holds
-- within each shard. The force merge operation allows to reduce the number of
Expand Down Expand Up @@ -618,7 +619,7 @@ deepMerge = LS.foldl' go mempty
where go acc = LS.foldl' go' acc . HM.toList
go' acc (k, v) = HM.insertWith merge k v acc
merge (Object a) (Object b) = Object (deepMerge [a, b])
merge _ b = b
merge _ b = b


statusCodeIs :: (Int, Int) -> Reply -> Bool
Expand All @@ -644,13 +645,13 @@ parseEsResponse :: (MonadThrow m, FromJSON a) => Reply
parseEsResponse reply
| respIsTwoHunna reply = case eitherDecode body of
Right a -> return (Right a)
Left _ -> tryParseError
Left _ -> tryParseError
| otherwise = tryParseError
where body = responseBody reply
tryParseError = case eitherDecode body of
Right e -> return (Left e)
-- this case should not be possible
Left _ -> explode
Left _ -> explode
explode = throwM (EsProtocolException body)

-- | 'indexExists' enables you to check if an index exists. Returns 'Bool'
Expand Down Expand Up @@ -719,7 +720,7 @@ listIndices =
indexVal <- HM.lookup "index" obj
case indexVal of
String txt -> Just (IndexName txt)
_ -> Nothing
_ -> Nothing
_ -> Nothing

-- | 'updateIndexAliases' updates the server's index alias
Expand Down Expand Up @@ -805,10 +806,10 @@ putMapping (IndexName indexName) (MappingName mappingName) mapping =
versionCtlParams :: IndexDocumentSettings -> [(Text, Maybe Text)]
versionCtlParams cfg =
case idsVersionControl cfg of
NoVersionControl -> []
InternalVersion v -> versionParams v "internal"
ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt"
ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte"
NoVersionControl -> []
InternalVersion v -> versionParams v "internal"
ExternalGT (ExternalDocVersion v) -> versionParams v "external_gt"
ExternalGTE (ExternalDocVersion v) -> versionParams v "external_gte"
ForceVersion (ExternalDocVersion v) -> versionParams v "force"
where
vt = showText . docVersionNumber
Expand All @@ -834,7 +835,7 @@ indexDocument (IndexName indexName)
bindM2 put url (return body)
where url = addQuery params <$> joinPath [indexName, mappingName, docId]
parentParams = case idsParent cfg of
Nothing -> []
Nothing -> []
Just (DocumentParent (DocId p)) -> [ ("parent", Just p) ]
params = versionCtlParams cfg ++ parentParams
body = Just (encode document)
Expand Down Expand Up @@ -895,11 +896,15 @@ mash :: Builder -> V.Vector L.ByteString -> Builder
mash = V.foldl' (\b x -> b <> byteString "\n" <> lazyByteString x)

mkBulkStreamValue :: Text -> Text -> Text -> Text -> Value
mkBulkStreamValue operation indexName mappingName docId =
object [operation .=
object [ "_index" .= indexName
, "_type" .= mappingName
, "_id" .= docId]]
mkBulkStreamValue = mkBulkStreamValueWithMeta []

mkBulkStreamValueWithMeta :: [UpsertActionMetadata] -> Text -> Text -> Text -> Text -> Value
mkBulkStreamValueWithMeta meta operation indexName mappingName docId =
object [ operation .=
object ([ "_index" .= indexName
, "_type" .= mappingName
, "_id" .= docId]
<> (buildUpsertActionMetadata <$> meta))]

mkBulkStreamValueAuto :: Text -> Text -> Text -> Value
mkBulkStreamValueAuto operation indexName mappingName =
Expand All @@ -918,7 +923,7 @@ encodeBulkOperation (BulkIndex (IndexName indexName)
(MappingName mappingName)
(DocId docId) value) = blob
where metadata = mkBulkStreamValue "index" indexName mappingName docId
blob = encode metadata `mappend` "\n" `mappend` encode value
blob = encode metadata <> "\n" <> encode value

encodeBulkOperation (BulkIndexAuto (IndexName indexName)
(MappingName mappingName)
Expand All @@ -936,7 +941,7 @@ encodeBulkOperation (BulkCreate (IndexName indexName)
(MappingName mappingName)
(DocId docId) value) = blob
where metadata = mkBulkStreamValue "create" indexName mappingName docId
blob = encode metadata `mappend` "\n" `mappend` encode value
blob = encode metadata <> "\n" <> encode value

encodeBulkOperation (BulkDelete (IndexName indexName)
(MappingName mappingName)
Expand All @@ -949,7 +954,17 @@ encodeBulkOperation (BulkUpdate (IndexName indexName)
(DocId docId) value) = blob
where metadata = mkBulkStreamValue "update" indexName mappingName docId
doc = object ["doc" .= value]
blob = encode metadata `mappend` "\n" `mappend` encode doc
blob = encode metadata <> "\n" <> encode doc

encodeBulkOperation (BulkUpsert (IndexName indexName)
(MappingName mappingName)
(DocId docId)
value
actionMeta
docMeta) = blob
where metadata = mkBulkStreamValueWithMeta actionMeta "update" indexName mappingName docId
doc = object $ ["doc" .= value] <> (buildUpsertPayloadMetadata <$> docMeta)
blob = encode metadata <> "\n" <> encode doc

encodeBulkOperation (BulkCreateEncoding (IndexName indexName)
(MappingName mappingName)
Expand Down Expand Up @@ -1031,10 +1046,10 @@ searchByType (IndexName indexName)
-- search results. Note that the search is put into 'SearchTypeScan'
-- mode and thus results will not be sorted. Combine this with
-- 'advanceScroll' to efficiently stream through the full result set
getInitialScroll ::
(FromJSON a, MonadThrow m, MonadBH m) => IndexName ->
MappingName ->
Search ->
getInitialScroll ::
(FromJSON a, MonadThrow m, MonadBH m) => IndexName ->
MappingName ->
Search ->
m (Either EsError (SearchResult a))
getInitialScroll (IndexName indexName) (MappingName mappingName) search' = do
let url = addQuery params <$> joinPath [indexName, mappingName, "_search"]
Expand All @@ -1059,14 +1074,14 @@ getInitialSortedScroll (IndexName indexName) (MappingName mappingName) search =
resp' <- bindM2 dispatchSearch url (return search)
parseEsResponse resp'

scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId ->
scroll' :: (FromJSON a, MonadBH m, MonadThrow m) => Maybe ScrollId ->
m ([Hit a], Maybe ScrollId)
scroll' Nothing = return ([], Nothing)
scroll' (Just sid) = do
res <- advanceScroll sid 60
case res of
Right SearchResult {..} -> return (hits searchHits, scrollId)
Left _ -> return ([], Nothing)
Left _ -> return ([], Nothing)

-- | Use the given scroll to fetch the next page of documents. If there are no
-- further pages, 'SearchResult.searchHits.hits' will be '[]'.
Expand All @@ -1086,13 +1101,13 @@ advanceScroll (ScrollId sid) scroll = do
where scrollTime = showText secs <> "s"
secs :: Integer
secs = round scroll

scrollObject = object [ "scroll" .= scrollTime
, "scroll_id" .= sid
]

simpleAccumulator ::
(FromJSON a, MonadBH m, MonadThrow m) =>
simpleAccumulator ::
(FromJSON a, MonadBH m, MonadThrow m) =>
[Hit a] ->
([Hit a], Maybe ScrollId) ->
m ([Hit a], Maybe ScrollId)
Expand Down
52 changes: 41 additions & 11 deletions src/Database/V5/Bloodhound/Internal/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ module Database.V5.Bloodhound.Internal.Client where

import Bloodhound.Import

import qualified Data.Text as T
import qualified Data.Traversable as DT
import qualified Data.HashMap.Strict as HM
import qualified Data.Vector as V
import qualified Data.Version as Vers
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import qualified Data.Traversable as DT
import qualified Data.Vector as V
import qualified Data.Version as Vers
import GHC.Enum
import Network.HTTP.Client
import qualified Text.ParserCombinators.ReadP as RP
import Text.Read (Read(..))
import qualified Text.Read as TR
import qualified Text.ParserCombinators.ReadP as RP
import Text.Read (Read (..))
import qualified Text.Read as TR

import Database.V5.Bloodhound.Internal.Analysis
import Database.V5.Bloodhound.Internal.Newtypes
Expand Down Expand Up @@ -388,13 +388,13 @@ data Compression
instance ToJSON Compression where
toJSON x = case x of
CompressionDefault -> toJSON ("default" :: Text)
CompressionBest -> toJSON ("best_compression" :: Text)
CompressionBest -> toJSON ("best_compression" :: Text)

instance FromJSON Compression where
parseJSON = withText "Compression" $ \t -> case t of
"default" -> return CompressionDefault
"default" -> return CompressionDefault
"best_compression" -> return CompressionBest
_ -> fail "invalid compression codec"
_ -> fail "invalid compression codec"

-- | A measure of bytes used for various configurations. You may want
-- to use smart constructors like 'gigabytes' for larger values.
Expand Down Expand Up @@ -582,6 +582,34 @@ data Mapping =
, mappingFields :: [MappingField] }
deriving (Eq, Show)

-- | Optional metadata that can be attached to the action line of a bulk
-- upsert request, e.g. retry count on version conflicts, source filtering,
-- or an explicit document version.
data UpsertActionMetadata
  = UA_RetryOnConflict Int
  | UA_Source Bool
  | UA_Version Int
  deriving (Eq, Show)

-- | Render one 'UpsertActionMetadata' entry as the JSON key\/value pair the
-- Elasticsearch bulk API expects on the action line.
buildUpsertActionMetadata :: UpsertActionMetadata -> Pair
buildUpsertActionMetadata meta =
  case meta of
    UA_RetryOnConflict n -> "_retry_on_conflict" .= n
    UA_Source flag       -> "_source" .= flag
    UA_Version n         -> "_version" .= n

-- | Optional metadata that can be attached to the payload (second) line of a
-- bulk upsert request, e.g. @doc_as_upsert@, an update script and its
-- language\/parameters, an explicit upsert document, or source filtering.
data UpsertPayloadMetadata
  = UP_DocAsUpsert Bool
  | UP_Script Value
  | UP_Lang Text
  | UP_Params Value
  | UP_Upsert Value
  | UP_Source Bool
  deriving (Eq, Show)

-- | Render one 'UpsertPayloadMetadata' entry as the JSON key\/value pair the
-- Elasticsearch bulk API expects in the upsert payload object.
buildUpsertPayloadMetadata :: UpsertPayloadMetadata -> Pair
buildUpsertPayloadMetadata meta =
  case meta of
    UP_DocAsUpsert flag -> "doc_as_upsert" .= flag
    UP_Script script    -> "script" .= script
    UP_Lang lang        -> "lang" .= lang
    UP_Params params    -> "params" .= params
    UP_Upsert doc       -> "upsert" .= doc
    UP_Source flag      -> "_source" .= flag

data AllocationPolicy = AllocAll
-- ^ Allows shard allocation for all shards.
| AllocPrimaries
Expand Down Expand Up @@ -634,6 +662,8 @@ data BulkOperation =
-- ^ Delete the document
| BulkUpdate IndexName MappingName DocId Value
-- ^ Update the document, merging the new value with the existing one.
| BulkUpsert IndexName MappingName DocId Value [UpsertActionMetadata] [UpsertPayloadMetadata]
-- ^ Update the document, or insert it if there is no existing one.
deriving (Eq, Show)

{-| 'EsResult' describes the standard wrapper JSON document that you see in
Expand Down
10 changes: 7 additions & 3 deletions src/Database/V5/Bloodhound/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ module Database.V5.Bloodhound.Types
, TemplatePattern(..)
, MappingName(..)
, DocId(..)
, UpsertActionMetadata(..)
, buildUpsertActionMetadata
, UpsertPayloadMetadata(..)
, buildUpsertPayloadMetadata
, CacheName(..)
, CacheKey(..)
, BulkOperation(..)
Expand Down Expand Up @@ -416,15 +420,15 @@ module Database.V5.Bloodhound.Types

import Bloodhound.Import

import Database.V5.Bloodhound.Types.Class
import Database.V5.Bloodhound.Internal.Analysis
import Database.V5.Bloodhound.Internal.Aggregation
import Database.V5.Bloodhound.Internal.Analysis
import Database.V5.Bloodhound.Internal.Client
import Database.V5.Bloodhound.Internal.Highlight
import Database.V5.Bloodhound.Internal.Newtypes
import Database.V5.Bloodhound.Internal.Query
import Database.V5.Bloodhound.Internal.Sort
import Database.V5.Bloodhound.Internal.Suggest
import Database.V5.Bloodhound.Types.Class

{-| 'unpackId' is a silly convenience function that gets used once.
-}
Expand Down Expand Up @@ -536,4 +540,4 @@ instance (FromJSON a) => FromJSON (SearchResult a) where

newtype ScrollId =
ScrollId Text
deriving (Eq, Show, Ord, ToJSON, FromJSON)
deriving (Eq, Show, Ord, ToJSON, FromJSON)