Polling updates¶
Tip
👩🏻🎨 start_polling
has the same signature as start_webhook
“under the hood”
👩🏻🔬This API method gives a chance to hook updates without webhooks on your machine, but it’s possible to use both approaches anyway.
🧑🎓 First of all, before starting polling, you need to register handlers, just like when installing webhooks.
Lets do it with decorators, but we also can do it another way, using wallet.dispatcher.register_transaction_handler
import datetime
from glQiwiApi import QiwiWrapper, types
from glQiwiApi.utils import executor
api_access_token = "your token"
phone_number = "your number"
wallet = QiwiWrapper(
api_access_token=api_access_token,
phone_number=phone_number
)
@wallet.transaction_handler()
async def my_first_handler(update: types.Transaction):
assert isinstance(update, types.Transaction)
def on_startup(wrapper: QiwiWrapper):
wrapper.dispatcher.logger.info("This message logged on startup")
executor.start_polling(
wallet,
get_updates_from=datetime.datetime.now() - datetime.timedelta(hours=1),
on_startup=on_startup
)
🧙♀️So, we also have to import executor
Polling and pass on our client,
that contains user-friendly functions start_polling
and start_webhook
.
import datetime
from glQiwiApi import QiwiWrapper, types
from glQiwiApi.utils import executor
api_access_token = "your token"
phone_number = "your number"
wallet = QiwiWrapper(
api_access_token=api_access_token,
phone_number=phone_number
)
@wallet.transaction_handler()
async def my_first_handler(update: types.Transaction):
assert isinstance(update, types.Transaction)
def on_startup(wrapper: QiwiWrapper):
wrapper.dispatcher.logger.info("This message logged on startup")
executor.start_polling(
wallet,
get_updates_from=datetime.datetime.now() - datetime.timedelta(hours=1),
on_startup=on_startup
)
👨🔬 Then, you can start polling, but, let’s make it clear which arguments you should pass on to start_polling
function.
So, in this example we see get_updates_from
and on_startup
, it means, that in example we want to receive notifications that came an hour
ago and execute some function on startup of polling updates
import datetime
from glQiwiApi import QiwiWrapper, types
from glQiwiApi.utils import executor
api_access_token = "your token"
phone_number = "your number"
wallet = QiwiWrapper(
api_access_token=api_access_token,
phone_number=phone_number
)
@wallet.transaction_handler()
async def my_first_handler(update: types.Transaction):
assert isinstance(update, types.Transaction)
def on_startup(wrapper: QiwiWrapper):
wrapper.dispatcher.logger.info("This message logged on startup")
executor.start_polling(
wallet,
get_updates_from=datetime.datetime.now() - datetime.timedelta(hours=1),
on_startup=on_startup
)
😼 As you can see, in the example we have a function that we pass as an argument to on_startup
.
As you may have guessed, this function will be executed at the beginning of the polling.
import datetime
from glQiwiApi import QiwiWrapper, types
from glQiwiApi.utils import executor
api_access_token = "your token"
phone_number = "your number"
wallet = QiwiWrapper(
api_access_token=api_access_token,
phone_number=phone_number
)
@wallet.transaction_handler()
async def my_first_handler(update: types.Transaction):
assert isinstance(update, types.Transaction)
def on_startup(wrapper: QiwiWrapper):
wrapper.dispatcher.logger.info("This message logged on startup")
executor.start_polling(
wallet,
get_updates_from=datetime.datetime.now() - datetime.timedelta(hours=1),
on_startup=on_startup
)
😻 If you did everything correctly, you will get something like this
2021-06-05 17:21:07.423 | DEBUG | glQiwiApi.utils.executor:welcome:373 - Start polling!
🧚♀️ Also, you can very easily implement simultaneous polling of updates from both aiogram and QIWI API.
In the example below, we catch all text messages and return the same “Hello” response.
from aiogram import Bot, Dispatcher
from aiogram import types
from glQiwiApi import QiwiWrapper
from glQiwiApi.core.builtin import TelegramPollingProxy
from glQiwiApi.types import Transaction
from glQiwiApi.utils import executor
api_access_token = "your token"
phone_number = "your number"
bot = Bot("token from BotFather")
dp = Dispatcher(bot)
wallet = QiwiWrapper(
api_access_token=api_access_token,
phone_number=phone_number
)
@dp.message_handler()
async def message_handler(msg: types.Message):
await msg.answer(text="Привет😇")
@wallet.transaction_handler()
async def my_first_handler(update: Transaction):
assert isinstance(update, Transaction)
def on_startup(wrapper: QiwiWrapper):
wrapper.dispatcher.logger.info("This message logged on startup")
executor.start_polling(
wallet,
on_startup=on_startup,
tg_app=TelegramPollingProxy(dp)
)