203 lines
6.1 KiB
Python
203 lines
6.1 KiB
Python
|
|
|
|||
|
|
# import required libraries
|
|||
|
|
import time
|
|||
|
|
import numpy as np
|
|||
|
|
import cv2
|
|||
|
|
import sys
|
|||
|
|
|
|||
|
|
# import RflySim APIs
|
|||
|
|
import PX4MavCtrlV4 as PX4MavCtrl
|
|||
|
|
import VisionCaptureApi
|
|||
|
|
import UE4CtrlAPI
|
|||
|
|
ue = UE4CtrlAPI.UE4CtrlAPI()
|
|||
|
|
|
|||
|
|
vis = VisionCaptureApi.VisionCaptureApi()
|
|||
|
|
# VisionCaptureApi 中的配置函数
|
|||
|
|
vis.jsonLoad(0,'Config2.json') # 使用共享内存方式,并加载Config2.json中的传感器配置文件
|
|||
|
|
mav = PX4MavCtrl.PX4MavCtrler(2) #对应2号飞机端口
|
|||
|
|
|
|||
|
|
mav.InitMavLoop()
|
|||
|
|
print("Simulation Start.")
|
|||
|
|
|
|||
|
|
isSuss = vis.sendReqToUE4() # 向RflySim3D发送取图请求,并验证
|
|||
|
|
if not isSuss: # 如果请求取图失败,则退出
|
|||
|
|
sys.exit(0)
|
|||
|
|
vis.startImgCap(True) # 开启取图,并启用共享内存图像转发,转发到填写的目录
|
|||
|
|
|
|||
|
|
# Send command to UE4 Window 1 to change resolution
|
|||
|
|
ue.sendUE4Cmd('r.setres 720x405w',0) # 设置UE4窗口分辨率,注意本窗口仅限于显示,取图分辨率在json中配置,本窗口设置越小,资源需求越少。
|
|||
|
|
ue.sendUE4Cmd('t.MaxFPS 30',0) # 设置UE4最大刷新频率,同时也是取图频率
|
|||
|
|
time.sleep(2)
|
|||
|
|
|
|||
|
|
|
|||
|
|
width = 720
|
|||
|
|
height = 405
|
|||
|
|
channel = 4
|
|||
|
|
|
|||
|
|
# define same functions for computaion
|
|||
|
|
def angle_cos(p0, p1, p2):
|
|||
|
|
d1, d2 = (p0-p1).astype('float'), (p2-p1).astype('float')
|
|||
|
|
return abs( np.dot(d1, d2) / np.sqrt( np.dot(d1, d1)*np.dot(d2, d2) ) )
|
|||
|
|
|
|||
|
|
def diagonal_check(p):
|
|||
|
|
d1 = np.sqrt(np.dot(p[0]-p[2], p[0]-p[2]))
|
|||
|
|
d2 = np.sqrt(np.dot(p[1]-p[3], p[1]-p[3]))
|
|||
|
|
return abs(d1-d2)*1.0/d1
|
|||
|
|
|
|||
|
|
def saturationYawRate(yaw_rate):
|
|||
|
|
yr_bound = 20.0
|
|||
|
|
if yaw_rate > yr_bound:
|
|||
|
|
yaw_rate = yr_bound
|
|||
|
|
if yaw_rate < -yr_bound:
|
|||
|
|
yaw_rate = -yr_bound
|
|||
|
|
return yaw_rate
|
|||
|
|
|
|||
|
|
def taskChange(pos_x):
|
|||
|
|
if pos_x < 40:
|
|||
|
|
task = "range1"
|
|||
|
|
elif pos_x <70:
|
|||
|
|
task = "range2"
|
|||
|
|
elif pos_x < 130:
|
|||
|
|
task = "range3"
|
|||
|
|
elif pos_x < 140:
|
|||
|
|
task = "land"
|
|||
|
|
else:
|
|||
|
|
task = "finish"
|
|||
|
|
return task
|
|||
|
|
|
|||
|
|
def sat(inPwm,thres=1):
|
|||
|
|
outPwm= inPwm
|
|||
|
|
if inPwm>thres:
|
|||
|
|
outPwm = thres
|
|||
|
|
elif inPwm<-thres:
|
|||
|
|
outPwm = -thres
|
|||
|
|
return outPwm
|
|||
|
|
|
|||
|
|
|
|||
|
|
# object detect function
|
|||
|
|
def objectDetect(task):
|
|||
|
|
"""According task to detect objects"""
|
|||
|
|
if vis.hasData[0]:
|
|||
|
|
img_bgr=vis.Img[0]
|
|||
|
|
else:
|
|||
|
|
return -1,-1,-1
|
|||
|
|
b,g,r = cv2.split(img_bgr)
|
|||
|
|
img_edge = cv2.Canny(b, 50, 100)
|
|||
|
|
if task == "range1" or task == "range2":
|
|||
|
|
return circleDetect(img_bgr, img_edge, b)
|
|||
|
|
else:
|
|||
|
|
return squareDetect(img_bgr, img_edge)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# square detect for object detect function
|
|||
|
|
def squareDetect(img_bgr, img_edge):
|
|||
|
|
"""Detect Square with PolyDP and diagonal length"""
|
|||
|
|
# find contours
|
|||
|
|
squares = []
|
|||
|
|
cnts, hierarchy = cv2.findContours(img_edge, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
|
|||
|
|
for cnt in cnts:
|
|||
|
|
cnt_len = cv2.arcLength(cnt, True)
|
|||
|
|
cnt = cv2.approxPolyDP(cnt, 0.05 * cnt_len, True)
|
|||
|
|
if len(cnt) == 4 and cv2.contourArea(cnt) > 2000 and cv2.isContourConvex(cnt):
|
|||
|
|
cnt = cnt.reshape(-1, 2)
|
|||
|
|
diag_delta = diagonal_check(cnt)
|
|||
|
|
if diag_delta < 0.2:
|
|||
|
|
squares.append(cnt)
|
|||
|
|
cv2.drawContours( img_bgr, squares, -1, (0, 255, 0), 3)
|
|||
|
|
cv2.imshow("img_bgr", img_bgr)
|
|||
|
|
|
|||
|
|
cv2.waitKey(1)
|
|||
|
|
height, width, channel = img_bgr.shape
|
|||
|
|
if squares:
|
|||
|
|
return (squares[0][0][0]+squares[0][2][0])/2 - width/2, (squares[0][0][1]+squares[0][2][1])/2 - height/2, np.sqrt(np.dot(squares[0][0]-squares[0][2], squares[0][0]-squares[0][2]))
|
|||
|
|
else:
|
|||
|
|
return -1,-1,-1
|
|||
|
|
|
|||
|
|
# circle detect for object detect function
|
|||
|
|
def circleDetect(img_bgr, img_edge, img_b):
|
|||
|
|
"""Hough Circle detect"""
|
|||
|
|
circles = cv2.HoughCircles(img_b, cv2.HOUGH_GRADIENT, 1, 20, param1=100, param2=30, minRadius=0, maxRadius=0)
|
|||
|
|
if circles is not None:
|
|||
|
|
circles = np.uint16(np.around(circles))
|
|||
|
|
obj = circles[0, 0]
|
|||
|
|
cv2.circle(img_bgr, (obj[0], obj[1]), obj[2], (0,255,0), 2)
|
|||
|
|
cv2.circle(img_bgr, (obj[0], obj[1]), 2, (0,255,255), 3)
|
|||
|
|
cv2.imshow("img_bgr", img_bgr)
|
|||
|
|
|
|||
|
|
cv2.waitKey(1)
|
|||
|
|
height, width, channel = img_bgr.shape
|
|||
|
|
return obj[0]-width/2, obj[1]-height/2, obj[2]
|
|||
|
|
else:
|
|||
|
|
return -1,-1,-1
|
|||
|
|
|
|||
|
|
|
|||
|
|
# approaching Objective/ crossing rings algorithm
|
|||
|
|
def approachObjective():
|
|||
|
|
# 0. parameters
|
|||
|
|
# some parameters that work:(0.03, -0.03, 1, 5.0); (0.06, -0.04, 1, 10.0)
|
|||
|
|
K_z = 0.004 * 640 / height
|
|||
|
|
K_yawrate = 0.005 * 480 / width
|
|||
|
|
task = "range1"
|
|||
|
|
# 1. start
|
|||
|
|
startAppTime= time.time()
|
|||
|
|
lastTime = time.time()
|
|||
|
|
# time interval of the timer
|
|||
|
|
timeInterval = 1/30.0 #here is 0.0333s (30Hz)
|
|||
|
|
while (task != "finish") & (task != "land"):
|
|||
|
|
lastTime = lastTime + timeInterval
|
|||
|
|
sleepTime = lastTime - time.time()
|
|||
|
|
if sleepTime > 0:
|
|||
|
|
time.sleep(sleepTime) # sleep until the desired clock
|
|||
|
|
else:
|
|||
|
|
lastTime = time.time()
|
|||
|
|
# The following code will be executed 30Hz (0.0333s)
|
|||
|
|
|
|||
|
|
# 1.1. detect object and get error with center of image
|
|||
|
|
ex, ey, r = objectDetect(task)
|
|||
|
|
# 1.2. where is the drone
|
|||
|
|
pos_x = mav.uavPosNED[0]
|
|||
|
|
#print(time.time())
|
|||
|
|
# 1.3. update task
|
|||
|
|
task = taskChange(pos_x)
|
|||
|
|
# 1.4. attack
|
|||
|
|
if ex != -1:
|
|||
|
|
vx = sat(time.time()-startAppTime,3)
|
|||
|
|
vy = 0
|
|||
|
|
vz = K_z * ey
|
|||
|
|
yawrate = K_yawrate * ex
|
|||
|
|
mav.SendVelFRD(vx, vy, vz, yawrate)
|
|||
|
|
|
|||
|
|
lastTime = time.time()
|
|||
|
|
while task == "land":
|
|||
|
|
lastTime = lastTime + timeInterval
|
|||
|
|
sleepTime = lastTime - time.time()
|
|||
|
|
if sleepTime > 0:
|
|||
|
|
time.sleep(sleepTime) # sleep until the desired clock
|
|||
|
|
else:
|
|||
|
|
lastTime = time.time()
|
|||
|
|
|
|||
|
|
pos_x = mav.uavPosNED[0]
|
|||
|
|
task = taskChange(pos_x)
|
|||
|
|
mav.SendVelFRD(0, 0, 1, 0)
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
print("Enter Offboard mode.")
|
|||
|
|
time.sleep(5)
|
|||
|
|
mav.initOffboard()
|
|||
|
|
time.sleep(0.5)
|
|||
|
|
mav.SendMavArm(True) # Arm the drone
|
|||
|
|
mav.SendPosNED(5, 0, -5, 0) # Fly to target position 5,0,-5
|
|||
|
|
|
|||
|
|
# Show CV image and set the position
|
|||
|
|
if vis.hasData[0]:
|
|||
|
|
img_bgr=vis.Img[0]
|
|||
|
|
cv2.imshow("img_bgr", img_bgr)
|
|||
|
|
cv2.waitKey(1)
|
|||
|
|
#time.sleep(0.5)
|
|||
|
|
|
|||
|
|
time.sleep(5)
|
|||
|
|
|
|||
|
|
# start crossing ring task
|
|||
|
|
approachObjective()
|