Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion awx/main/tests/functional/api/test_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from awx.api.versioning import reverse

from awx.main.models import InventorySource, Inventory, ActivityStream
from awx.main.models import InventorySource, Inventory, ActivityStream, Organization
from awx.main.utils.inventory_vars import update_group_variables


Expand Down Expand Up @@ -963,3 +963,45 @@ def test_interleaved_deletions(self, inventory, patch, admin_user, inventory_sou
# Test step 6: Value of var x from source A reappears, because the
# latest update from source B did not contain var x.
self.update_and_verify(inv_src_c, {}, expect={"x": 1}, teststep=6)


@pytest.mark.django_db
def test_inventory_names_unique_per_organization(post, admin_user):
"""Validate that two inventories can have the same name if they belong to different organizations."""
org1 = Organization.objects.create(name='org-inv-1')
org2 = Organization.objects.create(name='org-inv-2')
inv_name = 'SharedInventoryName'

# Create inventory with same name in org1
resp1 = post(
reverse('api:inventory_list'),
{'name': inv_name, 'organization': org1.id},
admin_user,
expect=201,
)
inv1_id = resp1.data['id']

# Create inventory with same name in org2 - should succeed
resp2 = post(
reverse('api:inventory_list'),
{'name': inv_name, 'organization': org2.id},
admin_user,
expect=201,
)
inv2_id = resp2.data['id']

assert inv1_id != inv2_id
inv1 = Inventory.objects.get(id=inv1_id)
inv2 = Inventory.objects.get(id=inv2_id)
assert inv1.name == inv2.name == inv_name
assert inv1.organization.id == org1.id
assert inv2.organization.id == org2.id

# Attempt to create another inventory with same name in org1 - should fail
resp3 = post(
reverse('api:inventory_list'),
{'name': inv_name, 'organization': org1.id},
admin_user,
expect=400,
)
assert 'Inventory with this Name and Organization already exists' in json.dumps(resp3.data)
92 changes: 92 additions & 0 deletions awx/main/tests/functional/api/test_notification_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
import pytest

from awx.api.versioning import reverse
from awx.main.models import NotificationTemplate, Organization


@pytest.mark.django_db
def test_notification_template_names_unique_per_organization(post, admin_user):
"""
Validate that notification templates must have unique names within an organization,
but can have the same name across different organizations.
"""
org1 = Organization.objects.create(name='org-notif-1')
org2 = Organization.objects.create(name='org-notif-2')
template_name = 'SharedNotificationName'

# Create notification template in org1
resp1 = post(
reverse('api:notification_template_list'),
{
'name': template_name,
'organization': org1.id,
'notification_type': 'email',
'notification_configuration': {
'username': 'user@example.com',
'password': 'pass',
'sender': 'sender@example.com',
'recipients': ['recipient@example.com'],
'host': 'smtp.example.com',
'port': 25,
'use_tls': False,
'use_ssl': False,
},
},
admin_user,
expect=201,
)
template1_id = resp1.data['id']

# Create notification template with same name in org2 - should succeed
resp2 = post(
reverse('api:notification_template_list'),
{
'name': template_name,
'organization': org2.id,
'notification_type': 'email',
'notification_configuration': {
'username': 'user@example.com',
'password': 'pass',
'sender': 'sender@example.com',
'recipients': ['recipient@example.com'],
'host': 'smtp.example.com',
'port': 25,
'use_tls': False,
'use_ssl': False,
},
},
admin_user,
expect=201,
)
template2_id = resp2.data['id']

assert template1_id != template2_id
template1 = NotificationTemplate.objects.get(id=template1_id)
template2 = NotificationTemplate.objects.get(id=template2_id)
assert template1.name == template2.name == template_name
assert template1.organization.id == org1.id
assert template2.organization.id == org2.id

# Attempt to create another notification template with same name in org1 - should fail
resp3 = post(
reverse('api:notification_template_list'),
{
'name': template_name,
'organization': org1.id,
'notification_type': 'email',
'notification_configuration': {
'username': 'user@example.com',
'password': 'pass',
'sender': 'sender@example.com',
'recipients': ['recipient@example.com'],
'host': 'smtp.example.com',
'port': 25,
'use_tls': False,
'use_ssl': False,
},
},
admin_user,
expect=400,
)
assert 'Notification template with this Organization and Name already exists' in str(resp3.data)
45 changes: 45 additions & 0 deletions awx/main/tests/functional/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from awx.main.models import Organization, Inventory, WorkflowJob, ExecutionEnvironment, Host
from awx.main.scheduler import TaskManager

from django.test import override_settings


@pytest.mark.django_db
@pytest.mark.parametrize('num_hosts, num_queries', [(1, 15), (10, 15)])
Expand Down Expand Up @@ -445,3 +447,46 @@ def get_inventory_hosts(get, inv_id, use_user):
data = get(reverse('api:inventory_hosts_list', kwargs={'pk': inv_id}), use_user, expect=200).data
results = [host['id'] for host in data['results']]
return results


@pytest.mark.django_db
def test_bulk_job_launch_respects_settings_limit(job_template, organization, inventory, project, post, patch, get, user):
"""Test that bulk job launch respects BULK_JOB_MAX_LAUNCH setting."""
normal_user = user('normal_user', False)
organization.member_role.members.add(normal_user)

jt = JobTemplate.objects.create(
name='bulk-test-jt',
ask_inventory_on_launch=True,
project=project,
playbook='helloworld.yml',
allow_simultaneous=True,
)
jt.execute_role.members.add(normal_user)
inventory.use_role.members.add(normal_user)

# Test with limit set to 3
with override_settings(BULK_JOB_MAX_LAUNCH=3):
# Attempt to launch 5 jobs when limit is 3 - should fail
jobs = [{'unified_job_template': jt.id, 'inventory': inventory.id} for _ in range(5)]
resp = post(
reverse('api:bulk_job_launch'),
{'name': 'Bulk Job Test', 'jobs': jobs},
normal_user,
expect=400,
)
assert 'Number of requested jobs exceeds system setting' in str(resp.data)

# Test with limit increased to 10
with override_settings(BULK_JOB_MAX_LAUNCH=10):
# Now launching 5 jobs should succeed
jobs = [{'unified_job_template': jt.id, 'inventory': inventory.id} for _ in range(5)]
resp = post(
reverse('api:bulk_job_launch'),
{'name': 'Bulk Job Test', 'jobs': jobs},
normal_user,
expect=201,
)
bulk_job = get(resp.data['url'], normal_user, expect=200).data
# Verify the workflow job was created
assert bulk_job['name'] == 'Bulk Job Test'
Loading