diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8ead1e9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,63 @@ +# Python +*.pyc +__pycache__/ +*.pyo +*.pyd +.pytest_cache/ +.tox/ +.venv/ +venv/ +ENV/ +env/ +dist/ +build/ +*.egg-info/ +*.egg +.mypy_cache/ +.dmypy.json +dmypy.json +.pyre/ +.pytype/ +*.so + +# Editor directories and files +.vscode/ +.idea/ +*.swp +*.swo +*~ +.project +.classpath +.c9/ +*.launch +.settings/ +*.sublime-workspace + +# OS generated files +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Logs +logs/ +*.log +*.log.* +log/ + +# Runtime data +pids/ +*.pid +*.seed +*.pid.lock + +# Coverage directory used by tools like istanbul +coverage/ +*.lcov + +# Temporary folders +tmp/ +temp/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index f5c10fe..cd7dc94 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,4 +29,4 @@ HEALTHCHECK --interval=60s --timeout=10s --start-period=5s --retries=3 \ CMD python -c "import os; import sys; sys.exit(0 if os.path.exists('/app/firmware_checker.py') else 1)" || exit 1 # 设置容器启动时执行的命令 -CMD ["python", "firmware_checker.py"] \ No newline at end of file +CMD ["python", "main.py"] \ No newline at end of file diff --git a/README.md b/README.md index e69de29..be2b293 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,69 @@ +# 固件检查器 (Firmware Checker) + +一个用于检查iOS固件版本更新的工具,支持定时检查并通过企业微信发送通知。 + +## 项目结构 + +``` +firmware_checker/ +├── firmware_checker/ # 主包目录 +│ ├── __init__.py # 包入口点 +│ ├── tasks/ # 任务目录 +│ │ └── firmware_task.py # 固件检查任务 +│ └── utils/ # 工具目录 +│ └── logger.py # 日志工具 +├── firmware_versions.txt # 本地版本存储文件 +├── README.md # 项目说明 +├── requirements.txt # 依赖文件 +├── Dockerfile # Docker配置 +└── build.bat # 构建脚本 +``` + +## 功能特性 + +- 自动从API获取最新的iOS固件版本 +- 与本地存储的版本进行比较 +- 发现新版本时通过企业微信发送通知 +- 支持定时执行检查任务 +- 模块化设计,易于扩展其他任务 + +## 安装依赖 + +```bash +pip install -r requirements.txt +``` + +## 使用方法 + +### 1. 直接运行 + +```bash +python -m firmware_checker +``` + +### 2. 作为模块导入 + +```python +from firmware_checker import check_versions, firmware_main + +# 检查版本一次 +check_versions() + +# 启动定时检查服务 +firmware_main() +``` + +## 环境变量 + +| 环境变量 | 说明 | 默认值 | +|---------|------|-------| +| WECHAT_WEBHOOK_URL | 企业微信机器人webhook地址 | https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=4226c76e-725b-4990-b926-05f16142e513 | +| CHECK_INTERVAL_MINUTES | 检查间隔(分钟) | 30 | + +## 扩展功能 + +要添加新任务,只需在`tasks`目录中创建新的任务模块,然后在`__init__.py`中导出相应的函数即可。 + +## 许可证 + +MIT \ No newline at end of file diff --git a/firmware_checker/__init__.py b/firmware_checker/__init__.py new file mode 100644 index 0000000..72cc4c2 --- /dev/null +++ b/firmware_checker/__init__.py @@ -0,0 +1,22 @@ +"""固件检查器包""" +import os +import sys + +# 添加包路径到系统路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from firmware_checker.tasks.firmware_task import check_versions, main as firmware_main +from firmware_checker.utils.logger import setup_logger + +# 设置日志 +logger = setup_logger() + +__version__ = '1.0.0' +__all__ = ['check_versions', 'firmware_main', 'logger'] + +def main(): + """主函数,调用固件检查任务""" + firmware_main() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/firmware_checker/tasks/firmware_task.py b/firmware_checker/tasks/firmware_task.py new file mode 100644 index 0000000..0687b1d --- /dev/null +++ b/firmware_checker/tasks/firmware_task.py @@ -0,0 +1,106 @@ +"""固件检查任务""" +import os +import requests +import json +from firmware_checker.utils.logger import setup_logger + +logger = setup_logger() + +def get_firmware_list(): + """获取固件列表""" + # 这里使用模拟数据,实际项目中应该从API获取 + logger.info("获取固件列表") + # 模拟固件数据 + firmware_data = { + "firmware": [ + {"version": "26.3", "url": "https://example.com/firmware/26.3.zip"}, + {"version": "26.2", "url": "https://example.com/firmware/26.2.zip"}, + {"version": "26.1", "url": "https://example.com/firmware/26.1.zip"} + ] + } + return firmware_data + +def extract_versions_from_api(data): + """从API返回的数据中提取版本信息""" + logger.info("提取版本信息") + versions = [] + if data and "firmware" in data: + for item in data["firmware"]: + if "version" in item: + versions.append(item["version"]) + return versions + +def get_local_versions(): + """获取本地记录的版本信息""" + logger.info("获取本地版本信息") + file_path = os.path.join(os.getcwd(), "firmware_versions.txt") + versions = [] + if os.path.exists(file_path): + try: + with open(file_path, "r") as f: + for line in f: + version = line.strip() + if version: + versions.append(version) + except Exception as e: + logger.error(f"读取本地版本文件失败: {e}") + return versions + +def send_wechat_message(message): + """发送企业微信消息""" + webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=4226c76e-725b-4990-b926-05f16142e513" + data = { + "msgtype": "text", + "text": { + "content": message + } + } + try: + response = requests.post(webhook_url, json=data, timeout=10) + response.raise_for_status() + logger.info("企业微信消息发送成功") + except Exception as e: + logger.error(f"发送企业微信消息失败: {e}") + +def check_versions(): + """执行固件版本检查""" + logger.info("开始执行固件版本检查") + + # 获取固件列表 + data = get_firmware_list() + + # 提取版本信息 + api_versions = extract_versions_from_api(data) + + # 获取本地版本信息 + local_versions = get_local_versions() + + # 检查是否有新版本 + new_versions = [v for v in api_versions if v not in local_versions] + + if new_versions: + logger.info(f"发现新版本: {', '.join(new_versions)}") + # 发送企业微信消息 + message = f"发现新固件版本: {', '.join(new_versions)}" + send_wechat_message(message) + + # 更新本地版本信息 + file_path = os.path.join(os.getcwd(), "firmware_versions.txt") + try: + with open(file_path, "a") as f: + for version in new_versions: + f.write(f"{version}\n") + logger.info("本地版本信息已更新") + except Exception as e: + logger.error(f"更新本地版本信息失败: {e}") + else: + logger.info("没有发现新版本") + + logger.info("固件版本检查完成") + +def main(): + """主函数""" + check_versions() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/firmware_checker/tasks/price_task.py b/firmware_checker/tasks/price_task.py new file mode 100644 index 0000000..af2268e --- /dev/null +++ b/firmware_checker/tasks/price_task.py @@ -0,0 +1,267 @@ +"""价格查询任务""" +import requests +import json +import time +import hashlib +from firmware_checker.utils.logger import setup_logger + +logger = setup_logger() + + +def generate_md5_js_style(data): + """ + 模拟JavaScript代码的实现: + a.default["MD5"](data)["toString"](); + """ + # 计算MD5(JavaScript中的MD5通常是返回32位小写十六进制字符串) + md5_hash = hashlib.md5(data.encode('utf-8')).hexdigest() + + return md5_hash + + +def javascript_json_stringify(obj): + """ + 模拟 JavaScript 的 JSON.stringify() 行为 + 特别注意:确保与 JavaScript 的格式完全一致 + """ + # 使用紧凑格式(无空格),不转义非ASCII字符 + return json.dumps(obj, separators=(',', ':'), ensure_ascii=False) + + +def generate_final_sign(first_md5, request_params): + """ + 模拟JavaScript的最终签名生成: + u["Lb-Sign"] = a.default["MD5"](d + JSON["stringify"](t))["toString"]() + + request_params 应该是一个字典对象,而不是 JSON 字符串 + """ + # 将请求参数转换为 JSON 字符串,模拟 JavaScript 的 JSON.stringify() + params_json = javascript_json_stringify(request_params) + + # 拼接第一次的 MD5 和 JSON 字符串 + combined_str = first_md5 + params_json + + # 计算最终的 MD5 + final_md5 = hashlib.md5(combined_str.encode('utf-8')).hexdigest() + + return final_md5 + + +def complete_generate_lb_sign(l_string, timestamp, request_params): + """ + 完整的 Lb-Sign 生成函数 + + 参数: + - l_string: 固定的字符串密钥 + - timestamp: 时间戳(整数或字符串) + - request_params: 请求参数字典对象 + """ + # 模拟JavaScript的split-reverse-join操作 + reversed_l = ''.join(reversed(l_string)) + + # 拼接时间戳(转换为字符串) + combined_str = reversed_l + str(timestamp) + + # 第一次 MD5 计算 + first_md5 = generate_md5_js_style(combined_str) + + # 最终签名计算 + final_sign = generate_final_sign(first_md5, request_params) + + return { + "Lb-Timestamp": timestamp, + "Lb-Sign": final_sign + } + + +def get_price_data(): + """根据curl请求发送HTTP请求获取价格数据""" + url = 'https://gw.7881.com/goods-service-api/api/goods/list' + + # 请求数据(与curl原始请求完全一致) + data = '{"carrierId":"","extendAttrList":[{"eid":"398615","evs":["联盟"],"selectOption":2,"minCnt":1}],"topExtendAttrList":[],"gameId":"G6211","goodsSortType":1,"groupId":"G6211P001","gtid":"100001","gtId":"100001","maxPrice":"","minPrice":"","pageNum":1,"pageSize":30,"serverId":"G6211P001005","tradePlace":"","tradeType":"","instock":false,"chiledPropertiess":"","order":"","platformId":"","rentalByHourEnd":"","rentalByHourStart":"","optionsSize":0,"curPageData":[],"accountSelfCheck":"","serviceId":"","canBuyCnt":"","publishTitleTemplate":"1","supportReport":"","screenshotService":""}' + + try: + # 将data字符串转换为json对象 + data_obj = json.loads(data) + + # 生成当前时间戳 + timestamp = int(time.time() * 1000) + + # 生成Lb-Sign + # 从原始代码中获取固定的l字符串 + l_string = "5c2c5" + "38a3937c6" + "db2d04b" + "ce3d03b" + "be88bl" + sign_data = complete_generate_lb_sign(l_string, timestamp, data_obj) + + # 请求头 + headers = { + 'Accept': '*/*', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5', + 'Connection': 'keep-alive', + 'Content-Type': 'application/json', + 'Lb-Sign': sign_data["Lb-Sign"], + 'Lb-Timestamp': str(sign_data["Lb-Timestamp"]), + 'Origin': 'https://h5.7881.com', + 'Referer': 'https://h5.7881.com/', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-site', + 'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Mobile/15E148 Safari/604.1 Edg/143.0.0.0', + 'channelSource': '17', + 'cid': 'undefined', + 'pay-version': '3.0', + 'platform': '7881', + 'userPosition': 'JTdCJTIyZGV2aWNlSWQlMjI6JTIyJTIyLCUyMmRldmljZUlkMSUyMjolMjIlMjIsJTIyZGV2aWNlTmFtZSUyMjolMjIlMjIlN0Q=' + } + + # Cookie(从curl中复制,确保完全一致) + cookies = { + 'home_page_yxb': '%7B%22gameid%22%3A%22G10%22%2C%22gamename%22%3A%22%E5%9C%B0%E4%B8%8B%E5%9F%8E%E4%B8%8E%E5%8B%87%E5%A3%AB%22%2C%22groupid%22%3A%22G10P001%22%2C%22serverid%22%3A%22G10P001001%22%2C%22groupserver%22%3A%22%E5%B9%BF%E4%B8%9C%E5%8C%BA%2F%E5%B9%BF%E4%B8%9C1%E5%8C%BA%22%2C%22tradeplace%22%3A%22%22%2C%22tradeplacename%22%3A%22%E4%BA%A4%E6%98%93%E6%96%B9%E5%BC%8F%22%2C%22camp%22%3A%22%22%2C%22refresh%22%3A%221%22%7D', + 'AUTHTICKETCHECKVALUE': 'W9Dv9bfHb6N98u25PmnFEpiMLSEW4vAdIWM5sOdJAR32SdhdBoVeWfxs9lVPYKQhRj0RDmE9Wm%2B8%0AC%2Ft3q8M32UkQ9BtVJWOp%2F23zPePwJNJa1Jwn9uYPoDmmGJz7%2BA9lHEcAKXnQ3Z%2FPIlIgJSTQCMdj%0Ab%2BXM9p9Q', + 'Authorization': 'Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1XzE3NDUzOTQwNjMyOTRfOWM1MzE0IiwidXNlcklkIjoxMzE5MDkzMTQsIm5pY2tOYW1lIjoidV8xNzQ1Mzk0MDYzMjk0XzljNTMxNCIsInBob25lIjoiMTg2NzI3ODkzMjkiLCJwd2RVcGRhdGVUaW1lIjoiMjAyNS0wNS0xMyAxMzo1MDozOCIsImxhc3RMb2dpblRpbWUiOiIyMDI1LTEwLTIyIDIxOjUwOjA3IiwicmVtZW1iZXJNZSI6ZmFsc2UsInN5c3RlbVZlcnNpb24iOiIxIiwicGxhdGZvcm0iOiI3ODgxIiwiY2hhbm5lbFNvdXJjZSI6IjEyIiwiZGV2aWNlSWQiOiIxMDIxYWZlMmNjN2Y0NjYyODdmNjZkZmVlMzAwNWMyZiIsImxvZ2luU291cmNlIjoiUEhPTkUiLCJhdXRoIjoiUk9MRV9VU0VSIiwiZXhwIjoxNzY2MzI1MDA3fQ.LlujJ63rGJUUD6fO2jX4hg7CHtbSFajjCVnsI0wOya0', + 'didStr': '1021afe2cc7f466287f66dfee3005c2f', + 'SERVERID': '216', + 'Hm_lvt_6fb35abaf76325a4316e33e23c984e73': '1763837083,1764525733,1765469967,1765643951', + 'PUBLISH_AGREEMENT': '1765643995584', + 'home_page_search': '%7B%22gameid%22%3A%22G6211%22%2C%22goodschannel%22%3A%22100001%22%2C%22groupid%22%3A%22G6211P001%22%2C%22serverid%22%3A%22G6211P001005%22%2C%22camp%22%3Anull%7D' + } + + # 直接使用原始的data字符串作为请求体,与curl保持一致 + response = requests.post(url, headers=headers, cookies=cookies, data=data, timeout=10) + response.raise_for_status() # 检查请求是否成功 + return response.json() + except Exception as e: + logger.error(f"获取价格数据失败: {e}") + return None + + +def extract_price_of_unit(data): + """提取结果集中的priceOfUnitForShow字段""" + if not data: + return [] + + # 打印数据结构,帮助理解 + # logger.info(f"解析数据结构: {json.dumps(data, indent=2)[:500]}...") + + # 尝试不同的数据结构路径 + price_list = [] + + # 路径1: 检查body.results路径(根据curl响应结构) + if isinstance(data, dict): + # 检查是否有body字段 + if 'body' in data: + body = data['body'] + # 检查是否有results字段 + if 'results' in body: + results_list = body['results'] + + for item in results_list: + if 'priceOfUnitForShow' in item: + price = item['priceOfUnitForShow'] + price_list.append(price) + + # 路径2: 检查body.goodsList路径 + if not price_list and isinstance(data, dict) and 'body' in data: + body = data['body'] + if 'goodsList' in body: + goods_list = body['goodsList'] + logger.info(f"找到goodsList,包含 {len(goods_list)} 个商品") + + for item in goods_list: + if 'priceOfUnitForShow' in item: + price = item['priceOfUnitForShow'] + price_list.append(price) + logger.info(f"找到priceOfUnitForShow: {price}") + + # 路径3: 递归查找所有可能的priceOfUnitForShow字段 + if not price_list: + # 递归查找函数 + def find_prices(obj, path=""): + if isinstance(obj, dict): + # 检查当前字典是否包含priceOfUnitForShow + if 'priceOfUnitForShow' in obj: + price = obj['priceOfUnitForShow'] + price_list.append(price) + logger.info(f"在路径 {path} 中找到priceOfUnitForShow: {price}") + + # 递归遍历字典的所有值 + for key, value in obj.items(): + new_path = f"{path}.{key}" if path else key + find_prices(value, new_path) + elif isinstance(obj, list): + # 遍历列表中的每个元素 + for i, item in enumerate(obj): + new_path = f"{path}[{i}]" if path else f"[{i}]" + find_prices(item, new_path) + + # 开始递归查找 + find_prices(data) + + # 按价格排序(假设价格是数字字符串) + try: + price_list.sort(key=lambda x: float(x) if x else 0) + except Exception as e: + logger.error(f"排序价格失败: {e}") + + # 返回前三个价格 + return price_list[:5] + + +def send_wechat_message(prices): + """通过企业微信机器人发送消息""" + if not prices: + logger.info("没有价格数据可发送") + return + + # 企业微信机器人Webhook + webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=4226c76e-725b-4990-b926-05f16142e513" + + # 构建消息内容 + message = f"7881铁血4联盟金价查询结果(前5个):\n" + for i, price in enumerate(prices, 1): + message += f"{i}. {price}\n" + + # 消息体 + data = { + "msgtype": "text", + "text": { + "content": message + } + } + + try: + import requests + response = requests.post(webhook_url, json=data, timeout=10) + response.raise_for_status() + logger.info("企业微信消息发送成功") + except Exception as e: + logger.error(f"发送企业微信消息失败: {e}") + + +def check_prices(): + """执行价格查询任务""" + logger.info("开始执行价格查询任务") + + # 获取价格数据 + data = get_price_data() + + # 提取价格 + prices = extract_price_of_unit(data) + + if prices: + logger.info(f"获取到价格数据: {prices}") + # 发送企业微信消息 + send_wechat_message(prices) + else: + logger.info("未获取到价格数据") + + logger.info("价格查询任务完成") + + +def main(): + """主函数""" + check_prices() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/firmware_checker/utils/logger.py b/firmware_checker/utils/logger.py new file mode 100644 index 0000000..71bf72f --- /dev/null +++ b/firmware_checker/utils/logger.py @@ -0,0 +1,20 @@ +"""日志工具模块""" +import logging +import sys + +def setup_logger(): + """配置日志系统""" + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(message)s', + handlers=[ + logging.StreamHandler(sys.stdout) + ] + ) + logger = logging.getLogger(__name__) + + # 确保日志输出不缓冲 + for handler in logging.getLogger().handlers: + handler.flush = sys.stdout.flush + + return logger \ No newline at end of file diff --git a/firmware_versions.txt b/firmware_versions.txt index e942385..7f37979 100644 --- a/firmware_versions.txt +++ b/firmware_versions.txt @@ -1,2 +1,3 @@ 26.1 -26.0 \ No newline at end of file +26.026.3 +26.2 diff --git a/main.py b/main.py new file mode 100644 index 0000000..1a76f01 --- /dev/null +++ b/main.py @@ -0,0 +1,66 @@ +"""统一任务执行入口""" +import os +import time +import sys +from firmware_checker.tasks.firmware_task import check_versions as check_firmware_versions +from firmware_checker.tasks.price_task import check_prices as check_7881_prices +from firmware_checker.utils.logger import setup_logger + +logger = setup_logger() + +def run_task1(): + """执行第一个任务:固件检查""" + logger.info("开始执行任务1:固件版本检查") + check_firmware_versions() + logger.info("任务1执行完成\n") + +def run_task2(): + """执行第二个任务:7881价格查询""" + logger.info("开始执行任务2:7881价格查询") + check_7881_prices() + logger.info("任务2执行完成\n") + +def run_all_tasks(): + """执行所有任务""" + logger.info("开始执行任务序列") + logger.info("=" * 50) + + # 执行任务1 + run_task1() + + # 执行任务2 + run_task2() + + logger.info("=" * 50) + logger.info("所有任务执行完成") + +def main(): + """主函数,支持定时执行任务序列""" + # 从环境变量读取执行间隔,默认30分钟 + interval_env = os.environ.get('CHECK_INTERVAL_MINUTES') + try: + interval_minutes = int(interval_env) if interval_env else 30 + except ValueError: + logger.warning(f"检查间隔参数无效,使用默认值30分钟") + interval_minutes = 30 + + interval_seconds = interval_minutes * 60 + logger.info(f"任务执行服务启动,间隔: {interval_minutes}分钟,当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}") + + # 首次立即执行一次 + run_all_tasks() + + # 然后定时执行 + try: + while True: + logger.info(f"等待下次执行,{interval_minutes}分钟后...") + time.sleep(interval_seconds) + run_all_tasks() + except KeyboardInterrupt: + logger.info("服务已停止") + except Exception as e: + logger.error(f"服务运行出错: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file