diff --git a/Cargo.toml b/Cargo.toml index 970aeb16..46d45d5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,6 @@ members = [ "metadata", "common", "config", + "config_center", "dubbo" ] diff --git a/config_center/Cargo.toml b/config_center/Cargo.toml new file mode 100644 index 00000000..5e1f6569 --- /dev/null +++ b/config_center/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "config_center" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +etcd-client = "0.9.2" +tokio = { version = "1.0", features = ["full"] } +async-trait = "0.1.56" +rand = "0.8.5" diff --git a/config_center/src/config_changed_event.rs b/config_center/src/config_changed_event.rs new file mode 100644 index 00000000..0f16b705 --- /dev/null +++ b/config_center/src/config_changed_event.rs @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[derive(Debug)] +pub struct ConfigChangedEvent { + + pub key: String, + + pub group: String, + + pub content: String, + + pub change_type: ConfigChangeType, +} + +#[derive(Debug)] +pub enum ConfigChangeType { + ADDED, + MODIFIED, + DELETED, +} diff --git a/config_center/src/configuration_listener.rs b/config_center/src/configuration_listener.rs new file mode 100644 index 00000000..65d352b4 --- /dev/null +++ b/config_center/src/configuration_listener.rs @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::config_changed_event::ConfigChangedEvent; + +pub trait ConfigurationListener { + + fn process(&self, event: ConfigChangedEvent); + + fn get_type(&self) -> String; +} diff --git a/config_center/src/dynamic_configuration.rs b/config_center/src/dynamic_configuration.rs new file mode 100644 index 00000000..188feb07 --- /dev/null +++ b/config_center/src/dynamic_configuration.rs @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashSet; +use crate::configuration_listener::ConfigurationListener; +use async_trait::async_trait; + +#[async_trait] +pub trait DynamicConfiguration { + + async fn add_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send); + + async fn remove_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send); + + // TODO how to override + + async fn get_config(&mut self, key: &str, group: &str, timeout: i32) -> String; + + async fn get_properties(&mut self, key: &str, group: &str, timeout: i32) -> String; + + async fn publish_config(&mut self, key: &str, group: &str, content: &str) -> bool; + + async fn get_config_keys(&mut self, group: &str) -> HashSet; +} diff --git a/config_center/src/dynamic_configuration_factory.rs b/config_center/src/dynamic_configuration_factory.rs new file mode 100644 index 00000000..f53a8b7c --- /dev/null +++ b/config_center/src/dynamic_configuration_factory.rs @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::url::URL; + +pub trait DynamicConfigurationFactory { + + fn get_dynamic_configuration(&self, url: URL) -> T; +} diff --git a/config_center/src/etcd_dynamic_configuration.rs b/config_center/src/etcd_dynamic_configuration.rs new file mode 100644 index 00000000..ce668e89 --- /dev/null +++ b/config_center/src/etcd_dynamic_configuration.rs @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::{HashMap, HashSet}; +use etcd_client::{Client, GetOptions, Watcher, WatchOptions, WatchStream}; +use crate::configuration_listener::ConfigurationListener; +use crate::dynamic_configuration::DynamicConfiguration; +use async_trait::async_trait; +use rand::Rng; +use crate::url::URL; + +pub struct EtcdDynamicConfiguration { + + /** + * The final root path would be: /$NAME_SPACE/config + */ + pub root_path: String, + + pub client: Client, + + pub url: URL, + + pub watcher: Watcher, + + pub stream: WatchStream, + + pub watch_listener_map: HashMap>, +} + +const CONFIG_NAMESPACE_KEY: &str = "namespace"; + +const DEFAULT_GROUP: &str = "dubbo"; + +const PATH_SEPARATOR: &str = "/"; + +impl EtcdDynamicConfiguration { + + pub async fn new(self, url: URL) -> Self { + let mut client = Client::connect(["localhost:2379"], None).await.unwrap(); + let mut root_path = String::from(PATH_SEPARATOR); + root_path.push_str(url.get_parameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP).as_str()); + root_path.push_str("/config"); + let (watcher, stream) = client.watch("/", None).await.unwrap(); + let watch_listener_map = HashMap::new(); + + + // while let Some(resp) = stream.message().await? { + // println!("[{:?}] receive watch response", resp.watch_id()); + // for event in resp.events() { + // println!("event type: {:?}", event.event_type()); + // if let Some(kv) = event.kv() { + // println!("kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?); + // } + // if EventType::Delete == event.event_type() { + // watcher.cancel_by_id(resp.watch_id()).await?; + // } + // } + // } + + + + + EtcdDynamicConfiguration { + root_path, + client, + url, + watcher, + stream, + watch_listener_map, + } + } + + pub fn get_path(&self, key: &str, group: &str) -> String { + if key.is_empty() { + return self.build_path(group); + } + self.build_path(group) + PATH_SEPARATOR + key + } + + pub fn build_path(&self, mut group: &str) -> String { + if group.is_empty() { + group = DEFAULT_GROUP; + } + self.root_path.clone() + PATH_SEPARATOR + group + } +} + +#[async_trait] +impl DynamicConfiguration for EtcdDynamicConfiguration { + + async fn add_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send) { + let path = self.get_path(key, group); + let watch_id = rand::thread_rng().gen(); + if !self.watch_listener_map.contains_key(path.as_str()) { + let mut watcher_map = HashMap::new(); + let listener_type = listener.get_type(); + let mut etcd_watcher = EtcdConfigWatcher::new(key.to_string(), group.to_string(), self.watcher, self.stream, watch_id, listener); + etcd_watcher.watch(watch_id); + watcher_map.insert(listener_type, etcd_watcher); + self.watch_listener_map.insert(path, watcher_map); + } else { + let watcher_map = self.watch_listener_map.get_mut(path.as_str()).unwrap(); + let listener_type = listener.get_type(); + let mut etcd_watcher = EtcdConfigWatcher::new(key.to_string(), group.to_string(), self.watcher, self.stream, watch_id, listener); + etcd_watcher.watch(watch_id); + watcher_map.insert(listener_type, etcd_watcher); + } + } + + async fn remove_listener(mut self, key: &str, group: &str, listener: impl ConfigurationListener + std::marker::Send) { + let path = self.get_path(key, group); + let watcher_map = self.watch_listener_map.get_mut(path.as_str()).unwrap(); + if !watcher_map.contains_key(listener.get_type().as_str()) { + return; + } + let watcher = watcher_map.get_mut(listener.get_type().as_str()).unwrap(); + watcher.cancelWatch(); + watcher_map.remove(listener.get_type().as_str()); + } + + async fn get_config(&mut self, key: &str, group: &str, timeout: i32) -> String { + let path = self.get_path(key, group); + let resp = self.client.get(path, None).await.unwrap(); + if let Some(kv) = resp.kvs().first() { + return kv.value_str().unwrap().to_string(); + } + return String::new(); + } + + async fn get_properties(&mut self, key: &str, group: &str, timeout: i32) -> String { + let mut path = String::new(); + if group.len() != 0 { + path = group.to_string() + PATH_SEPARATOR + key; + } else { + path = self.url.get_parameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + PATH_SEPARATOR + key; + } + let resp = self.client.get(key, None).await.unwrap(); + if let Some(kv) = resp.kvs().first() { + return kv.value_str().unwrap().to_string(); + } + return String::from(""); + } + + async fn publish_config(&mut self, key: &str, group: &str, content: &str) -> bool { + let path = self.get_path(key, group); + + // TODO need base64 encoding + + self.client.put(path, content, None).await.unwrap(); + + // TODO consider fix return value type. + true + } + + async fn get_config_keys(&mut self, group: &str) -> HashSet { + let path = self.get_path("", group); + let resp = self.client.get("", Some(GetOptions::new().with_prefix())).await.unwrap(); + let mut result = HashSet::new(); + for kv in resp.kvs() { + result.insert(kv.key_str().unwrap().to_string()); + } + result + } +} + +pub struct EtcdConfigWatcher { + //pub listener: Box, + pub key: String, + pub group: String, + pub normalized_key: String, + pub watcher: Watcher, + pub stream: WatchStream, + pub watch_id: i64, +} + +impl EtcdConfigWatcher { + + pub fn new(key: String, group: String, watcher: Watcher, stream: WatchStream, watch_id: i64, listener: impl ConfigurationListener) -> Self { + EtcdConfigWatcher { + //listener, + key, + group, + normalized_key: "".to_string(), // TODO + watcher, + stream, + watch_id, + } + } + + pub async fn watch(&mut self, watch_id: i64) { + self.watcher.watch(self.key.clone(), Some(WatchOptions::new().with_watch_id(watch_id))).await.unwrap(); + } + + pub fn cancelWatch(&mut self) { + let watch_id = self.watch_id; + self.watcher.cancel_by_id(watch_id); + } +} diff --git a/config_center/src/key.rs b/config_center/src/key.rs new file mode 100644 index 00000000..1fd20f7e --- /dev/null +++ b/config_center/src/key.rs @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const CONFIG_NAMESPACE_KEY: &str = "config-center.namespace"; + +const CONFIG_GROUP_KEY: &str = "config-center.group"; diff --git a/config_center/src/lib.rs b/config_center/src/lib.rs new file mode 100644 index 00000000..df7f0a98 --- /dev/null +++ b/config_center/src/lib.rs @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +mod dynamic_configuration_factory; +mod configuration_listener; +mod config_changed_event; +mod dynamic_configuration; +mod etcd_dynamic_configuration; +mod key; +mod url; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + let result = 2 + 2; + assert_eq!(result, 4); + } +} diff --git a/config_center/src/url.rs b/config_center/src/url.rs new file mode 100644 index 00000000..9525e837 --- /dev/null +++ b/config_center/src/url.rs @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::collections::HashMap; + +pub struct URL { + pub protocol: String, + pub username: String, + pub password: String, + pub host: String, + pub port: u32, + pub path: String, + parameters: HashMap, + methodParameters: HashMap>, +} + +impl URL { + + pub fn get_parameter(&self, key: &str, default_value: &str) -> String { + let value = match self.parameters.get(key) { + Some(value) => value, + None => { + default_value + }, + }; + if value.is_empty() { + return String::from(default_value) + } + String::from(value) + } +}