""" 任务名称 name: 同程旅行签到 定时规则 cron: 0 0 4 * * """ from datetime import datetime import time import json import os import asyncio import aiohttp from concurrent.futures import ThreadPoolExecutor import random import sys import traceback # 用于获取详细的错误堆栈信息 # 手机号脱敏处理 def mask_phone(phone): if len(phone) == 11: return phone[:3] + '****' + phone[7:] return phone # 从环境变量获取账号信息 def get_accounts(): accounts = [] tongcheng_env = os.getenv('tongcheng', '') if not tongcheng_env: print("未找到环境变量 tongcheng") return accounts # 支持使用#或换行符分隔多账号 # 先尝试按换行符分割 lines = tongcheng_env.strip().split('\n') if len(lines) > 1: print(f"发现{len(lines)}行配置,尝试按行解析...") account_list = lines else: # 尝试按#分割 print("未发现换行符,尝试按#分割...") account_list = tongcheng_env.split('#') valid_accounts = 0 invalid_accounts = 0 for account in account_list: account = account.strip() if not account: continue try: phone, apptoken, device = account.split('&') if not phone or not apptoken or not device: raise ValueError("参数为空") accounts.append({ 'phone': phone, 'apptoken': apptoken, 'device': device }) valid_accounts += 1 print(f" ✅ 解析账号: {mask_phone(phone)}") except Exception as e: invalid_accounts += 1 print(f" ❌ 无效账号配置: {account} - 错误: {str(e)}") print(f"🔍 共解析出{valid_accounts}个有效账号,{invalid_accounts}个无效配置") return accounts # 获取请求头 def get_headers(phone, apptoken, device, payload): payload_str = json.dumps(payload) headers = { 'accept': 'application/json, text/plain, */*', 'phone': phone, 'channel': '1', 'apptoken': apptoken, 'sec-fetch-site': 'same-site', 'accept-language': 'zh-CN,zh-Hans;q=0.9', 'accept-encoding': 'gzip, deflate, br', 'sec-fetch-mode': 'cors', 'origin': 'https://m.17u.cn', 'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 TcTravel/11.0.0 tctype/wk', 'referer': 'https://m.17u.cn/', 'content-length': str(len(payload_str.encode('utf-8'))), 'device': device, 'sec-fetch-dest': 'empty' } return headers # 获取当前日期 def get_today_date(): return datetime.now().strftime('%Y-%m-%d') # 异步执行任务 async def execute_task(session, phone, apptoken, device, task_code, title, browser_time): # 开始任务 start_url = "https://app.17u.cn/welfarecenter/task/start" start_payload = {"taskCode": task_code} start_headers = get_headers(phone, apptoken, device, start_payload) task_id = None async with session.post(start_url, json=start_payload, headers=start_headers) as response: data = await response.json() if data['code'] != 2200: print(f" - ❌ 做任务【{title}】失败,跳过当前任务") # 即使任务开始失败,也尝试获取task_id task_id = data.get('data') if not task_id: return False, f"任务【{title}】开始失败" else: task_id = data['data'] print(f" - 🚀 开始执行任务【{title}】,等待{browser_time}秒...") # 等待浏览时间 await asyncio.sleep(browser_time) if task_id: # 完成任务 finish_url = "https://app.17u.cn/welfarecenter/task/finish" finish_payload = {"id": task_id} finish_headers = get_headers(phone, apptoken, device, finish_payload) async with session.post(finish_url, json=finish_payload, headers=finish_headers) as response: data = await response.json() if data['code'] != 2200: print(f" - ⚠️ 任务【{title}】完成失败,尝试重新提交...") await asyncio.sleep(2) async with session.post(finish_url, json=finish_payload, headers=finish_headers) as retry_response: data = await retry_response.json() if data['code'] != 2200: print(f" - ❌ 任务【{title}】完成失败,继续尝试领取奖励") print(f" - ✅ 任务【{title}】完成成功,开始领取奖励") # 领取奖励 reward_url = "https://app.17u.cn/welfarecenter/task/receive" reward_payload = {"id": task_id} reward_headers = get_headers(phone, apptoken, device, reward_payload) # 添加奖励领取重试逻辑 max_retries = 2 for retry in range(max_retries + 1): async with session.post(reward_url, json=reward_payload, headers=reward_headers) as response: data = await response.json() if data['code'] == 2200: print(f" - 🎁 任务【{title}】领取奖励成功") return True, f"任务【{title}】领取奖励成功" elif retry < max_retries: print(f" - ⚠️ 任务【{title}】领取奖励失败,正在进行第{retry + 1}次重试...") await asyncio.sleep(2) else: print(f" - ❌ 任务【{title}】领取奖励失败,已重试{max_retries}次,请尝试手动领取") return False, f"任务【{title}】领取奖励失败" return False, f"任务【{title}】开始失败" # 异步处理单个账号的所有任务 async def process_account(session, account): phone = account['phone'] apptoken = account['apptoken'] device = account['device'] # 存储当前账号的通知内容 account_notify_content = [] print(f"\n🔄 开始处理用户{mask_phone(phone)}的任务") account_notify_content.append(f"🔄 开始处理用户{mask_phone(phone)}的任务") # 获取签到状态 sign_url = "https://app.17u.cn/welfarecenter/index/signIndex" sign_headers = get_headers(phone, apptoken, device, {}) async with session.post(sign_url, json={}, headers=sign_headers) as response: data = await response.json() if data['data'] is None: print(" ❌ Token失效,请更新") account_notify_content.append(" ❌ Token失效,请更新") return False, account_notify_content today_sign = data['data']['todaySign'] mileage = data['data']['mileageBalance']['mileage'] print(f" 今日{'✅ 已' if today_sign else '⏳ 未'}签到,当前剩余里程{mileage}") account_notify_content.append(f" 今日{'✅ 已' if today_sign else '⏳ 未'}签到,当前剩余里程{mileage}") if not today_sign: # 执行签到 sign_in_url = "https://app.17u.cn/welfarecenter/index/sign" sign_in_payload = {"type": 1, "day": get_today_date()} sign_in_headers = get_headers(phone, apptoken, device, sign_in_payload) async with session.post(sign_in_url, json=sign_in_payload, headers=sign_in_headers) as response: data = await response.json() if data['code'] != 2200: print(" ❌ 签到失败") account_notify_content.append(" ❌ 签到失败") return False, account_notify_content print(" ✅ 签到成功") account_notify_content.append(" ✅ 签到成功") # 获取任务列表 task_list_url = "https://app.17u.cn/welfarecenter/task/taskList?version=11.1.2.1" task_list_headers = get_headers(phone, apptoken, device, {}) async with session.post(task_list_url, json={}, headers=task_list_headers) as response: data = await response.json() if data['code'] != 2200: print(" ❌ 获取任务列表失败,跳过当前账号") account_notify_content.append(" ❌ 获取任务列表失败,跳过当前账号") return False, account_notify_content tasks = [] for task in data['data']: if task['state'] == 1 and task['browserTime'] != 0: tasks.append({ 'taskCode': task['taskCode'], 'title': task['title'], 'browserTime': task['browserTime'] }) if not tasks: print(" ✅ 没有待完成的任务") account_notify_content.append(" ✅ 没有待完成的任务") return True, account_notify_content print(f"\n 📋 本次共有{len(tasks)}个待做任务") account_notify_content.append(f"\n 📋 本次共有{len(tasks)}个待做任务") # 记录任务完成情况 success_tasks = [] failed_tasks = [] # 将任务分批处理,每批3个 for i in range(0, len(tasks), 3): batch = tasks[i:i+3] batch_num = i // 3 + 1 total_batches = (len(tasks) + 2) // 3 print(f"\n 🔄 开始执行第{batch_num}/{total_batches}批任务:") account_notify_content.append(f"\n 🔄 开始执行第{batch_num}/{total_batches}批任务:") for task in batch: print(f" - 📌 {task['title']} (需要浏览{task['browserTime']}秒)") account_notify_content.append(f" - 📌 {task['title']} (需要浏览{task['browserTime']}秒)") # 创建当前批次任务的协程 batch_coroutines = [ execute_task(session, phone, apptoken, device, task['taskCode'], task['title'], task['browserTime']) for task in batch ] # 并发执行当前批次的任务 results = await asyncio.gather(*batch_coroutines) # 处理任务结果 for success, message in results: if success: success_tasks.append(message) else: failed_tasks.append(message) # 批次间隔 if batch_num < total_batches: wait_time = random.uniform(1, 2) print(f"\n ⏳ 等待{wait_time:.1f}秒后执行下一批任务...") account_notify_content.append(f"\n ⏳ 等待{wait_time:.1f}秒后执行下一批任务...") await asyncio.sleep(wait_time) # 添加任务结果统计 account_notify_content.append("\n 📊 任务执行统计:") account_notify_content.append(f" - ✅ 成功完成 {len(success_tasks)} 个任务") for msg in success_tasks: account_notify_content.append(f" - {msg}") if failed_tasks: account_notify_content.append(f" - ❌ 失败 {len(failed_tasks)} 个任务") for msg in failed_tasks: account_notify_content.append(f" - {msg}") # 获取积分信息 info_url = "https://app.17u.cn/welfarecenter/index/signIndex" info_headers = get_headers(phone, apptoken, device, {}) async with session.post(info_url, json={}, headers=info_headers) as response: data = await response.json() if data['code'] == 2200: cycle_sign_num = data['data']['cycleSighNum'] continuous_history = data['data']['continuousHistory'] mileage = data['data']['mileageBalance']['mileage'] today_mileage = data['data']['mileageBalance']['todayMileage'] # 获取特殊连续签到信息 special_info = data['data'].get('specialContinuousInfo', {}) next_reward_mileage = special_info.get('nextRewardMileage', 0) need_sign = special_info.get('needSign', 0) next_reward_day = special_info.get('nextRewardDay', 0) print(f"\n 📊 任务完成情况:") account_notify_content.append(f"\n 📊 任务完成情况:") print(f" - 📅 周期内连续签到{cycle_sign_num}天,已连续签到{continuous_history}天") account_notify_content.append(f" - 📅 周期内连续签到{cycle_sign_num}天,已连续签到{continuous_history}天") if need_sign > 0 and next_reward_day > 0: print(f" - 🎯 再签{need_sign}天,可获得「{next_reward_day}日奖励」(最高{next_reward_mileage}里程)") account_notify_content.append(f" - 🎯 再签{need_sign}天,可获得「{next_reward_day}日奖励」(最高{next_reward_mileage}里程)") print(f" - 💰 今日获取{today_mileage}里程,当前剩余{mileage}里程") account_notify_content.append(f" - 💰 今日获取{today_mileage}里程,当前剩余{mileage}里程") return True, account_notify_content # 异步发送通知 async def async_send_notification(send_func, title, content, loop=None): if not loop: loop = asyncio.get_running_loop() # 创建线程池执行器 with ThreadPoolExecutor(max_workers=1) as executor: try: # 在单独的线程中执行同步的通知函数 result = await loop.run_in_executor(executor, lambda: send_func(title, content)) print("✅ 通知发送成功") return True, result except Exception as e: # 获取详细的错误堆栈信息 error_stack = traceback.format_exc() print(f"❌ 通知发送失败: {str(e)}") print(f"错误堆栈信息:\n{error_stack}") return False, str(e) # 主函数 async def main(): # 存储所有账号的通知内容 all_notify_title = "同程旅行任务执行结果" all_notify_content = [] # 尝试导入通知模块 send = None notify_found = False print("\n🔍 正在查找青龙面板通知模块...") # 青龙面板默认路径 QL_SCRIPTS_DIR = '/ql/scripts' sys.path.append(QL_SCRIPTS_DIR) # 添加notify可能存在的其他路径 POSSIBLE_PATHS = [ '/ql', # 青龙根目录 '/ql/data/scripts', # 新版青龙数据目录 '/ql/repo', # 青龙脚本仓库目录 '/ql/repo/QLDependency', # QLDependency仓库目录 '/ql/config', # 配置目录 os.path.dirname(__file__), # 当前脚本目录 os.path.join(os.path.dirname(__file__), 'notify') # 当前目录下的notify文件夹 ] # 检查每个路径是否存在并尝试导入 for path in POSSIBLE_PATHS: notify_path = os.path.join(path, 'notify.py') print(f" - 检查路径: {notify_path}") if os.path.exists(notify_path): print(f" ✅ 找到通知模块文件") try: # 临时添加路径到sys.path if path not in sys.path: sys.path.append(path) from notify import send print(f" ✅ 成功导入通知模块") notify_found = True break except Exception as e: print(f" ❌ 导入失败: {str(e)}") else: print(f" ❌ 文件不存在") # 如果未找到通知模块,创建一个模拟函数 if not notify_found: print("❌ 未找到青龙面板通知模块,请确保notify.py存在于正确的路径") print("❌ 脚本将继续执行,但无法发送通知") def dummy_send(title, content): print(f"\n📧 通知内容 (未发送):\n{title}\n\n{content}") return "Notification not sent - notify.py not found" send = dummy_send # 继续执行主程序 accounts = get_accounts() if not accounts: print("❌ 没有找到有效的账号信息,请添加环境变量") print("❌ 环境变量名称tongcheng 格式为:phone&apptoken&device") all_notify_content.append("❌ 没有找到有效的账号信息,请添加环境变量") all_notify_content.append("❌ 环境变量名称tongcheng 格式为:phone&apptoken&device") # 异步发送通知 loop = asyncio.get_running_loop() await async_send_notification(send, all_notify_title, "\n".join(all_notify_content), loop) return print(f"🔍 本次共获取到 {len(accounts)} 个账号") all_notify_content.append(f"🔍 本次共获取到 {len(accounts)} 个账号") # 统计信息 total_accounts = len(accounts) success_accounts = 0 failed_accounts = 0 # 存储每个账号的结果 account_results = [] async with aiohttp.ClientSession() as session: # 串行处理每个账号 for i, account in enumerate(accounts, 1): print(f"\n📱 开始处理第 {i}/{len(accounts)} 个账号") account_result = { 'phone': mask_phone(account['phone']), 'success': False, 'content': [] } success, content = await process_account(session, account) account_result['success'] = success account_result['content'] = content account_results.append(account_result) # 统计账号执行情况 if success: success_accounts += 1 else: failed_accounts += 1 if i < len(accounts): wait_time = random.uniform(2, 3) print(f"\n⏳ 等待 {wait_time:.1f} 秒后处理下一个账号...") await asyncio.sleep(wait_time) # 构建聚合通知 all_notify_content.append("\n\n📊 总体执行统计:") all_notify_content.append(f" - 账号总数:{total_accounts}") all_notify_content.append(f" - 成功:{success_accounts}") all_notify_content.append(f" - 失败:{failed_accounts}") all_notify_content.append("\n\n===== 详细结果 =====") # 添加每个账号的结果 for result in account_results: all_notify_content.append(f"\n\n📱 账号:{result['phone']}") all_notify_content.append(f"{'✅ 成功' if result['success'] else '❌ 失败'}") all_notify_content.extend(result['content']) # 发送聚合通知 print("\n📤 正在发送通知...") loop = asyncio.get_running_loop() notify_success, notify_result = await async_send_notification(send, all_notify_title, "\n".join(all_notify_content), loop) # 记录通知发送结果 if not notify_success: all_notify_content.append("\n\n⚠️ 通知发送失败,请检查notify.py配置") all_notify_content.append(f"错误信息:{notify_result}") print("✅ 脚本执行完成") if __name__ == "__main__": asyncio.run(main())