mirror of
https://github.com/yv1ing/ShotRDP.git
synced 2025-09-16 15:10:57 +08:00
188 lines
5.7 KiB
Python
188 lines
5.7 KiB
Python
import ctypes
import sys

from PyQt6.QtCore import Qt, pyqtSignal, pyqtSlot, QObject, QThread
from PyQt6.QtGui import QPixmap
from PyQt6.QtWidgets import (
    QApplication, QWidget, QVBoxLayout, QHBoxLayout,
    QLineEdit, QPushButton, QLabel, QTextEdit
)

from predict import solve
|
|
|
|
|
|
def shot(target, width, height):
    """Capture a screenshot of an RDP target through the native shotrdp library.

    Args:
        target: Target as ``b"host:port"``. A ``str`` is also accepted and is
            encoded to bytes before being handed to the library.
        width: Requested width, passed to ``GetScreen`` as a C int.
        height: Requested height, passed to ``GetScreen`` as a C int.

    Returns:
        ``(image_bytes, '')`` on success, or ``(None, message)`` when the
        library reports an error or hands back no usable image buffer.

    Raises:
        OSError: If ``./shotrdp.dll`` cannot be loaded.
    """
    # The C prototype takes a char*; a str would be rejected by c_char_p.
    if isinstance(target, str):
        target = target.encode()

    lib = ctypes.CDLL('./shotrdp.dll')

    lib.GetScreen.argtypes = [
        ctypes.c_char_p,
        ctypes.POINTER(ctypes.POINTER(ctypes.c_char)),
        ctypes.POINTER(ctypes.c_int),
        ctypes.c_int,
        ctypes.c_int,
    ]
    # With a c_char_p restype ctypes converts the result to bytes (or None
    # for a NULL pointer), so the return value is already the message text.
    lib.GetScreen.restype = ctypes.c_char_p

    lib.Free.argtypes = [ctypes.c_void_p]
    lib.Free.restype = None

    # Output parameters filled in by GetScreen.
    data = ctypes.POINTER(ctypes.c_char)()
    length = ctypes.c_int()

    error = lib.GetScreen(
        target, ctypes.byref(data), ctypes.byref(length), width, height
    )
    try:
        if error:
            # Lenient decoding: a message that is not valid UTF-8 must still
            # be reported rather than raising UnicodeDecodeError.
            return None, error.decode(errors='replace')
        if not data or length.value < 0:
            # Reading from a NULL buffer or with a negative size would
            # access invalid memory, so report it as a failure instead.
            return None, 'GetScreen returned no image data'
        # string_at copies the bytes, so the native buffer can be released.
        return ctypes.string_at(data, length.value), ''
    finally:
        # Release the native buffer on every path, but never pass NULL.
        if data:
            lib.Free(data)
|
|
|
|
|
|
class Worker(QObject):
    """Background job that captures a target's screen and classifies it.

    Signals:
        error_occurred(str): Emitted with the error text when the capture
            reports an error or any exception is raised in ``run``.
        image_signal(bytes, str): Emitted with the captured image bytes and
            the (empty) error string once a capture succeeds.
        finished(str): Emitted with the value returned by ``solve``.
    """

    error_occurred = pyqtSignal(str)
    image_signal = pyqtSignal(bytes, str)
    finished = pyqtSignal(str)

    def __init__(self, target):
        """Store the target that ``run`` passes to ``shot``."""
        super().__init__()
        self.target = target

    @pyqtSlot()
    def run(self):
        """Capture the screen, publish the image, then publish the result.

        Exactly one of ``finished`` or ``error_occurred`` is emitted per
        call, unless ``solve`` raises after the image has been published
        (then ``image_signal`` is followed by ``error_occurred``).
        """
        try:
            image_bytes, error_msg = shot(self.target, 1024, 800)
            if error_msg:
                # Capture failed: report it and stop, so solve() is never
                # called without image data.
                self.error_occurred.emit(error_msg)
                return

            self.image_signal.emit(image_bytes, error_msg)

            result = solve(image_bytes)
            self.finished.emit(result)
        except Exception as e:
            # Thread boundary: report every failure through the signal.
            # Some exceptions have an empty str(); fall back to the type
            # name so the log line is never blank.
            self.error_occurred.emit(str(e) or type(e).__name__)
|
|
|
|
|
|
class MainWindow(QWidget):
    """Main window: host/port inputs, a Start button, an image preview and a log."""

    def __init__(self):
        super().__init__()
        self.setWindowFlag(Qt.WindowType.WindowMaximizeButtonHint, False)
        self.init_ui()

    def init_ui(self):
        """Create the widgets, arrange the layouts and connect the Start button."""
        # Input area
        input_layout = QHBoxLayout()

        # Host field
        address_group = QHBoxLayout()
        self.address_label = QLabel("Host: ")
        self.address_input = QLineEdit()
        self.address_input.setText("127.0.0.1")
        address_group.addWidget(self.address_label)
        address_group.addWidget(self.address_input)

        # Port field
        port_group = QHBoxLayout()
        self.port_label = QLabel("Port: ")
        self.port_input = QLineEdit()
        self.port_input.setText("3389")
        port_group.addWidget(self.port_label)
        port_group.addWidget(self.port_input)

        # Start button
        self.start_btn = QPushButton("Start")

        # Assemble the input row
        input_layout.addLayout(address_group)
        input_layout.addLayout(port_group)
        input_layout.addWidget(self.start_btn)
        input_layout.setSpacing(20)

        address_group.setSpacing(5)
        port_group.setSpacing(5)

        # Display area: image preview on the left, log on the right
        self.image_label = QLabel()
        self.image_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.image_label.setMinimumSize(512, 400)

        self.log_output = QTextEdit()
        self.log_output.setReadOnly(True)
        self.log_output.setFixedHeight(400)

        display_layout = QHBoxLayout()
        display_layout.addWidget(self.image_label)
        display_layout.addWidget(self.log_output)

        # Main layout
        main_layout = QVBoxLayout()
        main_layout.addLayout(input_layout)
        main_layout.addLayout(display_layout)
        self.setLayout(main_layout)

        # Event binding
        self.start_btn.clicked.connect(self.start_scan)

        # Window settings
        self.setWindowTitle("Shot RDP - By yv1ing")
        self.setGeometry(100, 100, 900, 450)

    def start_scan(self):
        """Validate host and port, then run a ``Worker`` on a new ``QThread``.

        Invalid input is reported in the log pane and nothing is started.
        The Start button stays disabled until the thread has finished.
        """
        address = self.address_input.text().strip()
        port = self.port_input.text().strip()

        if not address or not port:
            self.log_output.append("[WARN] The address and port cannot be empty")
            return

        try:
            port = int(port)
            if not (0 < port <= 65535):
                raise ValueError
        except ValueError:
            self.log_output.append("[WARN] The port number is invalid.")
            return

        target = f"{address}:{port}".encode()
        self.log_output.append(f"[INFO] Start identifying: {target.decode()}")

        # Create the worker and its thread. The thread is stored as
        # `worker_thread`, not `thread`, because an attribute named `thread`
        # would shadow the inherited QObject.thread() method on this widget.
        self.worker = Worker(target)
        self.worker_thread = QThread()
        self.worker.moveToThread(self.worker_thread)

        # Signal wiring
        self.worker_thread.started.connect(self.worker.run)
        self.worker.finished.connect(self.solve_result)
        self.worker.image_signal.connect(self.update_image)
        self.worker.error_occurred.connect(self.show_error)
        # Either outcome ends the thread's event loop.
        self.worker.finished.connect(self.worker_thread.quit)
        self.worker.error_occurred.connect(self.worker_thread.quit)
        self.worker_thread.finished.connect(self.worker_thread.deleteLater)

        # Disable the button to prevent repeated clicks while a scan runs
        self.start_btn.setEnabled(False)
        self.worker_thread.finished.connect(lambda: self.start_btn.setEnabled(True))

        # NOTE(review): closing the window while the thread is still running
        # is not handled in this class - confirm whether that needs a guard.
        self.worker_thread.start()

    def update_image(self, image_bytes, _):
        """Show the captured image scaled to fit 512x400, or log that it is invalid.

        The second argument is the error string sent with ``image_signal``;
        it is not used here.
        """
        pixmap = QPixmap()
        pixmap.loadFromData(image_bytes)
        if not pixmap.isNull():
            scaled_pixmap = pixmap.scaled(512, 400, Qt.AspectRatioMode.KeepAspectRatio)
            self.image_label.setPixmap(scaled_pixmap)
            self.log_output.append("[INFO] Image acquisition successful")
        else:
            self.log_output.append("[WARN] Image data is invalid")

    def solve_result(self, result):
        """Log the identification result followed by a separator line."""
        self.log_output.append(f"[INFO] Operating system version: {result}")
        self.log_output.append("-" * 70)

    def show_error(self, error_msg):
        """Log an error message reported by the worker."""
        self.log_output.append(f"[WARN] {error_msg}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window,
    # run the event loop and exit with its return code.
    qt_app = QApplication(sys.argv)

    main_window = MainWindow()
    main_window.show()

    exit_code = qt_app.exec()
    sys.exit(exit_code)
|