242 lines
8.2 KiB
Python
242 lines
8.2 KiB
Python
import io
import logging
import os
import shutil
from datetime import timedelta
from typing import Optional, Tuple

from minio import Minio
from minio.error import S3Error

from app.config.settings import settings
|
||
|
||
|
||
class MinioClient:
    """MinIO client wrapper with a local-disk fallback.

    ``upload_from_bytes``, ``get_object`` and ``delete_object`` fall back to
    files under ``local_storage_path`` when MinIO is not connected or the
    MinIO call fails. All other operations require a live connection and
    report failure through their return value (``False``, ``None`` or ``[]``).
    """

    def __init__(self):
        """Create the MinIO client and probe the connection.

        Any failure while constructing or probing the client is logged and
        leaves the instance in local-only mode (``is_connected`` is False and
        ``client`` is None).
        """
        self.local_storage_path = "data_storage/local_uploads"
        os.makedirs(self.local_storage_path, exist_ok=True)

        try:
            self.client = Minio(
                settings.MINIO_ENDPOINT,
                access_key=settings.MINIO_ACCESS_KEY,
                secret_key=settings.MINIO_SECRET_KEY,
                secure=settings.MINIO_SECURE,
            )
            self.bucket_name = settings.MINIO_BUCKET_NAME

            # Issue a real request so an unreachable server is detected now.
            self.client.list_buckets()
            self.is_connected = True
            logging.info("MinIO connected successfully")

            # Make sure the configured bucket exists.
            self._ensure_bucket_exists()
        except Exception as e:
            logging.warning(f"Failed to connect to MinIO: {e}. Using local storage.")
            self.client = None
            self.bucket_name = settings.MINIO_BUCKET_NAME
            self.is_connected = False

    def _get_local_path(self, object_name: str) -> str:
        """Return the local fallback path for ``object_name``.

        Raises:
            ValueError: if the name would resolve outside
                ``local_storage_path`` (e.g. it contains ``..`` segments or
                is an absolute path).
        """
        root = os.path.abspath(self.local_storage_path)
        resolved = os.path.abspath(os.path.join(root, object_name))
        # Refuse names that escape the storage root; callers wrap this in
        # their own try/except and report the failure via the return value.
        if os.path.commonpath([root, resolved]) != root:
            raise ValueError(f"Object name escapes local storage root: {object_name!r}")
        return os.path.join(self.local_storage_path, object_name)

    def _ensure_bucket_exists(self):
        """Create the configured bucket if it does not exist yet."""
        if not self.is_connected:
            return

        try:
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
        except S3Error as e:
            logging.warning(f"MinIO bucket error: {e}")

    def upload_file(self, file_path: str, object_name: str) -> bool:
        """Upload the file at ``file_path`` to MinIO as ``object_name``.

        Returns:
            True on success; False when MinIO is not connected or the upload
            raised ``S3Error``.
        """
        if not self.is_connected:
            logging.warning("MinIO is not connected. Upload skipped.")
            return False

        try:
            self.client.fput_object(self.bucket_name, object_name, file_path)
            return True
        except S3Error as e:
            logging.warning(f"MinIO upload error: {e}")
            return False

    def upload_from_bytes(self, data: bytes, object_name: str) -> bool:
        """Store ``data`` as ``object_name``, preferring MinIO over local disk.

        Returns:
            True if the data was stored in MinIO or on local disk; False if
            the local write failed as well.
        """
        if self.is_connected:
            try:
                self.client.put_object(
                    self.bucket_name,
                    object_name,
                    io.BytesIO(data),
                    length=len(data),
                    part_size=10 * 1024 * 1024,
                )
                return True
            except Exception as e:
                # Not only S3Error: connection-level failures must also
                # trigger the local fallback promised by this method.
                logging.warning(f"MinIO upload error: {e}, falling back to local storage")

        # Local storage fallback.
        try:
            local_path = self._get_local_path(object_name)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, 'wb') as f:
                f.write(data)
            logging.info(f"File saved to local storage: {local_path}")
            return True
        except Exception as e:
            logging.error(f"Local storage save error: {e}")
            return False

    def upload_fileobj(self, file_obj: io.BytesIO, object_name: str, content_type: str = "application/octet-stream") -> bool:
        """Upload a file-like object to MinIO as ``object_name``.

        The object is sent with ``length=-1`` (unknown size) and a 10 MiB
        part size.

        Returns:
            True on success; False when MinIO is not connected or the upload
            raised ``S3Error``.
        """
        if not self.is_connected:
            logging.warning("MinIO is not connected. Upload skipped.")
            return False

        try:
            self.client.put_object(
                self.bucket_name,
                object_name,
                file_obj,
                length=-1,
                part_size=10 * 1024 * 1024,
                content_type=content_type,
            )
            return True
        except S3Error as e:
            logging.warning(f"MinIO upload error: {e}")
            return False

    def download_file(self, object_name: str, file_path: str) -> bool:
        """Download ``object_name`` from MinIO into ``file_path``.

        Returns:
            True on success; False when MinIO is not connected or the
            download raised ``S3Error``.
        """
        if not self.is_connected:
            logging.warning("MinIO is not connected. Download skipped.")
            return False

        try:
            self.client.fget_object(self.bucket_name, object_name, file_path)
            return True
        except S3Error as e:
            logging.warning(f"MinIO download error: {e}")
            return False

    def get_object(self, object_name: str) -> Optional[bytes]:
        """Return the content of ``object_name``, preferring MinIO over local disk.

        Returns:
            The object's bytes, or None if it could not be read from MinIO
            and is not available in local storage.
        """
        if self.is_connected:
            response = None
            try:
                response = self.client.get_object(self.bucket_name, object_name)
                return response.read()
            except Exception as e:
                # Any MinIO-side failure falls through to the local copy.
                logging.warning(f"MinIO get object error: {e}, falling back to local storage")
            finally:
                # Always hand the connection back, even when read() fails.
                if response is not None:
                    response.close()
                    response.release_conn()

        # Local storage fallback.
        try:
            local_path = self._get_local_path(object_name)
            if os.path.exists(local_path):
                with open(local_path, 'rb') as f:
                    return f.read()
            logging.warning(f"File not found in local storage: {local_path}")
            return None
        except Exception as e:
            logging.error(f"Local storage get error: {e}")
            return None

    def delete_object(self, object_name: str) -> bool:
        """Delete ``object_name``, preferring MinIO over local disk.

        Returns:
            True if the object was removed from MinIO, or (when MinIO is
            unavailable or failed) a local copy existed and was removed;
            False otherwise.
        """
        if self.is_connected:
            try:
                self.client.remove_object(self.bucket_name, object_name)
                return True
            except Exception as e:
                # Any MinIO-side failure falls through to the local copy.
                logging.warning(f"MinIO delete error: {e}")

        # Local storage fallback.
        try:
            local_path = self._get_local_path(object_name)
            if os.path.exists(local_path):
                os.remove(local_path)
                return True
            return False
        except Exception as e:
            logging.error(f"Local storage delete error: {e}")
            return False

    def list_objects(self, prefix: str = "") -> list:
        """List the names of all objects under ``prefix`` (recursively).

        Returns:
            A list of object names; empty when MinIO is not connected or the
            listing raised ``S3Error``.
        """
        if not self.is_connected:
            logging.warning("MinIO is not connected. List objects skipped.")
            return []

        try:
            return [
                obj.object_name
                for obj in self.client.list_objects(
                    self.bucket_name,
                    prefix=prefix,
                    recursive=True,
                )
            ]
        except S3Error as e:
            logging.warning(f"MinIO list objects error: {e}")
            return []

    def get_presigned_url(self, object_name: str, expires: int = 604800) -> Optional[str]:
        """Return a presigned GET URL for ``object_name``.

        Args:
            object_name: Name of the object in the configured bucket.
            expires: Lifetime of the URL in seconds (default 604800, i.e.
                7 days). A ``datetime.timedelta`` is also accepted.

        Returns:
            The URL, or None when MinIO is not connected or the URL could not
            be generated.
        """
        if not self.is_connected:
            logging.warning("MinIO is not connected. Get presigned URL skipped.")
            return None

        try:
            # The MinIO SDK expects a timedelta and calls .total_seconds()
            # on it; a bare int would raise AttributeError.
            if not isinstance(expires, timedelta):
                expires = timedelta(seconds=expires)
            return self.client.presigned_get_object(
                self.bucket_name,
                object_name,
                expires=expires,
            )
        except (S3Error, ValueError) as e:
            # ValueError covers an expiry the SDK rejects as out of range.
            logging.warning(f"MinIO presigned url error: {e}")
            return None
|
||
|
||
|
||
# Create the global file-storage instance.
try:
    file_storage = MinioClient()
except Exception as e:
    # If initialisation fails, fall back to a stub whose methods only log a
    # warning and return a failure value.
    class MockFileStorage:
        """Stand-in for ``MinioClient`` used when it cannot be constructed."""

        # Plain attributes, so truthiness checks do not see a truthy
        # function object produced by ``__getattr__``.
        is_connected = False
        client = None

        def __getattr__(self, name):
            # Do not fabricate special methods (copy/pickle protocol probes).
            if name.startswith('__') and name.endswith('__'):
                raise AttributeError(name)

            def mock_method(*args, **kwargs):
                logging.warning(f"MinIO is not available. Method '{name}' will not execute.")
                if name == 'list_objects':
                    # Match MinioClient.list_objects, which always returns a list.
                    return []
                return None if name.startswith('get_') else False

            return mock_method

    file_storage = MockFileStorage()
    logging.warning(f"Failed to initialize MinIO client: {e}. Using mock instance.")
|