增加同城旅行每日签到脚本

This commit is contained in:
UPToZ 2025-06-01 20:32:29 +08:00
parent 35e3a329c0
commit ac2529f718
2 changed files with 596 additions and 0 deletions

123
TongCheng/README.md Normal file
View File

@ -0,0 +1,123 @@
# 同程旅行自动签到脚本
这是一个基于Python的自动化脚本用于自动完成同程旅行APP的每日签到和任务获取里程奖励。脚本支持多账号配置自动处理日常任务并通过青龙面板提供通知功能。
## 功能特性
- **自动签到**:每日自动完成签到任务
- **任务处理**:自动识别并完成各类日常任务
- **多账号支持**:通过环境变量配置多个账号,支持使用换行符或`#`分隔
- **通知功能**:支持通过青龙面板发送执行结果通知
- **智能重试**:任务失败时自动重试,提高成功率
- **数据统计**:记录签到天数、里程变化等信息
## 使用方法
### 1. 环境准备
首先需要准备好运行环境:
1. 安装青龙面板建议版本2.10.0以上)
2. 确保青龙面板已安装以下依赖:
- aiohttp
- requests
### 2. 获取抓包信息
1. 打开并登录“同程旅行”APP
2. 开启抓包工具如Charles、Fiddler等
3. 点击APP右下角“我的”-“签到”
4. 抓取请求域名`https://app.17u.cn`请求头中的三个参数:
- `phone`:手机号
- `apptoken`:认证令牌
- `device`:设备标识
### 3. 配置环境变量
在青龙面板中添加环境变量:
1. 打开青龙面板,进入「环境变量」页面
2. 点击「添加变量」按钮
3. 变量名:`tongcheng`
4. 变量值格式:
```
phone&apptoken&device
```
多账号之间支持用换行符或`#`分隔,例如:
```
# 使用换行符分隔(推荐)
13800138000&abcdef123456&device1
13900139000&abcdef789012&device2
# 或使用#分隔
13800138000&abcdef123456&device1#13900139000&abcdef789012&device2
```
### 4. 添加脚本
1. 在青龙面板中,进入「脚本管理」页面
2. 点击「添加脚本」按钮
3. 输入脚本名称(如`tc_signin.py`
4. 将脚本代码复制到编辑器中
5. 点击「保存」按钮
### 5. 设置定时任务
1. 在青龙面板中,进入「定时任务」页面
2. 点击「添加任务」按钮
3. 任务名称:自定义(如`同程旅行签到`
4. 命令:
```bash
task tc_signin.py
```
5. 定时规则:设置执行频率(如`0 0 * * *`表示每天0点执行
6. 点击「保存」按钮
## 通知配置
脚本支持通过青龙面板的通知功能发送执行结果需要确保青龙面板已正确配置通知方式如Server酱、Telegram等
如果通知功能无法正常工作,脚本会在日志中记录详细的错误信息,帮助排查问题。
## 脚本更新说明
### 版本1.6更新内容
- 优化环境变量解析,支持使用换行符分隔多账号
- 增强环境变量格式验证和错误处理
- 优化日志输出,明确显示解析到的账号数量
- 保留之前所有功能和优化
### 版本1.5更新内容
- 优化通知模块,解决异步环境下通知失败问题
- 使用线程池执行同步通知函数,避免阻塞事件循环
- 增强错误处理,提供更详细的通知失败信息
- 保留之前的所有功能和优化
### 版本1.4更新内容
- 修复推送服务失败的问题
- 优化notify模块导入逻辑
- 添加详细的路径查找和错误日志
- 保留之前的所有功能和优化
## 常见问题
1. **脚本执行失败**
- 检查环境变量是否正确配置
- 确认抓包获取的参数是否有效
- 查看脚本执行日志,定位具体错误原因
2. **通知未收到**
- 检查青龙面板通知配置是否正确
- 查看脚本日志,确认通知是否发送成功
- 检查通知服务提供商的状态
3. **任务执行不完全**
- 脚本有内置的重试机制,但部分任务可能需要手动完成
- 检查任务列表中是否有特殊任务需要额外操作
## 免责声明
本脚本仅供学习交流使用,请勿用于商业用途。使用本脚本可能违反同程旅行的用户协议,请谨慎使用。开发者不对因使用本脚本而导致的任何问题负责。

473
TongCheng/tc_signin.py Normal file
View File

@ -0,0 +1,473 @@
"""
描述: 打开并登录同程旅行APP开启抓包点击APP右下角我的-签到进去后抓包域名https://app.17u.cn请求头中的phoneapptokendevice三个参数
环境变量
变量名tongcheng
变量格式phone&apptoken&device
多账号之间支持用#或换行分隔:
phone1&apptoken1&device1
phone2&apptoken2&device2
需要依赖aiohttp
签到奖励日常任务得里程签到满360天得价值200元里程
----------------------
版本: 1.6
更新说明:
1. 优化环境变量解析支持使用换行符分隔多账号
2. 增强环境变量格式验证和错误处理
3. 优化日志输出明确显示解析到的账号数量
4. 保留之前所有功能和优化
"""
from datetime import datetime
import time
import json
import os
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import random
import sys
import traceback # 用于获取详细的错误堆栈信息
# 手机号脱敏处理
def mask_phone(phone):
if len(phone) == 11:
return phone[:3] + '****' + phone[7:]
return phone
# 从环境变量获取账号信息
def get_accounts():
accounts = []
tongcheng_env = os.getenv('tongcheng', '')
if not tongcheng_env:
print("未找到环境变量 tongcheng")
return accounts
# 支持使用#或换行符分隔多账号
# 先尝试按换行符分割
lines = tongcheng_env.strip().split('\n')
if len(lines) > 1:
print(f"发现{len(lines)}行配置,尝试按行解析...")
account_list = lines
else:
# 尝试按#分割
print("未发现换行符,尝试按#分割...")
account_list = tongcheng_env.split('#')
valid_accounts = 0
invalid_accounts = 0
for account in account_list:
account = account.strip()
if not account:
continue
try:
phone, apptoken, device = account.split('&')
if not phone or not apptoken or not device:
raise ValueError("参数为空")
accounts.append({
'phone': phone,
'apptoken': apptoken,
'device': device
})
valid_accounts += 1
print(f" ✅ 解析账号: {mask_phone(phone)}")
except Exception as e:
invalid_accounts += 1
print(f" ❌ 无效账号配置: {account} - 错误: {str(e)}")
print(f"🔍 共解析出{valid_accounts}个有效账号,{invalid_accounts}个无效配置")
return accounts
# 获取请求头
def get_headers(phone, apptoken, device, payload):
payload_str = json.dumps(payload)
headers = {
'accept': 'application/json, text/plain, */*',
'phone': phone,
'channel': '1',
'apptoken': apptoken,
'sec-fetch-site': 'same-site',
'accept-language': 'zh-CN,zh-Hans;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'sec-fetch-mode': 'cors',
'origin': 'https://m.17u.cn',
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_0_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 TcTravel/11.0.0 tctype/wk',
'referer': 'https://m.17u.cn/',
'content-length': str(len(payload_str.encode('utf-8'))),
'device': device,
'sec-fetch-dest': 'empty'
}
return headers
# 获取当前日期
def get_today_date():
return datetime.now().strftime('%Y-%m-%d')
# 异步执行任务
async def execute_task(session, phone, apptoken, device, task_code, title, browser_time):
# 开始任务
start_url = "https://app.17u.cn/welfarecenter/task/start"
start_payload = {"taskCode": task_code}
start_headers = get_headers(phone, apptoken, device, start_payload)
task_id = None
async with session.post(start_url, json=start_payload, headers=start_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(f" - ❌ 做任务【{title}】失败,跳过当前任务")
# 即使任务开始失败也尝试获取task_id
task_id = data.get('data')
if not task_id:
return False, f"任务【{title}】开始失败"
else:
task_id = data['data']
print(f" - 🚀 开始执行任务【{title}】,等待{browser_time}秒...")
# 等待浏览时间
await asyncio.sleep(browser_time)
if task_id:
# 完成任务
finish_url = "https://app.17u.cn/welfarecenter/task/finish"
finish_payload = {"id": task_id}
finish_headers = get_headers(phone, apptoken, device, finish_payload)
async with session.post(finish_url, json=finish_payload, headers=finish_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(f" - ⚠️ 任务【{title}】完成失败,尝试重新提交...")
await asyncio.sleep(2)
async with session.post(finish_url, json=finish_payload, headers=finish_headers) as retry_response:
data = await retry_response.json()
if data['code'] != 2200:
print(f" - ❌ 任务【{title}】完成失败,继续尝试领取奖励")
print(f" - ✅ 任务【{title}】完成成功,开始领取奖励")
# 领取奖励
reward_url = "https://app.17u.cn/welfarecenter/task/receive"
reward_payload = {"id": task_id}
reward_headers = get_headers(phone, apptoken, device, reward_payload)
# 添加奖励领取重试逻辑
max_retries = 2
for retry in range(max_retries + 1):
async with session.post(reward_url, json=reward_payload, headers=reward_headers) as response:
data = await response.json()
if data['code'] == 2200:
print(f" - 🎁 任务【{title}】领取奖励成功")
return True, f"任务【{title}】领取奖励成功"
elif retry < max_retries:
print(f" - ⚠️ 任务【{title}】领取奖励失败,正在进行第{retry + 1}次重试...")
await asyncio.sleep(2)
else:
print(f" - ❌ 任务【{title}】领取奖励失败,已重试{max_retries}次,请尝试手动领取")
return False, f"任务【{title}】领取奖励失败"
return False, f"任务【{title}】开始失败"
# 异步处理单个账号的所有任务
async def process_account(session, account):
phone = account['phone']
apptoken = account['apptoken']
device = account['device']
# 存储当前账号的通知内容
account_notify_content = []
print(f"\n🔄 开始处理用户{mask_phone(phone)}的任务")
account_notify_content.append(f"🔄 开始处理用户{mask_phone(phone)}的任务")
# 获取签到状态
sign_url = "https://app.17u.cn/welfarecenter/index/signIndex"
sign_headers = get_headers(phone, apptoken, device, {})
async with session.post(sign_url, json={}, headers=sign_headers) as response:
data = await response.json()
if data['data'] is None:
print(" ❌ Token失效请更新")
account_notify_content.append(" ❌ Token失效请更新")
return False, account_notify_content
today_sign = data['data']['todaySign']
mileage = data['data']['mileageBalance']['mileage']
print(f" 今日{'✅ 已' if today_sign else '⏳ 未'}签到,当前剩余里程{mileage}")
account_notify_content.append(f" 今日{'✅ 已' if today_sign else '⏳ 未'}签到,当前剩余里程{mileage}")
if not today_sign:
# 执行签到
sign_in_url = "https://app.17u.cn/welfarecenter/index/sign"
sign_in_payload = {"type": 1, "day": get_today_date()}
sign_in_headers = get_headers(phone, apptoken, device, sign_in_payload)
async with session.post(sign_in_url, json=sign_in_payload, headers=sign_in_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(" ❌ 签到失败")
account_notify_content.append(" ❌ 签到失败")
return False, account_notify_content
print(" ✅ 签到成功")
account_notify_content.append(" ✅ 签到成功")
# 获取任务列表
task_list_url = "https://app.17u.cn/welfarecenter/task/taskList?version=11.1.2.1"
task_list_headers = get_headers(phone, apptoken, device, {})
async with session.post(task_list_url, json={}, headers=task_list_headers) as response:
data = await response.json()
if data['code'] != 2200:
print(" ❌ 获取任务列表失败,跳过当前账号")
account_notify_content.append(" ❌ 获取任务列表失败,跳过当前账号")
return False, account_notify_content
tasks = []
for task in data['data']:
if task['state'] == 1 and task['browserTime'] != 0:
tasks.append({
'taskCode': task['taskCode'],
'title': task['title'],
'browserTime': task['browserTime']
})
if not tasks:
print(" ✅ 没有待完成的任务")
account_notify_content.append(" ✅ 没有待完成的任务")
return True, account_notify_content
print(f"\n 📋 本次共有{len(tasks)}个待做任务")
account_notify_content.append(f"\n 📋 本次共有{len(tasks)}个待做任务")
# 记录任务完成情况
success_tasks = []
failed_tasks = []
# 将任务分批处理每批3个
for i in range(0, len(tasks), 3):
batch = tasks[i:i+3]
batch_num = i // 3 + 1
total_batches = (len(tasks) + 2) // 3
print(f"\n 🔄 开始执行第{batch_num}/{total_batches}批任务:")
account_notify_content.append(f"\n 🔄 开始执行第{batch_num}/{total_batches}批任务:")
for task in batch:
print(f" - 📌 {task['title']} (需要浏览{task['browserTime']}秒)")
account_notify_content.append(f" - 📌 {task['title']} (需要浏览{task['browserTime']}秒)")
# 创建当前批次任务的协程
batch_coroutines = [
execute_task(session, phone, apptoken, device, task['taskCode'], task['title'], task['browserTime'])
for task in batch
]
# 并发执行当前批次的任务
results = await asyncio.gather(*batch_coroutines)
# 处理任务结果
for success, message in results:
if success:
success_tasks.append(message)
else:
failed_tasks.append(message)
# 批次间隔
if batch_num < total_batches:
wait_time = random.uniform(1, 2)
print(f"\n ⏳ 等待{wait_time:.1f}秒后执行下一批任务...")
account_notify_content.append(f"\n ⏳ 等待{wait_time:.1f}秒后执行下一批任务...")
await asyncio.sleep(wait_time)
# 添加任务结果统计
account_notify_content.append("\n 📊 任务执行统计:")
account_notify_content.append(f" - ✅ 成功完成 {len(success_tasks)} 个任务")
for msg in success_tasks:
account_notify_content.append(f" - {msg}")
if failed_tasks:
account_notify_content.append(f" - ❌ 失败 {len(failed_tasks)} 个任务")
for msg in failed_tasks:
account_notify_content.append(f" - {msg}")
# 获取积分信息
info_url = "https://app.17u.cn/welfarecenter/index/signIndex"
info_headers = get_headers(phone, apptoken, device, {})
async with session.post(info_url, json={}, headers=info_headers) as response:
data = await response.json()
if data['code'] == 2200:
cycle_sign_num = data['data']['cycleSighNum']
continuous_history = data['data']['continuousHistory']
mileage = data['data']['mileageBalance']['mileage']
today_mileage = data['data']['mileageBalance']['todayMileage']
# 获取特殊连续签到信息
special_info = data['data'].get('specialContinuousInfo', {})
next_reward_mileage = special_info.get('nextRewardMileage', 0)
need_sign = special_info.get('needSign', 0)
next_reward_day = special_info.get('nextRewardDay', 0)
print(f"\n 📊 任务完成情况:")
account_notify_content.append(f"\n 📊 任务完成情况:")
print(f" - 📅 周期内连续签到{cycle_sign_num}天,已连续签到{continuous_history}")
account_notify_content.append(f" - 📅 周期内连续签到{cycle_sign_num}天,已连续签到{continuous_history}")
if need_sign > 0 and next_reward_day > 0:
print(f" - 🎯 再签{need_sign}天,可获得「{next_reward_day}日奖励」(最高{next_reward_mileage}里程)")
account_notify_content.append(f" - 🎯 再签{need_sign}天,可获得「{next_reward_day}日奖励」(最高{next_reward_mileage}里程)")
print(f" - 💰 今日获取{today_mileage}里程,当前剩余{mileage}里程")
account_notify_content.append(f" - 💰 今日获取{today_mileage}里程,当前剩余{mileage}里程")
return True, account_notify_content
# 异步发送通知
async def async_send_notification(send_func, title, content, loop=None):
if not loop:
loop = asyncio.get_running_loop()
# 创建线程池执行器
with ThreadPoolExecutor(max_workers=1) as executor:
try:
# 在单独的线程中执行同步的通知函数
result = await loop.run_in_executor(executor, lambda: send_func(title, content))
print("✅ 通知发送成功")
return True, result
except Exception as e:
# 获取详细的错误堆栈信息
error_stack = traceback.format_exc()
print(f"❌ 通知发送失败: {str(e)}")
print(f"错误堆栈信息:\n{error_stack}")
return False, str(e)
# 主函数
async def main():
# 存储所有账号的通知内容
all_notify_title = "同程旅行任务执行结果"
all_notify_content = []
# 尝试导入通知模块
send = None
notify_found = False
print("\n🔍 正在查找青龙面板通知模块...")
# 青龙面板默认路径
QL_SCRIPTS_DIR = '/ql/scripts'
sys.path.append(QL_SCRIPTS_DIR)
# 添加notify可能存在的其他路径
POSSIBLE_PATHS = [
'/ql', # 青龙根目录
'/ql/data/scripts', # 新版青龙数据目录
'/ql/repo', # 青龙脚本仓库目录
'/ql/repo/QLDependency', # QLDependency仓库目录
'/ql/config', # 配置目录
os.path.dirname(__file__), # 当前脚本目录
os.path.join(os.path.dirname(__file__), 'notify') # 当前目录下的notify文件夹
]
# 检查每个路径是否存在并尝试导入
for path in POSSIBLE_PATHS:
notify_path = os.path.join(path, 'notify.py')
print(f" - 检查路径: {notify_path}")
if os.path.exists(notify_path):
print(f" ✅ 找到通知模块文件")
try:
# 临时添加路径到sys.path
if path not in sys.path:
sys.path.append(path)
from notify import send
print(f" ✅ 成功导入通知模块")
notify_found = True
break
except Exception as e:
print(f" ❌ 导入失败: {str(e)}")
else:
print(f" ❌ 文件不存在")
# 如果未找到通知模块,创建一个模拟函数
if not notify_found:
print("❌ 未找到青龙面板通知模块请确保notify.py存在于正确的路径")
print("❌ 脚本将继续执行,但无法发送通知")
def dummy_send(title, content):
print(f"\n📧 通知内容 (未发送):\n{title}\n\n{content}")
return "Notification not sent - notify.py not found"
send = dummy_send
# 继续执行主程序
accounts = get_accounts()
if not accounts:
print("❌ 没有找到有效的账号信息,请添加环境变量")
print("❌ 环境变量名称tongcheng 格式为phone&apptoken&device")
all_notify_content.append("❌ 没有找到有效的账号信息,请添加环境变量")
all_notify_content.append("❌ 环境变量名称tongcheng 格式为phone&apptoken&device")
# 异步发送通知
loop = asyncio.get_running_loop()
await async_send_notification(send, all_notify_title, "\n".join(all_notify_content), loop)
return
print(f"🔍 本次共获取到 {len(accounts)} 个账号")
all_notify_content.append(f"🔍 本次共获取到 {len(accounts)} 个账号")
# 统计信息
total_accounts = len(accounts)
success_accounts = 0
failed_accounts = 0
# 存储每个账号的结果
account_results = []
async with aiohttp.ClientSession() as session:
# 串行处理每个账号
for i, account in enumerate(accounts, 1):
print(f"\n📱 开始处理第 {i}/{len(accounts)} 个账号")
account_result = {
'phone': mask_phone(account['phone']),
'success': False,
'content': []
}
success, content = await process_account(session, account)
account_result['success'] = success
account_result['content'] = content
account_results.append(account_result)
# 统计账号执行情况
if success:
success_accounts += 1
else:
failed_accounts += 1
if i < len(accounts):
wait_time = random.uniform(2, 3)
print(f"\n⏳ 等待 {wait_time:.1f} 秒后处理下一个账号...")
await asyncio.sleep(wait_time)
# 构建聚合通知
all_notify_content.append("\n\n📊 总体执行统计:")
all_notify_content.append(f" - 账号总数:{total_accounts}")
all_notify_content.append(f" - 成功:{success_accounts}")
all_notify_content.append(f" - 失败:{failed_accounts}")
all_notify_content.append("\n\n===== 详细结果 =====")
# 添加每个账号的结果
for result in account_results:
all_notify_content.append(f"\n\n📱 账号:{result['phone']}")
all_notify_content.append(f"{'✅ 成功' if result['success'] else '❌ 失败'}")
all_notify_content.extend(result['content'])
# 发送聚合通知
print("\n📤 正在发送通知...")
loop = asyncio.get_running_loop()
notify_success, notify_result = await async_send_notification(send, all_notify_title, "\n".join(all_notify_content), loop)
# 记录通知发送结果
if not notify_success:
all_notify_content.append("\n\n⚠️ 通知发送失败请检查notify.py配置")
all_notify_content.append(f"错误信息:{notify_result}")
print("✅ 脚本执行完成")
if __name__ == "__main__":
asyncio.run(main())