"""Raw HLS segment downloader and MPEG-TS timestamp inspection helpers.

This module holds the code of two scripts:

* ``hls_downloader.py`` -- ``RawHLSDownloader`` polls an HLS playlist, stores
  every segment byte-for-byte and reports the PCR range found in each one.
* ``hls_test.py`` -- ``extract_timestamps_from_ts`` prints per-frame PTS/DTS
  using PyAV.

Third-party packages (``requests``, ``m3u8``, ``av``) are imported inside the
functions that need them, so the pure-stdlib TS analysis can be imported and
used on machines where those packages are not installed.
"""
import os
import time
from datetime import datetime
from urllib.parse import quote, urljoin, urlparse

# MPEG-TS framing constants (ISO/IEC 13818-1).
TS_PACKET_SIZE = 188
TS_SYNC_BYTE = 0x47
# The PCR counts ticks of the 27 MHz system clock.
PCR_CLOCK_HZ = 27_000_000


# --------------------------------------------------------------------------
# hls_downloader.py
# --------------------------------------------------------------------------
class RawHLSDownloader:
    """Download HLS segments unmodified, preserving their original timestamps.

    Args:
        m3u8_url: URL of the media or master playlist.
        output_dir: Directory the segments are written to (created if needed).
        timeout: Per-request timeout in seconds passed to ``requests``; without
            one a stalled server would block the downloader forever.
    """

    def __init__(self, m3u8_url, output_dir="segments", timeout=10):
        import requests  # lazy: only the network code paths need it

        self.m3u8_url = m3u8_url
        self.output_dir = output_dir
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0'
        })

        os.makedirs(output_dir, exist_ok=True)

    def _fetch_playlist(self, url):
        """Download *url* and parse it as an m3u8 playlist."""
        import m3u8  # lazy: only the network code paths need it

        response = self.session.get(url, timeout=self.timeout)
        response.raise_for_status()
        return m3u8.loads(response.text, uri=url)

    def download_playlist(self):
        """Download and parse the playlist, resolving a master playlist.

        Returns:
            The parsed media playlist. When ``m3u8_url`` points at a master
            (multi-bitrate) playlist, its first variant is fetched and
            returned instead.

        Raises:
            requests.RequestException: On network or HTTP errors.
            ValueError: If a master playlist lists no variants.
        """
        playlist = self._fetch_playlist(self.m3u8_url)

        if playlist.is_variant:
            if not playlist.playlists:
                raise ValueError("master playlist contains no variant streams")
            variant = playlist.playlists[0]
            print(f"发现多码率流,选择: {variant.stream_info}")
            playlist = self._fetch_playlist(variant.absolute_uri)

        return playlist

    def download_segment_raw(self, segment_url, segment_filename):
        """Download one TS segment and store its bytes untouched.

        Args:
            segment_url: Absolute URL of the segment.
            segment_filename: File name to use inside ``output_dir``.

        Returns:
            A dict with the keys ``filename``, ``url``, ``size`` (bytes on
            disk), ``download_time`` (naive local ``datetime``) and
            ``local_path``.

        Raises:
            requests.RequestException: On network or HTTP errors.
            OSError: If the file cannot be written.
        """
        print(f"下载: {segment_url}")

        filepath = os.path.join(self.output_dir, segment_filename)
        # A streamed response keeps its connection checked out until it is
        # closed, so make sure that happens even when writing fails.
        with self.session.get(segment_url, stream=True,
                              timeout=self.timeout) as response:
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        download_time = datetime.now()
        file_size = os.path.getsize(filepath)

        print(f" 保存到: {segment_filename} ({file_size} 字节)")

        return {
            'filename': segment_filename,
            'url': segment_url,
            'size': file_size,
            'download_time': download_time,
            'local_path': filepath
        }

    @staticmethod
    def _extract_pcr_ms(packet):
        """Return the PCR of one 188-byte TS packet in milliseconds, or None.

        ``None`` is returned when the packet has no adaptation field, the PCR
        flag is clear, or the adaptation field is too short to hold a PCR.
        """
        adaptation_field_control = (packet[3] >> 4) & 0x03
        if adaptation_field_control not in (2, 3):
            return None

        # A PCR needs the flags byte plus six PCR bytes, i.e. a length >= 7.
        if packet[4] < 7:
            return None
        if not packet[5] & 0x10:  # PCR_flag
            return None

        pcr_bytes = packet[6:12]
        # 33-bit base (90 kHz), 6 reserved bits, 9-bit extension (27 MHz).
        pcr_base = (
            (pcr_bytes[0] << 25) |
            (pcr_bytes[1] << 17) |
            (pcr_bytes[2] << 9) |
            (pcr_bytes[3] << 1) |
            (pcr_bytes[4] >> 7)
        )
        pcr_extension = ((pcr_bytes[4] & 0x01) << 8) | pcr_bytes[5]
        pcr_value = pcr_base * 300 + pcr_extension
        return pcr_value / (PCR_CLOCK_HZ / 1000)

    def analyze_raw_ts(self, ts_file):
        """Scan a stored TS file and collect its PCR timestamps (no FFmpeg).

        Args:
            ts_file: File name relative to ``output_dir``.

        Returns:
            A dict with ``total_packets`` (packets with a valid sync byte),
            ``pcr_packets`` (how many of them carry a PCR), ``pcr_values``
            (those PCRs in milliseconds, in file order) and ``file_size``.
            A trailing partial packet is ignored; a 188-byte chunk with a bad
            sync byte is reported and skipped.
        """
        filepath = os.path.join(self.output_dir, ts_file)

        total_packets = 0
        pcr_values = []

        with open(filepath, 'rb') as f:
            while True:
                packet = f.read(TS_PACKET_SIZE)
                if len(packet) < TS_PACKET_SIZE:
                    break

                sync_byte = packet[0]
                if sync_byte != TS_SYNC_BYTE:
                    print(f"警告: 无效的TS包同步字节: {sync_byte:02x}")
                    continue

                total_packets += 1
                pcr_ms = self._extract_pcr_ms(packet)
                if pcr_ms is not None:
                    pcr_values.append(pcr_ms)

        return {
            'total_packets': total_packets,
            'pcr_packets': len(pcr_values),
            'pcr_values': pcr_values,
            'file_size': os.path.getsize(filepath)
        }

    @staticmethod
    def _segment_filename(segment_url):
        """Derive a local file name from a segment URL.

        Only the URL *path* is used: query strings (often rotating auth
        tokens) would otherwise defeat de-duplication and produce file names
        that are invalid on some platforms. If the path has no final
        component, the fully percent-encoded URL is used instead.
        """
        name = os.path.basename(urlparse(segment_url).path)
        return name or quote(segment_url, safe="")

    def monitor_and_download(self, max_segments=None):
        """Poll the playlist and download every segment not seen before.

        Args:
            max_segments: Stop after this many segments. ``None`` (or 0)
                means no limit.

        Returns:
            The list of result dicts from ``download_segment_raw``, in
            download order.

        The loop ends when ``max_segments`` is reached, when the playlist is
        marked as ended (``#EXT-X-ENDLIST``), or on Ctrl-C. Any other error is
        printed and the poll is retried after five seconds.
        """
        downloaded_segments = set()
        results = []

        while True:
            try:
                playlist = self.download_playlist()

                for segment in playlist.segments:
                    segment_url = segment.absolute_uri
                    segment_filename = self._segment_filename(segment_url)

                    if segment_filename in downloaded_segments:
                        continue

                    result = self.download_segment_raw(segment_url,
                                                       segment_filename)
                    analysis = self.analyze_raw_ts(segment_filename)

                    print(f" 分析结果: {analysis['total_packets']}包, "
                          f"PCR包: {analysis['pcr_packets']}")

                    pcr_values = analysis['pcr_values']
                    if pcr_values:
                        print(f" PCR范围: {min(pcr_values):.1f}ms - "
                              f"{max(pcr_values):.1f}ms")

                    downloaded_segments.add(segment_filename)
                    results.append(result)

                    if max_segments and len(results) >= max_segments:
                        print(f"达到最大分片数: {max_segments}")
                        return results

                # A finished (VOD / ended live) playlist never gains new
                # segments, so polling it again would loop forever.
                if getattr(playlist, 'is_endlist', False):
                    print("播放列表已结束")
                    return results

                sleep_time = playlist.target_duration or 2
                print(f"等待 {sleep_time} 秒...")
                time.sleep(sleep_time)

            except KeyboardInterrupt:
                print("用户中断")
                break
            except Exception as e:
                # Best-effort monitor: report the problem and keep polling.
                print(f"错误: {e}")
                time.sleep(5)

        return results


def download_example():
    """Usage example: download five segments from the test stream.

    This used to run at import time; it is now a function so that importing
    the module has no network side effects. Call it explicitly to run it.
    """
    downloader = RawHLSDownloader(
        m3u8_url="http://192.168.110.139:8080/stream.m3u8",
        output_dir="raw_segments"
    )
    return downloader.monitor_and_download(max_segments=5)


# --------------------------------------------------------------------------
# hls_test.py
# --------------------------------------------------------------------------
def timestamp_to_ms(ticks, time_base):
    """Convert a stream timestamp to milliseconds.

    Args:
        ticks: Timestamp in ``time_base`` units, or ``None`` when unknown.
        time_base: Seconds per tick (a number or ``fractions.Fraction``).

    Returns:
        The timestamp as a ``float`` in milliseconds, or ``None`` when
        *ticks* is ``None``. A timestamp of 0 is valid and yields ``0.0``.
    """
    if ticks is None:
        return None
    return float(ticks * time_base * 1000)


def _format_ms(label, value_ms):
    """Format one timestamp line for the per-frame report."""
    if value_ms is None:
        return f" {label}: None"
    return f" {label}: {value_ms:.3f} ms"


def extract_timestamps_from_ts(ts_file):
    """Decode the first video stream of *ts_file* and print frame timestamps.

    Uses PyAV (the FFmpeg bindings). For every decoded frame the PTS, DTS,
    picture type and key-frame flag are printed.
    """
    import av  # lazy: PyAV is only needed for this helper

    # The context manager closes the container even if decoding fails.
    with av.open(ts_file) as container:
        video_stream = container.streams.video[0]
        time_base = video_stream.time_base

        frame_index = 0
        for packet in container.demux(video_stream):
            for frame in packet.decode():
                # Converted to float because Fraction only supports
                # float-style format specs from Python 3.12 on.
                pts_ms = timestamp_to_ms(frame.pts, time_base)
                dts_ms = timestamp_to_ms(frame.dts, time_base)

                print(f"帧 {frame_index}:")
                print(_format_ms("PTS", pts_ms))
                print(_format_ms("DTS", dts_ms))
                print(f" 类型: {frame.pict_type}")  # I, P, B
                print(f" 关键帧: {frame.key_frame}")

                frame_index += 1


def main():
    """Print frame timestamps for one previously downloaded segment."""
    extract_timestamps_from_ts(
        os.path.join("raw_segments", "segment_00001619.ts")
    )


if __name__ == "__main__":
    main()