mirror of
https://github.com/yv1ing/MollyAudit.git
synced 2025-09-16 14:55:50 +08:00
Add graphical interface
This commit is contained in:
@@ -1,2 +1,5 @@
|
|||||||
# MollyAudit
|
# MollyAudit
|
||||||
LLM-driven automatic code audit tool
|
|
||||||
|
An automated code auditing tool powered by langchain.
|
||||||
|
|
||||||
|

|
||||||
|
|||||||
48
app/__init__.py
Normal file
48
app/__init__.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
import os
|
||||||
|
import warnings
|
||||||
|
from audit import Audit
|
||||||
|
|
||||||
|
warnings.simplefilter('ignore', FutureWarning)
|
||||||
|
|
||||||
|
home_dir = os.path.expanduser("~")
|
||||||
|
config_file_name = ".mollyaudit"
|
||||||
|
config_file_path = os.path.join(home_dir, config_file_name)
|
||||||
|
|
||||||
|
GLOBAL_CONFIG = {
|
||||||
|
"base_url": "https://openai.com/v1",
|
||||||
|
"api_key": "",
|
||||||
|
"reasoning_model": "o3-mini-all",
|
||||||
|
"embedding_model": "text-embedding-3-small"
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def load_config():
|
||||||
|
global GLOBAL_CONFIG
|
||||||
|
|
||||||
|
if os.path.exists(config_file_path):
|
||||||
|
with open(config_file_path, 'r') as file:
|
||||||
|
for line in file:
|
||||||
|
line = line.strip()
|
||||||
|
if line and '=' in line:
|
||||||
|
key, value = line.split('=', 1)
|
||||||
|
GLOBAL_CONFIG[key] = value
|
||||||
|
else:
|
||||||
|
with open(config_file_path, 'w') as file:
|
||||||
|
for key, value in GLOBAL_CONFIG.items():
|
||||||
|
file.write(f"{key}={value}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def update_config(key, value):
|
||||||
|
global GLOBAL_CONFIG
|
||||||
|
|
||||||
|
GLOBAL_CONFIG[key] = value
|
||||||
|
with open(config_file_path, 'w') as file:
|
||||||
|
for k, v in GLOBAL_CONFIG.items():
|
||||||
|
file.write(f"{k}={v}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def audit_code(base_url, api_key, src_root, language, reasoning_model, embedding_model, process_output_callback,
|
||||||
|
result_output_callback, event):
|
||||||
|
audit = Audit(base_url, api_key, reasoning_model, embedding_model, process_output_callback, result_output_callback)
|
||||||
|
audit.load_source_files(src_root, language)
|
||||||
|
audit.audit(event)
|
||||||
271
app/ui.py
Normal file
271
app/ui.py
Normal file
@@ -0,0 +1,271 @@
|
|||||||
|
import os
|
||||||
|
import re
|
||||||
|
import threading
|
||||||
|
from threading import Event
|
||||||
|
from app import audit_code, update_config, GLOBAL_CONFIG
|
||||||
|
from app.utils import get_now_date
|
||||||
|
from logger import Logger
|
||||||
|
from PyQt6.QtGui import QColor, QGuiApplication, QTextCursor
|
||||||
|
from PyQt6.QtWidgets import (
|
||||||
|
QWidget,
|
||||||
|
QVBoxLayout,
|
||||||
|
QHBoxLayout,
|
||||||
|
QLabel,
|
||||||
|
QLineEdit,
|
||||||
|
QPushButton,
|
||||||
|
QFileDialog,
|
||||||
|
QTextEdit,
|
||||||
|
QComboBox
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
BACKGROUND_COLOR = '#dcdcdc'
|
||||||
|
ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
|
||||||
|
ANSI_COLOR_REGEX = re.compile(r'\x1B\[(?:([0-9]+);)?([0-9]+)m')
|
||||||
|
ANSI_COLOR_MAP = {
|
||||||
|
'94': QColor(0, 0, 200),
|
||||||
|
'92': QColor(0, 128, 0),
|
||||||
|
'93': QColor(255, 127, 0),
|
||||||
|
'91': QColor(220, 0, 0),
|
||||||
|
'95': QColor(180, 0, 180)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def convert_ansi_to_rich_text(text):
|
||||||
|
segments = []
|
||||||
|
pos = 0
|
||||||
|
for match in ANSI_COLOR_REGEX.finditer(text):
|
||||||
|
start, end = match.span()
|
||||||
|
if start > pos:
|
||||||
|
segments.append(text[pos:start])
|
||||||
|
color_code = match.group(2)
|
||||||
|
if color_code in ANSI_COLOR_MAP:
|
||||||
|
color = ANSI_COLOR_MAP[color_code]
|
||||||
|
html_color = color.name()
|
||||||
|
segments.append(f'<span style="color:{html_color}">')
|
||||||
|
else:
|
||||||
|
segments.append('<span>')
|
||||||
|
pos = end
|
||||||
|
segments.append(text[pos:])
|
||||||
|
segments.append('</span>')
|
||||||
|
rich_text = ''.join(segments)
|
||||||
|
rich_text = ANSI_ESCAPE.sub('', rich_text)
|
||||||
|
|
||||||
|
return rich_text
|
||||||
|
|
||||||
|
|
||||||
|
class MainWindow(QWidget):
|
||||||
|
def __init__(self):
|
||||||
|
self.event = Event()
|
||||||
|
self.log = Logger('ui', callback=self.process_output_callback)
|
||||||
|
super().__init__()
|
||||||
|
self.init_ui()
|
||||||
|
|
||||||
|
def init_ui(self):
|
||||||
|
main_layout = QVBoxLayout()
|
||||||
|
dir_lang_layout = QHBoxLayout()
|
||||||
|
|
||||||
|
# 目录选择
|
||||||
|
dir_layout = QHBoxLayout()
|
||||||
|
self.dir_label = QLabel('项目目录:')
|
||||||
|
self.dir_input = QLineEdit()
|
||||||
|
self.dir_button = QPushButton('选择')
|
||||||
|
self.dir_button.clicked.connect(self.select_directory)
|
||||||
|
dir_layout.addWidget(self.dir_label)
|
||||||
|
dir_layout.addWidget(self.dir_input)
|
||||||
|
dir_layout.addWidget(self.dir_button)
|
||||||
|
dir_lang_layout.addLayout(dir_layout)
|
||||||
|
|
||||||
|
# 语言选择
|
||||||
|
languages = ['c', 'cpp', 'go', 'php', 'jsp', 'java', 'python', 'javascript']
|
||||||
|
self.lang_label = QLabel('项目语言:')
|
||||||
|
self.lang_combobox = QComboBox()
|
||||||
|
self.lang_combobox.addItems(languages)
|
||||||
|
dir_lang_layout.addWidget(self.lang_label)
|
||||||
|
dir_lang_layout.addWidget(self.lang_combobox)
|
||||||
|
|
||||||
|
main_layout.addLayout(dir_lang_layout)
|
||||||
|
|
||||||
|
# 配置信息
|
||||||
|
config_layout = QHBoxLayout()
|
||||||
|
self.base_url_label = QLabel('接口地址:')
|
||||||
|
self.base_url_input = QLineEdit()
|
||||||
|
self.api_key_label = QLabel('模型密钥:')
|
||||||
|
self.api_key_input = QLineEdit()
|
||||||
|
self.api_key_input.setEchoMode(QLineEdit.EchoMode.Password)
|
||||||
|
config_layout.addWidget(self.base_url_label)
|
||||||
|
config_layout.addWidget(self.base_url_input)
|
||||||
|
config_layout.addWidget(self.api_key_label)
|
||||||
|
config_layout.addWidget(self.api_key_input)
|
||||||
|
main_layout.addLayout(config_layout)
|
||||||
|
|
||||||
|
model_layout = QHBoxLayout()
|
||||||
|
self.reasoning_model_label = QLabel('推理模型:')
|
||||||
|
self.reasoning_model_input = QLineEdit()
|
||||||
|
self.embedding_model_label = QLabel('嵌入模型:')
|
||||||
|
self.embedding_model_input = QLineEdit()
|
||||||
|
model_layout.addWidget(self.reasoning_model_label)
|
||||||
|
model_layout.addWidget(self.reasoning_model_input)
|
||||||
|
model_layout.addWidget(self.embedding_model_label)
|
||||||
|
model_layout.addWidget(self.embedding_model_input)
|
||||||
|
main_layout.addLayout(model_layout)
|
||||||
|
|
||||||
|
# 按钮部分
|
||||||
|
button_layout = QHBoxLayout()
|
||||||
|
self.start_button = QPushButton('开始审计')
|
||||||
|
self.start_button.clicked.connect(self.start_process)
|
||||||
|
self.stop_button = QPushButton('终止审计')
|
||||||
|
self.stop_button.clicked.connect(self.stop_process)
|
||||||
|
self.update_button = QPushButton('更新配置')
|
||||||
|
self.update_button.clicked.connect(self.update_config)
|
||||||
|
self.clear_button = QPushButton('清空输出')
|
||||||
|
self.clear_button.clicked.connect(self.clear_panel)
|
||||||
|
button_layout.addWidget(self.start_button)
|
||||||
|
button_layout.addWidget(self.stop_button)
|
||||||
|
button_layout.addWidget(self.update_button)
|
||||||
|
button_layout.addWidget(self.clear_button)
|
||||||
|
main_layout.addLayout(button_layout)
|
||||||
|
|
||||||
|
# 实时输出
|
||||||
|
output_layout = QVBoxLayout()
|
||||||
|
|
||||||
|
# 过程输出
|
||||||
|
self.process_output_text = QTextEdit()
|
||||||
|
self.process_output_text.setReadOnly(True)
|
||||||
|
self.process_output_text.setStyleSheet(f'background-color: {BACKGROUND_COLOR};')
|
||||||
|
output_layout.addWidget(self.process_output_text)
|
||||||
|
|
||||||
|
# 结果输出
|
||||||
|
self.result_output_text = QTextEdit()
|
||||||
|
self.result_output_text.setReadOnly(True)
|
||||||
|
self.result_output_text.setStyleSheet(f'background-color: {BACKGROUND_COLOR};')
|
||||||
|
output_layout.addWidget(self.result_output_text)
|
||||||
|
|
||||||
|
output_layout.setStretch(0, 1)
|
||||||
|
output_layout.setStretch(1, 2)
|
||||||
|
main_layout.addLayout(output_layout)
|
||||||
|
|
||||||
|
self.setLayout(main_layout)
|
||||||
|
self.setWindowTitle('MollyAudit - created by yvling')
|
||||||
|
screen = QGuiApplication.primaryScreen().geometry()
|
||||||
|
window_width = 1000
|
||||||
|
window_height = 600
|
||||||
|
x = (screen.width() - window_width) // 2
|
||||||
|
y = (screen.height() - window_height) // 2
|
||||||
|
self.setGeometry(x, y, window_width, window_height)
|
||||||
|
|
||||||
|
# 导出结果
|
||||||
|
export_button_layout = QHBoxLayout()
|
||||||
|
self.export_button = QPushButton('导出结果')
|
||||||
|
self.export_button.clicked.connect(self.export_result)
|
||||||
|
export_button_layout.addStretch(1) # 添加伸缩项,使按钮靠右
|
||||||
|
export_button_layout.addWidget(self.export_button)
|
||||||
|
main_layout.addLayout(export_button_layout)
|
||||||
|
|
||||||
|
# 加载配置
|
||||||
|
self.base_url_input.setText(GLOBAL_CONFIG['base_url'])
|
||||||
|
self.api_key_input.setText(GLOBAL_CONFIG['api_key'])
|
||||||
|
self.reasoning_model_input.setText(GLOBAL_CONFIG['reasoning_model'])
|
||||||
|
self.embedding_model_input.setText(GLOBAL_CONFIG['embedding_model'])
|
||||||
|
|
||||||
|
def closeEvent(self, event):
|
||||||
|
self.event.set()
|
||||||
|
|
||||||
|
def clear_panel(self):
|
||||||
|
self.process_output_text.clear()
|
||||||
|
self.result_output_text.clear()
|
||||||
|
|
||||||
|
def update_config(self):
|
||||||
|
base_url = self.base_url_input.text()
|
||||||
|
api_key = self.api_key_input.text()
|
||||||
|
reasoning_model = self.reasoning_model_input.text()
|
||||||
|
embedding_model = self.embedding_model_input.text()
|
||||||
|
|
||||||
|
update_config('base_url', base_url)
|
||||||
|
update_config('api_key', api_key)
|
||||||
|
update_config('reasoning_model', reasoning_model)
|
||||||
|
update_config('embedding_model', embedding_model)
|
||||||
|
|
||||||
|
self.log.info('更新配置成功')
|
||||||
|
|
||||||
|
def select_directory(self):
|
||||||
|
directory = QFileDialog.getExistingDirectory(self, '选择项目目录')
|
||||||
|
if directory:
|
||||||
|
self.dir_input.setText(directory)
|
||||||
|
|
||||||
|
def export_result(self):
|
||||||
|
result_text = self.result_output_text.toPlainText()
|
||||||
|
if result_text == '':
|
||||||
|
self.log.warning('当前结果为空')
|
||||||
|
return
|
||||||
|
|
||||||
|
directory = QFileDialog.getExistingDirectory(self, '选择导出目录')
|
||||||
|
if directory:
|
||||||
|
file_name = f'molly-audit-{get_now_date()}.txt'
|
||||||
|
file_path = os.path.join(directory, file_name).replace('\\', '/')
|
||||||
|
try:
|
||||||
|
with open(file_path, 'w', encoding='utf-8') as f:
|
||||||
|
f.write(result_text)
|
||||||
|
self.log.info(f'导出结果成功: {file_path}')
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f'导出结果错误:{str(e)}')
|
||||||
|
|
||||||
|
def process_output_callback(self, content):
|
||||||
|
rich_text = convert_ansi_to_rich_text(content)
|
||||||
|
self.process_output_text.append(rich_text)
|
||||||
|
cursor = self.process_output_text.textCursor()
|
||||||
|
cursor.movePosition(QTextCursor.MoveOperation.End)
|
||||||
|
self.process_output_text.setTextCursor(cursor)
|
||||||
|
self.process_output_text.ensureCursorVisible()
|
||||||
|
|
||||||
|
def result_output_callback(self, content):
|
||||||
|
self.result_output_text.append(f'{content}\n')
|
||||||
|
cursor = self.result_output_text.textCursor()
|
||||||
|
cursor.movePosition(QTextCursor.MoveOperation.End)
|
||||||
|
self.result_output_text.setTextCursor(cursor)
|
||||||
|
self.result_output_text.ensureCursorVisible()
|
||||||
|
|
||||||
|
def start_process(self):
|
||||||
|
selected_dir = self.dir_input.text()
|
||||||
|
selected_lang = self.lang_combobox.currentText()
|
||||||
|
base_url = self.base_url_input.text()
|
||||||
|
api_key = self.api_key_input.text()
|
||||||
|
reasoning_model = self.reasoning_model_input.text()
|
||||||
|
embedding_model = self.embedding_model_input.text()
|
||||||
|
|
||||||
|
if not selected_dir or not base_url or not api_key:
|
||||||
|
self.log.error('请确保项目目录、接口地址和模型密钥等都已填写')
|
||||||
|
return
|
||||||
|
|
||||||
|
self.log.info('正在加载所需资源')
|
||||||
|
try:
|
||||||
|
threading.Thread(
|
||||||
|
target=audit_code,
|
||||||
|
args=(
|
||||||
|
base_url,
|
||||||
|
api_key,
|
||||||
|
selected_dir,
|
||||||
|
selected_lang,
|
||||||
|
reasoning_model,
|
||||||
|
embedding_model,
|
||||||
|
self.process_output_callback,
|
||||||
|
self.result_output_callback,
|
||||||
|
self.event
|
||||||
|
)
|
||||||
|
).start()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f'发生异常:{str(e)}')
|
||||||
|
finally:
|
||||||
|
if 'OPENAI_API_BASE' in os.environ:
|
||||||
|
del os.environ['OPENAI_API_BASE']
|
||||||
|
if 'OPENAI_API_KEY' in os.environ:
|
||||||
|
del os.environ['OPENAI_API_KEY']
|
||||||
|
|
||||||
|
def stop_process(self):
|
||||||
|
self.event.set()
|
||||||
|
|
||||||
|
if 'OPENAI_API_BASE' in os.environ:
|
||||||
|
del os.environ['OPENAI_API_BASE']
|
||||||
|
if 'OPENAI_API_KEY' in os.environ:
|
||||||
|
del os.environ['OPENAI_API_KEY']
|
||||||
|
self.log.info('已终止代码审计流程')
|
||||||
7
app/utils.py
Normal file
7
app/utils.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
import datetime
|
||||||
|
|
||||||
|
|
||||||
|
def get_now_date():
|
||||||
|
now = datetime.datetime.now()
|
||||||
|
formatted = now.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
return formatted
|
||||||
BIN
assets/img-01.png
Normal file
BIN
assets/img-01.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 85 KiB |
@@ -13,33 +13,41 @@ from langchain.retrievers import ContextualCompressionRetriever
|
|||||||
from langchain.retrievers.document_compressors import EmbeddingsFilter, DocumentCompressorPipeline
|
from langchain.retrievers.document_compressors import EmbeddingsFilter, DocumentCompressorPipeline
|
||||||
from langchain_text_splitters import CharacterTextSplitter
|
from langchain_text_splitters import CharacterTextSplitter
|
||||||
|
|
||||||
|
from audit.rules import FROTIFY_RULES
|
||||||
from logger import Logger
|
from logger import Logger
|
||||||
from audit import callback
|
from audit import callback
|
||||||
from audit.prompt import SYSTEM_PROMPT
|
from audit.prompt import SYSTEM_PROMPT
|
||||||
from audit.language import LANGUAGE
|
from audit.language import LANGUAGE
|
||||||
|
|
||||||
reasoning_model = 'gpt-4o'
|
|
||||||
embedding_model = 'text-embedding-3-large'
|
|
||||||
|
|
||||||
xml_pattern = r'<root>.*?</root>'
|
xml_pattern = r'<root>.*?</root>'
|
||||||
|
|
||||||
|
|
||||||
class Audit:
|
class Audit:
|
||||||
def __init__(self, fortify_rules):
|
def __init__(self, base_url, api_key, reasoning_model, embedding_model, process_output_callback, result_output_callback):
|
||||||
self.raw_chain = None
|
self.raw_chain = None
|
||||||
self.source_files_list = []
|
self.source_files_list = []
|
||||||
self.max_token = 4096
|
self.max_token = 4096
|
||||||
self.fortify_rules = fortify_rules
|
self.reasoning_model = reasoning_model
|
||||||
|
self.embedding_model = embedding_model
|
||||||
|
self.fortify_rules = FROTIFY_RULES
|
||||||
|
self.process_output_callback = process_output_callback
|
||||||
|
self.result_output_callback = result_output_callback
|
||||||
self.chat_history = ChatMessageHistory()
|
self.chat_history = ChatMessageHistory()
|
||||||
self.session_id = uuid.uuid4().hex
|
self.session_id = uuid.uuid4().hex
|
||||||
self.response_callback = callback.CustomCallbackHandler()
|
self.response_callback = callback.CustomCallbackHandler()
|
||||||
self.embedding = OpenAIEmbeddings(model=embedding_model)
|
self.embedding = OpenAIEmbeddings(
|
||||||
|
base_url=base_url,
|
||||||
|
api_key=api_key,
|
||||||
|
model=embedding_model
|
||||||
|
)
|
||||||
self.llm = ChatOpenAI(
|
self.llm = ChatOpenAI(
|
||||||
|
base_url=base_url,
|
||||||
|
api_key=api_key,
|
||||||
model=reasoning_model,
|
model=reasoning_model,
|
||||||
streaming=True,
|
streaming=True,
|
||||||
callbacks=[self.response_callback]
|
callbacks=[self.response_callback]
|
||||||
)
|
)
|
||||||
self.log = Logger('audit')
|
self.log = Logger('audit', callback=self.process_output_callback)
|
||||||
self.splitter = CharacterTextSplitter(
|
self.splitter = CharacterTextSplitter(
|
||||||
chunk_size=300,
|
chunk_size=300,
|
||||||
chunk_overlap=0,
|
chunk_overlap=0,
|
||||||
@@ -65,12 +73,28 @@ class Audit:
|
|||||||
('human', '{input}'),
|
('human', '{input}'),
|
||||||
])
|
])
|
||||||
|
|
||||||
def audit(self, callback_function):
|
def audit(self, event):
|
||||||
self.log.info('Start auditing')
|
if len(self.source_files_list) <= 0:
|
||||||
|
self.log.error('没有找到源代码文件')
|
||||||
|
return
|
||||||
|
|
||||||
|
self.log.info('开始代码审计流程')
|
||||||
|
self.log.info(f'当前推理模型:{self.reasoning_model}')
|
||||||
|
self.log.info(f'当前嵌入模型:{self.embedding_model}')
|
||||||
|
|
||||||
input_content = ''
|
input_content = ''
|
||||||
while True:
|
while True:
|
||||||
|
if event.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
result = self.send_message(input_content)
|
result = self.send_message(input_content)
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(e)
|
||||||
|
return
|
||||||
|
|
||||||
|
if event.is_set():
|
||||||
|
return
|
||||||
|
|
||||||
if xml_match := re.search(xml_pattern, result, re.DOTALL):
|
if xml_match := re.search(xml_pattern, result, re.DOTALL):
|
||||||
try:
|
try:
|
||||||
@@ -80,33 +104,36 @@ class Audit:
|
|||||||
action = root.find('action').text
|
action = root.find('action').text
|
||||||
content = root.find('content').text
|
content = root.find('content').text
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(f'Illegal output, try to correct')
|
print(result)
|
||||||
|
print(e)
|
||||||
|
self.log.error(f'动作指令不合法,尝试纠正')
|
||||||
input_content = 'ILLEGAL OUTPUT'
|
input_content = 'ILLEGAL OUTPUT'
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if action == 'QUERY STRUCTURE':
|
if action == 'QUERY STRUCTURE':
|
||||||
self.log.info('Request project structure')
|
self.log.info('请求查询项目结构')
|
||||||
input_content = '\n'.join(x for x in self.source_files_list)
|
input_content = '\n'.join(x for x in self.source_files_list)
|
||||||
continue
|
continue
|
||||||
elif action == 'QUERY SOURCE':
|
elif action == 'QUERY SOURCE':
|
||||||
self.log.info(f'Request source code: {content}')
|
self.log.info(f'请求查询源代码:{content}')
|
||||||
input_content = open(content, 'r', encoding='utf-8').read()
|
input_content = open(content, 'r', encoding='utf-8').read()
|
||||||
continue
|
continue
|
||||||
elif action == 'QUERY FORTIFY':
|
elif action == 'QUERY FORTIFY':
|
||||||
self.log.info(f'Request fortify: {content}')
|
self.log.info(f'请求查询规则库:{content}')
|
||||||
input_content = '\n'.join(x for x in self.fortify_rules if x == content)
|
input_content = '\n'.join(x for x in self.fortify_rules if x == content)
|
||||||
continue
|
continue
|
||||||
elif action == 'OUTPUT RESULT':
|
elif action == 'OUTPUT RESULT':
|
||||||
self.log.warning(f'Audit result: \n\n{content}')
|
self.log.warning('输出代码审计结果')
|
||||||
|
self.result_output_callback(content)
|
||||||
self.store_messages_in_faiss(content)
|
self.store_messages_in_faiss(content)
|
||||||
callback_function(content) # Callback function, used to obtain results externally
|
input_content = 'ok'
|
||||||
input_content = ''
|
|
||||||
continue
|
continue
|
||||||
elif action == 'FINISH TASK':
|
elif action == 'FINISH TASK':
|
||||||
self.log.info(content)
|
self.log.info('代码审计任务已完成')
|
||||||
|
return
|
||||||
else:
|
else:
|
||||||
self.log.critical(f'Unknown action! {action}')
|
self.log.error(f'动作指令未定义:{action}')
|
||||||
break
|
return
|
||||||
|
|
||||||
def send_message(self, input_content):
|
def send_message(self, input_content):
|
||||||
self.response_callback.temp_content = ''
|
self.response_callback.temp_content = ''
|
||||||
@@ -140,20 +167,18 @@ class Audit:
|
|||||||
text_embedding = self.embedding.embed_query(message)
|
text_embedding = self.embedding.embed_query(message)
|
||||||
doc_id = str(uuid.uuid4())
|
doc_id = str(uuid.uuid4())
|
||||||
self.messages_db.add_embeddings([(doc_id, text_embedding)], metadatas=[{"id": doc_id}])
|
self.messages_db.add_embeddings([(doc_id, text_embedding)], metadatas=[{"id": doc_id}])
|
||||||
self.log.info(f"Audit result stored in messages_db with ID: {doc_id}")
|
self.log.info(f"代码审计结果已缓存,文档编号:{doc_id}")
|
||||||
|
|
||||||
def load_source_files(self, path, lang):
|
def load_source_files(self, path, lang):
|
||||||
self.log.info('Loading source files')
|
|
||||||
|
|
||||||
if lang in LANGUAGE:
|
if lang in LANGUAGE:
|
||||||
suffixes = LANGUAGE[lang]
|
suffixes = LANGUAGE[lang]
|
||||||
else:
|
else:
|
||||||
self.log.critical('Language not supported!')
|
self.log.error('不支持的编程语言')
|
||||||
return
|
return
|
||||||
|
|
||||||
for root, _, files in os.walk(path):
|
for root, _, files in os.walk(path):
|
||||||
self.source_files_list.extend(
|
self.source_files_list.extend(
|
||||||
os.path.join(root, file) for file in files if any(file.endswith(suffix) for suffix in suffixes)
|
os.path.join(root, file).replace('\\', '/') for file in files if any(file.endswith(suffix) for suffix in suffixes)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.log.info(f'Finished loading source files. total files: {len(self.source_files_list)}')
|
self.log.info(f'源代码文件加载完成,共:{len(self.source_files_list)} 个')
|
||||||
|
|||||||
117
audit/prompt.py
117
audit/prompt.py
@@ -1,59 +1,76 @@
|
|||||||
SYSTEM_PROMPT = """
|
SYSTEM_PROMPT = """
|
||||||
You are an intelligent code auditor. I will provide you with a source code. Please strictly follow the following requirements to conduct code audit.
|
You are a professional code audit security expert, responsible for helping users audit possible vulnerabilities and security issues in source code.
|
||||||
During the audit process, you can refer to Fortify's rule base(Execute Action 3), but it does not have to be completely consistent to determine the existence of a vulnerability. The rule base format provided to you is as follows:
|
You will perform code audits according to the following process:
|
||||||
|
|
||||||
|
1. Query project structure
|
||||||
|
You input the action command in the following format, and the user will send you the absolute path of all source files in the project below:
|
||||||
|
<root>
|
||||||
|
<action>QUERY STRUCTURE</action>
|
||||||
|
<content></content>
|
||||||
|
</root>
|
||||||
|
|
||||||
|
2. Query the vulnerability detection rule base
|
||||||
|
You input the action instructions in the following format, and the user will send you the vulnerability detection rule library extracted from Fortify as a reference for your code audit:
|
||||||
|
<root>
|
||||||
|
<action>QUERY FORTIFY</action>
|
||||||
|
<content>The language you want to query, options are: c, cpp, go, php, jsp, java, python, javascript</content>
|
||||||
|
</root>
|
||||||
|
|
||||||
|
3. Query the source code
|
||||||
|
You input the action command in the following format, and the user will send you the source code you need below:
|
||||||
|
<root>
|
||||||
|
<action>QUERY SOURCE</action>
|
||||||
|
<content>the absolute path of the file you want to query</content>
|
||||||
|
</root>
|
||||||
|
|
||||||
|
4. Output code audit results
|
||||||
|
You input the code audit results in the following format, and the user will send you "ok", then you can proceed to the next step of the audit:
|
||||||
|
<root>
|
||||||
|
<action>OUTPUT RESULT</action>
|
||||||
|
<content>the audit results you want to output</content>
|
||||||
|
</root>
|
||||||
|
|
||||||
|
5. Finish audit task
|
||||||
|
When you are sure that all source code files have been audited, you can output the action instructions to end the task in the following format:
|
||||||
|
<root>
|
||||||
|
<action>FINISH TASK</action>
|
||||||
|
<content></content>
|
||||||
|
</root>
|
||||||
|
|
||||||
|
All your output can only be one of the five actions mentioned above. Any other form of output is strictly prohibited.
|
||||||
|
|
||||||
|
|
||||||
|
Some additional information, which are some specifications when you perform actions:
|
||||||
|
1. The format of the vulnerability detection rule base provided to you is as follows:
|
||||||
{
|
{
|
||||||
'language':
|
'language':
|
||||||
'vuln_kingdom':
|
'vuln_kingdom':
|
||||||
'vuln_category':
|
'vuln_category':
|
||||||
}
|
}
|
||||||
|
|
||||||
Before officially starting the audit, it is recommended to query the Fortify rule base as a reference.
|
2. When you output the code audit results, you must use Chinese output and follow the following format:
|
||||||
All your output must strictly follow the following specifications. It is forbidden to output in any other form (including plain text, Markdown, etc.), and it is forbidden to bring "`" when outputting.
|
漏洞类型:
|
||||||
You can choose to perform the following actions:
|
漏洞文件:
|
||||||
|
相关代码:
|
||||||
|
修复建议:
|
||||||
|
|
||||||
1. Query project structure:
|
Some Mandatory regulations:
|
||||||
<root>
|
1. Output Format:
|
||||||
<action>QUERY STRUCTURE</action>
|
a. Strictly use the predefined XML tag structure
|
||||||
<content></content>
|
b. Any Markdown symbols are not allowed
|
||||||
</root>
|
c. No line breaks in the content field
|
||||||
|
2. Language Standards:
|
||||||
2. Query code files
|
a. Technical terms are kept in their original English
|
||||||
<root>
|
b. Vulnerability descriptions must be in Chinese
|
||||||
<action>QUERY SOURCE</action>
|
3. Interaction restrictions:
|
||||||
<content>the absolute path of the file you want to query</content>
|
a. Any content outside the output process is prohibited
|
||||||
</root>
|
b. Autonomously advance the audit process when receiving "nothing" or "ok"
|
||||||
|
c. Vulnerabilities must be output immediately
|
||||||
3. Query fortify
|
4. Error handling:
|
||||||
<root>
|
a. When receiving the "ILLEGAL OUTPUT" prompt, terminate the current output immediately and recheck the format specification before continuing
|
||||||
<action>QUERY FORTIFY</action>
|
5. Priority logic:
|
||||||
<content>The language you want to query, options are: c, cpp, go, php, jsp, java, python, javascript</content>
|
a. Entry file > Configuration file > Tool file
|
||||||
</root>
|
b. High-risk vulnerabilities (such as injection and RCE) are handled first
|
||||||
|
c. If multiple vulnerabilities are found in the same file, they need to be output multiple times
|
||||||
4. Output audit results
|
d. For vulnerabilities that may span files, the audit can only begin after the relevant files have been queried as needed
|
||||||
<root>
|
|
||||||
<action>OUTPUT RESULT</action>
|
|
||||||
<content>the audit results you want to output</content>
|
|
||||||
</root>
|
|
||||||
|
|
||||||
The output result format is as follows(JSON):
|
|
||||||
{
|
|
||||||
"Vulnerability Type":
|
|
||||||
"Vulnerability File":
|
|
||||||
"Vulnerability Code Summary":
|
|
||||||
"Vulnerability repair suggestions":
|
|
||||||
}
|
|
||||||
|
|
||||||
5. End the audit task
|
|
||||||
<root>
|
|
||||||
<action>FINISH TASK</action>
|
|
||||||
<content></content>
|
|
||||||
</root>
|
|
||||||
|
|
||||||
Important things:
|
|
||||||
1. When the user sends you "nothing", you need to decide the next step based on the current audit progress;
|
|
||||||
2. When you make an action to query the project structure, the user will send you the following format (C:\\Users\\yvling\\Desktop\\PHP-Vuln\\src\\index.php), which is a text containing the absolute paths of several source code files. You need to construct the project structure that you can understand based on these contents;
|
|
||||||
3. When you need to query the content of a code file, please note that you can only query one file at a time. Please follow The above format outputs the absolute path of the file to be queried;
|
|
||||||
4. After you output the audit results, the user will reply with an empty string. Please make sure that all code files have been audited before ending the audit task;
|
|
||||||
5. In any case, you must strictly follow the several action formats given above for output. Any content outside the output format is prohibited. Do not try to ask or suggest;
|
|
||||||
6. When the user prompts "ILLEGAL OUTPUT", it means that your output violates the user's specifications. Please confirm again that all your output must comply with the user's specifications.
|
|
||||||
"""
|
"""
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -1,6 +1,6 @@
|
|||||||
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
LOG_COLORS = {
|
LOG_COLORS = {
|
||||||
'DEBUG': '\033[94m', # 蓝色
|
'DEBUG': '\033[94m', # 蓝色
|
||||||
'INFO': '\033[92m', # 绿色
|
'INFO': '\033[92m', # 绿色
|
||||||
@@ -11,30 +11,36 @@ LOG_COLORS = {
|
|||||||
RESET_COLOR = '\033[0m'
|
RESET_COLOR = '\033[0m'
|
||||||
|
|
||||||
|
|
||||||
def log_with_color(level, message):
|
|
||||||
color = LOG_COLORS.get(level, RESET_COLOR)
|
|
||||||
prefix = f"[{level}]"
|
|
||||||
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
||||||
formatted_message = f"{color}{date} {prefix} {message}{RESET_COLOR}"
|
|
||||||
|
|
||||||
print(formatted_message)
|
|
||||||
|
|
||||||
|
|
||||||
class Logger:
|
class Logger:
|
||||||
def __init__(self, name):
|
def __init__(self, name, callback):
|
||||||
|
self.name = name
|
||||||
|
self.callback = callback
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def debug(self, message):
|
def debug(self, message):
|
||||||
log_with_color("DEBUG", message)
|
self.log_with_color("DEBUG", message)
|
||||||
|
|
||||||
def info(self, message):
|
def info(self, message):
|
||||||
log_with_color("INFO", message)
|
self.log_with_color("INFO", message)
|
||||||
|
|
||||||
def warning(self, message):
|
def warning(self, message):
|
||||||
log_with_color("WARNING", message)
|
self.log_with_color("WARNING", message)
|
||||||
|
|
||||||
def error(self, message):
|
def error(self, message):
|
||||||
log_with_color("ERROR", message)
|
self.log_with_color("ERROR", message)
|
||||||
|
|
||||||
def critical(self, message):
|
def critical(self, message):
|
||||||
log_with_color("CRITICAL", message)
|
self.log_with_color("CRITICAL", message)
|
||||||
|
|
||||||
|
def log_with_color(self, level, message):
|
||||||
|
color = LOG_COLORS.get(level, RESET_COLOR)
|
||||||
|
date = datetime.now().strftime('%H:%M:%S')
|
||||||
|
|
||||||
|
prefix = f"[{date}]"
|
||||||
|
formatted_message = f"{color}{prefix} {message}{RESET_COLOR}"
|
||||||
|
|
||||||
|
print(formatted_message)
|
||||||
|
if self.callback:
|
||||||
|
self.callback(formatted_message)
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
|||||||
36
main.py
36
main.py
@@ -1,26 +1,24 @@
|
|||||||
import json
|
import sys
|
||||||
import os
|
from PyQt6.QtWidgets import QApplication
|
||||||
import warnings
|
from app import load_config
|
||||||
from audit import Audit
|
from app.ui import MainWindow
|
||||||
|
|
||||||
|
app = QApplication(sys.argv)
|
||||||
|
|
||||||
|
|
||||||
warnings.simplefilter('ignore', FutureWarning)
|
def main():
|
||||||
|
try:
|
||||||
os.environ['OPENAI_API_BASE'] = 'https://yunwu.ai/v1'
|
app.exec()
|
||||||
os.environ['OPENAI_API_KEY'] = 'sk-FdKVL1IiRCMhTVScD4iIEfE2U7978rKuAQhPl0Gbr55l6fDD'
|
return 0
|
||||||
|
except Exception as e:
|
||||||
fortify_rules = json.load(open('fortify_rules.json', 'r', encoding='utf-8'))
|
print(e)
|
||||||
|
return 1
|
||||||
|
|
||||||
def result_callback(result):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
src_root = r'C:\Users\yvling\Desktop\PHP-Vuln'
|
load_config()
|
||||||
language = 'php'
|
|
||||||
|
|
||||||
audit = Audit(fortify_rules)
|
window = MainWindow()
|
||||||
audit.load_source_files(src_root, language)
|
window.show()
|
||||||
audit.audit(result_callback)
|
|
||||||
|
|
||||||
|
sys.exit(main())
|
||||||
|
|||||||
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
Reference in New Issue
Block a user