qinglong-scripts/TongCheng/tc_signin.py

473 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
描述: 打开并登录“同程旅行”APP开启抓包点击APP右下角“我的”-“签到”进去后抓包域名https://app.17u.cn请求头中的phone、apptoken、device三个参数。
环境变量:
变量名tongcheng
变量格式phone&apptoken&device
多账号之间支持用#或换行分隔:
phone1&apptoken1&device1
phone2&apptoken2&device2
需要依赖aiohttp
签到奖励日常任务得里程签到满360天得价值200元里程
----------------------
版本: 1.6
更新说明:
1. 优化环境变量解析,支持使用换行符分隔多账号
2. 增强环境变量格式验证和错误处理
3. 优化日志输出,明确显示解析到的账号数量
4. 保留之前所有功能和优化
"""
from datetime import datetime
import time
import json
import os
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import random
import sys
import traceback # 用于获取详细的错误堆栈信息
# 手机号脱敏处理
def mask_phone(phone):
if len(phone) == 11:
return phone[:3] + '****' + phone[7:]
return phone
# 从环境变量获取账号信息
def get_accounts():
accounts = []
tongcheng_env = os.getenv('tongcheng', '')
if not tongcheng_env:
print("未找到环境变量 tongcheng")
return accounts
# 支持使用#或换行符分隔多账号
# 先尝试按换行符分割
lines = tongcheng_env.strip().split('\n')
if len(lines) > 1:
print(f"发现{len(lines)}行配置,尝试按行解析...")
account_list = lines
else:
# 尝试按#分割
print("未发现换行符,尝试按#分割...")
account_list = tongcheng_env.split('#')
valid_accounts = 0
invalid_accounts = 0
for account in account_list:
account = account.strip()
if not account:
continue
try:
phone, apptoken, device = account.split('&')
if not phone or not apptoken or not device:
raise ValueError("参数为空")
accounts.append({
'phone': phone,
'apptoken': apptoken,
'device': device
})
valid_accounts += 1
print(f" ✅ 解析账号: {mask_phone(phone)}")
except Exception as e:
invalid_accounts += 1
print(f" ❌ 无效账号配置: {account} - 错误: {str(e)}")
print(f"🔍 共解析出{valid_accounts}个有效账号,{invalid_accounts}个无效配置")
return accounts
# 获取请求头
def get_headers(phone, apptoken, device, payload):
payload_str = json.dumps(payload)
headers = {
'accept': 'application/json, text/plain, */*',
'phone': phone,
'channel': '1',
'apptoken': apptoken,
'sec-fetch-site': 'same-site',
'accept-language': 'zh-CN,zh-Hans;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'sec-fetch-mode': 'cors',
'origin': 'https://m.17u.cn',
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 TcTravel/11.0.0 tctype/wk',
'referer': 'https://m.17u.cn/',
'content-length': str(len(payload_str.encode('utf-8'))),
'device': device,
'sec-fetch-dest': 'empty'
}
return headers
# 获取当前日期
def get_today_date():
return datetime.now().strftime('%Y-%m-%d')
# 异步执行任务
async def execute_task(session, phone, apptoken, device, task_code, title, browser_time):
# 开始任务
start_url = "https://app.17u.cn/welfarecenter/task/start"
start_payload = {"taskCode": task_code}
start_headers = get_headers(phone, apptoken, device, start_payload)
task_id = None
async with session.post(start_url, json=start_payload, headers=start_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(f" - ❌ 做任务【{title}】失败,跳过当前任务")
# 即使任务开始失败也尝试获取task_id
task_id = data.get('data')
if not task_id:
return False, f"任务【{title}】开始失败"
else:
task_id = data['data']
print(f" - 🚀 开始执行任务【{title}】,等待{browser_time}秒...")
# 等待浏览时间
await asyncio.sleep(browser_time)
if task_id:
# 完成任务
finish_url = "https://app.17u.cn/welfarecenter/task/finish"
finish_payload = {"id": task_id}
finish_headers = get_headers(phone, apptoken, device, finish_payload)
async with session.post(finish_url, json=finish_payload, headers=finish_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(f" - ⚠️ 任务【{title}】完成失败,尝试重新提交...")
await asyncio.sleep(2)
async with session.post(finish_url, json=finish_payload, headers=finish_headers) as retry_response:
data = await retry_response.json()
if data['code'] != 2200:
print(f" - ❌ 任务【{title}】完成失败,继续尝试领取奖励")
print(f" - ✅ 任务【{title}】完成成功,开始领取奖励")
# 领取奖励
reward_url = "https://app.17u.cn/welfarecenter/task/receive"
reward_payload = {"id": task_id}
reward_headers = get_headers(phone, apptoken, device, reward_payload)
# 添加奖励领取重试逻辑
max_retries = 2
for retry in range(max_retries + 1):
async with session.post(reward_url, json=reward_payload, headers=reward_headers) as response:
data = await response.json()
if data['code'] == 2200:
print(f" - 🎁 任务【{title}】领取奖励成功")
return True, f"任务【{title}】领取奖励成功"
elif retry < max_retries:
print(f" - ⚠️ 任务【{title}】领取奖励失败,正在进行第{retry + 1}次重试...")
await asyncio.sleep(2)
else:
print(f" - ❌ 任务【{title}】领取奖励失败,已重试{max_retries}次,请尝试手动领取")
return False, f"任务【{title}】领取奖励失败"
return False, f"任务【{title}】开始失败"
# 异步处理单个账号的所有任务
async def process_account(session, account):
phone = account['phone']
apptoken = account['apptoken']
device = account['device']
# 存储当前账号的通知内容
account_notify_content = []
print(f"\n🔄 开始处理用户{mask_phone(phone)}的任务")
account_notify_content.append(f"🔄 开始处理用户{mask_phone(phone)}的任务")
# 获取签到状态
sign_url = "https://app.17u.cn/welfarecenter/index/signIndex"
sign_headers = get_headers(phone, apptoken, device, {})
async with session.post(sign_url, json={}, headers=sign_headers) as response:
data = await response.json()
if data['data'] is None:
print(" ❌ Token失效请更新")
account_notify_content.append(" ❌ Token失效请更新")
return False, account_notify_content
today_sign = data['data']['todaySign']
mileage = data['data']['mileageBalance']['mileage']
print(f" 今日{'✅ 已' if today_sign else '⏳ 未'}签到,当前剩余里程{mileage}")
account_notify_content.append(f" 今日{'✅ 已' if today_sign else '⏳ 未'}签到,当前剩余里程{mileage}")
if not today_sign:
# 执行签到
sign_in_url = "https://app.17u.cn/welfarecenter/index/sign"
sign_in_payload = {"type": 1, "day": get_today_date()}
sign_in_headers = get_headers(phone, apptoken, device, sign_in_payload)
async with session.post(sign_in_url, json=sign_in_payload, headers=sign_in_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(" ❌ 签到失败")
account_notify_content.append(" ❌ 签到失败")
return False, account_notify_content
print(" ✅ 签到成功")
account_notify_content.append(" ✅ 签到成功")
# 获取任务列表
task_list_url = "https://app.17u.cn/welfarecenter/task/taskList?version=11.1.2.1"
task_list_headers = get_headers(phone, apptoken, device, {})
async with session.post(task_list_url, json={}, headers=task_list_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(" ❌ 获取任务列表失败,跳过当前账号")
account_notify_content.append(" ❌ 获取任务列表失败,跳过当前账号")
return False, account_notify_content
tasks = []
for task in data['data']:
if task['state'] == 1 and task['browserTime'] != 0:
tasks.append({
'taskCode': task['taskCode'],
'title': task['title'],
'browserTime': task['browserTime']
})
if not tasks:
print(" ✅ 没有待完成的任务")
account_notify_content.append(" ✅ 没有待完成的任务")
return True, account_notify_content
print(f"\n 📋 本次共有{len(tasks)}个待做任务")
account_notify_content.append(f"\n 📋 本次共有{len(tasks)}个待做任务")
# 记录任务完成情况
success_tasks = []
failed_tasks = []
# 将任务分批处理每批3个
for i in range(0, len(tasks), 3):
batch = tasks[i:i+3]
batch_num = i // 3 + 1
total_batches = (len(tasks) + 2) // 3
print(f"\n 🔄 开始执行第{batch_num}/{total_batches}批任务:")
account_notify_content.append(f"\n 🔄 开始执行第{batch_num}/{total_batches}批任务:")
for task in batch:
print(f" - 📌 {task['title']} (需要浏览{task['browserTime']}秒)")
account_notify_content.append(f" - 📌 {task['title']} (需要浏览{task['browserTime']}秒)")
# 创建当前批次任务的协程
batch_coroutines = [
execute_task(session, phone, apptoken, device, task['taskCode'], task['title'], task['browserTime'])
for task in batch
]
# 并发执行当前批次的任务
results = await asyncio.gather(*batch_coroutines)
# 处理任务结果
for success, message in results:
if success:
success_tasks.append(message)
else:
failed_tasks.append(message)
# 批次间隔
if batch_num < total_batches:
wait_time = random.uniform(1, 2)
print(f"\n ⏳ 等待{wait_time:.1f}秒后执行下一批任务...")
account_notify_content.append(f"\n ⏳ 等待{wait_time:.1f}秒后执行下一批任务...")
await asyncio.sleep(wait_time)
# 添加任务结果统计
account_notify_content.append("\n 📊 任务执行统计:")
account_notify_content.append(f" - ✅ 成功完成 {len(success_tasks)} 个任务")
for msg in success_tasks:
account_notify_content.append(f" - {msg}")
if failed_tasks:
account_notify_content.append(f" - ❌ 失败 {len(failed_tasks)} 个任务")
for msg in failed_tasks:
account_notify_content.append(f" - {msg}")
# 获取积分信息
info_url = "https://app.17u.cn/welfarecenter/index/signIndex"
info_headers = get_headers(phone, apptoken, device, {})
async with session.post(info_url, json={}, headers=info_headers) as response:
data = await response.json()
if data['code'] == 2200:
cycle_sign_num = data['data']['cycleSighNum']
continuous_history = data['data']['continuousHistory']
mileage = data['data']['mileageBalance']['mileage']
today_mileage = data['data']['mileageBalance']['todayMileage']
# 获取特殊连续签到信息
special_info = data['data'].get('specialContinuousInfo', {})
next_reward_mileage = special_info.get('nextRewardMileage', 0)
need_sign = special_info.get('needSign', 0)
next_reward_day = special_info.get('nextRewardDay', 0)
print(f"\n 📊 任务完成情况:")
account_notify_content.append(f"\n 📊 任务完成情况:")
print(f" - 📅 周期内连续签到{cycle_sign_num}天,已连续签到{continuous_history}")
account_notify_content.append(f" - 📅 周期内连续签到{cycle_sign_num}天,已连续签到{continuous_history}")
if need_sign > 0 and next_reward_day > 0:
print(f" - 🎯 再签{need_sign}天,可获得「{next_reward_day}日奖励」(最高{next_reward_mileage}里程)")
account_notify_content.append(f" - 🎯 再签{need_sign}天,可获得「{next_reward_day}日奖励」(最高{next_reward_mileage}里程)")
print(f" - 💰 今日获取{today_mileage}里程,当前剩余{mileage}里程")
account_notify_content.append(f" - 💰 今日获取{today_mileage}里程,当前剩余{mileage}里程")
return True, account_notify_content
# 异步发送通知
async def async_send_notification(send_func, title, content, loop=None):
if not loop:
loop = asyncio.get_running_loop()
# 创建线程池执行器
with ThreadPoolExecutor(max_workers=1) as executor:
try:
# 在单独的线程中执行同步的通知函数
result = await loop.run_in_executor(executor, lambda: send_func(title, content))
print("✅ 通知发送成功")
return True, result
except Exception as e:
# 获取详细的错误堆栈信息
error_stack = traceback.format_exc()
print(f"❌ 通知发送失败: {str(e)}")
print(f"错误堆栈信息:\n{error_stack}")
return False, str(e)
# 主函数
async def main():
# 存储所有账号的通知内容
all_notify_title = "同程旅行任务执行结果"
all_notify_content = []
# 尝试导入通知模块
send = None
notify_found = False
print("\n🔍 正在查找青龙面板通知模块...")
# 青龙面板默认路径
QL_SCRIPTS_DIR = '/ql/scripts'
sys.path.append(QL_SCRIPTS_DIR)
# 添加notify可能存在的其他路径
POSSIBLE_PATHS = [
'/ql', # 青龙根目录
'/ql/data/scripts', # 新版青龙数据目录
'/ql/repo', # 青龙脚本仓库目录
'/ql/repo/QLDependency', # QLDependency仓库目录
'/ql/config', # 配置目录
os.path.dirname(__file__), # 当前脚本目录
os.path.join(os.path.dirname(__file__), 'notify') # 当前目录下的notify文件夹
]
# 检查每个路径是否存在并尝试导入
for path in POSSIBLE_PATHS:
notify_path = os.path.join(path, 'notify.py')
print(f" - 检查路径: {notify_path}")
if os.path.exists(notify_path):
print(f" ✅ 找到通知模块文件")
try:
# 临时添加路径到sys.path
if path not in sys.path:
sys.path.append(path)
from notify import send
print(f" ✅ 成功导入通知模块")
notify_found = True
break
except Exception as e:
print(f" ❌ 导入失败: {str(e)}")
else:
print(f" ❌ 文件不存在")
# 如果未找到通知模块,创建一个模拟函数
if not notify_found:
print("❌ 未找到青龙面板通知模块请确保notify.py存在于正确的路径")
print("❌ 脚本将继续执行,但无法发送通知")
def dummy_send(title, content):
print(f"\n📧 通知内容 (未发送):\n{title}\n\n{content}")
return "Notification not sent - notify.py not found"
send = dummy_send
# 继续执行主程序
accounts = get_accounts()
if not accounts:
print("❌ 没有找到有效的账号信息,请添加环境变量")
print("❌ 环境变量名称tongcheng 格式为phone&apptoken&device")
all_notify_content.append("❌ 没有找到有效的账号信息,请添加环境变量")
all_notify_content.append("❌ 环境变量名称tongcheng 格式为phone&apptoken&device")
# 异步发送通知
loop = asyncio.get_running_loop()
await async_send_notification(send, all_notify_title, "\n".join(all_notify_content), loop)
return
print(f"🔍 本次共获取到 {len(accounts)} 个账号")
all_notify_content.append(f"🔍 本次共获取到 {len(accounts)} 个账号")
# 统计信息
total_accounts = len(accounts)
success_accounts = 0
failed_accounts = 0
# 存储每个账号的结果
account_results = []
async with aiohttp.ClientSession() as session:
# 串行处理每个账号
for i, account in enumerate(accounts, 1):
print(f"\n📱 开始处理第 {i}/{len(accounts)} 个账号")
account_result = {
'phone': mask_phone(account['phone']),
'success': False,
'content': []
}
success, content = await process_account(session, account)
account_result['success'] = success
account_result['content'] = content
account_results.append(account_result)
# 统计账号执行情况
if success:
success_accounts += 1
else:
failed_accounts += 1
if i < len(accounts):
wait_time = random.uniform(2, 3)
print(f"\n⏳ 等待 {wait_time:.1f} 秒后处理下一个账号...")
await asyncio.sleep(wait_time)
# 构建聚合通知
all_notify_content.append("\n\n📊 总体执行统计:")
all_notify_content.append(f" - 账号总数:{total_accounts}")
all_notify_content.append(f" - 成功:{success_accounts}")
all_notify_content.append(f" - 失败:{failed_accounts}")
all_notify_content.append("\n\n===== 详细结果 =====")
# 添加每个账号的结果
for result in account_results:
all_notify_content.append(f"\n\n📱 账号:{result['phone']}")
all_notify_content.append(f"{'✅ 成功' if result['success'] else '❌ 失败'}")
all_notify_content.extend(result['content'])
# 发送聚合通知
print("\n📤 正在发送通知...")
loop = asyncio.get_running_loop()
notify_success, notify_result = await async_send_notification(send, all_notify_title, "\n".join(all_notify_content), loop)
# 记录通知发送结果
if not notify_success:
all_notify_content.append("\n\n⚠️ 通知发送失败请检查notify.py配置")
all_notify_content.append(f"错误信息:{notify_result}")
print("✅ 脚本执行完成")
if __name__ == "__main__":
asyncio.run(main())