Cache & Storage API

Two-layer cache: SQLite (always-on) + Redis (optional). Results are stored as JSON-serialized DomainResult objects with a configurable TTL.


Cache store

coldreach.storage.cache

Local result cache for ColdReach domain scans.

Two-layer cache:

  1. SQLite — always available, stored at ~/.coldreach/cache.db
  2. Redis — optional faster layer; used when redis_url is configured

Both layers use a 7-day TTL by default (configurable).

DomainResult objects are round-tripped via Pydantic JSON (model_dump_json / model_validate_json), preserving all fields including nested EmailRecord and SourceRecord objects.

Thread safety: SQLite connections are per-instance (not shared across threads). For concurrent callers, instantiate one CacheStore per thread or rely on SQLite's built-in WAL locking.
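
A minimal usage sketch (the import path for DomainResult is assumed; in practice results come from find_emails() rather than being built by hand):

from coldreach.storage.cache import CacheStore
from coldreach.models import DomainResult  # assumed location of DomainResult

# SQLite-only cache at the default path; Redis is skipped when redis_url is None.
cache = CacheStore(ttl_days=7)

result = DomainResult(domain="example.com")  # normally produced by find_emails()
cache.set("example.com", result)             # written to every available layer

hit = cache.get("example.com")               # DomainResult on hit, None on miss/expiry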

CacheStore

CacheStore(
    db_path=_DEFAULT_DB,
    redis_url=None,
    ttl_days=_DEFAULT_TTL_DAYS,
)

SQLite-backed domain result cache with optional Redis layer.

Parameters:

    db_path (str | None, default: _DEFAULT_DB)
        Path to the SQLite database file. ~ is expanded automatically.
        Pass None to disable SQLite (Redis-only or no-op).

    redis_url (str | None, default: None)
        Redis connection URL, e.g. "redis://localhost:6380/0". Pass None to skip Redis.

    ttl_days (int, default: _DEFAULT_TTL_DAYS)
        How long to keep cached results before they expire.
Source code in coldreach/storage/cache.py
def __init__(
    self,
    db_path: str | None = _DEFAULT_DB,
    redis_url: str | None = None,
    ttl_days: int = _DEFAULT_TTL_DAYS,
) -> None:
    self.ttl_seconds = ttl_days * 86400
    self._db_path = os.path.expanduser(db_path) if db_path else None
    self._redis: object = None

    if self._db_path:
        os.makedirs(os.path.dirname(self._db_path), exist_ok=True)
        self._init_db()

    if redis_url:
        try:
            import redis as redis_lib  # optional dependency

            self._redis = redis_lib.Redis.from_url(redis_url, decode_responses=True)
            self._redis.ping()
            logger.debug("Cache: Redis connected at %s", redis_url)
        except Exception as exc:
            logger.debug("Cache: Redis unavailable (%s) — using SQLite only", exc)
            self._redis = None

get

get(domain)

Return a cached DomainResult for domain, or None on miss/expiry.

Source code in coldreach/storage/cache.py
def get(self, domain: str) -> DomainResult | None:
    """Return a cached DomainResult for *domain*, or ``None`` on miss/expiry."""
    domain = domain.strip().lower()
    now = time.time()

    # 1. Try Redis first (fast path)
    if self._redis is not None:
        try:
            raw = self._redis.get(f"{_REDIS_PREFIX}{domain}")  # type: ignore[attr-defined]
            if raw:
                result = DomainResult.model_validate_json(raw)
                logger.debug("Cache: Redis hit for %s", domain)
                return result
        except Exception as exc:
            logger.debug("Cache: Redis get failed (%s) — falling back to SQLite", exc)

    # 2. Try SQLite
    if self._db_path:
        try:
            with self._connect() as conn:
                row = conn.execute(
                    "SELECT result_json, expires_at FROM domain_cache WHERE domain = ?",
                    (domain,),
                ).fetchone()
            if row:
                if row["expires_at"] > now:
                    result = DomainResult.model_validate_json(row["result_json"])
                    logger.debug("Cache: SQLite hit for %s", domain)
                    return result
                # Expired — clean it up
                self._delete_sqlite(domain)
                logger.debug("Cache: expired entry deleted for %s", domain)
        except Exception as exc:
            logger.debug("Cache: SQLite get failed (%s)", exc)

    return None
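
Lookups normalize the domain before hitting either layer, so callers need not pre-clean input (hypothetical domain shown):

cached = cache.get("  Example.COM ")   # normalized to "example.com" first
if cached is None:
    ...                                # miss or expired — run a fresh scan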

set

set(domain, result)

Store result for domain in all available cache layers.

Source code in coldreach/storage/cache.py
def set(self, domain: str, result: DomainResult) -> None:
    """Store *result* for *domain* in all available cache layers."""
    domain = domain.strip().lower()
    now = time.time()
    expires_at = now + self.ttl_seconds
    json_str = result.model_dump_json()

    # Redis
    if self._redis is not None:
        try:
            self._redis.setex(  # type: ignore[attr-defined]
                f"{_REDIS_PREFIX}{domain}",
                self.ttl_seconds,
                json_str,
            )
            logger.debug("Cache: Redis stored %s (TTL %ds)", domain, self.ttl_seconds)
        except Exception as exc:
            logger.debug("Cache: Redis set failed (%s)", exc)

    # SQLite
    if self._db_path:
        try:
            with self._connect() as conn:
                conn.execute(
                    """
                    INSERT OR REPLACE INTO domain_cache
                        (domain, result_json, cached_at, expires_at)
                    VALUES (?, ?, ?, ?)
                    """,
                    (domain, json_str, now, expires_at),
                )
            logger.debug("Cache: SQLite stored %s", domain)
        except Exception as exc:
            logger.debug("Cache: SQLite set failed (%s)", exc)

clear

clear(domain=None)

Delete cached entries.

Parameters:

    domain (str | None, default: None)
        If given, delete only that domain. Otherwise delete everything.

Returns:

    int
        Number of SQLite rows deleted.

Source code in coldreach/storage/cache.py
def clear(self, domain: str | None = None) -> int:
    """Delete cached entries.

    Parameters
    ----------
    domain:
        If given, delete only that domain. Otherwise delete everything.

    Returns
    -------
    int
        Number of SQLite rows deleted.
    """
    deleted = 0

    # Redis
    if self._redis is not None:
        try:
            if domain:
                self._redis.delete(f"{_REDIS_PREFIX}{domain}")  # type: ignore[attr-defined]
            else:
                keys = self._redis.keys(f"{_REDIS_PREFIX}*")  # type: ignore[attr-defined]
                if keys:
                    self._redis.delete(*keys)  # type: ignore[attr-defined]
        except Exception as exc:
            logger.debug("Cache: Redis clear failed (%s)", exc)

    # SQLite
    if self._db_path:
        try:
            with self._connect() as conn:
                if domain:
                    cur = conn.execute(
                        "DELETE FROM domain_cache WHERE domain = ?",
                        (domain.strip().lower(),),
                    )
                else:
                    cur = conn.execute("DELETE FROM domain_cache")
                deleted = cur.rowcount
        except Exception as exc:
            logger.debug("Cache: SQLite clear failed (%s)", exc)

    return deleted
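
Note that the return value counts SQLite rows only; Redis deletions are not tallied. For example:

deleted = cache.clear("example.com")   # drop one domain from both layers
wiped = cache.clear()                  # drop everything; returns SQLite rowcount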

list_domains

list_domains()

Return all cached domains as (domain, cached_at, is_expired) tuples.

Source code in coldreach/storage/cache.py
def list_domains(self) -> list[tuple[str, datetime, bool]]:
    """Return all cached domains as (domain, cached_at, is_expired) tuples."""
    if not self._db_path:
        return []
    now = time.time()
    try:
        with self._connect() as conn:
            rows = conn.execute(
                "SELECT domain, cached_at, expires_at FROM domain_cache ORDER BY cached_at DESC"
            ).fetchall()
        return [
            (
                row["domain"],
                datetime.fromtimestamp(row["cached_at"], tz=UTC),
                row["expires_at"] <= now,
            )
            for row in rows
        ]
    except Exception as exc:
        logger.debug("Cache: SQLite list failed (%s)", exc)
        return []
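
A quick way to inspect cache contents (cached_at is timezone-aware UTC):

for domain, cached_at, is_expired in cache.list_domains():
    print(f"{domain}  {cached_at:%Y-%m-%d %H:%M}  {'expired' if is_expired else 'valid'}")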

stats

stats()

Return basic cache statistics.

Source code in coldreach/storage/cache.py
def stats(self) -> dict[str, int]:
    """Return basic cache statistics."""
    if not self._db_path:
        return {"total": 0, "valid": 0, "expired": 0}
    now = time.time()
    try:
        with self._connect() as conn:
            total = conn.execute("SELECT COUNT(*) FROM domain_cache").fetchone()[0]
            valid = conn.execute(
                "SELECT COUNT(*) FROM domain_cache WHERE expires_at > ?", (now,)
            ).fetchone()[0]
        return {"total": total, "valid": valid, "expired": total - valid}
    except Exception:
        return {"total": 0, "valid": 0, "expired": 0}

Finder config & orchestration

coldreach.core.finder

find_emails() — the main orchestrator.

Runs all configured sources concurrently, deduplicates results, runs the verification pipeline on each candidate, and returns a ranked DomainResult.

Design
  • Sources run concurrently via asyncio.gather; slow sources can optionally run fire-and-forget in the background
  • Each source's results are merged into a shared email→SourceRecord map
  • After all sources complete, the verification pipeline scores each email
  • Final DomainResult is sorted by confidence descending

Source precedence for confidence_hint (added on top of pipeline score):

  • website/contact +35
  • website/team +30
  • website/about +25
  • github/commit +25
  • whois +20
  • reddit +15
  • website/generic +15
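
Hints from every source record that found a given email are summed onto the pipeline score and clamped to 100 (see the confidence calculation inside find_emails below; the numbers here are illustrative):

pipeline_score = 50          # hypothetical verification pipeline score
hints = [35, 25]             # e.g. found on website/contact and in a GitHub commit
confidence = min(100, pipeline_score + sum(hints))   # 110 -> clamped to 100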

FinderConfig dataclass

FinderConfig(
    use_web_crawler=True,
    use_whois=True,
    use_github=True,
    use_reddit=True,
    use_search_engine=True,
    use_intelligent_search=True,
    use_harvester=True,
    use_spiderfoot=True,
    use_firecrawl=False,
    use_crawl4ai=False,
    use_role_emails=True,
    github_token=None,
    searxng_url="http://localhost:8088",
    firecrawl_url="http://localhost:3002",
    brave_api_key=None,
    spiderfoot_container="coldreach-spiderfoot",
    spiderfoot_max_wait=180.0,
    harvester_container="coldreach-theharvester",
    harvester_max_wait=240.0,
    background_slow_sources=False,
    harvester_sources=None,
    reacher_url="http://localhost:8083",
    use_reacher=True,
    use_holehe=False,
    cache_db="~/.coldreach/cache.db",
    redis_url=None,
    cache_ttl_days=7,
    use_cache=True,
    refresh_cache=False,
    min_confidence=0,
    request_timeout=10.0,
    max_concurrent_sources=6,
)

Runtime options for find_emails().

Attributes:

    use_web_crawler (bool)
        Crawl company website pages.
    use_whois (bool)
        Query WHOIS for registrant contact emails.
    use_github (bool)
        Mine public GitHub commits for domain emails.
    use_reddit (bool)
        Search Reddit for domain email mentions.
    github_token (str | None)
        Optional GitHub PAT for higher rate limits (5000/hr vs 60/hr).
    min_confidence (int)
        Exclude emails below this confidence from the final result.
    request_timeout (float)
        Per-source HTTP timeout in seconds.
    max_concurrent_sources (int)
        Maximum number of sources to run simultaneously.
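
A hedged configuration example (the values are illustrative; every field name comes from the signature above, and unlisted fields keep their defaults):

from coldreach.core.finder import FinderConfig

cfg = FinderConfig(
    use_spiderfoot=False,    # skip the slow container-backed sources
    use_harvester=False,
    min_confidence=30,       # drop low-confidence candidates from the result
    cache_ttl_days=1,
)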

find_emails async

find_emails(domain, *, person_name=None, config=None)

Discover and verify all email addresses for domain.

Parameters:

    domain (str, required)
        Target domain, e.g. "stripe.com".

    person_name (str | None, default: None)
        Optional full name for pattern-based narrowing.

    config (FinderConfig | None, default: None)
        Finder configuration. Uses sensible defaults if not provided.

Returns:

    DomainResult
        All discovered, verified, and ranked email addresses for domain.
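
Typical invocation from synchronous code, reusing cfg from the example above:

import asyncio
from coldreach.core.finder import find_emails

result = asyncio.run(find_emails("stripe.com", config=cfg))
for rec in result.emails:
    print(rec.email, rec.confidence, rec.status)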

Source code in coldreach/core/finder.py
async def find_emails(
    domain: str,
    *,
    person_name: str | None = None,
    config: FinderConfig | None = None,
) -> DomainResult:
    """Discover and verify all email addresses for *domain*.

    Parameters
    ----------
    domain:
        Target domain, e.g. ``"stripe.com"``.
    person_name:
        Optional full name for pattern-based narrowing.
    config:
        Finder configuration. Uses sensible defaults if not provided.

    Returns
    -------
    DomainResult
        All discovered, verified, and ranked email addresses for *domain*.
    """
    cfg = config or FinderConfig()
    domain = domain.strip().lower().removeprefix("www.")

    # ── Cache lookup ──────────────────────────────────────────────────────────
    cache: CacheStore | None = None
    if cfg.use_cache and cfg.cache_db:
        cache = CacheStore(
            db_path=cfg.cache_db,
            redis_url=cfg.redis_url,
            ttl_days=cfg.cache_ttl_days,
        )
        if not cfg.refresh_cache:
            cached = cache.get(domain)
            if cached is not None:
                logger.info("Cache hit for %s — skipping sources", domain)
                cached.emails = cached.sorted_emails(min_confidence=cfg.min_confidence)
                return cached

    # ── Build source list ─────────────────────────────────────────────────────
    sources: list[BaseSource] = []
    if cfg.use_web_crawler:
        sources.append(WebCrawlerSource(timeout=cfg.request_timeout))
    if cfg.use_whois:
        sources.append(WhoisSource(timeout=cfg.request_timeout))
    if cfg.use_github:
        sources.append(GitHubSource(token=cfg.github_token, timeout=cfg.request_timeout))
    if cfg.use_reddit:
        sources.append(RedditSource(timeout=cfg.request_timeout))
    if cfg.use_search_engine:
        sources.append(
            SearchEngineSource(
                searxng_url=cfg.searxng_url,
                brave_api_key=cfg.brave_api_key,
                timeout=cfg.request_timeout,
            )
        )
    if cfg.use_intelligent_search:
        sources.append(
            IntelligentSearchSource(
                searxng_url=cfg.searxng_url or "http://localhost:8088",
                timeout=cfg.request_timeout,
            )
        )
    if cfg.use_harvester:
        sources.append(
            HarvesterSource(
                sources=cfg.harvester_sources,
                timeout=cfg.harvester_max_wait,
            )
        )
    if cfg.use_spiderfoot:
        sources.append(
            SpiderFootSource(
                container=cfg.spiderfoot_container,
                max_wait=cfg.spiderfoot_max_wait,
            )
        )
    if cfg.use_firecrawl:
        sources.append(
            FirecrawlSource(firecrawl_url=cfg.firecrawl_url, timeout=cfg.request_timeout)
        )
    if cfg.use_crawl4ai:
        sources.append(Crawl4AISource(timeout=cfg.request_timeout))

    # Split fast and slow sources so fast results are returned immediately
    fast_sources = [s for s in sources if s.name not in _SLOW_SOURCE_NAMES]
    slow_sources = [s for s in sources if s.name in _SLOW_SOURCE_NAMES]

    logger.info(
        "Running %d fast + %d slow source(s) for domain: %s",
        len(fast_sources),
        len(slow_sources),
        domain,
    )

    # ── Run sources concurrently (semaphore-limited) ──────────────────────────
    sem = asyncio.Semaphore(cfg.max_concurrent_sources)

    async def _run_source(src: BaseSource) -> list[SourceResult]:
        async with sem:
            results, summary = await src.run(domain, person_name=person_name)
            if summary.errors:
                logger.warning("[%s] errors: %s", src.name, "; ".join(summary.errors))
            return results

    # Run fast sources — results available in ~30s
    fast_results_nested = await asyncio.gather(*[_run_source(s) for s in fast_sources])
    all_raw: list[SourceResult] = [r for batch in fast_results_nested for r in batch]

    # Run slow sources — either inline or as background task
    if slow_sources:
        if cfg.background_slow_sources:
            # Keep a reference to prevent garbage collection (RUF006)
            _bg_task = asyncio.create_task(
                _run_slow_sources_background(slow_sources, domain, person_name, cfg, cache)
            )
            # Suppress "task was destroyed" warning for fire-and-forget tasks
            _bg_task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None)
            logger.info(
                "Background scan started for %s: %s",
                domain,
                ", ".join(s.name for s in slow_sources),
            )
        else:
            # Inline: wait for slow sources too (blocks but gives complete results)
            slow_results_nested = await asyncio.gather(*[_run_source(s) for s in slow_sources])
            for batch in slow_results_nested:
                all_raw.extend(batch)

    logger.info("Sources returned %d raw email candidate(s) for %s", len(all_raw), domain)

    # ── Pattern generation (when person_name is given) ────────────────────────
    if person_name:
        found_emails = [r.email for r in all_raw]
        # Higher confidence_hint when format is inferred from real emails
        has_known = bool(found_emails)
        patterns = targeted_patterns(person_name, domain, found_emails)
        for pat in patterns:
            all_raw.append(
                SourceResult(
                    email=pat.email,
                    source=EmailSource.PATTERN_GENERATED,
                    url="",
                    context=f"pattern: {pat.format_name}",
                    confidence_hint=10 if has_known else 5,
                )
            )
        if patterns:
            logger.debug(
                "Pattern learner added %d candidate(s) for '%s' at %s",
                len(patterns),
                person_name,
                domain,
            )

    # ── Role emails (info@, sales@, contact@, …) ──────────────────────────────
    if cfg.use_role_emails:
        existing = {r.email.strip().lower() for r in all_raw}
        for rp in generate_role_emails(domain):
            if rp.email not in existing:
                all_raw.append(
                    SourceResult(
                        email=rp.email,
                        source=EmailSource.PATTERN_GENERATED,
                        url="",
                        context=rp.format_name,
                        confidence_hint=5,
                    )
                )

    # ── Deduplicate + build source map ────────────────────────────────────────
    grouped = _merge_results(all_raw)

    # ── Verify each unique candidate ──────────────────────────────────────────
    domain_result = DomainResult(domain=domain)

    reacher_url = cfg.reacher_url if cfg.use_reacher else None

    for email, source_results in grouped.items():
        pipeline = await run_basic_pipeline(
            email,
            reacher_url=reacher_url,
            run_holehe=cfg.use_holehe,
        )

        # Cumulative confidence: pipeline score + sum of source hints (clamped)
        source_hint = sum(sr.confidence_hint for sr in source_results)
        confidence = min(100, pipeline.score + source_hint)

        # Map pipeline result to VerificationStatus
        reacher_check = pipeline.checks.get("reacher")
        if not pipeline.passed:
            status = VerificationStatus.INVALID
            confidence = 0
        elif reacher_check and reacher_check.passed:
            status = VerificationStatus.VALID
        elif reacher_check and reacher_check.warned:
            status = VerificationStatus.CATCH_ALL
        elif reacher_check and reacher_check.failed:
            status = VerificationStatus.UNDELIVERABLE
        elif pipeline.mx_records:
            status = VerificationStatus.UNKNOWN  # DNS OK, SMTP not verified
        else:
            status = VerificationStatus.RISKY

        # Build SourceRecord list (deduplicated by source type)
        seen_sources: set[EmailSource] = set()
        source_records: list[SourceRecord] = []
        for sr in source_results:
            if sr.source not in seen_sources:
                seen_sources.add(sr.source)
                source_records.append(
                    SourceRecord(
                        source=sr.source,
                        url=sr.url or None,
                        context=sr.context,
                    )
                )

        record = EmailRecord(
            email=pipeline.normalized_email,
            confidence=confidence,
            status=status,
            sources=source_records,
            mx_records=pipeline.mx_records,
        )
        domain_result.add_email(record)

    # ── Store full result in cache (before min_confidence filter) ────────────
    if cache is not None:
        cache.set(domain, domain_result)

    # ── Filter + sort ─────────────────────────────────────────────────────────
    domain_result.emails = domain_result.sorted_emails(min_confidence=cfg.min_confidence)

    logger.info(
        "find_emails complete for %s: %d verified email(s)", domain, len(domain_result.emails)
    )
    return domain_result