Files
SupervisorAI/hls_downloader.py
2026-02-04 14:25:43 +08:00

196 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import m3u8
import time
import os
from urllib.parse import urljoin
from datetime import datetime
class RawHLSDownloader:
    """Download HLS media segments unmodified so their original timestamps survive.

    Segments are written to disk exactly as received (no remuxing), and the
    PCR values inside each MPEG-TS segment can be inspected without FFmpeg.

    Args:
        m3u8_url: URL of the media or master (multi-bitrate) playlist.
        output_dir: Directory that receives the segments; created if missing.
        timeout: Timeout in seconds passed to every HTTP request so that a
            stalled server cannot block the downloader indefinitely.
    """

    # MPEG-TS framing constants.
    TS_PACKET_SIZE = 188
    TS_SYNC_BYTE = 0x47
    # Smallest adaptation_field_length that can carry a PCR:
    # 1 flags byte + 6 PCR bytes.
    MIN_PCR_ADAPTATION_LENGTH = 7

    def __init__(self, m3u8_url, output_dir="segments", timeout=10):
        self.m3u8_url = m3u8_url
        self.output_dir = output_dir
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0'
        })
        os.makedirs(output_dir, exist_ok=True)

    def download_playlist(self):
        """Fetch and parse the playlist, resolving a master playlist if needed.

        When the URL points at a master (multi-bitrate) playlist, the first
        listed variant is fetched and returned instead.

        Returns:
            The parsed media playlist object produced by ``m3u8.loads``.

        Raises:
            requests.HTTPError: If either playlist request returns an error
                status.
        """
        response = self.session.get(self.m3u8_url, timeout=self.timeout)
        response.raise_for_status()
        playlist = m3u8.loads(response.text, uri=self.m3u8_url)

        # A master playlist only lists variants; pick the first one.
        if playlist.is_variant:
            print(f"发现多码率流,选择: {playlist.playlists[0].stream_info}")
            playlist_url = playlist.playlists[0].absolute_uri
            response = self.session.get(playlist_url, timeout=self.timeout)
            response.raise_for_status()
            playlist = m3u8.loads(response.text, uri=playlist_url)
        return playlist

    def download_segment_raw(self, segment_url, segment_filename):
        """Download one TS segment and store its bytes without any processing.

        Args:
            segment_url: Absolute URL of the segment.
            segment_filename: File name to use inside ``output_dir``.

        Returns:
            A dict with the keys ``filename``, ``url``, ``size`` (bytes on
            disk), ``download_time`` (``datetime`` taken after the write
            finished) and ``local_path``.

        Raises:
            requests.HTTPError: If the server returns an error status.
        """
        print(f"下载: {segment_url}")
        filepath = os.path.join(self.output_dir, segment_filename)

        # The context manager releases the streamed connection back to the
        # pool even when the status check or the disk write fails.
        with self.session.get(
            segment_url, stream=True, timeout=self.timeout
        ) as response:
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        download_time = datetime.now()
        file_size = os.path.getsize(filepath)
        print(f" 保存到: {segment_filename} ({file_size} 字节)")
        return {
            'filename': segment_filename,
            'url': segment_url,
            'size': file_size,
            'download_time': download_time,
            'local_path': filepath
        }

    @staticmethod
    def _extract_pcr_ms(packet):
        """Return the PCR of one 188-byte TS packet in milliseconds, or None.

        None is returned when the packet has no adaptation field, the PCR
        flag is clear, or the adaptation field is too short to hold a PCR.
        """
        adaptation_field_control = (packet[3] >> 4) & 0x03
        # Values 2 and 3 mean an adaptation field is present.
        if adaptation_field_control not in (2, 3):
            return None

        adaptation_field_length = packet[4]
        # A shorter field cannot contain the flags byte plus all 6 PCR bytes;
        # decoding it anyway would turn stuffing/payload bytes into a PCR.
        if adaptation_field_length < RawHLSDownloader.MIN_PCR_ADAPTATION_LENGTH:
            return None

        pcr_flag = (packet[5] >> 4) & 0x01
        if not pcr_flag:
            return None

        # PCR layout: 33-bit base, 6 reserved bits, 9-bit extension.
        pcr_bytes = packet[6:12]
        pcr_base = (
            (pcr_bytes[0] << 25) |
            (pcr_bytes[1] << 17) |
            (pcr_bytes[2] << 9) |
            (pcr_bytes[3] << 1) |
            (pcr_bytes[4] >> 7)
        )
        pcr_extension = ((pcr_bytes[4] & 0x01) << 8) | pcr_bytes[5]
        # base * 300 + extension is in 27 MHz ticks; 27000 ticks = 1 ms.
        pcr_value = pcr_base * 300 + pcr_extension
        return pcr_value / 27000.0

    def analyze_raw_ts(self, ts_file):
        """Scan a stored TS file and collect its PCR timestamps without FFmpeg.

        The file is read in 188-byte packets. Packets whose first byte is not
        the sync byte are reported and skipped; a trailing partial packet is
        ignored.

        Args:
            ts_file: File name of the segment inside ``output_dir``.

        Returns:
            A dict with ``total_packets`` (packets with a valid sync byte),
            ``pcr_packets`` (how many of them carried a PCR), ``pcr_values``
            (PCRs in milliseconds, in file order) and ``file_size`` in bytes.
        """
        filepath = os.path.join(self.output_dir, ts_file)
        total_packets = 0
        pcr_values = []

        with open(filepath, 'rb') as f:
            while True:
                packet = f.read(self.TS_PACKET_SIZE)
                if len(packet) < self.TS_PACKET_SIZE:
                    break

                sync_byte = packet[0]
                if sync_byte != self.TS_SYNC_BYTE:
                    print(f"警告: 无效的TS包同步字节: {sync_byte:02x}")
                    continue

                total_packets += 1
                pcr_ms = self._extract_pcr_ms(packet)
                if pcr_ms is not None:
                    pcr_values.append(pcr_ms)

        return {
            'total_packets': total_packets,
            'pcr_packets': len(pcr_values),
            'pcr_values': pcr_values,
            'file_size': os.path.getsize(filepath)
        }

    def monitor_and_download(self, max_segments=None):
        """Poll the playlist and download every segment not yet seen.

        Each new segment is saved and analysed, and a summary is printed.
        Polling stops when ``max_segments`` segments have been downloaded,
        when the playlist signals its end, or when the user interrupts with
        Ctrl-C. Other errors are printed and the loop retries after 5 seconds.

        Args:
            max_segments: Stop after this many segments; a falsy value
                (``None`` or ``0``) means no limit.
        """
        downloaded_segments = set()
        segment_counter = 0

        # The interrupt handler wraps the whole loop so that Ctrl-C is also
        # handled while sleeping in the error back-off below.
        try:
            while True:
                try:
                    playlist = self.download_playlist()

                    for segment in playlist.segments:
                        segment_url = segment.absolute_uri
                        segment_filename = os.path.basename(segment_url)
                        if segment_filename in downloaded_segments:
                            continue

                        self.download_segment_raw(segment_url, segment_filename)
                        analysis = self.analyze_raw_ts(segment_filename)
                        print(f" 分析结果: {analysis['total_packets']}包, "
                              f"PCR包: {analysis['pcr_packets']}")
                        if analysis['pcr_values']:
                            print(f" PCR范围: {min(analysis['pcr_values']):.1f}ms - "
                                  f"{max(analysis['pcr_values']):.1f}ms")

                        downloaded_segments.add(segment_filename)
                        segment_counter += 1

                        if max_segments and segment_counter >= max_segments:
                            print(f"达到最大分片数: {max_segments}")
                            return

                    # A playlist with an end marker will never gain new
                    # segments, so polling again would loop forever.
                    if playlist.is_endlist:
                        print("播放列表已结束")
                        return

                    sleep_time = playlist.target_duration or 2
                    print(f"等待 {sleep_time} 秒...")
                    time.sleep(sleep_time)
                except Exception as e:
                    # Best effort: report the failure and retry shortly.
                    print(f"错误: {e}")
                    time.sleep(5)
        except KeyboardInterrupt:
            print("用户中断")
# Usage example.
# NOTE(review): these statements run whenever the module is loaded, including
# on import — consider an `if __name__ == "__main__":` guard if this file is
# meant to be imported by other code.
downloader = RawHLSDownloader(
    "http://192.168.110.139:8080/stream.m3u8",
    output_dir="raw_segments",
)

# Fetch five segments as a quick test run.
downloader.monitor_and_download(5)