120 lines
4.0 KiB
Python
120 lines
4.0 KiB
Python
import cv2
|
|
import numpy as np
|
|
import math
|
|
import time
|
|
from collections import deque
|
|
|
|
# Video source: a file path, or an integer camera index for a live feed.
VIDEO_SOURCE = "pagi.mp4"

# Detection / tracking tuning.
MIN_CONTOUR_AREA = 500     # foreground blobs with a smaller contour area are ignored
MAX_MATCH_DISTANCE = 50    # px; a detection closer than this to a track continues it
TRACE_LENGTH = 32          # number of recent positions kept per vehicle
LINE_GAP_METERS = 2.0      # assumed real-world distance between the two speed lines

# Diagonal speed lines as ((x1, y1), (x2, y2)) in pixel coordinates.
# Adjust the end points to match your scene; both lines are drawn on the
# output so they can be checked visually.
speed_line_1 = ((100, 500), (800, 250))
speed_line_2 = ((110, 390), (800, 180))


def classify_direction(dx, dy):
    """Map a pixel displacement to a coarse direction label.

    Image coordinates are used, so positive ``dy`` points down the frame.
    Returns 'kanan' (right), 'bawah' (down), 'kiri' (left) or 'atas' (up),
    or ``None`` when there is no displacement at all (``atan2(0, 0)`` would
    otherwise be reported as 'kanan').
    """
    if dx == 0 and dy == 0:
        return None
    angle = math.degrees(math.atan2(dy, dx))
    if -45 <= angle <= 45:
        return 'kanan'
    if 45 < angle <= 135:
        return 'bawah'
    if angle > 135 or angle < -135:
        return 'kiri'
    return 'atas'


def segments_intersect(p1, p2, q1, q2):
    """Return True when segment p1-p2 and segment q1-q2 share a point.

    Uses the orientation (signed-area) test, including the collinear and
    end-point-touching cases. Points are ``(x, y)`` pairs.
    """
    def orient(a, b, c):
        # Sign of the cross product (b - a) x (c - a): -1, 0 or +1.
        cross = (b[0] - a[0]) * (c[1] - a[1]) - (b[1] - a[1]) * (c[0] - a[0])
        return (cross > 0) - (cross < 0)

    def within_box(a, b, c):
        # For c already known to be collinear with a-b: is it between them?
        return (min(a[0], b[0]) <= c[0] <= max(a[0], b[0])
                and min(a[1], b[1]) <= c[1] <= max(a[1], b[1]))

    o1 = orient(p1, p2, q1)
    o2 = orient(p1, p2, q2)
    o3 = orient(q1, q2, p1)
    o4 = orient(q1, q2, p2)

    if o1 != o2 and o3 != o4:
        return True
    return ((o1 == 0 and within_box(p1, p2, q1))
            or (o2 == 0 and within_box(p1, p2, q2))
            or (o3 == 0 and within_box(q1, q2, p1))
            or (o4 == 0 and within_box(q1, q2, p2)))


def speed_kmh(distance_m, duration_s):
    """Convert a distance/time pair to km/h rounded to two decimals.

    Returns ``None`` when ``duration_s`` is not positive, because no speed
    can be derived (and dividing would raise ``ZeroDivisionError``).
    """
    if duration_s <= 0:
        return None
    return round((distance_m / duration_s) * 3.6, 2)


def new_vehicle(center, timestamp):
    """Build the state dict for a vehicle first seen at *center*."""
    return {
        'center': center,
        'trace': deque([center], maxlen=TRACE_LENGTH),
        # Kept parallel to 'trace': one timestamp per stored position.
        'timestamps': deque([timestamp], maxlen=TRACE_LENGTH),
        'last_seen': timestamp,
        'direction': None,
        'speed': None,
        'crossed': False,
        # Time at which speed_line_1 / speed_line_2 was first crossed.
        'line_times': [None, None],
    }


def record_crossing(vehicle, prev_center, center, timestamp):
    """Update *vehicle* with any speed-line crossing made by its latest step.

    The step is the segment ``prev_center -> center``. The first time each
    speed line is crossed its timestamp is stored; once both lines have been
    crossed (in either order) the speed is computed from the time between
    the two crossings and the vehicle is marked as 'crossed' so it is not
    measured again. If both lines are crossed within the same step the
    elapsed time is zero and the speed is left as ``None``.
    """
    if vehicle['crossed']:
        return
    line_times = vehicle['line_times']
    for idx, (start, end) in enumerate((speed_line_1, speed_line_2)):
        if line_times[idx] is None and segments_intersect(prev_center, center, start, end):
            line_times[idx] = timestamp
    first, second = line_times
    if first is not None and second is not None:
        vehicle['speed'] = speed_kmh(LINE_GAP_METERS, abs(second - first))
        vehicle['crossed'] = True


def match_vehicle(center, tracking, claimed=(), max_distance=MAX_MATCH_DISTANCE):
    """Return the id of the nearest unclaimed track to *center*, or None.

    Only tracks strictly closer than *max_distance* pixels are considered.
    Ids contained in *claimed* are skipped so that two detections in the same
    frame cannot both continue the same track.
    """
    best_id = None
    best_dist = max_distance
    for vid, vehicle in tracking.items():
        if vid in claimed:
            continue
        px, py = vehicle['center']
        dist = math.hypot(center[0] - px, center[1] - py)
        if dist < best_dist:
            best_id, best_dist = vid, dist
    return best_id


def update_tracks(tracking, centers, timestamp, next_vehicle_id):
    """Associate this frame's detections with existing tracks.

    Args:
        tracking: dict of vehicle id -> state dict from the previous frame.
        centers: list of ``(cx, cy)`` detection centres for this frame.
        timestamp: time of this frame in seconds.
        next_vehicle_id: id to give to the next newly created track.

    Returns:
        ``(updated_tracking, next_vehicle_id)``. Tracks that received no
        detection in this frame are not carried over.
    """
    updated = {}
    for center in centers:
        matched = match_vehicle(center, tracking, claimed=updated)
        if matched is None:
            updated[next_vehicle_id] = new_vehicle(center, timestamp)
            next_vehicle_id += 1
            continue

        vehicle = tracking[matched]
        prev_center = vehicle['center']
        direction = classify_direction(center[0] - prev_center[0],
                                       center[1] - prev_center[1])
        if direction is not None:
            # Keep the previous label when the centre did not move.
            vehicle['direction'] = direction

        vehicle['center'] = center
        vehicle['trace'].append(center)
        vehicle['timestamps'].append(timestamp)
        vehicle['last_seen'] = timestamp

        # Check if the latest step crossed the diagonal speed lines.
        record_crossing(vehicle, prev_center, center, timestamp)
        updated[matched] = vehicle
    return updated, next_vehicle_id


def main():
    """Run detection, tracking and speed estimation until the video ends or Esc."""
    cap = cv2.VideoCapture(VIDEO_SOURCE)
    if not cap.isOpened():
        raise SystemExit(f"Could not open video source: {VIDEO_SOURCE!r}")

    # For a video file, time must come from the video itself (frame index /
    # FPS); wall-clock time would measure how fast frames are processed and
    # displayed rather than how fast the vehicle moved. Live sources, or
    # files that report no FPS, fall back to wall-clock time.
    fps = cap.get(cv2.CAP_PROP_FPS)
    use_video_clock = isinstance(VIDEO_SOURCE, str) and fps > 0

    fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
    font = cv2.FONT_HERSHEY_SIMPLEX

    tracking = {}
    next_vehicle_id = 0
    frame_index = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            now = frame_index / fps if use_video_clock else time.time()
            frame_index += 1

            # Foreground mask, with a morphological open to remove specks.
            fgmask = fgbg.apply(frame)
            fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel)

            contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

            centers = []
            for cnt in contours:
                if cv2.contourArea(cnt) > MIN_CONTOUR_AREA:
                    x, y, w, h = cv2.boundingRect(cnt)
                    centers.append((x + w // 2, y + h // 2))
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

            tracking, next_vehicle_id = update_tracks(tracking, centers, now, next_vehicle_id)

            for vid, vehicle in tracking.items():
                cx, cy = vehicle['center']
                # 'is not None' so that a measured speed of 0.0 is still shown.
                if vehicle['speed'] is not None:
                    cv2.putText(frame, f"ID:{vid} {vehicle['direction']} {vehicle['speed']} km/h", (cx, cy), font, 0.5, (0, 255, 255), 2)
                else:
                    cv2.putText(frame, f"ID:{vid} {vehicle['direction']}", (cx, cy), font, 0.5, (255, 255, 0), 2)

            cv2.line(frame, speed_line_1[0], speed_line_1[1], (255, 0, 0), 2)
            cv2.line(frame, speed_line_2[0], speed_line_2[1], (0, 0, 255), 2)

            cv2.imshow('Diagonal Vehicle Tracking', frame)
            if cv2.waitKey(30) & 0xFF == 27:  # Esc quits
                break
    finally:
        # Release the capture and close windows even if a frame raised.
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()