diff --git a/fixtures/policy_test.py b/fixtures/policy_test.py index b92aace27..d3f2e57a0 100644 --- a/fixtures/policy_test.py +++ b/fixtures/policy_test.py @@ -41,7 +41,9 @@ def __init__(self, policy_name, rules_list, inputs, connections, api=None): def setUp(self): super(PolicyFixture, self).setUp() - if self.api_flag is None: + if self.api_flag is not None: + self._create_policy_api(self.policy_name, self.rules_list) + else: self.policy_obj = self.quantum_fixture.get_policy_if_present( self.project_name, self.policy_name) if not self.policy_obj: @@ -54,11 +56,8 @@ def setUp(self): self.logger.info( 'Policy %s already present, not creating any policy' % (self.policy_name)) - self.policy_fq_name = self.quantum_fixture.get_policy_fq_name( self.policy_obj) - else: - self._create_policy_api(self.policy_name, self.rules_list) # end setUp def verify_on_setup(self): @@ -857,4 +856,57 @@ def verify_policy_in_control_nodes(self): self.logger.info("verification: %s, status: %s" % (me, result)) return {'result': result, 'msg': err_msg} # end verify_policy_in_control_node + + def verify_policy_in_api_quantum_server( + self, + api_policy_obj, + quantum_policy_obj): + '''Validate policy information in API-Server. Compare data with quantum based policy fixture data. + Check specifically for following: + api_server_keys: 1> fq_name, 2> uuid, 3> rules + quantum_fixture_keys: 1> policy_fq_name, 2> id in policy_obj, 3> policy_obj [for rules] + ''' + me = inspect.getframeinfo(inspect.currentframe())[2] + result = True + err_msg = [] + out = None + self.logger.info("====Verifying data for %s in API_Server ======" % + (api_policy_obj.fq_name[2])) + self.api_s_policy_obj = self.api_s_inspect.get_cs_policy( + domain=api_policy_obj.fq_name[0], + project=api_policy_obj.fq_name[1], + policy=api_policy_obj.fq_name[2], + refresh=True) + self.api_s_policy_obj_x = self.api_s_policy_obj['network-policy'] + + # compare policy_fq_name + out = policy_test_utils.compare_args( + 'policy_fq_name', + api_policy_obj.fq_name, + quantum_policy_obj['policy']['fq_name']) + if out: + err_msg.append(out) + # compare policy_uuid + out = policy_test_utils.compare_args( + 'policy_uuid', + api_policy_obj.uuid, + quantum_policy_obj['policy']['id']) + if out: + err_msg.append(out) + # compare policy_rules + out = policy_test_utils.compare_args( + 'policy_rules', self.api_s_policy_obj_x[ + 'network_policy_entries']['policy_rule'], + quantum_policy_obj['policy']['entries']['policy_rule']) + if out: + err_msg.append(out) + + if err_msg != []: + result = False + err_msg.insert( + 0, me + ":" + api_policy_obj.fq_name[2]) + self.logger.info("verification: %s, status: %s message: %s" % + (me, result, err_msg)) + return {'result': result, 'msg': err_msg} + # end verify_policy_in_api_quantum_server # end PolicyFixture diff --git a/scripts/policy/test_policy_api.py b/scripts/policy/test_policy_api.py index 2f77ab378..989a69732 100644 --- a/scripts/policy/test_policy_api.py +++ b/scripts/policy/test_policy_api.py @@ -1,17 +1,15 @@ import policy_test_utils -from vn_test import * -from vm_test import * -from policy_test import * +from vn_test import VNFixture +from vm_test import VMFixture +from policy_test import PolicyFixture from tcutils.wrappers import preposttest_wrapper from vnc_api import vnc_api from vnc_api.gen.resource_test import * -from sdn_topo_setup import * +from sdn_topo_setup import sdnTopoSetupFixture from .base import BasePolicyTest from common import isolated_creds import inspect -import fixtures -from policy_test import * class TestApiPolicyFixture01(BasePolicyTest): @@ -74,7 +72,7 @@ def test_create_api_policy(self): proj = self.vnc_lib.project_read(self.project.project_fq_name) # creaete VN - vn_blue_obj = VirtualNetwork(vn1_name, proj) + vn_blue_obj = vnc_api.VirtualNetwork(vn1_name, proj) vn_id = self.vnc_lib.virtual_network_create(vn_blue_obj) self.logger.info("VN %s is created using API Server" % vn1_name) @@ -108,8 +106,8 @@ def test_create_api_policy(self): self.assertIsNotNone(pol, "policy is not present on API server") vn_blue_obj.add_network_policy( policy_fixt1.policy_obj, - VirtualNetworkPolicyType( - sequence=SequenceType( + vnc_api.VirtualNetworkPolicyType( + sequence=vnc_api.SequenceType( major=0, minor=0))) self.vnc_lib.virtual_network_update(vn_blue_obj) @@ -141,7 +139,7 @@ def test_create_api_policy(self): pol.name) self.assertIsNotNone(policy_in_quantum, "policy is not present on quantum server") - assert self.verify_policy_in_api_quantum_server(pol, policy_in_quantum) + assert policy_fixt1.verify_policy_in_api_quantum_server(pol, policy_in_quantum) self.logger.info("policy %s is verified on API Server" % policy_name) # delete vn @@ -195,7 +193,7 @@ def test_associate_disassociate_api_policy(self): proj = self.vnc_lib.project_read(self.project.project_fq_name) # creaete VN - vn_obj = VirtualNetwork(vn1_name, proj) + vn_obj = vnc_api.VirtualNetwork(vn1_name, proj) vn_id = self.vnc_lib.virtual_network_create(vn_obj) self.logger.info("VN %s is created using API Server" % vn1_name) @@ -224,8 +222,8 @@ def test_associate_disassociate_api_policy(self): vn_update_rsp = None vn_obj.add_network_policy( policy_fixt.policy_obj, - VirtualNetworkPolicyType( - sequence=SequenceType( + vnc_api.VirtualNetworkPolicyType( + sequence=vnc_api.SequenceType( major=0, minor=0))) self.logger.info("trying to associate policy %s to vn %s" % @@ -352,60 +350,6 @@ def test_policy_with_local_keyword_across_multiple_vn(self): self.assertEqual(result, True, msg) return True # end test_policy_with_local_keyword_across_multiple_vn - - def verify_policy_in_api_quantum_server( - self, - api_policy_obj, - quantum_policy_obj): - '''Validate policy information in API-Server. Compare data with quantum based policy fixture data. - Check specifically for following: - api_server_keys: 1> fq_name, 2> uuid, 3> rules - quantum_fixture_keys: 1> policy_fq_name, 2> id in policy_obj, 3> policy_obj [for rules] - ''' - me = inspect.getframeinfo(inspect.currentframe())[2] - result = True - err_msg = [] - out = None - self.logger.info("====Verifying data for %s in API_Server ======" % - (api_policy_obj.fq_name[2])) - self.api_s_policy_obj = self.api_s_inspect.get_cs_policy( - domain=api_policy_obj.fq_name[0], - project=api_policy_obj.fq_name[1], - policy=api_policy_obj.fq_name[2], - refresh=True) - self.api_s_policy_obj_x = self.api_s_policy_obj['network-policy'] - - # compare policy_fq_name - out = policy_test_utils.compare_args( - 'policy_fq_name', - api_policy_obj.fq_name, - quantum_policy_obj['policy']['fq_name']) - if out: - err_msg.append(out) - # compare policy_uuid - out = policy_test_utils.compare_args( - 'policy_uuid', - api_policy_obj.uuid, - quantum_policy_obj['policy']['id']) - if out: - err_msg.append(out) - # compare policy_rules - out = policy_test_utils.compare_args( - 'policy_rules', self.api_s_policy_obj_x[ - 'network_policy_entries']['policy_rule'], - quantum_policy_obj['policy']['entries']['policy_rule']) - if out: - err_msg.append(out) - - if err_msg != []: - result = False - err_msg.insert( - 0, me + ":" + api_policy_obj.fq_name[2]) - self.logger.info("verification: %s, status: %s message: %s" % - (me, result, err_msg)) - return {'result': result, 'msg': err_msg} - # end verify_policy_in_api_quantum_server - # end TestApiPolicyFixture01 @@ -591,58 +535,4 @@ def test_policy_api_fixtures(self): topo, config_topo = out['data'] return True # end test_policy_api_fixtures - - def verify_policy_in_api_quantum_server( - self, - api_policy_obj, - quantum_policy_obj): - '''Validate policy information in API-Server. Compare data with quantum based policy fixture data. - Check specifically for following: - api_server_keys: 1> fq_name, 2> uuid, 3> rules - quantum_fixture_keys: 1> policy_fq_name, 2> id in policy_obj, 3> policy_obj [for rules] - ''' - me = inspect.getframeinfo(inspect.currentframe())[2] - result = True - err_msg = [] - out = None - self.logger.info("====Verifying data for %s in API_Server ======" % - (api_policy_obj.fq_name[2])) - self.api_s_policy_obj = self.api_s_inspect.get_cs_policy( - domain=api_policy_obj.fq_name[0], - project=api_policy_obj.fq_name[1], - policy=api_policy_obj.fq_name[2], - refresh=True) - self.api_s_policy_obj_x = self.api_s_policy_obj['network-policy'] - - # compare policy_fq_name - out = policy_test_utils.compare_args( - 'policy_fq_name', - api_policy_obj.fq_name, - quantum_policy_obj['policy']['fq_name']) - if out: - err_msg.append(out) - # compare policy_uuid - out = policy_test_utils.compare_args( - 'policy_uuid', - api_policy_obj.uuid, - quantum_policy_obj['policy']['id']) - if out: - err_msg.append(out) - # compare policy_rules - out = policy_test_utils.compare_args( - 'policy_rules', self.api_s_policy_obj_x[ - 'network_policy_entries']['policy_rule'], - quantum_policy_obj['policy']['entries']['policy_rule']) - if out: - err_msg.append(out) - - if err_msg != []: - result = False - err_msg.insert( - 0, me + ":" + api_policy_obj.fq_name[2]) - self.logger.info("verification: %s, status: %s message: %s" % - (me, result, err_msg)) - return {'result': result, 'msg': err_msg} - # end verify_policy_in_api_quantum_server - # end TestApiPolicyFixture02