更新 iKuuu/ik_signin.py

This commit is contained in:
UPToZ 2025-06-01 17:31:43 +08:00
parent e7378139ea
commit 71d0873821

View File

@ -1,281 +1,277 @@
# -*- coding: utf-8 -* # -*- coding: utf-8 -*
''' '''
定时自定义 定时自定义
<<<<<<< .mine 0 0 1 * * ?
0 0 1 * * ? new Env('iKuuu签到');
======= '''
0 0 1 * * ? ik_signin.py import requests
>>>>>>> .theirs import re
new Env('iKuuu签到'); import json
''' import os
import requests import datetime
import re import urllib.parse
import json import sys
import os import time
import datetime from bs4 import BeautifulSoup
import urllib.parse
import sys # 添加青龙脚本根目录到Python路径
import time QL_SCRIPTS_DIR = '/ql/scripts' # 青龙脚本默认目录
from bs4 import BeautifulSoup sys.path.append(QL_SCRIPTS_DIR)
# 添加青龙脚本根目录到Python路径 # 添加notify可能存在的其他路径
QL_SCRIPTS_DIR = '/ql/scripts' # 青龙脚本默认目录 POSSIBLE_PATHS = [
sys.path.append(QL_SCRIPTS_DIR) '/ql', # 青龙根目录
'/ql/data/scripts', # 新版青龙数据目录
# 添加notify可能存在的其他路径 '/ql/scripts/notify', # 自定义通知目录
POSSIBLE_PATHS = [ os.path.dirname(__file__) # 当前脚本目录
'/ql', # 青龙根目录 ]
'/ql/data/scripts', # 新版青龙数据目录
'/ql/scripts/notify', # 自定义通知目录 for path in POSSIBLE_PATHS:
os.path.dirname(__file__) # 当前脚本目录 if os.path.exists(os.path.join(path, 'notify.py')):
] sys.path.append(path)
break
for path in POSSIBLE_PATHS:
if os.path.exists(os.path.join(path, 'notify.py')): try:
sys.path.append(path) from notify import send
break except ImportError:
print("⚠️ 无法加载通知模块,请检查路径配置")
try: send = lambda title, content: None # 创建空函数防止报错
from notify import send
except ImportError: # 初始域名
print("⚠️ 无法加载通知模块,请检查路径配置") ikun_host = "ikuuu.one" # 自动更新于2025-04-29 13:08:20
send = lambda title, content: None # 创建空函数防止报错 backup_hosts = ["ikuuu.one", "ikuuu.pw", "ikuuu.me"] # 备用域名列表
# 初始域名 def get_latest_ikun_host():
ikun_host = "ikuuu.one" # 自动更新于2025-04-29 13:08:20 test_url = f"https://{ikun_host}/"
backup_hosts = ["ikuuu.one", "ikuuu.pw", "ikuuu.me"] # 备用域名列表 try:
response = requests.get(test_url, timeout=10)
def get_latest_ikun_host(): if response.status_code == 200:
test_url = f"https://{ikun_host}/" if "官网域名已更改" in response.text or "Domain deprecated" in response.text:
try: print("检测到域名变更通知,正在提取新域名...")
response = requests.get(test_url, timeout=10) h2_matches = re.findall(r'<h2>.*?(?:域名|domain)[:]\s*([a-zA-Z0-9.-]+)</h2>', response.text)
if response.status_code == 200: if h2_matches:
if "官网域名已更改" in response.text or "Domain deprecated" in response.text: return h2_matches[0]
print("检测到域名变更通知,正在提取新域名...") js_matches = re.findall(r'https?://([a-zA-Z0-9.-]+)/auth/login', response.text)
h2_matches = re.findall(r'<h2>.*?(?:域名|domain)[:]\s*([a-zA-Z0-9.-]+)</h2>', response.text) if js_matches:
if h2_matches: return js_matches[0]
return h2_matches[0] fallback_match = re.search(r'(?:域名|domain)[:]\s*([a-zA-Z0-9.-]+)', response.text)
js_matches = re.findall(r'https?://([a-zA-Z0-9.-]+)/auth/login', response.text) if fallback_match:
if js_matches: return fallback_match.group(1)
return js_matches[0] print("⚠️ 检测到域名变更但无法提取新域名")
fallback_match = re.search(r'(?:域名|domain)[:]\s*([a-zA-Z0-9.-]+)', response.text) return None
if fallback_match: else:
return fallback_match.group(1) print("✅ 当前域名正常")
print("⚠️ 检测到域名变更但无法提取新域名") return None
return None except Exception as e:
else: print(f"域名检测异常: {e}")
print("✅ 当前域名正常") return None
return None
except Exception as e: def update_self_host(new_host):
print(f"域名检测异常: {e}") script_path = os.path.abspath(__file__)
return None with open(script_path, "r", encoding="utf-8") as f:
lines = f.readlines()
def update_self_host(new_host): updated = False
script_path = os.path.abspath(__file__) for i, line in enumerate(lines):
with open(script_path, "r", encoding="utf-8") as f: if line.strip().startswith("ikun_host = "):
lines = f.readlines() lines[i] = f'ikun_host = "{new_host}" # 自动更新于{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n'
updated = False updated = True
for i, line in enumerate(lines): break
if line.strip().startswith("ikun_host = "): if updated:
lines[i] = f'ikun_host = "{new_host}" # 自动更新于{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}\n' with open(script_path, "w", encoding="utf-8") as f:
updated = True f.writelines(lines)
break print(f"✅ 脚本已更新至域名: {new_host}")
if updated: return True
with open(script_path, "w", encoding="utf-8") as f: else:
f.writelines(lines) print("⚠️ 域名更新失败")
print(f"✅ 脚本已更新至域名: {new_host}") return False
return True
else: def test_host_reachable(host):
print("⚠️ 域名更新失败") try:
return False response = requests.get(f"https://{host}/", timeout=10)
return response.status_code == 200
def test_host_reachable(host): except:
try: return False
response = requests.get(f"https://{host}/", timeout=10)
return response.status_code == 200 def get_remaining_flow(cookies):
except: """获取用户剩余流量信息"""
return False user_url = f'https://{ikun_host}/user'
try:
def get_remaining_flow(cookies): # 获取用户页面
"""获取用户剩余流量信息""" user_page = requests.get(user_url, cookies=cookies, timeout=15)
user_url = f'https://{ikun_host}/user' if user_page.status_code != 200:
try: return "获取流量失败", "状态码: " + str(user_page.status_code)
# 获取用户页面
user_page = requests.get(user_url, cookies=cookies, timeout=15) # 使用BeautifulSoup解析HTML
if user_page.status_code != 200: soup = BeautifulSoup(user_page.text, 'html.parser')
return "获取流量失败", "状态码: " + str(user_page.status_code)
# 查找包含剩余流量的卡片
# 使用BeautifulSoup解析HTML flow_cards = soup.find_all('div', class_='card card-statistic-2')
soup = BeautifulSoup(user_page.text, 'html.parser') for card in flow_cards:
h4_tag = card.find('h4')
# 查找包含剩余流量的卡片 if h4_tag and '剩余流量' in h4_tag.text:
flow_cards = soup.find_all('div', class_='card card-statistic-2') # 查找流量数值
for card in flow_cards: counter_span = card.find('span', class_='counter')
h4_tag = card.find('h4') if counter_span:
if h4_tag and '剩余流量' in h4_tag.text: flow_value = counter_span.text.strip()
# 查找流量数值
counter_span = card.find('span', class_='counter') # 查找流量单位
if counter_span: unit_text = ""
flow_value = counter_span.text.strip() next_sibling = counter_span.next_sibling
if next_sibling:
# 查找流量单位 unit_text = next_sibling.strip()
unit_text = ""
next_sibling = counter_span.next_sibling return flow_value, unit_text
if next_sibling:
unit_text = next_sibling.strip() # 如果没有找到,尝试其他方式
flow_div = soup.find('div', string='剩余流量')
return flow_value, unit_text if flow_div:
parent_div = flow_div.find_parent('div', class_='card-body')
# 如果没有找到,尝试其他方式 if parent_div:
flow_div = soup.find('div', string='剩余流量') flow_text = parent_div.get_text(strip=True).replace('剩余流量', '')
if flow_div: return flow_text.split()[0], flow_text.split()[1] if len(flow_text.split()) > 1 else ""
parent_div = flow_div.find_parent('div', class_='card-body')
if parent_div: return "未找到", "流量信息"
flow_text = parent_div.get_text(strip=True).replace('剩余流量', '')
return flow_text.split()[0], flow_text.split()[1] if len(flow_text.split()) > 1 else "" except Exception as e:
return "流量获取异常", str(e)
return "未找到", "流量信息"
def ikuuu_signin(email, password):
except Exception as e: params = {'email': email, 'passwd': password, 'code': ''}
return "流量获取异常", str(e) login_url = f'https://{ikun_host}/auth/login'
try:
def ikuuu_signin(email, password): # 登录请求
params = {'email': email, 'passwd': password, 'code': ''} login_res = requests.post(login_url, data=params, timeout=15)
login_url = f'https://{ikun_host}/auth/login' if login_res.status_code != 200:
try: flow_value, flow_unit = "登录失败", "无法获取"
# 登录请求 return False, f"登录失败(状态码{login_res.status_code}", flow_value, flow_unit
login_res = requests.post(login_url, data=params, timeout=15)
if login_res.status_code != 200: login_data = login_res.json()
flow_value, flow_unit = "登录失败", "无法获取" if login_data.get('ret') != 1:
return False, f"登录失败(状态码{login_res.status_code}", flow_value, flow_unit flow_value, flow_unit = "登录失败", "无法获取"
return False, f"登录失败:{login_data.get('msg', '未知错误')}", flow_value, flow_unit
login_data = login_res.json()
if login_data.get('ret') != 1: # 获取用户剩余流量
flow_value, flow_unit = "登录失败", "无法获取" cookies = login_res.cookies
return False, f"登录失败:{login_data.get('msg', '未知错误')}", flow_value, flow_unit flow_value, flow_unit = get_remaining_flow(cookies)
# 获取用户剩余流量 # 执行签到
cookies = login_res.cookies checkin_res = requests.post(f'https://{ikun_host}/user/checkin', cookies=cookies, timeout=15)
flow_value, flow_unit = get_remaining_flow(cookies) if checkin_res.status_code != 200:
return False, f"签到失败(状态码{checkin_res.status_code}", flow_value, flow_unit
# 执行签到
checkin_res = requests.post(f'https://{ikun_host}/user/checkin', cookies=cookies, timeout=15) checkin_data = checkin_res.json()
if checkin_res.status_code != 200: if checkin_data.get('ret') == 1:
return False, f"签到失败(状态码{checkin_res.status_code}", flow_value, flow_unit return True, f"成功 | {checkin_data.get('msg', '')}", flow_value, flow_unit
else:
checkin_data = checkin_res.json() return False, f"签到失败:{checkin_data.get('msg', '未知错误')}", flow_value, flow_unit
if checkin_data.get('ret') == 1: except json.JSONDecodeError:
return True, f"成功 | {checkin_data.get('msg', '')}", flow_value, flow_unit return False, "响应解析失败", "未知", "未知"
else: except requests.exceptions.Timeout:
return False, f"签到失败:{checkin_data.get('msg', '未知错误')}", flow_value, flow_unit return False, "请求超时", "未知", "未知"
except json.JSONDecodeError: except Exception as e:
return False, "响应解析失败", "未知", "未知" return False, f"请求异常:{str(e)}", "未知", "未知"
except requests.exceptions.Timeout:
return False, "请求超时", "未知", "未知" def send_qinglong_notification(results):
except Exception as e: """
return False, f"请求异常:{str(e)}", "未知", "未知" 使用青龙面板内置通知系统发送通知
需要青龙面板已配置通知渠道如钉钉企业微信等
def send_qinglong_notification(results): """
""" title = "iKuuu签到通知"
使用青龙面板内置通知系统发送通知
需要青龙面板已配置通知渠道如钉钉企业微信等 # 构建消息内容
""" success_count = sum(1 for res in results if res['success'])
title = "iKuuu签到通知" failure_count = len(results) - success_count
# 构建消息内容 message = [
success_count = sum(1 for res in results if res['success']) f"🔔 签到完成 | 成功:{success_count} 失败:{failure_count}",
failure_count = len(results) - success_count "================================"
]
message = [
f"🔔 签到完成 | 成功:{success_count} 失败:{failure_count}", for index, res in enumerate(results, 1):
"================================" status = "✅ 成功" if res['success'] else "❌ 失败"
] message.append(f"{index}. {res['email']}")
message.append(f" 状态:{status}")
for index, res in enumerate(results, 1): message.append(f" 详情:{res['message']}")
status = "✅ 成功" if res['success'] else "❌ 失败" message.append(f" 剩余流量:{res['flow_value']} {res['flow_unit']}")
message.append(f"{index}. {res['email']}") message.append("--------------------------------")
message.append(f" 状态:{status}")
message.append(f" 详情:{res['message']}") # 添加统计信息
message.append(f" 剩余流量:{res['flow_value']} {res['flow_unit']}") message.append("\n🕒 执行时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
message.append("--------------------------------")
try:
# 添加统计信息 # 发送通知(青龙自动处理多通知渠道)
message.append("\n🕒 执行时间:" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) send(title, "\n".join(message))
print("✅ 通知已发送")
try: except Exception as e:
# 发送通知(青龙自动处理多通知渠道) print(f"⚠️ 通知发送失败,请检查通知配置: {str(e)}")
send(title, "\n".join(message))
print("✅ 通知已发送") if __name__ == "__main__":
except Exception as e: # ==================== 域名更新逻辑 ====================
print(f"⚠️ 通知发送失败,请检查通知配置: {str(e)}") print(f"当前域名: {ikun_host}")
latest_host = get_latest_ikun_host()
if __name__ == "__main__": if latest_host:
# ==================== 域名更新逻辑 ==================== print(f"检测到新域名: {latest_host}")
print(f"当前域名: {ikun_host}") if update_self_host(latest_host):
latest_host = get_latest_ikun_host() ikun_host = latest_host
if latest_host:
print(f"检测到新域名: {latest_host}") # ==================== 域名可用性检查 ====================
if update_self_host(latest_host): if not test_host_reachable(ikun_host):
ikun_host = latest_host print("主域名不可用,尝试备用域名...")
found = False
# ==================== 域名可用性检查 ==================== for host in backup_hosts:
if not test_host_reachable(ikun_host): if test_host_reachable(host):
print("主域名不可用,尝试备用域名...") ikun_host = host
found = False print(f"切换到备用域名: {ikun_host}")
for host in backup_hosts: found = True
if test_host_reachable(host): break
ikun_host = host if not found:
print(f"切换到备用域名: {ikun_host}") print("❌ 所有域名均不可用")
found = True exit(1)
break
if not found: # ==================== 账户处理 ====================
print("❌ 所有域名均不可用") accounts = []
exit(1) account_str = os.getenv('IKUUU_ACCOUNTS')
if not account_str:
# ==================== 账户处理 ==================== print("❌ 未找到环境变量 IKUUU_ACCOUNTS")
accounts = [] exit(1)
account_str = os.getenv('IKUUU_ACCOUNTS')
if not account_str: for line in account_str.strip().splitlines():
print("❌ 未找到环境变量 IKUUU_ACCOUNTS") if ':' in line:
exit(1) email, pwd = line.split(':', 1)
accounts.append((email.strip(), pwd.strip()))
for line in account_str.strip().splitlines(): else:
if ':' in line: print(f"⚠️ 忽略无效账户行: {line}")
email, pwd = line.split(':', 1)
accounts.append((email.strip(), pwd.strip())) if not accounts:
else: print("❌ 未找到有效账户")
print(f"⚠️ 忽略无效账户行: {line}") exit(1)
if not accounts: # ==================== 执行签到 ====================
print("❌ 未找到有效账户") results = []
exit(1) for email, pwd in accounts:
print(f"\n处理账户: {email}")
# ==================== 执行签到 ==================== success, msg, flow_value, flow_unit = ikuuu_signin(email, pwd)
results = [] results.append({
for email, pwd in accounts: 'email': email,
print(f"\n处理账户: {email}") 'success': success,
success, msg, flow_value, flow_unit = ikuuu_signin(email, pwd) 'message': msg,
results.append({ 'flow_value': flow_value,
'email': email, 'flow_unit': flow_unit
'success': success, })
'message': msg, print(f"结果: {'成功' if success else '失败'} - {msg}")
'flow_value': flow_value, print(f"剩余流量: {flow_value} {flow_unit}")
'flow_unit': flow_unit
}) # 账户间延迟防止请求过快
print(f"结果: {'成功' if success else '失败'} - {msg}") time.sleep(1)
print(f"剩余流量: {flow_value} {flow_unit}")
# ==================== 结果通知 ====================
# 账户间延迟防止请求过快 print("\n正在发送通知...")
time.sleep(1) send_qinglong_notification(results)
# ==================== 结果通知 ==================== # ==================== 本地结果输出 ====================
print("\n正在发送通知...") print("\n签到结果汇总:")
send_qinglong_notification(results) for res in results:
print(f"邮箱: {res['email']}")
# ==================== 本地结果输出 ==================== print(f"状态: {'成功' if res['success'] else '失败'}")
print("\n签到结果汇总:") print(f"详情: {res['message']}")
for res in results:
print(f"邮箱: {res['email']}")
print(f"状态: {'成功' if res['success'] else '失败'}")
print(f"详情: {res['message']}")
print(f"剩余流量: {res['flow_value']} {res['flow_unit']}\n{'-'*40}") print(f"剩余流量: {res['flow_value']} {res['flow_unit']}\n{'-'*40}")