# import required libraries
import sys
import time

import cv2
import numpy as np

# import RflySim APIs
import PX4MavCtrlV4 as PX4MavCtrl
import UE4CtrlAPI
import VisionCaptureApi

ue = UE4CtrlAPI.UE4CtrlAPI()
vis = VisionCaptureApi.VisionCaptureApi()

# Configuration function in VisionCaptureApi:
# use shared-memory mode and load the sensor configuration from Config2.json
vis.jsonLoad(0, 'Config2.json')

mav = PX4MavCtrl.PX4MavCtrler(2)  # corresponds to the port of vehicle No. 2
mav.InitMavLoop()

print("Simulation Start.")

isSuss = vis.sendReqToUE4()  # send an image-capture request to RflySim3D and check the result
if not isSuss:  # exit if the image request failed
    sys.exit(0)

# Start image capture and enable shared-memory image forwarding to the configured directory
vis.startImgCap(True)

# Send commands to UE4 window 1.
# Set the UE4 window resolution. This window is for display only; the capture
# resolution is configured in the JSON file. A smaller window needs fewer resources.
ue.sendUE4Cmd('r.setres 720x405w', 0)
# Set the UE4 maximum frame rate, which is also the image-capture rate
ue.sendUE4Cmd('t.MaxFPS 30', 0)
time.sleep(2)

width = 720
height = 405
channel = 4


# define some helper functions for computation
def angle_cos(p0, p1, p2):
    """Absolute cosine of the angle at p1 formed by the points p0, p1, p2."""
    d1, d2 = (p0 - p1).astype('float'), (p2 - p1).astype('float')
    return abs(np.dot(d1, d2) / np.sqrt(np.dot(d1, d1) * np.dot(d2, d2)))


def diagonal_check(p):
    """Relative difference between the two diagonals of the quadrilateral p."""
    d1 = np.sqrt(np.dot(p[0] - p[2], p[0] - p[2]))
    d2 = np.sqrt(np.dot(p[1] - p[3], p[1] - p[3]))
    return abs(d1 - d2) * 1.0 / d1


def saturationYawRate(yaw_rate):
    """Clamp the yaw rate to [-20, 20]."""
    yr_bound = 20.0
    if yaw_rate > yr_bound:
        yaw_rate = yr_bound
    if yaw_rate < -yr_bound:
        yaw_rate = -yr_bound
    return yaw_rate


def taskChange(pos_x):
    """Select the task stage from the vehicle's x position (NED frame)."""
    if pos_x < 40:
        task = "range1"
    elif pos_x < 70:
        task = "range2"
    elif pos_x < 130:
        task = "range3"
    elif pos_x < 140:
        task = "land"
    else:
        task = "finish"
    return task


def sat(inPwm, thres=1):
    """Clamp inPwm to [-thres, thres]."""
    outPwm = inPwm
    if inPwm > thres:
        outPwm = thres
    elif inPwm < -thres:
        outPwm = -thres
    return outPwm


# object detect function
def objectDetect(task):
    """Detect the object that belongs to the current task.

    Returns (ex, ey, size): the pixel offset of the object from the image
    center and its size, or (-1, -1, -1) if nothing was detected.
    """
    if vis.hasData[0]:
        img_bgr = vis.Img[0]
    else:
        return -1, -1, -1

    # Blue channel; indexing (instead of unpacking b, g, r) also works
    # if the image has four channels.
    b = cv2.split(img_bgr)[0]
    img_edge = cv2.Canny(b, 50, 100)

    if task == "range1" or task == "range2":
        return circleDetect(img_bgr, img_edge, b)
    else:
        return squareDetect(img_bgr, img_edge)


# square detect for object detect function
def squareDetect(img_bgr, img_edge):
    """Detect a square with approxPolyDP and a diagonal-length check."""
    # find contours; [-2] selects the contour list for both the 2-value
    # and the 3-value (OpenCV 3) return conventions of findContours
    squares = []
    cnts = cv2.findContours(img_edge, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)[-2]
    for cnt in cnts:
        cnt_len = cv2.arcLength(cnt, True)
        cnt = cv2.approxPolyDP(cnt, 0.05 * cnt_len, True)
        if len(cnt) == 4 and cv2.contourArea(cnt) > 2000 and cv2.isContourConvex(cnt):
            cnt = cnt.reshape(-1, 2)
            diag_delta = diagonal_check(cnt)
            if diag_delta < 0.2:
                squares.append(cnt)

    cv2.drawContours(img_bgr, squares, -1, (0, 255, 0), 3)
    cv2.imshow("img_bgr", img_bgr)
    cv2.waitKey(1)

    height, width, channel = img_bgr.shape
    if squares:
        # center of the first square (midpoint of one diagonal) relative to
        # the image center, plus the diagonal length
        p0, p2 = squares[0][0], squares[0][2]
        return ((p0[0] + p2[0]) / 2 - width / 2,
                (p0[1] + p2[1]) / 2 - height / 2,
                np.sqrt(np.dot(p0 - p2, p0 - p2)))
    else:
        return -1, -1, -1


# circle detect for object detect function
def circleDetect(img_bgr, img_edge, img_b):
    """Detect a circle with the Hough transform (img_edge is unused here)."""
    circles = cv2.HoughCircles(img_b, cv2.HOUGH_GRADIENT, 1, 20,
                               param1=100, param2=30, minRadius=0, maxRadius=0)
    if circles is not None:
        # Use plain Python ints: the drawing calls need integer coordinates,
        # and this avoids unsigned wrap-around in later arithmetic.
        cx, cy, radius = (int(v) for v in np.around(circles[0, 0]))
        cv2.circle(img_bgr, (cx, cy), radius, (0, 255, 0), 2)
        cv2.circle(img_bgr, (cx, cy), 2, (0, 255, 255), 3)
        cv2.imshow("img_bgr", img_bgr)
        cv2.waitKey(1)
        height, width, channel = img_bgr.shape
        return cx - width / 2, cy - height / 2, radius
    else:
        return -1, -1, -1


# approaching objective / crossing rings algorithm
def approachObjective():
    # 0. parameters
    # some parameters that work: (0.03, -0.03, 1, 5.0); (0.06, -0.04, 1, 10.0)
    K_z = 0.004 * 640 / height
    K_yawrate = 0.005 * 480 / width
    task = "range1"

    # 1. start
    startAppTime = time.time()
    lastTime = time.time()
    # time interval of the timer
    timeInterval = 1 / 30.0  # 0.0333 s (30 Hz)
    while task != "finish" and task != "land":
        lastTime = lastTime + timeInterval
        sleepTime = lastTime - time.time()
        if sleepTime > 0:
            time.sleep(sleepTime)  # sleep until the desired clock
        else:
            lastTime = time.time()
        # The following code is executed at 30 Hz (every 0.0333 s)

        # 1.1. detect the object and get its error relative to the image center
        ex, ey, r = objectDetect(task)

        # 1.2. where is the drone
        pos_x = mav.uavPosNED[0]
        # print(time.time())

        # 1.3. update task
        task = taskChange(pos_x)

        # 1.4. approach: send a velocity command only if an object was detected.
        # The size r is tested because a valid pixel offset ex can itself be -1.
        if r != -1:
            vx = sat(time.time() - startAppTime, 3)  # ramp forward speed up to 3
            vy = 0
            vz = K_z * ey
            yawrate = K_yawrate * ex
            mav.SendVelFRD(vx, vy, vz, yawrate)

    # 2. land: descend until the task changes
    lastTime = time.time()
    while task == "land":
        lastTime = lastTime + timeInterval
        sleepTime = lastTime - time.time()
        if sleepTime > 0:
            time.sleep(sleepTime)  # sleep until the desired clock
        else:
            lastTime = time.time()
        pos_x = mav.uavPosNED[0]
        task = taskChange(pos_x)
        mav.SendVelFRD(0, 0, 1, 0)


print("Enter Offboard mode.")
time.sleep(5)
mav.initOffboard()
time.sleep(0.5)
mav.SendMavArm(True)  # Arm the drone
mav.SendPosNED(5, 0, -5, 0)  # Fly to target position 5,0,-5

# Show CV image and set the position
if vis.hasData[0]:
    img_bgr = vis.Img[0]
    cv2.imshow("img_bgr", img_bgr)
    cv2.waitKey(1)
# time.sleep(0.5)

time.sleep(5)

# start crossing ring task
approachObjective()