"""Business-logic service layer for alert records."""

import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

from sqlalchemy.orm import Session

from models.sur_alert_record import SurAlertRecord
from utils.logger import setup_logger

logger = setup_logger(__name__)


class SurAlertRecordService:
    """Service wrapping create/read/update/delete operations on SurAlertRecord."""

    def __init__(self, db: Session) -> None:
        """
        Initialize the service.

        Args:
            db: Database session used for every query and commit issued by
                this service.
        """
        self.db = db

    def create_alert_record(
        self,
        alert_type: int,
        person_id: Optional[int] = None,
        alert_message: Optional[str] = None,
        alert_data: Optional[Dict[str, Any]] = None,
        ori_video_url: Optional[str] = None,
        ori_img_url: Optional[str] = None,
        processed_video_url: Optional[str] = None,
        processed_img_url: Optional[str] = None,
        camera_id: Optional[int] = None,
        room_id: Optional[int] = None,
        room_name: Optional[str] = None,
        confidence: Optional[float] = None,
        handled: Optional[int] = None,
        handler_id: Optional[int] = None,
        handled_time: Optional[datetime] = None,
        handle_notes: Optional[str] = None
    ) -> SurAlertRecord:
        """
        Create and persist a new alert record.

        Args:
            alert_type: Alert type.
            person_id: Person ID.
            alert_message: Alert message.
            alert_data: Alert payload.
            ori_video_url: Original video URL.
            ori_img_url: Original image URL.
            processed_video_url: Processed video URL.
            processed_img_url: Processed image URL.
            camera_id: Camera ID.
            room_id: Room ID.
            room_name: Room name.
            confidence: Confidence value.
            handled: Handling status; a falsy value (``None`` or ``0``) is
                stored as ``0``.
            handler_id: Handler ID.
            handled_time: Time the alert was handled.
            handle_notes: Handling notes.

        Returns:
            The newly created record, refreshed from the session after commit.

        Raises:
            Exception: Any error raised while adding/committing/refreshing is
                logged and re-raised after the session has been rolled back.
        """
        logger.info(f"Creating alert record: type={alert_type}, person_id={person_id}")

        alert_record = SurAlertRecord(
            alert_type=alert_type,
            person_id=person_id,
            alert_message=alert_message,
            alert_data=alert_data,
            ori_video_url=ori_video_url,
            ori_img_url=ori_img_url,
            processed_video_url=processed_video_url,
            processed_img_url=processed_img_url,
            camera_id=camera_id,
            room_id=room_id,
            room_name=room_name,
            confidence=confidence,
            handled=handled or 0,
            handler_id=handler_id,
            handled_time=handled_time,
            handle_notes=handle_notes
        )

        try:
            self.db.add(alert_record)
            self.db.commit()
            # Reload the instance after commit so callers get current values.
            self.db.refresh(alert_record)
            logger.info(f"Alert record created successfully: id={alert_record.id}")
            return alert_record
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to create alert record: {e}")
            raise

    def get_alert_record_by_id(self, alert_id: int) -> Optional[SurAlertRecord]:
        """
        Fetch a single alert record by its ID.

        Args:
            alert_id: Alert record ID.

        Returns:
            The matching record, or ``None`` when no row matches.
        """
        return self.db.query(SurAlertRecord).filter(SurAlertRecord.id == alert_id).first()

    def get_alerts_by_person(self, person_id: int, limit: int = 100) -> List[SurAlertRecord]:
        """
        List alert records for a person, newest ``created_time`` first.

        Args:
            person_id: Person ID.
            limit: Maximum number of records to return.

        Returns:
            List of alert records.
        """
        return (
            self.db.query(SurAlertRecord)
            .filter(SurAlertRecord.person_id == person_id)
            .order_by(SurAlertRecord.created_time.desc())
            .limit(limit)
            .all()
        )

    def get_alerts_by_camera(self, camera_id: int, limit: int = 100) -> List[SurAlertRecord]:
        """
        List alert records for a camera, newest ``created_time`` first.

        Args:
            camera_id: Camera ID.
            limit: Maximum number of records to return.

        Returns:
            List of alert records.
        """
        return (
            self.db.query(SurAlertRecord)
            .filter(SurAlertRecord.camera_id == camera_id)
            .order_by(SurAlertRecord.created_time.desc())
            .limit(limit)
            .all()
        )

    def get_alerts_by_time_range(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[SurAlertRecord]:
        """
        List alert records whose ``created_time`` lies in an inclusive range.

        Either bound may be omitted, in which case that side is unbounded.
        Results are ordered newest first.

        Args:
            start_time: Inclusive lower bound on ``created_time``.
            end_time: Inclusive upper bound on ``created_time``.
            limit: Maximum number of records to return.
            offset: Number of leading records to skip.

        Returns:
            List of alert records.
        """
        query = self.db.query(SurAlertRecord)

        if start_time is not None:
            query = query.filter(SurAlertRecord.created_time >= start_time)
        if end_time is not None:
            query = query.filter(SurAlertRecord.created_time <= end_time)

        return (
            query.order_by(SurAlertRecord.created_time.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )

    def update_alert_record(
        self,
        alert_id: int,
        update_data: Dict[str, Any]
    ) -> Optional[SurAlertRecord]:
        """
        Apply a dict of attribute updates to an existing alert record.

        Keys that are not attributes of the record are silently ignored, as
        are ``id`` and any key starting with an underscore.

        Args:
            alert_id: Alert record ID.
            update_data: Mapping of attribute name to new value.

        Returns:
            The updated record, or ``None`` when the record does not exist.

        Raises:
            Exception: Any error raised while applying or committing the
                update is logged and re-raised after rollback.
        """
        logger.info(f"Updating alert record: id={alert_id}")

        alert_record = self.get_alert_record_by_id(alert_id)
        if not alert_record:
            logger.warning(f"Alert record not found: id={alert_id}")
            return None

        try:
            for key, value in update_data.items():
                # The primary key must not change, and private/ORM-internal
                # attributes (e.g. ``_sa_instance_state``) would pass a plain
                # ``hasattr`` check and corrupt the instance if overwritten.
                if key == 'id' or key.startswith('_'):
                    continue
                if hasattr(alert_record, key):
                    setattr(alert_record, key, value)

            self.db.commit()
            self.db.refresh(alert_record)
            logger.info(f"Alert record updated successfully: id={alert_record.id}")
            return alert_record
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to update alert record: {e}")
            raise

    def mark_as_handled(
        self,
        alert_id: int,
        handler_id: int,
        handle_notes: Optional[str] = None
    ) -> Optional[SurAlertRecord]:
        """
        Mark an alert as handled.

        Sets ``handled`` to ``1``, records the handler and notes, and stamps
        ``handled_time`` with ``datetime.now()`` (naive local time).

        Args:
            alert_id: Alert record ID.
            handler_id: Handler ID.
            handle_notes: Handling notes.

        Returns:
            The updated record, or ``None`` when the record does not exist.
        """
        return self.update_alert_record(
            alert_id,
            {
                'handled': 1,
                'handler_id': handler_id,
                'handled_time': datetime.now(),
                'handle_notes': handle_notes
            }
        )

    def delete_alert_record(self, alert_id: int) -> bool:
        """
        Delete an alert record.

        Unlike create/update, a failure here is logged and reported through
        the return value instead of being re-raised.

        Args:
            alert_id: Alert record ID.

        Returns:
            ``True`` when the record was deleted; ``False`` when it does not
            exist or the delete failed (the session is rolled back).
        """
        logger.info(f"Deleting alert record: id={alert_id}")

        alert_record = self.get_alert_record_by_id(alert_id)
        if not alert_record:
            logger.warning(f"Alert record not found: id={alert_id}")
            return False

        try:
            self.db.delete(alert_record)
            self.db.commit()
            logger.info(f"Alert record deleted successfully: id={alert_id}")
            return True
        except Exception as e:
            self.db.rollback()
            logger.error(f"Failed to delete alert record: {e}")
            return False

    def get_alert_count_by_person_and_time(
        self,
        person_id: int,
        alert_type: int,
        hours: int
    ) -> int:
        """
        Count a person's alerts of one type within the most recent hours.

        The window is ``[now - hours, now]`` based on ``datetime.now()`` and
        is compared against ``created_time``.

        Args:
            person_id: Person ID.
            alert_type: Alert type.
            hours: Window length in hours, counted back from the current time.

        Returns:
            Number of matching records, or ``0`` when the query fails (the
            error is logged and the session rolled back).
        """
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        try:
            count = (
                self.db.query(SurAlertRecord)
                .filter(
                    SurAlertRecord.person_id == person_id,
                    SurAlertRecord.alert_type == alert_type,
                    SurAlertRecord.created_time >= start_time,
                    SurAlertRecord.created_time <= end_time
                )
                .count()
            )

            logger.debug(f"查询到人员 {person_id} 在最近 {hours} 小时内类型 {alert_type} 的告警记录数: {count}")
            return count
        except Exception as e:
            # Roll back so the shared session is usable after a failed query,
            # matching the other methods of this service.
            self.db.rollback()
            logger.error(f"查询告警记录数失败: {e}")
            return 0