Files
2025-07-25 17:54:28 +08:00

200 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import required libraries
import time
import numpy as np
import cv2
import sys
# import RflySim APIs
import PX4MavCtrlV4 as PX4MavCtrl
import VisionCaptureApi
import UE4CtrlAPI
# Create the RflySim3D (UE4) command interface and the vision capture interface.
ue = UE4CtrlAPI.UE4CtrlAPI()
vis = VisionCaptureApi.VisionCaptureApi()
# VisionCaptureApi configuration: use shared-memory mode and load the sensor
# configuration stored in Config1.json.
vis.jsonLoad(0,'Config1.json')
mav = PX4MavCtrl.PX4MavCtrler(1)  # corresponds to the port of vehicle #1
mav.InitMavLoop()
print("Simulation Start.")
isSuss = vis.sendReqToUE4()  # send the image request to RflySim3D and verify it
if not isSuss:
    # The image request failed. Exit with a message and a non-zero status so
    # that a launcher can tell this apart from a normal run.
    sys.exit("Image capture request to RflySim3D failed; exiting.")
# Start image capture and enable shared-memory image forwarding to the
# configured directory.
vis.startImgCap(True)
# Send command to UE4 Window 1 to change resolution.
# Note: this window is for display only; the capture resolution is configured
# in the json file. The smaller this window, the lower the resource demand.
ue.sendUE4Cmd('r.setres 720x405w',0)
# Set the UE4 maximum refresh rate, which is also the image capture rate.
ue.sendUE4Cmd('t.MaxFPS 30',0)
time.sleep(2)
# Image geometry constants; width and height scale the controller gains in
# approachObjective(). They match the 720x405 window requested above.
width = 720
height = 405
channel = 4
# define some helper functions for computation
def angle_cos(p0, p1, p2):
    """Return the absolute cosine of the angle at vertex p1.

    The angle is the one between the edges p0 - p1 and p2 - p1. All three
    points are NumPy arrays (they must support ``-`` and ``.astype``).
    """
    edge_a = (p0 - p1).astype('float')
    edge_b = (p2 - p1).astype('float')
    numerator = np.dot(edge_a, edge_b)
    # Product of the squared edge lengths; its square root is |a| * |b|.
    squared_norms = np.dot(edge_a, edge_a) * np.dot(edge_b, edge_b)
    return abs(numerator / np.sqrt(squared_norms))
def diagonal_check(p):
    """Return the relative length difference of the two diagonals of p.

    p holds four corner points; the diagonals join corners 0-2 and 1-3.
    The result is |d1 - d2| / d1, where d1 is the 0-2 diagonal length, so
    0.0 means both diagonals are equally long.
    """
    diag_02 = p[0] - p[2]
    diag_13 = p[1] - p[3]
    len_02 = np.sqrt(np.dot(diag_02, diag_02))
    len_13 = np.sqrt(np.dot(diag_13, diag_13))
    return abs(len_02 - len_13) * 1.0 / len_02
def saturationYawRate(yaw_rate, yr_bound=20.0):
    """Clamp yaw_rate to the symmetric range [-yr_bound, yr_bound].

    Args:
        yaw_rate: The yaw-rate command to limit.
        yr_bound: Magnitude of the limit; expected to be non-negative.
            Defaults to 20.0, the value that used to be hard-coded.

    Returns:
        yr_bound or -yr_bound when yaw_rate lies outside the range,
        otherwise yaw_rate unchanged.
    """
    if yaw_rate > yr_bound:
        return yr_bound
    if yaw_rate < -yr_bound:
        return -yr_bound
    return yaw_rate
def taskChange(pos_x):
    """Return the name of the mission stage that corresponds to pos_x.

    Stages are selected by the first exclusive upper bound that pos_x is
    below: "range1" (< 40), "range2" (< 70), "range3" (< 130),
    "land" (< 140); anything else yields "finish".
    """
    # (exclusive upper bound on pos_x, stage name), in ascending order.
    stages = (
        (40, "range1"),
        (70, "range2"),
        (130, "range3"),
        (140, "land"),
    )
    for upper_bound, stage in stages:
        if pos_x < upper_bound:
            return stage
    return "finish"
def sat(inPwm,thres=1):
    """Saturate inPwm to the symmetric interval [-thres, thres].

    Returns thres when inPwm exceeds it, -thres when inPwm is below -thres,
    and inPwm itself otherwise.
    """
    if inPwm > thres:
        return thres
    if inPwm < -thres:
        return -thres
    return inPwm
# object detect function
def objectDetect(task):
    """Detect the target for the given task in the latest camera frame.

    Reads the frame from ``vis.Img[0]`` when ``vis.hasData[0]`` is set, and
    dispatches to circleDetect() for the tasks "range1" and "range2" and to
    squareDetect() for every other task.

    Args:
        task: Name of the current mission stage.

    Returns:
        The 3-tuple returned by the selected detector, or (-1, -1, -1) when
        no frame is available.
    """
    if not vis.hasData[0]:
        return -1, -1, -1
    img_bgr = vis.Img[0]
    # Only the first channel is used (blue, assuming BGR order as the variable
    # name suggests). Indexing the split result, instead of unpacking it into
    # exactly three names, keeps this working for frames with a different
    # channel count (e.g. 4-channel images).
    b = cv2.split(img_bgr)[0]
    img_edge = cv2.Canny(b, 50, 100)
    if task in ("range1", "range2"):
        return circleDetect(img_bgr, img_edge, b)
    return squareDetect(img_bgr, img_edge)
# square detect for object detect function
def squareDetect(img_bgr, img_edge):
    """Detect a square using polygon approximation and a diagonal-length test.

    Contours of img_edge are approximated by polygons. A polygon is kept when
    it has four vertices, an area above 2000, is convex, and its two diagonals
    differ by less than 20 % (see diagonal_check). All kept squares are drawn
    onto img_bgr in place, and the image is shown in the "img_bgr" window.

    Args:
        img_bgr: Image to draw on; its first two dimensions give height and
            width.
        img_edge: Edge image passed to cv2.findContours.

    Returns:
        (ex, ey, diag) for the first kept square, where ex / ey are the
        horizontal / vertical offsets of the midpoint of corners 0 and 2 from
        the image centre, and diag is the distance between those two corners.
        Returns (-1, -1, -1) when no square was kept.
    """
    squares = []
    # cv2.findContours returns (contours, hierarchy) in OpenCV 2.x / 4.x but
    # (image, contours, hierarchy) in 3.x; the contours are always the
    # second-to-last element, so index instead of unpacking two values.
    cnts = cv2.findContours(img_edge, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2]
    for cnt in cnts:
        cnt_len = cv2.arcLength(cnt, True)
        approx = cv2.approxPolyDP(cnt, 0.05 * cnt_len, True)
        if len(approx) == 4 and cv2.contourArea(approx) > 2000 and cv2.isContourConvex(approx):
            corners = approx.reshape(-1, 2)
            if diagonal_check(corners) < 0.2:
                squares.append(corners)

    cv2.drawContours(img_bgr, squares, -1, (0, 255, 0), 3)
    cv2.imshow("img_bgr", img_bgr)
    cv2.waitKey(1)

    if not squares:
        return -1, -1, -1

    # Only the first two dimensions are needed, so do not require a
    # three-dimensional array here.
    height, width = img_bgr.shape[:2]
    corner_a, corner_c = squares[0][0], squares[0][2]
    diagonal = corner_a - corner_c
    return (
        (corner_a[0] + corner_c[0]) / 2 - width / 2,
        (corner_a[1] + corner_c[1]) / 2 - height / 2,
        np.sqrt(np.dot(diagonal, diagonal)),
    )
# circle detect for object detect function
def circleDetect(img_bgr, img_edge, img_b):
    """Detect a circle with the Hough gradient transform.

    Runs cv2.HoughCircles on img_b, takes the first circle of the result,
    draws its outline and centre onto img_bgr in place, and shows the image in
    the "img_bgr" window. img_edge is accepted but not used here.

    Returns:
        (ex, ey, radius): offsets of the circle centre from the image centre
        along x and y, and the circle radius. Returns (-1, -1, -1) when
        HoughCircles finds nothing.
    """
    found = cv2.HoughCircles(
        img_b, cv2.HOUGH_GRADIENT, 1, 20,
        param1=100, param2=30, minRadius=0, maxRadius=0
    )
    if found is None:
        return -1, -1, -1

    # Round to whole pixels so the values can be used as drawing coordinates.
    found = np.uint16(np.around(found))
    best = found[0, 0]
    cx, cy, radius = best[0], best[1], best[2]

    cv2.circle(img_bgr, (cx, cy), radius, (0,255,0), 2)   # circle outline
    cv2.circle(img_bgr, (cx, cy), 2, (0,255,255), 3)      # centre mark
    cv2.imshow("img_bgr", img_bgr)
    cv2.waitKey(1)

    rows, cols, _ = img_bgr.shape
    return cx - cols/2, cy - rows/2, radius
# approaching Objective/ crossing rings algorithm
def _waitNextTick(lastTime, timeInterval):
    """Sleep until lastTime + timeInterval and return the new reference time.

    When that deadline has already passed, no sleep happens and the schedule
    is restarted from the current time instead.
    """
    nextTime = lastTime + timeInterval
    sleepTime = nextTime - time.time()
    if sleepTime > 0:
        time.sleep(sleepTime)  # sleep until the desired clock
        return nextTime
    return time.time()


def approachObjective():
    """Approach the objective / cross the rings, then descend.

    Phase 1 runs at 30 Hz until taskChange() reports "land" or "finish": each
    cycle detects the target for the current task, refreshes the task from
    ``mav.uavPosNED[0]`` and, when a target was found, sends a velocity
    command whose vertical speed and yaw rate are proportional to the image
    errors.

    Phase 2 runs at 30 Hz while the task is "land" and sends the constant
    command ``SendVelFRD(0, 0, 1, 0)``.

    Uses the module-level ``mav``, ``width`` and ``height``.
    """
    # 0. parameters
    # some parameters that work:(0.03, -0.03, 1, 5.0); (0.06, -0.04, 1, 10.0)
    K_z = 0.004 * 640 / height
    K_yawrate = 0.005 * 480 / width
    task = "range1"

    # 1. start
    startAppTime = time.time()
    lastTime = time.time()
    # time interval of the timer
    timeInterval = 1/30.0  # here is 0.0333s (30Hz)

    while task != "finish" and task != "land":
        lastTime = _waitNextTick(lastTime, timeInterval)
        # The following code will be executed 30Hz (0.0333s)

        # 1.1. detect object and get error with center of image
        ex, ey, r = objectDetect(task)

        # 1.2. where is the drone
        pos_x = mav.uavPosNED[0]

        # 1.3. update task
        task = taskChange(pos_x)

        # 1.4. attack
        # The detectors report failure as (-1, -1, -1). A real horizontal error
        # can legitimately be -1 pixel, whereas a radius / diagonal length is
        # never negative, so r is the reliable "target found" indicator.
        if r >= 0:
            # Forward speed follows the elapsed time, saturated at 3.
            vx = sat(time.time() - startAppTime, 3)
            vy = 0
            vz = K_z * ey
            yawrate = K_yawrate * ex
            mav.SendVelFRD(vx, vy, vz, yawrate)

    lastTime = time.time()
    # NOTE(review): this loop only ends once taskChange(pos_x) stops returning
    # "land", while the commanded first velocity component is 0 - confirm the
    # x position is expected to change during this phase.
    while task == "land":
        lastTime = _waitNextTick(lastTime, timeInterval)
        pos_x = mav.uavPosNED[0]
        task = taskChange(pos_x)
        mav.SendVelFRD(0, 0, 1, 0)
# Mission entry sequence: announce, wait 5 s, start Offboard mode, arm, and
# send an initial position target before the ring-crossing task begins.
print("Enter Offboard mode.")
time.sleep(5)
mav.initOffboard()
time.sleep(0.5)
mav.SendMavArm(True) # Arm the drone
# Fly to target position (5, 0, -5); the fourth argument is presumably the
# yaw setpoint - confirm against PX4MavCtrl.
mav.SendPosNED(5, 0, -5, 0)
# Show the current camera image once, if a frame is already available
if vis.hasData[0]:
    img_bgr=vis.Img[0]
    cv2.imshow("img_bgr", img_bgr)
    cv2.waitKey(1)
#time.sleep(0.5)
# Wait 5 s before handing control to the approach loop
time.sleep(5)
# start crossing ring task
approachObjective()