Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,6 @@ ENV/

# PyCharm
.idea
/framework/coverage.dump.graphml
/framework/irrelevant.dump.graphml
/framework/relevant.dump.graphml
Empty file added framework/__init__.py
Empty file.
191 changes: 186 additions & 5 deletions framework/context/coverage.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import copy
import json
import math
import os
import re
import tarfile
from functools import lru_cache
from typing import List
from typing import List, Optional, Union, Tuple

import networkx
import networkx as nx

import trc2chains
from framework.context.code_element import CodeElement


class CoverageMatrix(object):
Expand Down Expand Up @@ -70,7 +75,8 @@ class TraceCoverageMatrix(object):
def __init__(self, granularity: str):
self.granularity = granularity
self.graph = nx.Graph()
self.mapping = {}
self.key_mapping = {}
self.name_mapping = {}

def _add_test(self, member: tarfile.TarInfo):
file_name = os.path.basename(member.name)
Expand All @@ -80,12 +86,12 @@ def _add_test(self, member: tarfile.TarInfo):
test_name = match.group('test')
test_result = match.group('result')

self.graph.add_node(test_name, type='test', result=test_result)
self.graph.add_node(test_name, type='test', name=test_name, result=test_result)

return test_name

def _add_code_element(self, code_element: str):
self.graph.add_node(code_element, type='code_element')
self.graph.add_node(code_element, type='code_element', name=self.key_mapping[code_element])

def _add_coverage(self, test: str, code_element: str, **attr):
self._add_code_element(code_element)
Expand Down Expand Up @@ -126,7 +132,8 @@ def create_mapping(self, archive: tarfile.TarFile, member: tarfile.TarInfo):
id = int(parts[0])
name = parts[1]

self.mapping[id] = name
self.key_mapping[id] = name
self.name_mapping[name] = id

@lru_cache(maxsize=2)
def get_code_elements(self, with_result=True):
Expand Down Expand Up @@ -154,6 +161,67 @@ def get_tests(self, with_result=True):

return result

# Accepted values for the ``operator`` argument of ``query``: they select
# whether a node has to satisfy any ('or') or all ('and') of the filters.
OR_OPERATOR = 'or'
AND_OPERATOR = 'and'

@lru_cache(maxsize=10000)
def query(
        self,
        operator: str,
        test_result: Optional[str] = None,
        type_of: Optional[str] = None,
        neighbor_of_every: Tuple[Union[CodeElement, str, int], ...] = (),
        not_neighbor_of_every: Tuple[Union[CodeElement, str, int], ...] = (),
        neighbor_of_any: Tuple[Union[CodeElement, str, int], ...] = (),
        not_neighbor_of_any: Tuple[Union[CodeElement, str, int], ...] = ()
):
    """Return the ``(node, data)`` pairs of the graph nodes matching the filters.

    Only filters that were actually supplied take part in the decision.
    With ``AND_OPERATOR`` a node must satisfy all of them, with
    ``OR_OPERATOR`` at least one of them. When no filter is supplied
    every node is returned.

    :param operator: ``OR_OPERATOR`` or ``AND_OPERATOR``.
    :param test_result: required value of the node's ``result`` attribute.
    :param type_of: required value of the node's ``type`` attribute.
    :param neighbor_of_every: the node must be adjacent to all of these.
    :param not_neighbor_of_every: the node must be adjacent to none of these.
    :param neighbor_of_any: the node must be adjacent to at least one of these.
    :param not_neighbor_of_any: the node must be non-adjacent to at least
        one of these.
    :return: list of ``(node, data)`` tuples in graph iteration order.
    :raises ValueError: if ``operator`` is not a supported operator.

    The ``*neighbor*`` arguments are resolved to graph keys through
    ``get_graph_key``. Because of ``lru_cache`` all arguments must be
    hashable (hence tuples), repeated identical calls return the very same
    list object, and nodes added to the graph after the first call are not
    reflected in a cached result.
    """
    # Validate up front so that a bad operator is reported even when the
    # graph has no nodes to iterate over.
    if operator == self.AND_OPERATOR:
        combine = all
    elif operator == self.OR_OPERATOR:
        combine = any
    else:
        raise ValueError(f'unsupported operator: {operator}')

    # Resolve the neighbour filters once instead of once per node; ``None``
    # marks a filter that was not supplied.
    every_keys = self.get_graph_key(*neighbor_of_every) if neighbor_of_every != () else None
    any_keys = self.get_graph_key(*neighbor_of_any) if neighbor_of_any != () else None
    not_every_keys = (
        self.get_graph_key(*not_neighbor_of_every) if not_neighbor_of_every != () else None
    )
    not_any_keys = (
        self.get_graph_key(*not_neighbor_of_any) if not_neighbor_of_any != () else None
    )
    needs_neighbors = any(
        keys is not None
        for keys in (every_keys, any_keys, not_every_keys, not_any_keys)
    )

    result = []

    for node, data in self.graph.nodes(data=True):
        # A set gives O(1) membership tests for the neighbour filters.
        neighbors = set(self.graph.neighbors(node)) if needs_neighbors else set()

        # Collect only the supplied filters: an unset filter must be
        # neutral for both operators (treating it as True would make
        # every 'or' query match every node).
        checks = []
        if type_of is not None:
            checks.append(data.get('type', None) == type_of)
        if test_result is not None:
            checks.append(data.get('result', None) == test_result)
        if every_keys is not None:
            checks.append(all(key in neighbors for key in every_keys))
        if any_keys is not None:
            checks.append(any(key in neighbors for key in any_keys))
        if not_every_keys is not None:
            checks.append(all(key not in neighbors for key in not_every_keys))
        if not_any_keys is not None:
            checks.append(any(key not in neighbors for key in not_any_keys))

        if not checks or combine(checks):
            result.append((node, data))

    return result

def extract_score(self, name: str) -> float:
    """Return the ``CodeElement.SCORE_TYPE`` score of the code element called ``name``.

    The first entry of ``get_code_elements()`` whose ``name`` attribute
    equals ``name`` is used; its ``scores`` attribute is parsed as JSON and
    the value stored under ``CodeElement.SCORE_TYPE`` is returned.

    This is a best-effort lookup: if the element is missing or its scores
    cannot be read, a message is printed and ``0.0`` is returned.

    :param name: value of the ``name`` attribute of the wanted code element.
    :return: the stored score, or ``0.0`` when it cannot be determined.
    """
    try:
        # Stop at the first match instead of materialising every match.
        node_data = next(
            item[1] for item in self.get_code_elements() if item[1]['name'] == name
        )
        return json.loads(node_data['scores'])[CodeElement.SCORE_TYPE]
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        print("something went wrong assuming 0 score")
        return 0.0

@lru_cache(maxsize=10000)
def get_graph_key(self, *items: Union[CodeElement, str, int]):
    """Translate ``items`` into the keys used for them in the graph.

    ``CodeElement`` instances are looked up in ``name_mapping`` by their
    ``name`` attribute, strings are looked up in ``name_mapping`` directly
    and integers are passed through unchanged. A failed ``name_mapping``
    lookup propagates to the caller.

    :param items: the elements to translate; they must be hashable because
        the call is memoised with ``lru_cache``.
    :return: list of keys, in the same order as ``items``.
    :raises ValueError: if an item is none of the supported types.
    """
    def resolve(item):
        # The order of the type checks mirrors the accepted types above.
        if isinstance(item, CodeElement):
            return self.name_mapping[item.name]
        if isinstance(item, str):
            return self.name_mapping[item]
        if isinstance(item, int):
            return item
        raise ValueError("unsupported type")

    return [resolve(item) for item in items]

@classmethod
def load_from_file(cls, file_path, granularity='binary'):
coverage = TraceCoverageMatrix(granularity)
Expand All @@ -169,4 +237,117 @@ def load_from_file(cls, file_path, granularity='binary'):

coverage.parse_traces(archive, trace_files)

tests = coverage.query(
TraceCoverageMatrix.AND_OPERATOR,
type_of='test'
)
for test in tests:
test_name = test[0]
coverage.key_mapping[test_name] = test_name
coverage.name_mapping[test_name] = test_name

# networkx.write_graphml(coverage.graph, 'coverage.dump.graphml')

return coverage

def sub(self, base_nodes: Tuple[Union[CodeElement, str, int], ...]):
    """Return a copy of this matrix whose graph is restricted to ``base_nodes``.

    :param base_nodes: the elements to keep; they are resolved to graph keys
        with ``get_graph_key``.
    :return: a deep copy of this object whose ``graph`` attribute is
        replaced by ``self.graph.subgraph(...)`` of the resolved keys.
    """
    selected_keys = self.get_graph_key(*base_nodes)
    restricted = copy.deepcopy(self)
    # NOTE(review): the subgraph is taken from this instance's graph, not
    # from the deep copy made above - confirm that this is intended.
    restricted.graph = self.graph.subgraph(selected_keys)
    return restricted

def _calculate_spectrum_metrics(self):
all_pass, all_failed = set(), set()

for test, result in self.get_tests():
if result == 'FAIL':
all_failed.add(test)
elif result == 'PASS':
all_pass.add(test)
else:
assert False

total_pass, total_fail = len(all_pass), len(all_failed)

for ce, data in self.get_code_elements():
ep, ef = 0, 0

for node in self.graph.neighbors(ce):
data = self.graph.nodes[node]

assert data['type'] == 'test'

if data['result'] == 'FAIL':
ef += 1
elif data['result'] == 'PASS':
ep += 1
else:
assert False

self.graph.nodes[ce].update(
dict(
spectrum_metrics=json.dumps(dict(
ef=ef,
ep=ep,
nf=total_fail - ef,
np=total_pass - ep,
))
)
)

def calculate_scores(self):
    """Compute the suspiciousness scores of every code element.

    The spectrum metrics are refreshed first; then the ``tarantula``,
    ``ochiai`` and ``dstar`` formulae are evaluated on each element's
    metrics. The results are stored on the element's graph node in the
    ``scores`` attribute as a JSON object keyed by formula name.
    """
    self._calculate_spectrum_metrics()

    formulae = (tarantula, ochiai, dstar)

    for ce, data in self.get_code_elements():
        # The metrics were stored as a JSON string by the step above.
        metrics = json.loads(data['spectrum_metrics'])
        scores = {formula.__name__: formula(**metrics) for formula in formulae}
        self.graph.nodes[ce].update(scores=json.dumps(scores))


def ochiai(ef=0, ep=0, nf=0, np=0):
    """Return the Ochiai score ``ef / sqrt((ef + nf) * (ef + ep))``.

    :param ef: number of failing tests covering the element.
    :param ep: number of passing tests covering the element.
    :param nf: number of failing tests not covering the element.
    :param np: unused by this formula; accepted so that every formula can be
        called with the same set of metrics.
    :return: the score, or ``0.0`` when the denominator is zero.
    """
    try:
        return ef / math.sqrt((ef + nf) * (ef + ep))
    except ZeroDivisionError:
        return 0.0


def tarantula(ef=0, ep=0, nf=0, np=0):
    """Return the Tarantula score of a code element.

    The score is ``fail_ratio / (fail_ratio + pass_ratio)`` where
    ``fail_ratio = ef / (ef + nf)`` and ``pass_ratio = ep / (ep + np)``.

    A ratio whose total is zero is taken to be ``0``. This keeps the
    evidence of the failing tests when there are no passing tests at all,
    instead of collapsing every score to ``0.0``.

    :param ef: number of failing tests covering the element.
    :param ep: number of passing tests covering the element.
    :param nf: number of failing tests not covering the element.
    :param np: number of passing tests not covering the element.
    :return: the score, or ``0.0`` when both ratios are zero.
    """
    totalfail = ef + nf
    totalpass = ep + np

    fail_ratio = ef / totalfail if totalfail else 0.0
    pass_ratio = ep / totalpass if totalpass else 0.0

    denominator = fail_ratio + pass_ratio
    if not denominator:
        # The element is covered by no test at all.
        return 0.0

    return fail_ratio / denominator


def dstar(ef=0, ep=0, nf=0, np=0, exponent=2):
    """Return the D* score ``ef ** exponent / (ep + nf)``.

    :param ef: number of failing tests covering the element.
    :param ep: number of passing tests covering the element.
    :param nf: number of failing tests not covering the element.
    :param np: unused by this formula; accepted so that every formula can be
        called with the same set of metrics.
    :param exponent: the power applied to ``ef``.
    :return: the score, or ``0.0`` when the denominator is zero.
    """
    failing_total = ef + nf
    numerator = ef ** exponent
    uncovered_failing = failing_total - ef

    # NOTE(review): a zero denominator yields 0.0 even when ef > 0, i.e. for
    # an element covered by every failing test and by no passing test -
    # confirm that this ranking is intended.
    try:
        return numerator / (ep + uncovered_failing)
    except ZeroDivisionError:
        return 0.0
2 changes: 2 additions & 0 deletions framework/core/answer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ class Answer(Enum):

# Subject is faulty
YES = 1
FAULTY = 1 # introduced for Gong experiment
# Subject is not faulty
NO = 2
CLEAN = 2 # introduced for Gong experiment
# Subject is not faulty but its neighbours are suspicious
NO_BUT_SUSPICIOUS = 3
# Neither the subject, nor its neighbours are faulty
Expand Down
7 changes: 5 additions & 2 deletions framework/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from framework.context.context import Context
from framework.context.context import Defects4JContext
from framework.context.context import SIRContext
from framework.utils import rm
from framework.util.os_helpers import rm


class Experiment(object):
Expand All @@ -20,7 +20,7 @@ def __init__(self, name: str, context_type: Context):
def configure(self):
raise NotImplementedError

def run(self, data_dir: str, output_dir: str, knowledge=100, confidence=100):
def run(self, data_dir: str, output_dir: str, knowledge=100, confidence=100, projects=[]):
output_dir = j(output_dir, self.name)
rm(output_dir, "*")

Expand All @@ -35,6 +35,9 @@ def run(self, data_dir: str, output_dir: str, knowledge=100, confidence=100):
}
print(program)

if projects and program['name'] not in projects:
continue

function_csv = j(data_dir, program["name"], "%(name)s.%(version)s.function.csv" % program)
change_csv = j(data_dir, "changeset", "changeset.json")
coverage_csv = j(data_dir, "coverage", program["name"], "%(version)sb" % program, "binary", "%(name)s-%(version)sb-binary.tar.gz" % program)
Expand Down
Loading