|
|
本帖最后由 xiaoyingkm 于 2025-4-8 11:20 编辑
整体思路
计算机 1:负责将视频采集卡采集的画面发送到计算机 2,并接收计算机 2 回传的操作指令并执行。
计算机 2:接收计算机 1 发送的画面数据,进行目标检测和锁定操作,将操作指令回传至计算机 1。
代码实现
计算机 1(发送画面并接收操作)
python
import cv2
import socket
import struct
import pickle
import pyautogui
# 视频采集卡相关设置
cap = cv2.VideoCapture(0) # 假设视频采集卡设备编号为0
# 网络设置
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', 8888))
server_socket.listen(1)
print('Waiting for connection...')
conn, addr = server_socket.accept()
print('Connected by', addr)
while True:
ret, frame = cap.read()
if not ret:
break
# 发送画面数据
data = pickle.dumps(frame)
message_size = struct.pack("L", len(data))
conn.sendall(message_size + data)
# 接收操作指令
try:
operation_size = struct.unpack("L", conn.recv(struct.calcsize("L")))[0]
operation_data = b""
while len(operation_data) < operation_size:
operation_data += conn.recv(4096)
operation = pickle.loads(operation_data)
# 执行操作
if operation['type'] == 'move':
pyautogui.moveRel(operation['x'], operation['y'])
elif operation['type'] == 'click':
pyautogui.click()
except Exception as e:
print(f"Error receiving operation: {e}")
cap.release()
conn.close()
server_socket.close()
计算机 2(接收画面并发送操作)
python
import cv2
import socket
import struct
import pickle
import torch
from utils.general import non_max_suppression
from models.common import DetectMultiBackend
from utils.torch_utils import select_device
from math import ceil
import pyautogui
# 网络设置
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('计算机1的IP地址', 8888))
# 模型设置
conf_thres = 0.65
iou_thres = 0.55
device = select_device()
model = DetectMultiBackend('cf.pt', device=device)
aims = []
offset = 160
baim = True
autofire = False
def lock():
global autofire
aims_copy = aims
if baim:
aims_copy = [x for x in aims_copy if x[0] in [0, 1]]
if 0 in [x[0] for x in aims_copy]:
aims_copy = [x for x in aims_copy if x[0] in [0]]
else:
aims_copy = [x for x in aims_copy if x[0] in [1]]
autofire = False
if len(aims_copy):
dist_list = []
for _, x_c, y_c, _ in aims_copy:
dist = (x_c - offset) ** 2 + (y_c - offset) ** 2
dist_list.append(dist)
tag, x_center, y_center, height = aims_copy[dist_list.index(min(dist_list))]
move_x = ceil((x_center - offset) * xfov)
body_y = ceil((y_center - offset - height / 7) * yfov)
head_y = ceil((y_center - offset) * yfov)
if pyautogui.mouseDown():
if tag in [0]:
# 发送移动操作指令
operation = {'type': 'move', 'x': move_x, 'y': body_y}
data = pickle.dumps(operation)
message_size = struct.pack("L", len(data))
client_socket.sendall(message_size + data)
elif tag in [1]:
autofire = True
# 发送移动操作指令
operation = {'type': 'move', 'x': move_x, 'y': head_y}
data = pickle.dumps(operation)
message_size = struct.pack("L", len(data))
client_socket.sendall(message_size + data)
# 发送点击操作指令
operation = {'type': 'click'}
data = pickle.dumps(operation)
message_size = struct.pack("L", len(data))
client_socket.sendall(message_size + data)
while True:
# 接收画面数据
try:
message_size = struct.unpack("L", client_socket.recv(struct.calcsize("L")))[0]
data = b""
while len(data) < message_size:
data += client_socket.recv(4096)
frame = pickle.loads(data)
img = frame.transpose((2, 0, 1))
img = torch.from_numpy(img).to(device)
img = img.float()
img /= 255.
img = img[None]
pred = model(img, False, False)
pred = non_max_suppression(pred, conf_thres, iou_thres, agnostic=False)
aims.clear()
if not len(pred):
continue
for det in pred:
for *xyxy, _, cls in det:
line = [cls,
ceil((xyxy[0] + xyxy[2]) / 2),
ceil((xyxy[1] + xyxy[3]) / 2),
ceil(xyxy[3] - xyxy[1])]
aims.append(line)
lock()
except Exception as e:
print(f"Error receiving frame: {e}")
break
client_socket.close()
代码说明
计算机 1:
使用 cv2.VideoCapture 从视频采集卡获取画面。
通过 socket 建立服务器,将画面数据发送给计算机 2。
接收计算机 2 回传的操作指令,并使用 pyautogui 执行操作。
计算机 2:
通过 socket 建立客户端,连接到计算机 1。
接收计算机 1 发送的画面数据,进行目标检测和锁定操作。
将操作指令(如移动鼠标、点击鼠标)回传给计算机 1。
计算机2的代码思路来源于论坛大佬的分享下,非本人原创。
|
|