From 56d7cffdaa5df52a431ee0eb92e19b726f69b18b Mon Sep 17 00:00:00 2001 From: UPToZ Date: Sat, 6 Jun 2026 14:56:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9iKuuu=E7=AD=BE=E5=88=B0?= =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- iKuuu/README.md | 56 +++- iKuuu/ik_signin.py | 730 +++++++++++++++++++++++++-------------------- 2 files changed, 444 insertions(+), 342 deletions(-) diff --git a/iKuuu/README.md b/iKuuu/README.md index cfa18c2..b3068c3 100644 --- a/iKuuu/README.md +++ b/iKuuu/README.md @@ -15,10 +15,10 @@ ## 安装步骤 1. 将脚本保存为`ik_signin.py`到青龙面板的`scripts`目录 -2. 添加环境变量`IKUUU_COOKIES`,格式为: +2. 添加环境变量`IKUUU_ACCOUNTS`,格式为: ``` -备注1|cookie1 -备注2|cookie2 +邮箱1:密码1 +邮箱2:密码2 ``` 3. 在青龙面板添加定时任务: ``` @@ -29,7 +29,7 @@ task ik_signin 0 0 1 * * ? | 变量名 | 说明 | 示例 | | ---- | ---- | ---- | -| IKUUU_COOKIES | iKuuu登录Cookie,每行一个账号,格式为`备注|cookie`,备注可省略 | `主账号|PHPSESSID=xxx; uid=xxx` | +| IKUUU_ACCOUNTS | iKuuu账号密码,每行一个账号,格式为`邮箱:密码` | `test@example.com:password123` | ## 使用说明 @@ -38,18 +38,52 @@ task ik_signin 0 0 1 * * ? 3. 依次对配置的所有账号进行签到操作 4. 签到结果会通过青龙面板的通知系统发送 -### Cookie获取说明 +### 运行依赖 -1. 浏览器登录iKuuu后打开开发者工具 -2. 在Network中访问`/user`或`/user/checkin` -3. 复制请求头里的`Cookie`完整内容 -4. 填入青龙环境变量`IKUUU_COOKIES` +当前版本使用真实浏览器页面登录并点击Geetest验证按钮。Python依赖只需要安装: + +``` +DrissionPage +``` + +脚本不需要`chromium-chromedriver`,也不需要`xvfb`。DrissionPage使用浏览器调试协议,不走ChromeDriver;脚本已启用headless模式。 + +青龙容器内需要有可用的Chrome或Chromium。如果脚本没有自动找到浏览器,可以额外设置: + +| 变量名 | 说明 | 示例 | +| ---- | ---- | ---- | +| IKUUU_BROWSER_PATH | Chrome/Chromium可执行文件路径 | `/usr/bin/chromium` | + +浏览器不是Python依赖,不能在青龙Python依赖页面安装。需要进入青龙容器安装或确认已有路径: + +``` +which chromium chromium-browser google-chrome google-chrome-stable chrome +``` + +Debian/Ubuntu容器可尝试: + +``` +apt-get update && apt-get install -y chromium +``` + +Alpine容器可尝试: + +``` +apk add --no-cache chromium +``` + +安装后如果脚本仍未自动识别,在青龙环境变量中增加: + +``` +IKUUU_BROWSER_PATH=/usr/bin/chromium +``` ## 更新日志 ### 2026-06-04 -- 移除账号密码登录,统一使用`IKUUU_COOKIES`执行签到 -- 优化剩余流量查询,支持从页面卡片、文本和脚本片段中提取流量信息 +- 参照浏览器自动化逻辑,恢复`IKUUU_ACCOUNTS`账号密码登录 +- 移除Cookie登录路径,改为使用DrissionPage打开页面、点击Geetest并调用页面签到函数 +- 移除`xvfb`和`chromium-chromedriver`依赖要求,支持通过`IKUUU_BROWSER_PATH`指定浏览器路径 ### 2025-07-29 - 修复「剩余流量」查询 diff --git a/iKuuu/ik_signin.py b/iKuuu/ik_signin.py index aa64f91..405d052 100644 --- a/iKuuu/ik_signin.py +++ b/iKuuu/ik_signin.py @@ -5,14 +5,14 @@ name: iKuuu签到 cron: 0 0 8 * * ? """ -import requests -import re -import os import datetime +import os +import re +import shutil import sys import time -import base64 -from bs4 import BeautifulSoup + +import requests try: sys.stdout.reconfigure(encoding="utf-8") @@ -20,15 +20,15 @@ except Exception: pass # 添加青龙脚本根目录到Python路径 -QL_SCRIPTS_DIR = '/ql/scripts' # 青龙脚本默认目录 +QL_SCRIPTS_DIR = '/ql/scripts' sys.path.append(QL_SCRIPTS_DIR) # 添加notify可能存在的其他路径 POSSIBLE_PATHS = [ - '/ql', # 青龙根目录 - '/ql/data/scripts', # 新版青龙数据目录 - '/ql/scripts/notify', # 自定义通知目录 - os.path.dirname(__file__) # 当前脚本目录 + '/ql', + '/ql/data/scripts', + '/ql/scripts/notify', + os.path.dirname(__file__) ] for path in POSSIBLE_PATHS: @@ -40,474 +40,534 @@ try: from notify import send except ImportError: print("⚠️ 无法加载通知模块,请检查路径配置") - send = lambda title, content: None # 创建空函数防止报错 + send = lambda title, content: None + -# 初始域名 ikun_host = "ikuuu.win" # 自动更新于2026-01-17 08:00:07 -backup_hosts = ["ikuuu.win","ikuuu.fyi","ikuuu.one", "ikuuu.pw", "ikuuu.me","ikuuu.cc"] # 备用域名列表 +backup_hosts = ["ikuuu.win", "ikuuu.fyi", "ikuuu.one", "ikuuu.pw", "ikuuu.me", "ikuuu.cc"] -# 统一的User-Agent -USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36" +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" -def build_headers(referer=None, ajax=False): - headers = {"User-Agent": USER_AGENT} - if referer: - headers["Referer"] = referer - headers["Origin"] = f"https://{ikun_host}" - if ajax: - headers["X-Requested-With"] = "XMLHttpRequest" - return headers -def decode_origin_body(html): - match = re.search(r'var originBody = "([^"]+)"', html) - if not match: - return html - return base64.b64decode(match.group(1)).decode('utf-8') +def mask_email(email): + if '@' not in email: + return email + local, domain = email.split('@', 1) + if len(local) <= 2: + return f"{local[0]}***@{domain}" + return f"{local[0]}***{local[-1]}@{domain}" -def parse_json_response(response): - try: - return response.json() - except Exception: - return None - -def load_cookie_string(session, cookie_string): - for item in cookie_string.split(';'): - if '=' not in item: - continue - name, value = item.split('=', 1) - name = name.strip() - value = value.strip() - if name: - session.cookies.set(name, value) - -def compact_text(text): - return re.sub(r'\s+', ' ', text).strip() - -def split_flow_text(text): - match = re.search(r'(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB|KiB|MiB|GiB|TiB)', text, re.IGNORECASE) - if not match: - return None - return match.group(1), match.group(2).upper() - -def extract_remaining_flow(decoded_content): - soup = BeautifulSoup(decoded_content, 'html.parser') - - # Prefer the card/box containing the label, but do not depend on one fixed class name. - label_nodes = [node for node in soup.find_all(string=re.compile(r'剩余流量|剩余.*流量'))] - for label in label_nodes: - current = label.parent - for _ in range(5): - if not current: - break - candidate = split_flow_text(compact_text(current.get_text(' ', strip=True))) - if candidate: - return candidate - current = current.parent - - text = compact_text(soup.get_text(' ', strip=True)) - patterns = [ - r'(?:剩余流量|剩余.*?流量|可用流量|流量余额)[^0-9]{0,80}(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB|KiB|MiB|GiB|TiB)', - r'(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB|KiB|MiB|GiB|TiB)[^\n]{0,80}(?:剩余流量|剩余.*?流量|可用流量|流量余额)', - ] - for pattern in patterns: - match = re.search(pattern, text, re.IGNORECASE) - if match: - return match.group(1), match.group(2).upper() - - # Some pages render counters from inline scripts or JSON-like snippets. - script_candidate = split_flow_text(decoded_content) - if script_candidate and re.search(r'剩余流量|unused|remain|traffic|transfer', decoded_content, re.IGNORECASE): - return script_candidate - - return None def extract_domains_from_content(content): - """ - 从网页内容中提取可用域名 - """ + """从网页内容中提取可用域名""" domains = [] - - # 多种域名提取模式 patterns = [ - # 匹配

新域名: xxx.com

或类似格式 r']*>.*?(?:域名|domain|新域名|最新域名)[::]\s*([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', - # 匹配JavaScript中的跳转域名 r'(?:location\.href|window\.location)\s*=\s*["\']https?://([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', - # 匹配登录链接 r'https?://([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})/auth/login', - # 匹配任何完整的链接 r'https?://([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', - # 匹配文本中的域名描述 r'(?:域名|domain|网址|地址)[::\s]*([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', - # 匹配ikuuu相关域名 r'(ikuuu\.[a-zA-Z0-9.-]+)', ] - + for pattern in patterns: matches = re.findall(pattern, content, re.IGNORECASE | re.MULTILINE) for match in matches: domain = match.strip().lower() - # 过滤掉明显不是域名的内容 - if (domain and - '.' in domain and - not domain.startswith('.') and - not domain.endswith('.') and - len(domain) > 3 and - len(domain) < 50 and - not any(char in domain for char in [' ', '\n', '\t', '<', '>', '"', "'"])): + if (domain and '.' in domain and not domain.startswith('.') and + not domain.endswith('.') and 3 < len(domain) < 50 and + not any(char in domain for char in [' ', '\n', '\t', '<', '>', '"', "'"])): domains.append(domain) - - # 去重并返回 + return list(set(domains)) + def get_available_domains_from_old_domain(old_domain): - """ - 从旧域名页面获取新的可用域名 - """ + """从旧域名页面获取新的可用域名""" available_domains = [] - try: print(f"🔍 从域名 {old_domain} 获取新域名信息...") - response = requests.get(f"https://{old_domain}/", - headers={"User-Agent": USER_AGENT}, - timeout=15, - allow_redirects=True) - + response = requests.get( + f"https://{old_domain}/", + headers={"User-Agent": USER_AGENT}, + timeout=15, + allow_redirects=True + ) if response.status_code == 200: - content = response.text - - # 检查是否包含域名变更信息 - change_indicators = [ - '官网域名已更改', 'Domain deprecated', '域名已更新', - '新域名', '最新域名', '域名变更', '网站已迁移' - ] - - has_change_info = any(indicator in content for indicator in change_indicators) - - if has_change_info: - print("✅ 检测到域名变更通知") - domains = extract_domains_from_content(content) - available_domains.extend(domains) - else: - print("ℹ️ 未检测到域名变更通知,但尝试解析可能的域名") - domains = extract_domains_from_content(content) - # 只保留ikuuu相关域名 - ikuuu_domains = [d for d in domains if 'ikuuu' in d] - available_domains.extend(ikuuu_domains) - + domains = extract_domains_from_content(response.text) + available_domains.extend([d for d in domains if 'ikuuu' in d]) else: print(f"⚠️ 域名 {old_domain} 返回状态码: {response.status_code}") - except requests.exceptions.Timeout: print(f"⏰ 域名 {old_domain} 请求超时") except requests.exceptions.ConnectionError: print(f"🔌 域名 {old_domain} 连接失败") except Exception as e: print(f"❌ 检查域名 {old_domain} 时出错: {e}") - return available_domains + def get_latest_ikun_host(): - """ - 获取最新可用域名 - """ - # 首先检查当前域名 - test_url = f"https://{ikun_host}/" + """获取最新可用域名""" try: - response = requests.get(test_url, headers={"User-Agent": USER_AGENT}, timeout=10) + response = requests.get(f"https://{ikun_host}/", headers={"User-Agent": USER_AGENT}, timeout=10) if response.status_code == 200: - # 检查是否有域名变更通知 - change_indicators = [ - '官网域名已更改', 'Domain deprecated', '域名已更新', - '新域名', '最新域名', '域名变更' - ] - + change_indicators = ['官网域名已更改', 'Domain deprecated', '域名已更新', '新域名', '最新域名', '域名变更'] if any(indicator in response.text for indicator in change_indicators): print("🔄 检测到域名变更通知,正在提取新域名...") domains = extract_domains_from_content(response.text) - - # 优先返回ikuuu相关域名 for domain in domains: if 'ikuuu' in domain and domain != ikun_host: print(f"🎯 找到新域名: {domain}") return domain - - # 如果没有ikuuu域名,返回第一个有效域名 - if domains: - print(f"🎯 找到域名: {domains[0]}") - return domains[0] - - print("⚠️ 检测到域名变更但无法提取新域名") - return None else: print("✅ 当前域名正常") - return None except Exception as e: print(f"🔍 当前域名检测异常: {e}") - return None + def update_self_host(new_host): - """ - 更新脚本中的域名 - """ + """更新脚本中的域名""" script_path = os.path.abspath(__file__) try: with open(script_path, "r", encoding="utf-8") as f: lines = f.readlines() - + updated = False for i, line in enumerate(lines): if line.strip().startswith("ikun_host = "): - lines[i] = f'ikun_host = "{new_host}" # 自动更新于{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n' + now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + lines[i] = f'ikun_host = "{new_host}" # 自动更新于{now}\n' updated = True break - + if updated: with open(script_path, "w", encoding="utf-8") as f: f.writelines(lines) print(f"✅ 脚本已更新至域名: {new_host}") return True - else: - print("⚠️ 未找到域名配置行,无法自动更新") - return False + + print("⚠️ 未找到域名配置行,无法自动更新") + return False except Exception as e: print(f"⚠️ 域名更新失败: {e}") return False + def test_host_reachable(host): - """ - 测试域名是否可达 - """ + """测试域名是否可达""" try: print(f"🔗 测试域名: {host}") - response = requests.get(f"https://{host}/", - headers={"User-Agent": USER_AGENT}, - timeout=10) + response = requests.get(f"https://{host}/", headers={"User-Agent": USER_AGENT}, timeout=10) if response.status_code == 200: print(f"✅ 域名 {host} 可用") return True - else: - print(f"⚠️ 域名 {host} 返回状态码: {response.status_code}") - return False + print(f"⚠️ 域名 {host} 返回状态码: {response.status_code}") except Exception as e: print(f"❌ 域名 {host} 不可用: {e}") - return False + return False + def find_working_domain(): - """ - 寻找可用的域名 - """ + """寻找可用的域名""" global ikun_host - - # 1. 首先检查当前域名 print(f"🏠 当前域名: {ikun_host}") if test_host_reachable(ikun_host): return ikun_host - - # 2. 从当前域名和备用域名中获取新域名信息 - all_domains_to_check = [ikun_host] + backup_hosts + discovered_domains = [] - - for domain in all_domains_to_check: - new_domains = get_available_domains_from_old_domain(domain) - discovered_domains.extend(new_domains) - - # 去重 + for domain in [ikun_host] + backup_hosts: + discovered_domains.extend(get_available_domains_from_old_domain(domain)) + discovered_domains = list(set(discovered_domains)) print(f"🔍 发现的域名: {discovered_domains}") - - # 3. 测试发现的域名 for domain in discovered_domains: if domain != ikun_host and test_host_reachable(domain): - print(f"🎉 找到可用域名: {domain}") ikun_host = domain - # 尝试更新脚本 update_self_host(domain) return domain - - # 4. 测试备用域名 + print("🔄 测试备用域名列表...") for host in backup_hosts: if host != ikun_host and test_host_reachable(host): - print(f"🎉 备用域名可用: {host}") ikun_host = host return host - - # 5. 都不可用 + print("❌ 所有域名均不可用") return None -def get_remaining_flow(session): - """获取用户剩余流量信息""" - user_url = f'https://{ikun_host}/user' - try: - user_page = session.get(user_url, headers=build_headers(), timeout=20) - if user_page.status_code != 200: - return "获取流量失败", "状态码: " + str(user_page.status_code) - - decoded_content = decode_origin_body(user_page.text) - if re.search(r'/auth/login|id="email"|id="password"|登录\s*—\s*iKuuu', decoded_content, re.IGNORECASE): - return "Cookie可能已过期", "无法获取" - flow = extract_remaining_flow(decoded_content) - if flow: - return flow - - return "未找到", "流量信息" - - except Exception as e: - return "流量获取异常", str(e) +def parse_accounts(): + account_str = os.getenv('IKUUU_ACCOUNTS') + if not account_str: + print("❌ 未找到环境变量 IKUUU_ACCOUNTS") + sys.exit(1) -def ikuuu_signin_with_cookie(account_name, cookie_string): - session = requests.Session() - load_cookie_string(session, cookie_string) - try: - flow_value, flow_unit = get_remaining_flow(session) - - checkin_res = session.post( - f'https://{ikun_host}/user/checkin', - headers=build_headers(f'https://{ikun_host}/user'), - timeout=20 - ) - if checkin_res.status_code != 200: - return False, f"签到失败(状态码{checkin_res.status_code})", flow_value, flow_unit - - checkin_data = parse_json_response(checkin_res) - if checkin_data is None: - return False, "签到响应解析失败,请检查 Cookie 是否已过期", flow_value, flow_unit - - flow_value, flow_unit = get_remaining_flow(session) - - if checkin_data.get('ret') == 1: - return True, f"成功 | {checkin_data.get('msg', '')}", flow_value, flow_unit - return False, f"签到失败:{checkin_data.get('msg', '未知错误')}", flow_value, flow_unit - except requests.exceptions.Timeout: - return False, "请求超时", "未知", "未知" - except Exception as e: - return False, f"请求异常:{str(e)}", "未知", "未知" - -def parse_cookie_accounts(cookie_str): accounts = [] - for index, line in enumerate(cookie_str.strip().splitlines(), 1): - line = line.strip() - if not line: + for line in account_str.strip().splitlines(): + if ':' not in line: + print(f"⚠️ 忽略无效账户行: {line}") continue + email, password = line.split(':', 1) + email = email.strip() + password = password.strip() + if email and password: + accounts.append((email, password)) + else: + print(f"⚠️ 忽略无效账户行: {line}") - name = f"Cookie账号{index}" - cookie = line - if '|' in line: - name, cookie = line.split('|', 1) - name = name.strip() or name - cookie = cookie.strip() + if not accounts: + print("❌ 未找到有效账户") + sys.exit(1) - if '=' not in cookie: - print(f"⚠️ 忽略无效 Cookie 行: {line}") - continue - - accounts.append({ - 'type': 'cookie', - 'name': name, - 'cookie': cookie, - }) + print(f"📋 找到 {len(accounts)} 个账户") return accounts + +def find_browser_path(): + env_names = ['IKUUU_BROWSER_PATH', 'CHROME_PATH', 'CHROMIUM_PATH', 'BROWSER_PATH'] + for env_name in env_names: + env_path = os.getenv(env_name) + if env_path: + if os.path.exists(env_path): + return env_path + print(f"⚠️ {env_name} 指定的浏览器不存在: {env_path}") + + executable_names = ['chromium-browser', 'chromium', 'google-chrome', 'google-chrome-stable', 'chrome'] + for name in executable_names: + path = shutil.which(name) + if path: + return path + + common_paths = [ + '/usr/bin/chromium-browser', + '/usr/bin/chromium', + '/usr/bin/google-chrome', + '/usr/bin/google-chrome-stable', + '/opt/google/chrome/chrome', + r'C:\Program Files\Google\Chrome\Application\chrome.exe', + r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe', + ] + for path in common_paths: + if os.path.exists(path): + return path + + return None + + +def print_browser_install_help(): + print("❌ 未找到Chrome/Chromium浏览器可执行文件") + print("请进入青龙容器安装浏览器,或设置 IKUUU_BROWSER_PATH 指向已有浏览器路径。") + print("常用检查命令:") + print(" which chromium chromium-browser google-chrome google-chrome-stable chrome") + print("Debian/Ubuntu容器可尝试:") + print(" apt-get update && apt-get install -y chromium") + print("Alpine容器可尝试:") + print(" apk add --no-cache chromium") + print("安装后在青龙环境变量中设置,例如:") + print(" IKUUU_BROWSER_PATH=/usr/bin/chromium") + + +def init_browser(): + try: + from DrissionPage import ChromiumOptions, ChromiumPage + except ImportError: + print("❌ 缺少依赖 DrissionPage,请在青龙依赖管理中安装") + raise + + co = ChromiumOptions() + browser_path = find_browser_path() + if not browser_path: + print_browser_install_help() + sys.exit(1) + + co.set_browser_path(browser_path) + print(f"✅ 使用浏览器: {browser_path}") + + co.set_argument('--no-sandbox') + co.set_argument('--disable-dev-shm-usage') + co.set_argument('--disable-gpu') + co.set_argument('--headless=new') + co.set_argument('--window-size=1920,1080') + co.set_argument('--disable-blink-features=AutomationControlled') + co.set_argument('--disable-infobars') + co.set_user_agent(USER_AGENT) + + page = ChromiumPage(co) + print("✅ 浏览器初始化成功") + return page + + +def click_geetest(page): + """点击极验验证按钮。该步骤依赖站点当前是否允许无交互验证通过。""" + try: + print("⏳ 等待验证码加载...") + time.sleep(8) + selectors = [ + '.geetest_btn_click', + '@aria-label=点击按钮开始验证', + '.geetest_btn', + '.embed-captcha', + ] + for selector in selectors: + try: + btn = page.ele(selector, timeout=3) + if btn and btn.states.is_displayed: + print(f"✅ 找到验证按钮: {selector}") + btn.click() + time.sleep(5) + return True + except Exception: + continue + + try: + for div in page.eles('tag:div')[:120]: + cls = div.attr('class') or '' + if 'geetest' in cls.lower() and div.states.is_displayed: + div.click() + time.sleep(5) + return True + except Exception: + pass + + print("⚠️ 未找到验证按钮") + return False + except Exception as e: + print(f"⚠️ 点击验证按钮失败: {e}") + return False + + +def logout(page, site_url): + try: + page.get(f'{site_url}/user/logout') + time.sleep(3) + page.run_js('document.cookie.split(";").forEach(c => document.cookie = c.trim().split("=")[0] + "=;expires=Thu, 01 Jan 1970 00:00:00 GMT;path=/")') + print("✅ 已退出登录") + except Exception as e: + print(f"⚠️ 退出登录失败: {e}") + + +def get_logged_email(page, site_url): + try: + page.get(f'{site_url}/user/profile') + time.sleep(3) + email_ele = page.ele('tag:input@@id():email', timeout=5) + if email_ele: + return email_ele.attr('value') or email_ele.text + except Exception: + pass + return None + + +def login(page, site_url, email, password): + try: + masked = mask_email(email) + print(f"🔐 正在登录: {masked}") + page.get(f'{site_url}/user') + time.sleep(5) + + if '/auth/login' not in page.url: + logged_email = get_logged_email(page, site_url) + if logged_email and logged_email.lower() == email.lower(): + print(f"✅ 当前已登录目标账号: {masked}") + return True + print("ℹ️ 当前登录账号不一致,正在退出...") + logout(page, site_url) + page.get(f'{site_url}/user') + time.sleep(5) + + email_input = page.ele('#email', timeout=20) + if not email_input: + print("❌ 未找到邮箱输入框") + return False + email_input.clear() + email_input.input(email) + + password_input = page.ele('#password', timeout=10) + if not password_input: + print("❌ 未找到密码输入框") + return False + password_input.clear() + password_input.input(password) + + click_geetest(page) + + login_btn = page.ele('tag:button@@text():Login', timeout=5) or page.ele('.login', timeout=5) + if not login_btn: + print("❌ 未找到登录按钮") + return False + login_btn.click() + time.sleep(6) + + if '/user' in page.url and '/auth/login' not in page.url: + print("✅ 登录成功") + return True + + error_msg = page.ele('.swal2-html-container', timeout=3) or page.ele('.swal-text', timeout=1) + if error_msg: + print(f"❌ 登录失败: {error_msg.text}") + else: + print("❌ 登录失败,请检查账号密码或验证码状态") + return False + except Exception as e: + print(f"❌ 登录过程出错: {e}") + return False + + +def get_traffic_info(page): + try: + traffic_card = page.ele('tag:h4@@text():剩余流量', timeout=5) + if traffic_card: + current = traffic_card.parent() + for _ in range(5): + if not current: + break + text = current.text or '' + remaining_match = re.search(r'(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', text, re.IGNORECASE) + today_match = re.search(r'今日[^\d]*(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', text, re.IGNORECASE) + if remaining_match: + remaining = f"{remaining_match.group(1)} {remaining_match.group(2).upper()}" + today_used = '未知' + if today_match: + today_used = f"{today_match.group(1)} {today_match.group(2).upper()}" + return remaining, today_used + current = current.parent() + + page_text = page.ele('tag:body', timeout=5).text + remaining_match = re.search(r'(?:剩余流量|可用流量|流量余额)[^\d]{0,80}(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', page_text, re.IGNORECASE) + today_match = re.search(r'(?:今日已用|今日使用|已用流量)[^\d]{0,80}(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', page_text, re.IGNORECASE) + remaining = f"{remaining_match.group(1)} {remaining_match.group(2).upper()}" if remaining_match else '未知' + today_used = f"{today_match.group(1)} {today_match.group(2).upper()}" if today_match else '未知' + return remaining, today_used + except Exception as e: + print(f"⚠️ 获取流量信息失败: {e}") + return '未知', '未知' + + +def checkin(page, site_url): + try: + if '/user' not in page.url: + page.get(f'{site_url}/user') + time.sleep(3) + + checkin_div = page.ele('#checkin-div', timeout=5) + if checkin_div and ('已签到' in checkin_div.text or '明日再来' in checkin_div.text): + print("✅ 今日已签到") + return True, '今日已签到' + + try: + page.run_js('checkin()') + time.sleep(5) + msg = page.ele('.swal2-html-container', timeout=10) or page.ele('.swal-text', timeout=1) + if msg: + print(f"✅ 签到结果: {msg.text}") + return True, msg.text + + checkin_div = page.ele('#checkin-div', timeout=3) + if checkin_div and ('已签到' in checkin_div.text or '已签' in checkin_div.text): + return True, '签到成功' + return True, '签到成功' + except Exception as e: + print(f"⚠️ JS签到失败: {e}") + return False, '签到失败' + except Exception as e: + print(f"❌ 签到过程出错: {e}") + return False, str(e) + + def send_qinglong_notification(results, current_domain): - """ - 使用青龙面板内置通知系统发送通知 - 需要青龙面板已配置通知渠道(如钉钉、企业微信等) - """ title = "iKuuu签到通知" - - # 构建消息内容 success_count = sum(1 for res in results if res['success']) failure_count = len(results) - success_count - message = [ f"🔔 签到完成 | 成功:{success_count} 失败:{failure_count}", f"🌐 当前域名:{current_domain}", "================================" ] - + for index, res in enumerate(results, 1): status = "✅ 成功" if res['success'] else "❌ 失败" message.append(f"{index}. {res['email']}") message.append(f" 状态:{status}") message.append(f" 详情:{res['message']}") - message.append(f" 剩余流量:{res['flow_value']} {res['flow_unit']}") + message.append(f" 剩余流量:{res['remaining_flow']}") + message.append(f" 今日已用:{res['today_used']}") message.append("--------------------------------") - - # 添加统计信息 + message.append("\n🕒 执行时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) - try: - # 发送通知(青龙自动处理多通知渠道) send(title, "\n".join(message)) print("✅ 通知已发送") except Exception as e: print(f"⚠️ 通知发送失败,请检查通知配置: {str(e)}") -if __name__ == "__main__": + +def main(): print("🚀 iKuuu签到脚本启动") print("=" * 50) - - # ==================== 域名检查和更新 ==================== - # 首先检查是否有域名更新通知 + latest_host = get_latest_ikun_host() if latest_host and latest_host != ikun_host: print(f"🔄 检测到新域名: {latest_host}") if update_self_host(latest_host): - ikun_host = latest_host - - # 寻找可用域名 + globals()['ikun_host'] = latest_host + working_domain = find_working_domain() if not working_domain: print("💥 无法找到可用域名,脚本退出") - exit(1) - + sys.exit(1) + + site_url = f"https://{working_domain}" print(f"🎯 使用域名: {working_domain}") print("=" * 50) - - # ==================== 账户处理 ==================== - cookie_str = os.getenv('IKUUU_COOKIES') - if not cookie_str: - print("❌ 未找到环境变量 IKUUU_COOKIES") - exit(1) - accounts = parse_cookie_accounts(cookie_str) - - if not accounts: - print("❌ 未找到有效账户") - exit(1) - - print(f"📋 找到 {len(accounts)} 个账户") - - # ==================== 执行签到 ==================== + accounts = parse_accounts() + page = None results = [] - for index, account in enumerate(accounts, 1): - print(f"\n👤 [{index}/{len(accounts)}] 处理账户: {account['name']}") - success, msg, flow_value, flow_unit = ikuuu_signin_with_cookie(account['name'], account['cookie']) - results.append({ - 'email': account['name'], - 'success': success, - 'message': msg, - 'flow_value': flow_value, - 'flow_unit': flow_unit - }) - status_icon = "✅" if success else "❌" - print(f" {status_icon} 结果: {msg}") - print(f" 📊 剩余流量: {flow_value} {flow_unit}") - - # 账户间延迟防止请求过快 - if index < len(accounts): # 最后一个账户不需要延迟 - time.sleep(2) - - # ==================== 结果通知 ==================== + try: + page = init_browser() + for index, (email, password) in enumerate(accounts, 1): + masked_email = mask_email(email) + print(f"\n👤 [{index}/{len(accounts)}] 处理账户: {masked_email}") + result = { + 'email': masked_email, + 'success': False, + 'message': '登录失败', + 'remaining_flow': '未知', + 'today_used': '未知' + } + + if login(page, site_url, email, password): + success, msg = checkin(page, site_url) + remaining, today_used = get_traffic_info(page) + result.update({ + 'success': success, + 'message': msg, + 'remaining_flow': remaining, + 'today_used': today_used + }) + print(f" 📊 剩余流量: {remaining}") + print(f" 📈 今日已用: {today_used}") + logout(page, site_url) + + results.append(result) + status_icon = "✅" if result['success'] else "❌" + print(f" {status_icon} 结果: {result['message']}") + + if index < len(accounts): + time.sleep(2) + finally: + if page: + try: + page.quit() + except Exception: + pass + print("\n📢 正在发送通知...") send_qinglong_notification(results, working_domain) - - # ==================== 本地结果输出 ==================== + print("\n📊 签到结果汇总:") print("=" * 50) success_count = sum(1 for res in results if res['success']) @@ -516,12 +576,20 @@ if __name__ == "__main__": print(f"❌ 失败: {len(results) - success_count}") print(f"🌐 使用域名: {working_domain}") print("=" * 50) - + for res in results: status_icon = "✅" if res['success'] else "❌" print(f"{status_icon} {res['email']}") print(f" 详情: {res['message']}") - print(f" 流量: {res['flow_value']} {res['flow_unit']}") - + print(f" 剩余流量: {res['remaining_flow']}") + print(f" 今日已用: {res['today_used']}") + print("=" * 50) print("🏁 脚本执行完成") + + if any(not res['success'] for res in results): + sys.exit(1) + + +if __name__ == "__main__": + main()