Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mrok/agent/devtools/inspector/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from textual.worker import get_current_worker

from mrok import __version__
from mrok.proxy.datastructures import Event, HTTPHeaders, HTTPResponse, WorkerMetrics, ZitiMrokMeta
from mrok.proxy.models import Event, HTTPHeaders, HTTPResponse, ServiceMetadata, WorkerMetrics


def build_tree(node, data):
Expand Down Expand Up @@ -185,7 +185,7 @@ def on_mount(self) -> None:
# mem=int(mean([m.process.mem for m in self.workers_metrics.values()])),
# )

def update_meta(self, meta: ZitiMrokMeta) -> None:
def update_meta(self, meta: ServiceMetadata) -> None:
table = self.query_one(DataTable)
if len(table.rows) == 0:
table.add_row("URL", f"https://{meta.extension}.{meta.domain}")
Expand Down
2 changes: 1 addition & 1 deletion mrok/agent/sidecar/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from httpcore import AsyncConnectionPool

from mrok.proxy.app import ProxyAppBase
from mrok.proxy.types import Scope
from mrok.types.proxy import Scope

logger = logging.getLogger("mrok.agent")

Expand Down
4 changes: 4 additions & 0 deletions mrok/agent/sidecar/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def __init__(
identity_file: str,
target: str | Path | tuple[str, int],
workers: int = 4,
events_enabled: bool = True,
max_connections: int | None = 10,
max_keepalive_connections: int | None = None,
keepalive_expiry: float | None = None,
Expand All @@ -24,6 +25,7 @@ def __init__(
identity_file,
workers=workers,
reload=False,
events_enabled=events_enabled,
events_pub_port=publishers_port,
events_sub_port=subscribers_port,
)
Expand All @@ -47,6 +49,7 @@ def run(
identity_file: str,
target_addr: str | Path | tuple[str, int],
workers: int = 4,
events_enabled: bool = True,
max_connections: int | None = 10,
max_keepalive_connections: int | None = None,
keepalive_expiry: float | None = None,
Expand All @@ -58,6 +61,7 @@ def run(
identity_file,
target_addr,
workers=workers,
events_enabled=events_enabled,
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
keepalive_expiry=keepalive_expiry,
Expand Down
2 changes: 1 addition & 1 deletion mrok/agent/ziticorn.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from mrok.proxy.master import MasterBase
from mrok.proxy.types import ASGIApp
from mrok.types.proxy import ASGIApp


class ZiticornAgent(MasterBase):
Expand Down
5 changes: 3 additions & 2 deletions mrok/cli/commands/admin/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@

from mrok.cli.commands.admin.utils import parse_tags
from mrok.conf import Settings
from mrok.ziti.api import TagsType, ZitiClientAPI, ZitiManagementAPI
from mrok.types.ziti import Tags
from mrok.ziti.api import ZitiClientAPI, ZitiManagementAPI
from mrok.ziti.bootstrap import bootstrap_identity

logger = logging.getLogger(__name__)


async def bootstrap(
settings: Settings, forced: bool, tags: TagsType | None
settings: Settings, forced: bool, tags: Tags | None
) -> tuple[str, dict[str, Any] | None]:
async with ZitiManagementAPI(settings) as mgmt_api, ZitiClientAPI(settings) as client_api:
return await bootstrap_identity(
Expand Down
4 changes: 2 additions & 2 deletions mrok/cli/commands/admin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import typer

from mrok.ziti.api import TagsType
from mrok.types.ziti import Tags


def parse_tags(pairs: list[str] | None) -> TagsType | None:
def parse_tags(pairs: list[str] | None) -> Tags | None:
if not pairs:
return None

Expand Down
9 changes: 9 additions & 0 deletions mrok/cli/commands/agent/run/sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ def run_sidecar(
show_default=True,
),
] = 50001,
no_events: Annotated[
bool,
typer.Option(
"--no-events",
help="Disable events. Default: False",
show_default=True,
),
] = False,
):
"""Run a Sidecar Proxy to expose a web application through OpenZiti."""
if ":" in str(target):
Expand All @@ -110,6 +118,7 @@ def run_sidecar(
str(identity_file),
target_addr,
workers=workers,
events_enabled=not no_events,
max_connections=max_connections,
max_keepalive_connections=max_keepalive_connections,
keepalive_expiry=keepalive_expiry,
Expand Down
4 changes: 2 additions & 2 deletions mrok/controller/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
computed_field,
)

from mrok.ziti.api import TagsType
from mrok.types.ziti import Tags


class BaseSchema(BaseModel):
model_config = ConfigDict(from_attributes=True, extra="ignore")
tags: TagsType | None = None
tags: Tags | None = None


class IdSchema(BaseModel):
Expand Down
2 changes: 1 addition & 1 deletion mrok/frontend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from mrok.proxy.app import ProxyAppBase
from mrok.proxy.backend import AIOZitiNetworkBackend
from mrok.proxy.exceptions import InvalidTargetError
from mrok.proxy.types import Scope
from mrok.types.proxy import Scope

RE_SUBDOMAIN = re.compile(r"(?i)^(?:EXT-\d{4}-\d{4}|INS-\d{4}-\d{4}-\d{4})$")

Expand Down
4 changes: 2 additions & 2 deletions mrok/proxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from httpcore import AsyncConnectionPool, Request

from mrok.proxy.exceptions import ProxyError
from mrok.proxy.streams import ASGIRequestBodyStream
from mrok.proxy.types import ASGIReceive, ASGISend, Scope
from mrok.proxy.stream import ASGIRequestBodyStream
from mrok.types.proxy import ASGIReceive, ASGISend, Scope

logger = logging.getLogger("mrok.proxy")

Expand Down
8 changes: 4 additions & 4 deletions mrok/proxy/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from contextlib import AsyncExitStack, asynccontextmanager
from typing import Any, ParamSpec, Protocol

from mrok.proxy.types import ASGIApp, ASGIReceive, ASGISend, Lifespan, Scope
from mrok.types.proxy import ASGIApp, ASGIReceive, ASGISend, Lifespan, Scope

P = ParamSpec("P")

Expand Down Expand Up @@ -57,9 +57,9 @@ async def merge_lifespan(self, app: ASGIApp):
if self.lifespan is not None:
outer_state = await stack.enter_async_context(self.lifespan(app))
state.update(outer_state or {})
starlette_lifesapn = self.get_starlette_lifespan()
if starlette_lifesapn is not None:
inner_state = await stack.enter_async_context(starlette_lifesapn(app))
starlette_lifespan = self.get_starlette_lifespan()
if starlette_lifespan is not None:
inner_state = await stack.enter_async_context(starlette_lifespan(app))
state.update(inner_state or {})
yield state

Expand Down
2 changes: 1 addition & 1 deletion mrok/proxy/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from openziti.context import ZitiContext

from mrok.proxy.exceptions import InvalidTargetError, TargetUnavailableError
from mrok.proxy.streams import AIONetworkStream
from mrok.proxy.stream import AIONetworkStream


class AIOZitiNetworkBackend(AsyncNetworkBackend):
Expand Down
66 changes: 66 additions & 0 deletions mrok/proxy/event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import asyncio
import contextlib
import logging

import zmq
import zmq.asyncio

from mrok.proxy.asgi import ASGIAppWrapper
from mrok.proxy.metrics import MetricsCollector
from mrok.proxy.middleware import CaptureMiddleware, MetricsMiddleware
from mrok.proxy.models import Event, HTTPResponse, ServiceMetadata, Status
from mrok.types.proxy import ASGIApp

logger = logging.getLogger("mrok.proxy")


class EventPublisher:
def __init__(
self,
worker_id: str,
meta: ServiceMetadata | None = None,
event_publisher_port: int = 50000,
metrics_interval: float = 5.0,
):
self._worker_id = worker_id
self._meta = meta
self._metrics_interval = metrics_interval
self.publisher_port = event_publisher_port
self._zmq_ctx = zmq.asyncio.Context()
self._publisher = self._zmq_ctx.socket(zmq.PUB)
self._metrics_collector = MetricsCollector(self._worker_id)
self._publish_task = None

async def on_startup(self):
self._publisher.connect(f"tcp://localhost:{self.publisher_port}")
self._publish_task = asyncio.create_task(self.publish_metrics_event())
logger.info(f"Events publishing for worker {self._worker_id} started")

async def on_shutdown(self):
self._publish_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._publish_task
self._publisher.close()
self._zmq_ctx.term()
logger.info(f"Events publishing for worker {self._worker_id} stopped")

async def publish_metrics_event(self):
while True:
snap = await self._metrics_collector.snapshot()
event = Event(type="status", data=Status(meta=self._meta, metrics=snap))
await self._publisher.send_string(event.model_dump_json())
await asyncio.sleep(self._metrics_interval)

async def publish_response_event(self, response: HTTPResponse):
event = Event(type="response", data=response)
await self._publisher.send_string(event.model_dump_json()) # type: ignore[attr-defined]

def setup_middleware(self, app: ASGIAppWrapper):
app.add_middleware(CaptureMiddleware, self.publish_response_event)
app.add_middleware(MetricsMiddleware, self._metrics_collector) # type: ignore

@contextlib.asynccontextmanager
async def lifespan(self, app: ASGIApp):
await self.on_startup() # type: ignore
yield
await self.on_shutdown() # type: ignore
10 changes: 0 additions & 10 deletions mrok/proxy/lifespan.py

This file was deleted.

4 changes: 2 additions & 2 deletions mrok/proxy/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

from mrok.conf import get_settings
from mrok.logging import setup_logging
from mrok.proxy.types import ASGIApp
from mrok.proxy.worker import Worker
from mrok.types.proxy import ASGIApp

logger = logging.getLogger("mrok.agent")

Expand Down Expand Up @@ -67,7 +67,7 @@ def start_uvicorn_worker(
app,
identity_file,
events_enabled=events_enabled,
events_publisher_port=events_pub_port,
event_publisher_port=events_pub_port,
metrics_interval=metrics_interval,
)
worker.run()
Expand Down
4 changes: 2 additions & 2 deletions mrok/proxy/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import psutil
from hdrh.histogram import HdrHistogram

from mrok.proxy.datastructures import (
from mrok.proxy.models import (
DataTransferMetrics,
ProcessMetrics,
RequestsMetrics,
Expand Down Expand Up @@ -49,7 +49,7 @@ async def get_process_metrics(interval: float = 0.1) -> ProcessMetrics:
return await asyncio.to_thread(_collect_process_usage, interval)


class WorkerMetricsCollector:
class MetricsCollector:
def __init__(self, worker_id: str, lowest=1, highest=60000, sigfigs=3):
self.worker_id = worker_id
self.total_requests = 0
Expand Down
10 changes: 5 additions & 5 deletions mrok/proxy/middlewares.py → mrok/proxy/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
import time

from mrok.proxy.constants import MAX_REQUEST_BODY_BYTES, MAX_RESPONSE_BODY_BYTES
from mrok.proxy.datastructures import FixedSizeByteBuffer, HTTPHeaders, HTTPRequest, HTTPResponse
from mrok.proxy.metrics import WorkerMetricsCollector
from mrok.proxy.types import (
from mrok.proxy.metrics import MetricsCollector
from mrok.proxy.models import FixedSizeByteBuffer, HTTPHeaders, HTTPRequest, HTTPResponse
from mrok.proxy.utils import must_capture_request, must_capture_response
from mrok.types.proxy import (
ASGIApp,
ASGIReceive,
ASGISend,
Message,
ResponseCompleteCallback,
Scope,
)
from mrok.proxy.utils import must_capture_request, must_capture_response

logger = logging.getLogger("mrok.proxy")

Expand Down Expand Up @@ -98,7 +98,7 @@ async def send_wrapper(msg: Message):


class MetricsMiddleware:
def __init__(self, app: ASGIApp, metrics: WorkerMetricsCollector):
def __init__(self, app: ASGIApp, metrics: MetricsCollector):
self.app = app
self.metrics = metrics

Expand Down
16 changes: 8 additions & 8 deletions mrok/proxy/datastructures.py → mrok/proxy/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic_core import core_schema


class ZitiId(BaseModel):
class X509Credentials(BaseModel):
key: str
cert: str
ca: str
Expand All @@ -21,7 +21,7 @@ def strip_pem_prefix(cls, value: str) -> str:
return value


class ZitiMrokMeta(BaseModel):
class ServiceMetadata(BaseModel):
model_config = ConfigDict(extra="ignore")
identity: str
extension: str
Expand All @@ -30,21 +30,21 @@ class ZitiMrokMeta(BaseModel):
tags: dict[str, str | bool | None] | None = None


class ZitiIdentity(BaseModel):
class Identity(BaseModel):
model_config = ConfigDict(extra="ignore")
zt_api: str = Field(validation_alias="ztAPI")
id: ZitiId
id: X509Credentials
zt_apis: str | None = Field(default=None, validation_alias="ztAPIs")
config_types: str | None = Field(default=None, validation_alias="configTypes")
enable_ha: bool = Field(default=False, validation_alias="enableHa")
mrok: ZitiMrokMeta | None = None
mrok: ServiceMetadata | None = None

@staticmethod
def load_from_file(path: str | Path) -> ZitiIdentity:
def load_from_file(path: str | Path) -> Identity:
path = Path(path)
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
return ZitiIdentity.model_validate(data)
return Identity.model_validate(data)


class FixedSizeByteBuffer:
Expand Down Expand Up @@ -183,7 +183,7 @@ class WorkerMetrics(BaseModel):

class Status(BaseModel):
type: Literal["status"] = "status"
meta: ZitiMrokMeta
meta: ServiceMetadata
metrics: WorkerMetrics


Expand Down
11 changes: 0 additions & 11 deletions mrok/proxy/protocol.py

This file was deleted.

Loading
Loading