Skip to main content
Use client.events with AsyncVZapsClient to receive instance events in real time without exposing a public URL. For HTTP callbacks, see Webhooks.

Common events

EventDescription
MessageNew incoming message or message event.
ReadReceiptRead/delivery update.
PresenceUser presence.
ChatPresenceChat presence.
HistorySyncHistory synchronization.
ConnectedInstance connected to WhatsApp.
DisconnectedInstance disconnected.
GroupParticipantsAddParticipants added to group.
GroupParticipantsRemoveParticipants removed from group.
AllAll subscribed events.

Subscribe to realtime

import asyncio

from vzaps import AsyncVZapsClient

async def main():
    async with AsyncVZapsClient(
        client_token="your-client-token",
        client_secret="your-client-secret",
    ) as client:
        async with client.events.subscribe(
            instance_id="VZ...",
            instance_token="instance-token",
            events=["Message", "ReadReceipt", "Connected", "Disconnected"],
            reconnect=True,
            max_retries=10,
            retry_delay_seconds=1.0,
        ) as subscription:
            await subscription.wait_closed()

asyncio.run(main())
Return: EventSubscription — object with on(), close(), and automatic reconnect when configured. Options:
FieldTypeRequiredDescription
instance_idstringYesInstance to watch.
instance_tokenstringYesInstance token.
eventslist[str]NoEvent list. If omitted, uses events subscribed on the instance.
reconnectbooleanNoReconnect automatically. Default: True.
max_retriesnumberNoMaximum retry attempts.
retry_delay_secondsnumberNoBase delay between retries.
last_event_idstringNoResume cursor.

Register handlers

@subscription.on_open
async def on_open():
    print("Realtime connected")

@subscription.on("Message")
async def on_message(event):
    print(event.id)
    print(event.instance_id)
    print(event.data)

@subscription.on("All")
async def on_all(event):
    print("Received event:", event.type)

@subscription.on_error
async def on_error(error):
    print("Realtime error:", error)

Close subscription

await subscription.close()
Return: Promise<void> after the WebSocket closes. In long-running processes, close on shutdown:
import signal

def shutdown():
  asyncio.create_task(subscription.close())

signal.signal(signal.SIGINT, lambda *_: shutdown())

Event envelope

Every event received by the SDK has this shape:
{
  "id": "evt_01J...",
  "type": "Message",
  "instance_id": "VZ...",
  "created_at": "2026-06-23T22:57:17.000Z",
  "data": {
    "type": "Message",
    "media_url": "https://cdn.example.com/media/image.jpg"
  }
}
Fields:
FieldDescription
idEvent identifier. Use for deduplication.
typeEvent type.
instance_idSource instance.
created_atEvent creation date.
dataEvent payload.
data.media_urlMedia URL when the incoming event contains media and the platform provides the file.

Delivery and ack

Delivery is at-least-once. Your app should process events idempotently. After the handler finishes, the SDK automatically sends an ack. Recommendations:
  • store event.id if your automation performs external effects;
  • ignore events that were already processed;
  • use last_event_id when reconnecting if you want to reduce gaps;
  • keep handlers fast and move long-running work to your own queue.

Realtime or webhook?

ScenarioRecommendation
Bot, dashboard, or app with immediate consumptionRealtime
Backend with public URL and HTTP pipelineWebhook
Do not want to expose a public URLRealtime
Need to reprocess deliveries via logsWebhook