Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions src/critic/cli.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,55 @@
def main():
print('Hello from', __name__)
import os

import click

from critic.monitor_utility import create_monitors, delete_monitors
from critic.tables import UptimeMonitorTable


@click.group()
def cli():
pass


def _default_monitor_table_name() -> str:
# Allow overriding the exact DynamoDB table name if needed.
# Otherwise use the generic Critic table name.
return os.environ.get('MONITOR_TABLE_NAME', UptimeMonitorTable.name())


@cli.command('create-monitors')
@click.option('--project-id', required=True)
@click.option('--prefix', default='demo', show_default=True)
@click.option('--count', default=10, type=int, show_default=True)
@click.option('--table-name', default=None)
def create_monitors_cmd(project_id: str, prefix: str, count: int, table_name: str | None):
# Failsafe added to use default table name if not provided.
table_name = table_name or _default_monitor_table_name()
created = create_monitors(
table_name=table_name,
project_id=project_id,
prefix=prefix,
count=count,
ddb=None,
)
click.echo(f'Created {created} monitors in {table_name}')


@cli.command('delete-monitors')
@click.option('--project-id', required=True)
@click.option('--prefix', required=True)
@click.option('--table-name', default=None)
def delete_monitors_cmd(project_id: str, prefix: str, table_name: str | None):
table_name = table_name or _default_monitor_table_name()
deleted = delete_monitors(
table_name=table_name,
project_id=project_id,
prefix=prefix,
ddb=None,
)
click.echo(f'Deleted {deleted} monitors from {table_name}')


if __name__ == '__main__':
# Allows running CLI commands directly.
cli()
80 changes: 80 additions & 0 deletions src/critic/monitor_utility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os

import boto3
from boto3.dynamodb.conditions import Key

# from critic.libs.ddb import floats_to_decimals
from critic.models import UptimeMonitorModel
from critic.tables import UptimeMonitorTable


def _table(table_name: str, ddb: boto3.resources.base.ServiceResource | None = None):
if ddb is None:
ddb = boto3.resource(
'dynamodb',
region_name=os.environ.get('AWS_DEFAULT_REGION', 'us-east-1'),
)
return ddb.Table(table_name)


def _monitor_item(project_id: str, slug: str) -> dict:
# Build monitor data that passes UptimeMonitorModel validation.
inst = UptimeMonitorModel(
project_id=project_id,
slug=slug,
url='https://www.google.com',
frequency_mins=5,
next_due_at='2025-11-10T20:35:00Z',
timeout_secs=30,
assertions={'status_code': 200},
failures_before_alerting=2,
alert_slack_channels=[],
alert_emails=[],
realert_interval_mins=60,
)

# Resource API expects plain python types, NOT DynamoDB AttributeValue format.
plain = inst.model_dump(mode='json', exclude_none=True)
return plain


def create_monitors(
table_name: str,
project_id: str,
prefix: str,
count: int,
ddb: boto3.resources.base.ServiceResource | None = None,
) -> int:
for i in range(1, count + 1):
slug = f'{prefix}-{i:04d}'
item = _monitor_item(project_id=project_id, slug=slug)

# Use table extraction. Fall back to boto3 only if override is necessary
if table_name == UptimeMonitorTable.name():
UptimeMonitorTable.put(item)
else:
_table(table_name, ddb).put_item(Item=item)

return count


def delete_monitors(
table_name: str,
project_id: str,
prefix: str,
ddb: boto3.resources.base.ServiceResource | None = None,
) -> int:
t = _table(table_name, ddb)

resp = t.query(
KeyConditionExpression=Key('project_id').eq(project_id),
ProjectionExpression='project_id, slug',
)
items = resp.get('Items', [])
to_delete = [item for item in items if item['slug'].startswith(prefix)]

with t.batch_writer() as bw:
for item in to_delete:
bw.delete_item(Key={'project_id': item['project_id'], 'slug': item['slug']})

return len(to_delete)
4 changes: 4 additions & 0 deletions tests/critic_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ def moto_for_unit_tests(request):
clear_tables()
else:
# Unit test → activate moto
# Set default env vars for moto if not already set
os.environ.setdefault('AWS_DEFAULT_REGION', 'us-east-1')
os.environ.setdefault('CRITIC_NAMESPACE', 'critic-test-')
with mock_aws():
ddb_module._ddb_client = None # Reset cached client before tables are created
create_tables()
yield
# The DDB module is designed to cache the client. When we're testing unit tests and
Expand Down
13 changes: 10 additions & 3 deletions tests/critic_tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
from critic.cli import main
from click.testing import CliRunner

from critic.cli import cli

def test_main():
main()

# Changed the tests for cli to actually verify that the cli is functioning as intended.
def test_cli_help():
runner = CliRunner()
result = runner.invoke(cli, ['--help'])
assert result.exit_code == 0
assert 'create-monitors' in result.output
assert 'delete-monitors' in result.output
68 changes: 68 additions & 0 deletions tests/critic_tests/test_monitor_utility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from critic.libs.ddb import deserialize, get_client, serialize
from critic.monitor_utility import create_monitors, delete_monitors
from critic.tables import UptimeMonitorTable


def _query_project_items(table_name: str, project_id: str) -> list[dict]:
resp = get_client().query(
TableName=table_name,
KeyConditionExpression='project_id = :project_id',
ExpressionAttributeValues=serialize(
{
':project_id': project_id,
}
),
)
return [deserialize(item) for item in resp.get('Items', [])]


def test_create_and_delete_monitors():
# Test creating and deleting monitors in DynamoDB.
table_name = UptimeMonitorTable.name()

project_id = '00000000-0000-0000-0000-000000000001'
prefix = 'stress'

created = create_monitors(
table_name=table_name,
project_id=project_id,
prefix=prefix,
count=10,
ddb=None,
)
assert created == 10

items = _query_project_items(table_name, project_id)
assert len(items) == 10

deleted = delete_monitors(
table_name=table_name,
project_id=project_id,
prefix=prefix,
ddb=None,
)
assert deleted == 10

items_after = _query_project_items(table_name, project_id)
assert items_after == []


def test_delete_only_matches_prefix():
# Test that delete_monitors only deletes items matching the given prefix.
table_name = UptimeMonitorTable.name()

project_id = '00000000-0000-0000-0000-000000000001'
prefix = 'stress'

create_monitors(table_name, project_id, prefix, 5, ddb=None)
create_monitors(table_name, project_id, 'other', 3, ddb=None)

items_before = _query_project_items(table_name, project_id)
assert len(items_before) == 8

deleted = delete_monitors(table_name, project_id, prefix, ddb=None)
assert deleted == 5

items_after = _query_project_items(table_name, project_id)
remaining_slugs = {item['slug'] for item in items_after}
assert remaining_slugs == {f'other-{i:04d}' for i in range(1, 4)}