import cv2
import math
import time
from collections import deque

cap = cv2.VideoCapture("pagi.mp4")  # Replace with your video file or camera index
fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

tracking = {}
next_vehicle_id = 0

# Diagonal speed lines (adjust the endpoints to match your scene).
# They are drawn on every frame so you can check their placement.
speed_line_1 = ((100, 500), (800, 250))
speed_line_2 = ((110, 390), (800, 180))

font = cv2.FONT_HERSHEY_SIMPLEX


def segments_intersect(p1, p2, p3, p4):
    """Return True if segment p1-p2 properly intersects segment p3-p4.

    Standard orientation (cross-product) test. This replaces the original
    cv2.clipLine call, which clips against an axis-aligned rectangle and
    cannot detect crossings of a diagonal line.
    """
    def cross(o, a, b):
        return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])

    d1 = cross(p3, p4, p1)
    d2 = cross(p3, p4, p2)
    d3 = cross(p1, p2, p3)
    d4 = cross(p1, p2, p4)
    return ((d1 > 0) != (d2 > 0)) and ((d3 > 0) != (d4 > 0))


while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Foreground mask from the MOG2 background model, cleaned with an opening.
    fgmask = fgbg.apply(frame)
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel)

    contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Collect the centroid of every sufficiently large blob.
    centers = []
    for cnt in contours:
        if cv2.contourArea(cnt) > 500:
            x, y, w, h = cv2.boundingRect(cnt)
            cx = x + w // 2
            cy = y + h // 2
            centers.append((cx, cy))
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

    # Naive nearest-neighbour matching: a detection is assigned to the first
    # tracked vehicle whose previous centroid lies within 50 px.
    updated_tracking = {}
    for cx, cy in centers:
        matched = None
        for vid, vehicle in tracking.items():
            prev_center = vehicle['center']
            if math.hypot(cx - prev_center[0], cy - prev_center[1]) < 50:
                matched = vid
                break

        if matched is not None:
            vehicle = tracking[matched]
            dx = cx - vehicle['center'][0]
            dy = cy - vehicle['center'][1]
            angle = math.degrees(math.atan2(dy, dx))
            if -45 <= angle <= 45:
                direction = 'right'
            elif 45 < angle <= 135:
                direction = 'down'
            elif angle > 135 or angle < -135:
                direction = 'left'
            else:
                direction = 'up'

            now = time.time()
            vehicle['center'] = (cx, cy)
            vehicle['trace'].append((cx, cy))
            vehicle['timestamps'].append(now)  # keep timestamps in step with the trace
            vehicle['last_seen'] = now
            vehicle['direction'] = direction

            # Speed estimate: record when the track crosses speed_line_1, then
            # convert the delay until it crosses speed_line_2 into km/h.
            # Assumes vehicles travel from line 1 towards line 2 and that the
            # two lines are 2 m apart on the road; adjust both as needed.
            if not vehicle['crossed'] and len(vehicle['trace']) >= 2:
                prev_pt = vehicle['trace'][-2]
                curr_pt = vehicle['trace'][-1]
                if vehicle['t_line1'] is None and segments_intersect(prev_pt, curr_pt, *speed_line_1):
                    vehicle['t_line1'] = now
                if vehicle['t_line1'] is not None and segments_intersect(prev_pt, curr_pt, *speed_line_2):
                    duration = now - vehicle['t_line1']
                    if duration > 0:
                        distance_m = 2.0  # assumed distance between the two lines (metres)
                        speed = (distance_m / duration) * 3.6  # m/s -> km/h
                        vehicle['speed'] = round(speed, 2)
                        vehicle['crossed'] = True

            updated_tracking[matched] = vehicle
        else:
            updated_tracking[next_vehicle_id] = {
                'center': (cx, cy),
                'trace': deque([(cx, cy)], maxlen=32),
                'timestamps': deque([time.time()], maxlen=32),
                'last_seen': time.time(),
                'direction': None,
                'speed': None,
                'crossed': False,
                't_line1': None,  # time of the first crossing of speed_line_1
            }
            next_vehicle_id += 1

    tracking = updated_tracking

    # Annotate every tracked vehicle with its ID, direction and (if known) speed.
    for vid, vehicle in tracking.items():
        cx, cy = vehicle['center']
        if vehicle['speed'] is not None:
            cv2.putText(frame, f"ID:{vid} {vehicle['direction']} {vehicle['speed']} km/h",
                        (cx, cy), font, 0.5, (0, 255, 255), 2)
        else:
            cv2.putText(frame, f"ID:{vid} {vehicle['direction']}",
                        (cx, cy), font, 0.5, (255, 255, 0), 2)

    cv2.line(frame, speed_line_1[0], speed_line_1[1], (255, 0, 0), 2)
    cv2.line(frame, speed_line_2[0], speed_line_2[1], (0, 0, 255), 2)

    cv2.imshow('Diagonal Vehicle Tracking', frame)
    if cv2.waitKey(30) & 0xFF == 27:  # Esc to quit
        break

cap.release()
cv2.destroyAllWindows()