diff --git a/.gitignore b/.gitignore
index 26701db..5c9518e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -172,6 +172,6 @@ skbuff.rs
admission-webhook.yaml
certificate-manager.yaml
client-deployment.yaml
-dns-deployment.yaml
-proxy-injector.yaml
-core/src/components/conntracker/src/bindings.rs
+
+# Claude AI assistant working files
+CLAUDE.md
diff --git a/cli/Cargo.lock b/cli/Cargo.lock
index 47db442..740da77 100644
--- a/cli/Cargo.lock
+++ b/cli/Cargo.lock
@@ -34,9 +34,9 @@ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "anstream"
-version = "0.6.19"
+version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "301af1932e46185686725e0fad2f8f2aa7da69dd70bf6ecc44d6b703844a3933"
+checksum = "3ae563653d1938f79b1ab1b5e668c87c76a9930414574a6583a7b7e11a8e6192"
dependencies = [
"anstyle",
"anstyle-parse",
@@ -64,29 +64,29 @@ dependencies = [
[[package]]
name = "anstyle-query"
-version = "1.1.3"
+version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c8bdeb6047d8983be085bab0ba1472e6dc604e7041dbf6fcd5e71523014fae9"
+checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2"
dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.60.2",
]
[[package]]
name = "anstyle-wincon"
-version = "3.0.9"
+version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "403f75924867bb1033c59fbf0797484329750cfbe3c4325cd33127941fabc882"
+checksum = "3e0633414522a32ffaac8ac6cc8f748e090c5717661fddeea04219e2344f5f2a"
dependencies = [
"anstyle",
"once_cell_polyfill",
- "windows-sys 0.59.0",
+ "windows-sys 0.60.2",
]
[[package]]
name = "anyhow"
-version = "1.0.98"
+version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
+checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100"
[[package]]
name = "api"
@@ -261,9 +261,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
-version = "2.9.1"
+version = "2.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967"
+checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d"
[[package]]
name = "bytemuck"
@@ -293,9 +293,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cfg-if"
-version = "1.0.1"
+version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
+checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9"
[[package]]
name = "cfg_aliases"
@@ -305,9 +305,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "clap"
-version = "4.5.41"
+version = "4.5.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9"
+checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318"
dependencies = [
"clap_builder",
"clap_derive",
@@ -315,9 +315,9 @@ dependencies = [
[[package]]
name = "clap_builder"
-version = "4.5.41"
+version = "4.5.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d"
+checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8"
dependencies = [
"anstream",
"anstyle",
@@ -327,9 +327,9 @@ dependencies = [
[[package]]
name = "clap_derive"
-version = "4.5.41"
+version = "4.5.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491"
+checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6"
dependencies = [
"heck",
"proc-macro2",
@@ -381,7 +381,6 @@ dependencies = [
"serde",
"serde_yaml",
"tokio",
- "tokio-stream",
"tonic",
"tonic-reflection",
"tracing",
@@ -552,9 +551,9 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.15.4"
+version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5"
+checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
@@ -615,13 +614,14 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
-version = "1.6.0"
+version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
+checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e"
dependencies = [
+ "atomic-waker",
"bytes",
"futures-channel",
- "futures-util",
+ "futures-core",
"h2",
"http",
"http-body",
@@ -629,6 +629,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
+ "pin-utils",
"smallvec",
"tokio",
"want",
@@ -687,9 +688,9 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "2.10.0"
+version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661"
+checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9"
dependencies = [
"equivalent",
"hashbrown",
@@ -697,9 +698,9 @@ dependencies = [
[[package]]
name = "io-uring"
-version = "0.7.9"
+version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4"
+checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b"
dependencies = [
"bitflags",
"cfg-if",
@@ -735,15 +736,15 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
-version = "0.2.174"
+version = "0.2.175"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776"
+checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543"
[[package]]
name = "libredox"
-version = "0.1.4"
+version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1580801010e535496706ba011c15f8532df6b42297d2e471fec38ceadd8c0638"
+checksum = "391290121bad3d37fbddad76d8f5d1c1c314cfc646d143d7e07a3086ddff0ce3"
dependencies = [
"bitflags",
"libc",
@@ -938,9 +939,9 @@ dependencies = [
[[package]]
name = "percent-encoding"
-version = "2.3.1"
+version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
+checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "petgraph"
@@ -986,9 +987,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "prettyplease"
-version = "0.2.36"
+version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2"
+checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b"
dependencies = [
"proc-macro2",
"syn",
@@ -996,9 +997,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.95"
+version = "1.0.101"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778"
+checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
dependencies = [
"unicode-ident",
]
@@ -1103,25 +1104,25 @@ dependencies = [
[[package]]
name = "redox_users"
-version = "0.5.0"
+version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
+checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac"
dependencies = [
"getrandom 0.2.16",
"libredox",
- "thiserror 2.0.12",
+ "thiserror 2.0.16",
]
[[package]]
name = "regex"
-version = "1.11.1"
+version = "1.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
+checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912"
dependencies = [
"aho-corasick",
"memchr",
- "regex-automata 0.4.9",
- "regex-syntax 0.8.5",
+ "regex-automata 0.4.10",
+ "regex-syntax 0.8.6",
]
[[package]]
@@ -1135,13 +1136,13 @@ dependencies = [
[[package]]
name = "regex-automata"
-version = "0.4.9"
+version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
+checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6"
dependencies = [
"aho-corasick",
"memchr",
- "regex-syntax 0.8.5",
+ "regex-syntax 0.8.6",
]
[[package]]
@@ -1152,15 +1153,15 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
-version = "0.8.5"
+version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
+checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
[[package]]
name = "rustc-demangle"
-version = "0.1.25"
+version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
+checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
[[package]]
name = "rustix"
@@ -1177,9 +1178,9 @@ dependencies = [
[[package]]
name = "rustversion"
-version = "1.0.21"
+version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
+checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "ryu"
@@ -1246,9 +1247,9 @@ dependencies = [
[[package]]
name = "slab"
-version = "0.4.10"
+version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d"
+checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589"
[[package]]
name = "smallvec"
@@ -1274,9 +1275,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
-version = "2.0.104"
+version = "2.0.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40"
+checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6"
dependencies = [
"proc-macro2",
"quote",
@@ -1291,15 +1292,15 @@ checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263"
[[package]]
name = "tempfile"
-version = "3.20.0"
+version = "3.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
+checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e"
dependencies = [
"fastrand",
"getrandom 0.3.3",
"once_cell",
"rustix",
- "windows-sys 0.59.0",
+ "windows-sys 0.60.2",
]
[[package]]
@@ -1313,11 +1314,11 @@ dependencies = [
[[package]]
name = "thiserror"
-version = "2.0.12"
+version = "2.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
+checksum = "3467d614147380f2e4e374161426ff399c91084acd2363eaf549172b3d5e60c0"
dependencies = [
- "thiserror-impl 2.0.12",
+ "thiserror-impl 2.0.16",
]
[[package]]
@@ -1333,9 +1334,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
-version = "2.0.12"
+version = "2.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
+checksum = "6c5e1be1c48b9172ee610da68fd9cd2770e7a4056cb3fc98710ee6906f0c7960"
dependencies = [
"proc-macro2",
"quote",
@@ -1353,9 +1354,9 @@ dependencies = [
[[package]]
name = "tokio"
-version = "1.47.0"
+version = "1.47.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35"
+checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
"backtrace",
"bytes",
@@ -1668,6 +1669,12 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
+[[package]]
+name = "windows-link"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a"
+
[[package]]
name = "windows-sys"
version = "0.59.0"
@@ -1683,7 +1690,7 @@ version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
- "windows-targets 0.53.2",
+ "windows-targets 0.53.3",
]
[[package]]
@@ -1704,10 +1711,11 @@ dependencies = [
[[package]]
name = "windows-targets"
-version = "0.53.2"
+version = "0.53.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef"
+checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91"
dependencies = [
+ "windows-link",
"windows_aarch64_gnullvm 0.53.0",
"windows_aarch64_msvc 0.53.0",
"windows_i686_gnu 0.53.0",
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 069b9e6..f00b01d 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -21,7 +21,6 @@ anyhow = "1.0.98"
api = { path = "../core/api" }
tonic = "0.14.1"
tonic-reflection = "0.14.1"
-tokio-stream = "0.1.17"
prost-types = "0.14.1"
prost = "0.14.1"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 088c2e9..f1b50db 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -1,11 +1,7 @@
[workspace]
resolver = "3"
members = [
- #"src/components/loadbalancer",
"api",
- #"src/components/proxy",
- #"src/components/xdp",
- #"src/components/maps",
"src/components/conntracker",
"src/components/identity",
"src/components/metrics_tracer",
diff --git a/core/Readme.md b/core/Readme.md
index e45a8d2..747dd31 100644
--- a/core/Readme.md
+++ b/core/Readme.md
@@ -21,7 +21,4 @@ If you're interested or have any questions, feel free to contact:
| **Identity** | User Space program that uses Conntracker component and displays active connections in the cluster | - [[92]](https://github.com/CortexFlow/CortexBrain/issues/92)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1)
| **Metrics_tracer** | Kernel Space program that collects the main CortexBrain metrics collectors | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/78)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1)
| **Metrics** | User Space implementation of the metrics_tracer BPF scripts. The metrics crate also aggregates, stores, and hosts the main data processing functions | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/78)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1)
- | **Loadbalancer** | User space component. One of the core functionalities. The load balancer uses XDP, Maps, and Identity to distribute traffic across multiple backends in the cluster using optimized load balancing techniques | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/91)
- [[Core]](https://github.com/CortexFlow/CortexBrain/milestone/1)
- | **Maps** | Contains all the BPF maps used in the XDP component | /
- | **XDP** | Kernel Space program that operates in the XDP hook to filter traffic and apply access policies in the cluster | - [[91]](https://github.com/CortexFlow/CortexBrain/issues/91)
- [Core](https://github.com/CortexFlow/CortexBrain/milestone/1)
\ No newline at end of file
diff --git a/core/src/components/loadbalancer/Cargo.toml b/core/src/components/loadbalancer/Cargo.toml
deleted file mode 100644
index b328953..0000000
--- a/core/src/components/loadbalancer/Cargo.toml
+++ /dev/null
@@ -1,26 +0,0 @@
-[package]
-name = "loadbalancer"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]
-anyhow = "1.0.98"
-aya = "0.13.1"
-aya-log = "0.2.1"
-log = "0.4.27"
-tokio = { version = "1.44.2", features = ["full"] }
-tracing = "0.1.41"
-tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
-bytemuck = {version ="1.23.0",features = ["derive"]}
-serde = "1.0.219"
-serde_json = "1.0.140"
-prometheus = "0.14.0"
-lazy_static = "1.5.0"
-kube = "1.1.0"
-base64 = "0.22.1"
-k8s-openapi = { version = "0.25.0", features = ["v1_32"] }
-warp = "0.3.7"
-kube-runtime = "1.1.0"
-
-[dependencies.shared]
-path = "../../shared"
diff --git a/core/src/components/loadbalancer/Dockerfile b/core/src/components/loadbalancer/Dockerfile
deleted file mode 100644
index fdddcfb..0000000
--- a/core/src/components/loadbalancer/Dockerfile
+++ /dev/null
@@ -1,47 +0,0 @@
-# Phase 1: Build image
-FROM rust:1.85 AS builder
-
-# Set working directory
-WORKDIR /usr/src/app
-
-# Copy the shared library in the correct location
-WORKDIR /usr/src/shared
-COPY .shared/Cargo.toml .
-COPY .shared/src ./src
-
-# Copying the XDP filter binaries
-WORKDIR /usr/src/app/loadbalancer
-
-# Then create the loadbalancer project structure
-WORKDIR /usr/src/app/loadbalancer
-COPY Cargo.toml .
-COPY src ./src
-
-# Ensure Cargo recognizes the shared dependency
-RUN cargo fetch
-
-# Build the project
-RUN cargo build --release
-
-# Phase 2: Create final image
-FROM ubuntu:24.04
-
-# Install runtime dependencies
-RUN apt-get update && apt-get install -y \
- ca-certificates \
- && rm -rf /var/lib/apt/lists/*
-
-# Create directory for the loadbalancer
-WORKDIR /usr/src/cortexbrain-loadbalancer
-
-# Copy the binary from builder
-COPY --from=builder /usr/src/app/loadbalancer/target/release/loadbalancer /usr/local/bin/cortexflow-loadbalancer
-
-# Copy config file
-COPY xdp-filter /usr/src/cortexbrain-loadbalancer/xdp-filter
-
-# Set config path environment variable
-ENV BPF_PATH="/usr/src/cortexbrain-loadbalancer/xdp-filter"
-
-# Set the loadbalancer execution command
-CMD ["cortexflow-loadbalancer"]
diff --git a/core/src/components/loadbalancer/build-lb.sh b/core/src/components/loadbalancer/build-lb.sh
deleted file mode 100755
index f31c4c5..0000000
--- a/core/src/components/loadbalancer/build-lb.sh
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/bash
-
-# Create temporary shared directory
-mkdir -p .shared
-
-# Copy shared files
-echo "Copying shared files"
-cp -r ../../shared/src .shared/
-cp -r ../../shared/Cargo.toml .shared/
-cp -r ../../client/config.yaml config.yaml
-
-# Building XDP filter files
-echo "Building the xdp filter files"
-pushd ../xdp
-./build.sh
-popd
-
-echo "Copying xdp-filter binaries"
-cp -r ../../../target/bpfel-unknown-none/release/xdp-filter xdp-filter
-
-# Run docker build
-docker build -t loadbalancer:0.0.1 .
-
-# Cleanup
-echo "Cleaning building files"
-rm -rf .shared
-rm -rf config.yaml
-rm -rf xdp-filter
diff --git a/core/src/components/loadbalancer/src/discovery.rs b/core/src/components/loadbalancer/src/discovery.rs
deleted file mode 100644
index e2d6c47..0000000
--- a/core/src/components/loadbalancer/src/discovery.rs
+++ /dev/null
@@ -1,591 +0,0 @@
-/*
-π Update to ebpf: Using BPF maps to store values π
-
-To store the service discovery data we need to do things in the kernel space
-we can use bpf maps in particular this map:
-
- BPF_MAP_TYPE_HASH
- doc: https://docs.kernel.org/bpf/map_hash.html
-
- DEFAULT MAP STRUCT:
- #include
- #include
-
- struct key {
- __u32 srcip;
- };
-
- struct value {
- __u64 packets;
- __u64 bytes;
- };
-
- struct {
- __uint(type, BPF_MAP_TYPE_LRU_HASH);
- __uint(max_entries, 32);
- __type(key, struct key);
- __type(value, struct value);
- } packet_stats SEC(".maps");
-
-
-*/
-
-/*
-The new algorithm can be described as this:
-
- 1. pod A need to know the ip of pod B to perform operations
- 2. discovery service check if the pod B ip is in the cache
- 2a. ip is in the cache!---> return ip address
- 2b. ip is not in the cache---> need to go to step 3
-
- 3. Service discovery search in ETCD the address of the pod B
- 4. Service discovery store the address in the BPF map to use it in the kernel
- 5. Pod A now can obtain Pod B ip address
-
-
-
-*/
-
-
-use crate::messaging;
-use crate::messaging::MexType;
-use crate::metrics::{DNS_REQUEST, DNS_RESPONSE_TIME};
-use crate::shared_struct::{SVCKey, SVCValue};
-use crate::shared_struct;
-use anyhow::Error ;
-use std::result::Result::Ok;
-use aya::maps::{HashMap as UserSpaceMap, MapData};
-use aya::Ebpf;
-use k8s_openapi::api::core::v1::{Pod, Service};
-use kube::api::ListParams;
-use kube::{Client, api::Api};
-use serde_json::{json, to_vec};
-use std::collections::BTreeMap;
-use std::net::{SocketAddr, ToSocketAddrs};
-use std::time::Duration;
-use std::time::Instant;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::net::{TcpStream, UdpSocket};
-use tokio::time::timeout;
-use tracing::{debug, error, info, warn};
-
-
-/* service discovery structure:
- uses a dns_server-->kube-dns
- service_cache: speed up the discovery process
-
-*/
-pub struct ServiceDiscovery<'a> {
- kube_client: Client,
- service_cache: &'a mut UserSpaceMap<&'a mut MapData,SVCKey,SVCValue>,
-}
-/*
- Doc:
- Here i'm using a lifetime <'a>
- Lifetimes in Rust are used to ensure that references (&) are
- always valid and do not point to βexpiredβ or deallocated memory.
-
- Ensure that the code cannot use dangling pointers during the execution
-
-*/
-
-/* User space implementation */
-
-impl<'a> ServiceDiscovery<'a> {
- pub async fn new(mut service_map: &'a mut UserSpaceMap<&'a mut MapData,SVCKey,SVCValue> ) -> Result {
- let kube_client = Client::try_default().await?;
- Ok(ServiceDiscovery {
- kube_client,
- service_cache: service_map,
- })
- }
-
- /*
- Destination resolver:
- Args: service_name, namespace
- Return: service endpoint
-
-
- */
- pub async fn resolve_service_destination(
- &mut self,
- service_name: &str,
- namespace: &str,
- port: i32,
- ) -> Option {
- // 1. Check the cache and return the service ip if found
- if let Some(cached_service) =self.get_service(service_name){
- info!("Service {:?} found in cache ",service_name);
- return Some(cached_service)
- }
- else{
- let services: Api = Api::namespaced(self.kube_client.clone(), namespace);
- let pods: Api = Api::namespaced(self.kube_client.clone(), namespace);
-
- debug!(
- "Fetching service {} from namespace {}",
- service_name, namespace
- );
-
- // Fetch the service endpoint from the kubernetes api
- self.fetch_service_endpoint_from_kubeapi(service_name, services, pods, namespace, port)
- .await
- }
- }
-
- /*
- Resolver function:
- Args: service_name, namespace
- Return: service address or None
- */
-
- async fn resolve_service_address(
- &mut self,
- service_name: &str,
- namespace: &str,
- port: i32,
- ) -> Option {
- match self
- .resolve_service_destination(service_name, namespace, port)
- .await
- {
- Some(service_address) => {
- debug!(
- "Resolved service address for {}: {}",
- service_name, service_address
- );
- Some(service_address)
- }
- None => {
- error!(
- "Service {} not found in namespace {}",
- service_name, namespace
- );
- None
- }
- }
- }
-
- /*
- fetch service endpoint from the KUBERNETES-API
- Args: service name, service_api, namespace
- Return: service endpoint
- */
-
- async fn fetch_service_endpoint_from_kubeapi(
- &mut self,
- service_name: &str,
- service_api: Api,
- pod_api: Api,
- namespace: &str,
- communication_port: i32, //can be udp or tcp port
- ) -> Option {
- // retrieve the service
- let service = match service_api.get(service_name).await {
- Ok(service) => service,
- Err(e) => {
- error!(
- "Service {} not found in namespace {}: {:?}",
- service_name, namespace, e
- );
- return None;
- }
- };
-
- // return the service selector
- let selector_map = match service
- .spec
- .as_ref()
- .and_then(|spec| spec.selector.as_ref())
- {
- Some(selector) => selector,
- None => {
- error!("No selector found for service {}", service_name);
- return None;
- }
- };
-
- // Convert the selector to a string format
- let selector = self.labels_to_selector(selector_map);
-
- // find the pods that match the selector
- let pods = match pod_api.list(&ListParams::default().labels(&selector)).await {
- Ok(pods) => pods,
- Err(e) => {
- error!(
- "Failed to fetch pods for service {} in namespace {}: {:?}",
- service_name, namespace, e
- );
- return None;
- }
- };
-
- // Select the first pod available
- // TODO: more advanced load balancing techniques needed here
- if let Some(pod) = pods.items.first() {
- if let Some(pod_ip) = pod
- .status
- .as_ref()
- .and_then(|status| status.pod_ip.as_ref())
- {
- info!("Pod IP for service {}: {}", service_name, pod_ip);
- let endpoint = format!("{}:{}", pod_ip, communication_port);
- info!(
- "Resolved endpoint for service {}: {}",
- service_name, endpoint
- );
- // add to service cache
-
- let key = SVCKey {
- service_name: shared_struct::str_to_u8_64(&service_name),
- };
- let value = SVCValue{
- ip: shared_struct::str_to_u8_4(&pod_ip),
- port: communication_port as u32
- };
-
- self.service_cache
- .insert(key, value,u64::min_value());
- return Some(endpoint);
- } else {
- error!(
- "No Pod IP defined for pod {}",
- pod.metadata.name.as_deref().unwrap_or("unknown")
- );
- }
- } else {
- error!("No pods found for service {}", service_name);
- }
-
- None
- }
-
- //return the selector from the labels
- fn labels_to_selector(&self, labels: &BTreeMap) -> String {
- labels
- .iter()
- .map(|(key, value)| format!("{}={}", key, value))
- .collect::>()
- .join(",")
- }
-
- //directly register a service in the cache
- pub fn register_service(&mut self, service_id: String, endpoint: String,port: u32) {
- let key = SVCKey{
- service_name:shared_struct::str_to_u8_64(&service_id)
- };
- let value = SVCValue{
- ip: shared_struct::str_to_u8_4(&endpoint),
- port
- };
- self.service_cache.insert(key, value,u64::min_value());
- }
-
- //directly get a service from the cache
- pub fn get_service(&self, service_id: &str) -> Option {
-
- let key= SVCKey{
- service_name:shared_struct::str_to_u8_64(&service_id)
- };
-
- //match pattern
- match self.service_cache.get(&key,0) {
- Ok(service) => {
- let svc_ip = String::from_utf8_lossy(&service.ip)
- .trim_end_matches(char::from(0))
- .to_string();
- Some(svc_ip)
- },
- //return an error in case the key is not found
- Err(aya::maps::MapError::KeyNotFound) => {
- error!("Service not found in cache!");
- return None
- }
- //return an error in case of any other error type expect "KeyNotFound"
- Err(e)=>{
- error!("An error occured {}",e);
- return None
- }
- }
- }
-
- //TCP RESPONSE
- //TODO: replace this logic in the kernel space
- pub async fn send_tcp_request(
- &mut self,
- service_name: &str,
- namespace: &str,
- payload: &[u8],
- port: i32,
- ) -> Option> {
- // Resolves the address of the service
- let target_service = match self.resolve_service_destination(service_name, namespace, port).await {
- Some(addr) => addr,
- None => {
- error!("Service {} not found in namespace {}", service_name, namespace);
- return None;
- }
- };
-
- // Convert the address in a socket address
- let target_addr: SocketAddr = match target_service.parse() {
- Ok(addr) => addr,
- Err(e) => {
- error!("Invalid target address: {}", e);
- return None;
- }
- };
-
- // TCP connection to the service
- let start_time = Instant::now();
- let duration = start_time.elapsed().as_secs_f64();
-
-
-
- match TcpStream::connect(target_addr).await {
- Ok(mut stream) => {
- info!("Connected to service at {}", target_addr);
-
- // Create the json message
- info!("Message waiting to be forwarded:{:?}",&payload);
-
- let response = json!({
- "message": String::from_utf8_lossy(payload)
- });
-
- let msg_forwarded_serialized = match to_vec(&response) {
- Ok(data) => data,
- Err(e) => {
- error!("Failed to serialize request: {}", e);
- return None;
- }
- };
- let response_message = messaging::create_message(
- &service_name,
- MexType::Outcoming,
- &msg_forwarded_serialized,
- );
-
- // send the message
- if let Err(e) = stream.write_all(&response_message).await {
- error!("Failed to send request to {}: {}", target_addr, e);
- return None;
- }
- info!("Request sent to {}", target_addr);
-
- let client_addr = match stream.peer_addr() {
- Ok(addr) => addr,
- Err(e) => {
- error!("Cannot return client address: {}", e);
- return None;
- }
- };
-
- let mut buffer = vec![0u8; 1024];
-
- // wait for the response with a timeout timer
- match timeout(Duration::from_secs(10), stream.read(&mut buffer)).await {
- Ok(Ok(len)) => {
- let response_data = buffer[..len].to_vec();
- info!("Received response from {} ({} bytes)", client_addr, len);
- DNS_REQUEST.with_label_values(&[&(client_addr.to_string()+"_tcp")]).inc();
- // Register the metric when len =0
- DNS_RESPONSE_TIME.with_label_values(&["service_discovery_tcp"]).observe(duration);
-
- if len > 0 {
- DNS_REQUEST.with_label_values(&[&(client_addr.to_string() + "_tcp")]).inc();
- Some(response_data)
- } else {
- warn!("Empty response received from {}", client_addr);
- None
- }
- }
- Ok(Err(e))=> {
- error!("Error: {}",e);
- None
- }
- Err(e) => {
- error!("TCP response timed out: {}", e);
-
- // Register the metric when timeout
- DNS_RESPONSE_TIME.with_label_values(&["service_discovery_tcp"]).observe(duration);
-
- None
- }
- }
- }
- Err(e) => {
- error!("Failed to connect to {}: {}", target_addr, e);
- None
- }
- }
- }
- //UDP RESPONSE
- //TODO: replace this logic in the kernel space
- pub async fn wait_for_udp_response(
- &mut self,
- service_name: &str,
- namespace: &str,
- payload: &[u8],
- port: i32,
- client_addr: SocketAddr,
- ) -> Option> {
- // Resolve the service name
- let target_service = match self
- .resolve_service_destination(service_name, namespace, port)
- .await
- {
- Some(addr) => addr,
- None => {
- error!(
- "Service {} not found in namespace {}",
- service_name, namespace
- );
- return None; // return None if service not found
- }
- };
-
- // Parse target_service into SocketAddr if it's a string
- let target_addr = match target_service.to_socket_addrs() {
- Ok(mut addrs) => match addrs.next() {
- Some(addr) => addr,
- None => {
- error!("Could not resolve address for {}", target_service);
- return None;
- }
- },
- Err(e) => {
- error!("Failed to parse socket address: {}", e);
- return None;
- }
- };
-
- // initialize the udp socket
- // bind to a random port
- let socket = match UdpSocket::bind("0.0.0.0:0").await {
- Ok(socket) => socket,
- Err(e) => {
- error!("Failed to bind UDP socket: {}", e);
- return None;
- }
- };
-
- // Allow the socket to receive from any address, not just the target
- // This is important for UDP where responses might come from different ports
- if let Err(e) = socket.set_broadcast(true) {
- error!("Failed to set socket to broadcast mode: {}", e);
- return None;
- }
-
- // Sends the payload to the destination service
- let response = json!({
- "message": String::from_utf8_lossy(payload)
- });
-
- let serialized_response = match to_vec(&response) {
- Ok(bytes) => bytes,
- Err(e) => {
- error!("cannot serialize udp response: {}", e);
- return None;
- }
- };
-
- let response_message = messaging::create_message(
- &service_name,
- MexType::Outcoming,
- &serialized_response,
- );
-
- info!(
- "([{}]->[{}])sending response (outcoming) message to : {}",
- target_addr, client_addr, client_addr
- );
-
- if let Err(e) = socket.send_to(&response_message, &client_addr).await {
- error!(
- "Error sending UDP response to target service {}: {}",
- target_addr, e
- );
- return None;
- }else {
- info!(
- "UDP response successfully sent to {} from {} with message {:?} ",
- client_addr,target_addr ,response_message
- );
- }
-
-
-
- let start_time = Instant::now();
- let client_ip = client_addr.ip();
-
- // Set up timeout for receiving the response
- match timeout(
- Duration::from_secs(10), // 10 second timeout
- async {
- // Get the response from the destination service
- // We're willing to accept a response from any address associated with the target service
- let mut buffer = [0u8; 1024];
- loop {
- match socket.recv_from(&mut buffer).await {
- Ok((len, addr)) => {
- // Check if this is a response from our target (any port)
- //TODO: is this part safe?
- if addr.ip() == client_ip {
- DNS_REQUEST.with_label_values(&[&(addr.to_string()+"_udp")]).inc();
- if len == 0 {
- warn!(
- "Received null UDP response from {} at address {}",
- client_addr, addr
- );
- return None;
- } else {
- info!(
- "Received UDP response from {}({} bytes) at address:{}",
- client_addr, len, addr
- );
-
- let response_data = buffer[..len].to_vec();
- return Some(response_data);
- }
- } else {
- // Message from another address, ignore and keep waiting
- info!("Received message from unexpected address: {}, continuing to wait", addr);
- continue;
- }
- }
- Err(e) => {
- error!("Error receiving UDP response: {}", e);
- return None;
- }
- }
- }
- }
- ).await {
- Ok(Some(response_data)) => {
- let duration = start_time.elapsed().as_secs_f64();
- DNS_RESPONSE_TIME
- .with_label_values(&["service_discovery_udp"])
- .observe(duration);
-
- // Forward the response to the original client
- info!("Forwarding the response to the client: {client_addr}");
- if let Err(e) = socket.send_to(&response_data, client_addr).await {
- error!("Error sending UDP response to client: {}", e);
- } else {
- info!("Response forwarded to client {}", client_addr);
- }
-
- Some(response_data)
- }
- Ok(None) => {
- warn!("None UDP response from {}", target_addr);
- None
- }
- Err(e) => {
- error!("UDP response timed out with error: {}", e);
- None
- }
- }
- }
-}
diff --git a/core/src/components/loadbalancer/src/loadbalancer.rs b/core/src/components/loadbalancer/src/loadbalancer.rs
deleted file mode 100644
index fef95b0..0000000
--- a/core/src/components/loadbalancer/src/loadbalancer.rs
+++ /dev/null
@@ -1,170 +0,0 @@
-/* Implementation */
-//TODO: implement loadbalancer function
-
-use crate::discovery::ServiceDiscovery;
-use crate::messaging;
-use crate::messaging::MexType;
-use crate::messaging::{
- ignore_message_with_no_service, produce_incoming_message, produce_unknown_message,
- produce_unknown_message_udp, send_fail_ack_message, send_outcoming_message,
- send_outcoming_message_udp, send_success_ack_message,
-};
-use anyhow::{Error, Result};
-use aya::Ebpf;
-use anyhow::Context;
-use tokio::signal;
-use tokio::fs;
-use aya_log::EbpfLogger;
-use aya::programs::{Xdp, XdpFlags};
-use prometheus::{Encoder, TextEncoder};
-use std::sync::{Arc};
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWriteExt;
-use tokio::net::UdpSocket;
-use tokio::net::{TcpListener, TcpStream};
-use tracing::{debug, warn,error, info};
-use warp::Filter;
-use shared::apiconfig::EdgeProxyConfig;
-use std::path::Path;
-use aya::maps::{HashMap as UserSpaceMap, MapData};
-use crate::shared_struct::BackendPorts;
-use tokio::sync::RwLock;
-
-
-const BPF_PATH : &str = "BPF_PATH";
-
-pub struct Loadbalancer<'a> {
- proxy_config: Arc,
- service_discovery: Arc>>,
- backends: Arc>>,
-}
-
-impl<'a> Loadbalancer<'a>{
-
- pub async fn new(proxycfg: EdgeProxyConfig,cache_map: ServiceDiscovery<'a>,backends_list: UserSpaceMap
-) -> Result {
- Ok(Loadbalancer {
- proxy_config: Arc::new(proxycfg),
- service_discovery: Arc::new(cache_map.into()),
- backends: Arc::new(backends_list.into()),
- })
- }
-
- pub async fn run(&self) -> Result<(), Error> {
- let bpf_path= std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?;
- let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?;
- let mut bpf = aya::Ebpf::load(&data).context("failed to load data from file")?;
- EbpfLogger::init(&mut bpf).context("Cannot initialize ebpf logger");
-
- //extract the bpf program "xdp-hello" from builded binaries
- info!("loading xdp program");
- let program: &mut Xdp = bpf.program_mut("xdp_hello").unwrap().try_into()?;
- program.load().context("Failed to laod XDP program")?; //load the program
-
- info!("Starting program");
- program
- .attach("eth0", XdpFlags::default())
- .context("Failed to attach XDP program with default flags to interface eth0")?;
- info!("Cortexflow Intelligent Loadbalancer is running");
-
- //waiting for signint (ctrl-c) to shutdown the program
- info!("Waiting for Ctrl-C...");
-
- // Start udpsocket
- let socket = UdpSocket::bind("0.0.0.0:5053").await?;
- info!("Udp socket bound to {}", socket.local_addr()?);
-
- let metrics_route = warp::path!("metrics").map(|| {
- let mut buffer = Vec::new();
- let encoder = TextEncoder::new();
- let metrics_families = prometheus::gather();
- encoder.encode(&metrics_families, &mut buffer).unwrap();
- warp::reply::with_header(
- String::from_utf8(buffer).unwrap(),
- "Content-Type",
- "text/plain; charset=utf-8",
- )
- });
-
- tokio::spawn(async move {
- warp::serve(metrics_route).run(([0, 0, 0, 0], 9090)).await;
- });
-
- let socket = Arc::new(socket);
- let socket_clone = socket.clone();
-
- let mut buffer = [0u8; 512];
- loop {
- match socket_clone.recv_from(&mut buffer).await {
- Ok((len, addr)) => {
- let query = &buffer[..len];
- info!("Received {} bytes from sender: {}", len, addr);
-
- let response = self
- .handle_udp_connection(query, &socket_clone, addr, 5053)
- .await;
-
- if let Err(e) = socket_clone.send_to(&response, addr).await {
- error!("Error sending response: {:?}", e);
- }
- }
- Err(e) => {
- error!("Error receiving packet: {}", e);
- }
- }
- }
- }
-
-
- pub async fn handle_udp_connection(
- &self,
- query: &[u8],
- socket: &UdpSocket,
- sender_addr: std::net::SocketAddr,
- port: i32,
- ) -> Vec {
- // Extract service name, direction, and payload
- let (direction, service_name, payload) =
- match messaging::extract_service_name_and_payload(query) {
- Some((direction, name, payload)) if !name.is_empty() => (direction, name, payload),
- _ => {
- error!("Invalid UDP request format");
- return Vec::new(); // Return an empty response
- }
- };
-
- let namespace = "cortexflow";
-
- match direction {
- MexType::Incoming => {
- info!(
- "([{}]->[{}]): Processing incoming UDP message from service: {}",
- sender_addr, service_name, sender_addr
- );
-
- // Use service discovery to resolve the request and forward response to client
- if let Some(response) = self
- .service_discovery
- .write()
- .await
- .wait_for_udp_response(&service_name, namespace, &payload, port, sender_addr)
- .await
- {
- if let Err(e) = socket.send_to(&response, sender_addr).await {
- error!(
- "([{}]->[{}]):Error sending UDP response : {}",
- service_name, sender_addr, e
- );
- }
- response
- } else {
- Vec::new() // Return empty if no response received
- }
- }
- MexType::Outcoming => {
- send_outcoming_message_udp(socket, service_name, sender_addr).await
- }
- _ => produce_unknown_message_udp(socket, service_name, sender_addr).await,
- }
- }
-}
diff --git a/core/src/components/loadbalancer/src/main.rs b/core/src/components/loadbalancer/src/main.rs
deleted file mode 100644
index 668f213..0000000
--- a/core/src/components/loadbalancer/src/main.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Contains the Load balancer (CortexFlow Agent) user space code implementation.
- * The implementation leverages the power of bpf programs to interact with the internet interface
- * to distribute load accross multiple backends.
- * The program leverages bpf maps to enable the communication between Kernel Space and User Space
-
- * //TODO: Update the code to use the discovered services from the cortexflow identity service
- */
-
-/*
- * Annotations
-
- let kubeconfig_path = PathBuf::from("/home/cortexflow/.kube/config");
-*/
- /* annotations for permissions:
- sudo chmod 644 /home//.kube/config
- sudo chown : /home//.kube/config
-
- sudo mkdir -p /root/.kube
- sudo cp /home//.kube/config /root/.kube/config
-*/
-
-mod shared_struct;
-mod discovery;
-
-mod messaging;
-mod metrics;
-mod loadbalancer;
-
-use std::error;
-
-use anyhow::Context;
-use aya::programs::{Xdp, XdpFlags};
-use tokio::fs;
-use tokio::signal;
-use aya_log::EbpfLogger;
-use tracing::{error,info,warn};
-use std::path::Path;
-use std::path::PathBuf;
-
-use tracing_subscriber::fmt::format::FmtSpan;
-use tracing_subscriber::EnvFilter;
-
-use aya::maps::{HashMap as UserSpaceMap, MapData};
-use crate::shared_struct::{SVCKey,SVCValue,BackendPorts};
-use crate::loadbalancer::Loadbalancer;
-use crate::discovery::ServiceDiscovery;
-use shared::{apiconfig::EdgeProxyConfig, default_api_config::ConfigType};
-use k8s_openapi::api::core::v1::ConfigMap;
-use kube::{Client,Config, api::Api};
-use kube::config::Kubeconfig;
-
-
-unsafe impl aya::Pod for shared_struct::SVCKey {}
-unsafe impl aya::Pod for shared_struct::SVCValue {}
-unsafe impl aya::Pod for shared_struct::BackendPorts {}
-
-const BPF_PATH : &str = "BPF_PATH";
-
-
-//main program
-#[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
-
- // * init tracing subscriber
- tracing_subscriber::fmt()
- .with_max_level(tracing::Level::INFO)
- .with_target(false)
- .with_level(true)
- .with_span_events(FmtSpan::NONE)
- .without_time()
- .with_file(false)
- .pretty()
- .with_env_filter(EnvFilter::new("info"))
- .with_line_number(false)
- .init();
-
-
-
-
- // * loading the pre-built binaries--> reason: linux kernel does not accept non compiled code. only accepts bytecode
-
- info!("loading data");
- let bpf_path= std::env::var(BPF_PATH).context("BPF_PATH environment variable required")?;
- let data = fs::read(Path::new(&bpf_path)).await.context("failed to load file from path")?;
- info!("loading bpf data");
- let mut bpf = aya::Ebpf::load(&data).context("failed to load data from file")?;
- info!("loading maps data");
- let mut maps_owned = bpf.take_map("services").context("failed to take services map")?;
- info!("loading bpf backends map");
- let backend_map = bpf.take_map("Backend_ports").context("failed to take backends map")?;
-
-
- if Path::new("/sys/fs/bpf/services").exists(){
- warn!("map already pinned,skipping process");
- }
- else{
- maps_owned.pin("/sys/fs/bpf/services").context("failed to pin map")?;
- }
-
- info!("loading service map in user space");
- let mut service_map = UserSpaceMap::<&mut MapData, SVCKey, SVCValue>::try_from(&mut maps_owned)?;
- info!("loading backends in user space");
- let mut backends = UserSpaceMap::::try_from(backend_map)?;
-
- let mut ports = [0;4];
- ports[0] = 9876;
- ports[1] = 9877;
-
- let backend_ports = BackendPorts{
- ports:ports,
- index:0,
- };
- backends.insert(5053,backend_ports,0);
-
-
- //declare service discovery
- info!("Initializing service discovery");
- let service_discovery=ServiceDiscovery::new(&mut service_map).await?;
-
- info!("connecting to client");
- let client = Client::try_default().await?;
-
- info!("reading kubernetes configmap");
- let configmap: Api = Api::namespaced(client.clone(), "cortexflow");
-
- info!("Loading Loadbalancer configuration from configmap");
- let lbcfg = EdgeProxyConfig::load_from_configmap(configmap, ConfigType::Default).await?;
- info!("Initializing Loadbalancer");
- let loadbalancer = Loadbalancer::new(lbcfg,service_discovery,backends).await?;
- loadbalancer.run().await?;
-
- Ok(())
-}
diff --git a/core/src/components/loadbalancer/src/messaging.rs b/core/src/components/loadbalancer/src/messaging.rs
deleted file mode 100644
index a615638..0000000
--- a/core/src/components/loadbalancer/src/messaging.rs
+++ /dev/null
@@ -1,233 +0,0 @@
-/* Contains all the functions used to communicate between services */
-use base64::{Engine as _, engine::general_purpose::STANDARD};
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-use tokio::net::UdpSocket;
-use tokio::{io::AsyncWriteExt, net::TcpStream};
-use tracing::{error, info, warn};
-
-/*
-Extract the service name and the payload from this format:
-
- .:
-
-Messages structure:
- payload
- direction
- service
-
-Message Type:
- Incoming
- Outcoming
- Unknown
-
-*/
-/*
- Messagging logic:
- A sends an "Incoming" message to B.
- B receives the message and processes it:
- - If the service is valid, B tries to get a response from service_discovery.
- - If it finds a response, B sends the "Outcoming" message with the correct payload.
- - If it does not find a response, it logs an error.
-
- A receives an "Outcoming" message with the service response payload.
- - If B receives an "Outcoming" message, it responds with {"status": "received"}.
-
-*/
-
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
-pub enum MexType {
- Incoming,
- Outcoming,
- Unknown,
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Message {
- payload: String, //TODO: consider using type Option so the payload cannot be present and the functions does not return errors
- service: String,
- direction: MexType,
-}
-
-pub fn extract_service_name_and_payload(
- msg_encrypted: &[u8],
-) -> Option<(MexType, String, Vec)> {
- // Convert the bytes in a UTF-8 String
- let message = match std::str::from_utf8(msg_encrypted) {
- Ok(msg) => {
- info!("{:?}", msg);
- msg
- }
- Err(e) => {
- error!("Invalid byte sequence: {}", e);
- return None;
- // return none for invalid byte sequence
- //TODO: add checks if the message is not a JSON
- }
- };
- decode_json_message(message)
-}
-
-// Parse the json message
-fn decode_json_message(message: &str) -> Option<(MexType, String, Vec)> {
- match serde_json::from_str::(message) {
- Ok(service_message) => {
- // Extract service name
- let service_name = service_message
- .service
- .split('.')
- .next()
- .unwrap_or("")
- .to_string();
- // Decode base64 payload
- match STANDARD.decode(&service_message.payload) {
- Ok(decoded_payload) => {
- info!("decoded payload:{:?}", decoded_payload);
- Some((service_message.direction, service_name, decoded_payload))
- }
- Err(e) => {
- error!("Invalid Payload: {}", e);
- None
- }
- }
- }
- Err(e) => {
- error!("Cannot decode JSON message: {:?}", e);
- None
- }
- }
-}
-
-// Create JSON message
-pub fn create_message(service: &str, direction: MexType, payload: &[u8]) -> Vec {
- let message = Message {
- direction,
- payload: STANDARD.encode(payload),
- service: service.to_string(),
- };
- match serde_json::to_string(&message) {
- Ok(json) => json.into_bytes(),
- Err(e) => {
- error!("Cannot serialize the message: {}", e);
- Vec::new() // Empty vector in case of error
- }
- }
-}
-//tcp connection method
-//TODO: debug tcp connection
-pub async fn send_outcoming_message(stream: &mut TcpStream, service_name: String) {
- info!("Producing outcoming message");
- info!(
- "([{}]->[{:?}]): Receiving outgoing message from: {}",
- service_name,
- stream.peer_addr(),
- service_name
- );
-
- // Send a response back
- let response_json = json!({ "status": "received" }).to_string();
- if let Err(e) = stream.write_all(response_json.as_bytes()).await {
- error!("Error sending JSON response to {}: {}", service_name, e);
- }
-}
-//udp connection method
-pub async fn send_outcoming_message_udp(
- socket: &UdpSocket,
- service_name: String,
- addr: std::net::SocketAddr,
-) -> Vec {
- info!(
- "([{}]->[{}]):Receiving outgoing message from: {}",
- service_name, addr, service_name
- );
-
- // Send a response back
- let response_json = json!({ "status": "received" }).to_string();
- if let Err(e) = socket.send_to(&response_json.as_bytes(), addr).await {
- error!(
- "([{}]->[{}]):Error sending JSON response to {}: {}",
- addr, service_name, service_name, e
- );
- }
- response_json.as_bytes().to_vec()
-}
-//tcp connection method
-//TODO: debug this method
-pub async fn produce_unknown_message(stream: &mut TcpStream, service_name: String) {
- warn!("Producing message from unknown direction");
- warn!(
- "Receiving message with unknown direction from {}",
- service_name
- );
- warn!("Ignoring the message with unknown direction");
-
- // Send a response back
- let response_json = json!({ "status": "received" }).to_string();
- if let Err(e) = stream.write_all(response_json.as_bytes()).await {
- error!("Error sending JSON response to {}: {}", service_name, e);
- }
-}
-//udp connection method
-pub async fn produce_unknown_message_udp(
- socket: &UdpSocket,
- service_name: String,
- addr: std::net::SocketAddr,
-) -> Vec {
- warn!(
- "Receiving message with unknown direction from {}",
- service_name
- );
- warn!("Ignoring the message with unknown direction");
-
- // Send a response back
- let response_json = json!({ "status": "received" }).to_string();
- if let Err(e) = socket.send_to(&response_json.as_bytes(), addr).await {
- error!("Error sending JSON response to {}: {}", service_name, e);
- }
- response_json.as_bytes().to_vec()
-}
-
-//tcp connection method
-//TODO: debug this method
-pub async fn produce_incoming_message(stream: &mut TcpStream, service_name: String) {
- info!("Producing Incoming response message");
- // return a status response
- let response_json = json!({"status":"received"}).to_string();
- info!(
- "Sending TCP response back to {} with content {}",
- service_name, response_json
- );
- let response_message =
- create_message(&service_name, MexType::Outcoming, response_json.as_bytes());
-
- if let Err(e) = stream.write_all(&response_message).await {
- error!("Error sending {:?} to {}", response_message, service_name);
- error!("Error: {}", e);
- }
-}
-//tcp connection method
-//TODO: debug this method
-pub async fn send_success_ack_message(stream: &mut TcpStream) {
- // ACK message
- let ack_message = b"Message Received";
- if let Err(e) = stream.write_all(ack_message).await {
- error!("Error sending TCP acknowledgment: {}", e);
- }
-}
-//tcp connection method
-//TODO: debug this method
-pub async fn send_fail_ack_message(stream: &mut TcpStream) {
- // ACK message
- let ack_message = b"Delivery failed";
- if let Err(e) = stream.write_all(ack_message).await {
- error!("Error sending TCP acknowledgment: {}", e);
- }
-}
-//tcp connnection method
-//TODO: debug this method
-pub fn ignore_message_with_no_service(direction: MexType, payload: &[u8]) {
- info!(
- "Ignoring message with direction {:?}: {:?}",
- direction, payload
- );
-}
diff --git a/core/src/components/loadbalancer/src/metrics.rs b/core/src/components/loadbalancer/src/metrics.rs
deleted file mode 100644
index 09bd189..0000000
--- a/core/src/components/loadbalancer/src/metrics.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-use lazy_static::lazy_static;
-use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec};
-
-
-lazy_static!{
- /* One of the main benefits of using lazy_static is the ability to store thread-safe global variables.
- Because lazy static values are initialized in a thread-safe manner, they can be safely accessed
- from multiple threads without the need for additional synchronization.
- This can be especially useful in cases where you want to avoid the overhead of locking and unlocking shared resources.
-
- lazy static documentation: https://blog.logrocket.com/rust-lazy-static-pattern/#how-lazy-static-works
- */
-
- pub static ref DNS_REQUEST: IntCounterVec = register_int_counter_vec!(
- "total_dns_requests",
- "Total_DNS_Requests",
- &["client_ip"]
- ).unwrap();
-
- pub static ref DNS_RESPONSE_TIME: HistogramVec = register_histogram_vec!(
- "dns_response_time",
- "DNS_response_time",
- &["server"]
- ).unwrap();
-}
diff --git a/core/src/components/loadbalancer/src/mod.rs b/core/src/components/loadbalancer/src/mod.rs
deleted file mode 100644
index 7e3fd1d..0000000
--- a/core/src/components/loadbalancer/src/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-pub mod loadbalancer;
-pub mod shared_struct;
-pub mod discovery;
-pub mod metrics;
-pub mod messaging;
\ No newline at end of file
diff --git a/core/src/components/loadbalancer/src/shared_struct.rs b/core/src/components/loadbalancer/src/shared_struct.rs
deleted file mode 100644
index b34f2e8..0000000
--- a/core/src/components/loadbalancer/src/shared_struct.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-#![no_std]
-
-use bytemuck::{Pod, Zeroable};
-
-//ref: /maps/map.rs/SVCKey
-#[repr(C)]
-#[derive(Clone, Copy, Pod, Zeroable,Debug)]
-pub struct SVCKey {
- pub service_name: [u8; 64],
-}
-
-//ref: /maps/map.rs/SVCValue
-#[repr(C)]
-#[derive(Clone, Copy, Pod, Zeroable,Debug)]
-pub struct SVCValue {
- pub ip: [u8; 4],
- pub port: u32,
-}
-//ref: /map/maps.rs/BackendPorts
-#[repr(C)]
-#[derive(Clone,Debug,Pod,Zeroable,Copy)]
-pub struct BackendPorts{
- pub ports: [u16;4],
- pub index: usize
-}
-
-//ref: /map/map.rs/str_to_u8_64
-pub fn str_to_u8_64(s: &str) -> [u8; 64] {
- let mut buf = [0u8; 64];
- let bytes = s.as_bytes();
- let len = bytes.len().min(64);
- buf[..len].copy_from_slice(&bytes[..len]);
- buf
-}
-pub fn str_to_u8_4(s: &str) -> [u8; 4] {
- let mut buf = [0u8; 4];
- let bytes = s.as_bytes();
- let len = bytes.len().min(4);
- buf[..len].copy_from_slice(&bytes[..len]);
- buf
-}
\ No newline at end of file
diff --git a/core/src/components/maps/Cargo.toml b/core/src/components/maps/Cargo.toml
deleted file mode 100644
index 1b8206e..0000000
--- a/core/src/components/maps/Cargo.toml
+++ /dev/null
@@ -1,36 +0,0 @@
-[package]
-name = "maps"
-version = "0.1.0"
-edition = "2021"
-
-[lib]
-name = "maps"
-path = "src/lib.rs"
-
-[dependencies]
-aya-ebpf = { git = "https://github.com/aya-rs/aya" }
-aya-log-ebpf = { git = "https://github.com/aya-rs/aya" }
-bytemuck = {version ="1.23.0",features = ["derive"]}
-network-types = "0.0.8"
-
-[build-dependencies]
-which = { version = "8.0.0", default-features = false }
-
-[[bin]]
-name = "xdp-map"
-path = "src/map.rs"
-
-[profile.dev]
-panic = "abort"
-
-[profile.release]
-panic = "abort"
-
-[target.bpfel-unknown-none]
-linker = "bpf-linker"
-rustflags = [
- "-C", "panic=abort",
- "-C", "target-feature=+alu32",
- "-C", "link-args=-znotext",
-]
-
diff --git a/core/src/components/maps/build.rs b/core/src/components/maps/build.rs
deleted file mode 100644
index f83c317..0000000
--- a/core/src/components/maps/build.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use which::which;
-
-/// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be
-/// better expressed by [artifact-dependencies][bindeps] but issues such as
-/// https://github.com/rust-lang/cargo/issues/12385 make their use impractical for the time being.
-///
-/// This file implements an imperfect solution: it causes cargo to rebuild the crate whenever the
-/// mtime of `which bpf-linker` changes. Note that possibility that a new bpf-linker is added to
-/// $PATH ahead of the one used as the cache key still exists. Solving this in the general case
-/// would require rebuild-if-changed-env=PATH *and* rebuild-if-changed={every-directory-in-PATH}
-/// which would likely mean far too much cache invalidation.
-///
-/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies
-fn main() {
- let bpf_linker = which("bpf-linker").unwrap();
- println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap());
-}
diff --git a/core/src/components/maps/src/lib.rs b/core/src/components/maps/src/lib.rs
deleted file mode 100644
index 844d23a..0000000
--- a/core/src/components/maps/src/lib.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-#![no_std]
-#![no_main]
-
-pub mod map;
-
-
diff --git a/core/src/components/maps/src/map.rs b/core/src/components/maps/src/map.rs
deleted file mode 100644
index 21f1b39..0000000
--- a/core/src/components/maps/src/map.rs
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Init a bpf map to save the user space pod data
- * to use in the kernel space and user space
-*/
-#![no_std]
-#![no_main]
-
-use aya_ebpf::maps::PerfEventArray;
-use aya_ebpf::{macros::map, maps::HashMap as KernelSpaceMap}; //aya_ebpf is the kernel space libary
-use bytemuck::{Pod, Zeroable};
-
-/* unsafe impl Zeroable for SVCKey {} //implemente zeroable
-unsafe impl Zeroable for SVCValue {} */
-
-#[repr(C)]
-/*
- * match the C fields alignment. tells the compiler that the rappresentation
- * must follow the C rules. disable the rust compiler realignment
- *
- *In this case Rust struct and C are unitary equivalent
-*/
-#[derive(Clone, Copy, Pod, Zeroable, Debug)]
-pub struct SVCKey {
- pub service_name: [u8; 64],
-}
-
-#[repr(C)] //match the C fields alignment
-#[derive(Clone, Copy, Pod, Zeroable, Debug)]
-pub struct SVCValue {
- pub ip: [u8; 4],
- pub port: u32,
-}
-
-#[repr(C)]
-#[derive(Clone, Debug, Pod, Zeroable, Copy)]
-pub struct BackendPorts {
- pub ports: [u16; 4],
- pub index: usize,
-}
-
-//enable Pod (Plain of data) data type
-/* unsafe impl Pod for SVCKey {}
-unsafe impl Pod for SVCValue {} */
-/*
- * Doc:
- * POD (Plain Old Data) types are marked with the trait, indicating that they can be
- * duplicated simply by copying their memory representation.
- *
- * This trait allows the Rust compiler to efficiently handle data creating bit-for-bit copies
- * without invoking user-defined methods
- * POD types do not involve pointers or complex data structures, they are easier to manage in terms of
- * memory allocation and deallocation
-
-*/
-
-/*
- * Maps
-*/
-
-#[map(name = "services")]
-/*
- * connect the map name "SERVICES" to the HasMap in the BPF bytecode
- * init a BPF_MAP_HASH_TYPE to store the resolved service values
-*/
-pub static mut SERVICES: KernelSpaceMap =
- KernelSpaceMap::with_max_entries(1024, 0);
-
-#[map(name = "Backend_ports")]
-/*
- * connect the map name "BACKEND_PORTS" to the HasMap in the BPF bytecode
- * init a BPF_MAP_HASH_TYPE to store the resolved service values
-*/
-pub static mut BACKEND_PORTS: KernelSpaceMap =
- KernelSpaceMap::with_max_entries(10, 0);
-
-/*Aux Functions */
-
-//perform &str types to &[u8;64]
-pub fn str_to_u8_64(s: &str) -> [u8; 64] {
- let mut buf = [0u8; 64];
- let bytes = s.as_bytes();
- let len = bytes.len().min(64);
- buf[..len].copy_from_slice(&bytes[..len]);
- buf
-}
-pub fn u32_to_u8_4(s: u32) -> [u8; 4] {
- //32 bit ---> 4 bytes
- let mut buf = [0u8; 4];
- let bytes = s.to_le_bytes(); // this produce [u8,4]
- buf[..4].copy_from_slice(&bytes); // Only copy the first 4 bytes
- buf
-}
-
-pub fn u32_to_u8_64(s: u32) -> [u8; 64] {
- //32 bit ---> 4 bytes
- let mut buf = [0u8; 64];
- let bytes = s.to_le_bytes(); //this produce [u8,4]
- buf[..4].copy_from_slice(&bytes); // Only copy the first 4 bytes
- buf
-}
diff --git a/core/src/components/metrics_tracer/Cargo.toml b/core/src/components/metrics_tracer/Cargo.toml
index 0660036..4816770 100644
--- a/core/src/components/metrics_tracer/Cargo.toml
+++ b/core/src/components/metrics_tracer/Cargo.toml
@@ -7,7 +7,6 @@ edition = "2024"
aya-ebpf = { git = "https://github.com/aya-rs/aya" }
aya-log-ebpf = { git = "https://github.com/aya-rs/aya" }
bytemuck = {version ="1.23.0",features = ["derive"]}
-network-types = "0.0.8"
[build-dependencies]
which = { version = "7.0.0", default-features = false }
diff --git a/core/src/components/proxy/Cargo.toml b/core/src/components/proxy/Cargo.toml
deleted file mode 100644
index 296e6d9..0000000
--- a/core/src/components/proxy/Cargo.toml
+++ /dev/null
@@ -1,46 +0,0 @@
-[package]
-name = "proxy"
-version = "0.1.0"
-edition = "2024"
-
-[dependencies]
-anyhow = "1.0.95"
-base64 = "0.22.1"
-dashmap = "6.1.0"
-hyper = "1.6.0"
-k8s-openapi = "0.25.0"
-kube = "1.1.0"
-kube-runtime = "1.1.0"
-lazy_static = "1.5.0"
-prometheus = "0.14.0"
-serde = "1.0.219"
-serde_json = "1.0.140"
-tokio = "1.43.0"
-tracing = "0.1.41"
-tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
-warp = "0.3.7"
-time = "=0.3.41"
-aya-ebpf = { git = "https://github.com/aya-rs/aya" }
-aya-log-ebpf = { git = "https://github.com/aya-rs/aya" }
-bytemuck = "1.22.0"
-aya = "0.13.1"
-once_cell = { version = "1.20.3" }
-
-[dependencies.shared]
-path = "../../shared"
-
-[build-dependencies]
-which = { version = "8.0.0", default-features = false }
-
-[[bin]]
-name = "proxy"
-path = "src/main.rs"
-
-[profile.dev]
-panic = "abort"
-
-[profile.release]
-panic = "abort"
-
-[patch.crates-io]
-once_cell = "1.20.3"
diff --git a/core/src/components/proxy/src/main.rs b/core/src/components/proxy/src/main.rs
deleted file mode 100644
index 30ed7db..0000000
--- a/core/src/components/proxy/src/main.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-//TODO: basic proxy functionalities
-//TODO: add integration with prometheus logging system
-//TODO: add load balancer between dns servers
-
-mod discovery;
-mod map;
-mod messaging;
-mod metrics;
-mod proxy;
-
-use k8s_openapi::api::core::v1::ConfigMap;
-use kube::{Client, api::Api};
-use proxy::Proxy;
-use shared::{apiconfig::EdgeProxyConfig, default_api_config::ConfigType};
-use tracing_subscriber::EnvFilter;
-use tracing_subscriber::fmt::format::FmtSpan;
-
-#[tokio::main]
-async fn main() -> Result<(), anyhow::Error> {
- tracing_subscriber::fmt()
- .with_max_level(tracing::Level::INFO)
- .with_target(false)
- .with_level(true)
- .with_span_events(FmtSpan::NONE)
- .without_time()
- .with_file(false)
- .pretty()
- .with_env_filter(EnvFilter::new("info"))
- .with_line_number(false)
- .init();
-
- let client = Client::try_default().await?;
- let configmap: Api = Api::namespaced(client.clone(), "cortexflow");
-
- let proxycfg = EdgeProxyConfig::load_from_configmap(configmap, ConfigType::Default).await?;
- let proxy = Proxy::new(proxycfg).await?;
-
- proxy.start().await?;
- proxy.get_info().await;
-
- Ok(())
-}
diff --git a/core/src/components/proxy/src/mod.rs b/core/src/components/proxy/src/mod.rs
deleted file mode 100644
index 6f11795..0000000
--- a/core/src/components/proxy/src/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-pub mod discovery;
-pub mod map;
-pub mod messaging;
-pub mod metrics;
-pub mod proxy;
diff --git a/core/src/components/proxy/src/proxy.rs b/core/src/components/proxy/src/proxy.rs
deleted file mode 100644
index 0d7f317..0000000
--- a/core/src/components/proxy/src/proxy.rs
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
-Cortexflow proxy is the main proxy in cortexbrain. Features:
-- Caching β
//TODO: refer to bug (line 67)
-- UDP/TCP traffic β
-- Automatic prometheus metrics export β
-- Load balancing β
-- Service discovery β
-*/
-use crate::discovery::ServiceDiscovery;
-use crate::messaging;
-use crate::messaging::MexType;
-use crate::messaging::{
- ignore_message_with_no_service, produce_incoming_message, produce_unknown_message,
- produce_unknown_message_udp, send_fail_ack_message, send_outcoming_message,
- send_outcoming_message_udp, send_success_ack_message,
-};
-use anyhow::{Error, Result};
-use aya::Ebpf;
-use prometheus::{Encoder, TextEncoder};
-use shared::apiconfig::EdgeProxyConfig;
-use std::sync::Arc;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncWriteExt;
-use tokio::net::UdpSocket;
-use tokio::net::{TcpListener, TcpStream};
-use tracing::{debug, error, info};
-use warp::Filter;
-#[derive(Clone)]
-pub struct Proxy {
- proxy_config: Arc,
- service_discovery: Arc,
-}
-
-impl Proxy {
- pub async fn new(proxycfg: EdgeProxyConfig) -> Result {
- let service_discovery = ServiceDiscovery::new(&mut Ebpf).await?;
- Ok(Proxy {
- proxy_config: Arc::new(proxycfg),
- service_discovery: Arc::new(service_discovery),
- })
- }
-
- pub async fn get_info(&self) {
- info!("Enable: {:?}", self.proxy_config.enable);
- info!("Listen interface: {:?}", self.proxy_config.listen_interface);
- }
-
- pub async fn start(&self) -> Result<(), Error> {
- if !self.proxy_config.enable {
- error!("Proxy is not running");
- return Ok(());
- }
- self.run().await
- }
-
- //TODO: a code refactoring needed here
- pub async fn run(&self) -> Result<(), Error> {
- debug!("Cortexflow Proxy is running");
-
- // Start udpsocket
- let socket = UdpSocket::bind("0.0.0.0:5053").await?;
- debug!("Socket bound to {}", socket.local_addr()?);
-
- // Start tcp_listener
- let tcp_listener = TcpListener::bind("0.0.0.0:5054").await?;
- debug!("Tcp listener bound to {}", tcp_listener.local_addr()?);
-
- //TODO:fix caching bug
- /*
- Bug description: the caching system use the udp resolved endpoint
- when a tcp communication is performed
-
- Solution to do:
- implement a system that can recognize and block the use of udp endpoints
- while performing a tcp communication
-
- Additional info:
- TCP port: 5054
- UDP port : 5053
-
- */
- //start the cache
- //let cache = Arc::new(DashMap::new());
- //let cache_clone = cache.clone();
-
- let metrics_route = warp::path!("metrics").map(|| {
- let mut buffer = Vec::new();
- let encoder = TextEncoder::new();
- let metrics_families = prometheus::gather();
- encoder.encode(&metrics_families, &mut buffer).unwrap();
- warp::reply::with_header(
- String::from_utf8(buffer).unwrap(),
- "Content-Type",
- "text/plain; charset=utf-8",
- )
- });
-
- tokio::spawn(async move {
- warp::serve(metrics_route).run(([0, 0, 0, 0], 9090)).await;
- });
-
- // Clone all the necessary for the tcp task
- let proxy_clone = self.clone();
-
- tokio::spawn(async move {
- while let Ok((stream, _)) = tcp_listener.accept().await {
- //let cache = cache_clone.clone();
- let proxy = proxy_clone.clone();
-
- tokio::spawn(async move {
- Self::handle_tcp_connection(proxy, stream, 5054).await;
- });
- }
- });
-
- let socket = Arc::new(socket);
- let socket_clone = socket.clone();
-
- let mut buffer = [0u8; 512];
- loop {
- match socket_clone.recv_from(&mut buffer).await {
- Ok((len, addr)) => {
- let query = &buffer[..len];
- info!("Received {} bytes from sender: {}", len, addr);
-
- let response = self
- .handle_udp_connection(query, &socket_clone, addr, 5053)
- .await;
-
- if let Err(e) = socket_clone.send_to(&response, addr).await {
- error!("Error sending response: {:?}", e);
- }
- }
- Err(e) => {
- error!("Error receiving packet: {}", e);
- }
- }
- }
- }
-
- pub async fn handle_udp_connection(
- &self,
- query: &[u8],
- socket: &UdpSocket,
- sender_addr: std::net::SocketAddr,
- port: i32,
- ) -> Vec {
- // Extract service name, direction, and payload
- let (direction, service_name, payload) =
- match messaging::extract_service_name_and_payload(query) {
- Some((direction, name, payload)) if !name.is_empty() => (direction, name, payload),
- _ => {
- error!("Invalid UDP request format");
- return Vec::new(); // Return an empty response
- }
- };
-
- let namespace = "cortexflow";
-
- match direction {
- MexType::Incoming => {
- info!(
- "([{}]->[{}]): Processing incoming UDP message from service: {}",
- sender_addr, service_name, sender_addr
- );
-
- // Use service discovery to resolve the request and forward response to client
- if let Some(response) = self
- .service_discovery
- .wait_for_udp_response(&service_name, namespace, &payload, port, sender_addr)
- .await
- {
- if let Err(e) = socket.send_to(&response, sender_addr).await {
- error!(
- "([{}]->[{}]):Error sending UDP response : {}",
- service_name, sender_addr, e
- );
- }
- response
- } else {
- Vec::new() // Return empty if no response received
- }
- }
- MexType::Outcoming => {
- send_outcoming_message_udp(socket, service_name, sender_addr).await
- }
- _ => produce_unknown_message_udp(socket, service_name, sender_addr).await,
- }
- }
-
- // handles the tcp connection
- pub async fn handle_tcp_connection(proxy: Self, mut stream: TcpStream, port: i32) {
- let sender_addr = stream.peer_addr();
- info!("Debugging sender address: {:?}", sender_addr);
- let mut buffer = [0u8; 1024];
-
- match stream.read(&mut buffer).await {
- Ok(size) if size > 0 => {
- let query = &buffer[..size];
- info!("Received query: {:?}", query);
-
- match messaging::extract_service_name_and_payload(query) {
- Some((direction, service_name, payload)) if !service_name.is_empty() => {
- let namespace = "cortexflow";
-
- match direction {
- MexType::Incoming => {
- info!(
- "([{:?}]->[{}]):Processing request for service: {}",
- sender_addr, service_name, service_name
- );
-
- // Forward the response to the client
- if let Some(response) = proxy
- .service_discovery
- .send_tcp_request(&service_name, namespace, &payload, port)
- .await
- {
- info!(
- "([{}]->[{:?}]): Sending response back to client",
- service_name, sender_addr
- );
- if let Err(e) = stream.write_all(&response).await {
- error!("Failed to send response: {}", e);
- }
- } else {
- error!(
- "Service {} not found in namespace {}",
- service_name, namespace
- );
- }
- }
- MexType::Outcoming => {
- info!(
- "([{}]->[{:?}]) Processing outgoing message for {}",
- service_name, sender_addr, service_name
- );
- send_outcoming_message(&mut stream, service_name).await;
- }
- _ => {
- produce_unknown_message(&mut stream, service_name).await;
- }
- }
-
- send_success_ack_message(&mut stream).await;
- }
- _ => {
- error!("Invalid or empty service name in request");
- send_fail_ack_message(&mut stream).await;
- }
- }
- }
- Ok(_) => {
- info!("Received empty message");
- send_success_ack_message(&mut stream).await;
- }
- Err(e) => {
- error!("Error: {}", e);
- send_fail_ack_message(&mut stream).await;
- }
- }
- }
-}
diff --git a/core/src/components/xdp/Cargo.toml b/core/src/components/xdp/Cargo.toml
deleted file mode 100644
index 253d6d6..0000000
--- a/core/src/components/xdp/Cargo.toml
+++ /dev/null
@@ -1,37 +0,0 @@
-[package]
-name = "xdp"
-version = "0.1.0"
-edition = "2021"
-
-[lib]
-name = "xdp"
-path = "src/lib.rs"
-
-[dependencies]
-maps ={ path = "../maps"}
-aya-ebpf = { git = "https://github.com/aya-rs/aya" }
-aya-log-ebpf = { git = "https://github.com/aya-rs/aya" }
-bytemuck = {version ="1.23.0",features = ["derive"]}
-network-types = "0.0.8"
-
-[build-dependencies]
-which = { version = "8.0.0", default-features = false }
-
-[[bin]]
-name = "xdp-filter"
-path = "src/filter.rs"
-
-[profile.dev]
-panic = "abort"
-
-[profile.release]
-panic = "abort"
-
-[target.bpfel-unknown-none]
-linker = "bpf-linker"
-rustflags = [
- "-C", "panic=abort",
- "-C", "target-feature=+alu32",
- "-C", "link-args=-znotext",
-]
-
diff --git a/core/src/components/xdp/build.rs b/core/src/components/xdp/build.rs
deleted file mode 100644
index f83c317..0000000
--- a/core/src/components/xdp/build.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use which::which;
-
-/// Building this crate has an undeclared dependency on the `bpf-linker` binary. This would be
-/// better expressed by [artifact-dependencies][bindeps] but issues such as
-/// https://github.com/rust-lang/cargo/issues/12385 make their use impractical for the time being.
-///
-/// This file implements an imperfect solution: it causes cargo to rebuild the crate whenever the
-/// mtime of `which bpf-linker` changes. Note that possibility that a new bpf-linker is added to
-/// $PATH ahead of the one used as the cache key still exists. Solving this in the general case
-/// would require rebuild-if-changed-env=PATH *and* rebuild-if-changed={every-directory-in-PATH}
-/// which would likely mean far too much cache invalidation.
-///
-/// [bindeps]: https://doc.rust-lang.org/nightly/cargo/reference/unstable.html?highlight=feature#artifact-dependencies
-fn main() {
- let bpf_linker = which("bpf-linker").unwrap();
- println!("cargo:rerun-if-changed={}", bpf_linker.to_str().unwrap());
-}
diff --git a/core/src/components/xdp/build.sh b/core/src/components/xdp/build.sh
deleted file mode 100755
index a4376a8..0000000
--- a/core/src/components/xdp/build.sh
+++ /dev/null
@@ -1,2 +0,0 @@
-echo "π Building xdp"
-cargo +nightly build -Z build-std=core --target bpfel-unknown-none --release --bin xdp-filter
diff --git a/core/src/components/xdp/requirements.txt b/core/src/components/xdp/requirements.txt
deleted file mode 100644
index 726852e..0000000
--- a/core/src/components/xdp/requirements.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-To correctly build the component you must have bpf-linker installed
-
-installation:
-cargo install --git https://github.com/aya-rs/bpf-linker
diff --git a/core/src/components/xdp/src/filter.rs b/core/src/components/xdp/src/filter.rs
deleted file mode 100644
index 685ac1b..0000000
--- a/core/src/components/xdp/src/filter.rs
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Contains the code for the kernel xdp manipulation. this code lives in
- * the kernel space only and needs to be attached to a "main" program that lives in the user space
-*/
-
-#![no_std] // * no standard library
-#![no_main] // * no main entrypoint
-
-use aya_ebpf::{bindings::xdp_action, macros::xdp, programs::XdpContext};
-use aya_log_ebpf::{debug, error, info};
-
-use core::mem;
-use maps::map::{SVCKey, SVCValue, SERVICES};
-use network_types::{
- eth::{EthHdr, EtherType},
- ip::{IpProto, Ipv4Hdr},
- tcp::TcpHdr,
- udp::UdpHdr,
-};
-
-/*
-* init xdp program
-*/
-#[xdp]
-pub fn xdp_hello(ctx: XdpContext) -> u32 {
- match unsafe { xdp_firewall(&ctx) } {
- Ok(ret) => ret,
- Err(_) => xdp_action::XDP_ABORTED,
- }
-}
-
-unsafe fn init_xdp(ctx: &XdpContext) -> Result {
- info!(ctx, "Received a packet");
- Ok(xdp_action::XDP_PASS)
-}
-
-#[panic_handler]
-fn panic(_info: &core::panic::PanicInfo) -> ! {
- loop {}
-}
-
-// * getting packet data from raw packets
-#[inline(always)] //inline
-fn ptr_at(ctx: &XdpContext, offset: usize) -> Result<*const T, ()> {
- let start = ctx.data();
- let end = ctx.data_end();
- let len = mem::size_of::();
-
- if start + offset + len > end {
- return Err(());
- }
- Ok((start + offset) as *const T)
-}
-
-//TODO:safe the result of the firewall into a bpf hash map and perform a redirect
-/*
-* XDP firewall
-* Usage:
-* 1. Drop packets from the 443 port (only fo development test )
-* 2. Log TCP and UDP traffic
-* 3. //TODO: rebuild firewall policy to efficiently filter traffic
-* 4. //TODO: use ConnArray declared in conntracker program to discover services and implement policies
-*/
-fn xdp_firewall(ctx: &XdpContext) -> Result {
- let ethhdr: *const EthHdr = ptr_at(ctx, 0)?;
- match unsafe { (*ethhdr).ether_type } {
- EtherType::Ipv4 => {}
- _ => return Ok(xdp_action::XDP_PASS),
- }
-
- let ipv4hdr: *const Ipv4Hdr = ptr_at(ctx, EthHdr::LEN)?;
- let source_addr = u32::from_be_bytes(unsafe { (*ipv4hdr).src_addr });
-
- // handle protocols
- match unsafe { (*ipv4hdr).proto } {
- IpProto::Tcp => {
- let tcphdr: *const TcpHdr = ptr_at(ctx, EthHdr::LEN + Ipv4Hdr::LEN)?;
- let port = u16::from_be(unsafe { (*tcphdr).source });
- if port == 443 {
- return Ok(xdp_action::XDP_PASS);
- } else {
- info!(
- ctx,
- "Received TCP packet from IP: {:i} PORT: {}", source_addr, port
- );
- }
- }
- IpProto::Udp => {
- let udphdr: *const UdpHdr = ptr_at(ctx, EthHdr::LEN + Ipv4Hdr::LEN)?;
- let port = u16::from_be_bytes(unsafe { (*udphdr).source });
- if port == 443 {
- return Ok(xdp_action::XDP_PASS);
- } else {
- info!(
- ctx,
- "Received UDP packet from IP: {:i} PORT: {}", source_addr, port
- );
- //TODO: saving the packet address here will only store the ip and port
- // of the gateway (in this case the minikube node ip)--> i need to save the ip in the user space
- debug!(
- ctx,
- "Inserting key: {:i} and value {:i} into the services bpf map",
- source_addr,
- port as u32
- );
- let key = SVCKey {
- service_name: maps::map::u32_to_u8_64(source_addr),
- };
- let value = SVCValue {
- ip: maps::map::u32_to_u8_4(source_addr.into()),
- port: port as u32,
- };
- let res = unsafe { SERVICES.insert(&key, &value, 0) };
- match res {
- Ok(_) => {
- return Ok(xdp_action::XDP_PASS);
- }
- Err(_) => {
- error!(ctx, "Error inserting element into bpf map");
- return Err(());
- }
- }
- }
- }
- _ => return Ok(xdp_action::XDP_DROP),
- };
-
- Ok(xdp_action::XDP_PASS)
-}
diff --git a/core/src/components/xdp/src/lib.rs b/core/src/components/xdp/src/lib.rs
deleted file mode 100644
index 3950067..0000000
--- a/core/src/components/xdp/src/lib.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-#![no_std]
-#![no_main]
-
-
-pub mod filter;
-
diff --git a/core/src/components/xdp/src/mod.rs b/core/src/components/xdp/src/mod.rs
deleted file mode 100644
index 52f4d03..0000000
--- a/core/src/components/xdp/src/mod.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-#![no_std]
-#![no_main]
-
-pub mod map;
-pub mod filter;
\ No newline at end of file