"""Vehicle speed and zone-occupancy monitor.

Reads frames from a video source, segments moving blobs with a MOG2
background subtractor, tracks blob centroids between frames, times each
track between two horizontal detection lines to estimate its speed, and
writes speed events and zone-occupancy changes to a MySQL database.

Press ESC in the preview window to stop.
"""

import sys
import time
from datetime import datetime

import cv2
import numpy as np  # noqa: F401  (not used below; kept from the original import list)
import pymysql

# --- Configuration -----------------------------------------------------------

# RTSP or video
video_source = 'rtsp://20.20.20.33:8554/'

# Line positions & buffer (pixel rows)
speed_line_y1 = 265
speed_line_y2 = 360
buffer = 10

# Real-world distance between lines (meters)
real_world_distance_m = 0.25
min_speed_kmh = 10  # Threshold: slower vehicles are printed but not stored

# Contour area limits (pixels^2) for a blob to be treated as a vehicle
min_area = 200
max_area = 40000

# A blob is matched to an existing track when both |dx| and |dy| are below this
match_radius_px = 30

# Tracks not matched for this many frames are dropped, so the track table
# cannot grow without bound and new vehicles cannot inherit a finished ID.
max_missed_frames = 30


# --- Helpers -----------------------------------------------------------------

def find_vehicle_contours(frame, subtractor, kernel):
    """Return the foreground contours found in *frame*.

    The frame is converted to grayscale, passed through *subtractor*, cleaned
    with close/open/dilate using *kernel*, and binarised before contour
    extraction.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    fgmask = subtractor.apply(gray)
    closing = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel)
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)
    dilation = cv2.dilate(opening, kernel)
    # NOTE(review): 220 presumably discards MOG2's grey shadow pixels -- confirm.
    _, bins = cv2.threshold(dilation, 220, 255, cv2.THRESH_BINARY)
    # [-2:] keeps this working with both the 2-tuple and 3-tuple return forms.
    contours, _ = cv2.findContours(
        bins, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
    )[-2:]
    return contours


def new_track(cx, cy, frame_index):
    """Build the state record for a freshly detected vehicle."""
    return {
        'pos': (cx, cy),
        'last_cy': cy,
        't1': None,          # monotonic time of the first line crossing
        't2': None,          # monotonic time of the second line crossing
        'direction': None,   # 'down' or 'up', fixed at the first crossing
        'logged': False,
        'speed_kmh': None,   # cached once computed, reused for the overlay
        'last_seen': frame_index,
    }


def match_track(tracks, cx, cy):
    """Return the id of the nearest track within the match gate, or None."""
    best_id = None
    best_dist = None
    for vid, info in tracks.items():
        dx = abs(cx - info['pos'][0])
        dy = abs(cy - info['pos'][1])
        if dx < match_radius_px and dy < match_radius_px:
            dist = dx * dx + dy * dy
            if best_dist is None or dist < best_dist:
                best_id, best_dist = vid, dist
    return best_id


def update_line_crossings(vehicle, cy, now):
    """Record line-crossing times for *vehicle* given its new centroid row.

    The direction is decided at the first crossing; the second crossing only
    counts when it happens in that same direction, so a centroid that jitters
    back over the first line cannot complete a measurement.
    """
    last_cy = vehicle['last_cy']

    if vehicle['t1'] is None:
        if last_cy < (speed_line_y1 - buffer) <= cy:
            vehicle['t1'] = now
            vehicle['direction'] = 'down'
        elif last_cy > (speed_line_y2 + buffer) >= cy:
            vehicle['t1'] = now
            vehicle['direction'] = 'up'
    elif vehicle['t2'] is None:
        if (vehicle['direction'] == 'down'
                and last_cy < (speed_line_y2 - buffer) <= cy):
            vehicle['t2'] = now
        elif (vehicle['direction'] == 'up'
                and last_cy > (speed_line_y1 + buffer) >= cy):
            vehicle['t2'] = now

    vehicle['last_cy'] = cy


def log_speed(cursor, conn, vehicle_id, vehicle):
    """Compute the speed of a fully timed vehicle, print it, and store it.

    Speeds below ``min_speed_kmh`` are printed but not written to the
    database. The vehicle is marked as logged either way.
    """
    delta_time = vehicle['t2'] - vehicle['t1']
    if delta_time > 0:
        speed_kmh = (real_world_distance_m / delta_time) * 3.6
        vehicle['speed_kmh'] = speed_kmh
        if speed_kmh >= min_speed_kmh:
            print(f"ID {vehicle_id} SPEED: {speed_kmh:.2f} km/h DIRECTION: {vehicle['direction']}")
            cursor.execute(
                "INSERT INTO vehicle_log (vehicle_id, direction, timestamp, speed) VALUES (%s, %s, %s, %s)",
                (vehicle_id, vehicle['direction'], datetime.now(), round(speed_kmh, 2))
            )
            conn.commit()
        else:
            print(f"ID {vehicle_id} IGNORED (too slow): {speed_kmh:.2f} km/h")
    vehicle['logged'] = True


def prune_stale_tracks(tracks, frame_index):
    """Remove tracks that have not been matched for ``max_missed_frames``."""
    stale = [
        vid for vid, info in tracks.items()
        if frame_index - info['last_seen'] > max_missed_frames
    ]
    for vid in stale:
        del tracks[vid]


# --- Entry point -------------------------------------------------------------

def main():
    """Run the capture/track/log loop. Returns a process exit code."""
    # DB Connection
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='',
        database='db_traffic2'
    )
    cursor = None
    cap = None
    try:
        cursor = conn.cursor()

        cap = cv2.VideoCapture(video_source)
        ret, frame = cap.read()
        if not ret:
            print("Failed to read from source.")
            return 1
        frame_width = frame.shape[1]

        cv2.setUseOptimized(True)
        fgbg = cv2.createBackgroundSubtractorMOG2()
        # Loop-invariant, so built once rather than per frame.
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))

        vehicle_id_counter = 0
        tracker_dict = {}
        last_vehicle_ids_in_zone = set()
        frame_index = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_index += 1

            contours = find_vehicle_contours(frame, fgbg, kernel)
            cv2.drawContours(
                frame, [cv2.convexHull(c) for c in contours], -1, (0, 255, 0), 2
            )

            vehicle_ids_in_zone = set()

            for cnt in contours:
                area = cv2.contourArea(cnt)
                if not (min_area < area < max_area):
                    continue

                M = cv2.moments(cnt)
                if M['m00'] == 0:
                    continue
                cx = int(M['m10'] / M['m00'])
                cy = int(M['m01'] / M['m00'])

                x, y, w, h = cv2.boundingRect(cnt)
                cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
                cv2.drawMarker(frame, (cx, cy), (0, 0, 255), cv2.MARKER_CROSS, 10, 1)

                matched_id = match_track(tracker_dict, cx, cy)
                if matched_id is None:
                    matched_id = vehicle_id_counter
                    vehicle_id_counter += 1
                    tracker_dict[matched_id] = new_track(cx, cy, frame_index)

                vehicle = tracker_dict[matched_id]
                vehicle['pos'] = (cx, cy)
                vehicle['last_seen'] = frame_index

                # Monotonic clock: the interval is immune to wall-clock changes.
                update_line_crossings(vehicle, cy, time.monotonic())

                # Visual indicators for debug
                if vehicle['t1'] is not None:
                    cv2.circle(frame, (cx, cy), 5, (0, 255, 255), -1)
                if vehicle['t2'] is not None:
                    cv2.circle(frame, (cx, cy), 5, (255, 0, 255), -1)

                # Speed and database log (t2 is only ever set after t1)
                if vehicle['t2'] is not None and not vehicle['logged']:
                    log_speed(cursor, conn, matched_id, vehicle)

                # Display ID & speed
                cv2.putText(frame, f"ID:{matched_id}", (x, y - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 255), 1)
                if vehicle['speed_kmh'] is not None:
                    cv2.putText(frame, f"{vehicle['speed_kmh']:.1f} km/h",
                                (x, y + h + 15),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

                # Zone counting
                if speed_line_y1 <= cy <= speed_line_y2:
                    vehicle_ids_in_zone.add(matched_id)

            prune_stale_tracks(tracker_dict, frame_index)

            # Store zone count if changed
            if vehicle_ids_in_zone != last_vehicle_ids_in_zone:
                last_vehicle_ids_in_zone = vehicle_ids_in_zone
                count = len(vehicle_ids_in_zone)
                cursor.execute(
                    "INSERT INTO total_vehicle (count, timestamp) VALUES (%s, %s)",
                    (count, datetime.now())
                )
                conn.commit()
                print(f"🚗 Vehicles in zone: {count}")

            # Draw detection lines
            cv2.line(frame, (0, speed_line_y1), (frame_width, speed_line_y1), (0, 255, 255), 2)
            cv2.line(frame, (0, speed_line_y2), (frame_width, speed_line_y2), (255, 0, 255), 2)

            cv2.imshow("Vehicle Detection", frame)
            if cv2.waitKey(1) & 0xFF == 27:  # ESC
                break

        return 0
    finally:
        # Cleanup runs on normal exit, early return, and on any exception.
        if cap is not None:
            cap.release()
        if cursor is not None:
            cursor.close()
        conn.close()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    sys.exit(main())