前言

最近一个盆友提了一个需求,想要通过币安公开接口实时关注指定币种类价格,然后达到指定阈值后通过飞书发送消息。既然是盆友那忙肯定得帮。

具体步骤

币安接口分析

https://developers.binance.com/docs/zh-CN/binance-spot-api-docs/web-socket-streams,这是api文档。有其他需求的朋友可以根据文档来对接指定api。

image-20250425222248305.png

因为高频要求,本次直接使用WebSocket调用,不使用rest api。

关于python WebSocket库的使用可以参考这篇文章,https://blog.csdn.net/liaoqingjian/article/details/146908212

核心功能分析

  1. 高频数据采集

    • 通过币安REST API获取所有现货交易对信息,这样避免手动更新币种
    • 通过WebSocket实时订阅交易对的ticker数据
    • 按批次订阅,每批100个交易对,避免单个连接订阅过多
  2. 价格监控

    • 记录每个交易对最近1分钟的价格历史
    • 计算1分钟内的价格涨跌幅
    • 当涨跌幅超过设定阈值(3%)时触发警报
  3. 警报机制

    • 通过飞书Webhook发送警报消息
    • 实现冷却期机制,避免同一交易对短时间内重复发送警报
    • 警报包含交易对、涨跌幅、价格变化、成交量等信息

程序源码

import json
import asyncio
import websockets
import requests
from datetime import datetime
import time


class BinanceSpotMonitor:
    def __init__(self, feishu_webhook_url):
        self.ws_url = "wss://stream.binance.com:9443/ws"
        self.rest_api_url = "https://api.binance.com/api"
        self.feishu_webhook_url = feishu_webhook_url
        self.market_data = {}
        self.batch_size = 100  # 每批订阅的交易对数量
        self.price_history = {}  # 存储价格历史
        self.alert_threshold = 3.0  # 涨跌幅阈值,超过此值将触发推送
        self.alert_cooldown = {}  # 用于控制推送频率,避免同一币种短时间内多次推送
        self.cooldown_period = 60  # 冷却时间(秒),同一币种在此时间内只推送一次

    async def get_spot_symbols(self):
        """获取所有现货交易对,仅保留USDT交易对"""
        try:
            response = requests.get(f"{self.rest_api_url}/v3/exchangeInfo")
            response.raise_for_status()
            symbols = []
            for symbol in response.json()["symbols"]:
                # 只保留以USDT结尾的交易对
                if (symbol["status"] == "TRADING" and
                    symbol["isSpotTradingAllowed"] and
                    symbol["quoteAsset"] == "USDT"):
                    symbols.append({
                        "symbol": symbol["symbol"],
                        "baseAsset": symbol["baseAsset"],
                        "quoteAsset": symbol["quoteAsset"]
                    })
            return symbols
        except Exception as e:
            print(f"获取交易对列表失败: {e}")
            return []

    async def connect_and_subscribe_batch(self, symbols_batch):
        """连接到币安WebSocket并订阅一批市场的ticker数据"""
        try:
            async with websockets.connect(self.ws_url) as websocket:
                # 创建订阅请求
                subscribe_request = {
                    "method": "SUBSCRIBE",
                    "params": [f"{symbol['symbol'].lower()}@ticker" for symbol in symbols_batch],
                    "id": 1
                }

                # 发送订阅请求
                await websocket.send(json.dumps(subscribe_request))

                # 接收订阅确认
                response = await websocket.recv()
                print(f"订阅确认批次 ({len(symbols_batch)}个交易对): {response}")

                # 持续接收数据
                while True:
                    try:
                        message = await websocket.recv()
                        await self.process_message(message, symbols_batch)

                    except websockets.exceptions.ConnectionClosedError as e:
                        print(f"连接关闭: {e}")
                        # 尝试重新连接
                        await asyncio.sleep(5)
                        break
                    except Exception as e:
                        print(f"处理消息时出错: {e}")
        except Exception as e:
            print(f"WebSocket连接错误: {e}")
            # 尝试重新连接
            await asyncio.sleep(5)
            asyncio.create_task(self.connect_and_subscribe_batch(symbols_batch))

    async def process_message(self, message, symbols_batch):
        """处理接收到的WebSocket消息"""
        data = json.loads(message)

        # 确保是ticker数据
        if 's' in data:  # 's' 是symbol的字段
            symbol = data['s']

            # 查找对应的baseAsset和quoteAsset
            symbol_info = next((s for s in symbols_batch if s["symbol"] == symbol), None)
            if not symbol_info:
                return

            base_asset = symbol_info["baseAsset"]
            quote_asset = symbol_info["quoteAsset"]

            # 确保只处理USDT交易对
            if quote_asset != "USDT":
                return

            # 提取所需数据
            price = float(data['c'])  # 最新价格
            volume_24h = float(data['q'])  # 24小时成交量(报价资产)

            current_time = datetime.now()
            formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
            short_time = current_time.strftime('%H:%M:%S')

            # 处理价格历史
            if symbol not in self.price_history:
                self.price_history[symbol] = {
                    "prices": [(price, current_time)],
                    "last_alert": None
                }
            else:
                # 添加当前价格到历史记录
                self.price_history[symbol]["prices"].append((price, current_time))

                # 清理超过1分钟的历史数据
                self.price_history[symbol]["prices"] = [
                    (p, t) for p, t in self.price_history[symbol]["prices"]
                    if (current_time - t).total_seconds() <= 60
                ]

                # 如果有足够的历史数据,计算1分钟涨跌幅
                if len(self.price_history[symbol]["prices"]) >= 2:
                    oldest_price, oldest_time = self.price_history[symbol]["prices"][0]
                    time_diff = (current_time - oldest_time).total_seconds()

                    # 确保有足够的时间间隔(至少30秒)来计算有意义的涨跌幅
                    if time_diff >= 30:
                        price_change_percent = ((price - oldest_price) / oldest_price) * 100

                        # 存储市场数据
                        self.market_data[symbol] = {
                            'symbol': symbol,
                            'base_asset': base_asset,
                            'quote_asset': quote_asset,
                            'price': price,
                            'old_price': oldest_price,
                            'price_change_percent_1min': price_change_percent,
                            'volume_24h': volume_24h,
                            'last_update': formatted_time
                        }

                        # 检查是否需要发送警报
                        await self.check_and_send_alert(symbol, price_change_percent, price, oldest_price, volume_24h,
                                                        short_time)

    async def check_and_send_alert(self, symbol, price_change_percent, current_price, old_price, volume_24h, time_str):
        """检查是否需要发送警报并发送"""
        # 检查涨跌幅是否超过阈值
        if abs(price_change_percent) >= self.alert_threshold:
            # 检查冷却期
            current_time = time.time()
            if symbol in self.alert_cooldown and current_time - self.alert_cooldown[symbol] < self.cooldown_period:
                return  # 在冷却期内,不发送警报

            # 更新冷却时间
            self.alert_cooldown[symbol] = current_time

            # 准备警报消息
            direction = "上涨" if price_change_percent > 0 else "下跌"
            formatted_volume = f"{volume_24h:,.2f}"

            # 构建飞书消息
            current_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            base_asset = self.market_data[symbol]['base_asset']
            quote_asset = self.market_data[symbol]['quote_asset']
            pair_name = f"{base_asset}/{quote_asset}"

            message = f"【test 现货推送】\n{current_date}\n币安:{base_asset}/1m内{direction}/\n{abs(price_change_percent):.2f}%\n价格:{old_price:.8f}/{current_price:.8f}\n成交:5m${formatted_volume}\n时间:{time_str}"

            # 发送到飞书
            await self.send_to_feishu(message)
            print(f"已发送警报: {symbol} 1分钟{direction} {abs(price_change_percent):.2f}%")

    async def send_to_feishu(self, message):
        """发送消息到飞书机器人"""
        try:
            headers = {'Content-Type': 'application/json'}
            payload = {
                "msg_type": "text",
                "content": {
                    "text": message
                }
            }
            response = requests.post(self.feishu_webhook_url, headers=headers, data=json.dumps(payload))
            if response.status_code != 200:
                print(f"发送飞书消息失败: {response.status_code}, {response.text}")
            return response.status_code == 200
        except Exception as e:
            print(f"发送飞书消息出错: {e}")
            return False

    async def start(self):
        """启动市场数据监控"""
        # 获取所有交易对
        symbols = await self.get_spot_symbols()
        print(f"获取到 {len(symbols)} 个USDT交易对")

        # 将交易对分批处理
        for i in range(0, len(symbols), self.batch_size):
            batch = symbols[i:i + self.batch_size]
            asyncio.create_task(self.connect_and_subscribe_batch(batch))
            # 避免同时创建太多连接
            await asyncio.sleep(1)

async def main():
    # 替换为你的飞书机器人webhook URL
    feishu_webhook_url = ""

    monitor = BinanceSpotMonitor(feishu_webhook_url)
    await monitor.start()

    # 保持程序运行
    while True:
        await asyncio.sleep(3600)

if __name__ == "__main__":
    asyncio.run(main())

仅供学习参考,切勿用于非法用途

标签: none