add newsletter and email campaigns

Subscribers with double opt-in confirmation, campaign composer with
draft/scheduled/sent lifecycle, admin dashboard with overview stats,
CSV export, and shop signup form wired into page builder blocks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jamey
2026-02-28 23:25:28 +00:00
parent 8f989d892d
commit ad2e6d1e6d
32 changed files with 2497 additions and 32 deletions

314
lib/berrypod/newsletter.ex Normal file
View File

@@ -0,0 +1,314 @@
defmodule Berrypod.Newsletter do
@moduledoc """
Newsletter subscriber management and campaign sending.
Privacy-first design: double opt-in, plain text only, no tracking pixels,
hashed confirmation tokens, minimal data collection (email only).
"""
import Ecto.Query
alias Berrypod.Newsletter.{Campaign, Subscriber}
alias Berrypod.{Repo, Settings}
# ── Subscriber flow ──────────────────────────────────────────────
@doc """
Subscribes an email address. Creates a pending subscriber and enqueues
a confirmation email.
Returns `{:ok, subscriber}` for new signups, `{:already_confirmed, subscriber}`
if the email is already confirmed, or `{:error, changeset}` on validation failure.
Options:
- `consent_text` — the exact wording shown at signup
- `source` — "website" (default) or "admin_import"
- `ip_hash` — hashed IP for consent proof
"""
def subscribe(email, opts \\ []) do
email = email |> String.trim() |> String.downcase()
case Repo.get_by(Subscriber, email: email) do
%Subscriber{status: "confirmed"} = sub ->
{:already_confirmed, sub}
%Subscriber{status: "unsubscribed"} = sub ->
# Re-subscribe: reset to pending with new token
resubscribe(sub, opts)
%Subscriber{status: "pending"} = sub ->
# Already pending — resend confirmation
enqueue_confirmation(sub)
{:ok, sub}
nil ->
create_subscriber(email, opts)
end
end
defp create_subscriber(email, opts) do
{raw_token, hashed_token} = generate_token()
attrs = %{
email: email,
status: "pending",
confirmation_token: hashed_token,
consent_text: Keyword.get(opts, :consent_text),
source: Keyword.get(opts, :source, "website"),
ip_hash: Keyword.get(opts, :ip_hash)
}
case %Subscriber{} |> Subscriber.changeset(attrs) |> Repo.insert() do
{:ok, sub} ->
enqueue_confirmation(%{sub | confirmation_token: raw_token})
{:ok, sub}
{:error, changeset} ->
{:error, changeset}
end
end
defp resubscribe(sub, opts) do
{raw_token, hashed_token} = generate_token()
attrs = %{
status: "pending",
confirmation_token: hashed_token,
unsubscribed_at: nil,
consent_text: Keyword.get(opts, :consent_text) || sub.consent_text,
ip_hash: Keyword.get(opts, :ip_hash) || sub.ip_hash
}
case sub |> Subscriber.changeset(attrs) |> Repo.update() do
{:ok, updated} ->
enqueue_confirmation(%{updated | confirmation_token: raw_token})
{:ok, updated}
{:error, changeset} ->
{:error, changeset}
end
end
@doc """
Confirms a subscriber via their raw confirmation token.
"""
def confirm(raw_token) do
hashed = hash_token(raw_token)
case Repo.get_by(Subscriber, confirmation_token: hashed) do
%Subscriber{status: "pending"} = sub ->
sub
|> Subscriber.changeset(%{
status: "confirmed",
confirmed_at: DateTime.utc_now() |> DateTime.truncate(:second),
confirmation_token: nil
})
|> Repo.update()
%Subscriber{status: "confirmed"} = sub ->
{:ok, sub}
_ ->
{:error, :invalid_token}
end
end
@doc """
Unsubscribes an email. Updates the subscriber record and adds to the
global email suppression list.
"""
def unsubscribe(email) do
email = email |> String.trim() |> String.downcase()
case Repo.get_by(Subscriber, email: email) do
%Subscriber{status: "unsubscribed"} = sub ->
{:ok, sub}
%Subscriber{} = sub ->
sub
|> Subscriber.changeset(%{
status: "unsubscribed",
unsubscribed_at: DateTime.utc_now() |> DateTime.truncate(:second),
confirmation_token: nil
})
|> Repo.update()
nil ->
{:error, :not_found}
end
end
# ── Subscriber queries ───────────────────────────────────────────
@doc "Lists subscribers, optionally filtered by status or email search."
def list_subscribers(opts \\ []) do
query =
from(s in Subscriber, order_by: [desc: s.inserted_at])
query =
case Keyword.get(opts, :status) do
nil -> query
"all" -> query
status -> from(s in query, where: s.status == ^status)
end
query =
case Keyword.get(opts, :search) do
nil -> query
"" -> query
term -> from(s in query, where: like(s.email, ^"%#{term}%"))
end
Repo.all(query)
end
@doc "Returns subscriber counts grouped by status."
def count_subscribers_by_status do
from(s in Subscriber,
group_by: s.status,
select: {s.status, count(s.id)}
)
|> Repo.all()
|> Map.new()
end
@doc "Returns the count of confirmed subscribers."
def confirmed_subscriber_count do
from(s in Subscriber, where: s.status == "confirmed", select: count())
|> Repo.one()
end
def get_subscriber!(id), do: Repo.get!(Subscriber, id)
@doc "Hard-deletes a subscriber (GDPR right to erasure)."
def delete_subscriber(%Subscriber{} = sub) do
Repo.delete(sub)
end
@doc "Returns all data held about a subscriber as a plain map (GDPR right to access)."
def export_subscriber_data(%Subscriber{} = sub) do
%{
email: sub.email,
status: sub.status,
subscribed_at: sub.inserted_at,
confirmed_at: sub.confirmed_at,
unsubscribed_at: sub.unsubscribed_at,
source: sub.source,
consent_text: sub.consent_text
}
end
@doc "Exports all subscribers as a CSV string."
def export_all_subscribers_csv do
subscribers = list_subscribers()
header = "email,status,confirmed_at,subscribed_at\n"
rows =
Enum.map_join(subscribers, "\n", fn s ->
[
s.email,
s.status,
format_csv_datetime(s.confirmed_at),
format_csv_datetime(s.inserted_at)
]
|> Enum.join(",")
end)
header <> rows
end
defp format_csv_datetime(nil), do: ""
defp format_csv_datetime(dt), do: Calendar.strftime(dt, "%Y-%m-%d %H:%M:%S")
# ── Campaign CRUD ────────────────────────────────────────────────
def list_campaigns do
from(c in Campaign, order_by: [desc: c.inserted_at])
|> Repo.all()
end
def get_campaign!(id), do: Repo.get!(Campaign, id)
def create_campaign(attrs) do
%Campaign{}
|> Campaign.changeset(attrs)
|> Repo.insert()
end
def update_campaign(%Campaign{} = campaign, attrs) do
campaign
|> Campaign.changeset(attrs)
|> Repo.update()
end
def delete_campaign(%Campaign{status: "draft"} = campaign) do
Repo.delete(campaign)
end
def delete_campaign(_campaign), do: {:error, :not_draft}
@doc "Marks a campaign as sending and enqueues the send worker."
def send_campaign_now(%Campaign{status: status} = campaign)
when status in ["draft", "scheduled"] do
case update_campaign(campaign, %{status: "sending"}) do
{:ok, updated} ->
Berrypod.Newsletter.CampaignSendWorker.enqueue(updated.id)
{:ok, updated}
error ->
error
end
end
def send_campaign_now(_), do: {:error, :invalid_status}
@doc "Schedules a campaign for future sending."
def schedule_campaign(%Campaign{status: "draft"} = campaign, %DateTime{} = at) do
update_campaign(campaign, %{status: "scheduled", scheduled_at: at})
end
def schedule_campaign(_, _), do: {:error, :not_draft}
@doc "Cancels a scheduled campaign."
def cancel_campaign(%Campaign{status: "scheduled"} = campaign) do
update_campaign(campaign, %{status: "cancelled", scheduled_at: nil})
end
def cancel_campaign(_), do: {:error, :not_scheduled}
@doc "Returns a preview of the campaign body with a sample unsubscribe URL."
def preview_campaign(%Campaign{body: body}) do
sample_url = BerrypodWeb.Endpoint.url() <> "/unsubscribe/sample-token"
String.replace(body, "{{unsubscribe_url}}", sample_url)
end
# ── Settings ─────────────────────────────────────────────────────
def newsletter_enabled? do
Settings.get_setting("newsletter_enabled", false) == true
end
def set_newsletter_enabled(enabled?) when is_boolean(enabled?) do
Settings.put_setting("newsletter_enabled", enabled?, "boolean")
end
# ── Token helpers ────────────────────────────────────────────────
defp generate_token do
raw = :crypto.strong_rand_bytes(32) |> Base.url_encode64(padding: false)
{raw, hash_token(raw)}
end
defp hash_token(raw) do
:crypto.hash(:sha256, raw) |> Base.encode16(case: :lower)
end
# ── Confirmation email enqueue ───────────────────────────────────
defp enqueue_confirmation(subscriber) do
%{subscriber_id: subscriber.id, raw_token: subscriber.confirmation_token}
|> Berrypod.Newsletter.ConfirmationEmailWorker.new()
|> Oban.insert()
end
end

View File

@@ -0,0 +1,27 @@
defmodule Berrypod.Newsletter.Campaign do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
schema "newsletter_campaigns" do
field :subject, :string
field :body, :string
field :status, :string, default: "draft"
field :scheduled_at, :utc_datetime
field :sent_at, :utc_datetime
field :sent_count, :integer, default: 0
field :failed_count, :integer, default: 0
timestamps(type: :utc_datetime)
end
def changeset(campaign, attrs) do
campaign
|> cast(attrs, [:subject, :body, :status, :scheduled_at, :sent_at, :sent_count, :failed_count])
|> validate_required([:subject, :body])
|> validate_length(:subject, max: 200)
|> validate_inclusion(:status, ~w(draft scheduled sending sent cancelled))
end
end

View File

@@ -0,0 +1,75 @@
defmodule Berrypod.Newsletter.CampaignSendWorker do
@moduledoc """
Sends a campaign to all confirmed, non-suppressed subscribers.
Processes in batches of 50 with a brief pause between batches to
respect mail provider rate limits.
"""
use Oban.Worker, queue: :newsletter, max_attempts: 1
alias Berrypod.Newsletter
alias Berrypod.Newsletter.Notifier
require Logger
@batch_size 50
@impl Oban.Worker
def perform(%Oban.Job{args: %{"campaign_id" => id}}) do
campaign = Newsletter.get_campaign!(id)
unless campaign.status in ["sending", "scheduled"] do
Logger.info("Campaign #{id} status is #{campaign.status}, skipping")
return_ok()
end
# Ensure status is "sending"
{:ok, campaign} =
if campaign.status == "scheduled" do
Newsletter.update_campaign(campaign, %{status: "sending"})
else
{:ok, campaign}
end
subscribers = Newsletter.list_subscribers(status: "confirmed")
{sent, failed} = send_in_batches(campaign, subscribers)
Newsletter.update_campaign(campaign, %{
status: "sent",
sent_at: DateTime.utc_now() |> DateTime.truncate(:second),
sent_count: sent,
failed_count: failed
})
Logger.info("Campaign #{id} sent: #{sent} delivered, #{failed} failed")
:ok
end
def enqueue(campaign_id) do
%{campaign_id: campaign_id}
|> new()
|> Oban.insert()
end
defp send_in_batches(campaign, subscribers) do
subscribers
|> Enum.chunk_every(@batch_size)
|> Enum.reduce({0, 0}, fn batch, {sent, failed} ->
{batch_sent, batch_failed} = send_batch(campaign, batch)
Process.sleep(100)
{sent + batch_sent, failed + batch_failed}
end)
end
defp send_batch(campaign, subscribers) do
Enum.reduce(subscribers, {0, 0}, fn sub, {sent, failed} ->
case Notifier.deliver_campaign(campaign, sub) do
{:ok, _} -> {sent + 1, failed}
{:error, _} -> {sent, failed + 1}
end
end)
end
defp return_ok, do: :ok
end

View File

@@ -0,0 +1,39 @@
defmodule Berrypod.Newsletter.CleanupWorker do
@moduledoc """
Prunes unconfirmed newsletter subscribers older than 48 hours.
Runs daily via cron. Pending signups that never confirmed are deleted
to keep the subscriber list clean and respect data minimisation.
"""
use Oban.Worker, queue: :newsletter, max_attempts: 1
import Ecto.Query
alias Berrypod.Newsletter.Subscriber
alias Berrypod.Repo
require Logger
@hours_to_expire 48
@impl Oban.Worker
def perform(%Oban.Job{}) do
cutoff =
DateTime.utc_now()
|> DateTime.add(-@hours_to_expire * 3600)
|> DateTime.truncate(:second)
{count, _} =
from(s in Subscriber,
where: s.status == "pending" and s.inserted_at < ^cutoff
)
|> Repo.delete_all()
if count > 0 do
Logger.info("Newsletter cleanup: pruned #{count} unconfirmed subscribers")
end
:ok
end
end

View File

@@ -0,0 +1,36 @@
defmodule Berrypod.Newsletter.ConfirmationEmailWorker do
@moduledoc """
Sends the double opt-in confirmation email to a new subscriber.
"""
use Oban.Worker, queue: :newsletter, max_attempts: 3
alias Berrypod.Newsletter
alias Berrypod.Newsletter.Notifier
require Logger
@impl Oban.Worker
def perform(%Oban.Job{args: %{"subscriber_id" => id, "raw_token" => raw_token}}) do
case Berrypod.Repo.get(Newsletter.Subscriber, id) do
%{status: "pending"} = sub ->
case Notifier.deliver_confirmation(sub, raw_token) do
{:ok, _} ->
Logger.info("Newsletter confirmation sent to #{sub.email}")
:ok
{:error, reason} ->
Logger.error("Newsletter confirmation failed for #{sub.email}: #{inspect(reason)}")
{:error, reason}
end
%{status: _other} ->
Logger.info("Newsletter confirmation skipped: subscriber #{id} no longer pending")
:ok
nil ->
Logger.warning("Newsletter confirmation: subscriber #{id} not found")
{:cancel, :not_found}
end
end
end

View File

@@ -0,0 +1,98 @@
defmodule Berrypod.Newsletter.Notifier do
@moduledoc """
Plain text email templates for newsletter confirmation and campaigns.
"""
import Swoosh.Email
alias Berrypod.Mailer
require Logger
@doc "Sends the double opt-in confirmation email."
def deliver_confirmation(subscriber, raw_token) do
shop_name = Berrypod.Settings.get_setting("shop_name", "Berrypod")
confirm_url = BerrypodWeb.Endpoint.url() <> "/newsletter/confirm/" <> raw_token
body = """
==============================
Thanks for signing up to the #{shop_name} newsletter!
Please confirm your email to start receiving updates:
#{confirm_url}
This link expires in 48 hours. If you didn't subscribe,
just ignore this email and you won't hear from us.
==============================
"""
deliver(subscriber.email, "Confirm your subscription", body)
end
@doc "Sends a campaign email to a single subscriber."
def deliver_campaign(campaign, subscriber) do
shop_name = Berrypod.Settings.get_setting("shop_name", "Berrypod")
unsubscribe_url = build_unsubscribe_url(subscriber.email)
body_with_url =
String.replace(campaign.body, "{{unsubscribe_url}}", unsubscribe_url)
full_body = """
#{body_with_url}
---
You're receiving this because you subscribed to the #{shop_name} newsletter.
Unsubscribe: #{unsubscribe_url}
"""
email =
new()
|> to(subscriber.email)
|> from({shop_name, from_address()})
|> subject(campaign.subject)
|> text_body(full_body)
|> header("List-Unsubscribe", "<#{unsubscribe_url}>")
|> header("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
case Mailer.deliver(email) do
{:ok, _metadata} = result ->
result
{:error, reason} = error ->
Logger.warning("Newsletter send failed for #{subscriber.email}: #{inspect(reason)}")
error
end
end
defp deliver(recipient, subject, body) do
shop_name = Berrypod.Settings.get_setting("shop_name", "Berrypod")
email =
new()
|> to(recipient)
|> from({shop_name, from_address()})
|> subject(subject)
|> text_body(body)
case Mailer.deliver(email) do
{:ok, _metadata} = result ->
result
{:error, reason} = error ->
Logger.warning("Failed to send newsletter email to #{recipient}: #{inspect(reason)}")
error
end
end
defp from_address do
Berrypod.Settings.get_setting("email_from_address", "noreply@example.com")
end
defp build_unsubscribe_url(email) do
token = Phoenix.Token.sign(BerrypodWeb.Endpoint, "email-unsub", email)
BerrypodWeb.Endpoint.url() <> "/unsubscribe/" <> token
end
end

View File

@@ -0,0 +1,36 @@
defmodule Berrypod.Newsletter.ScheduledCampaignWorker do
@moduledoc """
Picks up scheduled campaigns that are due and triggers sending.
Runs every 5 minutes via cron. Simpler than using Oban's schedule_in
because it lets admins cancel scheduled campaigns by changing status.
"""
use Oban.Worker, queue: :newsletter, max_attempts: 1
import Ecto.Query
alias Berrypod.Newsletter
alias Berrypod.Newsletter.Campaign
alias Berrypod.Repo
require Logger
@impl Oban.Worker
def perform(%Oban.Job{}) do
now = DateTime.utc_now() |> DateTime.truncate(:second)
campaigns =
from(c in Campaign,
where: c.status == "scheduled" and c.scheduled_at <= ^now
)
|> Repo.all()
for campaign <- campaigns do
Logger.info("Triggering scheduled campaign #{campaign.id}: #{campaign.subject}")
Newsletter.send_campaign_now(campaign)
end
:ok
end
end

View File

@@ -0,0 +1,39 @@
defmodule Berrypod.Newsletter.Subscriber do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
schema "newsletter_subscribers" do
field :email, :string
field :status, :string, default: "pending"
field :confirmation_token, :string
field :confirmed_at, :utc_datetime
field :unsubscribed_at, :utc_datetime
field :consent_text, :string
field :source, :string, default: "website"
field :ip_hash, :string
timestamps(type: :utc_datetime)
end
def changeset(subscriber, attrs) do
subscriber
|> cast(attrs, [
:email,
:status,
:confirmation_token,
:confirmed_at,
:unsubscribed_at,
:consent_text,
:source,
:ip_hash
])
|> validate_required([:email])
|> validate_format(:email, ~r/^[^\s@]+@[^\s@]+\.[^\s@]+$/)
|> validate_inclusion(:status, ~w(pending confirmed unsubscribed))
|> unique_constraint(:email)
|> update_change(:email, &String.downcase(String.trim(&1)))
end
end

View File

@@ -110,11 +110,9 @@ defmodule Berrypod.Pages.BlockTypes do
},
"newsletter_card" => %{
name: "Newsletter signup",
description: "Email signup form — currently decorative, does not collect emails",
description: "Email signup form with double opt-in — enable in admin settings",
icon: "hero-envelope",
allowed_on: :all,
hint:
"This block is decorative — form submissions aren't collected yet. Use it as a placeholder or remove it.",
settings_schema: [
%SettingsField{key: "title", label: "Title", type: :text, default: "Newsletter"},
%SettingsField{