Pada kode sebelumnya terdapat kelemahan, yaitu credential MQTT dan data sensitif lainnya akan tampak di frontend website. Pada contoh script berikut, credential-nya tampak telanjang (plain text) sehingga kurang aman.
Cara lainnya adalah menjadikan kode service backend sebagai full proxy, sehingga semua lalu lintas data harus dikontrol oleh service, sedangkan website hanya menerima hasil jadi.
Di sini komunikasi yang semula dari website langsung ke broker MQTT HiveMQ harus melalui service terlebih dahulu. Tidak ada komunikasi direct/langsung antara frontend dengan server broker maupun lainnya.
Di samping bertindak sebagai proxy, kode service juga akan membungkus (wrapping) credential yang ada.
Agar komunikasi data antara backend (kode service) dan frontend (halaman website) terjadi secara realtime, maka protokol transport yang digunakan adalah WebSocket.
Sekarang buatlah kode service baru dengan nama “ServiceAES128FullBackend.py”
import asyncio
import base64
import json
import logging
import os
import signal
import ssl
import sys
import threading
import time
from datetime import datetime

import cv2
import paho.mqtt.client as mqtt
import websockets
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from ultralytics import YOLO
# Logging configuration: every record goes to person_detection.log and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('person_detection.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# MQTT configuration - HiveMQ Cloud.
# Connection secrets can be overridden through environment variables so they
# do not have to live in the source file; the literals are the fallbacks.
MQTT_CONFIG = {
    'hostname': os.environ.get(
        'MQTT_HOSTNAME', 'c0099a6e70884169bfc6b2f482c29e2b.s1.eu.hivemq.cloud'),
    'port': 8883,
    'username': os.environ.get('MQTT_USERNAME', 'pbl2026'),
    'password': os.environ.get('MQTT_PASSWORD', 'Pbl123456789#'),
    'topic_person': 'Person',
    'topic_frame': 'VideoFrame',
}

# AES-128 configuration (CBC mode). Key and IV are overridable via AES_KEY / AES_IV.
AES_CONFIG = {
    'key': os.environ.get('AES_KEY', '16bytekey1234567').encode('utf-8'),
    'iv': os.environ.get('AES_IV', '16byteiv12345678').encode('utf-8'),
    'mode': AES.MODE_CBC,
}
# Fail fast on a mis-sized override instead of failing on every encryption call.
for _aes_field in ('key', 'iv'):
    if len(AES_CONFIG[_aes_field]) != 16:
        raise ValueError(f"AES {_aes_field} must be exactly 16 bytes")

# Camera configuration. The RTSP URL is overridable via CAMERA_RTSP_URL.
CAMERA_CONFIG = {
    'rtsp_url': os.environ.get('CAMERA_RTSP_URL', 'rtsp://192.168.131.121:1945/'),
    'reconnect_delay': 5,           # seconds to wait between reconnect attempts
    'max_reconnect_attempts': 10,
}

# Detection configuration.
DETECTION_CONFIG = {
    'confidence_threshold': 0.5,    # confidence threshold passed to the model
    'cooldown_seconds': 2,          # minimum gap between person notifications
    'send_frame_interval': 0.1,     # minimum gap between published frames
}

# WebSocket server configuration.
WEBSOCKET_CONFIG = {
    'host': '0.0.0.0',
    'port': 8765,
    'max_clients': 10,
}
class PersonDetectionService:
    """Person detector that also proxies results to MQTT and WebSocket clients.

    Frames are read from the RTSP camera, analysed with YOLO, published
    AES-encrypted to the MQTT broker and pushed as plain JSON to every
    connected WebSocket client, so the frontend never needs the broker
    credentials or the AES key.
    """

    def __init__(self):
        """Initialise runtime state; no I/O happens here."""
        # Resources created later by load_model / setup_mqtt / connect_camera.
        self.model = None
        self.mqtt_client = None
        self.cap = None
        # Main-loop flag, cleared by stop_service().
        self.running = True
        # Timestamps (time.time()) used for the notification/frame throttles.
        self.last_notification_time = 0
        self.last_frame_send_time = 0
        self.person_detected = False
        # WebSocket side: connected clients, server handle, loop and its thread.
        self.websocket_clients = set()
        self.websocket_server = None
        self.loop = None
        self.websocket_thread = None

    def encrypt_aes128(self, plaintext):
        """Encrypt ``plaintext`` with AES-128-CBC and return it Base64-encoded.

        Args:
            plaintext: ``str`` (encoded as UTF-8 first) or bytes-like data.

        Returns:
            Base64 text of the ciphertext, or ``None`` if encryption failed.
        """
        # NOTE(review): the IV is the fixed value from AES_CONFIG, so equal
        # plaintexts give equal ciphertexts. Changing that needs a matching
        # change on the decrypting side, so it is left untouched here.
        try:
            cipher = AES.new(AES_CONFIG['key'], AES_CONFIG['mode'], AES_CONFIG['iv'])
            raw = plaintext.encode('utf-8') if isinstance(plaintext, str) else plaintext
            ciphertext = cipher.encrypt(pad(raw, AES.block_size))
            return base64.b64encode(ciphertext).decode('utf-8')
        except Exception as e:
            logger.error(f"Encryption error: {e}")
            return None

    def load_model(self):
        """Load the YOLO weights ``yolov8n.pt``; return True on success."""
        try:
            logger.info("Loading YOLO model...")
            self.model = YOLO('yolov8n.pt')
        except Exception as e:
            logger.error(f"Failed to load YOLO model: {e}")
            return False
        logger.info("YOLO model loaded successfully")
        return True

    def setup_mqtt(self):
        """Create the MQTT client, connect over TLS and start its network loop.

        Returns:
            True when the connection attempt was started, False on any error.
        """
        try:
            client_id = f"person_detection_{datetime.now().strftime('%Y%m%d%H%M%S')}"
            if hasattr(mqtt, 'CallbackAPIVersion'):
                # paho-mqtt >= 2.0 needs an explicit callback API version;
                # VERSION1 matches the callback signatures used in this class.
                self.mqtt_client = mqtt.Client(
                    mqtt.CallbackAPIVersion.VERSION1,
                    client_id=client_id,
                    clean_session=True,
                )
            else:
                self.mqtt_client = mqtt.Client(client_id=client_id, clean_session=True)
            self.mqtt_client.username_pw_set(
                MQTT_CONFIG['username'],
                MQTT_CONFIG['password'],
            )
            self.mqtt_client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
            self.mqtt_client.on_connect = self.on_connect
            self.mqtt_client.on_disconnect = self.on_disconnect
            self.mqtt_client.on_publish = self.on_publish
            logger.info(f"Connecting to MQTT at {MQTT_CONFIG['hostname']}:{MQTT_CONFIG['port']}")
            self.mqtt_client.connect(MQTT_CONFIG['hostname'], MQTT_CONFIG['port'], 60)
            self.mqtt_client.loop_start()
            return True
        except Exception as e:
            logger.error(f"Failed to setup MQTT: {e}")
            return False

    def on_connect(self, client, userdata, flags, rc):
        """MQTT connect callback: log success (rc == 0) or the failure code."""
        if rc != 0:
            logger.error(f"Failed to connect to HiveMQ broker with code: {rc}")
            return
        logger.info("✅ Connected to HiveMQ broker successfully")

    def on_disconnect(self, client, userdata, rc):
        """MQTT disconnect callback: log the code; this method only logs."""
        logger.warning(f"Disconnected from HiveMQ broker with code: {rc}")
        if self.running:
            logger.info("Attempting to reconnect...")

    def on_publish(self, client, userdata, mid):
        """MQTT publish callback: log the message id at debug level."""
        logger.debug(f"Message published with ID: {mid}")

    def connect_camera(self):
        """Open the RTSP stream from CAMERA_CONFIG; return True if it opened."""
        try:
            logger.info(f"Connecting to camera: {CAMERA_CONFIG['rtsp_url']}")
            self.cap = cv2.VideoCapture(CAMERA_CONFIG['rtsp_url'])
            # Request a buffer size of 1.
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            if self.cap.isOpened():
                logger.info("Camera connected successfully")
                return True
            logger.error("Cannot open camera")
            return False
        except Exception as e:
            logger.error(f"Failed to connect camera: {e}")
            return False

    async def broadcast_to_websocket(self, data):
        """Send ``data`` as JSON to every connected WebSocket client.

        Clients whose ``send`` fails are dropped from ``websocket_clients``.
        """
        # Iterate a snapshot: handlers add and discard clients on this same
        # loop while we await, which would break iteration over the live set.
        recipients = list(self.websocket_clients)
        if not recipients:
            return
        message = json.dumps(data)
        stale = set()
        for client in recipients:
            try:
                await client.send(message)
            except Exception:
                stale.add(client)
        if stale:
            self.websocket_clients.difference_update(stale)
            logger.info(f"Removed {len(stale)} disconnected clients")

    def send_mqtt_notification(self, has_person, num_person, frame=None):
        """Publish a throttled person notification and, periodically, a frame.

        Args:
            has_person: whether the current frame contains a person.
            num_person: number of persons detected.
            frame: optional annotated frame forwarded to send_frame_via_mqtt.
        """
        current_time = time.time()
        # Keep the flag in step with the latest frame so it does not stay True.
        self.person_detected = has_person
        cooldown_over = (
            current_time - self.last_notification_time
        ) > DETECTION_CONFIG['cooldown_seconds']
        if has_person and cooldown_over:
            # NOTE(review): camera_url is also sent in plain form to WebSocket
            # clients; confirm the frontend is allowed to see it.
            message = {
                'event': 'person_detected',
                'message': 'Kamera mendeteksi manusia',
                'timestamp': datetime.now().isoformat(),
                'num_person': num_person,
                'camera_url': CAMERA_CONFIG['rtsp_url']
            }
            try:
                encrypted_message = self.encrypt_aes128(json.dumps(message))
                if encrypted_message:
                    # Encrypted copy for the MQTT broker.
                    result = self.mqtt_client.publish(
                        MQTT_CONFIG['topic_person'],
                        encrypted_message,
                        qos=1
                    )
                    # Plain copy for the WebSocket clients.
                    if self.loop and self.websocket_clients:
                        asyncio.run_coroutine_threadsafe(
                            self.broadcast_to_websocket(message),
                            self.loop
                        )
                    # Start the cooldown once the event was dispatched, so a
                    # failing broker does not re-broadcast on every frame.
                    self.last_notification_time = current_time
                    if result.rc == mqtt.MQTT_ERR_SUCCESS:
                        logger.info(f"🔴 Person detected: {num_person} person(s)")
                    else:
                        logger.error(f"Failed to publish message: {result.rc}")
                else:
                    logger.error("Encryption failed, message not sent")
            except Exception as e:
                logger.error(f"Error publishing MQTT message: {e}")
        frame_due = (
            current_time - self.last_frame_send_time
        ) > DETECTION_CONFIG['send_frame_interval']
        if frame is not None and frame_due:
            self.send_frame_via_mqtt(frame, has_person, num_person)
            self.last_frame_send_time = current_time

    def send_frame_via_mqtt(self, frame, has_person, num_person):
        """Publish a downscaled JPEG of ``frame`` to MQTT and WebSocket clients.

        The frame is resized to 640 px wide, JPEG-encoded at quality 70 and
        Base64-encoded. MQTT receives the AES-encrypted JSON, WebSocket
        clients receive the plain dict. Errors are logged, never raised.
        """
        try:
            height, width = frame.shape[:2]
            new_width = 640
            new_height = int(height * (new_width / width))
            frame_resized = cv2.resize(frame, (new_width, new_height))
            ok, buffer = cv2.imencode('.jpg', frame_resized, [cv2.IMWRITE_JPEG_QUALITY, 70])
            if not ok:
                # Do not publish an unusable buffer; logged by the handler below.
                raise ValueError("JPEG encoding failed")
            message = {
                'event': 'video_frame',
                'frame': base64.b64encode(buffer).decode('utf-8'),
                'has_person': has_person,
                'num_person': num_person,
                'timestamp': datetime.now().isoformat()
            }
            encrypted_message = self.encrypt_aes128(json.dumps(message))
            if not encrypted_message:
                logger.error("Encryption failed, frame not sent")
                return
            self.mqtt_client.publish(
                MQTT_CONFIG['topic_frame'],
                encrypted_message,
                qos=0
            )
            if self.loop and self.websocket_clients:
                asyncio.run_coroutine_threadsafe(
                    self.broadcast_to_websocket(message),
                    self.loop
                )
            logger.debug(f"📹 Frame sent (Person: {has_person}, Count: {num_person})")
        except Exception as e:
            logger.error(f"Failed to send frame: {e}")

    def detect_person(self, frame):
        """Run YOLO on ``frame`` and draw each detected person on a copy.

        Returns:
            ``(has_person, count, annotated_frame, detections)``; each
            detection is ``{'bbox': [x1, y1, x2, y2], 'confidence': float}``.
            On any error: ``(False, 0, frame, [])`` with the original frame.
        """
        try:
            font = cv2.FONT_HERSHEY_SIMPLEX
            green = (0, 255, 0)
            results = self.model(frame, conf=DETECTION_CONFIG['confidence_threshold'])
            person_detections = []
            annotated_frame = frame.copy()
            for result in results:
                if result.boxes is None:
                    continue
                for box in result.boxes:
                    # Only class id 0 is treated as a person.
                    if int(box.cls[0]) != 0:
                        continue
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    confidence = float(box.conf[0])
                    person_detections.append({
                        'bbox': [x1, y1, x2, y2],
                        'confidence': confidence
                    })
                    cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), green, 3)
                    # Filled label background above the box, then the text.
                    label = f"Person: {confidence:.2f}"
                    (label_w, _), _ = cv2.getTextSize(label, font, 0.6, 2)
                    cv2.rectangle(annotated_frame, (x1, y1 - 25), (x1 + label_w, y1), green, -1)
                    cv2.putText(annotated_frame, label, (x1, y1 - 8), font, 0.6, (0, 0, 0), 2)
            count = len(person_detections)
            has_person = count > 0
            count_color = green if has_person else (0, 0, 255)
            cv2.putText(annotated_frame, f"Person Count: {count}", (10, 30),
                        font, 1, count_color, 2)
            cv2.putText(annotated_frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), (10, 70),
                        font, 0.7, (255, 255, 255), 2)
            return has_person, count, annotated_frame, person_detections
        except Exception as e:
            logger.error(f"Error in detection: {e}")
            return False, 0, frame, []

    async def websocket_handler(self, websocket):
        """Serve one WebSocket client until it disconnects.

        Rejects the connection when WEBSOCKET_CONFIG['max_clients'] is
        reached, otherwise registers the client, sends a welcome message and
        answers ``{"command": "ping"}`` with ``{"event": "pong"}``.
        """
        client_address = websocket.remote_address
        if len(self.websocket_clients) >= WEBSOCKET_CONFIG['max_clients']:
            logger.warning(f"Rejecting WebSocket client {client_address}: client limit reached")
            # 1013 = "try again later".
            await websocket.close(code=1013, reason='Server is at capacity')
            return
        logger.info(f"New WebSocket client connected from {client_address}")
        self.websocket_clients.add(websocket)
        try:
            await websocket.send(json.dumps({
                'event': 'connection',
                'status': 'connected',
                'message': 'Connected to Person Detection Service Proxy'
            }))
            async for message in websocket:
                try:
                    data = json.loads(message)
                    # Valid JSON that is not an object carries no command.
                    if isinstance(data, dict) and data.get('command') == 'ping':
                        await websocket.send(json.dumps({'event': 'pong'}))
                        logger.debug(f"Ping from {client_address}")
                except json.JSONDecodeError:
                    logger.warning(f"Invalid JSON from client: {message}")
                except Exception as e:
                    logger.error(f"Error handling message: {e}")
        except websockets.exceptions.ConnectionClosed as e:
            logger.info(f"WebSocket client disconnected: {e}")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
        finally:
            self.websocket_clients.discard(websocket)
            logger.info(f"Client {client_address} removed. Total clients: {len(self.websocket_clients)}")

    async def start_websocket_server(self):
        """Start the WebSocket server and keep it running until cancelled."""
        # Remember the loop so other threads can schedule broadcasts on it.
        self.loop = asyncio.get_running_loop()
        async with websockets.serve(
            self.websocket_handler,
            WEBSOCKET_CONFIG['host'],
            WEBSOCKET_CONFIG['port'],
            ping_interval=20,
            ping_timeout=60,
        ) as server:
            self.websocket_server = server
            logger.info(f"✅ WebSocket server started on ws://{WEBSOCKET_CONFIG['host']}:{WEBSOCKET_CONFIG['port']}")
            logger.info("Waiting for client connections...")
            await asyncio.Future()  # never completes: run forever

    def run_websocket_server(self):
        """Thread target: run the WebSocket server in its own event loop."""
        try:
            asyncio.run(self.start_websocket_server())
        except Exception as e:
            logger.error(f"WebSocket server error: {e}")

    def process_stream(self):
        """Main loop: read frames, detect persons, publish and display.

        Runs until ``self.running`` is cleared, the user presses 'q' in the
        preview window, or the camera cannot be reconnected.
        """
        if not self.connect_camera():
            logger.error("Cannot start stream processing")
            return
        frame_count = 0
        fps = 0
        fps_start_time = time.time()
        logger.info("Starting person detection stream processing...")
        logger.info(f"WebSocket proxy server running on port {WEBSOCKET_CONFIG['port']}")
        logger.info(
            f"Connect your web browser to ws://localhost:{WEBSOCKET_CONFIG['port']} or ws://YOUR_IP:{WEBSOCKET_CONFIG['port']}")
        while self.running:
            try:
                ret, frame = self.cap.read()
                if not ret:
                    logger.warning("Failed to read frame, attempting to reconnect...")
                    if not self.reconnect_camera():
                        # Give up instead of cycling through reconnects forever.
                        logger.error("Camera unavailable, stopping stream processing")
                        break
                    continue
                frame_count += 1
                if frame_count % 30 == 0:
                    # FPS averaged over the last 30 frames.
                    now = time.time()
                    elapsed = now - fps_start_time
                    if elapsed > 0:
                        fps = 30 / elapsed
                    fps_start_time = now
                has_person, num_person, annotated_frame, detections = self.detect_person(frame)
                self.send_mqtt_notification(has_person, num_person, annotated_frame)
                # These overlays are drawn after sending, so they appear only
                # in the local preview window.
                if fps > 0:
                    cv2.putText(annotated_frame, f"FPS: {fps:.1f}", (10, 110),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                cv2.putText(annotated_frame, f"WS Clients: {len(self.websocket_clients)}", (10, 140),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
                cv2.imshow('Person Detection - RTSP Stream', annotated_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                time.sleep(0.01)
            except Exception as e:
                logger.error(f"Error in process stream: {e}")
                time.sleep(1)
        cv2.destroyAllWindows()

    def reconnect_camera(self):
        """Retry connect_camera up to the configured number of attempts.

        Returns:
            True once reconnected; False when attempts are exhausted or the
            service is shutting down.
        """
        if self.cap:
            self.cap.release()
        max_attempts = CAMERA_CONFIG['max_reconnect_attempts']
        for attempt in range(1, max_attempts + 1):
            if not self.running:
                return False
            logger.info(f"Reconnection attempt {attempt}/{max_attempts}")
            if self.connect_camera():
                logger.info("Reconnected successfully")
                return True
            # No point in waiting after the final failed attempt.
            if attempt < max_attempts:
                time.sleep(CAMERA_CONFIG['reconnect_delay'])
        logger.error("Max reconnection attempts reached")
        return False

    def stop_service(self):
        """Release the camera, disconnect MQTT and close WebSocket clients."""
        logger.info("Stopping person detection service...")
        self.running = False
        if self.cap:
            self.cap.release()
        if self.mqtt_client:
            # Disconnect while the network loop is still running, then stop it.
            self.mqtt_client.disconnect()
            self.mqtt_client.loop_stop()
        # Snapshot: the WebSocket thread may still modify the set.
        clients = list(self.websocket_clients)
        if clients and self.loop:
            logger.info(f"Closing {len(clients)} WebSocket connections...")
            for client in clients:
                try:
                    asyncio.run_coroutine_threadsafe(client.close(), self.loop)
                except Exception as e:
                    # Best effort during shutdown, e.g. the loop is already closed.
                    logger.debug(f"Could not close WebSocket client: {e}")
        cv2.destroyAllWindows()
        logger.info("Service stopped")

    def run(self):
        """Entry point: load the model, connect MQTT, start WebSocket, process frames."""
        if not self.load_model():
            logger.error("Cannot start service without model")
            return
        if not self.setup_mqtt():
            logger.error("Cannot start service without MQTT")
            return
        # The WebSocket server runs in a daemon thread with its own event loop.
        self.websocket_thread = threading.Thread(target=self.run_websocket_server, daemon=True)
        self.websocket_thread.start()
        # Give the WebSocket server time to start.
        time.sleep(2)
        try:
            self.process_stream()
        except KeyboardInterrupt:
            logger.info("Received interrupt signal")
        finally:
            self.stop_service()
def signal_handler(sig, frame):
    """Signal callback: log the shutdown request and exit with status 0."""
    logger.info("Received shutdown signal")
    raise SystemExit(0)
if __name__ == "__main__":
    # Route both SIGINT and SIGTERM through the same shutdown handler.
    for _signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_signum, signal_handler)
    service = PersonDetectionService()
    service.run()
Penjelasan kode:
Kode ini adalah sistem deteksi person yang menjalankan tiga fungsi sekaligus:
Sebagai detektor - Mendeteksi person dari kamera CCTV menggunakan YOLOv8
Sebagai proxy server - Menyediakan WebSocket server untuk frontend
Sebagai publisher MQTT - Mengirim data terenkripsi ke HiveMQ Cloud
Keunggulan utama: Semua kredensial sensitif (MQTT username/password, AES key/IV) tersimpan aman di sisi service, tidak terekspos ke frontend.
import cv2 # OpenCV: capture video, processing frame, bounding box
import paho.mqtt.client as mqtt # MQTT: komunikasi dengan broker HiveMQ
import json # JSON: encoding/decoding data untuk MQTT payload
import time # Time: fungsi sleep, pengukuran interval, cooldown
import logging # Logging: mencatat aktivitas untuk debugging
from ultralytics import YOLO # YOLOv8: deteksi objek person dalam frame
from datetime import datetime # Datetime: timestamp untuk setiap event
import signal # Signal: menangani Ctrl+C shutdown graceful
import sys # Sys: exit system saat program berhenti
import ssl # SSL: koneksi aman TLS ke MQTT broker
import base64 # Base64: encode frame gambar untuk transmisi
from Crypto.Cipher import AES # AES: enkripsi data sebelum dikirim ke MQTT
from Crypto.Util.Padding import pad # Padding: menggenapkan data ke kelipatan 16 bytes
import asyncio # Asyncio: operasi async untuk WebSocket server
import websockets # WebSockets: komunikasi real-time dengan frontend
import threading # Threading: menjalankan WebSocket server di thread terpisah
Penjelasan: Kode ini mengimpor 16 library yang diperlukan. Yang paling penting adalah:
cv2 + YOLO: Untuk capture kamera dan deteksi person
paho.mqtt + Crypto: Untuk komunikasi MQTT terenkripsi
websockets + asyncio: Untuk proxy server ke frontend
# Log at INFO level to person_detection.log and mirror every record on the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler('person_detection.log'),  # persist to file
        logging.StreamHandler(),                      # show on the console
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',  # time - level - message
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
Fungsi: Mencatat semua aktivitas sistem ke file person_detection.log dan juga menampilkannya di terminal. Ini penting untuk debugging dan monitoring.
# MQTT connection settings. Host, username and password can be overridden
# through environment variables so they need not be edited into the source.
MQTT_CONFIG = {
    'hostname': os.environ.get(
        'MQTT_HOSTNAME', 'c0099a6e70884169bfc6b2f482c29e2b.s1.eu.hivemq.cloud'),  # your broker
    'port': 8883,                                                # MQTT over TLS
    'username': os.environ.get('MQTT_USERNAME', 'pbl2026'),      # your MQTT username
    'password': os.environ.get('MQTT_PASSWORD', 'Pbl123456789#'),  # your MQTT password
    'topic_person': 'Person',                                    # person-detected notifications
    'topic_frame': 'VideoFrame',                                 # video frames
}
Fungsi: Menyimpan kredensial koneksi ke HiveMQ Cloud. WAJIB disesuaikan dengan akun HiveMQ Anda.
# AES-128 settings. Key and IV can be overridden through AES_KEY / AES_IV;
# the literals are example values and should be replaced.
AES_CONFIG = {
    'key': os.environ.get('AES_KEY', '16bytekey1234567').encode('utf-8'),  # 16-byte key
    'iv': os.environ.get('AES_IV', '16byteiv12345678').encode('utf-8'),    # 16-byte IV
    'mode': AES.MODE_CBC,                                                  # Cipher Block Chaining
}
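Fungsi: Konfigurasi enkripsi AES128. Key dan IV harus sama persis dengan yang digunakan oleh subscriber MQTT yang akan mendekripsi data. Pada arsitektur full proxy ini, website tidak lagi memerlukan key dan IV karena menerima data plain melalui WebSocket dari service.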
# Camera settings. The RTSP URL can be overridden through CAMERA_RTSP_URL.
CAMERA_CONFIG = {
    'rtsp_url': os.environ.get('CAMERA_RTSP_URL', 'rtsp://192.168.131.121:1945/'),  # your camera URL
    'reconnect_delay': 5,           # seconds to wait between reconnect attempts
    'max_reconnect_attempts': 10,   # give up after this many attempts
}
Fungsi: Konfigurasi koneksi ke kamera CCTV via protokol RTSP.
# Detection tuning parameters.
DETECTION_CONFIG = dict(
    confidence_threshold=0.5,  # minimum confidence (50%) to count as a person
    cooldown_seconds=2,        # minimum gap of 2 seconds between notifications
    send_frame_interval=0.1,   # minimum gap between sent frames (about 10 per second)
)
Fungsi: Mengatur parameter deteksi. Confidence 0.5 berarti model yakin minimal 50% untuk melaporkan deteksi.
# WebSocket server settings.
WEBSOCKET_CONFIG = dict(
    host='0.0.0.0',   # listen on all network interfaces
    port=8765,        # WebSocket port
    max_clients=10,   # intended maximum number of connected clients
)
Fungsi: Konfigurasi WebSocket server yang akan melayani frontend. 0.0.0.0 berarti dapat diakses dari komputer lain dalam jaringan.
def __init__(self):
    """Initialise runtime state; no I/O happens here."""
    # Detection pipeline resources, created later.
    self.model = None            # YOLO model
    self.cap = None              # video capture object for the camera
    self.mqtt_client = None      # MQTT client connection
    # Main-loop control and throttling state.
    self.running = True                # main loop flag
    self.person_detected = False       # whether a person is currently detected
    self.last_notification_time = 0    # time of the last notification
    self.last_frame_send_time = 0      # time of the last frame sent
    # WebSocket server state.
    self.websocket_clients = set()     # connected WebSocket clients
    self.websocket_server = None       # WebSocket server instance
    self.websocket_thread = None       # thread running the WebSocket server
    self.loop = None                   # asyncio event loop of that thread
Fungsi: Menginisialisasi semua variabel yang akan digunakan. websocket_clients = set() memungkinkan multiple frontend terhubung simultan.
def encrypt_aes128(self, plaintext):
    """Encrypt ``plaintext`` with AES-128-CBC and return it Base64-encoded.

    ``str`` input is encoded as UTF-8 first; other input is used as given.
    Returns ``None`` (after logging) when encryption fails.
    """
    try:
        # Cipher object built from the configured key, mode and IV.
        cipher = AES.new(AES_CONFIG['key'], AES_CONFIG['mode'], AES_CONFIG['iv'])
        raw = plaintext.encode('utf-8') if isinstance(plaintext, str) else plaintext
        # Pad to a multiple of the AES block size, then encrypt.
        ciphertext = cipher.encrypt(pad(raw, AES.block_size))
        # Base64 keeps the binary ciphertext safe to transmit as text.
        return base64.b64encode(ciphertext).decode('utf-8')
    except Exception as e:
        logger.error(f"Encryption error: {e}")
        return None
Penjelasan Detail:
AES.new(): Membuat cipher object dengan key, mode CBC, dan IV
Padding: Karena AES memerlukan input kelipatan 16 bytes, data ditambah padding PKCS7
Base64 encoding: Hasil enkripsi (binary) diencode ke Base64 agar aman dikirim dalam JSON
Mengapa perlu enkripsi? Melindungi data deteksi person dari sniffing saat transit melalui MQTT broker.
def load_model(self):
    """Load the YOLO weights ``yolov8n.pt``; return True on success."""
    try:
        logger.info("Loading YOLO model...")
        self.model = YOLO('yolov8n.pt')  # 'n' = nano variant
    except Exception as e:
        logger.error(f"Failed to load YOLO model: {e}")
        return False
    logger.info("YOLO model loaded successfully")
    return True
Penjelasan: Memuat model YOLOv8 nano (yolov8n.pt). Model ini akan:
Otomatis didownload saat pertama kali dijalankan (ukuran ~6 MB)
Mendeteksi 80 kelas objek termasuk person (class ID = 0)
Nano version dipilih karena keseimbangan antara kecepatan dan akurasi
def setup_mqtt(self):
    """Create the MQTT client, connect over TLS and start its network loop.

    Returns:
        True when the connection attempt was started, False on any error.
    """
    try:
        # Client id made unique with a timestamp.
        client_id = f"person_detection_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        if hasattr(mqtt, 'CallbackAPIVersion'):
            # paho-mqtt >= 2.0 needs an explicit callback API version;
            # VERSION1 matches the (client, userdata, flags, rc) callbacks.
            self.mqtt_client = mqtt.Client(
                mqtt.CallbackAPIVersion.VERSION1,
                client_id=client_id,
                clean_session=True,
            )
        else:
            self.mqtt_client = mqtt.Client(client_id=client_id, clean_session=True)
        # Authentication.
        self.mqtt_client.username_pw_set(
            MQTT_CONFIG['username'],
            MQTT_CONFIG['password'],
        )
        # TLS with mandatory server certificate verification.
        self.mqtt_client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
        # Register callbacks.
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_disconnect = self.on_disconnect
        self.mqtt_client.on_publish = self.on_publish
        # Connect and run the network loop in a background thread.
        logger.info(f"Connecting to MQTT at {MQTT_CONFIG['hostname']}:{MQTT_CONFIG['port']}")
        self.mqtt_client.connect(MQTT_CONFIG['hostname'], MQTT_CONFIG['port'], 60)
        self.mqtt_client.loop_start()
        return True
    except Exception as e:
        logger.error(f"Failed to setup MQTT: {e}")
        return False
Penjelasan:
Client ID unik: Menggunakan timestamp agar setiap instance memiliki ID berbeda
TLS: ssl.CERT_REQUIRED memastikan koneksi terenkripsi dan server diverifikasi
loop_start(): Menjalankan MQTT network loop di background thread agar tidak memblokir main thread
def on_connect(self, client, userdata, flags, rc):
    """MQTT connect callback: log success (rc == 0) or the failure code."""
    if rc != 0:
        logger.error(f"Failed to connect to HiveMQ broker with code: {rc}")
        return
    logger.info("✅ Connected to HiveMQ broker successfully")
def on_disconnect(self, client, userdata, rc):
    """MQTT disconnect callback: log the code; this method only logs."""
    logger.warning(f"Disconnected from HiveMQ broker with code: {rc}")
    if not self.running:
        return
    logger.info("Attempting to reconnect...")
def on_publish(self, client, userdata, mid):
    """MQTT publish callback: log the message id at debug level."""
    message_id = mid
    logger.debug(f"Message published with ID: {message_id}")
Fungsi: Callback yang dipanggil otomatis oleh library MQTT saat event terjadi.
def connect_camera(self):
    """Open the RTSP stream from CAMERA_CONFIG; return True if it opened."""
    try:
        logger.info(f"Connecting to camera: {CAMERA_CONFIG['rtsp_url']}")
        self.cap = cv2.VideoCapture(CAMERA_CONFIG['rtsp_url'])
        # Request a buffer size of 1 to reduce delay.
        self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        if self.cap.isOpened():
            logger.info("Camera connected successfully")
            return True
        logger.error("Cannot open camera")
        return False
    except Exception as e:
        logger.error(f"Failed to connect camera: {e}")
        return False
Penjelasan:
cv2.VideoCapture(): Membuka stream RTSP dari kamera
CAP_PROP_BUFFERSIZE = 1: Mengurangi buffer untuk meminimalkan delay (live streaming)
isOpened(): Memverifikasi koneksi berhasil
async def broadcast_to_websocket(self, data):
    """Send ``data`` as JSON to every connected WebSocket client.

    Clients whose ``send`` fails are dropped from ``websocket_clients``.
    """
    # Iterate a snapshot: handlers add and discard clients on this same loop
    # while we await, which would break iteration over the live set.
    recipients = list(self.websocket_clients)
    if not recipients:
        return
    message = json.dumps(data)
    stale = set()
    for client in recipients:
        try:
            await client.send(message)
        except Exception:
            stale.add(client)
    if stale:
        self.websocket_clients.difference_update(stale)
        logger.info(f"Removed {len(stale)} disconnected clients")
Penjelasan: Fungsi async ini mengirim data ke semua frontend yang terhubung. Jika ada client yang disconnect, otomatis dihapus dari set.
Keunggulan: Multiple browser/monitor dapat terhubung simultan ke service yang sama!
def send_mqtt_notification(self, has_person, num_person, frame=None):
    """Publish a throttled person notification and, periodically, a frame.

    Args:
        has_person: whether the current frame contains a person.
        num_person: number of persons detected.
        frame: optional annotated frame forwarded to send_frame_via_mqtt.
    """
    current_time = time.time()
    # Keep the flag in step with the latest frame so it does not stay True.
    self.person_detected = has_person
    cooldown_over = (
        current_time - self.last_notification_time
    ) > DETECTION_CONFIG['cooldown_seconds']
    # Person notification, limited by the cooldown.
    if has_person and cooldown_over:
        # NOTE(review): camera_url is also sent in plain form to WebSocket
        # clients; confirm the frontend is allowed to see it.
        message = {
            'event': 'person_detected',
            'message': 'Kamera mendeteksi manusia',
            'timestamp': datetime.now().isoformat(),
            'num_person': num_person,
            'camera_url': CAMERA_CONFIG['rtsp_url']
        }
        try:
            encrypted_message = self.encrypt_aes128(json.dumps(message))
            if encrypted_message:
                # 1. Encrypted copy for the MQTT broker.
                result = self.mqtt_client.publish(
                    MQTT_CONFIG['topic_person'],
                    encrypted_message,
                    qos=1
                )
                # 2. Plain copy for the WebSocket clients.
                if self.loop and self.websocket_clients:
                    asyncio.run_coroutine_threadsafe(
                        self.broadcast_to_websocket(message),
                        self.loop
                    )
                # Start the cooldown once the event was dispatched, so a
                # failing broker does not re-broadcast on every frame.
                self.last_notification_time = current_time
                if result.rc == mqtt.MQTT_ERR_SUCCESS:
                    logger.info(f"🔴 Person detected: {num_person} person(s)")
                else:
                    logger.error(f"Failed to publish message: {result.rc}")
            else:
                logger.error("Encryption failed, message not sent")
        except Exception as e:
            logger.error(f"Error publishing MQTT message: {e}")
    # Video frame, sent periodically.
    frame_due = (
        current_time - self.last_frame_send_time
    ) > DETECTION_CONFIG['send_frame_interval']
    if frame is not None and frame_due:
        self.send_frame_via_mqtt(frame, has_person, num_person)
        self.last_frame_send_time = current_time
Konsep Penting:
Cooldown 2 detik: Mencegah notifikasi spam
Dua jalur pengiriman:
MQTT (terenkripsi): Untuk persistence dan multiple subscribers
WebSocket (plain): Untuk frontend real-time
QoS 1: Menjamin notifikasi person sampai ke broker
def send_frame_via_mqtt(self, frame, has_person, num_person):
    """Publish a downscaled JPEG of ``frame`` to MQTT and WebSocket clients.

    MQTT receives the AES-encrypted JSON; WebSocket clients receive the
    plain dict. Errors are logged, never raised.
    """
    try:
        # Step 1: resize to 640 px wide, keeping the aspect ratio.
        height, width = frame.shape[:2]
        new_width = 640
        new_height = int(height * (new_width / width))
        frame_resized = cv2.resize(frame, (new_width, new_height))
        # Step 2: JPEG-encode at quality 70.
        ok, buffer = cv2.imencode('.jpg', frame_resized, [cv2.IMWRITE_JPEG_QUALITY, 70])
        if not ok:
            # Do not publish an unusable buffer; logged by the handler below.
            raise ValueError("JPEG encoding failed")
        # Steps 3-4: Base64-encode and build the message.
        message = {
            'event': 'video_frame',
            'frame': base64.b64encode(buffer).decode('utf-8'),
            'has_person': has_person,
            'num_person': num_person,
            'timestamp': datetime.now().isoformat()
        }
        # Step 5: encrypt the JSON for MQTT.
        encrypted_message = self.encrypt_aes128(json.dumps(message))
        if not encrypted_message:
            logger.error("Encryption failed, frame not sent")
            return
        self.mqtt_client.publish(
            MQTT_CONFIG['topic_frame'],
            encrypted_message,
            qos=0  # best effort for real-time frames
        )
        # Plain copy for the WebSocket clients.
        if self.loop and self.websocket_clients:
            asyncio.run_coroutine_threadsafe(
                self.broadcast_to_websocket(message),
                self.loop
            )
        logger.debug(f"📹 Frame sent (Person: {has_person}, Count: {num_person})")
    except Exception as e:
        logger.error(f"Failed to send frame: {e}")
Optimasi Frame:
Resize ke 640px: Mengurangi ukuran data signifikan
JPEG quality 70%: Keseimbangan antara kualitas dan ukuran
QoS 0: Frame tidak perlu dijamin sampai (lebih mementingkan kecepatan)
def detect_person(self, frame):
    """Run YOLO on ``frame`` and draw each detected person on a copy.

    Returns:
        ``(has_person, count, annotated_frame, detections)``; each detection
        is ``{'bbox': [x1, y1, x2, y2], 'confidence': float}``.
        On any error: ``(False, 0, frame, [])`` with the original frame.
    """
    try:
        results = self.model(frame, conf=DETECTION_CONFIG['confidence_threshold'])
        person_detections = []
        annotated_frame = frame.copy()  # draw on a copy
        font = cv2.FONT_HERSHEY_SIMPLEX
        green = (0, 255, 0)
        for result in results:
            if result.boxes is None:
                continue
            for box in result.boxes:
                # Only class id 0 is treated as a person.
                if int(box.cls[0]) != 0:
                    continue
                x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                confidence = float(box.conf[0])
                person_detections.append({
                    'bbox': [x1, y1, x2, y2],
                    'confidence': confidence
                })
                # Green bounding box.
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), green, 3)
                # Label on a filled background above the box.
                label = f"Person: {confidence:.2f}"
                (label_w, _), _ = cv2.getTextSize(label, font, 0.6, 2)
                cv2.rectangle(annotated_frame, (x1, y1 - 25), (x1 + label_w, y1), green, -1)
                cv2.putText(annotated_frame, label, (x1, y1 - 8), font, 0.6, (0, 0, 0), 2)
        count = len(person_detections)
        has_person = count > 0
        # Info overlays: person count and current time.
        count_color = green if has_person else (0, 0, 255)
        cv2.putText(annotated_frame, f"Person Count: {count}", (10, 30),
                    font, 1, count_color, 2)
        cv2.putText(annotated_frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), (10, 70),
                    font, 0.7, (255, 255, 255), 2)
        return has_person, count, annotated_frame, person_detections
    except Exception as e:
        logger.error(f"Error in detection: {e}")
        return False, 0, frame, []
Visualisasi:
Bounding box hijau = Person terdeteksi
Label confidence = Seberapa yakin model (misal 0.85 = 85%)
Person count = Jumlah person dalam frame
Timestamp = Waktu deteksi
Class ID 0 adalah person dalam dataset COCO (Common Objects in Context).
async def websocket_handler(self, websocket):
    """Serve one WebSocket client until it disconnects.

    Rejects the connection when WEBSOCKET_CONFIG['max_clients'] is reached,
    otherwise registers the client, sends a welcome message and answers
    ``{"command": "ping"}`` with ``{"event": "pong"}``.
    """
    client_address = websocket.remote_address
    if len(self.websocket_clients) >= WEBSOCKET_CONFIG['max_clients']:
        logger.warning(f"Rejecting WebSocket client {client_address}: client limit reached")
        # 1013 = "try again later".
        await websocket.close(code=1013, reason='Server is at capacity')
        return
    logger.info(f"New WebSocket client connected from {client_address}")
    self.websocket_clients.add(websocket)
    try:
        # Welcome message.
        await websocket.send(json.dumps({
            'event': 'connection',
            'status': 'connected',
            'message': 'Connected to Person Detection Service Proxy'
        }))
        # Receive messages from the client until it disconnects.
        async for message in websocket:
            try:
                data = json.loads(message)
                # Valid JSON that is not an object carries no command.
                if isinstance(data, dict) and data.get('command') == 'ping':
                    await websocket.send(json.dumps({'event': 'pong'}))
                    logger.debug(f"Ping from {client_address}")
            except json.JSONDecodeError:
                logger.warning(f"Invalid JSON from client: {message}")
            except Exception as e:
                logger.error(f"Error handling message: {e}")
    except websockets.exceptions.ConnectionClosed as e:
        logger.info(f"WebSocket client disconnected: {e}")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        self.websocket_clients.discard(websocket)
        logger.info(f"Client {client_address} removed. Total clients: {len(self.websocket_clients)}")
Fungsi: Melayani koneksi WebSocket dari frontend:
Welcome message: Memberi tahu client bahwa koneksi berhasil
Ping/pong: Keep-alive mechanism untuk menjaga koneksi tetap hidup
Auto cleanup: Menghapus client yang disconnect dari daftar
async def start_websocket_server(self):
    """Start the WebSocket server and keep it running until cancelled."""
    # Remember the loop so other threads can schedule broadcasts on it.
    self.loop = asyncio.get_running_loop()
    async with websockets.serve(
        self.websocket_handler,     # handler coroutine
        WEBSOCKET_CONFIG['host'],   # bind address
        WEBSOCKET_CONFIG['port'],   # port
        ping_interval=20,           # send a ping every 20 seconds
        ping_timeout=60,            # time out after 60 seconds without a reply
    ) as server:
        # Keep the server handle on the instance.
        self.websocket_server = server
        logger.info(f"✅ WebSocket server started on ws://{WEBSOCKET_CONFIG['host']}:{WEBSOCKET_CONFIG['port']}")
        logger.info("Waiting for client connections...")
        await asyncio.Future()  # never completes: run forever
def run_websocket_server(self):
    """Thread target: run the WebSocket server in its own event loop."""
    server_coroutine = self.start_websocket_server()
    try:
        asyncio.run(server_coroutine)
    except Exception as e:
        logger.error(f"WebSocket server error: {e}")
Penjelasan:
· ping_interval=20: Server mengirim ping setiap 20 detik untuk mengecek client masih hidup
· asyncio.Future(): Membuat server berjalan forever
· Thread terpisah: WebSocket server berjalan di thread sendiri agar tidak memblokir main loop
def process_stream(self):
    """Run the capture -> detect -> notify -> display loop.

    Returns immediately (after logging an error) when the initial camera
    connection fails. Otherwise loops while ``self.running`` is true:
    reads a frame, triggers a reconnect on read failure, updates the FPS
    estimate every 30 frames, runs ``detect_person``, passes the result to
    ``send_mqtt_notification``, draws the FPS and WebSocket-client overlay,
    and shows the frame in an OpenCV window. Pressing ``q`` in that window
    ends the loop. Any ``Exception`` inside one iteration is logged and
    followed by a one-second pause before the next iteration.
    """
    if not self.connect_camera():
        logger.error("Cannot start stream processing")
        return

    fps_window = 30  # number of frames per FPS measurement
    frame_count = 0
    fps = 0
    # Monotonic clock: wall-clock adjustments must not produce a zero or
    # negative interval in the FPS calculation.
    fps_start_time = time.monotonic()

    logger.info("Starting person detection stream processing...")
    logger.info(f"WebSocket proxy server running on port {WEBSOCKET_CONFIG['port']}")

    while self.running:
        try:
            # Read the next frame from the camera
            ret, frame = self.cap.read()
            if not ret:
                logger.warning("Failed to read frame, attempting to reconnect...")
                # NOTE(review): the return value is ignored, so a camera
                # that stays down is retried indefinitely -- confirm that
                # this is the intended policy.
                self.reconnect_camera()
                continue

            # Update the FPS estimate once per measurement window
            frame_count += 1
            if frame_count % fps_window == 0:
                fps_end_time = time.monotonic()
                elapsed = fps_end_time - fps_start_time
                # Keep the previous estimate rather than dividing by zero
                if elapsed > 0:
                    fps = fps_window / elapsed
                fps_start_time = fps_end_time

            # Detect persons; the raw detections are not used here
            has_person, num_person, annotated_frame, _detections = self.detect_person(frame)

            # Publish the result
            self.send_mqtt_notification(has_person, num_person, annotated_frame)

            # Overlay is drawn after the notification call
            if fps > 0:
                cv2.putText(annotated_frame, f"FPS: {fps:.1f}", (10, 110),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            cv2.putText(annotated_frame, f"WS Clients: {len(self.websocket_clients)}", (10, 140),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

            # Show the annotated frame; 'q' quits
            cv2.imshow('Person Detection - RTSP Stream', annotated_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            time.sleep(0.01)  # short pause so the loop does not pin the CPU
        except Exception as e:
            logger.error(f"Error in process stream: {e}")
            time.sleep(1)

    cv2.destroyAllWindows()
Alur Utama:
1. Baca frame dari kamera
2. Jika gagal, coba reconnect
3. Hitung FPS untuk monitoring performa
4. Deteksi person dengan YOLO
5. Kirim notifikasi ke MQTT dan WebSocket
6. Tampilkan hasil di window OpenCV
7. User tekan 'q' untuk berhenti
def reconnect_camera(self):
    """Release the current capture and retry ``connect_camera``.

    Makes up to ``CAMERA_CONFIG['max_reconnect_attempts']`` attempts and
    sleeps ``CAMERA_CONFIG['reconnect_delay']`` seconds after every failed
    one.

    Returns:
        True as soon as an attempt succeeds, False once all attempts
        have failed.
    """
    stale_capture = self.cap
    if stale_capture:
        stale_capture.release()

    max_attempts = CAMERA_CONFIG['max_reconnect_attempts']
    for attempt_number in range(1, max_attempts + 1):
        logger.info(f"Reconnection attempt {attempt_number}/{max_attempts}")
        if self.connect_camera():
            logger.info("Reconnected successfully")
            return True
        # Pause after each failure, including the last one
        time.sleep(CAMERA_CONFIG['reconnect_delay'])

    logger.error("Max reconnection attempts reached")
    return False
def stop_service(self):
    """Stop the service and release camera, MQTT, WebSocket and GUI resources.

    Clears ``self.running``, releases the capture, stops the MQTT network
    loop and disconnects, schedules ``close()`` for every connected
    WebSocket client on the server's event loop (waiting at most two
    seconds for those closes to finish), and destroys the OpenCV windows.
    Failures while scheduling a close are logged and do not abort the rest
    of the shutdown.
    """
    from concurrent.futures import wait as wait_for_futures

    logger.info("Stopping person detection service...")
    self.running = False

    # Release camera
    if self.cap:
        self.cap.release()

    # Disconnect MQTT
    if self.mqtt_client:
        self.mqtt_client.loop_stop()
        self.mqtt_client.disconnect()

    # Close all WebSocket connections.
    # Work on a snapshot: the WebSocket handler discards clients from the
    # live set as they close, and iterating a set while it shrinks raises
    # RuntimeError.
    clients = list(self.websocket_clients)
    if clients:
        logger.info(f"Closing {len(clients)} WebSocket connections...")
        # self.loop is only assigned once the WebSocket server has started
        loop = getattr(self, 'loop', None)
        if loop is None or not loop.is_running():
            logger.warning("WebSocket event loop is not running; skipping client close")
        else:
            pending = []
            for client in clients:
                try:
                    pending.append(
                        asyncio.run_coroutine_threadsafe(client.close(), loop)
                    )
                except Exception as e:
                    logger.warning(f"Failed to schedule WebSocket close: {e}")
            if pending:
                # Bounded wait so the close handshakes get a chance to run
                # before the process exits, without hanging shutdown.
                wait_for_futures(pending, timeout=2)

    cv2.destroyAllWindows()
    logger.info("Service stopped")
Graceful shutdown: Ketika program dimatikan (Ctrl+C), semua resource dibersihkan dengan rapi.
def run(self):
    """Start the service and block until the stream loop ends.

    Loads the model and sets up MQTT, aborting with an error log if either
    step reports failure. Then launches the WebSocket server in a daemon
    thread, waits two seconds for it to come up, and runs
    ``process_stream``. ``stop_service`` is always called afterwards, also
    when the loop is interrupted.
    """
    # Each start-up step paired with the message logged when it fails
    startup_steps = (
        (self.load_model, "Cannot start service without model"),
        (self.setup_mqtt, "Cannot start service without MQTT"),
    )
    for step, failure_message in startup_steps:
        if not step():
            logger.error(failure_message)
            return

    # Daemon thread: it must not keep the process alive on exit
    ws_thread = threading.Thread(target=self.run_websocket_server, daemon=True)
    self.websocket_thread = ws_thread
    ws_thread.start()

    # Fixed grace period for the WebSocket server to start
    time.sleep(2)

    try:
        self.process_stream()
    except KeyboardInterrupt:
        logger.info("Received interrupt signal")
    finally:
        self.stop_service()
def signal_handler(sig, frame):
    """Log the shutdown request and exit the interpreter with status 0.

    Registered below for SIGINT and SIGTERM. Exiting is done by raising
    ``SystemExit``, which lets ``finally`` blocks on the interrupted call
    stack run.

    Args:
        sig: Signal number delivered by the ``signal`` module (unused).
        frame: Interrupted stack frame (unused).
    """
    logger.info("Received shutdown signal")
    raise SystemExit(0)
if __name__ == "__main__":
    # Route Ctrl+C (SIGINT) and termination requests (SIGTERM) to the
    # same shutdown handler.
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, signal_handler)

    # Build the service and hand control to its main entry point
    service = PersonDetectionService()
    service.run()
Urutan Eksekusi Program:
1. Setup signal handler untuk shutdown yang graceful
2. Load YOLO model
3. Setup MQTT connection
4. Start WebSocket server di thread terpisah
5. Jalankan main loop process_stream()
6. Jika user tekan Ctrl+C → stop service dengan rapi
1. Program dimulai
↓
2. Load YOLO model
↓
3. Setup MQTT + WebSocket server
↓
4. Connect ke kamera RTSP
↓
5. ┌─────────────────────────────────────┐
│ LOOP UTAMA (setiap frame): │
│ - Capture frame dari kamera │
│ - Deteksi person dengan YOLO │
│ - Gambar bounding box │
│ - Jika ada person: │
│ * Kirim notifikasi ke MQTT (enc) │
│ * Broadcast ke WebSocket (plain) │
│ - Kirim frame periodik (10 fps) │
│ - Tampilkan di window OpenCV │
└─────────────────────────────────────┘
↓
6. User tekan 'q' atau Ctrl+C
↓
7. Cleanup: release camera, disconnect MQTT, close WebSockets
Keunggulan pendekatan full backend proxy ini:
· Keamanan maksimal - Kredensial MQTT dan AES tidak terekspos ke frontend
· Multiple client support - Banyak browser/monitor bisa terhubung secara simultan
· Real-time - WebSocket memberikan latensi minimal
· Persistence - Data tetap tersimpan di MQTT broker untuk analisis lanjutan
· Monitoring - Logging lengkap dan FPS counter
· Auto-reconnect - Koneksi kamera dan MQTT otomatis pulih jika terputus