Skip to content

Inbox API

The Inbox class represents a single email inbox in VaultSandbox. It provides methods for managing emails, waiting for new messages, and monitoring in real-time.

email_address: str

The email address for this inbox. Use this address to send test emails.

inbox = await client.create_inbox()
print(f"Send email to: {inbox.email_address}")
# Use in your application
await send_welcome_email(inbox.email_address)

inbox_hash: str

Unique identifier for this inbox (SHA-256 hash of the client KEM public key). Used internally for API operations.

print(f"Inbox ID: {inbox.inbox_hash}")

expires_at: datetime

The date and time when this inbox will expire and be automatically deleted.

from datetime import datetime, timezone
inbox = await client.create_inbox()
print(f"Inbox expires at: {inbox.expires_at.isoformat()}")
time_until_expiry = inbox.expires_at - datetime.now(timezone.utc)
print(f"Time remaining: {int(time_until_expiry.total_seconds())}s")

server_sig_pk: str | None

Base64-encoded server signing public key for ML-DSA-65 signature verification.

Note: This property is only present when the inbox is encrypted (encrypted=True). For plain inboxes, this will be None.


encrypted: bool

Indicates whether the inbox uses encryption.

  • True - Emails are encrypted using ML-KEM-768 (end-to-end encrypted)
  • False - Emails are stored and transmitted in plain text (Base64-encoded)

The encryption state is determined by the server’s encryption_policy and the encryption option passed during inbox creation.

inbox = await client.create_inbox()
if inbox.encrypted:
print("Inbox is encrypted")
print(f"Server signing key: {inbox.server_sig_pk}")
else:
print("Inbox is plain (not encrypted)")

email_auth: bool

Indicates whether email authentication checks (SPF, DKIM, DMARC, Reverse DNS) are enabled for this inbox.

  • True - Authentication checks are performed on incoming emails
  • False - Authentication checks are skipped (all auth results will show skipped status)
inbox = await client.create_inbox()
if inbox.email_auth:
print("Email authentication is enabled")
else:
print("Email authentication is disabled")

Lists all emails in the inbox with full content. Emails are automatically decrypted.

async def list_emails(self) -> list[Email]

list[Email] - List of decrypted email objects with full content

emails = await inbox.list_emails()
print(f"Inbox has {len(emails)} emails")
for email in emails:
print(f"- {email.subject} from {email.from_address}")
print(f" Body: {email.text[:100]}...")

Lists all emails in the inbox with metadata only (no full content). This is more efficient than list_emails() when you only need basic information like subject and sender.

async def list_emails_metadata_only(self) -> list[EmailMetadata]

list[EmailMetadata] - List of email metadata objects

@dataclass
class EmailMetadata:
id: str # Unique email identifier
from_address: str # Sender email address
subject: str # Email subject
received_at: datetime # When the email was received
is_read: bool # Whether the email has been read
from vaultsandbox import EmailMetadata
# Get just metadata - faster than fetching full content
metadata_list = await inbox.list_emails_metadata_only()
print(f"Inbox has {len(metadata_list)} emails")
for meta in metadata_list:
status = "read" if meta.is_read else "unread"
print(f"- [{status}] {meta.subject} from {meta.from_address}")
# Fetch full content only for emails you need
for meta in metadata_list:
if "important" in meta.subject.lower():
full_email = await inbox.get_email(meta.id)
print(f"Important email body: {full_email.text}")

Use list_emails_metadata_only() when:

  • You need to display a list of emails without their content
  • You want to filter emails before fetching full content
  • Performance is critical and you’re dealing with many emails

Use list_emails() when:

  • You need access to email body, attachments, or links
  • You’re processing all emails and need their full content

Retrieves a specific email by ID.

async def get_email(self, email_id: str) -> Email
  • email_id: The unique identifier for the email

Email - The decrypted email object

emails = await inbox.list_emails()
first_email = await inbox.get_email(emails[0].id)
print(f"Subject: {first_email.subject}")
print(f"Body: {first_email.text}")
  • EmailNotFoundError - Email does not exist

Waits for an email matching specified criteria. This is the recommended way to handle email arrival in tests.

async def wait_for_email(
self,
options: WaitForEmailOptions | None = None,
) -> Email
@dataclass
class WaitForEmailOptions:
subject: str | Pattern[str] | None = None
from_address: str | Pattern[str] | None = None
predicate: Callable[..., bool] | None = None
timeout: int = 30000
poll_interval: int = 2000
PropertyTypeDefaultDescription
timeoutint30000Maximum time to wait in milliseconds
poll_intervalint2000Polling interval in milliseconds
subjectstr | Pattern[str] | NoneNoneFilter by email subject
from_addressstr | Pattern[str] | NoneNoneFilter by sender address
predicateCallable[[Email], bool] | NoneNoneCustom filter function

Email - The first email matching the criteria

import re
from vaultsandbox import WaitForEmailOptions
# Wait for any email
email = await inbox.wait_for_email(
WaitForEmailOptions(timeout=10000)
)
# Wait for email with specific subject
email = await inbox.wait_for_email(
WaitForEmailOptions(
timeout=10000,
subject=re.compile(r"Password Reset"),
)
)
# Wait for email from specific sender
email = await inbox.wait_for_email(
WaitForEmailOptions(
timeout=10000,
from_address="noreply@example.com",
)
)
# Wait with custom predicate
email = await inbox.wait_for_email(
WaitForEmailOptions(
timeout=15000,
predicate=lambda email: "user@example.com" in email.to,
)
)
# Combine multiple filters
email = await inbox.wait_for_email(
WaitForEmailOptions(
timeout=10000,
subject=re.compile(r"Welcome"),
from_address=re.compile(r"noreply@"),
predicate=lambda email: len(email.links) > 0,
)
)
  • TimeoutError - No matching email received within timeout period

Waits until the inbox has at least the specified number of emails. More efficient than using arbitrary timeouts when testing multiple emails.

async def wait_for_email_count(
self,
count: int,
options: WaitForCountOptions | None = None,
) -> list[Email]
  • count: Minimum number of emails to wait for
  • options: Optional configuration for waiting
@dataclass
class WaitForCountOptions:
timeout: int = 30000 # Maximum wait time in milliseconds

list[Email] - List of all emails in the inbox once the count is reached

from vaultsandbox import WaitForCountOptions
# Trigger multiple emails
await send_multiple_notifications(inbox.email_address, 3)
# Wait for all 3 to arrive (with default timeout)
emails = await inbox.wait_for_email_count(3)
assert len(emails) >= 3
# Wait with custom timeout
emails = await inbox.wait_for_email_count(3, WaitForCountOptions(timeout=60000))
# Process the returned emails directly
for email in emails:
print(f"Subject: {email.subject}")
  • TimeoutError - Required count not reached within timeout

Subscribes to new emails in real-time. Receives a callback for each new email that arrives.

async def on_new_email(
self,
callback: Callable[[Email], Any],
*,
mark_existing_seen: bool = True,
) -> Subscription
  • callback: Function called when a new email arrives. Can be sync or async.
  • mark_existing_seen: If True (default), existing emails won’t trigger the callback. Set to False to receive callbacks for existing emails too.

Subscription - Subscription object for managing the subscription

class Subscription:
async def unsubscribe(self) -> None: ...
def mark_seen(self, email_id: str) -> None: ...
inbox = await client.create_inbox()
print(f"Monitoring: {inbox.email_address}")
# Subscribe to new emails
async def handle_email(email: Email):
print(f'New email: "{email.subject}"')
print(f"From: {email.from_address}")
# Process email...
subscription = await inbox.on_new_email(handle_email)
# Later, stop monitoring
await subscription.unsubscribe()

Always unsubscribe when done to avoid resource leaks:

import pytest
from vaultsandbox import VaultSandboxClient
@pytest.fixture
async def inbox(client):
inbox = await client.create_inbox()
yield inbox
await inbox.delete()
@pytest.mark.asyncio
async def test_email_notification(inbox):
received_emails = []
async def handle_email(email):
received_emails.append(email)
subscription = await inbox.on_new_email(handle_email)
try:
# Send email to inbox.email_address...
await asyncio.sleep(5) # Wait for email
finally:
await subscription.unsubscribe()

Gets the current synchronization status of the inbox with the server.

async def get_sync_status(self) -> SyncStatus

SyncStatus - Sync status information

@dataclass
class SyncStatus:
email_count: int
emails_hash: str
status = await inbox.get_sync_status()
print(f"Email count: {status.email_count}")
print(f"Emails hash: {status.emails_hash}")

Gets the raw, decrypted source of a specific email (original MIME format).

async def get_raw_email(self, email_id: str) -> RawEmail
  • email_id: The unique identifier for the email

RawEmail - Object containing the email ID and raw MIME content

@dataclass
class RawEmail:
id: str # The email ID
raw: str # The raw MIME email content
emails = await inbox.list_emails()
raw_email = await inbox.get_raw_email(emails[0].id)
print(f"Email ID: {raw_email.id}")
print("Raw MIME source:")
print(raw_email.raw)
# Save to file for debugging
with open("email.eml", "w") as f:
f.write(raw_email.raw)

Marks a specific email as read.

async def mark_email_as_read(self, email_id: str) -> None
  • email_id: The unique identifier for the email
emails = await inbox.list_emails()
await inbox.mark_email_as_read(emails[0].id)
print("Email marked as read")

Deletes a specific email from the inbox.

async def delete_email(self, email_id: str) -> None
  • email_id: The unique identifier for the email
emails = await inbox.list_emails()
# Delete first email
await inbox.delete_email(emails[0].id)
print("Email deleted")
# Verify deletion
updated = await inbox.list_emails()
assert len(updated) == len(emails) - 1

Deletes this inbox and all its emails.

async def delete(self) -> None
inbox = await client.create_inbox()
# Use inbox...
# Clean up
await inbox.delete()
print("Inbox deleted")

Always delete inboxes after tests:

import pytest
@pytest.fixture
async def inbox(client):
inbox = await client.create_inbox()
yield inbox
await inbox.delete()

Exports inbox data and encryption keys for backup or sharing.

def export(self) -> ExportedInbox

ExportedInbox - Serializable inbox data including sensitive keys

@dataclass
class ExportedInbox:
version: int # Export format version (always 1)
email_address: str
expires_at: str
inbox_hash: str
server_sig_pk: str # Base64url-encoded
secret_key: str # Base64url-encoded (SENSITIVE!)
exported_at: str

Note: The public key is derived from the secret key during import.

import json
inbox = await client.create_inbox()
data = inbox.export()
# Save for later
with open("inbox-backup.json", "w") as f:
json.dump(vars(data), f, indent=2)

Exported data contains private encryption keys. Store securely!

The Inbox class provides methods for managing webhooks that receive HTTP callbacks when events occur.

Creates a new webhook for this inbox.

async def create_webhook(
self,
url: str,
events: list[str],
*,
template: str | CustomTemplate | None = None,
filter: FilterConfig | None = None,
description: str | None = None,
) -> Webhook
  • url: Target URL for webhook deliveries (HTTPS required in production)
  • events: Event types to subscribe to (e.g., ["email.received"])
  • template: Optional payload template name ("slack", "discord", etc.) or CustomTemplate
  • filter: Optional FilterConfig to only receive matching events
  • description: Optional human-readable description (max 500 chars)

Webhook - The created webhook including the signing secret

webhook = await inbox.create_webhook(
url="https://your-app.com/webhook/emails",
events=["email.received"],
description="Production email notifications",
)
print(f"Webhook ID: {webhook.id}")
print(f"Secret: {webhook.secret}") # Save this!
  • WebhookLimitReachedError - Webhook limit for this inbox is reached

Lists all webhooks for this inbox.

async def list_webhooks(self) -> list[Webhook]

list[Webhook] - List of webhook objects

Note: The signing secret is not included in list responses. Use get_webhook() to retrieve a webhook with its secret.

webhooks = await inbox.list_webhooks()
print(f"Total webhooks: {len(webhooks)}")
for webhook in webhooks:
status = "enabled" if webhook.enabled else "disabled"
print(f"- {webhook.id}: {webhook.url} ({status})")

Retrieves a specific webhook by ID.

async def get_webhook(self, webhook_id: str) -> Webhook
  • webhook_id: The webhook ID (whk_ prefix)

Webhook - The webhook object including secret and stats

@dataclass
class Webhook:
id: str
url: str
events: list[str]
scope: Literal["global", "inbox"]
enabled: bool
created_at: datetime
inbox_email: str | None = None
inbox_hash: str | None = None
secret: str | None = None
template: str | CustomTemplate | None = None
filter: FilterConfig | None = None
description: str | None = None
updated_at: datetime | None = None
last_delivery_at: datetime | None = None
last_delivery_status: Literal["success", "failed"] | None = None
stats: WebhookStats | None = None
webhook = await inbox.get_webhook("whk_abc123")
print(f"URL: {webhook.url}")
print(f"Enabled: {webhook.enabled}")
print(f"Last delivery: {webhook.last_delivery_at or 'Never'}")
if webhook.stats:
print(f"Deliveries: {webhook.stats.successful_deliveries}/{webhook.stats.total_deliveries}")
  • WebhookNotFoundError - Webhook does not exist

Deletes a specific webhook by ID.

async def delete_webhook(self, webhook_id: str) -> None
  • webhook_id: The webhook ID (whk_ prefix)
await inbox.delete_webhook("whk_abc123")
print("Webhook deleted")
  • WebhookNotFoundError - Webhook does not exist

The Webhook object returned from create_webhook() and get_webhook() has additional methods for management.

Updates the webhook configuration.

async def update(
self,
*,
url: str | None = None,
events: list[str] | None = None,
template: str | CustomTemplate | None = None,
remove_template: bool = False,
filter: FilterConfig | None = None,
remove_filter: bool = False,
description: str | None = None,
enabled: bool | None = None,
) -> None
webhook = await inbox.get_webhook("whk_abc123")
await webhook.update(
url="https://your-app.com/webhook/v2/emails",
enabled=False,
)
print(f"Updated URL: {webhook.url}")

Deletes this webhook.

async def delete(self) -> None
webhook = await inbox.get_webhook("whk_abc123")
await webhook.delete()
print("Webhook deleted")

Tests the webhook by sending a test payload.

async def test(self) -> TestWebhookResult

TestWebhookResult - The test result

@dataclass
class TestWebhookResult:
success: bool
status_code: int | None = None
response_time: int | None = None
response_body: str | None = None
error: str | None = None
payload_sent: Any | None = None
webhook = await inbox.get_webhook("whk_abc123")
result = await webhook.test()
if result.success:
print(f"Test passed! Status: {result.status_code}")
print(f"Response time: {result.response_time}ms")
else:
print(f"Test failed: {result.error}")

Rotates the webhook signing secret. The old secret remains valid for a grace period.

async def rotate_secret(self) -> RotateSecretResult

RotateSecretResult - The new secret and grace period info

@dataclass
class RotateSecretResult:
id: str
secret: str
previous_secret_valid_until: str
webhook = await inbox.get_webhook("whk_abc123")
result = await webhook.rotate_secret()
print(f"New secret: {result.secret}")
print(f"Old secret valid until: {result.previous_secret_valid_until}")

Enable or disable the webhook.

async def enable(self) -> None
async def disable(self) -> None
webhook = await inbox.get_webhook("whk_abc123")
await webhook.disable()
print("Webhook disabled")
await webhook.enable()
print("Webhook enabled")

Refresh the webhook data from the server.

async def refresh(self) -> None
webhook = await inbox.get_webhook("whk_abc123")
# ... some time passes ...
await webhook.refresh()
print(f"Updated stats: {webhook.stats}")

The Inbox class provides methods for configuring chaos engineering features to test email resilience. Chaos must be enabled on the gateway server.

Gets the current chaos configuration for this inbox.

async def get_chaos(self) -> ChaosConfig

ChaosConfig - The current chaos configuration

@dataclass
class ChaosConfig:
enabled: bool
expires_at: str | None = None
latency: LatencyConfig | None = None
connection_drop: ConnectionDropConfig | None = None
random_error: RandomErrorConfig | None = None
greylist: GreylistConfig | None = None
blackhole: BlackholeConfig | None = None
config = await inbox.get_chaos()
print(f"Chaos enabled: {config.enabled}")
if config.latency and config.latency.enabled:
print(f"Latency: {config.latency.min_delay_ms}-{config.latency.max_delay_ms}ms")
  • ApiError (403) - Chaos features are disabled on the server

Sets or updates the chaos configuration for this inbox.

async def set_chaos(
self,
*,
enabled: bool,
expires_at: str | None = None,
latency: LatencyConfig | None = None,
connection_drop: ConnectionDropConfig | None = None,
random_error: RandomErrorConfig | None = None,
greylist: GreylistConfig | None = None,
blackhole: BlackholeConfig | None = None,
) -> ChaosConfig
ParameterTypeRequiredDescription
enabledboolYesMaster switch for all chaos
expires_atstr | NoneNoISO 8601 timestamp to auto-disable
latencyLatencyConfig | NoneNoLatency injection config
connection_dropConnectionDropConfig | NoneNoConnection drop config
random_errorRandomErrorConfig | NoneNoRandom error config
greylistGreylistConfig | NoneNoGreylisting config
blackholeBlackholeConfig | NoneNoBlackhole config

ChaosConfig - The updated chaos configuration

from vaultsandbox import LatencyConfig, RandomErrorConfig
config = await inbox.set_chaos(
enabled=True,
latency=LatencyConfig(
enabled=True,
min_delay_ms=1000,
max_delay_ms=5000,
probability=0.5,
),
random_error=RandomErrorConfig(
enabled=True,
error_rate=0.1,
error_types=["temporary"],
),
)
print(f"Chaos enabled: {config.enabled}")
  • ApiError (403) - Chaos features are disabled on the server
  • InboxNotFoundError - Inbox does not exist

Disables all chaos features for this inbox.

async def disable_chaos(self) -> None
# Disable all chaos features
await inbox.disable_chaos()
print("Chaos disabled")
  • ApiError (403) - Chaos features are disabled on the server
  • InboxNotFoundError - Inbox does not exist

import asyncio
import os
import re
from vaultsandbox import VaultSandboxClient, WaitForEmailOptions
async def complete_inbox_example():
async with VaultSandboxClient(
api_key=os.environ["VAULTSANDBOX_API_KEY"],
) as client:
# Create inbox
inbox = await client.create_inbox()
print(f"Created: {inbox.email_address}")
print(f"Expires: {inbox.expires_at.isoformat()}")
# Subscribe to new emails
async def handle_email(email):
print(f"Received: {email.subject}")
subscription = await inbox.on_new_email(handle_email)
try:
# Trigger test email
await send_test_email(inbox.email_address)
# Wait for specific email
email = await inbox.wait_for_email(
WaitForEmailOptions(
timeout=10000,
subject=re.compile(r"Test"),
)
)
print(f"Found email: {email.subject}")
print(f"Body: {email.text}")
# Mark as read
await inbox.mark_email_as_read(email.id)
# Get all emails
all_emails = await inbox.list_emails()
print(f"Total emails: {len(all_emails)}")
# Export inbox
export_data = inbox.export()
with open("inbox.json", "w") as f:
import json
json.dump(vars(export_data), f, indent=2)
finally:
# Clean up
await subscription.unsubscribe()
await inbox.delete()
asyncio.run(complete_inbox_example())