Pular para o conteúdo principal
Use client.events com AsyncVZapsClient para receber eventos da instância em tempo real sem expor uma URL pública. Para callbacks HTTP, veja Webhooks.

Eventos comuns

EventoDescrição
MessageNova mensagem recebida ou evento de mensagem.
ReadReceiptAtualização de leitura/entrega.
PresencePresença do usuário.
ChatPresencePresença no chat.
HistorySyncSincronização de histórico.
ConnectedInstância conectada ao WhatsApp.
DisconnectedInstância desconectada.
GroupParticipantsAddParticipantes adicionados ao grupo.
GroupParticipantsRemoveParticipantes removidos do grupo.
AllTodos os eventos inscritos.

Assinar o realtime

import asyncio

from vzaps import AsyncVZapsClient

async def main():
    async with AsyncVZapsClient(
        client_token="seu-client-token",
        client_secret="seu-client-secret",
    ) as client:
        async with client.events.subscribe(
            instance_id="VZ...",
            instance_token="instance-token",
            events=["Message", "ReadReceipt", "Connected", "Disconnected"],
            reconnect=True,
            max_retries=10,
            retry_delay_seconds=1.0,
        ) as subscription:
            await subscription.wait_closed()

asyncio.run(main())
Retorno: EventSubscription — objeto com on(), close() e reconexao automatica quando configurada. Opções:
CampoTipoObrigatórioDescrição
instance_idstringSimInstância a observar.
instance_tokenstringSimToken da instância.
eventslist[str]NãoLista de eventos. Se omitido, usa os eventos inscritos na instância.
reconnectbooleanNãoReconecta automaticamente. Padrão: True.
max_retriesnumberNãoMáximo de tentativas de reconexão.
retry_delay_secondsnumberNãoAtraso base entre tentativas.
last_event_idstringNãoCursor para retomar consumo.

Registrar handlers

@subscription.on_open
async def on_open():
    print("Realtime conectado")

@subscription.on("Message")
async def on_message(event):
    print(event.id)
    print(event.instance_id)
    print(event.data)

@subscription.on("All")
async def on_all(event):
    print("Evento recebido:", event.type)

@subscription.on_error
async def on_error(error):
    print("Erro no realtime:", error)

Fechar assinatura

await subscription.close()
Retorno: Promise<void> apos fechar o WebSocket. Em processos de longa duração, feche no shutdown:
import signal

def shutdown():
    asyncio.create_task(subscription.close())

signal.signal(signal.SIGINT, lambda *_: shutdown())

Envelope do evento

Cada evento recebido pelo SDK tem esta forma:
{
  "id": "evt_01J...",
  "type": "Message",
  "instance_id": "VZ...",
  "created_at": "2026-06-23T22:57:17.000Z",
  "data": {
    "type": "Message",
    "media_url": "https://cdn.example.com/media/image.jpg"
  }
}
Campos:
CampoDescrição
idIdentificador do evento. Use para deduplicação.
typeTipo do evento.
instance_idInstância de origem.
created_atData de criação do evento.
dataPayload do evento.
data.media_urlURL da mídia quando o evento recebido contém mídia e a plataforma fornece o arquivo.

Entrega e ack

A entrega é at-least-once. Sua app deve processar eventos de forma idempotente. Após o handler terminar, o SDK envia o ack automaticamente. Recomendações:
  • guarde event.id se sua automação executa efeitos externos;
  • ignore eventos já processados;
  • use last_event_id ao reconectar para reduzir lacunas;
  • mantenha handlers rápidos e mova trabalho longo para sua própria fila.

Realtime ou webhook?

CenárioRecomendação
Bot, dashboard ou app com consumo imediatoRealtime
Backend com URL pública e pipeline HTTPWebhook
Não quer expor URL públicaRealtime
Precisa reprocessar entregas via logsWebhook