用Python推送Telegram通知
今天,我将指导你如何使用Python向telegram推送通知消息,使你的工作流程更加高效和无烦恼。

一键发币: SUI | SOL | BNB | ETH | BASE | ARB | OP | POLYGON | AVAX | FTM | OK
作为一名程序员、数据工程师和BI分析师,我经常需要向Telegram发送消息以确保一切运行正常。这些通知用于各种事件,例如当某些东西出现故障或停止工作时的异常警报。我的客户会收到每日报告、价格变动通知或当有人开始在亚马逊上销售他们的产品时的警报(我最近写了一篇关于创建此类机器人的文章)。
在Python脚本中通过Telegram机器人设置通知不仅简单而且非常有效。Telegram是一个多功能平台,允许你向不同的频道发送消息,并且可以轻松集成到公共和私人群组中,无论参与者有多少。该机器人只能发送消息,确保所有其他通信的隐私和安全。
今天,我将指导你如何使用Python自动化这些通知,使你的工作流程更加高效和无烦恼。
1、创建Telegram机器人
创建Telegram机器人非常简单。要创建你的新机器人,请在Telegram中找到一个特殊的机器人叫做BotFather。BotFather是所有其他机器人的“父亲”,用于管理你的机器人。搜索它(在搜索栏中输入“BotFather”),然后键入/newbot
。
接下来,你会被要求为你的机器人命名,该名称必须以“bot”结尾。我将我的机器人命名为andrew_medium_test_bot
。如果名称可用,你将收到一个API密钥。

使用这个API密钥,我们可以与机器人交互并发送消息。
2、创建新群组并查找其ID
你新创建的机器人不能随便给任何人或任何群组发送消息。为了让它能够发送消息,你需要将其添加到Telegram群组中。你可以将机器人添加到任何现有的群组(如果你有权限),或者创建一个新的群组并将你的Telegram机器人添加到其中。
点击“创建新群组”。

设置群组名称(我写了“Andrew Medium Test Group”)

点击“添加成员”,搜索你的新机器人,并将其添加到成员列表中。

最后,点击“创建”以创建新群组。

完美!现在你有了一个只有两个成员的空群组,其中一个成员是机器人。

如果你点击三个点并选择“信息”,可以查看群组成员。

请注意,默认情况下,机器人无法访问消息(这可以在机器人的设置中更改)。
下一步是找到群组的Chat ID,这是必要的,以便知道在哪里发送消息。不幸的是,Chat ID在Telegram界面中不可见。唯一找到它的方法是向机器人发送一条消息,然后使用一个小的Python脚本来检索最新消息。
让我们直接向机器人(在私聊模式下,默认情况下机器人只能读取这样的消息)写一条消息。键入以斜杠开头的消息,后面跟着机器人的名称。

现在,让我们编写一个脚本来查找Chat ID。安装用于与Telegram机器人交互的Python包:
pip install python-telegram-bot
然后编写代码来迭代机器人可用的所有更新,并输出聊天ID、聊天标题和消息:
import asyncio
from telegram import Bot
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'
async def main():
# 创建机器人对象
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# 获取更新
updates = await bot.get_updates()
if not updates:
print("未找到更新")
else:
for update in updates:
if update.message:
# 如果需要显示所有消息数据
# print(update.message)
# 只显示聊天ID、标题和消息
chat_id = update.message.chat.id
chat_title = update.message.chat.title
message_text = update.message.text
print(f"聊天ID: {chat_id} | 聊天标题: {chat_title} | 消息: {message_text}")
asyncio.run(main())
聊天ID: -4206954951 | 聊天标题: Andrew Medium Test Group | 消息: /hello @andrew_medium_test_bot
当我们运行脚本时,我们看到群组的Chat ID,例如:聊天ID: -4206954951
。Telegram中的负数群组标识符是正常的,不要担心 😆
3、使用Python发送消息
现在我们有了Chat ID,我们可以使用bot.send_message
向群组发送消息。Telegram机器人在Python中通过asyncio
异步代码工作,所以我通过包装函数来运行它。
from telegram import Bot
import asyncio
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'
CHAT_ID = '[插入您的聊天ID]'
# 定义机器人
bot = Bot(token=TELEGRAM_BOT_TOKEN)
async def send_message(text, chat_id):
async with bot:
await bot.send_message(text=text, chat_id=chat_id)
async def run_bot(messages, chat_id):
text = '\n'.join(messages)
await send_message(text, chat_id)
# 测试消息
messages = [
'产品 https://www.amazon.com/dp/B08C1W5N87,价格从$24.99变更为$26.99',
'产品 https://www.amazon.com/dp/B0CL61F39H 添加了新的负面评价(评分2)',
'注意!过去3小时平均销量比平时低50%'
]
if messages:
asyncio.run(run_bot(messages, CHAT_ID))
我设置了三个示例消息作为数组,这里是在运行脚本后消息在群组中显示的样子。

很简单,对吧?
4、使用Python发送长消息
如果消息长度超过4096个字符,将会发生错误,因此最好将消息分成多个部分:
from telegram import Bot
from telegram.error import BadRequest
import asyncio
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'
CHAT_ID = '[插入您的聊天ID]'
# 定义机器人
bot = Bot(token=TELEGRAM_BOT_TOKEN)
async def send_long_message(text, chat_id):
MAX_MESSAGE_LENGTH = 4096
parts = [text[i:i + MAX_MESSAGE_LENGTH] for i in range(0, len(text), MAX_MESSAGE_LENGTH)]
async with bot:
for part in parts:
try:
await bot.send_message(chat_id=chat_id, text=part)
except BadRequest as e:
print(f"发送消息部分失败: {str(e)}")
async def run_bot(messages, chat_id):
text = '\n'.join(messages)
await send_long_message(text=text, chat_id=chat_id)
# 测试消息
messages = [
'产品 https://www.amazon.com/dp/B08C1W5N87,价格从$24.99变更为$26.99',
'产品 https://www.amazon.com/dp/B0CL61F39H 添加了新的负面评价(评分2)',
'注意!过去3小时平均销量比平时低50%'
]*20
if messages:
asyncio.run(run_bot(messages, CHAT_ID))
结果……

现在,非常长的消息可以正确发送。
5、使用Python发送图片
接下来,让我们尝试向Telegram发送一张图片。我将发送一张基于pandas数据动态生成的图表。这是一个很好的用例,因为我们的机器人可以根据分析数据每天发送图形统计信息。
from telegram import Bot, InputFile
import asyncio
import pandas as pd
import matplotlib.pyplot as plt
import io
TELEGRAM_BOT_TOKEN = '[插入您的机器人令牌]'
CHAT_ID = '[插入您的聊天ID]'
# 定义机器人
bot = Bot(token=TELEGRAM_BOT_TOKEN)
async def send_image(image, chat_id):
async with bot:
await bot.send_photo(chat_id=chat_id, photo=InputFile(image))
async def run_bot_image(image, chat_id):
await send_image(image, chat_id)
def create_price_chart():
# 生成数据框的虚拟数据
data = {
'日期': pd.date_range(start='2024-08-01', periods=10, freq='D'),
'价格': [24.99, 25.49, 26.00, 25.75, 26.99, 26.50, 27.00, 26.80, 27.20, 27.50]
}
df = pd.DataFrame(data)
# 构建简单图表
fig, ax = plt.subplots()
ax.plot(df['日期'], df['价格'], marker='o')
ax.set_title('价格趋势')
ax.set_xlabel('日期')
ax.set_ylabel('价格')
plt.xticks(rotation=45)
# 将图表保存到缓冲区
buf = io.BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
plt.close()
return buf
# 生成包含图表的图像
image = create_price_chart()
# 向Telegram发送图像
asyncio.run(run_bot_image(image, CHAT_ID))
运行脚本后的结果。

太棒了!现在我们知道如何快速轻松地将分析数据发送到Telegram机器人!
原文链接:How to send notifications to Telegram with Python
DefiPlot翻译整理,转载请标明出处
免责声明:本站资源仅用于学习目的,也不应被视为投资建议,读者在采取任何行动之前应自行研究并对自己的决定承担全部责任。