
Discovery Sources API

All sources implement BaseSource and are designed to run concurrently via asyncio.gather().


Base class

coldreach.sources.base

Abstract base class for all ColdReach email discovery sources.

Every source follows the same contract:

- Accepts a domain (and optionally a person name hint)
- Returns a list of SourceResult objects
- Never raises — catches its own errors and returns an empty list with a note

Sources are designed to run concurrently via asyncio.gather().
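The concurrency contract above can be sketched with stub coroutines standing in for real `source.run()` calls (the names and delays below are illustrative, not part of the coldreach API):

```python
import asyncio

# Stub coroutines standing in for real source.run() calls — names and
# return shapes here are illustrative only.
async def run_stub(name: str, delay: float) -> tuple[str, int]:
    await asyncio.sleep(delay)  # simulates network I/O
    return name, 0              # (source_name, emails_found)

async def run_all(domain: str) -> list[tuple[str, int]]:
    # All sources start at once; total wall time is roughly the slowest
    # source, not the sum of all of them.
    return await asyncio.gather(
        run_stub("web_crawler", 0.01),
        run_stub("whois", 0.02),
        run_stub("github", 0.01),
    )

results = asyncio.run(run_all("example.com"))
```

`asyncio.gather` preserves argument order in its result list, so callers can zip results back to their sources regardless of completion order.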

SourceResult dataclass

```python
SourceResult(
    email, source, url="", context="", confidence_hint=0
)
```

A single email address found by a source.

Attributes:

| Name | Type | Description |
|---|---|---|
| `email` | `str` | The discovered email address (raw, not yet normalised). |
| `source` | `EmailSource` | Which source found this email. |
| `url` | `str` | The page or endpoint where the email was found. |
| `context` | `str` | Surrounding text snippet that contained the email. |
| `confidence_hint` | `int` | Optional score delta hint from the source (0 = no hint). |

SourceSummary dataclass

```python
SourceSummary(
    source_name,
    found=0,
    errors=list(),
    skipped=False,
    skip_reason="",
)
```

Execution summary for one source run.

BaseSource

BaseSource(timeout=10.0)

Bases: ABC

Abstract base for all email discovery sources.

Subclasses must implement `fetch`.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | HTTP / subprocess timeout in seconds. | `10.0` |
Source code in `coldreach/sources/base.py`:

```python
def __init__(self, timeout: float = 10.0) -> None:
    self.timeout = timeout
    self._log = logging.getLogger(f"coldreach.sources.{self.name}")
```

fetch abstractmethod async

fetch(domain, *, person_name=None)

Discover emails for domain.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `domain` | `str` | The target domain, e.g. `"stripe.com"`. | required |
| `person_name` | `str \| None` | Optional full name hint for pattern-based sources. | `None` |

Returns:

| Type | Description |
|---|---|
| `list[SourceResult]` | Discovered email addresses. Empty list if nothing found or on error. |

Source code in `coldreach/sources/base.py`:

```python
@abstractmethod
async def fetch(
    self,
    domain: str,
    *,
    person_name: str | None = None,
) -> list[SourceResult]:
    """Discover emails for *domain*.

    Parameters
    ----------
    domain:
        The target domain, e.g. ``"stripe.com"``.
    person_name:
        Optional full name hint for pattern-based sources.

    Returns
    -------
    list[SourceResult]
        Discovered email addresses. Empty list if nothing found or on error.
    """
```

run async

run(domain, *, person_name=None)

Safe wrapper around `fetch` — never raises.

Returns the results list and a summary suitable for logging/display.

Source code in `coldreach/sources/base.py`:

```python
async def run(
    self,
    domain: str,
    *,
    person_name: str | None = None,
) -> tuple[list[SourceResult], SourceSummary]:
    """Safe wrapper around :meth:`fetch` — never raises.

    Returns the results list and a summary suitable for logging/display.
    """
    summary = SourceSummary(source_name=self.name)
    try:
        results = await self.fetch(domain, person_name=person_name)
        summary.found = len(results)
        if results:
            self._log.debug("%s found %d email(s) for %s", self.name, len(results), domain)
    except Exception as exc:
        self._log.warning("%s failed for %s: %s", self.name, domain, exc)
        summary.errors.append(str(exc))
        results = []
    return results, summary
```
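The never-raises contract can be demonstrated with simplified stand-ins for `SourceSummary` and `BaseSource` (the classes below are illustrative reductions, not the real implementations):

```python
import asyncio
from dataclasses import dataclass, field

# Simplified stand-ins for coldreach's SourceSummary and BaseSource,
# just to show the run() contract.
@dataclass
class Summary:
    source_name: str
    found: int = 0
    errors: list[str] = field(default_factory=list)

class FlakySource:
    name = "flaky"

    async def fetch(self, domain: str) -> list[str]:
        raise RuntimeError("upstream 503")  # a real source might time out here

    async def run(self, domain: str) -> tuple[list[str], Summary]:
        summary = Summary(source_name=self.name)
        try:
            results = await self.fetch(domain)
            summary.found = len(results)
        except Exception as exc:  # never propagate — record and continue
            summary.errors.append(str(exc))
            results = []
        return results, summary

results, summary = asyncio.run(FlakySource().run("example.com"))
```

Even when `fetch` blows up, callers get an empty result list and an error recorded in the summary, so one broken source cannot abort a concurrent `asyncio.gather` over all sources.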

Website crawler

coldreach.sources.web_crawler

Website crawler source.

Crawls the target domain's public pages (homepage, /contact, /team, /about, /people, /staff, /leadership) and extracts email addresses using:

- RFC 5322-compliant regex
- mailto: link href extraction
- Common obfuscation patterns ([at], (at), " at ", [dot], etc.)

Uses httpx for async HTTP (no Playwright/JS required for most B2B sites). Falls back gracefully on SSL errors, redirects, and timeouts.

Source priority for scoring: contact page > team page > generic.
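A minimal sketch of the deobfuscation step, assuming a pre-pass that rewrites common `[at]`/`(at)`/`" at "` and `[dot]` spellings before the RFC 5322 regex runs (the helper and its pattern list are hypothetical, not the crawler's actual code):

```python
import re

# Hypothetical deobfuscation helper: normalise common obfuscated
# spellings back into a plain address. The pattern list is illustrative.
_AT = re.compile(r"\s*(?:\[at\]|\(at\)|\s+at\s+)\s*", re.IGNORECASE)
_DOT = re.compile(r"\s*(?:\[dot\]|\(dot\)|\s+dot\s+)\s*", re.IGNORECASE)

def deobfuscate(text: str) -> str:
    text = _AT.sub("@", text)    # "sales [at] acme" -> "sales@acme"
    return _DOT.sub(".", text)   # "acme [dot] com" -> "acme.com"

deobfuscate("sales [at] acme [dot] com")
```

Running deobfuscation before the email regex means one extraction pass covers both plain and obfuscated addresses.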

WebCrawlerSource

WebCrawlerSource(
    timeout=10.0, max_pages=15, follow_homepage_links=True
)

Bases: BaseSource

Crawl company website pages to find email addresses.

Fetches the homepage plus a fixed list of high-value paths (/contact, /team, /about, etc.) concurrently and extracts emails from each page.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Per-request HTTP timeout in seconds. | `10.0` |
| `max_pages` | `int` | Maximum number of pages to fetch (homepage + paths). | `15` |
| `follow_homepage_links` | `bool` | If True, also parse any internal links on the homepage that match the high-value path patterns. | `True` |
Source code in `coldreach/sources/web_crawler.py`:

```python
def __init__(
    self,
    timeout: float = 10.0,
    max_pages: int = 15,  # increased: more paths now covered
    follow_homepage_links: bool = True,
) -> None:
    super().__init__(timeout=timeout)
    self.max_pages = max_pages
    self.follow_homepage_links = follow_homepage_links
```

WHOIS

coldreach.sources.whois_source

WHOIS source — extract registrant/admin/tech contact emails from WHOIS records.

Uses python-whois (synchronous) run in an executor to avoid blocking the event loop. WHOIS data often contains registrant email, admin email, and tech contact email — these are high-value leads for small companies.

Note: Many large companies use privacy-protecting WHOIS proxies (e.g. domains@squarespace.com). These are filtered out via a blocklist.
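The executor pattern described above can be sketched as follows: a blocking lookup runs in the default thread pool so the event loop stays responsive, with a timeout enforced from the async side. `fake_whois_lookup` is a stand-in for python-whois's blocking call, not the real client:

```python
import asyncio

# Stand-in for a blocking WHOIS lookup (python-whois does blocking
# socket I/O); the function name and return shape are illustrative.
def fake_whois_lookup(domain: str) -> dict:
    return {"emails": [f"registrant@{domain}"]}

async def fetch_whois(domain: str, timeout: float = 10.0) -> dict:
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) uses the default ThreadPoolExecutor;
    # wait_for bounds how long we block the overall discovery run.
    return await asyncio.wait_for(
        loop.run_in_executor(None, fake_whois_lookup, domain),
        timeout=timeout,
    )

record = asyncio.run(fetch_whois("example.com"))
```

Without the executor, a slow WHOIS server would stall every other source running in the same `asyncio.gather` call.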

WhoisSource

WhoisSource(timeout=10.0)

Bases: BaseSource

Fetch WHOIS registrant contact emails for a domain.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Timeout passed to the thread executor (seconds). | `10.0` |
Source code in `coldreach/sources/base.py`:

```python
def __init__(self, timeout: float = 10.0) -> None:
    self.timeout = timeout
    self._log = logging.getLogger(f"coldreach.sources.{self.name}")
```

GitHub

coldreach.sources.github

GitHub source — mine commit author emails for a company domain.

Strategy:

1. Search GitHub for repos whose owner matches the company domain slug (e.g. "stripe.com" → org slug "stripe")
2. Fetch recent commits from top repos → extract author.email
3. Filter to emails belonging to the target domain

Uses the public GitHub REST API (unauthenticated: 60 req/hr, authenticated: 5000/hr). Set COLDREACH_GITHUB_TOKEN in .env for higher rate limits.

Rate limit handling:

- Checks the X-RateLimit-Remaining header
- Stops early if remaining < 5 to avoid exhausting the allowance
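The stop-early policy can be sketched as a small helper that inspects the response headers (the function is hypothetical; only the `X-RateLimit-Remaining` header name comes from GitHub's API):

```python
# Hypothetical helper mirroring the described policy: read
# X-RateLimit-Remaining from GitHub's response headers and stop
# fetching once fewer than `floor` requests remain.
def should_continue(headers: dict[str, str], floor: int = 5) -> bool:
    try:
        remaining = int(headers.get("X-RateLimit-Remaining", "0"))
    except ValueError:
        return False  # unparseable header: play it safe and stop
    return remaining >= floor

should_continue({"X-RateLimit-Remaining": "42"})  # keep fetching
should_continue({"X-RateLimit-Remaining": "3"})   # stop early
```

Treating a missing or malformed header as "stop" keeps an unauthenticated run (60 req/hr) from burning its entire allowance on one domain.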

GitHubSource

GitHubSource(token=None, timeout=10.0)

Bases: BaseSource

Mine public GitHub commits for company domain email addresses.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `token` | `str \| None` | Optional GitHub personal access token for higher rate limits. | `None` |
| `timeout` | `float` | Per-request HTTP timeout in seconds. | `10.0` |
Source code in `coldreach/sources/github.py`:

```python
def __init__(self, token: str | None = None, timeout: float = 10.0) -> None:
    super().__init__(timeout=timeout)
    self._token = token

Reddit

coldreach.sources.reddit

Reddit source — search Reddit posts/comments for company contact emails.

Uses the public Reddit JSON API — no authentication required. Rate limit: 1 request per second (enforced via asyncio.sleep).

Queries:

1. Search for "@domain.com" mentions across all of Reddit
2. Search for "company name" + "email" or "contact"

Both queries parse post titles, selftext, and comment bodies for email patterns matching the target domain.
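The parsing step can be sketched against the listing shape returned by Reddit's `/search.json` endpoint: pull every email-looking string out of titles and selftext, then keep only those on the target domain. The payload below is a hand-made stand-in, not real API output:

```python
import re

_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

# Sketch: extract domain-matching emails from a Reddit listing payload.
def extract_domain_emails(listing: dict, domain: str) -> set[str]:
    found: set[str] = set()
    for child in listing.get("data", {}).get("children", []):
        post = child.get("data", {})
        text = f"{post.get('title', '')} {post.get('selftext', '')}"
        for email in _EMAIL_RE.findall(text):
            if email.lower().endswith(f"@{domain.lower()}"):
                found.add(email.lower())
    return found

# Hand-made stand-in for a /search.json response.
listing = {"data": {"children": [
    {"data": {"title": "Contact support@acme.com",
              "selftext": "or ceo@other.org"}},
]}}
extract_domain_emails(listing, "acme.com")
```

Filtering to the target domain at parse time keeps off-domain addresses quoted in threads from polluting results.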

RedditSource

RedditSource(timeout=10.0, max_results=25)

Bases: BaseSource

Search Reddit for company email addresses via the public JSON API.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Per-request HTTP timeout in seconds. | `10.0` |
| `max_results` | `int` | Maximum number of Reddit posts to inspect per query. | `25` |
Source code in `coldreach/sources/reddit.py`:

```python
def __init__(self, timeout: float = 10.0, max_results: int = 25) -> None:
    super().__init__(timeout=timeout)
    self.max_results = max_results
```

Search engine

coldreach.sources.search_engine

Search engine source — queries SearXNG (self-hosted) for domain email mentions.

Fallback chain (in order):

1. SearXNG local instance (http://localhost:8080 by default)
2. DuckDuckGo Lite HTML (no JS, scrapeable, no auth)
3. Brave Search API (free tier: 2000 req/month, requires API key)

Queries run against the target domain:

- "@domain.com" — direct email format search
- site:domain.com email contact — on-site contact pages
- "domain.com" email — general mentions

Rate limiting:

- SearXNG: 1 req/3s per query (configurable)
- DDG Lite: 1 req/5s (more aggressive block)
- Brave: respects X-RateLimit-Remaining header

All results are filtered to emails belonging to the target domain.
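The fallback chain reduces to a simple loop: try each provider in order, moving on when one fails or returns nothing. A minimal sketch with stub providers (the provider functions below are illustrative, not the real SearXNG/DDG/Brave clients):

```python
import asyncio
from typing import Awaitable, Callable

Provider = Callable[[str], Awaitable[list[str]]]

# Sketch of the fallback chain: first provider that succeeds with
# non-empty results wins; failures fall through to the next one.
async def search_with_fallback(query: str, providers: list[Provider]) -> list[str]:
    for provider in providers:
        try:
            results = await provider(query)
        except Exception:
            continue  # provider down or rate-limited → next in chain
        if results:
            return results
    return []

# Stub providers standing in for SearXNG and DDG Lite clients.
async def searxng(q: str) -> list[str]:
    raise ConnectionError("local instance not running")

async def ddg_lite(q: str) -> list[str]:
    return [f"hit for {q}"]

hits = asyncio.run(search_with_fallback('"@acme.com"', [searxng, ddg_lite]))
```

Treating an empty result set the same as a failure lets a rate-limited SearXNG instance silently hand off to DDG Lite rather than ending the query with nothing.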

SearchEngineSource

```python
SearchEngineSource(
    searxng_url=_SEARXNG_DEFAULT,
    brave_api_key=None,
    query_delay=3.0,
    timeout=10.0,
)
```

Bases: BaseSource

Search for domain email addresses via SearXNG → DDG Lite → Brave fallback chain.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `searxng_url` | `str \| None` | URL of the local SearXNG instance. Set to None to skip SearXNG. | `_SEARXNG_DEFAULT` |
| `brave_api_key` | `str \| None` | Brave Search API key (free: 2000 req/month). Set to None to skip. | `None` |
| `query_delay` | `float` | Seconds to wait between queries to avoid rate limiting. | `3.0` |
| `timeout` | `float` | Per-request HTTP timeout in seconds. | `10.0` |
Source code in `coldreach/sources/search_engine.py`:

```python
def __init__(
    self,
    searxng_url: str | None = _SEARXNG_DEFAULT,
    brave_api_key: str | None = None,
    query_delay: float = 3.0,
    timeout: float = 10.0,
) -> None:
    super().__init__(timeout=timeout)
    self.searxng_url = searxng_url
    self.brave_api_key = brave_api_key
    self.query_delay = query_delay
```

theHarvester

coldreach.sources.harvester

theHarvester source — HTTP REST API client.

The coldreach-theharvester Docker container runs restfulHarvest, which exposes a REST API on port 5050. We call it directly with httpx instead of docker exec (which failed silently because the container's entrypoint is the API server, not the CLI).

API endpoint: `GET http://localhost:5050/query?domain=acme.com&source=duckduckgo,bing,crtsh&limit=500`

Swagger docs (when container is running): http://localhost:5050/docs

Response JSON: { "emails": ["user@acme.com", ...], "hosts": [...], "interesting_urls": [...], ... }
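Handling the `/query` response comes down to filtering the `emails` array to well-formed addresses on the target domain. A minimal sketch (the sample payload is a stand-in for real theHarvester output):

```python
# Sketch: keep only emails on the target domain from a /query response.
def domain_emails(payload: dict, domain: str) -> list[str]:
    emails = payload.get("emails", []) or []
    return sorted(
        e.strip().lower()
        for e in emails
        if e.strip().lower().endswith(f"@{domain.lower()}")
    )

# Hand-made stand-in for a theHarvester REST response.
payload = {"emails": ["User@acme.com", "noreply@tracker.io"], "hosts": []}
domain_emails(payload, "acme.com")
```

Lower-casing before comparison matters because passive sources often return mixed-case addresses for the same mailbox.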

Free sources (no API key needed): duckduckgo, yahoo, bing, baidu, crtsh, certspotter, hackertarget, rapiddns, dnsdumpster, urlscan, otx, robtex

Excluded (slow / decommissioned):

- commoncrawl — terabyte dataset, queries take 10+ minutes
- waybackarchive — rarely contains emails, very slow
- thc — unreliable timeouts
- threatcrowd — decommissioned

Service: docker compose up theharvester

HarvesterSource

```python
HarvesterSource(
    api_base=_API_BASE,
    sources=None,
    limit=500,
    timeout=240.0,
    container="coldreach-theharvester",
    max_wait=240.0,
    harvester_sources=None,
)
```

Bases: BaseSource

Discover emails via the theHarvester REST API (localhost:5050).

Calls GET /query?domain=...&source=...&limit=... on the running coldreach-theharvester container. No docker exec needed — the container's REST server is the correct integration point.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `api_base` | `str` | Base URL of the theHarvester REST server. | `_API_BASE` |
| `sources` | `str \| None` | Comma-separated source names. Defaults to all free sources. | `None` |
| `limit` | `int` | Maximum results per source query. | `500` |
| `timeout` | `float` | HTTP request timeout in seconds. theHarvester queries several external APIs so allow generous time. | `240.0` |
Source code in `coldreach/sources/harvester.py`:

```python
def __init__(
    self,
    api_base: str = _API_BASE,
    sources: str | None = None,
    limit: int = 500,
    timeout: float = 240.0,
    # Kept for compatibility with FinderConfig.harvester_container
    container: str = "coldreach-theharvester",
    max_wait: float = 240.0,
    harvester_sources: str | None = None,
) -> None:
    super().__init__(timeout=timeout)
    self.api_base = api_base.rstrip("/")
    self.sources = sources or harvester_sources or ",".join(_FREE_SOURCES)
    self.limit = limit
```

SpiderFoot

coldreach.sources.spiderfoot

SpiderFoot source — REST API client with incremental result streaming.

Uses SpiderFoot's built-in CherryPy REST API (localhost:5001) instead of docker exec. This gives us real-time results as the scan progresses:

  1. POST /startscan → get scan_id immediately
  2. Poll GET /scaneventresults?id=SCAN_ID&eventType=EMAILADDR every 15s
  3. Emit each new email as it appears (streaming-friendly)
  4. GET /stopscan?id=SCAN_ID when done/timeout reached

Why not docker exec sf.py?

- sf.py outputs ALL results only at the very end (no streaming)
- The scan continues running in SpiderFoot's DB even after we kill docker exec
- There is no way to cancel from the Python side
- The REST API solves all three problems

Key endpoints confirmed working (SpiderFoot v4.0.0):

- GET /ping → ["SUCCESS", "4.0.0"]
- GET /scanlist → list of all scans
- POST /startscan (form data) → ["SUCCESS", "SCAN_ID"]
- GET /scaneventresults?id=ID&eventType=X → [[...], [...]] rows
- GET /scanstatus?id=ID → [name, target, started, ...]
- GET /stopscan?id=ID → stops the scan

Modules used (fast, no API keys, effective):

- sfp_pgp — PGP keyservers: finds 20+ emails per domain in ~60s
- sfp_emailformat — email-format.com database: instant
- sfp_whois — WHOIS registrant: instant
- sfp_email — extracts emails from any content fed by other modules
- sfp_citadel — breach/enrichment databases (free sources: PeopleDataLabs)

SpiderFootSource

```python
SpiderFootSource(
    api_base=_API_BASE,
    max_wait=_MAX_SCAN_SECONDS,
    container="coldreach-spiderfoot",
    timeout=30.0,
)
```

Bases: BaseSource

Email discovery via SpiderFoot REST API with incremental streaming.

Creates a scan via SpiderFoot's REST API, polls for results every 15s, and yields emails as they are found. Stops the scan when done or after the configured timeout.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `api_base` | `str` | SpiderFoot web server base URL. | `_API_BASE` |
| `max_wait` | `float` | Maximum seconds before the scan is aborted. | `_MAX_SCAN_SECONDS` |
| `timeout` | `float` | HTTP request timeout for individual API calls. | `30.0` |
Source code in `coldreach/sources/spiderfoot.py`:

```python
def __init__(
    self,
    api_base: str = _API_BASE,
    max_wait: float = _MAX_SCAN_SECONDS,
    # Kept for FinderConfig compatibility
    container: str = "coldreach-spiderfoot",
    timeout: float = 30.0,
) -> None:
    super().__init__(timeout=timeout)
    self.api_base = api_base.rstrip("/")
    self.max_wait = max_wait
```

fetch async

fetch(domain, *, person_name=None)

Run scan and return all found emails (blocking until done or timeout).

Source code in `coldreach/sources/spiderfoot.py`:

```python
async def fetch(
    self,
    domain: str,
    *,
    person_name: str | None = None,
) -> list[SourceResult]:
    """Run scan and return all found emails (blocking until done or timeout)."""
    results: list[SourceResult] = []
    async for r in self.fetch_stream(domain):
        results.append(r)
    return results
```

fetch_stream async

fetch_stream(domain, *, person_name=None)

Stream email results as they are found by SpiderFoot.

Yields a SourceResult each time SpiderFoot discovers a new email, allowing callers to forward results to SSE streams immediately.

Source code in `coldreach/sources/spiderfoot.py`:

```python
async def fetch_stream(
    self,
    domain: str,
    *,
    person_name: str | None = None,
) -> AsyncIterator[SourceResult]:
    """Stream email results as they are found by SpiderFoot.

    Yields a SourceResult each time SpiderFoot discovers a new email,
    allowing callers to forward results to SSE streams immediately.
    """
    if not await self._is_available():
        self._log.debug(
            "SpiderFoot not reachable at %s — is the container running?",
            self.api_base,
        )
        return

    scan_id = await self._start_scan(domain)
    if not scan_id:
        self._log.debug("SpiderFoot: failed to start scan for %s", domain)
        return

    self._log.info("SpiderFoot: scan %s started for %s", scan_id, domain)

    domain_lower = domain.lower()
    seen_emails: set[str] = set()
    elapsed = 0.0

    try:
        while elapsed < self.max_wait:
            await asyncio.sleep(_POLL_INTERVAL)
            elapsed += _POLL_INTERVAL

            # Check if scan is still running
            status = await self._scan_status(scan_id)
            self._log.debug(
                "SpiderFoot scan %s: status=%s elapsed=%.0fs", scan_id, status, elapsed
            )

            # Fetch all current EMAILADDR results
            rows = await self._fetch_results(scan_id)
            for row in rows:
                # Row format: [event_type, data, source_module, ...]
                if isinstance(row, (list, tuple)) and len(row) >= 2:
                    raw_email = str(row[1]).strip().lower()
                else:
                    continue

                # Strip trailing annotations like " [apollo.io]"
                email = raw_email.split("[")[0].strip()

                if not _EMAIL_RE.match(email):
                    continue
                if "@" not in email or email in seen_emails:
                    continue
                if not (
                    email.endswith(f"@{domain_lower}") or email.endswith(f".{domain_lower}")
                ):
                    continue

                seen_emails.add(email)
                yield SourceResult(
                    email=email,
                    source=EmailSource.SPIDERFOOT,
                    url=self.api_base,
                    context=f"SpiderFoot: {str(row[2]).strip() if len(row) > 2 else ''}",
                    confidence_hint=25,
                )

            if status in ("FINISHED", "ERROR", "ABORTED"):
                self._log.info(
                    "SpiderFoot scan %s finished (%s) — %d emails, %.0fs",
                    scan_id,
                    status,
                    len(seen_emails),
                    elapsed,
                )
                break

    finally:
        # Always stop the scan — avoids orphaned scans in the SpiderFoot UI
        await self._stop_scan(scan_id)
```