用Python开发数字货币交易机器人(附源码)
发布日期:2025-01-04 15:43 点击次数:141
众所周知,币圈一天,人间一年。我们进行数字货币交易时,在交易所 APP 或者网站 盯盘并手动下单非常耗时,当币价波动非常大的时候往往会错失良机。这时我们可以创建一个简单的 telegram 交易机器人,来帮助我们进行做空和做多交易。 该机器人可以实现以下功能: 做空交易 - 以指定的价格卖出持有货币并在价格下跌时回购 做多交易 - 指定的价格购买货币并在价格上涨时卖出 列出交易订单 显示可用余额 设置 Telegram 机器人 首先需要一个 Telegram 账号,如果没有的话请自己注册一个。然后与BotFather进行对话,通过输入/newbot来新建一个telegram机器人,根据指示一步步创建并记住你的token。 获取交易所的 API keys 查找你的交易所API文档,看看如何获取对订单和账户余额的访问权限和步骤,记住你的密码和API keys。本例中我们以bitfinex为例,Bitmex交易所是目前市面上交易量最大的比特币期货交易所,交易量和交易深度非常大。 安装依赖包 我们这边用的是Python 3.6版本,同时我们还需要利用CCXT框架获取Bitmex交易所数据,CCXT是一个JavaScript / Python / PHP 开发库,用于数字货币的交易,支持众多的比特币/以太币/山寨币交易市场和交易所API。 CCXT库用于连接数字货币交易所并在世界范围内进行交易和支付处理。使用 ccxt可以快速访问数字货币市场数据,可以用于存储、分析、可视化、指标开发、 量化交易、策略回溯测试、交易机器人程序以及相关的软件工程。 然后我们将使用python-telegram-bot与Telegram进行通讯,对聊天消息做出反应并进行交易。 只需要用下面方法安装以上两个依赖包: pip install python-telegram-bot ccxt 我们需要交易机器人实现的基本类功能: 1、获取交易所概况,允许创建订单,列出订单详情并获取余额。这将是以 ccxt 实现的装饰器。 2、交易执行者,因为我们希望自动执行做空和做多交易。 3、即时响应的telegram 机器人。 编写机器人 项目结构如下: main.py\config\core\model\util 我们将从一个简单的模型开始。因为多空交易两者有很多共同点,可以在\ model中创建一个基类TradeDetails: import abcclass TradeDetails(metaclass=abc.ABCMeta):def __init__(self, start_price: float, symbol: str, amount: float, currency:str="USD"):self.start_price= start_priceself.symbol= symbol.upper()self.amount= amountself.currency= currency@propertydef exchange_symbol(self):return f"{self.symbol.upper()}/{self.currency}"@[email protected] exit_price(self):passdef __str__(self) ->str:return f"order for {self.amount} {self.exchange_symbol} with enter price: {self.start_price:.5}, " \f"exit_price: {self.exit_price:.5}" 具体的为: LongTrade from fasttrade.model.trade import TradeDetailsclass LongTrade(TradeDetails):def __init__(self, start_price: float, symbol: str, amount: float, percent_change:float=0.5,currency:str="USD") ->None:super().__init__(start_price, symbol, amount, currency)self.end_price=start_price* (1 + percent_change / 100)@propertydef exit_price(self):return self.end_pricedef __str__(self) ->str:return "Long " + super().__str__() ShortTrade from fasttrade.model.trade import TradeDetailsclass ShortTrade(TradeDetails):def __init__(self, start_price: float, symbol: str, amount: float, percent_change:float=0.5,currency:str="USD") ->None:super().__init__(start_price, symbol, amount, currency)self.end_price=start_price* (1 - percent_change / 100)@propertydef exit_price(self):return self.end_pricedef __str__(self) ->str:return "Short " + super().__str__() 接下来是获取交易所数据: from ccxt import Exchange, OrderNotFoundclass CryptoExchange:def __init__(self, exchange: Exchange):self.exchange= exchangeself.exchange.load_markets()@propertydef free_balance(self):balance=self.exchange.fetch_free_balance()# surprisingly there are balances with 0, so we need to filter these outreturn {k: v for k, v in balance.items() if v>0}def fetch_open_orders(self, symbol:str=None):return self.exchange.fetch_open_orders(symbolsymbol=symbol)def fetch_order(self, order_id: int):return self.exchange.fetch_order(order_id)def cancel_order(self, order_id: int):try:self.exchange.cancel_order(order_id)except OrderNotFound:# treat as successpassdef create_sell_order(self, symbol: str, amount: float, price: float):return self.exchange.create_order(symbolsymbol=symbol,type="limit",side="sell",amountamount=amount,priceprice=price)def create_buy_order(self, symbol: str, amount: float, price: float):return self.exchange.create_order(symbolsymbol=symbol,type="limit",side="buy",amountamount=amount,priceprice=price) 然后,我们将执行交易程序。程序将接受交易所数据和超时情况以检查订单是否完成。当做空时,我们以设定的价格卖出,当价格下降到一定水平时回购。我们使用asyncio协程进行编码,以使等待不会阻塞: import asyncioimport loggingfrom ccxt import ExchangeErrorfrom model.longtrade import LongTradefrom model.shorttrade import ShortTradeclass TradeExecutor:def __init__(self, exchange, check_timeout:int=15):self.check_timeout= check_timeoutself.exchange= exchangeasync def execute_trade(self, trade):if isinstance(trade, ShortTrade):await self.execute_short_trade(trade)elif isinstance(trade, LongTrade):await self.execute_long_trade(trade)async def execute_short_trade(self, trade: ShortTrade):sell_price=trade.start_pricebuy_price=trade.exit_pricesymbol=trade.exchange_symbolamount=trade.amountorder=self.exchange.create_sell_order(symbol, amount, sell_price)logging.info(f'Opened sell order: {amount} of {symbol}. Target sell {sell_price}, buy price {buy_price}')await self._wait_order_complete(order['id'])# post buy orderorder=self.exchange.create_buy_order(symbol, amount, buy_price)await self._wait_order_complete(order['id'])logging.info(f'Completed short trade: {amount} of {symbol}. Sold at {sell_price} and bought at {buy_price}')async def execute_long_trade(self, trade: LongTrade):buy_price=trade.start_pricesell_price=trade.exit_pricesymbol=trade.exchange_symbolamount=trade.amountorder=self.exchange.create_buy_order(symbol, amount, buy_price)logging.info(f'Opened long trade: {amount} of {symbol}. Target buy {buy_price}, sell price {sell_price}')await self._wait_order_complete(order.id)# post sell orderorder=self.exchange.create_sell_order(symbol, amount, sell_price)await self._wait_order_complete(order.id)logging.info(f'Completed long trade: {amount} of {symbol}. Bought at {buy_price} and sold at {sell_price}')async def _wait_order_complete(self, order_id):status='open'while status is 'open':await asyncio.sleep(self.check_timeout)order=self.exchange.fetch_order(order_id)status=order['status']logging.info(f'Finished order {order_id} with {status} status')# do not proceed further if we canceled orderifstatus== 'canceled':raise ExchangeError('Trade has been canceled') ccxt使用REST API进行数据传输。它不如某些交易所支持的WebSockets快,但是对于这个简单的机器人来说,速度或许差别。 async def _wait_order_complete(self, order_id):status='open'order=Nonewhile status is 'open':await asyncio.sleep(self.check_timeout)order=self.exchange.fetch_order(order_id)status=order['status']logging.info(f'Finished order {order_id} with {status} status')# do not proceed further if we canceled orderifstatus== 'canceled':raise ExchangeError('Trade has been canceled')return order 接下来将创建Telegram机器人,这是最有难度的部分,我们将使其拥有以下指令: 1、列出/取消有效订单 2、显示可用余额 3、建立做多或做空交易 我们还需要对机器人做一些安全限制,使其仅对你的消息做出响应,而其他人则无法使用你的帐户进行交易。 主要是进行做多和做空交易的部分: 1、选择做空或者做多 2、输入数字货币品种 3、输入交易数量 4、所占百分比 5、每个价格 6、显示确认信息 7、显示最终交易信息 我们来创建telegrambot.py并添加以下常量: SELECTION="selection"SHORT_TRADE="short_trade"LONG_TRADE="long_trade"OPEN_ORDERS="open_orders"FREE_BALANCE="free_balance"CANCEL_ORD="cancel_order"PROCESS_ORD_CANCEL="process_ord_cancel"COIN_NAME="coin_select"PERCENT_CHANGE="percent_select"AMOUNT="amount"PRICE="price"PROCESS_TRADE="process_trade"CONFIRM="confirm"CANCEL="cancel"END_CONVERSATION=ConversationHandler.END 我们可以通过扩展BaseFilter来实现对user_id的限制。这样机器人必须接受被允许用户的token、id才能执行操作。 class TelegramBot:class PrivateUserFiler(BaseFilter):def __init__(self, user_id):self.user_id=int(user_id)def filter(self, message):returnmessage.from_user.id== self.user_iddef __init__(self, token: str, allowed_user_id, trade_executor: TradeExecutor):self.updater=Updater(tokentoken=token)selfself.dispatcher= self.updater.dispatcherself.trade_executor= trade_executorselfself.exchange= self.trade_executor.exchangeselfself.private_filter= self.PrivateUserFiler(allowed_user_id)self._prepare() 在_prepare()函数中,我们将创建所有处理函数并将其附加到调度程序。我们开始与机器人聊天时希望显示的基本选项: def _prepare(self):# Create our handlersdef show_help(bot, update):update.effective_message.reply_text('Type /trade to show options ')def show_options(bot, update):button_list= [[InlineKeyboardButton("Short trade",callback_data=SHORT_TRADE),InlineKeyboardButton("Long trade",callback_data=LONG_TRADE), ],[InlineKeyboardButton("Open orders",callback_data=OPEN_ORDERS),InlineKeyboardButton("Available balance",callback_data=FREE_BALANCE)],]update.message.reply_text("Trade options:",reply_markup=InlineKeyboardMarkup(button_list))return TRADE_SELECT InlineKeyboardButton允许我们将文本选项显示为键盘。这比键入所有命令更为直观。callback_data允许在按下按钮时传递其他数据。show_options返回下一个继续进行对话的处理函数的名称。其他处理函数将使用类似的方法。然后我们执行用户选择的处理程序。在这里,我们主要从一个问题转到另一个问题: def process_trade_selection(bot, update, user_data):query=update.callback_queryselection=query.dataifselection== OPEN_ORDERS:orders=self.exchange.fetch_open_orders()if len(orders) == 0:bot.edit_message_text(text="You don't have open orders",chat_id=query.message.chat_id,message_id=query.message.message_id)return END_CONVERSATION# show the option to cancel active orderskeyboard= [[InlineKeyboardButton("Ok",callback_data=CONFIRM),InlineKeyboardButton("Cancel order",callback_data=CANCEL)]]bot.edit_message_text(text=formatter.format_open_orders(orders),chat_id=query.message.chat_id,message_id=query.message.message_id,reply_markup=InlineKeyboardMarkup(keyboard))# attach opened orders, so that we can cancel by indexuser_data[OPEN_ORDERS] = ordersreturn CANCEL_ORDelifselection== FREE_BALANCE:balance=self.exchange.free_balancemsg="You don't have any available balance"if len(balance) == 0 \else f"Your available balance:\n{formatter.format_balance(balance)}"bot.edit_message_text(text=msg,chat_id=query.message.chat_id,message_id=query.message.message_id)return END_CONVERSATIONuser_data[TRADE_SELECT] = selectionbot.edit_message_text(text=f'Enter coin name for {selection}',chat_id=query.message.chat_id,message_id=query.message.message_id)return COIN_NAMEdef cancel_order(bot, update):query=update.callback_queryifquery.data== CANCEL:query.message.reply_text('Enter order index to cancel: ')return PROCESS_ORD_CANCELshow_help(bot, update)return END_CONVERSATIONdef process_order_cancel(bot, update, user_data):idx=int(update.message.text)order=user_data[OPEN_ORDERS][idx]self.exchange.cancel_order(order['id'])update.message.reply_text(f'Canceled order: {formatter.format_order(order)}')return END_CONVERSATIONdef process_coin_name(bot, update, user_data):user_data[COIN_NAME] = update.message.text.upper()update.message.reply_text(f'What amount of {user_data[COIN_NAME]}')return AMOUNTdef process_amount(bot, update, user_data):user_data[AMOUNT] = float(update.message.text)update.message.reply_text(f'What % change for {user_data[AMOUNT]} {user_data[COIN_NAME]}')return PERCENT_CHANGEdef process_percent(bot, update, user_data):user_data[PERCENT_CHANGE] = float(update.message.text)update.message.reply_text(f'What price for 1 unit of {user_data[COIN_NAME]}')return PRICEdef process_price(bot, update, user_data):user_data[PRICE] = float(update.message.text)keyboard= [[InlineKeyboardButton("Confirm",callback_data=CONFIRM),InlineKeyboardButton("Cancel",callback_data=CANCEL)]]update.message.reply_text(f"Confirm the trade: '{TelegramBot.build_trade(user_data)}'",reply_markup=InlineKeyboardMarkup(keyboard))return PROCESS_TRADE 最后,我们构建会话处理程序,设置错误处理程序,并将所有处理程序添加到调度程序中。 def process_trade(bot, update, user_data):query=update.callback_queryifquery.data== CONFIRM:trade=TelegramBot.build_trade(user_data)self._execute_trade(trade)update.callback_query.message.reply_text(f'Scheduled: {trade}')else:show_help(bot, update)return END_CONVERSATIONdef handle_error(bot, update, error):logging.warning('Update "%s" caused error "%s"', update, error)update.message.reply_text(f'Unexpected error:\n{error}')# configure our handlersdef build_conversation_handler():entry_handler=CommandHandler('trade',filters=self.private_filter,callback=show_options)conversation_handler=ConversationHandler(entry_points=[entry_handler],fallbacks=[entry_handler],states={TRADE_SELECT: [CallbackQueryHandler(process_trade_selection,pass_user_data=True)],CANCEL_ORD: [CallbackQueryHandler(cancel_order)],PROCESS_ORD_CANCEL: [MessageHandler(filters=Filters.text,callback=process_order_cancel,pass_user_data=True)],COIN_NAME: [MessageHandler(filters=Filters.text,callback=process_coin_name,pass_user_data=True)],AMOUNT: [MessageHandler(Filters.text,callback=process_amount,pass_user_data=True)],PERCENT_CHANGE: [MessageHandler(Filters.text,callback=process_percent,pass_user_data=True)],PRICE: [MessageHandler(Filters.text,callback=process_price,pass_user_data=True)],PROCESS_TRADE: [CallbackQueryHandler(process_trade,pass_user_data=True)],},)return conversation_handlerself.dispatcher.add_handler(CommandHandler('start',filters=self.private_filter,callback=show_help))self.dispatcher.add_handler(build_conversation_handler())self.dispatcher.add_error_handler(handle_error) 传递用户数据时允许我们向处理程序提供其他user_data参数。这样可以确保机器人从一个处理程序传递到另一个处理程序时,保持所有答复的对话状态。我们需要run_async装饰器在后台执行交易,而又不会阻止机器人对新消息进行响应: def start_bot(self):self.updater.start_polling()@run_asyncdef _execute_trade(self, trade):loop=asyncio.new_event_loop()task=loop.create_task(self.trade_executor.execute_trade(trade))loop.run_until_complete(task)@staticmethoddef build_trade(user_data):current_trade=user_data[TRADE_SELECT]price=user_data[PRICE]coin_name=user_data[COIN_NAME]amount=user_data[AMOUNT]percent_change=user_data[PERCENT_CHANGE]ifcurrent_trade== LONG_TRADE:return LongTrade(price, coin_name, amount, percent_change)elifcurrent_trade== SHORT_TRADE:return ShortTrade(price, coin_name, amount, percent_change)else:raise NotImplementedError 这是用于订单和余额显示的格式化程序: TITLES= ['idx', 'type', 'remaining', 'symbol', 'price']SPACING= [4, 6, 8, 10, 8]def format_open_orders(orders) ->str:def join_line(ln):return ' | '.join(str(item).center(SPACING[i]) for i, item in enumerate(ln))title_line=join_line(TITLES)lines= [title_line]for idx, order in enumerate(orders):line= [idx, order['side'], order['remaining'], order['symbol'], order['price']]lines.append(join_line(line))separator_line='-'* len(title_line)return f"\n{separator_line}\n".join(lines)def format_order(order):return f"{order['amount']} {order['symbol']} priced at {order['price']}"def format_balance(balance) ->str:coin_balance_as_list= list(f"{coin}: {val}" for coin, val in balance.items())return "\n".join(coin_balance_as_list) 最后,我们创建main.py并将所有内容归结在一起: import loggingimport osimport ccxtfrom core.exchange import CryptoExchangefrom core.telegrambot import TelegramBotfrom core.tradeexcutor import TradeExecutorif__name__== '__main__':logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s',level=logging.INFO)c_dir=os.path.dirname(__file__)with open(os.path.join(c_dir, "config/secrets.txt")) as key_file:api_key, secret, telegram_tkn,user_id=key_file.read().splitlines()ccxtccxt_ex= ccxt.bitfinex()ccxt_ex.apiKey=api_keyccxt_ex.secret= secretexchange=CryptoExchange(ccxt_ex)trade_executor=TradeExecutor(exchange)telegram_bot=TelegramBot(telegram_tkn, user_id, trade_executor)telegram_bot.start_bot() 我们从secrets.txt文件中获取交易所密钥,telegram的token和用户ID,构造核心类并启动机器人。使用以下内容在config文件夹中创建secrets.txt: # YOUR_API_KEY# YOUR_SECRET# YOUR_TELEGRAM_TOKEN# YOUR_TELEGRAM_USER_ID 总结 对于想要简化交易并拥有更好使用体验的人来说,该机器人更像是一个辅助工具。它不是最先进的算法交易机器人。后面可以进行以下改进: 当进行做空交易时获取可用余额并显示用户可以根据余额做空的最大值 要求在交易所执行之前验证创建的订单 添加TA指标、信号以通知最佳交易时间 止盈/止损操作和其他统计数据 有策略地根据超时等原因取消订单