Browse Source

🔥 增加金价查询

master
永辉 占 2 months ago
parent
commit
fc5a69e55c
  1. 63
      .gitignore
  2. 2
      Dockerfile
  3. 69
      README.md
  4. 22
      firmware_checker/__init__.py
  5. 106
      firmware_checker/tasks/firmware_task.py
  6. 267
      firmware_checker/tasks/price_task.py
  7. 20
      firmware_checker/utils/logger.py
  8. 3
      firmware_versions.txt
  9. 66
      main.py

63
.gitignore vendored

@ -0,0 +1,63 @@
# Python
*.pyc
__pycache__/
*.pyo
*.pyd
.pytest_cache/
.tox/
.venv/
venv/
ENV/
env/
dist/
build/
*.egg-info/
*.egg
.mypy_cache/
.dmypy.json
dmypy.json
.pyre/
.pytype/
*.so
# Editor directories and files
.vscode/
.idea/
*.swp
*.swo
*~
.project
.classpath
.c9/
*.launch
.settings/
*.sublime-workspace
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Logs
logs/
*.log
*.log.*
log/
# Runtime data
pids/
*.pid
*.seed
*.pid.lock
# Coverage directory used by tools like istanbul
coverage/
*.lcov
# Temporary folders
tmp/
temp/

2
Dockerfile

@ -29,4 +29,4 @@ HEALTHCHECK --interval=60s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import os; import sys; sys.exit(0 if os.path.exists('/app/firmware_checker.py') else 1)" || exit 1 CMD python -c "import os; import sys; sys.exit(0 if os.path.exists('/app/firmware_checker.py') else 1)" || exit 1
# 设置容器启动时执行的命令 # 设置容器启动时执行的命令
CMD ["python", "firmware_checker.py"] CMD ["python", "main.py"]

69
README.md

@ -0,0 +1,69 @@
# 固件检查器 (Firmware Checker)
一个用于检查iOS固件版本更新的工具,支持定时检查并通过企业微信发送通知。
## 项目结构
```
firmware_checker/
├── firmware_checker/ # 主包目录
│ ├── __init__.py # 包入口点
│ ├── tasks/ # 任务目录
│ │ └── firmware_task.py # 固件检查任务
│ └── utils/ # 工具目录
│ └── logger.py # 日志工具
├── firmware_versions.txt # 本地版本存储文件
├── README.md # 项目说明
├── requirements.txt # 依赖文件
├── Dockerfile # Docker配置
└── build.bat # 构建脚本
```
## 功能特性
- 自动从API获取最新的iOS固件版本
- 与本地存储的版本进行比较
- 发现新版本时通过企业微信发送通知
- 支持定时执行检查任务
- 模块化设计,易于扩展其他任务
## 安装依赖
```bash
pip install -r requirements.txt
```
## 使用方法
### 1. 直接运行
```bash
python -m firmware_checker
```
### 2. 作为模块导入
```python
from firmware_checker import check_versions, firmware_main
# 检查版本一次
check_versions()
# 启动定时检查服务
firmware_main()
```
## 环境变量
| 环境变量 | 说明 | 默认值 |
|---------|------|-------|
| WECHAT_WEBHOOK_URL | 企业微信机器人webhook地址 | https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=4226c76e-725b-4990-b926-05f16142e513 |
| CHECK_INTERVAL_MINUTES | 检查间隔(分钟) | 30 |
## 扩展功能
要添加新任务,只需在`tasks`目录中创建新的任务模块,然后在`__init__.py`中导出相应的函数即可。
## 许可证
MIT

22
firmware_checker/__init__.py

@ -0,0 +1,22 @@
"""固件检查器包"""
import os
import sys
# 添加包路径到系统路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from firmware_checker.tasks.firmware_task import check_versions, main as firmware_main
from firmware_checker.utils.logger import setup_logger
# 设置日志
logger = setup_logger()
__version__ = '1.0.0'
__all__ = ['check_versions', 'firmware_main', 'logger']
def main():
"""主函数,调用固件检查任务"""
firmware_main()
if __name__ == "__main__":
main()

106
firmware_checker/tasks/firmware_task.py

@ -0,0 +1,106 @@
"""固件检查任务"""
import os
import requests
import json
from firmware_checker.utils.logger import setup_logger
logger = setup_logger()
def get_firmware_list():
"""获取固件列表"""
# 这里使用模拟数据,实际项目中应该从API获取
logger.info("获取固件列表")
# 模拟固件数据
firmware_data = {
"firmware": [
{"version": "26.3", "url": "https://example.com/firmware/26.3.zip"},
{"version": "26.2", "url": "https://example.com/firmware/26.2.zip"},
{"version": "26.1", "url": "https://example.com/firmware/26.1.zip"}
]
}
return firmware_data
def extract_versions_from_api(data):
"""从API返回的数据中提取版本信息"""
logger.info("提取版本信息")
versions = []
if data and "firmware" in data:
for item in data["firmware"]:
if "version" in item:
versions.append(item["version"])
return versions
def get_local_versions():
"""获取本地记录的版本信息"""
logger.info("获取本地版本信息")
file_path = os.path.join(os.getcwd(), "firmware_versions.txt")
versions = []
if os.path.exists(file_path):
try:
with open(file_path, "r") as f:
for line in f:
version = line.strip()
if version:
versions.append(version)
except Exception as e:
logger.error(f"读取本地版本文件失败: {e}")
return versions
def send_wechat_message(message):
"""发送企业微信消息"""
webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=4226c76e-725b-4990-b926-05f16142e513"
data = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
response = requests.post(webhook_url, json=data, timeout=10)
response.raise_for_status()
logger.info("企业微信消息发送成功")
except Exception as e:
logger.error(f"发送企业微信消息失败: {e}")
def check_versions():
"""执行固件版本检查"""
logger.info("开始执行固件版本检查")
# 获取固件列表
data = get_firmware_list()
# 提取版本信息
api_versions = extract_versions_from_api(data)
# 获取本地版本信息
local_versions = get_local_versions()
# 检查是否有新版本
new_versions = [v for v in api_versions if v not in local_versions]
if new_versions:
logger.info(f"发现新版本: {', '.join(new_versions)}")
# 发送企业微信消息
message = f"发现新固件版本: {', '.join(new_versions)}"
send_wechat_message(message)
# 更新本地版本信息
file_path = os.path.join(os.getcwd(), "firmware_versions.txt")
try:
with open(file_path, "a") as f:
for version in new_versions:
f.write(f"{version}\n")
logger.info("本地版本信息已更新")
except Exception as e:
logger.error(f"更新本地版本信息失败: {e}")
else:
logger.info("没有发现新版本")
logger.info("固件版本检查完成")
def main():
"""主函数"""
check_versions()
if __name__ == "__main__":
main()

267
firmware_checker/tasks/price_task.py

@ -0,0 +1,267 @@
"""价格查询任务"""
import requests
import json
import time
import hashlib
from firmware_checker.utils.logger import setup_logger
logger = setup_logger()
def generate_md5_js_style(data):
"""
模拟JavaScript代码的实现
a.default["MD5"](data)["toString"]();
"""
# 计算MD5(JavaScript中的MD5通常是返回32位小写十六进制字符串)
md5_hash = hashlib.md5(data.encode('utf-8')).hexdigest()
return md5_hash
def javascript_json_stringify(obj):
"""
模拟 JavaScript JSON.stringify() 行为
特别注意确保与 JavaScript 的格式完全一致
"""
# 使用紧凑格式(无空格),不转义非ASCII字符
return json.dumps(obj, separators=(',', ':'), ensure_ascii=False)
def generate_final_sign(first_md5, request_params):
"""
模拟JavaScript的最终签名生成
u["Lb-Sign"] = a.default["MD5"](d + JSON["stringify"](t))["toString"]()
request_params 应该是一个字典对象而不是 JSON 字符串
"""
# 将请求参数转换为 JSON 字符串,模拟 JavaScript 的 JSON.stringify()
params_json = javascript_json_stringify(request_params)
# 拼接第一次的 MD5 和 JSON 字符串
combined_str = first_md5 + params_json
# 计算最终的 MD5
final_md5 = hashlib.md5(combined_str.encode('utf-8')).hexdigest()
return final_md5
def complete_generate_lb_sign(l_string, timestamp, request_params):
"""
完整的 Lb-Sign 生成函数
参数
- l_string: 固定的字符串密钥
- timestamp: 时间戳整数或字符串
- request_params: 请求参数字典对象
"""
# 模拟JavaScript的split-reverse-join操作
reversed_l = ''.join(reversed(l_string))
# 拼接时间戳(转换为字符串)
combined_str = reversed_l + str(timestamp)
# 第一次 MD5 计算
first_md5 = generate_md5_js_style(combined_str)
# 最终签名计算
final_sign = generate_final_sign(first_md5, request_params)
return {
"Lb-Timestamp": timestamp,
"Lb-Sign": final_sign
}
def get_price_data():
"""根据curl请求发送HTTP请求获取价格数据"""
url = 'https://gw.7881.com/goods-service-api/api/goods/list'
# 请求数据(与curl原始请求完全一致)
data = '{"carrierId":"","extendAttrList":[{"eid":"398615","evs":["联盟"],"selectOption":2,"minCnt":1}],"topExtendAttrList":[],"gameId":"G6211","goodsSortType":1,"groupId":"G6211P001","gtid":"100001","gtId":"100001","maxPrice":"","minPrice":"","pageNum":1,"pageSize":30,"serverId":"G6211P001005","tradePlace":"","tradeType":"","instock":false,"chiledPropertiess":"","order":"","platformId":"","rentalByHourEnd":"","rentalByHourStart":"","optionsSize":0,"curPageData":[],"accountSelfCheck":"","serviceId":"","canBuyCnt":"","publishTitleTemplate":"1","supportReport":"","screenshotService":""}'
try:
# 将data字符串转换为json对象
data_obj = json.loads(data)
# 生成当前时间戳
timestamp = int(time.time() * 1000)
# 生成Lb-Sign
# 从原始代码中获取固定的l字符串
l_string = "5c2c5" + "38a3937c6" + "db2d04b" + "ce3d03b" + "be88bl"
sign_data = complete_generate_lb_sign(l_string, timestamp, data_obj)
# 请求头
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Lb-Sign': sign_data["Lb-Sign"],
'Lb-Timestamp': str(sign_data["Lb-Timestamp"]),
'Origin': 'https://h5.7881.com',
'Referer': 'https://h5.7881.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/18.5 Mobile/15E148 Safari/604.1 Edg/143.0.0.0',
'channelSource': '17',
'cid': 'undefined',
'pay-version': '3.0',
'platform': '7881',
'userPosition': 'JTdCJTIyZGV2aWNlSWQlMjI6JTIyJTIyLCUyMmRldmljZUlkMSUyMjolMjIlMjIsJTIyZGV2aWNlTmFtZSUyMjolMjIlMjIlN0Q='
}
# Cookie(从curl中复制,确保完全一致)
cookies = {
'home_page_yxb': '%7B%22gameid%22%3A%22G10%22%2C%22gamename%22%3A%22%E5%9C%B0%E4%B8%8B%E5%9F%8E%E4%B8%8E%E5%8B%87%E5%A3%AB%22%2C%22groupid%22%3A%22G10P001%22%2C%22serverid%22%3A%22G10P001001%22%2C%22groupserver%22%3A%22%E5%B9%BF%E4%B8%9C%E5%8C%BA%2F%E5%B9%BF%E4%B8%9C1%E5%8C%BA%22%2C%22tradeplace%22%3A%22%22%2C%22tradeplacename%22%3A%22%E4%BA%A4%E6%98%93%E6%96%B9%E5%BC%8F%22%2C%22camp%22%3A%22%22%2C%22refresh%22%3A%221%22%7D',
'AUTHTICKETCHECKVALUE': 'W9Dv9bfHb6N98u25PmnFEpiMLSEW4vAdIWM5sOdJAR32SdhdBoVeWfxs9lVPYKQhRj0RDmE9Wm%2B8%0AC%2Ft3q8M32UkQ9BtVJWOp%2F23zPePwJNJa1Jwn9uYPoDmmGJz7%2BA9lHEcAKXnQ3Z%2FPIlIgJSTQCMdj%0Ab%2BXM9p9Q',
'Authorization': 'Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1XzE3NDUzOTQwNjMyOTRfOWM1MzE0IiwidXNlcklkIjoxMzE5MDkzMTQsIm5pY2tOYW1lIjoidV8xNzQ1Mzk0MDYzMjk0XzljNTMxNCIsInBob25lIjoiMTg2NzI3ODkzMjkiLCJwd2RVcGRhdGVUaW1lIjoiMjAyNS0wNS0xMyAxMzo1MDozOCIsImxhc3RMb2dpblRpbWUiOiIyMDI1LTEwLTIyIDIxOjUwOjA3IiwicmVtZW1iZXJNZSI6ZmFsc2UsInN5c3RlbVZlcnNpb24iOiIxIiwicGxhdGZvcm0iOiI3ODgxIiwiY2hhbm5lbFNvdXJjZSI6IjEyIiwiZGV2aWNlSWQiOiIxMDIxYWZlMmNjN2Y0NjYyODdmNjZkZmVlMzAwNWMyZiIsImxvZ2luU291cmNlIjoiUEhPTkUiLCJhdXRoIjoiUk9MRV9VU0VSIiwiZXhwIjoxNzY2MzI1MDA3fQ.LlujJ63rGJUUD6fO2jX4hg7CHtbSFajjCVnsI0wOya0',
'didStr': '1021afe2cc7f466287f66dfee3005c2f',
'SERVERID': '216',
'Hm_lvt_6fb35abaf76325a4316e33e23c984e73': '1763837083,1764525733,1765469967,1765643951',
'PUBLISH_AGREEMENT': '1765643995584',
'home_page_search': '%7B%22gameid%22%3A%22G6211%22%2C%22goodschannel%22%3A%22100001%22%2C%22groupid%22%3A%22G6211P001%22%2C%22serverid%22%3A%22G6211P001005%22%2C%22camp%22%3Anull%7D'
}
# 直接使用原始的data字符串作为请求体,与curl保持一致
response = requests.post(url, headers=headers, cookies=cookies, data=data, timeout=10)
response.raise_for_status() # 检查请求是否成功
return response.json()
except Exception as e:
logger.error(f"获取价格数据失败: {e}")
return None
def extract_price_of_unit(data):
"""提取结果集中的priceOfUnitForShow字段"""
if not data:
return []
# 打印数据结构,帮助理解
# logger.info(f"解析数据结构: {json.dumps(data, indent=2)[:500]}...")
# 尝试不同的数据结构路径
price_list = []
# 路径1: 检查body.results路径(根据curl响应结构)
if isinstance(data, dict):
# 检查是否有body字段
if 'body' in data:
body = data['body']
# 检查是否有results字段
if 'results' in body:
results_list = body['results']
for item in results_list:
if 'priceOfUnitForShow' in item:
price = item['priceOfUnitForShow']
price_list.append(price)
# 路径2: 检查body.goodsList路径
if not price_list and isinstance(data, dict) and 'body' in data:
body = data['body']
if 'goodsList' in body:
goods_list = body['goodsList']
logger.info(f"找到goodsList,包含 {len(goods_list)} 个商品")
for item in goods_list:
if 'priceOfUnitForShow' in item:
price = item['priceOfUnitForShow']
price_list.append(price)
logger.info(f"找到priceOfUnitForShow: {price}")
# 路径3: 递归查找所有可能的priceOfUnitForShow字段
if not price_list:
# 递归查找函数
def find_prices(obj, path=""):
if isinstance(obj, dict):
# 检查当前字典是否包含priceOfUnitForShow
if 'priceOfUnitForShow' in obj:
price = obj['priceOfUnitForShow']
price_list.append(price)
logger.info(f"在路径 {path} 中找到priceOfUnitForShow: {price}")
# 递归遍历字典的所有值
for key, value in obj.items():
new_path = f"{path}.{key}" if path else key
find_prices(value, new_path)
elif isinstance(obj, list):
# 遍历列表中的每个元素
for i, item in enumerate(obj):
new_path = f"{path}[{i}]" if path else f"[{i}]"
find_prices(item, new_path)
# 开始递归查找
find_prices(data)
# 按价格排序(假设价格是数字字符串)
try:
price_list.sort(key=lambda x: float(x) if x else 0)
except Exception as e:
logger.error(f"排序价格失败: {e}")
# 返回前三个价格
return price_list[:5]
def send_wechat_message(prices):
"""通过企业微信机器人发送消息"""
if not prices:
logger.info("没有价格数据可发送")
return
# 企业微信机器人Webhook
webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=4226c76e-725b-4990-b926-05f16142e513"
# 构建消息内容
message = f"7881铁血4联盟金价查询结果(前5个):\n"
for i, price in enumerate(prices, 1):
message += f"{i}. {price}\n"
# 消息体
data = {
"msgtype": "text",
"text": {
"content": message
}
}
try:
import requests
response = requests.post(webhook_url, json=data, timeout=10)
response.raise_for_status()
logger.info("企业微信消息发送成功")
except Exception as e:
logger.error(f"发送企业微信消息失败: {e}")
def check_prices():
"""执行价格查询任务"""
logger.info("开始执行价格查询任务")
# 获取价格数据
data = get_price_data()
# 提取价格
prices = extract_price_of_unit(data)
if prices:
logger.info(f"获取到价格数据: {prices}")
# 发送企业微信消息
send_wechat_message(prices)
else:
logger.info("未获取到价格数据")
logger.info("价格查询任务完成")
def main():
"""主函数"""
check_prices()
if __name__ == "__main__":
main()

20
firmware_checker/utils/logger.py

@ -0,0 +1,20 @@
"""日志工具模块"""
import logging
import sys
def setup_logger():
"""配置日志系统"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# 确保日志输出不缓冲
for handler in logging.getLogger().handlers:
handler.flush = sys.stdout.flush
return logger

3
firmware_versions.txt

@ -1,2 +1,3 @@
26.1 26.1
26.0 26.026.3
26.2

66
main.py

@ -0,0 +1,66 @@
"""统一任务执行入口"""
import os
import time
import sys
from firmware_checker.tasks.firmware_task import check_versions as check_firmware_versions
from firmware_checker.tasks.price_task import check_prices as check_7881_prices
from firmware_checker.utils.logger import setup_logger
logger = setup_logger()
def run_task1():
"""执行第一个任务:固件检查"""
logger.info("开始执行任务1:固件版本检查")
check_firmware_versions()
logger.info("任务1执行完成\n")
def run_task2():
"""执行第二个任务:7881价格查询"""
logger.info("开始执行任务2:7881价格查询")
check_7881_prices()
logger.info("任务2执行完成\n")
def run_all_tasks():
"""执行所有任务"""
logger.info("开始执行任务序列")
logger.info("=" * 50)
# 执行任务1
run_task1()
# 执行任务2
run_task2()
logger.info("=" * 50)
logger.info("所有任务执行完成")
def main():
"""主函数,支持定时执行任务序列"""
# 从环境变量读取执行间隔,默认30分钟
interval_env = os.environ.get('CHECK_INTERVAL_MINUTES')
try:
interval_minutes = int(interval_env) if interval_env else 30
except ValueError:
logger.warning(f"检查间隔参数无效,使用默认值30分钟")
interval_minutes = 30
interval_seconds = interval_minutes * 60
logger.info(f"任务执行服务启动,间隔: {interval_minutes}分钟,当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# 首次立即执行一次
run_all_tasks()
# 然后定时执行
try:
while True:
logger.info(f"等待下次执行,{interval_minutes}分钟后...")
time.sleep(interval_seconds)
run_all_tasks()
except KeyboardInterrupt:
logger.info("服务已停止")
except Exception as e:
logger.error(f"服务运行出错: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
Loading…
Cancel
Save