Files
netdisk-fast-download/parser/doc/PYTHON_PARSER_GUIDE.md
q 2fcf9cfab1 docs: 更新文档导航和解析器指南
- 添加演练场(Playground)文档导航区到主 README
- 新增 Python 解析器文档链接(开发指南、测试报告、LSP集成)
- 更新前端版本号至 0.1.9b19p
- 补充 Python 解析器 requests 库使用章节和官方文档链接
- 添加 JavaScript 和 Python 解析器的语言版本和官方文档
- 优化文档结构,分类为项目文档和外部资源
2026-01-11 22:35:45 +08:00

32 KiB
Raw Blame History

Python解析器开发指南

概述

本指南介绍如何使用Python编写自定义网盘解析器。Python解析器基于GraalPy运行,提供与JavaScript解析器相同的功能,但使用Python语法。

技术规格

  • Python 运行时: GraalPy (GraalVM Python)
  • Python 版本: Python 3.10+ 兼容
  • 标准库支持: 支持大部分 Python 标准库
  • 第三方库支持: 内置 requests 库(需在顶层导入)
  • 运行模式: 同步执行,所有操作都是阻塞式的

参考文档

目录

快速开始

1. 创建Python脚本

在 ./custom-parsers/py/ 目录下创建 .py 文件,使用以下模板:

# ==UserScript==
# @name         我的解析器
# @type         my_parser
# @displayName  我的网盘
# @description  使用Python实现的网盘解析器
# @match        https?://example\.com/s/(?P<KEY>\w+)
# @author       yourname
# @version      1.0.0
# ==/UserScript==

def parse(share_link_info, http, logger):
    """Resolve the direct download link for a single shared file.

    Args:
        share_link_info: share-link information object.
        http: HTTP client.
        logger: logger object.

    Returns:
        str: direct download URL.
    """
    share_url = share_link_info.get_share_url()
    logger.info(f"开始解析: {share_url}")

    resp = http.get(share_url)
    if not resp.ok():
        raise Exception(f"请求失败: {resp.status_code()}")

    # Parse the response and extract the direct download link.
    return "https://example.com/download/file.zip"

2. 解析器元数据

Python解析器使用与JavaScript相同的元数据格式:

字段 必填 说明
@name 解析器名称
@type 解析器类型标识(小写字母和下划线)
@displayName 显示名称
@match URL匹配正则表达式
@description 解析器描述
@author 作者名称
@version 版本号

3. 正则表达式命名分组

Python使用 (?P<NAME>pattern) 语法定义命名分组:

# Python正则表达式命名分组
# @match https?://pan\.example\.com/s/(?P<KEY>\w+)(?:\?pwd=(?P<PWD>\w+))?

注意这与JavaScript的 (?<NAME>pattern) 语法不同。

API参考

ShareLinkInfo对象

提供分享链接信息的访问接口。

# 获取分享URL
url = share_link_info.get_share_url()
# 或使用驼峰命名
url = share_link_info.getShareUrl()

# 获取分享Key
key = share_link_info.get_share_key()

# 获取分享密码
password = share_link_info.get_share_password()

# 获取网盘类型
pan_type = share_link_info.get_type()

# 获取网盘名称
pan_name = share_link_info.get_pan_name()

# 获取文件ID用于parseById
file_id = share_link_info.get_file_id()

# 获取其他参数
dir_id = share_link_info.get_other_param("dirId")
custom_value = share_link_info.get_other_param("customKey")

PyHttpClient对象

提供HTTP请求功能,支持GET、POST、PUT、DELETE、PATCH等方法。

基本请求

# GET请求
response = http.get("https://api.example.com/data")

# POST请求表单数据
response = http.post("https://api.example.com/login", {
    "username": "user",
    "password": "pass"
})

# POST请求JSON数据
response = http.post_json("https://api.example.com/api", {
    "action": "get_file",
    "file_id": "12345"
})

# PUT请求
response = http.put("https://api.example.com/update", {"key": "value"})

# DELETE请求
response = http.delete("https://api.example.com/resource/123")

# PATCH请求
response = http.patch("https://api.example.com/resource/123", {"status": "active"})

请求头设置

# 设置单个请求头
http.put_header("Referer", "https://example.com")
http.put_header("Authorization", "Bearer token123")

# 链式设置多个请求头
http.put_header("Accept", "application/json").put_header("X-Custom", "value")

# 清除请求头
http.clear_headers()

超时设置

# 设置超时时间(秒)
http.set_timeout(60)  # 60秒超时

重定向控制

# 自动跟随重定向(默认行为)
response = http.get(url)

# 不跟随重定向获取重定向URL
response = http.get_no_redirect(url)
if response.status_code() == 302:
    redirect_url = response.header("Location")
    logger.info(f"重定向到: {redirect_url}")

PyHttpResponse对象

HTTP响应对象提供响应数据访问。

# 检查请求是否成功2xx状态码
if response.ok():
    # 处理成功响应
    pass

# 获取状态码
status = response.status_code()

# 获取响应文本
html = response.text()

# 获取响应JSON自动解析
data = response.json()
file_list = data.get("files", [])

# 获取响应头
content_type = response.header("Content-Type")
all_headers = response.headers()

# 获取响应字节
raw_bytes = response.body_bytes()

# 获取内容长度
length = response.content_length()

# 检查是否为重定向
if response.is_redirect():
    location = response.header("Location")

PyLogger对象

提供日志记录功能。

# 调试日志
logger.debug("变量值: " + str(value))
logger.debug(f"处理第 {index} 个文件")

# 信息日志
logger.info("开始解析文件列表")
logger.info(f"找到 {count} 个文件")

# 警告日志
logger.warn("密码为空,尝试无密码访问")
logger.warn(f"重试第 {retry} 次")

# 错误日志
logger.error("解析失败")
logger.error(f"HTTP错误: {status}")

PyCryptoUtils对象

提供常用加密功能。

# MD5加密
md5_hash = crypto.md5("hello")  # 32位小写
md5_16 = crypto.md5_16("hello")  # 16位小写

# SHA系列
sha1_hash = crypto.sha1("hello")
sha256_hash = crypto.sha256("hello")
sha512_hash = crypto.sha512("hello")

# Base64编码/解码
encoded = crypto.base64_encode("hello")
decoded = crypto.base64_decode(encoded)

# Base64 URL安全编码
url_safe = crypto.base64_url_encode("hello")
decoded = crypto.base64_url_decode(url_safe)

# AES加密/解密ECB模式
encrypted = crypto.aes_encrypt("plaintext", "1234567890123456")
decrypted = crypto.aes_decrypt(encrypted, "1234567890123456")

# AES加密/解密CBC模式
encrypted = crypto.aes_encrypt_cbc("plaintext", "1234567890123456", "1234567890123456")
decrypted = crypto.aes_decrypt_cbc(encrypted, "1234567890123456", "1234567890123456")

# 字节转十六进制
hex_str = crypto.bytes_to_hex(byte_array)

使用 requests 库

GraalPy 环境支持使用流行的 Python requests 库来处理 HTTP 请求。requests 提供了更加 Pythonic 的 API,适合熟悉 Python 生态的开发者。

官方文档: Requests: HTTP for Humans™

重要提示

requests 必须在脚本顶层导入,不能在函数内部导入:

# ✅ 正确:在顶层导入
import requests

def parse(share_link_info, http, logger):
    response = requests.get(url)
    # ...

# ❌ 错误:在函数内导入
def parse(share_link_info, http, logger):
    import requests  # 这会失败!

基本使用

GET 请求

import requests

def parse(share_link_info, http, logger):
    """GET-request examples using the requests library."""
    share_url = share_link_info.get_share_url()

    # Plain GET request.
    page = requests.get(share_url)

    # Inspect the status code before using the body.
    if page.status_code == 200:
        body = page.text
        logger.info(f"页面长度: {len(body)}")

    # GET request with query parameters.
    query = {
        'key': share_link_info.get_share_key(),
        'format': 'json'
    }
    api_resp = requests.get('https://api.example.com/search', params=query)

    # requests parses JSON automatically.
    return api_resp.json()['download_url']

POST 请求

import requests

def parse(share_link_info, http, logger):
    """POST-request examples: form data, JSON bodies, custom headers."""
    # POST form-encoded data (application/x-www-form-urlencoded).
    response = requests.post('https://api.example.com/login', data={
        'username': 'user',
        'password': 'pass'
    })
    
    # POST a JSON body (requests serializes the dict automatically).
    response = requests.post('https://api.example.com/api', json={
        'action': 'get_download',
        'file_id': '12345'
    })
    
    # POST with custom request headers.
    response = requests.post(
        'https://api.example.com/upload',
        json={'file': 'data'},
        headers={
            'Authorization': 'Bearer token123',
            'Content-Type': 'application/json',
            'User-Agent': 'Mozilla/5.0 ...'
        }
    )
    
    return response.json()['url']

设置请求头

import requests

def parse(share_link_info, http, logger):
    """Send a GET request with custom, browser-like headers."""
    share_url = share_link_info.get_share_url()

    # Headers the target site typically expects from a browser.
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': share_url,
        'Accept': 'application/json',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'X-Requested-With': 'XMLHttpRequest'
    }

    resp = requests.get(share_url, headers=request_headers)
    return resp.text
Cookie 处理

import requests

def parse(share_link_info, http, logger):
    """Demonstrate sending and propagating cookies with requests.

    Returns:
        The download URL extracted from the JSON response.
    """
    url = share_link_info.get_share_url()
    
    # Option 1: pass cookies explicitly via the `cookies` parameter.
    cookies = {
        'session_id': 'abc123',
        'user_token': 'xyz789'
    }
    response = requests.get(url, cookies=cookies)
    
    # Option 2: read the cookies returned by a response.
    response = requests.get(url)
    logger.info(f"返回的 Cookies: {response.cookies}")
    
    # Reuse those cookies on a follow-up request.
    next_response = requests.get('https://api.example.com/data', 
                                  cookies=response.cookies)
    
    return next_response.json()['download_url']

Session 会话

使用 Session 可以自动管理 Cookie,适合需要多次请求的场景:

import requests

def parse(share_link_info, http, logger):
    """Multi-step parse using a requests.Session.

    The Session persists cookies and default headers across the three
    steps, so the verification cookie from step 2 is carried into step 3
    automatically.
    """
    url = share_link_info.get_share_url()
    key = share_link_info.get_share_key()
    
    # Create the Session shared by all requests below.
    session = requests.Session()
    
    # Default headers applied to every request on this Session.
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 ...',
        'Referer': url
    })
    
    # Step 1: visit the page to obtain the initial cookies.
    logger.info("步骤1: 访问页面")
    response1 = session.get(url)
    
    # Step 2: submit the share password for verification.
    logger.info("步骤2: 验证密码")
    password = share_link_info.get_share_password()
    response2 = session.post('https://api.example.com/verify', data={
        'key': key,
        'pwd': password
    })
    
    # Step 3: fetch the download link (cookies sent automatically).
    logger.info("步骤3: 获取下载链接")
    response3 = session.get(f'https://api.example.com/download?key={key}')
    
    data = response3.json()
    return data['url']

高级功能

超时设置

import requests

def parse(share_link_info, http, logger):
    """Demonstrate request timeouts with the requests library.

    Args:
        share_link_info: share-link information object.
        http: built-in HTTP client (unused here).
        logger: logger object.

    Returns:
        str: response body text.

    Raises:
        Exception: when the request times out.
    """
    # Bug fix: `url` was previously undefined in this example.
    url = share_link_info.get_share_url()
    try:
        # Single value: 5-second total timeout.
        response = requests.get(url, timeout=5)
        
        # Tuple form: (connect timeout, read timeout) in seconds.
        response = requests.get(url, timeout=(3, 10))
        
        return response.text
    except requests.Timeout:
        logger.error("请求超时")
        raise Exception("请求超时")

重定向控制

import requests

def parse(share_link_info, http, logger):
    """Demonstrate redirect control with requests.

    Returns:
        str: the redirect target when the server redirects, otherwise
        the final response body.
    """
    url = share_link_info.get_share_url()
    
    # Do not follow redirects automatically.
    response = requests.get(url, allow_redirects=False)
    
    if response.status_code in [301, 302, 303, 307, 308]:
        download_url = response.headers['Location']
        logger.info(f"重定向到: {download_url}")
        return download_url
    
    # Limit the redirect count. Bug fix: requests.get() does not accept a
    # `max_redirects` keyword (it would raise TypeError); the limit must be
    # configured on a Session object instead.
    session = requests.Session()
    session.max_redirects = 5
    response = session.get(url, allow_redirects=True)
    return response.text

代理设置

import requests

def parse(share_link_info, http, logger):
    """Route the request through HTTP/HTTPS proxies."""
    # Bug fix: `url` was previously undefined in this example.
    url = share_link_info.get_share_url()

    # Proxy configuration, keyed by URL scheme.
    proxies = {
        'http': 'http://proxy.example.com:8080',
        'https': 'https://proxy.example.com:8080'
    }
    
    response = requests.get(url, proxies=proxies)
    return response.text

文件上传

import requests

def parse(share_link_info, http, logger):
    """Upload a file via multipart/form-data and return its URL."""
    # Each form field is a (filename, content, MIME type) tuple.
    files = {
        'file': ('filename.txt', 'file content', 'text/plain')
    }
    
    response = requests.post('https://api.example.com/upload', files=files)
    return response.json()['file_url']

异常处理

import requests
from requests.exceptions import RequestException, HTTPError, Timeout, ConnectionError

def parse(share_link_info, http, logger):
    """Demonstrate structured exception handling for requests.

    Raises:
        HTTPError, Timeout, ConnectionError, RequestException: re-raised
        after logging.
    """
    # Bug fix: `url` was previously undefined in this example.
    url = share_link_info.get_share_url()
    try:
        response = requests.get(url, timeout=10)
        
        # Raise HTTPError for 4xx / 5xx status codes.
        response.raise_for_status()
        
        return response.json()['download_url']
        
    except HTTPError as e:
        logger.error(f"HTTP 错误: {e.response.status_code}")
        raise
    except Timeout:
        logger.error("请求超时")
        raise
    except ConnectionError:
        logger.error("连接失败")
        raise
    except RequestException as e:
        logger.error(f"请求异常: {str(e)}")
        raise

注意事项

1. 顶层导入限制

requests 必须在脚本最顶部导入,不能在函数内部导入:

# ✅ 正确示例
import requests
import json
import re

def parse(share_link_info, http, logger):
    response = requests.get(url)
    # ...

# ❌ 错误示例
def parse(share_link_info, http, logger):
    import requests  # 运行时会报错!
    response = requests.get(url)

2. 与内置 http 对象的选择

  • 使用 requests:适合熟悉 Python 生态、需要复杂功能(Session、高级参数)的场景
  • 使用内置 http:更轻量、性能更好、适合简单场景
import requests

def parse(share_link_info, http, logger):
    """Show that requests and the built-in http client can be mixed.

    Returns:
        The 'url' field of the JSON payload.
    """
    # Bug fix: `url` was previously undefined in this example.
    url = share_link_info.get_share_url()

    # Option 1: requests (more Pythonic).
    response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'})
    data = response.json()
    
    # Option 2: built-in http client (lighter weight).
    http.put_header('User-Agent', 'Mozilla/5.0')
    response = http.get(url)
    data = response.json()
    
    # Both styles can be combined freely.
    return data['url']

3. 编码处理

import requests

def parse(share_link_info, http, logger):
    """Demonstrate response-encoding handling with requests."""
    # Bug fix: `url` was previously undefined in this example.
    url = share_link_info.get_share_url()

    response = requests.get(url)
    
    # requests detects the encoding automatically.
    text = response.text
    logger.info(f"检测到编码: {response.encoding}")
    
    # Override the encoding manually.
    response.encoding = 'utf-8'
    text = response.text
    
    # Raw, undecoded bytes.
    raw_bytes = response.content
    
    return text

4. 性能考虑

import requests

def parse(share_link_info, http, logger):
    """Reuse one Session so repeated requests share a TCP connection."""
    # A Session keeps the underlying connection pool alive.
    session = requests.Session()
    
    # Subsequent calls reuse the pooled TCP connection.
    response1 = session.get('https://api.example.com/step1')
    response2 = session.get('https://api.example.com/step2')
    response3 = session.get('https://api.example.com/step3')
    
    return response3.json()['url']

完整示例:使用 requests

# ==UserScript==
# @name         示例-使用requests
# @type         example_requests
# @displayName  requests示例
# @match        https?://pan\.example\.com/s/(?P<KEY>\w+)
# @version      1.0.0
# ==/UserScript==

import requests
import json

def parse(share_link_info, http, logger):
    """Complete example of parsing a share link with the requests library.

    Flow: fetch share info -> verify the password if needed -> resolve
    the download URL (from the redirect Location header, or JSON fallback).

    Args:
        share_link_info: share-link information object.
        http: built-in HTTP client (unused; requests is used instead).
        logger: logger object.

    Returns:
        str: direct download URL.

    Raises:
        Exception: on timeout, HTTP error, request failure, or API error.
    """
    url = share_link_info.get_share_url()
    key = share_link_info.get_share_key()
    password = share_link_info.get_share_password()
    
    logger.info(f"开始解析: {url}")
    
    # The Session carries headers and cookies across all steps below.
    session = requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': url,
        'Accept': 'application/json'
    })
    
    try:
        # Step 1: fetch share metadata.
        logger.info("获取分享信息")
        response = session.get(
            f'https://api.example.com/share/info',
            params={'key': key},
            timeout=10
        )
        response.raise_for_status()
        
        info = response.json()
        if info['code'] != 0:
            raise Exception(f"分享不存在: {info['message']}")
        
        # Step 2: verify the password when the share requires one.
        if info.get('need_password') and password:
            logger.info("验证密码")
            verify_response = session.post(
                'https://api.example.com/share/verify',
                json={
                    'key': key,
                    'password': password
                },
                timeout=10
            )
            verify_response.raise_for_status()
            
            if not verify_response.json().get('success'):
                raise Exception("密码错误")
        
        # Step 3: request the download link without following redirects.
        logger.info("获取下载链接")
        download_response = session.get(
            f'https://api.example.com/share/download',
            params={'key': key},
            allow_redirects=False,
            timeout=10
        )
        
        # The direct link usually arrives as a redirect Location header.
        if download_response.status_code in [301, 302]:
            download_url = download_response.headers['Location']
            logger.info(f"获取到下载链接: {download_url}")
            return download_url
        
        # Fallback: extract the link from the JSON body.
        download_response.raise_for_status()
        data = download_response.json()
        return data['url']
        
    except requests.Timeout:
        logger.error("请求超时")
        raise Exception("请求超时,请稍后重试")
    except requests.HTTPError as e:
        logger.error(f"HTTP 错误: {e.response.status_code}")
        raise Exception(f"HTTP 错误: {e.response.status_code}")
    except requests.RequestException as e:
        logger.error(f"请求失败: {str(e)}")
        raise Exception(f"请求失败: {str(e)}")
    except Exception as e:
        logger.error(f"解析失败: {str(e)}")
        raise


def parse_file_list(share_link_info, http, logger):
    """List the files of a shared folder using requests.

    Args:
        share_link_info: share-link information object.
        http: built-in HTTP client (unused; requests is used instead).
        logger: logger object.

    Returns:
        list: one dict per file/folder in the standard file-info format.

    Raises:
        requests.RequestException: re-raised after logging.
    """
    key = share_link_info.get_share_key()
    dir_id = share_link_info.get_other_param("dirId") or "0"
    
    logger.info(f"获取文件列表: {dir_id}")
    
    try:
        response = requests.get(
            'https://api.example.com/share/list',
            params={'key': key, 'dir': dir_id},
            headers={'User-Agent': 'Mozilla/5.0 ...'},
            timeout=10
        )
        response.raise_for_status()
        
        data = response.json()
        files = data.get('files', [])
        
        # Convert API entries to the standard file-info format.
        result = []
        for file in files:
            result.append({
                'file_name': file['name'],
                'file_id': str(file['id']),
                'file_type': 'dir' if file.get('is_dir') else 'file',
                'size': file.get('size', 0),
                'pan_type': share_link_info.get_type(),
                'parser_url': f'https://pan.example.com/s/{key}?fid={file["id"]}'
            })
        
        logger.info(f"找到 {len(result)} 个文件")
        return result
        
    except requests.RequestException as e:
        logger.error(f"获取文件列表失败: {str(e)}")
        raise

requests 官方资源

实现方法

parse方法必填

解析单个文件的下载链接。

def parse(share_link_info, http, logger):
    """Resolve the direct download URL for a shared file.

    Args:
        share_link_info: share-link information object.
        http: HTTP client.
        logger: logger object.

    Returns:
        str: direct download URL.

    Raises:
        Exception: on HTTP failure, API error, or a missing link.
    """
    share_url = share_link_info.get_share_url()
    share_key = share_link_info.get_share_key()

    logger.info(f"解析分享: {share_url}")

    # Fetch the page / API payload.
    resp = http.get(share_url)
    if not resp.ok():
        raise Exception(f"请求失败: {resp.status_code()}")

    # Inspect the business-level status code.
    payload = resp.json()
    if payload.get("code") != 0:
        raise Exception(f"API错误: {payload.get('message')}")

    # Extract and validate the download link.
    link = payload.get("data", {}).get("download_url")
    if not link:
        raise Exception("未找到下载链接")

    return link

parse_file_list方法可选

解析文件列表,用于文件夹分享。

def parse_file_list(share_link_info, http, logger):
    """List the files of a shared folder.

    Args:
        share_link_info: share-link information object.
        http: HTTP client.
        logger: logger object.

    Returns:
        list: one dict per file/folder in the standard file-info format.
    """
    share_key = share_link_info.get_share_key()
    folder_id = share_link_info.get_other_param("dirId") or "0"

    logger.info(f"获取文件列表目录ID: {folder_id}")

    # Query the file-list API.
    resp = http.post_json("https://api.example.com/list", {
        "share_key": share_key,
        "dir_id": folder_id,
    })
    if not resp.ok():
        raise Exception(f"请求失败: {resp.status_code()}")

    # Convert API entries to the standard file-info format.
    pan_type = share_link_info.get_type()
    return [
        {
            "file_name": entry.get("name"),
            "file_id": entry.get("id"),
            "file_type": "dir" if entry.get("is_dir") else "file",
            "size": entry.get("size", 0),
            "pan_type": pan_type,
            "parser_url": f"https://example.com/s/{share_key}?fid={entry.get('id')}",
        }
        for entry in resp.json().get("files", [])
    ]

parse_by_id方法可选

根据文件ID获取下载链接。

def parse_by_id(share_link_info, http, logger):
    """Resolve the direct download URL for a specific file id.

    Args:
        share_link_info: share-link information (carries file_id).
        http: HTTP client.
        logger: logger object.

    Returns:
        str: direct download URL.
    """
    target_id = share_link_info.get_file_id()
    logger.info(f"解析文件ID: {target_id}")

    # Request the download-link API.
    payload = {
        "share_key": share_link_info.get_share_key(),
        "file_id": target_id,
    }
    resp = http.post_json("https://api.example.com/download", payload)

    if not resp.ok():
        raise Exception(f"请求失败: {resp.status_code()}")

    return resp.json().get("download_url")

重定向处理

很多网盘的下载链接需要通过重定向获取最终URL

def parse(share_link_info, http, logger):
    """Resolve a download URL that is delivered through a redirect."""
    share_url = share_link_info.get_share_url()

    # Approach 1: issue the request without following redirects.
    resp = http.get_no_redirect(share_url)
    if resp.is_redirect():
        target = resp.header("Location")
        logger.info(f"重定向URL: {target}")
        return target

    # Approach 2: follow redirects and read the final URL,
    # assuming the server exposes it via a header.
    followed = http.get(share_url)
    final_url = followed.header("X-Final-URL")

    return final_url

错误处理

def parse(share_link_info, http, logger):
    """Parse with layered error handling: HTTP status, then business code."""
    try:
        resp = http.get(share_link_info.get_share_url())

        # Map well-known HTTP failures to user-facing messages.
        if not resp.ok():
            status = resp.status_code()
            messages = {
                404: "分享链接不存在或已失效",
                403: "需要密码访问",
            }
            raise Exception(messages.get(status, f"HTTP错误: {status}"))

        # Business-level status check.
        payload = resp.json()
        if payload.get("code") != 0:
            raise Exception(f"业务错误: {payload.get('message', '未知错误')}")

        return payload.get("download_url")

    except Exception as e:
        logger.error(f"解析失败: {str(e)}")
        raise

调试技巧

1. 使用日志输出

def parse(share_link_info, http, logger):
    logger.info("===== 开始调试 =====")
    
    url = share_link_info.get_share_url()
    logger.debug(f"分享URL: {url}")
    
    response = http.get(url)
    logger.debug(f"状态码: {response.status_code()}")
    logger.debug(f"响应头: {response.headers()}")
    logger.debug(f"响应体前500字符: {response.text()[:500]}")
    
    # 解析逻辑...

2. 分步调试

def parse(share_link_info, http, logger):
    # 步骤1获取页面
    logger.info("步骤1获取页面")
    response = http.get(share_link_info.get_share_url())
    logger.info(f"步骤1完成状态: {response.status_code()}")
    
    # 步骤2提取token
    logger.info("步骤2提取token")
    html = response.text()
    # token = extract_token(html)
    # logger.info(f"Token: {token}")
    
    # 步骤3请求下载API
    logger.info("步骤3请求下载API")
    # ...

3. 使用演练场

在Web演练场中可以实时测试Python代码

  1. 打开演练场页面
  2. 选择Python语言
  3. 输入测试URL和代码
  4. 点击运行查看日志输出

与JavaScript解析器的差异

特性 JavaScript Python
正则命名分组 (?<NAME>pattern) (?P<NAME>pattern)
字符串格式化 模板字符串 f"..." 或 str.format()
异常处理 try/catch try/except
空值判断 null/undefined None
布尔值 true/false True/False
字典访问 obj.key 或 obj["key"] dict.get("key") 或 dict["key"]
列表操作 array.push() list.append()

示例对比

JavaScript:

function parse(shareLinkInfo, http, logger) {
    // Resolve the direct download link for the shared file.
    var shareUrl = shareLinkInfo.getShareUrl();
    logger.info("解析: " + shareUrl);

    var resp = http.get(shareUrl);
    if (!resp.ok()) {
        throw new Error("请求失败");
    }

    var payload = resp.json();
    return payload.download_url || "";
}

Python:

def parse(share_link_info, http, logger):
    """Python side of the JavaScript/Python comparison example."""
    target = share_link_info.get_share_url()
    logger.info(f"解析: {target}")

    resp = http.get(target)
    if not resp.ok():
        raise Exception("请求失败")

    return resp.json().get("download_url", "")

最佳实践

1. 使用Python风格的方法名

# 推荐Python下划线风格
url = share_link_info.get_share_url()
key = share_link_info.get_share_key()

# 也支持Java驼峰风格
url = share_link_info.getShareUrl()
key = share_link_info.getShareKey()

2. 合理设置超时

def parse(share_link_info, http, logger):
    """Show how to tune the built-in client's request timeout."""
    # Slow upstream APIs: allow a longer timeout.
    http.set_timeout(60)
    
    # Multi-request flows: 30 seconds per request.
    http.set_timeout(30)  # 30-second timeout per request

3. 处理编码问题

# 响应自动处理UTF-8编码
text = response.text()

# 如果需要处理其他编码,可以获取原始字节
raw = response.body_bytes()

4. 请求头管理

def parse(share_link_info, http, logger):
    """Demonstrate header management on the built-in HTTP client."""
    # Bug fix: `url` was previously undefined in this example.
    url = share_link_info.get_share_url()

    # Set the headers the target site expects.
    http.put_header("Referer", url)
    http.put_header("User-Agent", "Mozilla/5.0 ...")
    
    response = http.get(url)
    
    # Reset headers so they do not leak into later requests.
    http.clear_headers()

完整示例

# ==UserScript==
# @name         示例网盘解析器
# @type         example_pan
# @displayName  示例网盘
# @description  完整的Python解析器示例
# @match        https?://pan\.example\.com/s/(?P<KEY>\w+)(?:\?pwd=(?P<PWD>\w+))?
# @author       QAIU
# @version      1.0.0
# ==/UserScript==

"""
示例网盘Python解析器
展示完整的解析流程
"""

import re  # GraalPy支持标准库


def parse(share_link_info, http, logger):
    """Resolve the direct download link for a single shared file.

    Flow: fetch share info -> verify the password if required -> obtain
    the download URL from the redirect Location header (JSON fallback).
    """
    url = share_link_info.get_share_url()
    key = share_link_info.get_share_key()
    password = share_link_info.get_share_password()
    
    logger.info(f"开始解析: {url}")
    logger.info(f"分享Key: {key}")
    
    # Headers required by the example API.
    http.put_header("Referer", url)
    http.put_header("Accept", "application/json")
    
    # Step 1: fetch share metadata.
    info_url = f"https://api.example.com/share/info?key={key}"
    response = http.get(info_url)
    
    if not response.ok():
        raise Exception(f"获取分享信息失败: {response.status_code()}")
    
    data = response.json()
    if data.get("code") != 0:
        raise Exception(f"分享不存在或已失效: {data.get('message')}")
    
    # Step 2: verify the password when the share requires one.
    if data.get("need_password") and password:
        verify_url = "https://api.example.com/share/verify"
        verify_response = http.post_json(verify_url, {
            "key": key,
            "password": password
        })
        
        if not verify_response.json().get("success"):
            raise Exception("密码错误")
    
    # Step 3: request the download link without following redirects.
    download_url = f"https://api.example.com/share/download?key={key}"
    download_response = http.get_no_redirect(download_url)
    
    if download_response.is_redirect():
        final_url = download_response.header("Location")
        logger.info(f"获取到下载链接: {final_url}")
        return final_url
    
    # Fallback: extract the link from the JSON body.
    download_data = download_response.json()
    return download_data.get("url")


def parse_file_list(share_link_info, http, logger):
    """List the files and folders of a shared directory."""
    share_key = share_link_info.get_share_key()
    folder = share_link_info.get_other_param("dirId") or "0"

    logger.info(f"获取文件列表,目录: {folder}")

    resp = http.get(
        f"https://api.example.com/share/list?key={share_key}&dir={folder}"
    )
    if not resp.ok():
        raise Exception(f"获取文件列表失败: {resp.status_code()}")

    # Convert API entries to the standard file-info format.
    pan_type = share_link_info.get_type()
    entries = [
        {
            "file_name": item.get("name"),
            "file_id": str(item.get("id")),
            "file_type": "dir" if item.get("is_dir") else "file",
            "size": item.get("size", 0),
            "pan_type": pan_type,
            "parser_url": f"https://pan.example.com/s/{share_key}?fid={item.get('id')}",
        }
        for item in resp.json().get("files", [])
    ]

    logger.info(f"找到 {len(entries)} 个文件/文件夹")
    return entries


def parse_by_id(share_link_info, http, logger):
    """Resolve a direct download link for one file id."""
    share_key = share_link_info.get_share_key()
    file_id = share_link_info.get_file_id()

    logger.info(f"解析文件: {file_id}")

    endpoint = f"https://api.example.com/share/download?key={share_key}&fid={file_id}"
    resp = http.get_no_redirect(endpoint)

    # Redirect Location carries the direct link; otherwise read JSON.
    if resp.is_redirect():
        return resp.header("Location")

    return resp.json().get("url")

相关文档

项目文档

外部资源