Files
traffic-monitoring-new/dapp.py
2025-07-23 18:04:28 +08:00

120 lines
4.0 KiB
Python

import cv2
import numpy as np
import math
import time
from collections import deque
# --- Video source ------------------------------------------------------------
VIDEO_SOURCE = "pagi.mp4"  # Replace with your video file or camera index
cap = cv2.VideoCapture(VIDEO_SOURCE)
if not cap.isOpened():
    # Without this check the script would just end silently on the first
    # failed read, giving no hint that the source could not be opened.
    raise SystemExit(f"Could not open video source: {VIDEO_SOURCE!r}")

# --- Foreground extraction ---------------------------------------------------
# MOG2 background model; the 3x3 elliptical kernel is used by the frame loop
# for a morphological opening of the foreground mask.
fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=100)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

# --- Tracking state ----------------------------------------------------------
tracking = {}        # vehicle id -> per-vehicle state dict
counter = 0          # not referenced by the visible script; kept for compatibility
next_vehicle_id = 0  # id handed to the next newly detected vehicle

# --- Speed lines -------------------------------------------------------------
# Two diagonal reference lines, each given as ((x1, y1), (x2, y2)) in pixel
# coordinates. Adjust the end points to match your scene; both lines are drawn
# on every frame, so their placement can be checked visually.
speed_line_1 = ((100, 500), (800, 250))
speed_line_2 = ((110, 390), (800, 180))

font = cv2.FONT_HERSHEY_SIMPLEX
MIN_CONTOUR_AREA = 500   # contours with an area at or below this are ignored
MATCH_RADIUS_PX = 50     # a detection must be closer than this to join a track
LINE_GAP_M = 2.0         # assume 2 meters between the two speed lines


def _heading_label(dx, dy):
    """Return the direction label for an image-space displacement (dx, dy).

    The labels are the runtime strings used by the overlay: 'kanan' (right),
    'bawah' (down), 'kiri' (left) and 'atas' (up). Image y grows downwards,
    so a positive dy means the point moved down the frame.
    """
    angle = math.degrees(math.atan2(dy, dx))
    if -45 <= angle <= 45:
        return 'kanan'
    if 45 < angle <= 135:
        return 'bawah'
    if angle > 135 or angle < -135:
        return 'kiri'
    return 'atas'


def _side_of_line(line, point):
    """Return the signed cross product telling which side of *line* *point* lies on."""
    (x1, y1), (x2, y2) = line
    px, py = point
    return (x2 - x1) * (py - y1) - (y2 - y1) * (px - x1)


def _segment_crosses_line(line, start, end):
    """Return True when the move from *start* to *end* intersects segment *line*.

    A value of exactly zero is treated as the non-positive side, so a point
    that lands precisely on the line is counted as a crossing exactly once.
    A zero-length move never counts as a crossing.
    """
    move = (start, end)
    ends_split = (_side_of_line(line, start) > 0) != (_side_of_line(line, end) > 0)
    line_split = (_side_of_line(move, line[0]) > 0) != (_side_of_line(move, line[1]) > 0)
    return ends_split and line_split


def _detect_centers(frame, subtractor, morph_kernel):
    """Return the centres of foreground blobs in *frame*.

    Applies the background subtractor, opens the mask with *morph_kernel*,
    and keeps external contours larger than MIN_CONTOUR_AREA. As a side
    effect a green bounding box is drawn on *frame* for every kept contour.
    """
    mask = subtractor.apply(frame)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, morph_kernel)
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    found = []
    for cnt in contours:
        if cv2.contourArea(cnt) > MIN_CONTOUR_AREA:
            x, y, w, h = cv2.boundingRect(cnt)
            found.append((x + w // 2, y + h // 2))
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
    return found


def _nearest_track(tracks, claimed, center):
    """Return the id of the closest track within MATCH_RADIUS_PX, or None.

    Tracks whose id is already in *claimed* are skipped so that two
    detections in the same frame cannot update the same vehicle.
    """
    best_id, best_dist = None, MATCH_RADIUS_PX
    for vid, vehicle in tracks.items():
        if vid in claimed:
            continue
        px, py = vehicle['center']
        dist = math.hypot(center[0] - px, center[1] - py)
        if dist < best_dist:
            best_id, best_dist = vid, dist
    return best_id


def _update_speed(vehicle, lines, start, end, now):
    """Record speed-line crossings for *vehicle* and set its speed once both are hit.

    *lines* is the pair of speed lines; *start* -> *end* is the vehicle's
    latest move and *now* its timestamp. The first crossing time of each line
    is stored in vehicle['cross_times']; when both are known the speed in
    km/h is derived from LINE_GAP_M and the elapsed time. The lines may be
    crossed in either order.
    """
    if vehicle['crossed']:
        return
    times = vehicle.setdefault('cross_times', [None, None])
    for idx, line in enumerate(lines):
        if times[idx] is None and _segment_crosses_line(line, start, end):
            times[idx] = now
    if None in times:
        return
    vehicle['crossed'] = True
    duration = abs(times[1] - times[0])
    if duration > 0:  # both lines hit in the same frame: speed is undefined
        vehicle['speed'] = round((LINE_GAP_M / duration) * 3.6, 2)


try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # NOTE: timing uses the wall clock at processing time, so measured
        # speeds depend on how fast frames are read and displayed.
        now = time.time()
        centers = _detect_centers(frame, fgbg, kernel)

        updated_tracking = {}
        for cx, cy in centers:
            matched = _nearest_track(tracking, updated_tracking, (cx, cy))
            if matched is None:
                # trace and timestamps share maxlen so they stay index-aligned.
                updated_tracking[next_vehicle_id] = {
                    'center': (cx, cy),
                    'trace': deque([(cx, cy)], maxlen=32),
                    'timestamps': deque([now], maxlen=32),
                    'last_seen': now,
                    'direction': None,
                    'speed': None,
                    'crossed': False,
                    'cross_times': [None, None],
                }
                next_vehicle_id += 1
                continue

            vehicle = tracking[matched]
            prev_center = vehicle['center']
            dx = cx - prev_center[0]
            dy = cy - prev_center[1]
            if dx or dy:
                # Keep the previous heading while the blob is stationary.
                vehicle['direction'] = _heading_label(dx, dy)
            vehicle['center'] = (cx, cy)
            vehicle['trace'].append((cx, cy))
            vehicle['timestamps'].append(now)
            vehicle['last_seen'] = now
            _update_speed(vehicle, (speed_line_1, speed_line_2),
                          prev_center, (cx, cy), now)
            updated_tracking[matched] = vehicle

        # Tracks that were not matched in this frame are dropped here.
        tracking = updated_tracking

        for vid, vehicle in tracking.items():
            cx, cy = vehicle['center']
            if vehicle['speed'] is not None:
                cv2.putText(frame, f"ID:{vid} {vehicle['direction']} {vehicle['speed']} km/h", (cx, cy), font, 0.5, (0, 255, 255), 2)
            else:
                cv2.putText(frame, f"ID:{vid} {vehicle['direction']}", (cx, cy), font, 0.5, (255, 255, 0), 2)

        cv2.line(frame, speed_line_1[0], speed_line_1[1], (255, 0, 0), 2)
        cv2.line(frame, speed_line_2[0], speed_line_2[1], (0, 0, 255), 2)
        cv2.imshow('Diagonal Vehicle Tracking', frame)
        if cv2.waitKey(30) & 0xFF == 27:  # Esc quits
            break
finally:
    # Always free the capture and close windows, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()