Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
global:
scrape_interval: 15s # By default, scrape targets every 15 seconds.

# Attach these labels to any time series or alerts when communicating with
# external systems (federation, remote storage, Alertmanager).
external_labels:
monitor: 'codelab-monitor'

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'

# Override the global default and scrape targets from this job every 5 seconds.
scrape_interval: 5s

static_configs:
- targets: ['localhost:9090']
5 changes: 5 additions & 0 deletions scl/cap_reg.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
from typing import List, Dict
from scl.meta.capability import Capability
from scl.otel.metric_decorator import record_latency
from scl.otel.otel import search_time_histogram, tool_execute_time_histogram, cap_search_result_count

# Add the StructuredContextLanguage directory to the path
scl_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
Expand Down Expand Up @@ -46,10 +48,12 @@ def get_cap_by_name(self, name)-> Capability:
## RAG search between context and function description after embedding
## Return function in openAI tool format
@tracer.start_as_current_span("getCapsBySimilarity")
@record_latency(search_time_histogram, cap_search_result_count)
def getCapsBySimilarity(self, msg: Msg, limit=5, min_similarity=0.5) -> Dict[str, Capability]:
    ## Similarity (RAG-style) search between the message context and the
    ## embedded capability descriptions; returns capabilities keyed by name
    ## in OpenAI tool format (per the class-level comment above).
    # NOTE(review): cap_search_result_count is a plain module-level int, so
    # record_latency receives a snapshot value (0) and cannot update the
    # metric through it — confirm the intended wiring.
    return self.cap_store.search_by_similarity(msg, limit, min_similarity)

@tracer.start_as_current_span("invoke_cap_safe")
@record_latency(tool_execute_time_histogram)
def call_cap_safe(self, cap: Capability, args_dict=None):
## todo replace by https://github.com/langchain-ai/langchain-sandbox?
## todo replace by e2b?
Expand All @@ -71,5 +75,6 @@ def record(self, msg: Msg, cap: Capability):
return self.cap_store.record(msg, cap)

@tracer.start_as_current_span("getCapsByHistory")
@record_latency(search_time_histogram, cap_search_result_count)
def getCapsByHistory(self, msg: Msg, limit=5, min_similarity=0.5) -> Dict[str, Capability]:
    ## Look up capabilities from historical usage recorded in the cap store,
    ## with the same limit / minimum-similarity contract as getCapsBySimilarity.
    # NOTE(review): as with getCapsBySimilarity, passing the module-level int
    # cap_search_result_count here cannot feed the gauge — verify the metric
    # plumbing.
    return self.cap_store.getCapsByHistory(msg, limit, min_similarity)
6 changes: 6 additions & 0 deletions scl/llm_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from scl.cap_reg import CapRegistry
from scl.meta.msg import Msg
from scl.config import config
from scl.otel.otel import cap_use_hit_count, cap_duplicate_count, cap_total_count

## why not just provide the metrics and leave the function to the users themselves?
## using hooks to give users the capability to override the default behavior
Expand Down Expand Up @@ -46,6 +47,8 @@ def send_messages(
**({} if tools_autonomy is None else tools_autonomy),
**({} if tools_history is None else tools_history)
}
cap_total_count = len(tools_merged)
cap_duplicate_count = len(tools_named) + len(tools_autonomy) + len(tools_history) - cap_total_count
## metrics tool number,metrics as duplicate number?
## or a cache for duplicate info
tools = []
Expand Down Expand Up @@ -88,6 +91,7 @@ def function_call_playground(
## todo-> debug/trace
logging.info(response)
if response.tool_calls:
cap_use_hit_count = len(response.tool_calls)
for tool_call in response.tool_calls:
## metric accuracy for each search? from LLM, back to cap_reg's cache
func1_name = tool_call.function.name
Expand All @@ -104,4 +108,6 @@ def function_call_playground(
msg.append_cap_result(func1_out, tool_call.id)
## metric execution time
response = send_messages(client, model, cap_registry, ToolNames, msg, turns)
else:
cap_use_hit_count = 0
return response.content
26 changes: 26 additions & 0 deletions scl/otel/metric_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import functools
import time
from typing import Any, Callable


def record_latency(histogram, count=None):
    """Decorator that records a function's wall-clock latency into *histogram*.

    The elapsed time (seconds, via ``time.perf_counter``) is recorded even
    when the wrapped function raises, with the function name attached as the
    ``function`` attribute.

    Parameters:
        histogram: an instrument exposing ``record(value, attributes)``
            (e.g. an OpenTelemetry histogram).
        count: optional counter-like instrument exposing
            ``add(value, attributes)``. When provided and the wrapped
            function returns a sized result, ``len(result)`` is added to it.
            Defaults to ``None`` so the decorator can be used with a single
            argument (``@record_latency(some_histogram)``), which the
            original required-parameter signature rejected with a TypeError.

    Fixes vs. the original:
        * ``time`` was used but never imported (NameError on first call).
        * ``count = len(result)`` only rebound a local name and had no
          observable effect; the length is now fed to ``count.add`` when a
          real counter instrument is supplied. Plain values (e.g. the ints
          some call sites pass today) have no ``add`` and are ignored.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                # Record the result size only for counter-like instruments;
                # non-instrument values (legacy call style) are skipped.
                if count is not None and hasattr(count, "add"):
                    try:
                        count.add(len(result), {"function": func.__name__})
                    except TypeError:
                        pass  # result is not sized; nothing to count
                return result
            finally:
                # Record latency on both success and error paths.
                duration = time.perf_counter() - start_time
                histogram.record(duration, {"function": func.__name__})

        return wrapper
    return decorator
110 changes: 110 additions & 0 deletions scl/otel/otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import os
import logging
from typing import Iterable

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
ConsoleSpanExporter,
)

from opentelemetry import metrics
from opentelemetry.sdk.metrics import (
MeterProvider,
CallbackOptions,
Observation
)
from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
from scl.config import config

# Try to import OTLP exporters, fallback to console if not available
try:
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
    OTLP_AVAILABLE = True
except ImportError:
    OTLP_AVAILABLE = False
    logging.info("OTLP exporter not available, falling back to console exporter")

# Create the TracerProvider.
provider = TracerProvider()

# Use OTLP exporter if available, otherwise use console exporter
if OTLP_AVAILABLE:
    OTLP_ENDPOINT = config.otlp_endpoint
    span_exporter = OTLPSpanExporter(
        endpoint=f"{OTLP_ENDPOINT}/v1/traces"
    )
else:
    span_exporter = ConsoleSpanExporter()

# Create and attach the span processor.
span_processor = BatchSpanProcessor(span_exporter)
provider.add_span_processor(span_processor)

# Install the tracer provider globally.
trace.set_tracer_provider(provider)

# Create the tracer instance - using a module/application name is recommended.
tracer = trace.get_tracer("scl")

# Use OTLP exporter for metrics if available, otherwise use console exporter
if OTLP_AVAILABLE:
    # NOTE(review): unlike the span exporter above, no endpoint is passed
    # here, so OTLPMetricExporter falls back to its environment-variable /
    # default endpoint instead of config.otlp_endpoint — confirm intended.
    metric_exporter = OTLPMetricExporter()
else:
    metric_exporter = ConsoleMetricExporter()

metric_reader = PeriodicExportingMetricReader(metric_exporter)
# NOTE(review): this rebinds `provider` (previously the TracerProvider) to a
# MeterProvider. The tracer provider is already registered globally, so
# behavior is unaffected, but a distinct name (e.g. `meter_provider`) would
# prevent accidental misuse of the module-level `provider` attribute.
provider = MeterProvider(metric_readers=[metric_reader])

# Sets the global default meter provider
metrics.set_meter_provider(provider)

# Creates a meter from the global meter provider
meter = metrics.get_meter("scl")

## Design sketch: a key-value map of metrics
##   key: str in "name,desc" format
##   value: an object —
##     a number for a gauge, or
##     a histogram instrument

# Define metrics
search_time_histogram = meter.create_histogram(
    name="cap.search.time",
    description="Time taken for search operations",
    explicit_bucket_boundaries_advisory=[1.0, 5.0, 10.0],
    unit="s"
)

tool_execute_time_histogram = meter.create_histogram(
    name="cap.execute.time",
    description="Time taken for cap execution",
    explicit_bucket_boundaries_advisory=[1.0, 5.0, 10.0],
    unit="s"
)

## todo make it a map for search
# NOTE(review): these module-level ints feed the observable-gauge callback
# below, but assignment from other modules (e.g. `cap_use_hit_count = ...`
# in llm_chat.py) only rebinds a name in THAT module's namespace and never
# updates these values — the gauge will report 0 forever. Consider an
# UpDownCounter instrument or a mutable holder object instead.
cap_search_result_count=0
cap_total_count=0
cap_duplicate_count=0
cap_use_hit_count=0

def observable_cap_gauge_func(options: CallbackOptions) -> Iterable[Observation]:
    """Yield the current cap-related gauge observations.

    Called periodically by the SDK's metric reader; currently only the
    search-result count is reported (the others are still TODO).
    """
    ## for loop map in ... value as int
    yield Observation(cap_search_result_count, {"type": "cap search count"})
    #yield Observation(cap_total_count, {"type": "cap total count"})
    #yield Observation(cap_duplicate_count, {"type": "cap duplicate count"})
    #yield Observation(cap_use_hit_count, {"type": "cap use hit count"})

cap_gauge = meter.create_observable_gauge(
    name="cap.gauge",
    callbacks=[observable_cap_gauge_func],
    description="gauge related with cap",
    unit="1"
)
51 changes: 0 additions & 51 deletions scl/trace.py

This file was deleted.