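Two-tier architecture: a global dedup layer owned by the enrichment pipeline, and a workspace layer that stores per-tenant CRM context on top of it. The workspace layer never copies the global sub-tables; it only denormalizes the fields needed for filtering and list views onto the binding row.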
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ WORKSPACE LAYER (per-tenant, CRM context) │
│ │
│ ┌──────────────────────────────────────────┐ ┌───────────────────────────────────┐│
│ │ workspace_contact_binding │ │ org_bindings ││
│ │ │ │ ││
│ │ • workspace_id │ │ • workspace_id ││
│ │ • first_name, last_name, full_name │ │ • data_owner (user/enrichment) ││
│ │ • title, headline, seniority │ │ • labels[] ││
│ │ • departments[] │ │ • custom_fields JSONB ││
│ │ • primary_email, primary_phone │ │ • bound_by_user_id ││
│ │ • city, state, country, timezone │ │ ││
│ │ • current_org_id (denorm — no JOIN) │ └─────────────────┬─────────────────┘│
│ │ • current_org_name (denorm) │ │ org_id FK │
│ │ • stage (crm_stage_enum) │ │ │
│ │ • labels[], custom_fields JSONB │ │ │
│ │ • contact_owner_id, is_favorite, notes │ │ │
│ │ • data_owner, import_source │ │ │
│ └──────────────────────┬───────────────────┘ │ │
│ │ contact_id FK │ │
│ │ │ │
│ ┌──────────────────────┴──────────────┐ │ │
│ │ contact_latest_data │ │ │
│ │ (authoritative binding pointer) │ │ │
│ │ PK: contact_id │ │ │
│ │ UQ: workspace_contact_binding_id │ │ │
│ └──────────────────────┬──────────────┘ │ │
└─────────────────────────┼──────────────────────────────────────── ┼───────────────────┘
│ contact_id FK │ org_id FK
▼ ▼
┌──────────────────────────────────────────────────────────────────────────────────────┐
│ GLOBAL / PLATFORM LAYER (enrichment pipeline writes — all workspaces share) │
│ │
│ ┌───────────────────────────────┐ ┌────────────────────────────────────────┐ │
│ │ contact (dedup anchor) │ │ organisation │ │
│ │ • dedup_hash │ │ • name, primary_domain │ │
│ │ • enrichment_score 0–100 │ │ • industry, employee_count │ │
│ │ • last_enriched_at │ │ • total_funding_cents │ │
│ │ • soft-delete: deleted_at │ │ • linkedin_uid, stock_symbol │ │
│ └───────────┬───────────────────┘ │ • parent_org_id (self-ref hierarchy) │ │
│ │ │ • dedup_hash, enrichment_score │ │
│ ┌────────┴──────────────────────┐ └──────────────────┬─────────────────────┘ │
│ │ contact sub-tables │ │ │
│ │ ├── contact_identifier │ ┌──────────────────┴──────────────────┐ │
│ │ │ (dedup gate) │ │ org sub-tables │ │
│ │ ├── contact_email │ │ ├── org_identifier │ │
│ │ ├── contact_phones │ │ ├── org_funding_event │ │
│ │ ├── contact_employment ─────────── ┼─► org_technologies │ │
│ │ │ (→ links to org) │ │ └── org_phones │ │
│ │ ├── contact_education │ └─────────────────────────────────────┘ │
│ │ ├── contact_skills │ │
│ │ ├── contact_certifications │ ┌──────────────────────────────────────┐ │
│ │ └── contact_languages │ │ data_source (reference / registry) │ │
│ └───────────────────────────────┘ │ 20 seeded rows │ │
│ │ manual / integration / enrichment / ai │ │
│ └──────────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────────────────────────┐ │
│ │ enrichment_requests (lifecycle + credit tracking) │ │
│ │ entity_type: contact | organisation status: pending → processing → success │ │
│ └───────────────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────────────┘
All 20 tables in the contact schema, grouped by domain.
schema: contact
│
├── REFERENCE
│ └── data_source
│ source_type: manual | integration | enrichment | ai
│ 20 seeded rows (manual_entry, gmail_api, apollo, apify_linkedin_*, …)
│
├── GLOBAL CONTACT GRAPH
│ ├── contact ← dedup anchor, minimal fields
│ │ dedup_hash, enrichment_score 0–100, last_enriched_at
│ │
│ ├── contact_latest_data ← authoritative binding pointer
│ │ PK(contact_id), UQ(workspace_contact_binding_id)
│ │ Both FKs ON DELETE CASCADE
│ │
│ ├── contact_identifier ← dedup gate
│ │ UQ (identifier_type, normalized_value) WHERE deleted_at IS NULL
│ │ types: email | phone_e164 | linkedin_urn | linkedin_public_id
│ │ ig_username | ig_user_id | fb_user_id | twitter_handle
│ │ github_username | domain
│ │
│ ├── contact_email
│ │ UQ (contact_id, email) | is_primary flag
│ │ status: verified | unverified | bounced | catch_all | spam_trap
│ │ confidence SMALLINT 0–100
│ │
│ ├── contact_phones
│ │ e164_number, UQ (contact_id, e164_number)
│ │ status: valid | invalid | dnc | unverified
│ │
│ ├── contact_employment ──────────────────────────► organisation
│ │ is_current BOOL, start_date, end_date
│ │ UQ (contact_id, org_id, COALESCE(source_id,0)) WHERE is_current=true
│ │
│ ├── contact_education
│ │ UQ (contact_id, school_name, COALESCE(degree,''), COALESCE(field_of_study,''))
│ │
│ ├── contact_skills
│ │ UQ (contact_id, skill_name) | endorsements INT
│ │
│ ├── contact_certifications
│ │ name, authority, license_number, start_date, end_date
│ │
│ └── contact_languages
│ UQ (contact_id, language)
│ proficiency: native | full_professional | professional | limited | elementary
│
├── GLOBAL ORG GRAPH
│ ├── organisation
│ │ primary_domain, linkedin_uid (indexed)
│ │ parent_org_id → self-ref (subsidiary / holding structure)
│ │ funding: total_funding_cents, latest_funding_stage, latest_funding_date
│ │
│ ├── org_identifier
│ │ types: domain | linkedin_uid | crunchbase_slug | tracxn_slug | name_normalized
│ │ UQ (identifier_type, normalized_value) WHERE deleted_at IS NULL
│ │
│ ├── org_funding_event
│ │ UQ (org_id, funding_type, date) WHERE date IS NOT NULL
│ │ types: pre_seed → seed → series_a … ipo | angel | grant | private_equity
│ │
│ ├── org_technologies
│ │ UQ (org_id, tech_name)
│ │ category: crm | cloud_infrastructure | analytics | payment_processing | …
│ │
│ └── org_phones
│ raw_number + e164_number, is_primary flag
│
├── WORKSPACE LAYER
│ ├── workspace_contact_binding
│ │ UQ (contact_id, workspace_id) WHERE deleted_at IS NULL
│ │ All filterable fields denormalized — no JOIN needed for list views
│ │ Indexes: workspace_id, email, phone, title, seniority, city, country,
│ │ current_org_id, contact_owner_id, stage, is_favorite, labels GIN
│ │
│ └── org_bindings
│ UQ (org_id, workspace_id) WHERE deleted_at IS NULL
│ Only workspace CRM metadata — org data never copied
│
└── ENRICHMENT
└── enrichment_requests
entity_type: contact | organisation
status: pending → processing → success | failed | cached | cancelled
idempotency_key UQ (retry-safe)
credits_charged, credit_txn_id → wallet service (external)
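Every incoming contact signal runs through the identifier dedup gate before a new contact row is created. The lookup and the inserts that follow run as one atomic unit (a serializable transaction or a single CTE), so two concurrent signals carrying the same identifier cannot both create a contact.
contact_identifier(identifier_type, normalized_value) has a UNIQUE partial index scoped to active rows (WHERE deleted_at IS NULL). A HIT on any identifier type routes the signal to the existing contact; only a MISS on every identifier creates a new one.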
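The gate can be sketched in a few lines. This is a minimal illustration, not the production code: it uses an in-memory SQLite database as a stand-in for PostgreSQL (both support partial unique indexes), the tables carry only the columns the gate needs, and `resolve_contact` and the index name are hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; the schema is trimmed to the gate's columns.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE contact (id INTEGER PRIMARY KEY, deleted_at TEXT);
CREATE TABLE contact_identifier (
    id               INTEGER PRIMARY KEY,
    contact_id       INTEGER NOT NULL REFERENCES contact(id),
    identifier_type  TEXT NOT NULL,
    normalized_value TEXT NOT NULL,
    deleted_at       TEXT
);
-- Uniqueness applies only to active (non-deleted) rows.
CREATE UNIQUE INDEX uq_contact_identifier_active
    ON contact_identifier (identifier_type, normalized_value)
    WHERE deleted_at IS NULL;
""")


def resolve_contact(conn, identifiers):
    """Return (contact_id, created) for a list of (type, normalized_value) pairs."""
    with conn:  # one transaction: commit on success, roll back on error
        for id_type, value in identifiers:
            row = conn.execute(
                "SELECT contact_id FROM contact_identifier "
                "WHERE identifier_type = ? AND normalized_value = ? "
                "AND deleted_at IS NULL",
                (id_type, value),
            ).fetchone()
            if row:  # HIT on any identifier: route to the existing contact
                return row[0], False
        # MISS on every identifier: create the contact and register its identifiers
        contact_id = conn.execute("INSERT INTO contact DEFAULT VALUES").lastrowid
        conn.executemany(
            "INSERT INTO contact_identifier "
            "(contact_id, identifier_type, normalized_value) VALUES (?, ?, ?)",
            [(contact_id, t, v) for t, v in identifiers],
        )
        return contact_id, True
```

On a HIT this sketch returns immediately; a fuller version would also register any identifiers the existing contact does not yet have.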
Incoming contact signal
(csv_upload / gmail_api / api_push / enrichment / manual_entry)
│
▼
┌─────────────────────────────┐
│ Normalize identifiers │
│ • lowercase email │
│ • e164 phone format │
│ • extract linkedin slug │
│ • lowercase domain │
└────────────┬────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ SELECT contact_id FROM contact_identifier │
│ WHERE identifier_type = $type │
│ AND normalized_value = $value │
│ AND deleted_at IS NULL │
│ (hits the UQ partial index — single index scan) │
└────────────┬────────────────────────────────┬───────────────┘
│ MISS │ HIT
▼ ▼
┌────────────────────────────┐ ┌─────────────────────────────────────┐
│ BEGIN serializable txn │ │ Existing contact found │
│ │ │ │
│ 1. INSERT contact │ │ • UPSERT contact sub-tables │
│ 2. INSERT contact_ │ │ (email, phone, employment …) │
│ identifier(s) │ │ │
│ 3. INSERT workspace_ │ │ • UPDATE workspace_contact_binding │
│ contact_binding │ │ if enrichment_score improved │
│ 4. UPSERT contact_ │ │ │
│ latest_data │ │ • UPSERT contact_latest_data │
│ │ │ if this binding is now best │
│ COMMIT │ │ │
└────────────────────────────┘ └─────────────────────────────────────┘
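The "Normalize identifiers" step at the top of the flow above might look like the following sketch. All four function names are hypothetical. The phone handling is deliberately naive (it assumes a default country code of 1 when no leading + is present), and lowercasing the LinkedIn slug and stripping a leading "www." from domains are assumptions rather than rules stated in this document.

```python
import re
from urllib.parse import urlparse


def normalize_email(raw):
    """Lowercase and trim an email address."""
    return raw.strip().lower()


def normalize_phone_e164(raw, default_country_code="1"):
    """Naive E.164 formatting: keep digits, add a country code if none was given."""
    raw = raw.strip()
    digits = re.sub(r"\D", "", raw)
    if raw.startswith("+"):
        return "+" + digits
    return "+" + default_country_code + digits  # assumption: default country code


def extract_linkedin_slug(url):
    """Return the public profile slug that follows /in/, or None if absent."""
    parts = [p for p in urlparse(url.strip()).path.split("/") if p]
    if "in" in parts:
        idx = parts.index("in")
        if idx + 1 < len(parts):
            return parts[idx + 1].lower()  # assumption: slugs compare case-insensitively
    return None


def normalize_domain(raw):
    """Lowercase a domain, dropping any URL scheme and a leading 'www.'."""
    value = raw.strip().lower()
    if "://" in value:
        value = urlparse(value).netloc
    return value[4:] if value.startswith("www.") else value
```

Production code would more likely delegate phone parsing to a dedicated library, since E.164 formatting depends on per-country rules.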
Dedup identifier priority (weakest → strongest signal):
─────────────────────────────────────────────────────
name_normalized (org only) weakest — last resort
domain / personal domain
twitter_handle / github_username
ig_username / ig_user_id / fb_user_id
phone_e164
email
linkedin_public_id (slug)
linkedin_urn (numeric) strongest — platform-stable ID
org_identifier.identifier_type hierarchy
──────────────────────────────────────────────────────────
domain            company.com                           high confidence
linkedin_uid      linkedin.com/company/stripe
crunchbase_slug   crunchbase.com/organization/stripe
tracxn_slug       tracxn.com/o/stripe
name_normalized   stripe_inc                            weakest, collision risk
──────────────────────────────────────────────────────────
UQ partial index: (identifier_type, normalized_value) WHERE deleted_at IS NULL
→ app tries strongest type first, falls back down the chain
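The "strongest type first, fall back down the chain" lookup can be sketched as below. The ordering follows the org hierarchy listed above; `find_existing_org` and the `lookup` callable are hypothetical names, with `lookup` standing in for the index scan against org_identifier.

```python
# Strongest to weakest, following the org_identifier hierarchy.
ORG_IDENTIFIER_PRIORITY = [
    "domain",
    "linkedin_uid",
    "crunchbase_slug",
    "tracxn_slug",
    "name_normalized",
]


def find_existing_org(identifiers, lookup):
    """Try each identifier type in priority order.

    identifiers: dict mapping identifier_type -> normalized_value
    lookup:      callable (identifier_type, normalized_value) -> org_id or None
    Returns (org_id, matched_type), or None when every lookup misses.
    """
    for id_type in ORG_IDENTIFIER_PRIORITY:
        value = identifiers.get(id_type)
        if value is None:
            continue  # the signal did not carry this identifier type
        org_id = lookup(id_type, value)
        if org_id is not None:
            return org_id, id_type
    return None
```

Returning the matched type alongside the id lets the caller treat a name_normalized match with more suspicion than a domain match.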
Enrichment requests are persisted with an idempotency key for safe retries. Credits are charged atomically with job completion via the external wallet service.
App / API Worker contact schema External
──────────────── ────────────── ────────────────────
User requests enrichment
for contact_id=42
│
▼
Check idempotency_key ──────► SELECT FROM enrichment_requests
(dedupe in-flight jobs) WHERE idempotency_key = $key
│ not found
▼
INSERT enrichment_requests
entity_type = 'contact'
entity_id = 42
source_id → data_source (e.g. apollo)
workspace_id = W
status = 'pending'
idempotency_key = UUID
│
▼
┌───────────────────────────────────────────────────────────────────────┐
│ Worker loop (pg-boss / BullMQ) │
│ │
│ 1. Poll enrichment_requests WHERE status = 'pending' │
│ 2. UPDATE status = 'processing', started_at = now() │
│ │ │
│ ▼ │
│ 3. Call external API ─────────────────────────────► apollo / apify │
│ │ crunchbase etc. │
│ │ ◄──────────────────────────────────────── raw JSON payload │
│ ▼ │
│ 4. Parse + normalize payload │
│ │ │
│ ├──► UPSERT organisation (if found) │
│ ├──► UPSERT org_identifier / org_technologies │
│ ├──► UPSERT contact_identifier (dedup check) │
│ ├──► UPSERT contact_email (all emails found) │
│ ├──► UPSERT contact_phones │
│ ├──► UPSERT contact_employment (is_current logic) │
│ ├──► UPSERT contact_education / skills / languages │
│ │ │
│ ├──► UPDATE contact.enrichment_score (if improved) │
│ ├──► UPDATE contact.last_enriched_at = now() │
│ │ │
│ └──► UPSERT contact_latest_data │
│ (flip authoritative ptr if this binding scores higher) │
│ │
│ 5. UPDATE enrichment_requests │
│ status = 'success' | 'failed' │
│ response_summary = { fields_updated: N, … } │
│ credits_charged = N │
│ credit_txn_id → wallet.credit_transactions │
│ completed_at = now() │
└───────────────────────────────────────────────────────────────────────┘
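The idempotency check and insert at the top of this flow can be collapsed into a single statement. The sketch below uses in-memory SQLite as a stand-in for PostgreSQL (the ON CONFLICT ... DO NOTHING syntax is shared by both), a trimmed-down enrichment_requests table, and a hypothetical `enqueue` helper. It assumes the caller supplies a stable idempotency key for retries.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; only the relevant columns are modelled.
conn = sqlite3.connect(":memory:")
conn.execute("""
CREATE TABLE enrichment_requests (
    id              INTEGER PRIMARY KEY,
    entity_type     TEXT NOT NULL CHECK (entity_type IN ('contact', 'organisation')),
    entity_id       INTEGER NOT NULL,
    workspace_id    INTEGER NOT NULL,
    status          TEXT NOT NULL DEFAULT 'pending',
    idempotency_key TEXT NOT NULL UNIQUE
)
""")


def enqueue(conn, entity_type, entity_id, workspace_id, idempotency_key):
    """Insert a pending request, or return the existing one for the same key."""
    with conn:
        conn.execute(
            "INSERT INTO enrichment_requests "
            "(entity_type, entity_id, workspace_id, idempotency_key) "
            "VALUES (?, ?, ?, ?) "
            "ON CONFLICT(idempotency_key) DO NOTHING",  # a retry becomes a no-op
            (entity_type, entity_id, workspace_id, idempotency_key),
        )
        return conn.execute(
            "SELECT id, status FROM enrichment_requests WHERE idempotency_key = ?",
            (idempotency_key,),
        ).fetchone()
```

Because the unique constraint does the deduplication, a retried call gets back the original row instead of creating a second job.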
Status transitions:
pending ──► processing ──► success
   │                   └──► failed
   ├──► cancelled (user cancelled before pickup)
   └──► cached (result served from recent cache)
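A worker can guard these transitions with a small lookup table. This is a sketch under one assumption: both cancelled and cached are reached directly from pending (the note on cancelled says "before pickup"; for cached it is a guess). `ALLOWED_TRANSITIONS`, `transition`, and `is_terminal` are hypothetical names.

```python
# Assumption: 'cancelled' and 'cached' are both reached from 'pending'.
ALLOWED_TRANSITIONS = {
    "pending": {"processing", "cancelled", "cached"},
    "processing": {"success", "failed"},
    "success": set(),
    "failed": set(),
    "cancelled": set(),
    "cached": set(),
}


def is_terminal(status):
    """A status is terminal when no further transition is allowed."""
    return not ALLOWED_TRANSITIONS[status]


def transition(current, new):
    """Return the new status, or raise ValueError if the move is not allowed."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition: {current} -> {new}")
    return new
```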
| source_provider | source_type | priority | description |
|---|---|---|---|
| manual_entry | manual | 100 | User manual input |
| api_push | manual | 15 | External API push |
| csv_upload | manual | 10 | CSV / Excel file import |
| gmail_api | integration | 20 | Gmail contact sync |
| outlook_api | integration | 20 | Outlook contact sync |
| hubspot_crm | integration | 25 | HubSpot CRM sync |
| salesforce_crm | integration | 25 | Salesforce CRM sync |
| apollo | enrichment | 30 | Apollo.io person/org enrichment |
| crunchbase | enrichment | 30 | Crunchbase org data |
| tracxn | enrichment | 30 | Tracxn org data |
| apify_linkedin_profile | enrichment | 35 | Apify LinkedIn Profile Scraper |
| apify_instagram_profile | enrichment | 35 | Apify Instagram Profile Scraper |
| ai_deep_research | ai | 40 | AI deep research agent |
| ai_classification | ai | 40 | AI classification / labeling |
A single global contact row can be bound to any number of workspaces. Each workspace keeps its own CRM context (stage, labels, owner, notes) on its binding row, without modifying or copying the shared global rows.
Global Platform Layer
┌──────────────────────────────────────┐
│ contact (id = 42) │
│ • dedup_hash = "abc123" │
│ • enrichment_score = 87 │
│ │
│ organisation (id = 7) │
│ • name = "Stripe, Inc." │
│ • primary_domain = "stripe.com" │
└───────────────┬──────────────────────┘
│ shared read — no copy
┌─────────────┼──────────────┐
│ │ │
▼ ▼ ▼
Workspace A Workspace B Workspace C
┌────────────┐ ┌────────────┐ ┌────────────┐
│ binding │ │ binding │ │ binding │
│ ws_id = 1 │ │ ws_id = 2 │ │ ws_id = 3 │
│ │ │ │ │ │
│ stage: │ │ stage: │ │ stage: │
│ qualified │ │ lead │ │ customer │
│ │ │ │ │ │
│ labels: │ │ labels: │ │ labels: │
│ ['vip', │ │ ['cold'] │ │ ['renewal']│
│ 'partner']│ │ │ │ │
│ │ │ owner: 99 │ │ owner: 55 │
│ notes: │ │ │ │ │
│ "met at │ │ custom_ │ │ custom_ │
│ SaaStr" │ │ fields:{…} │ │ fields:{…} │
└────────────┘ └────────────┘ └────────────┘
Each workspace sees its own CRM data.
No workspace can read another's binding.
All read the same contact + org enrichment data.
UQ constraint: (contact_id, workspace_id) WHERE deleted_at IS NULL
→ at most one active binding per contact per workspace; soft-deleted bindings are exempt, so a contact can be re-bound after removal
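The effect of the partial unique constraint on bindings can be seen in a small sketch. It uses in-memory SQLite as a stand-in for PostgreSQL with a trimmed-down workspace_contact_binding table; `bind`, `unbind`, and the index name are hypothetical.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; only a few binding columns are modelled.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE workspace_contact_binding (
    id           INTEGER PRIMARY KEY,
    contact_id   INTEGER NOT NULL,
    workspace_id INTEGER NOT NULL,
    stage        TEXT,
    deleted_at   TEXT
);
CREATE UNIQUE INDEX uq_binding_active
    ON workspace_contact_binding (contact_id, workspace_id)
    WHERE deleted_at IS NULL;
""")


def bind(conn, contact_id, workspace_id, stage):
    """Create a binding; raises sqlite3.IntegrityError if an active one exists."""
    with conn:
        return conn.execute(
            "INSERT INTO workspace_contact_binding (contact_id, workspace_id, stage) "
            "VALUES (?, ?, ?)",
            (contact_id, workspace_id, stage),
        ).lastrowid


def unbind(conn, binding_id):
    """Soft-delete a binding so it no longer counts toward uniqueness."""
    with conn:
        conn.execute(
            "UPDATE workspace_contact_binding "
            "SET deleted_at = CURRENT_TIMESTAMP WHERE id = ?",
            (binding_id,),
        )
```

The same contact can be bound once in each workspace, a second active binding in the same workspace is rejected, and a soft-deleted binding frees the slot for a new one.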
Problem: multiple workspaces each have a binding for contact #42.
Which binding's data should be used to update the global contact
when a richer enrichment arrives?
Solution: contact_latest_data acts as a single pointer.
┌──────────────────────────────────────────────────────────────────────┐
│ contact_latest_data                                                  │
│                                                                      │
│ contact_id (PK)                       → contact.id                   │
│ workspace_contact_binding_id (UQ FK)  → workspace_contact_binding.id │
│                                                                      │
│ Invariants enforced by DB:                                           │
│ • ONE authoritative binding per contact (PK)                         │
│ • A binding is authoritative for AT MOST one contact                 │
│   (UQ on workspace_contact_binding_id)                               │
│ • Auto-removed if contact OR binding is deleted                      │
│   (both FKs ON DELETE CASCADE)                                       │
└──────────────────────────────────────────────────────────────────────┘
When to flip the pointer:
├── New import with higher priority source_id
├── Enrichment arrives with higher enrichment_score
└── User manually "unlocks" a contact to take ownership
(data_owner = 'user' always wins over 'enrichment')
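Flipping the pointer is a single upsert on the primary key. The sketch below uses in-memory SQLite as a stand-in for PostgreSQL and reduces the three tables to their key columns; `set_authoritative` is a hypothetical helper, and deciding whether to flip is left to the caller.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; tables are reduced to their key columns.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
CREATE TABLE contact (id INTEGER PRIMARY KEY);
CREATE TABLE workspace_contact_binding (
    id           INTEGER PRIMARY KEY,
    contact_id   INTEGER NOT NULL REFERENCES contact(id),
    workspace_id INTEGER NOT NULL
);
CREATE TABLE contact_latest_data (
    contact_id INTEGER PRIMARY KEY
        REFERENCES contact(id) ON DELETE CASCADE,
    workspace_contact_binding_id INTEGER NOT NULL UNIQUE
        REFERENCES workspace_contact_binding(id) ON DELETE CASCADE
);
""")


def set_authoritative(conn, contact_id, binding_id):
    """Point the contact at binding_id, replacing any previous pointer."""
    with conn:
        conn.execute(
            "INSERT INTO contact_latest_data (contact_id, workspace_contact_binding_id) "
            "VALUES (?, ?) "
            "ON CONFLICT(contact_id) DO UPDATE "
            "SET workspace_contact_binding_id = excluded.workspace_contact_binding_id",
            (contact_id, binding_id),
        )
```

The primary key guarantees one pointer row per contact, and the cascading foreign keys remove the pointer when either side is deleted.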
Every field write carries a source_id reference to data_source. Priority determines whose data wins during upsert conflicts.
| Priority | Source | data_owner | Wins over |
|---|---|---|---|
| 100 | manual_entry (user typed) | user | Everything (user intent) |
| 40 | ai_deep_research | ai | All non-user sources |
| 40 | ai_classification | ai | All non-user sources |
| 35 | apify_linkedin_profile | enrichment | Apollo, integrations, csv |
| 35 | apify_instagram_profile | enrichment | Apollo, integrations, csv |
| 30 | apollo | enrichment | Integrations, api_push, csv |
| 30 | crunchbase | enrichment | Integrations, api_push, csv |
| 30 | tracxn | enrichment | Integrations, api_push, csv |
| 25 | hubspot_crm | integration | csv, gmail, api_push |
| 25 | salesforce_crm | integration | csv, gmail, api_push |
| 20 | gmail_api | integration | csv, api_push |
| 20 | outlook_api | integration | csv, api_push |
| 15 | api_push | user | csv_upload only |
| 10 | csv_upload | user | Nothing (lowest) |
Conflict resolution rules:
┌──────────────────────────────────────────────────────────────────────┐
│ 1. data_owner = 'user' ALWAYS wins for workspace_contact_binding │
│ (if a user manually edited a field, enrichment cannot overwrite) │
│ │
│ 2. For global sub-tables (contact_email, employment…): │
│ Higher source priority wins on conflict │
│ On tie → most recent last_enriched_at wins │
│ │
│ 3. contact_latest_data pointer flips only when: │
│ new_enrichment_score > current contact.enrichment_score │
│ OR same score but newer last_enriched_at │
└──────────────────────────────────────────────────────────────────────┘
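The three rules above translate into small pure functions. This is a sketch, not the pipeline's actual code: `FieldWrite` and all three function names are hypothetical, and `priority` is assumed to be the value from data_source.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class FieldWrite:
    value: str
    priority: int          # assumed to come from data_source.priority
    enriched_at: datetime  # last_enriched_at of the write


def binding_accepts_write(binding_data_owner, incoming_data_owner):
    """Rule 1: once a binding is user-owned, only user writes may change it."""
    return not (binding_data_owner == "user" and incoming_data_owner != "user")


def resolve_global_field(current, incoming):
    """Rule 2: higher source priority wins; on a tie the most recent write wins."""
    if incoming.priority != current.priority:
        return incoming if incoming.priority > current.priority else current
    return incoming if incoming.enriched_at > current.enriched_at else current


def should_flip_pointer(current_score, current_enriched_at, new_score, new_enriched_at):
    """Rule 3: flip on a higher score, or on an equal score with a newer timestamp."""
    if new_score != current_score:
        return new_score > current_score
    return new_enriched_at > current_enriched_at
```

Keeping the rules free of database access makes them easy to unit test before wiring them into the upsert path.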
data_owner columns:
├── workspace_contact_binding.data_owner contact_binding_data_owner_enum
│ values: user | enrichment
└── org_bindings.data_owner org_binding_data_owner_enum
values: user | enrichment
Two critical gaps exist in the current schema: field-level change logging and dedup merge tracking. Both should be added in the next migration.
Any change to workspace_contact_binding or org_bindings is currently silent, with no audit trail. There is also no record of when two contact rows are merged.
CREATE TABLE contact.contact_field_audit (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
entity_type TEXT NOT NULL, -- 'contact_binding' | 'org_binding'
entity_id BIGINT NOT NULL, -- workspace_contact_binding.id or org_bindings.id
workspace_id BIGINT NOT NULL,
field_name VARCHAR(255) NOT NULL, -- e.g. 'title', 'stage', 'primary_email'
old_value TEXT,
new_value TEXT,
changed_by BIGINT, -- user_id (NULL = enrichment / system)
source_id BIGINT REFERENCES contact.data_source(id),
changed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_audit_entity ON contact.contact_field_audit (entity_type, entity_id);
CREATE INDEX idx_audit_workspace ON contact.contact_field_audit (workspace_id, changed_at DESC);
CREATE INDEX idx_audit_field ON contact.contact_field_audit (field_name, changed_at DESC);
-- Trigger pattern: write one audit row per changed column
CREATE OR REPLACE FUNCTION contact.audit_binding_changes()
RETURNS TRIGGER LANGUAGE plpgsql AS $$
DECLARE
  col     TEXT;
  old_row JSONB;
  new_row JSONB;
BEGIN
  -- Serialize each row once instead of once per audited column
  old_row := to_jsonb(OLD);
  new_row := to_jsonb(NEW);
  FOREACH col IN ARRAY ARRAY[
    'title','headline','stage','primary_email','primary_phone',
    'city','country','seniority','current_org_id','labels','notes'
  ] LOOP
    -- IS DISTINCT FROM is NULL-safe, so changes to or from NULL are logged too
    IF (new_row ->> col) IS DISTINCT FROM (old_row ->> col) THEN
      INSERT INTO contact.contact_field_audit
        (entity_type, entity_id, workspace_id, field_name, old_value, new_value)
      VALUES
        ('contact_binding', NEW.id, NEW.workspace_id, col,
         old_row ->> col,
         new_row ->> col);
    END IF;
  END LOOP;
  RETURN NEW;  -- the return value is ignored for AFTER row triggers
END;
$$;
-- Note: changed_by and source_id are not populated by this trigger and stay NULL
CREATE TRIGGER trg_audit_contact_binding
AFTER UPDATE ON contact.workspace_contact_binding
FOR EACH ROW EXECUTE FUNCTION contact.audit_binding_changes();
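For unit tests or application-side previews, the trigger's comparison can be mirrored in plain Python. This is only an illustrative equivalent of the plpgsql above: `diff_binding` is a hypothetical helper, rows are plain dicts, and values are stringified to mimic the TEXT columns of contact_field_audit.

```python
# Same column list the trigger audits.
AUDITED_COLUMNS = (
    "title", "headline", "stage", "primary_email", "primary_phone",
    "city", "country", "seniority", "current_org_id", "labels", "notes",
)


def _as_text(value):
    """Mimic the ->> operator: NULL stays NULL, everything else becomes text."""
    return None if value is None else str(value)


def diff_binding(old, new, columns=AUDITED_COLUMNS):
    """Return one audit-row dict per audited column whose value changed."""
    rows = []
    for col in columns:
        old_value = _as_text(old.get(col))
        new_value = _as_text(new.get(col))
        if old_value != new_value:  # None-safe, like IS DISTINCT FROM
            rows.append({
                "entity_type": "contact_binding",
                "entity_id": new["id"],
                "workspace_id": new["workspace_id"],
                "field_name": col,
                "old_value": old_value,
                "new_value": new_value,
            })
    return rows
```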
CREATE TABLE contact.contact_merge_log (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
merged_into_id BIGINT NOT NULL REFERENCES contact.contact(id),
merged_from_id BIGINT NOT NULL, -- the contact that was absorbed (now deleted)
merged_by BIGINT, -- user_id (NULL = auto-merge by system)
workspace_id BIGINT, -- NULL = global merge
similarity_score SMALLINT, -- 0–100, what triggered the merge
merge_reason TEXT, -- 'same_email' | 'same_linkedin_urn' | 'manual'
merged_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_merge_into ON contact.contact_merge_log (merged_into_id);
CREATE INDEX idx_merge_from ON contact.contact_merge_log (merged_from_id);
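One way the merge log could be written is inside the same transaction as the merge itself, so the log can never disagree with the data. The sketch below is hypothetical: it uses in-memory SQLite as a stand-in for PostgreSQL, models only the columns it touches, and the `merge_contacts` procedure (re-point identifiers, soft-delete the absorbed contact, log the merge) is an assumption about how a merge would work, not something this document specifies.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL; only the touched columns are modelled.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE contact (id INTEGER PRIMARY KEY, deleted_at TEXT);
CREATE TABLE contact_identifier (
    id               INTEGER PRIMARY KEY,
    contact_id       INTEGER NOT NULL,
    identifier_type  TEXT NOT NULL,
    normalized_value TEXT NOT NULL,
    deleted_at       TEXT
);
CREATE TABLE contact_merge_log (
    id             INTEGER PRIMARY KEY,
    merged_into_id INTEGER NOT NULL REFERENCES contact(id),
    merged_from_id INTEGER NOT NULL,
    merged_by      INTEGER,
    merge_reason   TEXT,
    merged_at      TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
""")


def merge_contacts(conn, into_id, from_id, reason, merged_by=None):
    """Absorb from_id into into_id and record the merge, all in one transaction."""
    if into_id == from_id:
        raise ValueError("cannot merge a contact into itself")
    with conn:
        # Re-point the absorbed contact's identifiers at the survivor.
        conn.execute(
            "UPDATE contact_identifier SET contact_id = ? WHERE contact_id = ?",
            (into_id, from_id),
        )
        # Soft-delete the absorbed contact.
        conn.execute(
            "UPDATE contact SET deleted_at = CURRENT_TIMESTAMP WHERE id = ?",
            (from_id,),
        )
        conn.execute(
            "INSERT INTO contact_merge_log "
            "(merged_into_id, merged_from_id, merged_by, merge_reason) "
            "VALUES (?, ?, ?, ?)",
            (into_id, from_id, merged_by, reason),
        )
```

A full implementation would also have to move the other sub-tables and reconcile workspace bindings, which this sketch leaves out.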
| Gap | Proposed Table / Approach | Priority |
|---|---|---|
| Field-level change log | contact_field_audit + AFTER UPDATE trigger on bindings | critical |
| Dedup merge tracking | contact_merge_log — who merged into whom, why, when | critical |
| Duplicate candidate queue | contact_dedup_candidates(contact_id_a, contact_id_b, similarity_score, resolved_at) | high |
| Full-text search on names | GIN index on to_tsvector('english', full_name) + pg_trgm on workspace_contact_binding | high |
| Full-text search on org names | Same pattern on organisation.name | high |
| Activity timeline | contact_activities(activity_type, actor_id, workspace_id, contact_id, occurred_at) — email_sent / call / note / stage_change | medium |
| Social posts cache | contact_social_posts (apify source) — platform, post_url, posted_at, content_snippet | medium |
| Org FTS + dedup candidates | Same dedup candidate queue for organisations | medium |