# 导入我们需要的库
from typing import Literal, List, Optional
import asyncio
import os
import time
import logging
from pixivpy3 import AppPixivAPI
from mcp.server.fastmcp import FastMCP
# --- 配置日志 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- 初始化设置 ---
# 从环境变量中读取 REFRESH_TOKEN
YOUR_REFRESH_TOKEN = os.getenv("PIXIV_REFRESH_TOKEN")
# 初始化 FastMCP 服务器
mcp = FastMCP("pixiv_tool")
# 创建 Pixiv API 的实例
api = AppPixivAPI()
# 全局变量用于跟踪认证状态和API调用
is_authenticated = False
last_api_call_time = 0
MIN_API_INTERVAL = 1.0 # 最小API调用间隔(秒)
# --- 认证和初始化 ---
async def authenticate_pixiv():
"""认证 Pixiv API"""
global is_authenticated
if not YOUR_REFRESH_TOKEN:
logger.error("❌ 环境变量 PIXIV_REFRESH_TOKEN 未设置")
return False
try:
# 使用异步方式进行认证
await asyncio.to_thread(api.auth, refresh_token=YOUR_REFRESH_TOKEN)
# 验证认证是否成功 - 尝试获取用户信息
user_info = await asyncio.to_thread(api.user_detail, api.user_id)
if user_info and 'user' in user_info:
is_authenticated = True
logger.info(f"✅ Pixiv 登录成功!欢迎,{user_info['user']['name']}")
return True
else:
logger.error("❌ 认证验证失败")
return False
except Exception as e:
logger.error(f"❌ Pixiv 登录失败: {str(e)}")
return False
async def rate_limit_check():
"""API 调用频率限制检查"""
global last_api_call_time
current_time = time.time()
if current_time - last_api_call_time < MIN_API_INTERVAL:
wait_time = MIN_API_INTERVAL - (current_time - last_api_call_time)
await asyncio.sleep(wait_time)
last_api_call_time = time.time()
# --- 辅助函数 ---
def validate_inputs(**kwargs):
"""输入验证"""
for key, value in kwargs.items():
if key == 'count' and (not isinstance(value, int) or value < 1 or value > 50):
raise ValueError(f"count 参数必须是 1-50 之间的整数,当前值: {value}")
elif key == 'artist_id' and (not isinstance(value, int) or value < 1):
raise ValueError(f"artist_id 必须是正整数,当前值: {value}")
elif key == 'keyword' and (not isinstance(value, str) or len(value.strip()) == 0):
raise ValueError("keyword 不能为空")
def format_illust_list(illusts: List[dict], title_prefix: str) -> str:
"""格式化插画列表为用户友好的字符串"""
if not illusts:
return f"{title_prefix}没有找到任何结果。"
response_parts = [f"{title_prefix}为你找到了 {len(illusts)} 张佳作:"]
for i, illust in enumerate(illusts):
try:
# 安全地获取图片URL
image_url = getattr(illust.image_urls, 'medium', '图片链接不可用')
# 安全地获取标签
tags = "无标签"
if hasattr(illust, 'tags') and illust.tags:
tags = ", ".join([tag.name for tag in illust.tags[:3] if hasattr(tag, 'name')])
# 安全地获取用户信息
user_name = getattr(illust.user, 'name', '未知画师')
user_id = getattr(illust.user, 'id', 'unknown')
# 安全地获取统计数据
bookmarks = getattr(illust, 'total_bookmarks', 0)
views = getattr(illust, 'total_view', 0)
part = (
f"### {i+1}. {getattr(illust, 'title', '无标题')}\n"
f"**画师**: {user_name} (ID: {user_id})\n"
f"**数据**: 💖 {bookmarks} | 👀 {views}\n"
f"**标签**: {tags}\n"
f"**作品链接**: https://www.pixiv.net/artworks/{getattr(illust, 'id', 'unknown')}\n"
f"**图片预览**: {image_url}"
)
response_parts.append(part)
except Exception as e:
logger.warning(f"格式化第 {i+1} 个插画时出错: {e}")
response_parts.append(f"### {i+1}. 数据格式错误,无法显示此作品")
return "\n\n".join(response_parts)
async def safe_api_call(api_func, *args, **kwargs):
"""安全的API调用包装器"""
if not is_authenticated:
raise Exception("未认证,请检查 PIXIV_REFRESH_TOKEN 是否有效")
await rate_limit_check()
try:
result = await asyncio.to_thread(api_func, *args, **kwargs)
return result
except Exception as e:
logger.error(f"API 调用失败: {api_func.__name__}, 错误: {e}")
raise
# --- MCP 工具定义 ---
@mcp.tool()
async def search_illusts(keyword: str, count: int = 5) -> str:
"""
根据关键词在 Pixiv 上搜索插画。结果将包含标题、作者、链接、收藏数和标签等信息。
你必须将此工具返回的格式化文本结果原封不动地展示给用户。
"""
try:
validate_inputs(keyword=keyword, count=count)
logger.info(f"⚙️ 搜索请求 - 关键词: '{keyword}', 数量: {count}")
json_result = await safe_api_call(
api.search_illust,
keyword,
search_target='exact_match_for_tags'
)
if not json_result or "illusts" not in json_result or not json_result.illusts:
return f"🤷 找不到关于"{keyword}"的插画,尝试使用其他关键词搜索。"
return format_illust_list(json_result.illusts[:count], f"关于"{keyword}"的搜索结果")
except ValueError as e:
return f"❌ 输入参数错误: {str(e)}"
except Exception as e:
logger.error(f"搜索 '{keyword}' 时发生错误: {e}")
return f"❌ 搜索"{keyword}"时遇到错误,请稍后再试。错误信息: {str(e)}"
@mcp.tool()
async def get_illust_ranking(mode: Literal['day', 'week', 'month'] = 'day', count: int = 5) -> str:
"""
获取 Pixiv 插画日榜、周榜或月榜。结果将包含详细的插画信息。
你必须将此工具返回的格式化文本结果原封不动地展示给用户。
"""
try:
validate_inputs(count=count)
logger.info(f"⚙️ 排行榜请求 - 模式: {mode}, 数量: {count}")
json_result = await safe_api_call(api.illust_ranking, mode=mode)
if not json_result or "illusts" not in json_result or not json_result.illusts:
return f"🤷 暂时无法获取"{mode}"模式的排行榜,请稍后再试。"
mode_names = {'day': '日榜', 'week': '周榜', 'month': '月榜'}
return format_illust_list(
json_result.illusts[:count],
f"Pixiv 插画排行榜 ({mode_names.get(mode, mode)})"
)
except ValueError as e:
return f"❌ 输入参数错误: {str(e)}"
except Exception as e:
logger.error(f"获取 '{mode}' 排行榜时发生错误: {e}")
return f"❌ 获取"{mode}"排行榜时遇到错误,请稍后再试。"
@mcp.tool()
async def get_recommended_illusts(count: int = 5) -> str:
"""
获取 Pixiv 根据你的喜好为你推荐的插画。
你必须将此工具返回的格式化文本结果原封不动地展示给用户。
"""
try:
validate_inputs(count=count)
logger.info(f"⚙️ 推荐作品请求 - 数量: {count}")
json_result = await safe_api_call(api.illust_recommended)
if not json_result or "illusts" not in json_result or not json_result.illusts:
return "🤷 暂时无法获取推荐作品,请稍后再试。"
return format_illust_list(json_result.illusts[:count], "为你精选的推荐作品")
except ValueError as e:
return f"❌ 输入参数错误: {str(e)}"
except Exception as e:
logger.error(f"获取推荐作品时发生错误: {e}")
return "❌ 获取推荐作品时遇到错误,请稍后再试。"
@mcp.tool()
async def get_artist_illusts(artist_id: int, count: int = 5) -> str:
"""
根据画师的用户ID获取其发布的作品列表。
你必须将此工具返回的格式化文本结果原封不动地展示给用户。
"""
try:
validate_inputs(artist_id=artist_id, count=count)
logger.info(f"⚙️ 画师作品请求 - 画师ID: {artist_id}, 数量: {count}")
json_result = await safe_api_call(api.user_illusts, artist_id)
if not json_result or "illusts" not in json_result or not json_result.illusts:
return f"🤷 找不到ID为 {artist_id} 的画师的作品,请检查画师ID是否正确。"
artist_name = json_result.illusts[0].user.name if json_result.illusts else "未知画师"
return format_illust_list(json_result.illusts[:count], f"画师"{artist_name}"的作品")
except ValueError as e:
return f"❌ 输入参数错误: {str(e)}"
except Exception as e:
logger.error(f"获取画师 {artist_id} 的作品时发生错误: {e}")
return f"❌ 获取画师作品时遇到错误,请检查画师ID是否正确。"
@mcp.tool()
async def get_my_bookmarks(count: int = 5) -> str:
"""
获取您在 Pixiv 上公开收藏的最新插画。需要有效的登录凭证。
你必须将此工具返回的格式化文本结果原封不动地展示给用户。
"""
try:
validate_inputs(count=count)
if not hasattr(api, 'user_id') or not api.user_id:
return "❌ 无法获取用户ID,请检查认证状态。"
my_user_id = api.user_id
logger.info(f"⚙️ 我的收藏请求 - 用户ID: {my_user_id}, 数量: {count}")
json_result = await safe_api_call(api.user_bookmarks_illust, my_user_id)
if not json_result or "illusts" not in json_result or not json_result.illusts:
return "🤷 您的公开收藏夹是空的,或者暂时无法访问。"
return format_illust_list(json_result.illusts[:count], "您最新收藏的作品")
except ValueError as e:
return f"❌ 输入参数错误: {str(e)}"
except Exception as e:
logger.error(f"获取我的收藏时发生错误: {e}")
return "❌ 获取收藏时遇到错误,请确保您的 REFRESH_TOKEN 有效。"
@mcp.tool()
async def get_pixiv_status() -> str:
"""
检查 Pixiv API 连接和认证状态。
"""
try:
if not is_authenticated:
return "❌ 未认证 - 请检查 PIXIV_REFRESH_TOKEN 环境变量是否正确设置。"
# 尝试获取用户信息来验证连接
user_info = await safe_api_call(api.user_detail, api.user_id)
if user_info and 'user' in user_info:
user = user_info['user']
return (
f"✅ Pixiv API 连接正常\n"
f"**用户**: {user.get('name', '未知')}\n"
f"**用户ID**: {user.get('id', api.user_id)}\n"
f"**关注数**: {user.get('total_follow_users', 0)}\n"
f"**粉丝数**: {user.get('total_followers', 0)}"
)
else:
return "⚠️ 认证状态异常,建议重新设置 REFRESH_TOKEN。"
except Exception as e:
logger.error(f"检查状态时发生错误: {e}")
return f"❌ 连接检查失败: {str(e)}"
# --- 启动初始化 ---
async def initialize_service():
"""初始化服务"""
logger.info("🚀 正在初始化 Pixiv MCP 服务...")
if await authenticate_pixiv():
logger.info("✅ 服务初始化完成,准备接收请求")
else:
logger.error("❌ 服务初始化失败,请检查配置")
# --- 运行服务器 ---
if __name__ == "__main__":
if YOUR_REFRESH_TOKEN:
# 在事件循环中运行初始化
asyncio.run(initialize_service())
# 启动 MCP 服务器
mcp.run(transport='stdio')
else:
logger.error("🔴 服务未启动:PIXIV_REFRESH_TOKEN 环境变量未设置")
print("请设置环境变量 PIXIV_REFRESH_TOKEN 后重新运行")