diff --git a/Cargo.toml b/Cargo.toml index 8ebd88fb..0d93f1a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,9 @@ repository = "https://github.com/FifthTry/fpm" homepage = "https://fpm.dev" build = "build.rs" +[features] +controller = [] + [dependencies] async-recursion = "0.3.2" camino = "1.0.5" diff --git a/src/commands/serve.rs b/src/commands/serve.rs index f6ac374d..1ead7bc4 100644 --- a/src/commands/serve.rs +++ b/src/commands/serve.rs @@ -145,6 +145,19 @@ async fn serve_static(req: actix_web::HttpRequest) -> actix_web::HttpResponse { #[actix_web::main] pub async fn serve(bind_address: &str, port: &str) -> std::io::Result<()> { + if cfg!(feature = "controller") { + // fpm-controller base path and ec2 instance id (hardcoded for now) + let fpm_controller: String = std::env::var("FPM_CONTROLLER") + .unwrap_or_else(|_| "https://controller.fifthtry.com".to_string()); + let fpm_instance: String = + std::env::var("FPM_INSTANCE_ID").expect("FPM_INSTANCE_ID is required"); + + match crate::controller::resolve_dependencies(fpm_instance, fpm_controller).await { + Ok(_) => println!("Dependencies resolved"), + Err(e) => panic!("Error resolving dependencies using controller!!: {:?}", e), + } + } + println!("### Server Started ###"); println!("Go to: http://{}:{}", bind_address, port); actix_web::HttpServer::new(|| { diff --git a/src/config.rs b/src/config.rs index 5b262a66..012d1cfd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -705,6 +705,11 @@ impl Package { } } + pub fn with_zip(mut self, zip: String) -> fpm::Package { + self.zip = Some(zip); + self + } + pub fn get_dependency_for_interface(&self, interface: &str) -> Option<&fpm::Dependency> { self.dependencies .iter() diff --git a/src/controller.rs b/src/controller.rs new file mode 100644 index 00000000..a4ed2c38 --- /dev/null +++ b/src/controller.rs @@ -0,0 +1,155 @@ +/// FPM Controller Support +/// FPM cli supports communication with fpm controller. This is an optional feature, and is only +/// available when controller feature is enabled, which is not enabled by default. +/// Controller Communication +/// When controller feature is enabled, fpm serve will first communicate with the FPM controller +/// service’s /get-package/ API. + +/// FPM Controller Service Endpoint +/// The FPM Controller Service’s endpoint is computed by using environment variable FPM_CONTROLLER, +/// which will look something like this: https://controller.fifthtry.com, with the API path. +/// FPM Controller Service has more than one APIs: /get-package/ and /fpm-ready/. + +/// get-package: +/// Through an environment variable FPM_INSTANCE_ID, the fpm serve will learn it’s instance id, and +/// it will pass the instance id to the get-package API. +/// The API returns the URL of the package to be downloaded, git repository URL and the package name. +/// FPM will clone the git repository in the current directory. The current directory will contain +/// FPM.ftd and other files of the package. +/// FPM will then calls fpm install on it. + +/// fpm-ready: +/// Once dependencies are ready fpm calls /fpm-ready/ API on the controller. We will pass the +/// FPM_INSTANCE_ID and the git commit hash as input to the API +/// The API will return with success, and once it is done fpm will start receiving HTTP traffic +/// from the controller service. + +#[derive(serde::Deserialize, Debug)] +struct ApiResponse { + success: bool, + result: Option, + message: Option, +} + +#[derive(serde::Deserialize, Debug)] +struct PackageResult { + package: String, + git: String, +} + +pub async fn resolve_dependencies(fpm_instance: String, fpm_controller: String) -> fpm::Result<()> { + // First call get_package API to get package details and resolve dependencies + + // response from get-package API + let package_response = get_package(fpm_instance.as_str(), fpm_controller.as_str()).await?; + + // Clone the git package into the current directory + // Need to execute shell commands from rust + // git_url https format: https://github.com//.git + + let package = + fpm::Package::new(package_response.package.as_str()).with_zip(package_response.git); + + package.unzip_package().await?; + fpm::Config::read(None).await?; + + /*let out = std::process::Command::new("git") + .arg("clone") + .arg(git_url) + .output() + .expect("unable to execute git clone command"); + + if out.status.success() { + // By this time the cloned repo should be available in the current directory + println!("Git cloning successful for the package {}", package_name); + // Resolve dependencies by reading the FPM.ftd using config.read() + // Assuming package_name and repo name are identical + let _config = fpm::Config::read(Some(package_name.to_string())).await?; + }*/ + + // Once the dependencies are resolved for the package + // then call fpm_ready API to ensure that the controller service is now ready + + // response from fpm_ready API + + fpm_ready(fpm_instance.as_str(), fpm_controller.as_str()).await?; + + Ok(()) +} + +/// get-package API +/// input: fpm_instance +/// output: package_name and git repo URL +/// format: { +/// "success": true, +/// "result": { +/// "package": "" +/// "git": "" +/// } +/// } +async fn get_package(fpm_instance: &str, fpm_controller: &str) -> fpm::Result { + let controller_api = format!( + "{}/v1/fpm/get-package?ec2_reservation={}", + fpm_controller, fpm_instance + ); + + let url = url::Url::parse(controller_api.as_str())?; + + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + reqwest::header::USER_AGENT, + reqwest::header::HeaderValue::from_static("fpm"), + ); + + let resp: ApiResponse = fpm::library::http::get_with_type(url, headers).await?; + + if !resp.success { + return Err(fpm::Error::APIResponseError(format!( + "get_package api error: {:?}", + resp.message + ))); + } + + resp.result.ok_or({ + fpm::Error::APIResponseError(format!("get_package api error: {:?}", &resp.message)) + }) +} + +/// fpm-ready API +/// input: fpm_instance, *(git commit hash) +/// output: success: true/false +/// format: lang: json +/// { +/// "success": true +/// } + +/// Git commit hash needs to be computed before making a call to the fpm_ready API +async fn fpm_ready(fpm_instance: &str, fpm_controller: &str) -> fpm::Result<()> { + let git_commit = ""; + + let controller_api = format!( + "{}/v1/fpm/fpm-ready?ec2_reservation={}&hash={}", + fpm_controller, fpm_instance, git_commit + ); + + let url = url::Url::parse(controller_api.as_str())?; + + // This request should be put request for fpm_ready API to update the instance status to ready + // Using http::_get() function to make request to this API for now + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + reqwest::header::USER_AGENT, + reqwest::header::HeaderValue::from_static("fpm"), + ); + + // TODO: here Map is wrong, + let resp: ApiResponse> = + fpm::library::http::get_with_type(url, headers).await?; + if !resp.success { + return Err(fpm::Error::APIResponseError(format!( + "fpm_ready api error: {:?}", + resp.message + ))); + } + Ok(()) +} diff --git a/src/dependency.rs b/src/dependency.rs index 52e265a6..65448253 100644 --- a/src/dependency.rs +++ b/src/dependency.rs @@ -311,6 +311,74 @@ impl fpm::Package { } } + pub(crate) async fn unzip_package(&self) -> fpm::Result<()> { + use std::convert::TryInto; + use std::io::Write; + + let download_url = if let Some(ref url) = self.zip { + url + } else { + return Ok(()); + }; + + let path = std::env::temp_dir().join(format!("{}.zip", self.name.replace("/", "__"))); + + let start = std::time::Instant::now(); + print!("Downloading {} ... ", self.name.as_str()); + std::io::stdout().flush()?; + // Download the zip folder + { + let mut response = if download_url[1..].contains("://") + || download_url.starts_with("//") + { + reqwest::get(download_url.as_str())? + } else if let Ok(response) = reqwest::get(format!("https://{}", download_url).as_str()) + { + response + } else { + reqwest::get(format!("http://{}", download_url).as_str())? + }; + let mut file = std::fs::File::create(&path)?; + // TODO: instead of reading the whole thing in memory use tokio::io::copy() somehow? + let mut buf: Vec = vec![]; + response.copy_to(&mut buf)?; + file.write_all(&buf)?; + // file.write_all(response.text().await?.as_bytes())?; + } + + let file = std::fs::File::open(&path)?; + // TODO: switch to async_zip crate + let mut archive = zip::ZipArchive::new(file)?; + for i in 0..archive.len() { + let mut c_file = archive.by_index(i).unwrap(); + let out_path = match c_file.enclosed_name() { + Some(path) => path.to_owned(), + None => continue, + }; + let out_path_without_folder = out_path.to_str().unwrap().split_once("/").unwrap().1; + let file_extract_path = { + let mut file_extract_path: camino::Utf8PathBuf = + std::env::current_dir()?.canonicalize()?.try_into()?; + file_extract_path = file_extract_path.join(out_path_without_folder); + file_extract_path + }; + if (&*c_file.name()).ends_with('/') { + std::fs::create_dir_all(&file_extract_path)?; + } else { + if let Some(p) = file_extract_path.parent() { + if !p.exists() { + std::fs::create_dir_all(p)?; + } + } + // Note: we will be able to use tokio::io::copy() with async_zip + let mut outfile = std::fs::File::create(file_extract_path)?; + std::io::copy(&mut c_file, &mut outfile)?; + } + } + fpm::utils::print_end(format!("Downloaded {}", self.name.as_str()).as_str(), start); + Ok(()) + } + /// This function is called by `process()` or recursively called by itself. /// It checks the `FPM.ftd` file of dependent package and find out all the dependency packages. /// If dependent package is not available, it calls `process()` to download it inside `.packages` directory diff --git a/src/lib.rs b/src/lib.rs index 15d198a0..8a1aa59f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub(crate) mod utils; mod auto_import; mod commands; mod config; +mod controller; mod dependency; mod doc; mod file; @@ -390,6 +391,9 @@ pub enum Error { #[error("HttpError: {}", _0)] HttpError(#[from] reqwest::Error), + #[error("APIResponseError: {}", _0)] + APIResponseError(String), + #[error("IoError: {}", _0)] IoError(#[from] std::io::Error), @@ -416,6 +420,9 @@ pub enum Error { #[error("SitemapParseError: {}", _0)] SitemapParseError(#[from] fpm::sitemap::ParseError), + + #[error("URLParseError: {}", _0)] + UrlParseError(#[from] url::ParseError), } pub type Result = std::result::Result; diff --git a/src/library/http.rs b/src/library/http.rs index 9ca25012..46a03851 100644 --- a/src/library/http.rs +++ b/src/library/http.rs @@ -53,7 +53,7 @@ pub async fn processor<'a>( doc.from_json(&json, section) } -async fn get( +pub(crate) async fn get( url: url::Url, doc_id: &str, line_number: usize, @@ -89,3 +89,23 @@ async fn _get(url: url::Url) -> reqwest::Result { .build()?; c.get(url.to_string().as_str()).send()?.text() } + +pub async fn get_with_type( + url: url::Url, + headers: reqwest::header::HeaderMap, +) -> fpm::Result { + let c = reqwest::Client::builder() + .default_headers(headers) + .build()?; + + let mut resp = c.get(url.to_string().as_str()).send()?; + if !resp.status().eq(&reqwest::StatusCode::OK) { + return Err(fpm::Error::APIResponseError(format!( + "url: {}, response_status: {}, response: {:?}", + url, + resp.status(), + resp.text() + ))); + } + Ok(resp.json()?) +} diff --git a/src/library/mod.rs b/src/library/mod.rs index f4a9d564..edfc1833 100644 --- a/src/library/mod.rs +++ b/src/library/mod.rs @@ -1,7 +1,7 @@ mod fpm_dot_ftd; mod get_data; mod get_version_data; -mod http; +pub(crate) mod http; mod include; mod sitemap; mod sqlite; diff --git a/src/main.rs b/src/main.rs index 95fcde46..84579110 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,20 @@ async fn main() -> fpm::Result<()> { return Ok(()); } + // Serve block moved up + if let Some(mark) = matches.subcommand_matches("serve") { + let port = mark + .value_of("port") + .unwrap_or_else(|| mark.value_of("positional_port").unwrap_or("8000")) + .to_string(); + let bind = mark.value_of("bind").unwrap_or("127.0.0.1").to_string(); + tokio::task::spawn_blocking(move || { + fpm::serve(bind.as_str(), port.as_str()).expect("http service error"); + }) + .await + .expect("Thread spawn error"); + } + let mut config = fpm::Config::read(None).await?; if matches.subcommand_matches("update").is_some() { @@ -72,18 +86,6 @@ async fn main() -> fpm::Result<()> { let target = mark.value_of("target"); fpm::stop_tracking(&config, source, target).await?; } - if let Some(mark) = matches.subcommand_matches("serve") { - let port = mark - .value_of("port") - .unwrap_or_else(|| mark.value_of("positional_port").unwrap_or("8000")) - .to_string(); - let bind = mark.value_of("bind").unwrap_or("127.0.0.1").to_string(); - tokio::task::spawn_blocking(move || { - fpm::serve(bind.as_str(), port.as_str()).expect("http service error"); - }) - .await - .expect("Thread spawn error"); - } Ok(()) }