Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 111 additions & 137 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ path = "src/rusticsearch/main.rs"
[dependencies]
kite = "0.1.0"
kite_rocksdb = "0.1.0"
iron = "0.4.0"
router = "0.2.0"
persistent = "0.2.0"
rocket = "0.2"
rocket_contrib = "0.2"
rocket_codegen = "0.2"
url = "1.1.1"
log = "0.3.6"
unicode-segmentation = "0.1.2"
Expand Down
90 changes: 90 additions & 0 deletions src/rusticsearch/api/alias.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::collections::HashMap;
use std::sync::Arc;

use serde_json::Value;
use rocket::State;
use rocket_contrib::JSON;

use system::System;


#[get("/_alias/<alias_name>")]
pub fn get_global(alias_name: &str, system: State<Arc<System>>) -> Option<JSON<Value>> {
// Lock cluster metadata
let cluster_metadata = system.metadata.read().unwrap();

// Find alias
let mut found_aliases = HashMap::new();

for index_ref in cluster_metadata.names.find(alias_name) {
let index = match cluster_metadata.indices.get(&index_ref) {
Some(index) => index,
None => continue,
};

let mut inner_map = HashMap::new();
let mut inner_inner_map = HashMap::new();
inner_inner_map.insert(alias_name, HashMap::<String, String>::new());
inner_map.insert("aliases".to_owned(), inner_inner_map);
found_aliases.insert(index.canonical_name().clone(), inner_map);
}

if !found_aliases.is_empty() {
Some(JSON(json!(found_aliases)))
} else {
None
}
}


#[allow(unused_variables)]
#[get("/<index_name>/_alias")]
pub fn get_list(index_name: &str, system: State<Arc<System>>) -> JSON<Value> {
// TODO

JSON(json!({}))
}


#[get("/<index_name>/_alias/<alias_name>")]
pub fn get(index_name: &str, alias_name: &str, system: State<Arc<System>>) -> Option<JSON<Value>> {
// Lock cluster metadata
let cluster_metadata = system.metadata.read().unwrap();

// Get index
let index_ref = match cluster_metadata.names.find_canonical(index_name) {
Some(index_ref) => index_ref,
None => return None,
};

// Find alias
if cluster_metadata.names.iter_index_aliases(index_ref).any(|name| name == alias_name) {
Some(JSON(json!({})))
} else {
None
}
}


#[put("/<index_selector>/_alias/<alias_name>")]
pub fn put(index_selector: &str, alias_name: &str, system: State<Arc<System>>) -> JSON<Value> {
// Lock cluster metadata
let mut cluster_metadata = system.metadata.write().unwrap();

// Insert alias into names registry
let index_refs = cluster_metadata.names.find(index_selector);
match cluster_metadata.names.insert_or_replace_alias(alias_name.to_string(), index_refs) {
Ok(true) => {
system.log.info("[api] created alias", b!("index" => index_selector, "alias" => alias_name));
}
Ok(false) => {
system.log.info("[api] updated alias", b!("index" => index_selector, "alias" => alias_name));
}
Err(_) => {
// TODO
return JSON(json!({"acknowledged": false}));
}
}

JSON(json!({"acknowledged": true}))
}
97 changes: 0 additions & 97 deletions src/rusticsearch/api/alias_api.rs

This file was deleted.

45 changes: 30 additions & 15 deletions src/rusticsearch/api/bulk_api.rs → src/rusticsearch/api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,24 @@ use serde_json;

use document::DocumentSource;

use api::persistent;
use api::iron::prelude::*;
use api::iron::status;
use api::utils::{json_response};
use std::sync::Arc;

use serde_json::Value;
use rocket::State;
use rocket::Data;
use rocket_contrib::JSON;

pub fn view_post_bulk(req: &mut Request) -> IronResult<Response> {
let ref system = get_system!(req);
use system::System;


#[post("/_bulk", data = "<data>")]
pub fn bulk(data: Data, system: State<Arc<System>>) -> Option<JSON<Value>> {
// Lock cluster metedata
let cluster_metadata = system.metadata.read().unwrap();

// Load data from body
let mut payload = String::new();
req.body.read_to_string(&mut payload).unwrap();
data.open().read_to_string(&mut payload).unwrap();

let mut items = Vec::new();

Expand All @@ -34,7 +37,13 @@ pub fn view_post_bulk(req: &mut Request) -> IronResult<Response> {
}

// Parse action line
let action_json = parse_json!(&action_line.unwrap());
let action_json: Value = match serde_json::from_str(&action_line.unwrap()) {
Ok(data) => data,
Err(_) => {
// TODO: Don't stop the bulk upload here
return None;
}
};

// Check action
// Action should be an object with only one key, the key name indicates the action and
Expand All @@ -54,7 +63,13 @@ pub fn view_post_bulk(req: &mut Request) -> IronResult<Response> {
match action_name.as_ref() {
"index" => {
let doc_line = payload_lines.next();
let doc_json = parse_json!(&doc_line.unwrap());;
let doc_json: Value = match serde_json::from_str(&doc_line.unwrap()) {
Ok(data) => data,
Err(_) => {
// TODO: Don't stop the bulk upload here
return None;
}
};

// Find index
let index = get_index_or_404!(cluster_metadata, doc_index);
Expand All @@ -65,7 +80,8 @@ pub fn view_post_bulk(req: &mut Request) -> IronResult<Response> {
let mapping = match index_metadata.mappings.get(doc_type) {
Some(mapping) => mapping,
None => {
return Ok(json_response(status::NotFound, json!({"message": "Mapping not found"})));
// TODO: Don't stop the bulk upload here
return None;
}
};

Expand All @@ -91,9 +107,8 @@ pub fn view_post_bulk(req: &mut Request) -> IronResult<Response> {
}
}

return Ok(json_response(status::Ok,
json!({
"took": items.len(),
"items": items,
})));
Some(JSON(json!({
"took": items.len(),
"items": items,
})))
}
99 changes: 99 additions & 0 deletions src/rusticsearch/api/document.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::Arc;

use serde_json::Value;
use rocket::State;
use rocket_contrib::JSON;

use system::System;
use document::DocumentSource;


#[allow(unused_variables)]
#[get("/<index_name>/<mapping_name>/<doc_key>")]
pub fn get(index_name: &str, mapping_name: &str, doc_key: &str, system: State<Arc<System>>) -> Option<JSON<Value>> {
// Get index
let cluster_metadata = system.metadata.read().unwrap();
let index = get_index_or_404!(cluster_metadata, index_name);
let index_metadata = index.metadata.read().unwrap();

// Check that the mapping exists
if !index_metadata.mappings.contains_key(mapping_name) {
return None;
}

// Find document
/*
let index_reader = index.store.reader();
let doc = match index_reader.get_document_by_key(doc_key) {
Some(doc) => doc,
None => {
return Ok(json_response(status::NotFound, "{\"message\": \"Document not found\"}"));
}
};
*/


// Build JSON document
// TODO: This is probably completely wrong
// let json_object = BTreeMap::new();
// FIXME: for (field_name, field_value) in doc.fields.iter() {
// FIXME: json_object.insert(field_name.clone(), Json::Array(field_value.iter().map(|v| v.term.as_json()).collect::<Vec<_>>()));
// FIXME: }

Some(JSON(json!({})))
}


#[put("/<index_name>/<mapping_name>/<doc_key>", data = "<data>")]
pub fn put(index_name: &str, mapping_name: &str, doc_key: &str, data: JSON<Value>, system: State<Arc<System>>) -> Option<JSON<Value>> {
// Get index
let cluster_metadata = system.metadata.read().unwrap();
let index = get_index_or_404!(cluster_metadata, index_name);
let index_metadata = index.metadata.read().unwrap();

let doc = {
// Find mapping
let mapping = match index_metadata.mappings.get(mapping_name) {
Some(mapping) => mapping,
None => {
return None;
}
};

// Create document
let document_source = DocumentSource {
key: doc_key,
data: data.as_object().unwrap(),
};
document_source.prepare(mapping).unwrap()
};

index.store.insert_or_update_document(&doc).unwrap();

// TODO: {"_index":"wagtail","_type":"searchtests_searchtest","_id":"searchtests_searchtest:5378","_version":1,"created":true}
Some(JSON(json!({})))
}


#[delete("/<index_name>/<mapping_name>/<doc_key>")]
pub fn delete(index_name: &str, mapping_name: &str, doc_key: &str, system: State<Arc<System>>) -> Option<JSON<Value>> {
// Get index
let cluster_metadata = system.metadata.read().unwrap();
let index = get_index_or_404!(cluster_metadata, index_name);
let index_metadata = index.metadata.read().unwrap();

// Check that the mapping exists
if !index_metadata.mappings.contains_key(mapping_name) {
return None;
}

// Make sure the document exists
if !index.store.reader().contains_document_key(doc_key) {
return None;
}

// Delete document
index.store.remove_document_by_key(doc_key).unwrap();

Some(JSON(json!({})))
}
Loading