# Source listing metadata: 175 lines, 5.7 KiB, Python
import time
from datetime import datetime

import cv2
import numpy as np
import pymysql

# Vehicle detection script: background-subtracts each frame, tracks contour
# centroids, timestamps their crossings of two horizontal lines to derive a
# speed, and writes speeds and the in-zone vehicle count to MySQL.

# Line positions & buffer (pixels, y grows downward in the image)
speed_line_y1 = 265
speed_line_y2 = 360
buffer = 10

# Real-world distance between lines (meters)
real_world_distance_m = 0.25
min_speed_kmh = 10  # Threshold: slower measurements are printed but not stored

# Contour area limits (pixels^2); contours outside this range are ignored
min_area = 200
max_area = 40000

# A detection is assigned to an existing track when both |dx| and |dy| are
# below this many pixels.
match_radius_px = 30

# Tracks not matched for this many frames are dropped, so the table does not
# grow without bound and finished tracks cannot capture later vehicles.
# NOTE(review): 30 is a starting value - tune it to the stream's frame rate.
max_missed_frames = 30

vehicle_id_counter = 0
tracker_dict = {}
last_vehicle_ids_in_zone = set()


def _match_track(cx, cy):
    """Return the id of the first track within match_radius_px of (cx, cy), or None."""
    for vid, info in tracker_dict.items():
        old_cx, old_cy = info['pos']
        if abs(cx - old_cx) < match_radius_px and abs(cy - old_cy) < match_radius_px:
            return vid
    return None


def _update_crossings(track, cy, now):
    """Stamp t1/t2 on ``track`` when its centroid crosses the speed lines.

    ``t1`` is set on the entry crossing and fixes the travel direction;
    ``t2`` is set only by the far-line crossing in that same direction, so a
    jittering centroid cannot close the measurement from the wrong side.
    At most one stamp is set per call. ``now`` is a ``time.monotonic()`` value.
    """
    last_cy = track['last_cy']
    if track['t1'] is None:
        if last_cy < (speed_line_y1 - buffer) <= cy:
            # Entered from above, moving down the image
            track['t1'] = now
            track['direction'] = 'down'
        elif last_cy > (speed_line_y2 + buffer) >= cy:
            # Entered from below, moving up the image
            track['t1'] = now
            track['direction'] = 'up'
    elif track['t2'] is None:
        if track['direction'] == 'down':
            if last_cy < (speed_line_y2 - buffer) <= cy:
                track['t2'] = now
        elif last_cy > (speed_line_y1 + buffer) >= cy:
            track['t2'] = now
    track['last_cy'] = cy


def _speed_kmh(track):
    """Return the track's speed in km/h, or None if it is not measurable yet."""
    if track['t1'] is None or track['t2'] is None:
        return None
    elapsed = track['t2'] - track['t1']
    if elapsed <= 0:
        return None
    return (real_world_distance_m / elapsed) * 3.6


# DB Connection
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='',
    database='db_traffic2'
)
cursor = conn.cursor()

# RTSP or video
cap = cv2.VideoCapture('rtsp://20.20.20.33:8554/')

try:
    # Read one frame up front to verify the source and learn the frame size.
    ret, frame = cap.read()
    if not ret:
        print("Failed to read from source.")
        # Non-zero status; the finally block below still releases everything.
        raise SystemExit(1)

    height2, width2 = frame.shape[:2]
    cv2.setUseOptimized(True)

    fgbg = cv2.createBackgroundSubtractorMOG2()
    # Loop-invariant structuring element for the morphology steps
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

    frame_index = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_index += 1

        # Foreground mask -> close/open/dilate -> binary image
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        fgmask = fgbg.apply(gray)
        closing = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel)
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
        dilation = cv2.dilate(opening, kernel)
        _, bins = cv2.threshold(dilation, 220, 255, cv2.THRESH_BINARY)

        # [-2:] keeps (contours, hierarchy) whether findContours returns 2 or 3 values
        contours, _ = cv2.findContours(bins, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2:]
        cv2.drawContours(frame, [cv2.convexHull(c) for c in contours], -1, (0, 255, 0), 2)

        vehicle_ids_in_zone = set()

        for cnt in contours:
            area = cv2.contourArea(cnt)
            if not (min_area < area < max_area):
                continue

            M = cv2.moments(cnt)
            if M['m00'] == 0:
                continue
            cx = int(M['m10'] / M['m00'])
            cy = int(M['m01'] / M['m00'])

            x, y, w, h = cv2.boundingRect(cnt)
            cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
            cv2.drawMarker(frame, (cx, cy), (0, 0, 255), cv2.MARKER_CROSS, 10, 1)

            # Associate the detection with an existing track or start a new one
            matched_id = _match_track(cx, cy)
            if matched_id is None:
                matched_id = vehicle_id_counter
                vehicle_id_counter += 1
                tracker_dict[matched_id] = {
                    'pos': (cx, cy),
                    'last_cy': cy,
                    't1': None,
                    't2': None,
                    'direction': None,
                    'logged': False,
                    'last_seen': frame_index,
                }

            vehicle = tracker_dict[matched_id]
            vehicle['pos'] = (cx, cy)
            vehicle['last_seen'] = frame_index

            # Monotonic clock: the interval is immune to wall-clock adjustments
            _update_crossings(vehicle, cy, time.monotonic())

            # Visual indicators for debug
            if vehicle['t1'] is not None:
                cv2.circle(frame, (cx, cy), 5, (0, 255, 255), -1)
            if vehicle['t2'] is not None:
                cv2.circle(frame, (cx, cy), 5, (255, 0, 255), -1)

            speed_kmh = _speed_kmh(vehicle)

            # Speed and database log (once per track)
            if vehicle['t1'] is not None and vehicle['t2'] is not None and not vehicle['logged']:
                if speed_kmh is not None:
                    if speed_kmh >= min_speed_kmh:
                        print(f"ID {matched_id} SPEED: {speed_kmh:.2f} km/h DIRECTION: {vehicle['direction']}")
                        cursor.execute(
                            "INSERT INTO vehicle_log (vehicle_id, direction, timestamp, speed) VALUES (%s, %s, %s, %s)",
                            (matched_id, vehicle['direction'], datetime.now(), round(speed_kmh, 2))
                        )
                        conn.commit()
                    else:
                        print(f"ID {matched_id} IGNORED (too slow): {speed_kmh:.2f} km/h")
                # Marked even when no speed could be computed, so it is not retried
                vehicle['logged'] = True

            # Display ID & speed
            cv2.putText(frame, f"ID:{matched_id}", (x, y - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1)
            if speed_kmh is not None:
                cv2.putText(frame, f"{speed_kmh:.1f} km/h", (x, y + h + 15),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

            # Zone counting
            if speed_line_y1 <= cy <= speed_line_y2:
                vehicle_ids_in_zone.add(matched_id)

        # Drop tracks that have not been matched recently (collect first so the
        # dict is not mutated while being iterated).
        stale_ids = [vid for vid, info in tracker_dict.items()
                     if frame_index - info['last_seen'] > max_missed_frames]
        for vid in stale_ids:
            del tracker_dict[vid]

        # Store zone count if changed
        if vehicle_ids_in_zone != last_vehicle_ids_in_zone:
            last_vehicle_ids_in_zone = vehicle_ids_in_zone.copy()
            count = len(vehicle_ids_in_zone)
            cursor.execute("INSERT INTO total_vehicle (count, timestamp) VALUES (%s, %s)", (count, datetime.now()))
            conn.commit()
            print(f"🚗 Vehicles in zone: {count}")

        # Draw detection lines
        cv2.line(frame, (0, speed_line_y1), (width2, speed_line_y1), (0, 255, 255), 2)
        cv2.line(frame, (0, speed_line_y2), (width2, speed_line_y2), (255, 0, 255), 2)

        cv2.imshow("Vehicle Detection", frame)
        if cv2.waitKey(1) & 0xFF == 27:  # Esc quits
            break
finally:
    # Cleanup runs on normal exit, source failure, exceptions and Ctrl+C alike
    cap.release()
    cv2.destroyAllWindows()
    cursor.close()
    conn.close()