Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions atarashi/agents/keywordAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Copyright 2025 Rajul-Jha <rajuljha49@gmail.com>

SPDX-License-Identifier: GPL-2.0

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
version 2 as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
import re
import argparse
import os
import pandas as pd

from atarashi.agents.atarashiAgent import AtarashiAgent


class KeywordAgent(AtarashiAgent):
  """
  A fast scanning agent that flags files which may contain licensing
  information by searching for a set of predefined keyword patterns.
  Inspired by the FOSSology nomos scanner.

  Note: this agent does not identify a specific license; it only emits a
  single "License-Possibility" result, so it is best used as a pre-filter
  (support agent) ahead of the real similarity-based agents.
  """
  _keyword_file_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'license_keywords.txt')
  _refs_file_path = os.path.join(os.path.dirname(__file__), '..', 'data', 'licenses', 'license_refs_combined.csv')

  def __init__(self, licenseList=None, verbose=0):
    """
    Initializes the KeywordAgent.

    :param licenseList: Ignored by this agent, but kept for compatibility
                        with the agent factory method.
    :param verbose: Verbosity level (0 = silent).
    """
    # NOTE(review): AtarashiAgent.__init__ is intentionally not called,
    # since this agent does not need a processed license list — confirm
    # the base class has no other required initialization.
    self.verbose = verbose
    self.keywords = self.load_keywords(self._keyword_file_path)
    self.license_shortnames_and_refs = self.load_license_shortnames_and_refs(self._refs_file_path)

  def load_keywords(self, file_path):
    """
    Load license keyword regex patterns, one pattern per line.

    Keywords are based on FOSSology's nomos scanner's STRINGS.in
    https://github.com/fossology/fossology/blob/master/src/nomos/agent/generator/STRINGS.in

    :param file_path: Path of the license_keywords.txt file.
    :return: List of compiled, case-insensitive regex patterns. Empty on
             read failure.
    """
    patterns = []
    try:
      with open(file_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
          keyword = line.strip()
          if not keyword:
            continue
          try:
            patterns.append(re.compile(keyword, re.IGNORECASE))
          except re.error as e:
            # Skip a single malformed pattern instead of discarding
            # every keyword loaded so far (previous behavior aborted
            # the whole list on the first bad line).
            if self.verbose > 0:
              print(f"Skipping invalid keyword regex (line {line_no}) in {file_path}: {e}")
    except OSError as e:
      if self.verbose > 0:
        print(f"Failed to load keywords from {file_path}: {e}")
    return patterns

  def load_license_shortnames_and_refs(self, file_path):
    """
    Load license shortnames/refs (generated by LicenseDownloader) as
    whole-word, case-insensitive regex patterns.

    Currently loaded but not used by scan(); kept for a future
    shortname-identification pass.

    :param file_path: Path of the combined license refs CSV (column 'key').
    :return: List of compiled regex patterns. Empty on read failure.
    """
    patterns = []
    try:
      df = pd.read_csv(file_path)
      for keyword in df['key'].dropna():
        # re.escape makes the shortname literal; \b anchors avoid
        # matching inside longer identifiers.
        patterns.append(re.compile(r'\b' + re.escape(str(keyword)) + r'\b', re.IGNORECASE))
    except Exception as e:
      if self.verbose > 0:
        print(f"Failed to load keywords from {file_path}: {e}")
    return patterns

  def loadFile(self, filePath):
    """
    Read a file as text.

    The comment preprocessor is intentionally not used here: the raw file
    content is scanned directly, for speed.

    :param filePath: Path of the file to read.
    :return: File contents as a string.
    :raises OSError: If the file cannot be opened.
    """
    try:
      with open(filePath, 'r', encoding='utf-8') as f:
        return f.read()
    except UnicodeDecodeError:
      # Only retry on decode failures (the old bare `except` also retried
      # on FileNotFoundError, just to fail again). latin-1 maps every
      # byte value, so this fallback cannot fail to decode.
      with open(filePath, 'r', encoding='latin-1') as f:
        return f.read()

  def scan(self, filePath):
    """
    Scans a file for license-related keywords.

    :param filePath: Path to the file to be scanned.
    :return: A list with one "License-Possibility" result dict if any
             keyword matched, otherwise an empty list.
    """
    try:
      processed_data = self.loadFile(filePath)
    except Exception as e:
      if self.verbose > 0:
        print(f"Could not process file {filePath}: {e}")
      return []

    # Nothing to match in an empty / whitespace-only file.
    if not processed_data.strip():
      return []

    matched_keywords = [kw.pattern for kw in self.keywords
                        if kw.search(processed_data)]
    if not matched_keywords:
      return []

    if self.verbose > 0:
      print(f"Found license-related keywords in {filePath}: {', '.join(matched_keywords)}")
    return [{
        "shortname": "License-Possibility",
        "sim_score": 1.0,
        "sim_type": "Keyword-Scan",
        "description": f"Matched keywords: {', '.join(matched_keywords)}"
    }]


if __name__ == "__main__":
  # Standalone entry point: KeywordAgent can be run directly on a file or
  # directory, independently of the atarashi CLI.
  parser = argparse.ArgumentParser(description="Scan a file or directory for license keywords.")
  parser.add_argument("input_path", help="Path to the file or directory to scan.")
  parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase output verbosity.")
  args = parser.parse_args()

  agent = KeywordAgent(verbose=args.verbose)
  input_path = os.path.expanduser(args.input_path)

  if os.path.isfile(input_path):
    results = agent.scan(input_path)
    if results:
      print(f"Keyword Scan results for {input_path}:")
      for result in results:
        print(result)
  elif os.path.isdir(input_path):
    print(f"Scanning directory: {input_path}")
    for root, _, file_names in os.walk(input_path):
      # `file_name` avoids shadowing the builtin `file` (Py2 legacy name).
      for file_name in file_names:
        file_path = os.path.join(root, file_name)
        results = agent.scan(file_path)
        if results:
          print(f"Scan results for {file_path}:")
          for result in results:
            print(result)
  else:
    print(f"Error: Invalid path '{args.input_path}'")
8 changes: 7 additions & 1 deletion atarashi/agents/tfidf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from enum import Enum
import itertools
import time
import warnings

from numpy import unique, sum, dot
from sklearn.feature_extraction.text import TfidfVectorizer
Expand Down Expand Up @@ -78,8 +79,11 @@ def __tfidfsumscore(self, inputFile):

all_documents = self.licenseList['processed_text'].tolist()
all_documents.append(processedData1)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
sklearn_tfidf = TfidfVectorizer(min_df=1, use_idf=True, smooth_idf=True,
sublinear_tf=True, tokenizer=tokenize,
token_pattern=None,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am hoping this is for a bug fix??

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup!

vocabulary=processedData)

sklearn_representation = sklearn_tfidf.fit_transform(all_documents).toarray()
Expand Down Expand Up @@ -115,8 +119,10 @@ def __tfidfcosinesim(self, inputFile):
startTime = time.time()

all_documents = self.licenseList['processed_text'].tolist()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
sklearn_tfidf = TfidfVectorizer(min_df=1, max_df=0.10, use_idf=True, smooth_idf=True,
sublinear_tf=True, tokenizer=tokenize)
sublinear_tf=True, tokenizer=tokenize, token_pattern=None)

all_documents_matrix = sklearn_tfidf.fit_transform(all_documents).toarray()
search_martix = sklearn_tfidf.transform([processedData1]).toarray()[0]
Expand Down
127 changes: 74 additions & 53 deletions atarashi/atarashii.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
from importlib_resources import files
import argparse
import errno
import json
import os
import sys
from pathlib import Path

from atarashi.agents.cosineSimNgram import NgramAgent
from atarashi.agents.dameruLevenDist import DameruLevenDist
from atarashi.agents.tfidf import TFIDF
from atarashi.agents.wordFrequencySimilarity import WordFrequencySimilarity
from atarashi.agents.keywordAgent import KeywordAgent

__author__ = "Aman Jain"
__email__ = "amanjain5221@gmail.com"
Expand Down Expand Up @@ -121,89 +123,108 @@ def main():
Calls atarashii_runner for each file in the folder/ repository specified by user
Prints the Input path and the JSON output from atarashii_runner
'''
defaultProcessed = str(files("atarashi.data.licenses").joinpath("processedLicenses.csv"))
defaultJSON = str(files("atarashi.data").joinpath("Ngram_keywords.json"))
base_dir = Path(__file__).resolve().parent
defaultProcessed = str(base_dir / "data" / "licenses" / "processedLicenses.csv")
defaultJSON = str(base_dir / "data" / "Ngram_keywords.json")

parser = argparse.ArgumentParser()
parser.add_argument("inputPath", help="Specify the input file/directory path to scan")
parser.add_argument("-l", "--processedLicenseList", required=False,
help="Specify the location of processed license list file")
parser.add_argument("-l", "--processedLicenseList", help="Processed license list CSV")
parser.add_argument("-a", "--agent_name", required=True,
choices=['wordFrequencySimilarity', 'DLD', 'tfidf', 'Ngram'],
help="Name of the agent that needs to be run")
parser.add_argument("-s", "--similarity", required=False, default="CosineSim",
choices=["ScoreSim", "CosineSim", "DiceSim", "BigramCosineSim"],
help="Specify the similarity algorithm that you want."
" First 2 are for TFIDF and last 3 are for Ngram")
parser.add_argument("-j", "--ngram_json", required=False,
help="Specify the location of Ngram JSON (for Ngram agent only)")
parser.add_argument("-v", "--verbose", help="increase output verbosity",
action="count", default=0)
parser.add_argument('-V', '--version', action='version',
version='%(prog)s ' + __version__)
choices=['wordFrequencySimilarity', 'DLD', 'tfidf', 'Ngram'],
help="Agent to run")
parser.add_argument("-s", "--similarity", default="CosineSim",
choices=["ScoreSim", "CosineSim", "DiceSim", "BigramCosineSim"],
help="Select the Similarity algorithm. First 2 are for TFIDF agent, last three for Ngram agent.")
parser.add_argument("-j", "--ngram_json", help="Ngram JSON file (for Ngram agent only)")
parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase output verbosity")
parser.add_argument("--skip-keyword", action="store_true",
help="Skip KeywordAgent precheck before similarity scan")
parser.add_argument("-V", "--version", action='version', version=f'%(prog)s {__version__}')
args = parser.parse_args()

inputPath = args.inputPath
agent_name = args.agent_name
similarity = args.similarity
processedLicense = args.processedLicenseList or defaultProcessed
ngram_json = args.ngram_json or defaultJSON
verbose = args.verbose
processedLicense = args.processedLicenseList
ngram_json = args.ngram_json

if processedLicense is None:
processedLicense = defaultProcessed
if ngram_json is None:
ngram_json = defaultJSON
# Validate compatibility between agent and similarity
if args.agent_name == "tfidf" and args.similarity not in ["CosineSim", "ScoreSim"]:
print("Error: TFIDF agent supports only CosineSim or ScoreSim.", file=sys.stderr)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this warning should be there in the help command for the agent right??

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added this to the help command, but left this as a sanity check for the agent!

return 1
if args.agent_name == "Ngram" and args.similarity not in ["CosineSim", "DiceSim", "BigramCosineSim"]:
print("Error: Ngram agent supports only CosineSim, DiceSim or BigramCosineSim.", file=sys.stderr)
return 1

scanner_obj = build_scanner_obj(processedLicense, agent_name, similarity,
ngram_json, verbose)
scanner_obj = build_scanner_obj(processedLicense, args.agent_name, args.similarity, ngram_json, verbose)
if scanner_obj == -1:
return -1
return 1

keyword_scanner = KeywordAgent(verbose=verbose)
return_code = 0
files_scanned = 0
files_skipped = 0

if os.path.isfile(inputPath):
try:
result = run_scan(scanner_obj, inputPath)
except FileNotFoundError as e:
result = ["Error: " + e.strerror + ": '" + e.filename + "'"]
return_code |= 2
except UnicodeDecodeError as e:
result = ["Error: Can not encode file '" + inputPath + "' in '" + \
e.encoding + "'"]
return_code |= 4

result = list(result)
result = {"file": os.path.abspath(inputPath), "results": result}
result = json.dumps(result, sort_keys=True, ensure_ascii=False, indent=4)
print(result + "\n")
keyword_ok = args.skip_keyword or keyword_scanner.scan(inputPath)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oww, My bad. Keyword agent actually working as a support agent here for any type of scan. :D

if not keyword_ok:
files_skipped += 1
if verbose > 0:
print(f"File {inputPath} does not appear to contain a license, skipping.")
else:
try:
result = run_scan(scanner_obj, inputPath)
result = list(result)
except FileNotFoundError as e:
result = [f"Error: {e.strerror}: '{e.filename}'"]
return_code |= 2
except UnicodeDecodeError as e:
result = [f"Error: Cannot encode file '{inputPath}' in '{e.encoding}'"]
return_code |= 2
output = {"file": os.path.abspath(inputPath), "results": result}
print(json.dumps(output, sort_keys=True, ensure_ascii=False, indent=4))
print(f"Skipped {files_skipped}.\n")

elif os.path.isdir(inputPath):
print("[")
printComma = False
for dirpath, dirnames, filenames in os.walk(inputPath):
for dirpath, _, filenames in os.walk(inputPath):
if "__MACOSX" in Path(dirpath).parts:
continue
for file in filenames:
if file.startswith("._") or file == ".DS_Store":
continue
fpath = os.path.join(dirpath, file)
keyword_ok = args.skip_keyword or keyword_scanner.scan(fpath)
if not keyword_ok:
files_skipped += 1
continue
try:
result = run_scan(scanner_obj, fpath)
result = list(result)
except FileNotFoundError as e:
result = ["Error: " + e.strerror + ": '" + e.filename + "'"]
result = [f"Error: {e.strerror}: '{e.filename}'"]
return_code |= 2
except UnicodeDecodeError as e:
result = ["Error: Can not encode file '" + fpath + "' in '" + \
e.encoding + "'"]
return_code |= 4
result = list(result)
result = {"file": os.path.abspath(fpath), "results": result}
result = [f"Error: Cannot encode file '{fpath}' in '{e.encoding}'"]
return_code |= 2
output = {"file": os.path.abspath(fpath), "results": result}
if printComma:
print(",", end="")
else:
printComma = True
print(json.dumps(result, sort_keys=True, ensure_ascii=False))
files_scanned += 1
print(json.dumps(output, sort_keys=True, ensure_ascii=False))
print("]")
if verbose > 0:
print(f"\nScanned: {files_scanned}, Skipped: {files_skipped}")
# print(f"Total files scanned: {files_scanned}\n")
# print(f"Total files skipped: {files_skipped}.\n")

else:
print("Error: Can not understand '" + inputPath + "'. Please enter a " +
"correct path or a directory")
return_code |= 6
print(f"Error: The path '{inputPath}' is not a valid file or directory.", file=sys.stderr)
return_code |= 4

return return_code


Expand Down
3 changes: 3 additions & 0 deletions atarashi/build_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ def download_dependencies(threads = os.cpu_count(), verbose = 0):
spdxLicenseList,
processedLicenseListCsv,
verbose = verbose)
print("** Populating Licence Refs SPDX and FOSSology")
generated_refs_path = LicenseDownloader.generate_combined_license_refs(threads=threads)
print(f"Combined license refs list generated at path: {generated_refs_path}")
print("** Generating Ngrams **")
createNgrams(processedLicenseListCsv, ngramJsonLoc, threads, verbose)

Expand Down
Loading