python实现币安监控
前言
最近一个盆友提了一个需求,想要通过币安公开接口实时关注指定币种类价格,然后达到指定阈值后通过飞书发送消息。既然是盆友那忙肯定得帮。
具体步骤
币安接口分析
https://developers.binance.com/docs/zh-CN/binance-spot-api-docs/web-socket-streams,这是api文档。有其他需求的朋友可以根据文档来对接指定api。
因为高频要求,本次直接使用WebSocket调用,不使用rest api。
关于python WebSocket库的使用可以参考这篇文章,https://blog.csdn.net/liaoqingjian/article/details/146908212
核心功能分析
高频数据采集:
- 通过币安REST API获取所有现货交易对信息,这样避免手动更新币种
- 通过WebSocket实时订阅交易对的ticker数据
- 按批次订阅,每批100个交易对,避免单个连接订阅过多
价格监控:
- 记录每个交易对最近1分钟的价格历史
- 计算1分钟内的价格涨跌幅
- 当涨跌幅超过设定阈值(3%)时触发警报
警报机制:
- 通过飞书Webhook发送警报消息
- 实现冷却期机制,避免同一交易对短时间内重复发送警报
- 警报包含交易对、涨跌幅、价格变化、成交量等信息
程序源码
import json
import asyncio
import websockets
import requests
from datetime import datetime
import time
class BinanceSpotMonitor:
def __init__(self, feishu_webhook_url):
self.ws_url = "wss://stream.binance.com:9443/ws"
self.rest_api_url = "https://api.binance.com/api"
self.feishu_webhook_url = feishu_webhook_url
self.market_data = {}
self.batch_size = 100 # 每批订阅的交易对数量
self.price_history = {} # 存储价格历史
self.alert_threshold = 3.0 # 涨跌幅阈值,超过此值将触发推送
self.alert_cooldown = {} # 用于控制推送频率,避免同一币种短时间内多次推送
self.cooldown_period = 60 # 冷却时间(秒),同一币种在此时间内只推送一次
async def get_spot_symbols(self):
"""获取所有现货交易对,仅保留USDT交易对"""
try:
response = requests.get(f"{self.rest_api_url}/v3/exchangeInfo")
response.raise_for_status()
symbols = []
for symbol in response.json()["symbols"]:
# 只保留以USDT结尾的交易对
if (symbol["status"] == "TRADING" and
symbol["isSpotTradingAllowed"] and
symbol["quoteAsset"] == "USDT"):
symbols.append({
"symbol": symbol["symbol"],
"baseAsset": symbol["baseAsset"],
"quoteAsset": symbol["quoteAsset"]
})
return symbols
except Exception as e:
print(f"获取交易对列表失败: {e}")
return []
async def connect_and_subscribe_batch(self, symbols_batch):
"""连接到币安WebSocket并订阅一批市场的ticker数据"""
try:
async with websockets.connect(self.ws_url) as websocket:
# 创建订阅请求
subscribe_request = {
"method": "SUBSCRIBE",
"params": [f"{symbol['symbol'].lower()}@ticker" for symbol in symbols_batch],
"id": 1
}
# 发送订阅请求
await websocket.send(json.dumps(subscribe_request))
# 接收订阅确认
response = await websocket.recv()
print(f"订阅确认批次 ({len(symbols_batch)}个交易对): {response}")
# 持续接收数据
while True:
try:
message = await websocket.recv()
await self.process_message(message, symbols_batch)
except websockets.exceptions.ConnectionClosedError as e:
print(f"连接关闭: {e}")
# 尝试重新连接
await asyncio.sleep(5)
break
except Exception as e:
print(f"处理消息时出错: {e}")
except Exception as e:
print(f"WebSocket连接错误: {e}")
# 尝试重新连接
await asyncio.sleep(5)
asyncio.create_task(self.connect_and_subscribe_batch(symbols_batch))
async def process_message(self, message, symbols_batch):
"""处理接收到的WebSocket消息"""
data = json.loads(message)
# 确保是ticker数据
if 's' in data: # 's' 是symbol的字段
symbol = data['s']
# 查找对应的baseAsset和quoteAsset
symbol_info = next((s for s in symbols_batch if s["symbol"] == symbol), None)
if not symbol_info:
return
base_asset = symbol_info["baseAsset"]
quote_asset = symbol_info["quoteAsset"]
# 确保只处理USDT交易对
if quote_asset != "USDT":
return
# 提取所需数据
price = float(data['c']) # 最新价格
volume_24h = float(data['q']) # 24小时成交量(报价资产)
current_time = datetime.now()
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
short_time = current_time.strftime('%H:%M:%S')
# 处理价格历史
if symbol not in self.price_history:
self.price_history[symbol] = {
"prices": [(price, current_time)],
"last_alert": None
}
else:
# 添加当前价格到历史记录
self.price_history[symbol]["prices"].append((price, current_time))
# 清理超过1分钟的历史数据
self.price_history[symbol]["prices"] = [
(p, t) for p, t in self.price_history[symbol]["prices"]
if (current_time - t).total_seconds() <= 60
]
# 如果有足够的历史数据,计算1分钟涨跌幅
if len(self.price_history[symbol]["prices"]) >= 2:
oldest_price, oldest_time = self.price_history[symbol]["prices"][0]
time_diff = (current_time - oldest_time).total_seconds()
# 确保有足够的时间间隔(至少30秒)来计算有意义的涨跌幅
if time_diff >= 30:
price_change_percent = ((price - oldest_price) / oldest_price) * 100
# 存储市场数据
self.market_data[symbol] = {
'symbol': symbol,
'base_asset': base_asset,
'quote_asset': quote_asset,
'price': price,
'old_price': oldest_price,
'price_change_percent_1min': price_change_percent,
'volume_24h': volume_24h,
'last_update': formatted_time
}
# 检查是否需要发送警报
await self.check_and_send_alert(symbol, price_change_percent, price, oldest_price, volume_24h,
short_time)
async def check_and_send_alert(self, symbol, price_change_percent, current_price, old_price, volume_24h, time_str):
"""检查是否需要发送警报并发送"""
# 检查涨跌幅是否超过阈值
if abs(price_change_percent) >= self.alert_threshold:
# 检查冷却期
current_time = time.time()
if symbol in self.alert_cooldown and current_time - self.alert_cooldown[symbol] < self.cooldown_period:
return # 在冷却期内,不发送警报
# 更新冷却时间
self.alert_cooldown[symbol] = current_time
# 准备警报消息
direction = "上涨" if price_change_percent > 0 else "下跌"
formatted_volume = f"{volume_24h:,.2f}"
# 构建飞书消息
current_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
base_asset = self.market_data[symbol]['base_asset']
quote_asset = self.market_data[symbol]['quote_asset']
pair_name = f"{base_asset}/{quote_asset}"
message = f"【test 现货推送】\n{current_date}\n币安:{base_asset}/1m内{direction}/\n{abs(price_change_percent):.2f}%\n价格:{old_price:.8f}/{current_price:.8f}\n成交:5m${formatted_volume}\n时间:{time_str}"
# 发送到飞书
await self.send_to_feishu(message)
print(f"已发送警报: {symbol} 1分钟{direction} {abs(price_change_percent):.2f}%")
async def send_to_feishu(self, message):
"""发送消息到飞书机器人"""
try:
headers = {'Content-Type': 'application/json'}
payload = {
"msg_type": "text",
"content": {
"text": message
}
}
response = requests.post(self.feishu_webhook_url, headers=headers, data=json.dumps(payload))
if response.status_code != 200:
print(f"发送飞书消息失败: {response.status_code}, {response.text}")
return response.status_code == 200
except Exception as e:
print(f"发送飞书消息出错: {e}")
return False
async def start(self):
"""启动市场数据监控"""
# 获取所有交易对
symbols = await self.get_spot_symbols()
print(f"获取到 {len(symbols)} 个USDT交易对")
# 将交易对分批处理
for i in range(0, len(symbols), self.batch_size):
batch = symbols[i:i + self.batch_size]
asyncio.create_task(self.connect_and_subscribe_batch(batch))
# 避免同时创建太多连接
await asyncio.sleep(1)
async def main():
# 替换为你的飞书机器人webhook URL
feishu_webhook_url = ""
monitor = BinanceSpotMonitor(feishu_webhook_url)
await monitor.start()
# 保持程序运行
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
asyncio.run(main())
仅供学习参考,切勿用于非法用途