Skip to content

Spam Analysis

VaultSandbox can analyze incoming emails for spam using Rspamd integration. When enabled on the server, each email is scored and results are available in the email object.

Spam analysis helps identify unwanted or malicious emails by:

  • Scoring emails based on various spam indicators
  • Identifying triggered rules (symbols)
  • Providing actionable recommendations (reject, add header, etc.)
  • Operating at both global and per-inbox levels

When spam analysis is enabled, every email has a spam_analysis property:

from vaultsandbox import WaitForEmailOptions
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
if email.spam_analysis:
print(email.spam_analysis.status) # SpamAnalysisStatus.ANALYZED
print(email.spam_analysis.score) # 5.2
print(email.spam_analysis.is_spam) # False
print(email.spam_analysis.action) # SpamAction.ADD_HEADER
print(email.spam_analysis.symbols) # List of triggered rules

For convenience, the Email class provides shorthand properties:

email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
# Quick access (returns None if not analyzed)
print(email.is_spam) # True/False/None
print(email.spam_score) # float or None
from vaultsandbox.types import SpamAnalysisStatus, SpamAction
@dataclass
class SpamAnalysisResult:
status: SpamAnalysisStatus
score: float | None = None
required_score: float | None = None
action: SpamAction | None = None
is_spam: bool | None = None
symbols: list[SpamSymbol] = field(default_factory=list)
processing_time_ms: int | None = None
info: str | None = None
PropertyTypeDescription
statusSpamAnalysisStatusAnalysis status (analyzed, skipped, error)
scorefloat | NoneOverall spam score (positive = more spammy)
required_scorefloat | NoneThreshold for spam classification
actionSpamAction | NoneRecommended action from Rspamd
is_spambool | NoneWhether score >= required_score
symbolslist[SpamSymbol]List of triggered rules with their scores
processing_time_msint | NoneTime taken for analysis in milliseconds
infostr | NoneAdditional info (error message or skip reason)
from vaultsandbox.types import SpamAnalysisStatus
class SpamAnalysisStatus(str, Enum):
ANALYZED = "analyzed" # Email was successfully analyzed
SKIPPED = "skipped" # Analysis was skipped
ERROR = "error" # Analysis failed
StatusDescription
ANALYZEDEmail was successfully analyzed by Rspamd
SKIPPEDAnalysis was skipped (disabled globally or for this inbox)
ERRORAnalysis failed (timeout, connection error, etc.)

Rspamd returns an action recommendation based on the spam score:

from vaultsandbox.types import SpamAction
class SpamAction(str, Enum):
NO_ACTION = "no action"
GREYLIST = "greylist"
ADD_HEADER = "add header"
REWRITE_SUBJECT = "rewrite subject"
SOFT_REJECT = "soft reject"
REJECT = "reject"
ActionDescription
NO_ACTIONEmail is likely legitimate
GREYLISTTemporary rejection recommended
ADD_HEADERAdd spam header but deliver
REWRITE_SUBJECTModify subject line to indicate spam
SOFT_REJECTTemporary rejection
REJECTPermanently reject the email

Symbols represent individual rules that triggered during analysis:

@dataclass
class SpamSymbol:
name: str
score: float
description: str | None = None
options: list[str] | None = None
PropertyTypeDescription
namestrRule identifier (e.g., DKIM_SIGNED, BAYES_SPAM)
scorefloatScore contribution (positive = spam, negative = ham)
descriptionstr | NoneHuman-readable explanation
optionslist[str] | NoneAdditional context (e.g., matched URLs)

Spam analysis must be enabled on the Gateway server. See Gateway Spam Analysis for server configuration.

You can enable or disable spam analysis per inbox:

from vaultsandbox import CreateInboxOptions
# Enable spam analysis for this inbox
inbox = await client.create_inbox(CreateInboxOptions(spam_analysis=True))
# Disable spam analysis for this inbox
inbox = await client.create_inbox(CreateInboxOptions(spam_analysis=False))
# Use server default (omit the option)
inbox = await client.create_inbox()

Use get_server_info() to check if spam analysis is available:

info = await client.get_server_info()
if info.spam_analysis_enabled:
print("Spam analysis is available on this server")
else:
print("Spam analysis is not enabled on this server")
from vaultsandbox import WaitForEmailOptions
from vaultsandbox.types import SpamAnalysisStatus
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
# Quick check using convenience property
if email.is_spam:
print("This email is classified as spam")
elif email.is_spam is False:
print("This email is not spam")
else:
print("Spam analysis was not performed")
from vaultsandbox.types import SpamAnalysisStatus, SpamAction
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
if email.spam_analysis and email.spam_analysis.status == SpamAnalysisStatus.ANALYZED:
analysis = email.spam_analysis
print(f"Spam Score: {analysis.score}")
print(f"Required Score: {analysis.required_score}")
print(f"Is Spam: {analysis.is_spam}")
print(f"Action: {analysis.action.value if analysis.action else 'none'}")
print(f"Processing Time: {analysis.processing_time_ms}ms")
print("\nTriggered Rules:")
for symbol in analysis.symbols:
desc = f" - {symbol.description}" if symbol.description else ""
print(f" {symbol.name}: {symbol.score:+.1f}{desc}")
from vaultsandbox.types import SpamAnalysisStatus
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
if not email.spam_analysis:
print("No spam analysis data available")
elif email.spam_analysis.status == SpamAnalysisStatus.ANALYZED:
print(f"Analyzed: score={email.spam_analysis.score}")
elif email.spam_analysis.status == SpamAnalysisStatus.SKIPPED:
print(f"Skipped: {email.spam_analysis.info or 'reason unknown'}")
elif email.spam_analysis.status == SpamAnalysisStatus.ERROR:
print(f"Error: {email.spam_analysis.info or 'unknown error'}")
from vaultsandbox import WaitForEmailOptions
from vaultsandbox.types import SpamAnalysisStatus
# Wait for a non-spam email
email = await inbox.wait_for_email(
WaitForEmailOptions(
timeout=30000,
predicate=lambda e: e.is_spam is False,
)
)
# Or filter spam emails
spam_emails = [e for e in await inbox.list_emails() if e.is_spam]
import pytest
from vaultsandbox import WaitForEmailOptions
from vaultsandbox.types import SpamAnalysisStatus
@pytest.mark.asyncio
async def test_spam_detection_works(inbox):
# Send a known spam-like email
await send_spammy_email(inbox.email_address)
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
assert email.spam_analysis is not None
assert email.spam_analysis.status == SpamAnalysisStatus.ANALYZED
assert email.is_spam is True
assert email.spam_score is not None
assert email.spam_score > 0
import pytest
from vaultsandbox import WaitForEmailOptions
from vaultsandbox.types import SpamAnalysisStatus, SpamAction
@pytest.mark.asyncio
async def test_legitimate_email_not_flagged(inbox):
await send_welcome_email(inbox.email_address)
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
assert email.is_spam is False
assert email.spam_analysis.action in [SpamAction.NO_ACTION, SpamAction.ADD_HEADER]
import pytest
from vaultsandbox import WaitForEmailOptions
@pytest.mark.asyncio
async def test_dkim_signed_detected(inbox):
await send_email_with_dkim(inbox.email_address)
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
if email.spam_analysis:
symbol_names = [s.name for s in email.spam_analysis.symbols]
assert "DKIM_SIGNED" in symbol_names or "R_DKIM_ALLOW" in symbol_names
import pytest
from vaultsandbox import WaitForEmailOptions
from vaultsandbox.types import SpamAnalysisStatus
@pytest.mark.asyncio
async def test_handles_missing_spam_analysis(inbox):
"""Test gracefully handles when spam analysis is disabled."""
email = await inbox.wait_for_email(WaitForEmailOptions(timeout=10000))
# These should not raise, just return None
is_spam = email.is_spam
score = email.spam_score
if email.spam_analysis:
if email.spam_analysis.status == SpamAnalysisStatus.SKIPPED:
print("Spam analysis was skipped for this inbox")
if not email.spam_analysis:
# Check server support
info = await client.get_server_info()
if not info.spam_analysis_enabled:
print("Spam analysis is not enabled on the server")
else:
print("Spam analysis enabled but no data - check inbox settings")
from vaultsandbox.types import SpamAnalysisStatus
if email.spam_analysis and email.spam_analysis.status == SpamAnalysisStatus.ERROR:
print(f"Spam analysis failed: {email.spam_analysis.info}")
# Common causes:
# - Rspamd service unreachable
# - Analysis timeout
# - Invalid email format
from vaultsandbox.types import SpamAnalysisStatus
if email.spam_analysis and email.spam_analysis.status == SpamAnalysisStatus.SKIPPED:
print("Spam analysis skipped")
# Check:
# 1. Server has VSB_SPAM_ANALYSIS_ENABLED=true
# 2. Inbox was created with spam_analysis=True (or default is True)
# 3. Gateway is running in 'local' mode (not 'backend' mode)