qinglong-scripts/iKuuu/ik_signin.py
2026-06-06 14:56:09 +08:00

596 lines
20 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
任务名称
name: iKuuu签到
定时规则
cron: 0 0 8 * * ?
"""
import datetime
import os
import re
import shutil
import sys
import time
import requests
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
# 添加青龙脚本根目录到Python路径
QL_SCRIPTS_DIR = '/ql/scripts'
sys.path.append(QL_SCRIPTS_DIR)
# 添加notify可能存在的其他路径
POSSIBLE_PATHS = [
'/ql',
'/ql/data/scripts',
'/ql/scripts/notify',
os.path.dirname(__file__)
]
for path in POSSIBLE_PATHS:
if os.path.exists(os.path.join(path, 'notify.py')):
sys.path.append(path)
break
try:
from notify import send
except ImportError:
print("⚠️ 无法加载通知模块,请检查路径配置")
send = lambda title, content: None
ikun_host = "ikuuu.win" # 自动更新于2026-01-17 08:00:07
backup_hosts = ["ikuuu.win", "ikuuu.fyi", "ikuuu.one", "ikuuu.pw", "ikuuu.me", "ikuuu.cc"]
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
def mask_email(email):
if '@' not in email:
return email
local, domain = email.split('@', 1)
if len(local) <= 2:
return f"{local[0]}***@{domain}"
return f"{local[0]}***{local[-1]}@{domain}"
def extract_domains_from_content(content):
"""从网页内容中提取可用域名"""
domains = []
patterns = [
r'<h[1-6][^>]*>.*?(?:域名|domain|新域名|最新域名)[:]\s*([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
r'(?:location\.href|window\.location)\s*=\s*["\']https?://([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
r'https?://([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})/auth/login',
r'https?://([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
r'(?:域名|domain|网址|地址)[:\s]*([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
r'(ikuuu\.[a-zA-Z0-9.-]+)',
]
for pattern in patterns:
matches = re.findall(pattern, content, re.IGNORECASE | re.MULTILINE)
for match in matches:
domain = match.strip().lower()
if (domain and '.' in domain and not domain.startswith('.') and
not domain.endswith('.') and 3 < len(domain) < 50 and
not any(char in domain for char in [' ', '\n', '\t', '<', '>', '"', "'"])):
domains.append(domain)
return list(set(domains))
def get_available_domains_from_old_domain(old_domain):
"""从旧域名页面获取新的可用域名"""
available_domains = []
try:
print(f"🔍 从域名 {old_domain} 获取新域名信息...")
response = requests.get(
f"https://{old_domain}/",
headers={"User-Agent": USER_AGENT},
timeout=15,
allow_redirects=True
)
if response.status_code == 200:
domains = extract_domains_from_content(response.text)
available_domains.extend([d for d in domains if 'ikuuu' in d])
else:
print(f"⚠️ 域名 {old_domain} 返回状态码: {response.status_code}")
except requests.exceptions.Timeout:
print(f"⏰ 域名 {old_domain} 请求超时")
except requests.exceptions.ConnectionError:
print(f"🔌 域名 {old_domain} 连接失败")
except Exception as e:
print(f"❌ 检查域名 {old_domain} 时出错: {e}")
return available_domains
def get_latest_ikun_host():
"""获取最新可用域名"""
try:
response = requests.get(f"https://{ikun_host}/", headers={"User-Agent": USER_AGENT}, timeout=10)
if response.status_code == 200:
change_indicators = ['官网域名已更改', 'Domain deprecated', '域名已更新', '新域名', '最新域名', '域名变更']
if any(indicator in response.text for indicator in change_indicators):
print("🔄 检测到域名变更通知,正在提取新域名...")
domains = extract_domains_from_content(response.text)
for domain in domains:
if 'ikuuu' in domain and domain != ikun_host:
print(f"🎯 找到新域名: {domain}")
return domain
else:
print("✅ 当前域名正常")
except Exception as e:
print(f"🔍 当前域名检测异常: {e}")
return None
def update_self_host(new_host):
"""更新脚本中的域名"""
script_path = os.path.abspath(__file__)
try:
with open(script_path, "r", encoding="utf-8") as f:
lines = f.readlines()
updated = False
for i, line in enumerate(lines):
if line.strip().startswith("ikun_host = "):
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
lines[i] = f'ikun_host = "{new_host}" # 自动更新于{now}\n'
updated = True
break
if updated:
with open(script_path, "w", encoding="utf-8") as f:
f.writelines(lines)
print(f"✅ 脚本已更新至域名: {new_host}")
return True
print("⚠️ 未找到域名配置行,无法自动更新")
return False
except Exception as e:
print(f"⚠️ 域名更新失败: {e}")
return False
def test_host_reachable(host):
"""测试域名是否可达"""
try:
print(f"🔗 测试域名: {host}")
response = requests.get(f"https://{host}/", headers={"User-Agent": USER_AGENT}, timeout=10)
if response.status_code == 200:
print(f"✅ 域名 {host} 可用")
return True
print(f"⚠️ 域名 {host} 返回状态码: {response.status_code}")
except Exception as e:
print(f"❌ 域名 {host} 不可用: {e}")
return False
def find_working_domain():
"""寻找可用的域名"""
global ikun_host
print(f"🏠 当前域名: {ikun_host}")
if test_host_reachable(ikun_host):
return ikun_host
discovered_domains = []
for domain in [ikun_host] + backup_hosts:
discovered_domains.extend(get_available_domains_from_old_domain(domain))
discovered_domains = list(set(discovered_domains))
print(f"🔍 发现的域名: {discovered_domains}")
for domain in discovered_domains:
if domain != ikun_host and test_host_reachable(domain):
ikun_host = domain
update_self_host(domain)
return domain
print("🔄 测试备用域名列表...")
for host in backup_hosts:
if host != ikun_host and test_host_reachable(host):
ikun_host = host
return host
print("❌ 所有域名均不可用")
return None
def parse_accounts():
account_str = os.getenv('IKUUU_ACCOUNTS')
if not account_str:
print("❌ 未找到环境变量 IKUUU_ACCOUNTS")
sys.exit(1)
accounts = []
for line in account_str.strip().splitlines():
if ':' not in line:
print(f"⚠️ 忽略无效账户行: {line}")
continue
email, password = line.split(':', 1)
email = email.strip()
password = password.strip()
if email and password:
accounts.append((email, password))
else:
print(f"⚠️ 忽略无效账户行: {line}")
if not accounts:
print("❌ 未找到有效账户")
sys.exit(1)
print(f"📋 找到 {len(accounts)} 个账户")
return accounts
def find_browser_path():
env_names = ['IKUUU_BROWSER_PATH', 'CHROME_PATH', 'CHROMIUM_PATH', 'BROWSER_PATH']
for env_name in env_names:
env_path = os.getenv(env_name)
if env_path:
if os.path.exists(env_path):
return env_path
print(f"⚠️ {env_name} 指定的浏览器不存在: {env_path}")
executable_names = ['chromium-browser', 'chromium', 'google-chrome', 'google-chrome-stable', 'chrome']
for name in executable_names:
path = shutil.which(name)
if path:
return path
common_paths = [
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/opt/google/chrome/chrome',
r'C:\Program Files\Google\Chrome\Application\chrome.exe',
r'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe',
]
for path in common_paths:
if os.path.exists(path):
return path
return None
def print_browser_install_help():
print("❌ 未找到Chrome/Chromium浏览器可执行文件")
print("请进入青龙容器安装浏览器,或设置 IKUUU_BROWSER_PATH 指向已有浏览器路径。")
print("常用检查命令:")
print(" which chromium chromium-browser google-chrome google-chrome-stable chrome")
print("Debian/Ubuntu容器可尝试:")
print(" apt-get update && apt-get install -y chromium")
print("Alpine容器可尝试:")
print(" apk add --no-cache chromium")
print("安装后在青龙环境变量中设置,例如:")
print(" IKUUU_BROWSER_PATH=/usr/bin/chromium")
def init_browser():
try:
from DrissionPage import ChromiumOptions, ChromiumPage
except ImportError:
print("❌ 缺少依赖 DrissionPage请在青龙依赖管理中安装")
raise
co = ChromiumOptions()
browser_path = find_browser_path()
if not browser_path:
print_browser_install_help()
sys.exit(1)
co.set_browser_path(browser_path)
print(f"✅ 使用浏览器: {browser_path}")
co.set_argument('--no-sandbox')
co.set_argument('--disable-dev-shm-usage')
co.set_argument('--disable-gpu')
co.set_argument('--headless=new')
co.set_argument('--window-size=1920,1080')
co.set_argument('--disable-blink-features=AutomationControlled')
co.set_argument('--disable-infobars')
co.set_user_agent(USER_AGENT)
page = ChromiumPage(co)
print("✅ 浏览器初始化成功")
return page
def click_geetest(page):
"""点击极验验证按钮。该步骤依赖站点当前是否允许无交互验证通过。"""
try:
print("⏳ 等待验证码加载...")
time.sleep(8)
selectors = [
'.geetest_btn_click',
'@aria-label=点击按钮开始验证',
'.geetest_btn',
'.embed-captcha',
]
for selector in selectors:
try:
btn = page.ele(selector, timeout=3)
if btn and btn.states.is_displayed:
print(f"✅ 找到验证按钮: {selector}")
btn.click()
time.sleep(5)
return True
except Exception:
continue
try:
for div in page.eles('tag:div')[:120]:
cls = div.attr('class') or ''
if 'geetest' in cls.lower() and div.states.is_displayed:
div.click()
time.sleep(5)
return True
except Exception:
pass
print("⚠️ 未找到验证按钮")
return False
except Exception as e:
print(f"⚠️ 点击验证按钮失败: {e}")
return False
def logout(page, site_url):
try:
page.get(f'{site_url}/user/logout')
time.sleep(3)
page.run_js('document.cookie.split(";").forEach(c => document.cookie = c.trim().split("=")[0] + "=;expires=Thu, 01 Jan 1970 00:00:00 GMT;path=/")')
print("✅ 已退出登录")
except Exception as e:
print(f"⚠️ 退出登录失败: {e}")
def get_logged_email(page, site_url):
try:
page.get(f'{site_url}/user/profile')
time.sleep(3)
email_ele = page.ele('tag:input@@id():email', timeout=5)
if email_ele:
return email_ele.attr('value') or email_ele.text
except Exception:
pass
return None
def login(page, site_url, email, password):
try:
masked = mask_email(email)
print(f"🔐 正在登录: {masked}")
page.get(f'{site_url}/user')
time.sleep(5)
if '/auth/login' not in page.url:
logged_email = get_logged_email(page, site_url)
if logged_email and logged_email.lower() == email.lower():
print(f"✅ 当前已登录目标账号: {masked}")
return True
print(" 当前登录账号不一致,正在退出...")
logout(page, site_url)
page.get(f'{site_url}/user')
time.sleep(5)
email_input = page.ele('#email', timeout=20)
if not email_input:
print("❌ 未找到邮箱输入框")
return False
email_input.clear()
email_input.input(email)
password_input = page.ele('#password', timeout=10)
if not password_input:
print("❌ 未找到密码输入框")
return False
password_input.clear()
password_input.input(password)
click_geetest(page)
login_btn = page.ele('tag:button@@text():Login', timeout=5) or page.ele('.login', timeout=5)
if not login_btn:
print("❌ 未找到登录按钮")
return False
login_btn.click()
time.sleep(6)
if '/user' in page.url and '/auth/login' not in page.url:
print("✅ 登录成功")
return True
error_msg = page.ele('.swal2-html-container', timeout=3) or page.ele('.swal-text', timeout=1)
if error_msg:
print(f"❌ 登录失败: {error_msg.text}")
else:
print("❌ 登录失败,请检查账号密码或验证码状态")
return False
except Exception as e:
print(f"❌ 登录过程出错: {e}")
return False
def get_traffic_info(page):
try:
traffic_card = page.ele('tag:h4@@text():剩余流量', timeout=5)
if traffic_card:
current = traffic_card.parent()
for _ in range(5):
if not current:
break
text = current.text or ''
remaining_match = re.search(r'(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', text, re.IGNORECASE)
today_match = re.search(r'今日[^\d]*(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', text, re.IGNORECASE)
if remaining_match:
remaining = f"{remaining_match.group(1)} {remaining_match.group(2).upper()}"
today_used = '未知'
if today_match:
today_used = f"{today_match.group(1)} {today_match.group(2).upper()}"
return remaining, today_used
current = current.parent()
page_text = page.ele('tag:body', timeout=5).text
remaining_match = re.search(r'(?:剩余流量|可用流量|流量余额)[^\d]{0,80}(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', page_text, re.IGNORECASE)
today_match = re.search(r'(?:今日已用|今日使用|已用流量)[^\d]{0,80}(\d+(?:\.\d+)?)\s*(B|KB|MB|GB|TB|PB)', page_text, re.IGNORECASE)
remaining = f"{remaining_match.group(1)} {remaining_match.group(2).upper()}" if remaining_match else '未知'
today_used = f"{today_match.group(1)} {today_match.group(2).upper()}" if today_match else '未知'
return remaining, today_used
except Exception as e:
print(f"⚠️ 获取流量信息失败: {e}")
return '未知', '未知'
def checkin(page, site_url):
try:
if '/user' not in page.url:
page.get(f'{site_url}/user')
time.sleep(3)
checkin_div = page.ele('#checkin-div', timeout=5)
if checkin_div and ('已签到' in checkin_div.text or '明日再来' in checkin_div.text):
print("✅ 今日已签到")
return True, '今日已签到'
try:
page.run_js('checkin()')
time.sleep(5)
msg = page.ele('.swal2-html-container', timeout=10) or page.ele('.swal-text', timeout=1)
if msg:
print(f"✅ 签到结果: {msg.text}")
return True, msg.text
checkin_div = page.ele('#checkin-div', timeout=3)
if checkin_div and ('已签到' in checkin_div.text or '已签' in checkin_div.text):
return True, '签到成功'
return True, '签到成功'
except Exception as e:
print(f"⚠️ JS签到失败: {e}")
return False, '签到失败'
except Exception as e:
print(f"❌ 签到过程出错: {e}")
return False, str(e)
def send_qinglong_notification(results, current_domain):
title = "iKuuu签到通知"
success_count = sum(1 for res in results if res['success'])
failure_count = len(results) - success_count
message = [
f"🔔 签到完成 | 成功:{success_count} 失败:{failure_count}",
f"🌐 当前域名:{current_domain}",
"================================"
]
for index, res in enumerate(results, 1):
status = "✅ 成功" if res['success'] else "❌ 失败"
message.append(f"{index}. {res['email']}")
message.append(f" 状态:{status}")
message.append(f" 详情:{res['message']}")
message.append(f" 剩余流量:{res['remaining_flow']}")
message.append(f" 今日已用:{res['today_used']}")
message.append("--------------------------------")
message.append("\n🕒 执行时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
try:
send(title, "\n".join(message))
print("✅ 通知已发送")
except Exception as e:
print(f"⚠️ 通知发送失败,请检查通知配置: {str(e)}")
def main():
print("🚀 iKuuu签到脚本启动")
print("=" * 50)
latest_host = get_latest_ikun_host()
if latest_host and latest_host != ikun_host:
print(f"🔄 检测到新域名: {latest_host}")
if update_self_host(latest_host):
globals()['ikun_host'] = latest_host
working_domain = find_working_domain()
if not working_domain:
print("💥 无法找到可用域名,脚本退出")
sys.exit(1)
site_url = f"https://{working_domain}"
print(f"🎯 使用域名: {working_domain}")
print("=" * 50)
accounts = parse_accounts()
page = None
results = []
try:
page = init_browser()
for index, (email, password) in enumerate(accounts, 1):
masked_email = mask_email(email)
print(f"\n👤 [{index}/{len(accounts)}] 处理账户: {masked_email}")
result = {
'email': masked_email,
'success': False,
'message': '登录失败',
'remaining_flow': '未知',
'today_used': '未知'
}
if login(page, site_url, email, password):
success, msg = checkin(page, site_url)
remaining, today_used = get_traffic_info(page)
result.update({
'success': success,
'message': msg,
'remaining_flow': remaining,
'today_used': today_used
})
print(f" 📊 剩余流量: {remaining}")
print(f" 📈 今日已用: {today_used}")
logout(page, site_url)
results.append(result)
status_icon = "" if result['success'] else ""
print(f" {status_icon} 结果: {result['message']}")
if index < len(accounts):
time.sleep(2)
finally:
if page:
try:
page.quit()
except Exception:
pass
print("\n📢 正在发送通知...")
send_qinglong_notification(results, working_domain)
print("\n📊 签到结果汇总:")
print("=" * 50)
success_count = sum(1 for res in results if res['success'])
print(f"🎯 总账户数: {len(results)}")
print(f"✅ 成功: {success_count}")
print(f"❌ 失败: {len(results) - success_count}")
print(f"🌐 使用域名: {working_domain}")
print("=" * 50)
for res in results:
status_icon = "" if res['success'] else ""
print(f"{status_icon} {res['email']}")
print(f" 详情: {res['message']}")
print(f" 剩余流量: {res['remaining_flow']}")
print(f" 今日已用: {res['today_used']}")
print("=" * 50)
print("🏁 脚本执行完成")
if any(not res['success'] for res in results):
sys.exit(1)
if __name__ == "__main__":
main()