From e65af2560c820db6f7e924c10a7f616e56188205 Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Wed, 20 May 2015 17:25:11 -0700 Subject: [PATCH 1/8] add several configuration options for tsqa --- tsqa/conf_plugin.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 tsqa/conf_plugin.py diff --git a/tsqa/conf_plugin.py b/tsqa/conf_plugin.py new file mode 100644 index 0000000..5527b63 --- /dev/null +++ b/tsqa/conf_plugin.py @@ -0,0 +1,24 @@ +import logging +import os + +from nose.plugins import Plugin + +log = logging.getLogger('nose.plugins.ConfPlugin') + +class ConfPlugin(Plugin): + + name = 'conf-plugin' + def options(self, parser, env=os.environ): + """Register commandline options. + """ + Plugin.options(self, parser, env) + parser.add_option('--keep-tmp', action='store_true', dest='keep_tmp', + help="Keep tmp files after running successfully") + parser.add_option('--sleep-in-sec', type='int', + dest='sleep_in_sec', + help='Sleep time before ATS start and after ATS stop to allow enough time for async tests') + parser.add_option('--standalone', action='store_true', + dest='stand_alone', + help="Allow standalone test without auto deploying ATS") + + From 27aa3b9352d4df370c2f51e9b1feb42ac93f5b71 Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Wed, 20 May 2015 19:16:10 -0700 Subject: [PATCH 2/8] allow config ats port for standalone tests --- tsqa/conf_plugin.py | 4 ++-- tsqa/environment.py | 37 +++++++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/tsqa/conf_plugin.py b/tsqa/conf_plugin.py index 5527b63..a333098 100644 --- a/tsqa/conf_plugin.py +++ b/tsqa/conf_plugin.py @@ -17,8 +17,8 @@ def options(self, parser, env=os.environ): parser.add_option('--sleep-in-sec', type='int', dest='sleep_in_sec', help='Sleep time before ATS start and after ATS stop to allow enough time for async tests') - parser.add_option('--standalone', action='store_true', + parser.add_option('--standalone-ats-port', type='int', dest='stand_alone', - help="Allow standalone test without auto deploying ATS") + help="Allow standalone test with pre-deployed ATS port") diff --git a/tsqa/environment.py b/tsqa/environment.py index 6244a52..abd3380 100644 --- a/tsqa/environment.py +++ b/tsqa/environment.py @@ -28,6 +28,7 @@ import tsqa.configs import tsqa.utils import logging +import time log = logging.getLogger(__name__) @@ -315,6 +316,24 @@ def __init__(self, layout=None): else: self.layout = None + #process environment options + self.keep_tmp = False + if filter(lambda x: '--keep-tmp' in x, sys.argv): + self.keep_tmp=True + self.sleep_in_sec = 0 + if filter(lambda x: '--sleep-in-sec' in x, sys.argv): + n = sys.argv.index('--sleep-in-sec') + self.sleep_in_sec = sys.argv[n+1] + self.standalone_ats_port = -1 + if filter(lambda x: '--standalone-ats-port' in x, sys.argv): + n = sys.argv.index('--standalone-ats-port') + self.standalone_ats_port = int(sys.argv[n+1]) + if self.standalone_ats_port <= 0 or self.standalone_ats_port >= 65536: + log.info("invalid port number assigned to --standalone_ats_port, start an ATS") + self.standalone_ats_port = -1 + + + def create(self): """ """ @@ -372,7 +391,10 @@ def clone(self, layout=None): else: os.chmod(dirname, 0777) - http_server_port = tsqa.utils.bind_unused_port()[1] + if self.standalone_ats_port != -1: + http_server_port = self.standalone_ats_port + else: + http_server_port = tsqa.utils.bind_unused_port()[1] manager_mgmt_port = tsqa.utils.bind_unused_port()[1] admin_port = tsqa.utils.bind_unused_port()[1] @@ -435,19 +457,28 @@ def destroy(self): installed files. """ self.stop() - shutil.rmtree(self.layout.prefix, ignore_errors=True) + if self.keep_tmp is False: + shutil.rmtree(self.layout.prefix, ignore_errors=True) self.layout = Layout(None) def start(self): + if self.standalone_ats_port != -1: + return if self.running(): # if its already running, don't start another one raise Exception('traffic cop already started') log.debug("Starting traffic cop") assert(os.path.isfile(os.path.join(self.layout.sysconfdir, 'records.config'))) self.__exec_cop() log.debug("Started traffic cop: %s", self.cop) + if int(self.sleep_in_sec) > 0: + time.sleep(self.sleep_in_sec) # TODO: exception if already stopped? def stop(self): + if self.standalone_ats_port != -1: + return + if int(self.sleep_in_sec) > 0: + time.sleep(self.sleep_in_sec) log.debug("Stopping traffic cop: %s", self.cop) if self.running(): self.cop.kill() @@ -462,6 +493,8 @@ def stop(self): self.cop.terminate() # TODO: remove?? or wait... def running(self): + if self.standalone_ats_port != -1: + return True if self.cop is None: return False self.cop.poll() From f50d8edfccadee0cb71886400adc9107e38ccd1e Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Thu, 21 May 2015 12:01:17 -0700 Subject: [PATCH 3/8] improve the arg option process --- tsqa/conf_plugin.py | 12 +++++++---- tsqa/environment.py | 40 +++++++++++++++++++++--------------- tsqa/plugin/__init__.py | 19 +++++++++++++++++ tsqa/plugin/__init__.pyc | Bin 0 -> 227 bytes tsqa/plugin/conf_plugin.py | 28 +++++++++++++++++++++++++ tsqa/plugin/conf_plugin.pyc | Bin 0 -> 1800 bytes tsqa/test_cases.py | 2 +- 7 files changed, 79 insertions(+), 22 deletions(-) create mode 100644 tsqa/plugin/__init__.py create mode 100644 tsqa/plugin/__init__.pyc create mode 100644 tsqa/plugin/conf_plugin.py create mode 100644 tsqa/plugin/conf_plugin.pyc diff --git a/tsqa/conf_plugin.py b/tsqa/conf_plugin.py index a333098..d44e7cf 100644 --- a/tsqa/conf_plugin.py +++ b/tsqa/conf_plugin.py @@ -12,13 +12,17 @@ def options(self, parser, env=os.environ): """Register commandline options. """ Plugin.options(self, parser, env) - parser.add_option('--keep-tmp', action='store_true', dest='keep_tmp', + parser.add_option('--keep-tmp', action='store_true', dest='keep_tmp', default=True, help="Keep tmp files after running successfully") - parser.add_option('--sleep-in-sec', type='int', + parser.add_option('--sleep-in-sec', type='int', default=0, dest='sleep_in_sec', help='Sleep time before ATS start and after ATS stop to allow enough time for async tests') - parser.add_option('--standalone-ats-port', type='int', - dest='stand_alone', + parser.add_option('--standalone-ats-port', type='int', default=-1, + dest='standalone_ats_port', help="Allow standalone test with pre-deployed ATS port") + def configure(self, options, conf): + Plugin.configure(self, options, conf) + global args + args = options diff --git a/tsqa/environment.py b/tsqa/environment.py index abd3380..ea7ed2c 100644 --- a/tsqa/environment.py +++ b/tsqa/environment.py @@ -24,11 +24,13 @@ import time import multiprocessing import hashlib +import socket; import tsqa.configs import tsqa.utils import logging import time +import plugin.conf_plugin log = logging.getLogger(__name__) @@ -317,20 +319,17 @@ def __init__(self, layout=None): self.layout = None #process environment options - self.keep_tmp = False - if filter(lambda x: '--keep-tmp' in x, sys.argv): - self.keep_tmp=True + self.keep_env = False + if plugin.conf_plugin.args.keep_env: + self.keep_env = plugin.conf_plugin.args.keep_env self.sleep_in_sec = 0 - if filter(lambda x: '--sleep-in-sec' in x, sys.argv): - n = sys.argv.index('--sleep-in-sec') - self.sleep_in_sec = sys.argv[n+1] + if plugin.conf_plugin.args.sleep_in_sec: + self.sleep_in_sec = plugin.conf_plugin.args.sleep_in_sec self.standalone_ats_port = -1 - if filter(lambda x: '--standalone-ats-port' in x, sys.argv): - n = sys.argv.index('--standalone-ats-port') - self.standalone_ats_port = int(sys.argv[n+1]) - if self.standalone_ats_port <= 0 or self.standalone_ats_port >= 65536: + if plugin.conf_plugin.args.standalone_ats_port: + self.standalone_ats_port = plugin.conf_plugin.args.standalone_ats_port + if self.standalone_ats_port != -1 and plugin.conf_plugin.args.standalone_ats_port not in range(0, 65536): log.info("invalid port number assigned to --standalone_ats_port, start an ATS") - self.standalone_ats_port = -1 @@ -457,8 +456,7 @@ def destroy(self): installed files. """ self.stop() - if self.keep_tmp is False: - shutil.rmtree(self.layout.prefix, ignore_errors=True) + shutil.rmtree(self.layout.prefix, ignore_errors=True) self.layout = Layout(None) def start(self): @@ -470,14 +468,12 @@ def start(self): assert(os.path.isfile(os.path.join(self.layout.sysconfdir, 'records.config'))) self.__exec_cop() log.debug("Started traffic cop: %s", self.cop) - if int(self.sleep_in_sec) > 0: - time.sleep(self.sleep_in_sec) # TODO: exception if already stopped? def stop(self): if self.standalone_ats_port != -1: return - if int(self.sleep_in_sec) > 0: + if self.sleep_in_sec > 0: time.sleep(self.sleep_in_sec) log.debug("Stopping traffic cop: %s", self.cop) if self.running(): @@ -494,7 +490,17 @@ def stop(self): def running(self): if self.standalone_ats_port != -1: - return True + #try to connect to the port + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex(('127.0.0.1', self.standalone_ats_port)) + if result == 0: + log.info("The port for standalone ATS is open") + sock.close() + return True + else: + log.info("The port for standalone ATS is not open") + sock.close() + return False if self.cop is None: return False self.cop.poll() diff --git a/tsqa/plugin/__init__.py b/tsqa/plugin/__init__.py new file mode 100644 index 0000000..4804db4 --- /dev/null +++ b/tsqa/plugin/__init__.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# initialize logger +import tsqa.log + diff --git a/tsqa/plugin/__init__.pyc b/tsqa/plugin/__init__.pyc new file mode 100644 index 0000000000000000000000000000000000000000..347b102e67c4586fd68023315f82b62b26a8fbac GIT binary patch literal 227 zcmYLD!3x4K4DG~&2!i+(ySRV{Pa=MSAmYV?Wp1G>>)N@d1N}>X#e-K(W#A((?>!Po zp0C;TeYl(oF<`{HWBNI_k}DCl5a%SCV;ZOW!`S~UO*oUk<(YAc#zRP1#lLyv2$gFP zibsW3sldsTbUk*RE)l2;wgzfi=ygXSH)wk>`U>rxDrZ;O8oVZ4ws}$K3O&S=(3+sM Y1u3=FBxT0S@tsZM%3yH2VDFjY3*GlV8vpUVRi5oNx&|*)8dRT4ILrz7}G{*!+A*F{3NEh_(Xhn!3w@a=A z6}dU+|Mge;1I?RR#rC0hug%er^Je&FhU7p0o*X?7fBaUd{da)hpE1Hah#LQi+90nTnpO%}Cj?T8>phN|}4A9;qupC#oJR8mkq2?x~MTT`BeZ@_?f*rW5R7 z{=;`ZO`t}e|7!cjxbP)Z=L5~!VLp51-D>L{LrC2c5@-hLBQwYmCyEs7 ziHgvX;UNkz{tUXW>LXP@U~_#e&=UbYybU_N1$rc)#{zmHp!y6Y`-1s!f^%w2h}313 z-)^<5t#Oq3HX853>?qrRfpf_74N;quyOav+ia}Dv5Pe5w?0Sl00BZ^{9ztP!$#^~t zHLY}SqafB~+wdJ0UjvZ=k*$oSkm(gu>3ZjkYqHQ+6@{?st=)x(K<9a|Ou@K3P$iM# zt|b%-RdvzeN5x|ZftALUfQ1DX7VMWrv!(2UR$!aGe7nd()LqPw^?u`F%>(E&ZLNO~ zYQAsQgCpEB9d@qDA`%SYDdK`rguk}lQLbaiTi-=Fi}8$kTpLRmOE!k@vHntm-&iGL z_TI#G)^?QF)LOrzT1c|f`IPHA$52wseS@UDEK~Qz5x$1_ z|0|>A#aPIVKbM4u^4y09?`oR98;+yynlJ!PVG|Bg8%Et#Avpi9Fal?)lCyNmS41vN zSvtL?vXthQ<<{4|Dq?6?IWO@iy zs9N9PL2l;Hc!5!|(-mFd(8!Iw{Yc+5vZ43}e*1%q5Odz+E|ZiN{Wvt`Cnh6L<8gw& J^ig`6egd5o#n=D< literal 0 HcmV?d00001 diff --git a/tsqa/test_cases.py b/tsqa/test_cases.py index bd96b2f..fcb8729 100644 --- a/tsqa/test_cases.py +++ b/tsqa/test_cases.py @@ -117,7 +117,7 @@ def tearDownClass(cls): # call parent destructor super(EnvironmentCase, cls).tearDownClass() # if the test was successful, tear down the env - if cls.__successful: + if cls.__successful and (cls.environment.keep_env is True): cls.environment.destroy() # this will tear down any processes that we started # Some helpful properties From 973a6ff3c7d730016b8b55fd2667a4eccfc0137f Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Thu, 21 May 2015 17:41:59 -0700 Subject: [PATCH 4/8] fix --keep-env and remove additional files --- tsqa/plugin/__init__.pyc | Bin 227 -> 0 bytes tsqa/plugin/conf_plugin.pyc | Bin 1800 -> 0 bytes tsqa/test_cases.py | 2 +- 3 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 tsqa/plugin/__init__.pyc delete mode 100644 tsqa/plugin/conf_plugin.pyc diff --git a/tsqa/plugin/__init__.pyc b/tsqa/plugin/__init__.pyc deleted file mode 100644 index 347b102e67c4586fd68023315f82b62b26a8fbac..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 227 zcmYLD!3x4K4DG~&2!i+(ySRV{Pa=MSAmYV?Wp1G>>)N@d1N}>X#e-K(W#A((?>!Po zp0C;TeYl(oF<`{HWBNI_k}DCl5a%SCV;ZOW!`S~UO*oUk<(YAc#zRP1#lLyv2$gFP zibsW3sldsTbUk*RE)l2;wgzfi=ygXSH)wk>`U>rxDrZ;O8oVZ4ws}$K3O&S=(3+sM Y1u3=FBxT0S@tsZM%3yH2VDFjY3*GlV8vpUVRi5oNx&|*)8dRT4ILrz7}G{*!+A*F{3NEh_(Xhn!3w@a=A z6}dU+|Mge;1I?RR#rC0hug%er^Je&FhU7p0o*X?7fBaUd{da)hpE1Hah#LQi+90nTnpO%}Cj?T8>phN|}4A9;qupC#oJR8mkq2?x~MTT`BeZ@_?f*rW5R7 z{=;`ZO`t}e|7!cjxbP)Z=L5~!VLp51-D>L{LrC2c5@-hLBQwYmCyEs7 ziHgvX;UNkz{tUXW>LXP@U~_#e&=UbYybU_N1$rc)#{zmHp!y6Y`-1s!f^%w2h}313 z-)^<5t#Oq3HX853>?qrRfpf_74N;quyOav+ia}Dv5Pe5w?0Sl00BZ^{9ztP!$#^~t zHLY}SqafB~+wdJ0UjvZ=k*$oSkm(gu>3ZjkYqHQ+6@{?st=)x(K<9a|Ou@K3P$iM# zt|b%-RdvzeN5x|ZftALUfQ1DX7VMWrv!(2UR$!aGe7nd()LqPw^?u`F%>(E&ZLNO~ zYQAsQgCpEB9d@qDA`%SYDdK`rguk}lQLbaiTi-=Fi}8$kTpLRmOE!k@vHntm-&iGL z_TI#G)^?QF)LOrzT1c|f`IPHA$52wseS@UDEK~Qz5x$1_ z|0|>A#aPIVKbM4u^4y09?`oR98;+yynlJ!PVG|Bg8%Et#Avpi9Fal?)lCyNmS41vN zSvtL?vXthQ<<{4|Dq?6?IWO@iy zs9N9PL2l;Hc!5!|(-mFd(8!Iw{Yc+5vZ43}e*1%q5Odz+E|ZiN{Wvt`Cnh6L<8gw& J^ig`6egd5o#n=D< diff --git a/tsqa/test_cases.py b/tsqa/test_cases.py index fcb8729..2e58d8b 100644 --- a/tsqa/test_cases.py +++ b/tsqa/test_cases.py @@ -117,7 +117,7 @@ def tearDownClass(cls): # call parent destructor super(EnvironmentCase, cls).tearDownClass() # if the test was successful, tear down the env - if cls.__successful and (cls.environment.keep_env is True): + if cls.__successful and (cls.environment.keep_env is False): cls.environment.destroy() # this will tear down any processes that we started # Some helpful properties From b8d3f99fe6d27a628ec200ae8e9d7ac813578781 Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Fri, 22 May 2015 14:47:04 -0700 Subject: [PATCH 5/8] eliminate temp variables and change --standalone-server-port to string to comply with ATS settings --- tsqa/conf_plugin.py | 28 ---------------------------- tsqa/environment.py | 28 +++++++++------------------- tsqa/plugin/conf_plugin.py | 8 ++++---- tsqa/test_cases.py | 3 ++- 4 files changed, 15 insertions(+), 52 deletions(-) delete mode 100644 tsqa/conf_plugin.py diff --git a/tsqa/conf_plugin.py b/tsqa/conf_plugin.py deleted file mode 100644 index d44e7cf..0000000 --- a/tsqa/conf_plugin.py +++ /dev/null @@ -1,28 +0,0 @@ -import logging -import os - -from nose.plugins import Plugin - -log = logging.getLogger('nose.plugins.ConfPlugin') - -class ConfPlugin(Plugin): - - name = 'conf-plugin' - def options(self, parser, env=os.environ): - """Register commandline options. - """ - Plugin.options(self, parser, env) - parser.add_option('--keep-tmp', action='store_true', dest='keep_tmp', default=True, - help="Keep tmp files after running successfully") - parser.add_option('--sleep-in-sec', type='int', default=0, - dest='sleep_in_sec', - help='Sleep time before ATS start and after ATS stop to allow enough time for async tests') - parser.add_option('--standalone-ats-port', type='int', default=-1, - dest='standalone_ats_port', - help="Allow standalone test with pre-deployed ATS port") - - def configure(self, options, conf): - Plugin.configure(self, options, conf) - global args - args = options - diff --git a/tsqa/environment.py b/tsqa/environment.py index ea7ed2c..8f3ba4a 100644 --- a/tsqa/environment.py +++ b/tsqa/environment.py @@ -24,7 +24,7 @@ import time import multiprocessing import hashlib -import socket; +import socket import tsqa.configs import tsqa.utils @@ -319,19 +319,9 @@ def __init__(self, layout=None): self.layout = None #process environment options - self.keep_env = False - if plugin.conf_plugin.args.keep_env: - self.keep_env = plugin.conf_plugin.args.keep_env self.sleep_in_sec = 0 if plugin.conf_plugin.args.sleep_in_sec: self.sleep_in_sec = plugin.conf_plugin.args.sleep_in_sec - self.standalone_ats_port = -1 - if plugin.conf_plugin.args.standalone_ats_port: - self.standalone_ats_port = plugin.conf_plugin.args.standalone_ats_port - if self.standalone_ats_port != -1 and plugin.conf_plugin.args.standalone_ats_port not in range(0, 65536): - log.info("invalid port number assigned to --standalone_ats_port, start an ATS") - - def create(self): """ @@ -390,8 +380,8 @@ def clone(self, layout=None): else: os.chmod(dirname, 0777) - if self.standalone_ats_port != -1: - http_server_port = self.standalone_ats_port + if plugin.conf_plugin.args.standalone_server_port: + http_server_port = plugin.conf_plugin.args.standalone_server_port else: http_server_port = tsqa.utils.bind_unused_port()[1] manager_mgmt_port = tsqa.utils.bind_unused_port()[1] @@ -460,7 +450,7 @@ def destroy(self): self.layout = Layout(None) def start(self): - if self.standalone_ats_port != -1: + if plugin.conf_plugin.args.standalone_server_port: return if self.running(): # if its already running, don't start another one raise Exception('traffic cop already started') @@ -471,7 +461,7 @@ def start(self): # TODO: exception if already stopped? def stop(self): - if self.standalone_ats_port != -1: + if plugin.conf_plugin.args.standalone_server_port: return if self.sleep_in_sec > 0: time.sleep(self.sleep_in_sec) @@ -489,16 +479,16 @@ def stop(self): self.cop.terminate() # TODO: remove?? or wait... def running(self): - if self.standalone_ats_port != -1: + if plugin.conf_plugin.args.standalone_server_port: #try to connect to the port sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - result = sock.connect_ex(('127.0.0.1', self.standalone_ats_port)) + result = sock.connect_ex(('127.0.0.1', int(plugin.conf_plugin.args.standalone_server_port))) if result == 0: - log.info("The port for standalone ATS is open") + log.debug("Standalone ATS server port is open") sock.close() return True else: - log.info("The port for standalone ATS is not open") + log.error("Standalone ATS server port is not open") sock.close() return False if self.cop is None: diff --git a/tsqa/plugin/conf_plugin.py b/tsqa/plugin/conf_plugin.py index e173c40..98d94d2 100644 --- a/tsqa/plugin/conf_plugin.py +++ b/tsqa/plugin/conf_plugin.py @@ -17,10 +17,10 @@ def options(self, parser, env=os.environ): help="Keep env files after running successfully") parser.add_option('--sleep-in-sec', type='int', default=0, dest='sleep_in_sec', - help='Sleep time before ATS start and after ATS stop to allow enough time for async tests') - parser.add_option('--standalone-ats-port', type='int', default=-1, - dest='standalone_ats_port', - help="Allow standalone test with pre-deployed ATS port") + help='Sleep time after ATS stop to allow enough time for async logs') + parser.add_option('--standalone-server-port', type='string', + dest='standalone_server_port', + help="Allow standalone test with pre-deployed ATS server port") def configure(self, options, conf): Plugin.configure(self, options, conf) diff --git a/tsqa/test_cases.py b/tsqa/test_cases.py index 2e58d8b..ba36d6f 100644 --- a/tsqa/test_cases.py +++ b/tsqa/test_cases.py @@ -27,6 +27,7 @@ import tsqa.environment import tsqa.configs import tsqa.utils +import plugin.conf_plugin unittest = tsqa.utils.import_unittest() @@ -117,7 +118,7 @@ def tearDownClass(cls): # call parent destructor super(EnvironmentCase, cls).tearDownClass() # if the test was successful, tear down the env - if cls.__successful and (cls.environment.keep_env is False): + if cls.__successful and ((not plugin.conf_plugin.args.keep_env) or (plugin.conf_plugin.args.keep_env is False)): cls.environment.destroy() # this will tear down any processes that we started # Some helpful properties From 8da593294d67a1805c98ac38dfb432bf470d27bb Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Tue, 26 May 2015 11:47:22 -0700 Subject: [PATCH 6/8] backwards compatible with no plugin added --- tsqa/environment.py | 17 +++++++++-------- tsqa/plugin/conf_plugin.py | 2 +- tsqa/test_cases.py | 8 ++++++-- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/tsqa/environment.py b/tsqa/environment.py index 8f3ba4a..304e87a 100644 --- a/tsqa/environment.py +++ b/tsqa/environment.py @@ -24,7 +24,7 @@ import time import multiprocessing import hashlib -import socket +import socket; import tsqa.configs import tsqa.utils @@ -320,8 +320,9 @@ def __init__(self, layout=None): #process environment options self.sleep_in_sec = 0 - if plugin.conf_plugin.args.sleep_in_sec: - self.sleep_in_sec = plugin.conf_plugin.args.sleep_in_sec + if hasattr(plugin.conf_plugin, 'args'): + if plugin.conf_plugin.args.sleep_in_sec: + self.sleep_in_sec = plugin.conf_plugin.args.sleep_in_sec def create(self): """ @@ -380,7 +381,7 @@ def clone(self, layout=None): else: os.chmod(dirname, 0777) - if plugin.conf_plugin.args.standalone_server_port: + if hasattr(plugin.conf_plugin, 'args') and plugin.conf_plugin.args.standalone_server_port: http_server_port = plugin.conf_plugin.args.standalone_server_port else: http_server_port = tsqa.utils.bind_unused_port()[1] @@ -450,7 +451,7 @@ def destroy(self): self.layout = Layout(None) def start(self): - if plugin.conf_plugin.args.standalone_server_port: + if hasattr(plugin.conf_plugin, 'args') and plugin.conf_plugin.args.standalone_server_port: return if self.running(): # if its already running, don't start another one raise Exception('traffic cop already started') @@ -461,7 +462,7 @@ def start(self): # TODO: exception if already stopped? def stop(self): - if plugin.conf_plugin.args.standalone_server_port: + if hasattr(plugin.conf_plugin, 'args') and plugin.conf_plugin.args.standalone_server_port: return if self.sleep_in_sec > 0: time.sleep(self.sleep_in_sec) @@ -479,12 +480,12 @@ def stop(self): self.cop.terminate() # TODO: remove?? or wait... def running(self): - if plugin.conf_plugin.args.standalone_server_port: + if hasattr(plugin.conf_plugin, 'args') and plugin.conf_plugin.args.standalone_server_port: #try to connect to the port sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('127.0.0.1', int(plugin.conf_plugin.args.standalone_server_port))) if result == 0: - log.debug("Standalone ATS server port is open") + log.info("Standalone ATS server port is open") sock.close() return True else: diff --git a/tsqa/plugin/conf_plugin.py b/tsqa/plugin/conf_plugin.py index 98d94d2..d9210c9 100644 --- a/tsqa/plugin/conf_plugin.py +++ b/tsqa/plugin/conf_plugin.py @@ -13,7 +13,7 @@ def options(self, parser, env=os.environ): """Register commandline options. """ Plugin.options(self, parser, env) - parser.add_option('--keep-env', action='store_true', dest='keep_env', default=True, + parser.add_option('--keep-env', action='store_true', dest='keep_env', default=False, help="Keep env files after running successfully") parser.add_option('--sleep-in-sec', type='int', default=0, dest='sleep_in_sec', diff --git a/tsqa/test_cases.py b/tsqa/test_cases.py index ba36d6f..7e1ee1e 100644 --- a/tsqa/test_cases.py +++ b/tsqa/test_cases.py @@ -118,8 +118,12 @@ def tearDownClass(cls): # call parent destructor super(EnvironmentCase, cls).tearDownClass() # if the test was successful, tear down the env - if cls.__successful and ((not plugin.conf_plugin.args.keep_env) or (plugin.conf_plugin.args.keep_env is False)): - cls.environment.destroy() # this will tear down any processes that we started + if hasattr(plugin.conf_plugin, 'args'): + if cls.__successful and ((not plugin.conf_plugin.args.keep_env) or (plugin.conf_plugin.args.keep_env is False)): + cls.environment.destroy() # this will tear down any processes that we started + else: + if cls.__successful: + cls.environment.destroy() # Some helpful properties @property From 67b9ed595e24107824c5c1ac9ddb69804d25239d Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Wed, 27 May 2015 10:47:12 -0700 Subject: [PATCH 7/8] fix review comments --- tsqa/environment.py | 9 ++++----- tsqa/plugin/conf_plugin.py | 1 + tsqa/test_cases.py | 8 ++------ 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/tsqa/environment.py b/tsqa/environment.py index 304e87a..7ea24e5 100644 --- a/tsqa/environment.py +++ b/tsqa/environment.py @@ -24,7 +24,7 @@ import time import multiprocessing import hashlib -import socket; +import socket import tsqa.configs import tsqa.utils @@ -320,9 +320,8 @@ def __init__(self, layout=None): #process environment options self.sleep_in_sec = 0 - if hasattr(plugin.conf_plugin, 'args'): - if plugin.conf_plugin.args.sleep_in_sec: - self.sleep_in_sec = plugin.conf_plugin.args.sleep_in_sec + if hasattr(plugin.conf_plugin, 'args') and plugin.conf_plugin.args.sleep_in_sec: + self.sleep_in_sec = plugin.conf_plugin.args.sleep_in_sec def create(self): """ @@ -485,7 +484,7 @@ def running(self): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('127.0.0.1', int(plugin.conf_plugin.args.standalone_server_port))) if result == 0: - log.info("Standalone ATS server port is open") + log.debug("Standalone ATS server port is open") sock.close() return True else: diff --git a/tsqa/plugin/conf_plugin.py b/tsqa/plugin/conf_plugin.py index d9210c9..22b2d14 100644 --- a/tsqa/plugin/conf_plugin.py +++ b/tsqa/plugin/conf_plugin.py @@ -9,6 +9,7 @@ class ConfPlugin(Plugin): name = 'conf-plugin' + def options(self, parser, env=os.environ): """Register commandline options. """ diff --git a/tsqa/test_cases.py b/tsqa/test_cases.py index 7e1ee1e..28fcdf1 100644 --- a/tsqa/test_cases.py +++ b/tsqa/test_cases.py @@ -118,12 +118,8 @@ def tearDownClass(cls): # call parent destructor super(EnvironmentCase, cls).tearDownClass() # if the test was successful, tear down the env - if hasattr(plugin.conf_plugin, 'args'): - if cls.__successful and ((not plugin.conf_plugin.args.keep_env) or (plugin.conf_plugin.args.keep_env is False)): - cls.environment.destroy() # this will tear down any processes that we started - else: - if cls.__successful: - cls.environment.destroy() + if cls.__successful and (hasattr(plugin.conf_plugin, 'args') and ((not plugin.conf_plugin.args.keep_env) or (plugin.conf_plugin.args.keep_env is False))): + cls.environment.destroy() # this will tear down any processes that we started # Some helpful properties @property From 707fadb235d7281570768219b50af63393908ba2 Mon Sep 17 00:00:00 2001 From: Cynthia Zheng Gu Date: Wed, 27 May 2015 18:12:01 -0700 Subject: [PATCH 8/8] add color scheme for output --- tsqa/plugin/color_plugin.py | 918 ++++++++++++++++++++++++++++++++++++ 1 file changed, 918 insertions(+) create mode 100644 tsqa/plugin/color_plugin.py diff --git a/tsqa/plugin/color_plugin.py b/tsqa/plugin/color_plugin.py new file mode 100644 index 0000000..f3fcfc7 --- /dev/null +++ b/tsqa/plugin/color_plugin.py @@ -0,0 +1,918 @@ +"""Color output plugin for the nose testing framework. + +Use ``nosetests --with-color`` (no "u"!) to turn it on. + +http://en.wikipedia.org/wiki/Rudolph_the_Red-Nosed_Reindeer + +"Rudolph the Red-Nosed Reindeer" is a popular Christmas story about Santa +Claus' ninth and lead reindeer who possesses an unusually red colored nose that +gives off its own light that is powerful enough to illuminate the team's path +through inclement weather. + + +Copyright 2007 John J. Lee + +This code is derived from zope.testing version 3.5.0, which carries the +following copyright and licensing notice: + +############################################################################## +# +# Copyright (c) 2004-2006 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +""" + +import binascii +import doctest +import os +import re +import sys +import time +import traceback +import unittest +import warnings + +import nose.config +import nose.core +import nose.plugins +import nose.util + +# TODO +# syntax-highlight traceback Python source lines + + +__version__ = "0.3" +__revision__ = "$Id: rudolf.py 49867 2007-12-17 13:55:54Z jjlee $" + + +# some of this ANSI/xterm colour stuff is based on public domain code by Ian +# Ward + +CUBE_START = 16 # first index of colour cube +CUBE_SIZE = 6 # one side of the colour cube +GRAY_START = CUBE_SIZE ** 3 + CUBE_START +# values copied from xterm 256colres.h: +CUBE_STEPS = 0x00, 0x5f, 0x87, 0xaf, 0xd7, 0xff +GRAY_STEPS = (0x08, 0x12, 0x1c, 0x26, 0x30, 0x3a, 0x44, 0x4e, 0x58, 0x62, + 0x6c, 0x76, 0x80, 0x84, 0x94, 0x9e, 0xa8, 0xb2, 0xbc, 0xc6, 0xd0, + 0xda, 0xe4, 0xee) +TABLE_START = CUBE_START # don't use the basic 16 colours, since we can't be + # sure what their RGB values are +TABLE_END = 256 + + +def xterm_from_rgb_string(rgb_text): + """ + >>> xterm_from_rgb_string("000000") + 16 + >>> xterm_from_rgb_string("FF0000") + 196 + + >>> xterm_from_rgb_string("0000") + Traceback (most recent call last): + ValueError: 0000 + >>> xterm_from_rgb_string("blah") + Traceback (most recent call last): + ValueError: blah + """ + try: + bytes = binascii.unhexlify(rgb_text) + except TypeError: + raise ValueError(rgb_text) + if len(bytes) < 3: + raise ValueError(rgb_text) + rgb = map(ord, bytes) + return xterm_from_rgb(rgb) + + +def cube_vals(n): + """Return the cube coordinates for colour-number n.""" + assert n >= CUBE_START and n < GRAY_START + val = n - CUBE_START + c = val % CUBE_SIZE + val = val / CUBE_SIZE + b = val % CUBE_SIZE + a = val / CUBE_SIZE + return a, b, c + + +def rgb_from_xterm(n): + """Return the red, green and blue components of colour-number n. + Components are between 0 and 255.""" + # we don't handle the basic 16 colours, since we can't be sure what their + # RGB values are + assert n >= CUBE_START + if n < GRAY_START: + return [CUBE_STEPS[v] for v in cube_vals(n)] + return (GRAY_STEPS[n - GRAY_START],) * 3 + + +RGB_FROM_XTERM_COLOR = [ + rgb_from_xterm(xi) for xi in range(TABLE_START, TABLE_END)] + + +def xterm_from_rgb(rgb): + smallest_distance = sys.maxint + for index in range(0, TABLE_END - TABLE_START): + rc = RGB_FROM_XTERM_COLOR[index] + dist = ((rc[0] - rgb[0]) ** 2 + + (rc[1] - rgb[1]) ** 2 + + (rc[2] - rgb[2]) ** 2) + if dist < smallest_distance: + smallest_distance = dist + best_match = index + return best_match + TABLE_START + + +class Xterm256Color(object): + + def __init__(self, xterm_color_code): + self._code = xterm_color_code + + def __str__(self): + return "%s(%s)" % (self.__class__.__name__, self._code) + + def terminal_code(self): + return "\033[38;5;%dm" % self._code + + +class Ansi16Color(object): + + def __init__(self, foreground_color, bright): + self._fg_color = foreground_color + self._bright = bright + + def __str__(self): + return "%s(%s, %s)" % (self.__class__.__name__, + self._fg_color, self._bright) + + def terminal_code(self): + if self._fg_color is None: + fg_code = 0 + else: + fg_code = self._fg_color + 30 + if self._bright is None: + prefix_code = "" + elif self._bright: + prefix_code = "1;" + else: + prefix_code = "0;" + return "\033[%s%sm" % (prefix_code, fg_code) + + +def parse_color(color_text): + """ + Names for the 16 ANSI colours + + >>> print parse_color("black") + Ansi16Color(0, None) + >>> print parse_color("red") + Ansi16Color(1, None) + >>> print parse_color("darkred") + Ansi16Color(1, False) + >>> print parse_color("lightred") + Ansi16Color(1, True) + >>> print parse_color("brightred") + Ansi16Color(1, True) + >>> print parse_color("boldred") + Ansi16Color(1, True) + + RGB colours + + >>> print parse_color("rgb(ff0000)") + Xterm256Color(196) + >>> print parse_color("rgb(FF0000)") + Xterm256Color(196) + + xterm colour codes + + >>> print parse_color("0") + Xterm256Color(0) + >>> print parse_color("140") + Xterm256Color(140) + + Bad colours + + >>> parse_color("moored") + Traceback (most recent call last): + ValueError: Bad named colour: 'moored' + >>> parse_color("boldpink") + Traceback (most recent call last): + ValueError: Bad named colour: 'boldpink' + >>> parse_color("256") + Traceback (most recent call last): + ValueError: Bad xterm colour: '256' + >>> parse_color("-1") + Traceback (most recent call last): + ValueError: Bad xterm colour: '-1' + >>> parse_color("ff0000") + Traceback (most recent call last): + ValueError: Bad named colour: 'ff0000' + >>> parse_color("rgb(fg0000)") + Traceback (most recent call last): + ValueError: Bad RGB colour: 'rgb(fg0000)' + >>> parse_color("rgb(ff0000f)") + Traceback (most recent call last): + ValueError: Bad RGB colour: 'rgb(ff0000f)' + >>> parse_color("rgb(0000)") + Traceback (most recent call last): + ValueError: Bad RGB colour: 'rgb(0000)' + """ + assert color_text + + # RGB + if color_text.startswith("rgb(") and color_text.endswith(")"): + try: + xc = xterm_from_rgb_string(color_text[4:-1]) + except ValueError: + raise ValueError("Bad RGB colour: %r" % color_text) + else: + return Xterm256Color(xc) + + # xterm 256 colour code + try: + xc = int(color_text) + except ValueError: + pass + else: + if 0 <= xc < 256: + return Xterm256Color(xc) + else: + raise ValueError("Bad xterm colour: %r" % color_text) + + # named ANSI 16 colour + colorcodes = {"default": None, "normal": None, + "black": 0, + "red": 1, + "green": 2, + "brown": 3, "yellow": 3, + "blue": 4, + "magenta": 5, + "cyan": 6, + "grey": 7, "gray": 7, "white": 7} + prefixes = [("dark", False), + ("light", True), + ("bright", True), + ("bold", True)] + is_bright = None + remaining = color_text + for prefix, bright in prefixes: + if remaining.startswith(prefix): + remaining = color_text[len(prefix):] + is_bright = bright + break + try: + foreground = colorcodes[remaining] + except KeyError: + raise ValueError("Bad named colour: %r" % color_text) + return Ansi16Color(foreground, is_bright) + + +def parse_colorscheme(colorscheme): + """ + >>> colors = parse_colorscheme("fail=red,pass=rgb(00ff00),error=40") + >>> for name, color in sorted(colors.items()): + ... print "%s: %s" % (name, color) + error: Xterm256Color(40) + fail: Ansi16Color(1, None) + pass: Xterm256Color(46) + + >>> parse_colorscheme("fail:red,pass=green") + Traceback (most recent call last): + ValueError: Missing equals (name=colour): 'fail:red' + >>> parse_colorscheme("fail=spam") + Traceback (most recent call last): + ValueError: Bad named colour: 'spam' + >>> parse_colorscheme("fail=") + Traceback (most recent call last): + ValueError: Missing colour (name=colour): 'fail=' + """ + if not colorscheme: + return {} + + colors = {} + specs = colorscheme.split(",") + for spec in specs: + try: + name, color_text = spec.split("=", 1) + except ValueError: + raise ValueError("Missing equals (name=colour): %r" % spec) + if color_text == "": + raise ValueError("Missing colour (name=colour): %r" % spec) + colors[name] = parse_color(color_text) + return colors + + +def normalize_path(pathname): + if hasattr(os.path, "realpath"): + pathname = os.path.realpath(pathname) + return os.path.normcase(os.path.abspath(pathname)) + + +def relative_location(basedir, target, posix_result=True): + """ + >>> relative_location("/a/b/", "/a/b/c") + 'c' + >>> relative_location("a/b", "a/b/c/d") + 'c/d' + >>> relative_location("/z", "/a/b") + '../a/b' + + >>> this_dir = os.path.dirname(normalize_path(__file__)) + >>> relative_location("/a/b/", "a/b/c") == "../.." + this_dir + "/a/b/c" + True + + >>> nr_dirs_up_to_root = os.path.join(this_dir, "a", "b").count(os.sep) + >>> expected = "/".join([".."] * nr_dirs_up_to_root) + "/a/b/c/d" + >>> relative_location("a/b", "/a/b/c/d/") == expected + True + """ + # based on a function by Robin Becker + import os.path, posixpath + basedir = normalize_path(basedir) + target = normalize_path(target) + baseparts = basedir.split(os.sep) + targetparts = target.split(os.sep) + nr_base = len(baseparts) + nr_target = len(targetparts) + nr_common = min(nr_base, nr_target) + ii = 0 + while ii < nr_common and baseparts[ii] == targetparts[ii]: + ii += 1 + relative_parts = (nr_base-ii)*['..'] + targetparts[ii:] + if posix_result: + return posixpath.join(*relative_parts) + else: + return os.path.join(*relative_parts) + + +def elide_foreign_path_and_line_nr(base_dir, path, line_nr): + relpath = relative_location(base_dir, path) + if ".." in relpath: + filename = os.path.basename(relpath) + return os.path.join("...", filename), "..." + else: + return relpath, line_nr + + +class DocTestFailureException(AssertionError): + """Custom exception for doctest unit test failures.""" + + +# colour output code taken from zope.testing, and hacked + +class ColorfulOutputFormatter(object): + """Output formatter that uses ANSI color codes. + + Like syntax highlighting in your text editor, colorizing + test failures helps the developer. + """ + + separator1 = "=" * 70 + separator2 = "-" * 70 + + doctest_template = """ +File "%s", line %s, in %s + +%s +Want: +%s +Got: +%s +""" + + # Map prefix character to color in diff output. This handles ndiff and + # udiff correctly, but not cdiff. + diff_color = {"-": "expected-output", + "+": "actual-output", + "?": "character-diffs", + "@": "diff-chunk", + "*": "diff-chunk", + "!": "actual-output",} + + def __init__(self, verbosity, descriptions, colorscheme, + stream=sys.stdout, clean_tracebacks=False, base_dir=False): + self._stream = stream + self._verbose = bool(verbosity) + self._show_all = verbosity > 1 + self._dots = verbosity == 1 + self._descriptions = descriptions + self._clean_tracebacks = clean_tracebacks + self._base_dir = base_dir + self._colorscheme = colorscheme +# for name, value in self._colorscheme.items(): +# print >>sys.stderr, '%s = %s' % (name, value) + + def color(self, what): + """Pick a named color from the color scheme""" + return self._colorscheme[what].terminal_code() + + def colorize(self, what, message, normal="normal"): + """Wrap message in color.""" + return self.color(what) + message + self.color(normal) + + def get_description(self, test): + if self._descriptions: + return test.shortDescription() or str(test) + else: + return str(test) + + def start_test(self, test): + if self._show_all: + self._stream.write(self.colorize("testname", "Testing ")) + self._stream.writeln(self.colorize("testname", + self.get_description(test))) + #self._stream.write(self.colorize("normal", " ... ")) + self._stream.flush() + + def test_success(self, test): + if self._show_all: + self._stream.write(self.colorize("pass", + self.get_description(test))) + self._stream.write(self.colorize("pass", " ... ")) + self._stream.writeln(self.colorize("pass", "ok")) + elif self._dots: + self._stream.write(self.colorize("pass", ".")) + + def test_error(self, test, exc_info, label): + if self._show_all: + self._stream.write(self.colorize("error", + self.get_description(test))) + self._stream.write(self.colorize("error", " ... ")) + self._stream.writeln(self.colorize("error", label)) + elif self._dots: + self._stream.write(self.colorize("error", label[:1])) + + def test_failure(self, test, exc_info): + if self._show_all: + self._stream.write(self.colorize("failure", + self.get_description(test))) + self._stream.write(self.colorize("failure", " ... ")) + self._stream.writeln(self.colorize("failure", "FAIL")) + elif self._dots: + self._stream.write(self.colorize("failure", "F")) + + def print_error_list(self, flavour, errors): + problem_color = (flavour == "FAIL") and "failure" or "error" + for test, err, err_type in errors: + self._stream.writeln(self.separator1) + self._stream.writeln("%s: %s" % ( + self.colorize(problem_color, flavour), + self.colorize("testname", self.get_description(test)))) + self._stream.writeln(self.separator2) + self.print_traceback(err, err_type) + + def print_summary(self, success, summary, tests_run, start, stop): + write = self._stream.write + writeln = self._stream.writeln + writelines = self._stream.writelines + taken = float(stop - start) + plural = tests_run != 1 and "s" or "" + count_color = success and "ok-number" or "error-number" + + writeln(self.separator2) + writelines([ + "Ran ", + self.colorize(count_color, "%s " % tests_run), + "test%s in " % plural, + self._format_seconds(taken)]) + writeln() + if not success: + write(self.colorize("failure", "FAILED")) + write(" (") + any = False + for label, count in summary.items(): + if not count: + continue + if any: + write(", ") + write("%s=" % label) + problem_color = (label == "failures") and "failure" or "error" + write(self.colorize(problem_color, str(count))) + any = True + writeln(")") + else: + writeln(self.colorize("pass", "OK")) + + def _format_seconds(self, n_seconds, normal="normal"): + """Format a time in seconds.""" + if n_seconds >= 60: + n_minutes, n_seconds = divmod(n_seconds, 60) + return "%s minutes %s seconds" % ( + self.colorize("number", "%d" % n_minutes, normal), + self.colorize("number", "%.3f" % n_seconds, normal)) + else: + return "%s seconds" % ( + self.colorize("number", "%.3f" % n_seconds, normal)) + + def format_traceback(self, exc_info): + """Format the traceback.""" + v = exc_info[1] + if isinstance(v, DocTestFailureException): + tb = v.args[0] + if isinstance(v, doctest.DocTestFailure): + # XXX +# if self._clean_tracebacks: +# filename, lineno = elide_foreign_path_and_line_nr( +# self._base_dir, +# v.test.filename, +# (v.test.lineno + v.example.lineno + 1)) + tb = self.doctest_template % ( + v.test.filename, + v.test.lineno + v.example.lineno + 1, + v.test.name, + v.example.source, + v.example.want, + v.got, + ) + else: + tb = "".join(traceback.format_exception(*exc_info)) + return tb + + def print_traceback(self, formatted_traceback, err_type): + """Report an error with a traceback.""" + if issubclass(err_type, DocTestFailureException): + self.print_doctest_failure(formatted_traceback) + else: + self.print_colorized_traceback(formatted_traceback) + print >>self._stream + + def print_doctest_failure(self, formatted_failure): + """Report a doctest failure. + + ``formatted_failure`` is a string -- that's what + DocTestSuite/DocFileSuite gives us. + """ + color_of_indented_text = 'normal' + colorize_diff = False + colorize_exception = False + lines = formatted_failure.splitlines() + + # this first traceback in a doctest failure report is rarely + # interesting, but it looks funny non-colourized so let's colourize it + # anyway + exc_lines = [] + while True: + line = lines.pop(0) + if line == self.separator2: + break + exc_lines.append(line) + self.print_colorized_traceback("\n".join(exc_lines)) + print >>self._stream + print >>self._stream, self.separator2 + exc_lines = [] + + for line in lines: + if line.startswith('File '): + m = re.match(r'File "(.*)", line (\d*), in (.*)$', line) + if m: + filename, lineno, test = m.groups() + if self._clean_tracebacks: + filename, lineno = elide_foreign_path_and_line_nr( + self._base_dir, filename, lineno) + self._stream.writelines([ + self.color('normal'), 'File "', + self.color('filename'), filename, + self.color('normal'), '", line ', + self.color('lineno'), lineno, + self.color('normal'), ', in ', + self.color('testname'), test, + self.color('normal'), '\n']) + else: + print >>self._stream, line + elif line.startswith(' '): + if colorize_diff and len(line) > 4: + color = self.diff_color.get(line[4], + color_of_indented_text) + print >>self._stream, self.colorize(color, line) + elif colorize_exception: + exc_lines.append(line[4:]) + else: + print >>self._stream, self.colorize(color_of_indented_text, + line) + else: + colorize_diff = False + if colorize_exception: + self.print_colorized_traceback("\n".join(exc_lines), + indent_level=1) + colorize_exception = False + exc_lines = [] + if line.startswith('Failed example'): + color_of_indented_text = 'failed-example' + elif line.startswith('Expected:'): + color_of_indented_text = 'expected-output' + elif line.startswith('Got:'): + color_of_indented_text = 'actual-output' + elif line.startswith('Exception raised:'): + color_of_indented_text = 'exception' + colorize_exception = True + elif line.startswith('Differences '): + if line in [ + "Differences (ndiff with -expected +actual):", + "Differences (unified diff with -expected +actual):" + ]: + line = "".join([ + "Differences (ndiff with ", + self.color("expected-output"), "-expected ", + self.color("actual-output"), "+actual", + self.color("normal"), "):", + ]) + color_of_indented_text = 'normal' + colorize_diff = True + else: + color_of_indented_text = 'normal' + print >>self._stream, line + print >>self._stream + + def print_colorized_traceback(self, formatted_traceback, indent_level=0): + """Report a test failure. + + ``formatted_traceback`` is a string. + """ + indentation = " " * indent_level + for line in formatted_traceback.splitlines(): + if line.startswith(" File"): + m = re.match(r' File "(.*)", line (\d*)(?:, in (.*))?$', line) + if m: + filename, lineno, test = m.groups() + if self._clean_tracebacks: + filename, lineno = elide_foreign_path_and_line_nr( + self._base_dir, filename, lineno) + tb_lines = [ + self.color("normal"), ' File "', + self.color("filename"), filename, + self.color("normal"), '", line ', + self.color("lineno"), lineno, + ] + if test: + # this is missing for the first traceback in doctest + # failure report + tb_lines.extend([ + self.color("normal"), ", in ", + self.color("testname"), test, + ]) + tb_lines.extend([ + self.color("normal"), "\n", + ]) + self._stream.write(indentation) + self._stream.writelines(tb_lines) + else: + print >>self._stream, indentation + line + elif line.startswith(" "): + print >>self._stream, self.colorize("failed-example", + indentation + line) + elif line.startswith("Traceback (most recent call last)"): + print >>self._stream, indentation + line + else: + print >>self._stream, self.colorize("exception", + indentation + line) + + def stop_test(self, test): + if self._verbose > 1: + print >>self._stream + self._stream.flush() + + def stop_tests(self): + if self._verbose == 1: + self._stream.write("\n") + self._stream.flush() + + +class ColorOutputPlugin(nose.plugins.Plugin): + + """Output test results in colour to terminal.""" + + name = "color" + + formatter_class = ColorfulOutputFormatter + clean_tracebacks = False + base_dir = None + + # These colors are carefully chosen to have enough contrast + # on terminals with both black and white background. + default_colorscheme = {"normal": "normal", + "pass": "green", + "failure": "magenta", + "error": "brightred", + "number": "green", + "ok-number": "green", + "error-number": "brightred", + "filename": "lightblue", + "lineno": "lightred", + "testname": "lightcyan", + "failed-example": "cyan", + "expected-output": "green", + "actual-output": "red", + "character-diffs": "magenta", + "diff-chunk": "magenta", + "exception": "red"} + default_colorscheme = dict((name, parse_color(color)) for name, color in + default_colorscheme.iteritems()) + + score = 50 # Lower than default plugin level, since the output we're + # printing is replacing non-plugin core nose output, which + # usually happens after plugin output. If this were >= default + # score, then e.g. core plugin testid output would come out in + # the wrong place. + + def __init__(self): + nose.plugins.Plugin.__init__(self) + self._result = None + # for debugging +# self.base_dir = os.path.dirname(__file__) +# clean_tracebacks = True + + def options(self, parser, env=os.environ): + nose.plugins.Plugin.options(self, parser, env) + parser.add_option("--no-color", action="store_false", + dest="enable_plugin_color", + help="Don't output in color") + # XXX This might be wrong when running tests in a subprocess (since I + # guess sys.stdout will be a pipe, but colour output should be turned + # on). Depends on how the running-in-a-subprocess is done (it's not a + # core nose feature as of version 0.10.0). + # XXX should be able to specify auto-color in environment + action = sys.stdout.isatty() and "store_true" or "store_false" + parser.add_option("--auto-color", action=action, + dest="enable_plugin_color", + help="Output in color only if stdout is a terminal") + env_opt = "NOSE_COLORS" + parser.add_option("--colors", action="store", + type="string", + dest="colors", + default=env.get(env_opt, ""), + help="Colour scheme for --with-color terminal " + "output, listing colours to be used for each " + "named part of the output. Format is " + "name1=color1,name2=color2 . " + "Colours can be specified as xterm 256 colour " + "codes (e.g. '45'), RGB colours (e.g. " + "'rgb(00ff00)'), ANSI 16 colour names (e.g. " + "'red' or 'brightred'), and the special " + "colour 'normal'. Example: " + "--colors='fail=red,pass=rgb(00ff00),error=45' " + + "[%s]" % env_opt) + + def configure(self, options, conf): + nose.plugins.Plugin.configure(self, options, conf) + if not self.enabled: + return + + self._verbosity = conf.verbosity + cs = dict(self.default_colorscheme) + try: + user_colorscheme = parse_colorscheme(options.colors) + except ValueError, exc: + filenames = list(conf.files) + if options.files: + filenames.extend(options.files) + warnings.warn("Bad colorscheme string " + "(from --colors or one of %s): %s" % + (", ".join(filenames), exc), RuntimeWarning) + user_colorscheme = {} + unknown_names = set(user_colorscheme.keys()) - set(cs.keys()) + if unknown_names: + warnings.warn("Invalid colorscheme names: %s" % + (", ".join(unknown_names))) + cs.update(user_colorscheme) + self._colorscheme = cs + self._show_all = self._verbosity > 1 + self._dots = self._verbosity == 1 + + def begin(self): + self._old_failure_exception = doctest.DocTestCase.failureException + # monkeypatch! + doctest.DocTestCase.failureException = DocTestFailureException + + def setOutputStream(self, stream): + self._stream = stream + self._formatter = self.formatter_class( + self._verbosity, + True, + self._colorscheme, + self._stream, + clean_tracebacks=self.clean_tracebacks, + base_dir=self.base_dir) + + def prepareTestResult(self, result): + result.__failures = [] + result.__errors = [] + result.__tests_run = 0 + result.__start_time = time.time() + result.stream = unittest._WritelnDecorator(open(os.devnull, 'w')) + self._result = result + + def startTest(self, test): + self._result.__tests_run = self._result.__tests_run + 1 + self._formatter.start_test(test) + + def addSuccess(self, test): + self._formatter.test_success(test) + + def addFailure(self, test, err): + formatted_failure = self._exc_info_to_string(err, test) + self._result.__failures.append((test, formatted_failure, err[0])) + self._formatter.test_failure(test, err) + + def addError(self, test, err): + # If the exception is a registered class, the error will be added to + # the list for that class, not errors. + formatted_err = self._formatter.format_traceback(err) + for cls, (storage, label, isfail) in self._result.errorClasses.items(): + if issubclass(err[0], cls): + storage.append((test, formatted_err, err[0])) + self._formatter.test_error(test, err, label) + return + self._result.__errors.append((test, formatted_err, err[0])) + self._formatter.test_error(test, err, "ERROR") + + def stopTest(self, test): + self._formatter.stop_test(test) + + def report(self, stream): + self._print_errors() + self._print_summary(self._result.__start_time, + time.time()) + self._result = None + + def finalize(self, result): + self._formatter.stop_tests() + # remove monkeypatch + doctest.DocTestCase.failureException = self._old_failure_exception + + def _print_errors(self): + if self._dots or self._show_all: + self._stream.writeln() + self._formatter.print_error_list("ERROR", self._result.__errors) + self._formatter.print_error_list("FAIL", self._result.__failures) + for cls in self._result.errorClasses.keys(): + storage, label, isfail = self._result.errorClasses[cls] + self._formatter.print_error_list(label, storage) + + def _print_summary(self, start, stop): + success = self._result.wasSuccessful() + summary = nose.util.odict() + if not success: + summary["failures"], summary["errors"] = \ + map(len, [self._result.__failures, self._result.__errors]) + for cls in self._result.errorClasses.keys(): + storage, label, isfail = self._result.errorClasses[cls] + if not isfail: + continue + summary[label] = len(storage) + self._formatter.print_summary(success, summary, + self._result.__tests_run, start, stop) + + def _exc_info_to_string(self, err, test): + exctype, value, tb = err + # Skip test runner traceback levels + while tb and self._is_relevant_tb_level(tb): + tb = tb.tb_next + if exctype is test.failureException: + # Skip assert*() traceback levels + length = self._count_relevant_tb_levels(tb) + return ''.join(traceback.format_exception(exctype, value, tb, + length)) + return ''.join(traceback.format_exception(exctype, value, tb)) + + def _is_relevant_tb_level(self, tb): + return tb.tb_frame.f_globals.has_key('__unittest') + + def _count_relevant_tb_levels(self, tb): + length = 0 + while tb and not self._is_relevant_tb_level(tb): + length += 1 + tb = tb.tb_next + return length + + +# classes for use in rudolf's own tests + + +class TestColorfulOutputFormatter(ColorfulOutputFormatter): + + __test__ = False + + def _format_seconds(self, n_seconds, normal="normal"): + return "%s seconds" % (self.colorize("number", "...", normal)) + + +class TestColorOutputPlugin(ColorOutputPlugin): + + __test__ = False + + formatter_class = TestColorfulOutputFormatter + clean_tracebacks = True + + def __init__(self): + ColorOutputPlugin.__init__(self) + self.base_dir = os.path.dirname(__file__) +