用Python推送Telegram通知

今天,我将指导你如何使用Python向telegram推送通知消息,使你的工作流程更加高效和无烦恼。

用Python推送Telegram通知
一键发币: SUI | SOL | BNB | ETH | BASE | ARB | OP | POLYGON | AVAX | FTM | OK

作为一名程序员、数据工程师和BI分析师,我经常需要向Telegram发送消息以确保一切运行正常。这些通知用于各种事件,例如当某些东西出现故障或停止工作时的异常警报。我的客户会收到每日报告、价格变动通知或当有人开始在亚马逊上销售他们的产品时的警报(我最近写了一篇关于创建此类机器人的文章)。

在Python脚本中通过Telegram机器人设置通知不仅简单而且非常有效。Telegram是一个多功能平台,允许你向不同的频道发送消息,并且可以轻松集成到公共和私人群组中,无论参与者有多少。该机器人只能发送消息,确保所有其他通信的隐私和安全。

今天,我将指导你如何使用Python自动化这些通知,使你的工作流程更加高效和无烦恼。

1、创建Telegram机器人

创建Telegram机器人非常简单。要创建你的新机器人,请在Telegram中找到一个特殊的机器人叫做BotFather。BotFather是所有其他机器人的“父亲”,用于管理你的机器人。搜索它(在搜索栏中输入“BotFather”),然后键入/newbot

接下来,你会被要求为你的机器人命名,该名称必须以“bot”结尾。我将我的机器人命名为andrew_medium_test_bot。如果名称可用,你将收到一个API密钥。

使用这个API密钥,我们可以与机器人交互并发送消息。

2、创建新群组并查找其ID

你新创建的机器人不能随便给任何人或任何群组发送消息。为了让它能够发送消息,你需要将其添加到Telegram群组中。你可以将机器人添加到任何现有的群组(如果你有权限),或者创建一个新的群组并将你的Telegram机器人添加到其中。

点击“创建新群组”。

设置群组名称(我写了“Andrew Medium Test Group”)

点击“添加成员”,搜索你的新机器人,并将其添加到成员列表中。

最后,点击“创建”以创建新群组。

完美!现在你有了一个只有两个成员的空群组,其中一个成员是机器人。

如果你点击三个点并选择“信息”,可以查看群组成员。

请注意,默认情况下,机器人无法访问消息(这可以在机器人的设置中更改)。

下一步是找到群组的Chat ID,这是必要的,以便知道在哪里发送消息。不幸的是,Chat ID在Telegram界面中不可见。唯一找到它的方法是向机器人发送一条消息,然后使用一个小的Python脚本来检索最新消息。

让我们直接向机器人(在私聊模式下,默认情况下机器人只能读取这样的消息)写一条消息。键入以斜杠开头的消息,后面跟着机器人的名称。

现在,让我们编写一个脚本来查找Chat ID。安装用于与Telegram机器人交互的Python包:

pip install python-telegram-bot

然后编写代码来迭代机器人可用的所有更新,并输出聊天ID、聊天标题和消息:

import asyncio  
from telegram import Bot  
  
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'  
  
async def main():  
  
    # 创建机器人对象  
    bot = Bot(token=TELEGRAM_BOT_TOKEN)  
  
    # 获取更新  
    updates = await bot.get_updates()  
  
    if not updates:  
        print("未找到更新")  
    else:  
        for update in updates:  
            if update.message:  
  
                # 如果需要显示所有消息数据  
                # print(update.message)  
  
                # 只显示聊天ID、标题和消息  
                chat_id = update.message.chat.id  
                chat_title = update.message.chat.title  
                message_text = update.message.text  
                print(f"聊天ID: {chat_id}  | 聊天标题: {chat_title} | 消息: {message_text}")  
  
asyncio.run(main())
聊天ID: -4206954951  | 聊天标题: Andrew Medium Test Group | 消息: /hello @andrew_medium_test_bot

当我们运行脚本时,我们看到群组的Chat ID,例如:聊天ID: -4206954951。Telegram中的负数群组标识符是正常的,不要担心 😆

3、使用Python发送消息

现在我们有了Chat ID,我们可以使用bot.send_message向群组发送消息。Telegram机器人在Python中通过asyncio异步代码工作,所以我通过包装函数来运行它。

from telegram import Bot  
import asyncio  
  
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'  
CHAT_ID = '[插入您的聊天ID]'  
  
# 定义机器人  
bot = Bot(token=TELEGRAM_BOT_TOKEN)  
  
async def send_message(text, chat_id):  
    async with bot:  
        await bot.send_message(text=text, chat_id=chat_id)  
  
async def run_bot(messages, chat_id):  
    text = '\n'.join(messages)  
    await send_message(text, chat_id)  
  
# 测试消息  
messages = [  
    '产品 https://www.amazon.com/dp/B08C1W5N87,价格从$24.99变更为$26.99',  
    '产品 https://www.amazon.com/dp/B0CL61F39H 添加了新的负面评价(评分2)',  
    '注意!过去3小时平均销量比平时低50%'  
]  
  
if messages:  
    asyncio.run(run_bot(messages, CHAT_ID))

我设置了三个示例消息作为数组,这里是在运行脚本后消息在群组中显示的样子。

很简单,对吧?

4、使用Python发送长消息

如果消息长度超过4096个字符,将会发生错误,因此最好将消息分成多个部分:

from telegram import Bot  
from telegram.error import BadRequest  
import asyncio  
  
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'  
CHAT_ID = '[插入您的聊天ID]'  
  
# 定义机器人  
bot = Bot(token=TELEGRAM_BOT_TOKEN)  
  
async def send_long_message(text, chat_id):  
  
    MAX_MESSAGE_LENGTH = 4096  
    parts = [text[i:i + MAX_MESSAGE_LENGTH] for i in range(0, len(text), MAX_MESSAGE_LENGTH)]  
  
    async with bot:  
        for part in parts:  
            try:  
                await bot.send_message(chat_id=chat_id, text=part)  
            except BadRequest as e:  
                print(f"发送消息部分失败: {str(e)}")  
  
async def run_bot(messages, chat_id):  
  
    text = '\n'.join(messages)  
    await send_long_message(text=text, chat_id=chat_id)  
  
# 测试消息  
messages = [  
    '产品 https://www.amazon.com/dp/B08C1W5N87,价格从$24.99变更为$26.99',  
    '产品 https://www.amazon.com/dp/B0CL61F39H 添加了新的负面评价(评分2)',  
    '注意!过去3小时平均销量比平时低50%'  
]*20  
  
if messages:  
    asyncio.run(run_bot(messages, CHAT_ID))

结果……

现在,非常长的消息可以正确发送。

5、使用Python发送图片

接下来,让我们尝试向Telegram发送一张图片。我将发送一张基于pandas数据动态生成的图表。这是一个很好的用例,因为我们的机器人可以根据分析数据每天发送图形统计信息。

from telegram import Bot, InputFile  
import asyncio  
import pandas as pd  
import matplotlib.pyplot as plt  
import io  
  
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'  
CHAT_ID = '[插入您的聊天ID]'  
  
# 定义机器人  
bot = Bot(token=TELEGRAM_BOT_TOKEN)  
  
async def send_image(image, chat_id):  
    async with bot:  
        await bot.send_photo(chat_id=chat_id, photo=InputFile(image))  
  
async def run_bot_image(image, chat_id):  
    await send_image(image, chat_id)  
  
def create_price_chart():  
    # 生成数据框的虚拟数据  
    data = {  
        '日期': pd.date_range(start='2024-08-01', periods=10, freq='D'),  
        '价格': [24.99, 25.49, 26.00, 25.75, 26.99, 26.50, 27.00, 26.80, 27.20, 27.50]  
    }  
    df = pd.DataFrame(data)  
  
    # 构建简单图表  
    fig, ax = plt.subplots()  
    ax.plot(df['日期'], df['价格'], marker='o')  
    ax.set_title('价格趋势')  
    ax.set_xlabel('日期')  
    ax.set_ylabel('价格')  
    plt.xticks(rotation=45)  
  
    # 将图表保存到缓冲区  
    buf = io.BytesIO()  
    plt.savefig(buf, format='png')  
    buf.seek(0)  
    plt.close()  
  
    return buf  
  
# 生成包含图表的图像  
image = create_price_chart()  
  
# 向Telegram发送图像  
asyncio.run(run_bot_image(image, CHAT_ID))

运行脚本后的结果。

太棒了!现在我们知道如何快速轻松地将分析数据发送到Telegram机器人!


原文链接:How to send notifications to Telegram with Python

DefiPlot翻译整理,转载请标明出处

免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。
通过 NowPayments 打赏