add activity log with order timeline and global feed
All checks were successful
deploy / deploy (push) Successful in 4m22s
All checks were successful
deploy / deploy (push) Successful in 4m22s
Single activity_log table powering two views: chronological timeline on each order detail page (replacing the old fulfilment card) and a global feed at /admin/activity with tabs, category filters, search, and pagination. Real-time via PubSub — new entries appear instantly, nav badge updates across all admin pages. Instrumented across all event points: Stripe webhooks, order notifier, submission worker, fulfilment status worker, product sync worker, and Oban exhausted-job telemetry. Contextual action buttons (retry submission, retry sync, dismiss) with Oban unique constraints to prevent double-enqueue. 90-day pruning via cron. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
175
lib/berrypod/activity_log.ex
Normal file
175
lib/berrypod/activity_log.ex
Normal file
@@ -0,0 +1,175 @@
|
||||
defmodule Berrypod.ActivityLog do
|
||||
@moduledoc """
|
||||
Records and queries meaningful system events (orders, syncs, emails,
|
||||
abandoned carts). Powers the order timeline and global activity feed.
|
||||
"""
|
||||
|
||||
import Ecto.Query
|
||||
|
||||
require Logger
|
||||
|
||||
alias Berrypod.Repo
|
||||
alias Berrypod.ActivityLog.Entry
|
||||
|
||||
# ── Write ──
|
||||
|
||||
@doc """
|
||||
Records an activity event. Fire-and-forget — never raises, never crashes
|
||||
the calling process.
|
||||
|
||||
## Options
|
||||
|
||||
* `:level` - "info" (default), "warning", or "error"
|
||||
* `:order_id` - links the event to an order
|
||||
* `:payload` - arbitrary map of snapshot data
|
||||
* `:occurred_at` - when the event happened (defaults to now)
|
||||
|
||||
"""
|
||||
def log_event(event_type, message, opts \\ []) do
|
||||
attrs = %{
|
||||
event_type: event_type,
|
||||
level: opts[:level] || "info",
|
||||
order_id: opts[:order_id],
|
||||
payload: opts[:payload] || %{},
|
||||
message: message,
|
||||
occurred_at: opts[:occurred_at] || DateTime.utc_now() |> DateTime.truncate(:second)
|
||||
}
|
||||
|
||||
case %Entry{} |> Entry.changeset(attrs) |> Repo.insert() do
|
||||
{:ok, entry} ->
|
||||
broadcast(entry)
|
||||
{:ok, entry}
|
||||
|
||||
{:error, changeset} ->
|
||||
Logger.warning("ActivityLog insert failed: #{inspect(changeset.errors)}")
|
||||
:ok
|
||||
end
|
||||
rescue
|
||||
e ->
|
||||
Logger.warning("ActivityLog insert crashed: #{Exception.message(e)}")
|
||||
:ok
|
||||
end
|
||||
|
||||
# ── Read ──
|
||||
|
||||
@doc """
|
||||
Returns all activity entries for an order, oldest first.
|
||||
"""
|
||||
def list_for_order(order_id) do
|
||||
Entry
|
||||
|> where([a], a.order_id == ^order_id)
|
||||
|> order_by([a], asc: a.occurred_at)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
@doc """
|
||||
Paginated activity feed for the global activity page.
|
||||
|
||||
## Options
|
||||
|
||||
* `:page` - page number (default 1)
|
||||
* `:tab` - "all" (default) or "attention" (unresolved warnings/errors)
|
||||
* `:category` - "orders", "syncs", "emails", "carts", or nil for all
|
||||
* `:search` - order number substring search
|
||||
|
||||
"""
|
||||
def list_recent(opts \\ []) do
|
||||
Entry
|
||||
|> order_by([a], desc: a.occurred_at)
|
||||
|> maybe_filter_tab(opts[:tab])
|
||||
|> maybe_filter_category(opts[:category])
|
||||
|> maybe_search_order_number(opts[:search])
|
||||
|> Berrypod.Pagination.paginate(page: opts[:page], per_page: 50)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Count of unresolved warnings and errors.
|
||||
"""
|
||||
def count_needing_attention do
|
||||
Entry
|
||||
|> where([a], a.level in ["warning", "error"] and is_nil(a.resolved_at))
|
||||
|> Repo.aggregate(:count)
|
||||
end
|
||||
|
||||
# ── Resolve ──
|
||||
|
||||
@doc """
|
||||
Mark a single entry as resolved.
|
||||
"""
|
||||
def resolve(id) do
|
||||
Entry
|
||||
|> where([a], a.id == ^id)
|
||||
|> Repo.update_all(set: [resolved_at: DateTime.utc_now() |> DateTime.truncate(:second)])
|
||||
end
|
||||
|
||||
@doc """
|
||||
Resolve all unresolved entries for an order.
|
||||
"""
|
||||
def resolve_all_for_order(order_id) do
|
||||
Entry
|
||||
|> where([a], a.order_id == ^order_id and is_nil(a.resolved_at))
|
||||
|> Repo.update_all(set: [resolved_at: DateTime.utc_now() |> DateTime.truncate(:second)])
|
||||
end
|
||||
|
||||
# ── PubSub ──
|
||||
|
||||
@doc """
|
||||
Subscribe to all activity events.
|
||||
"""
|
||||
def subscribe do
|
||||
Phoenix.PubSub.subscribe(Berrypod.PubSub, "activity:all")
|
||||
end
|
||||
|
||||
@doc """
|
||||
Subscribe to activity events for a specific order.
|
||||
"""
|
||||
def subscribe(order_id) do
|
||||
Phoenix.PubSub.subscribe(Berrypod.PubSub, "activity:order:#{order_id}")
|
||||
end
|
||||
|
||||
# ── Private ──
|
||||
|
||||
defp broadcast(entry) do
|
||||
Phoenix.PubSub.broadcast(Berrypod.PubSub, "activity:all", {:new_activity, entry})
|
||||
|
||||
if entry.order_id do
|
||||
Phoenix.PubSub.broadcast(
|
||||
Berrypod.PubSub,
|
||||
"activity:order:#{entry.order_id}",
|
||||
{:new_activity, entry}
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
defp maybe_filter_tab(query, "attention") do
|
||||
where(query, [a], a.level in ["warning", "error"] and is_nil(a.resolved_at))
|
||||
end
|
||||
|
||||
defp maybe_filter_tab(query, _), do: query
|
||||
|
||||
defp maybe_filter_category(query, "orders") do
|
||||
where(query, [a], like(a.event_type, "order.%"))
|
||||
end
|
||||
|
||||
defp maybe_filter_category(query, "syncs") do
|
||||
where(query, [a], like(a.event_type, "sync.%"))
|
||||
end
|
||||
|
||||
defp maybe_filter_category(query, "emails") do
|
||||
where(query, [a], like(a.event_type, "%.email.%"))
|
||||
end
|
||||
|
||||
defp maybe_filter_category(query, "carts") do
|
||||
where(query, [a], like(a.event_type, "abandoned_cart.%"))
|
||||
end
|
||||
|
||||
defp maybe_filter_category(query, _), do: query
|
||||
|
||||
defp maybe_search_order_number(query, search) when is_binary(search) and search != "" do
|
||||
query
|
||||
|> join(:inner, [a], o in "orders", on: a.order_id == o.id)
|
||||
|> where([_a, o], like(o.order_number, ^"%#{search}%"))
|
||||
end
|
||||
|
||||
defp maybe_search_order_number(query, _), do: query
|
||||
end
|
||||
36
lib/berrypod/activity_log/entry.ex
Normal file
36
lib/berrypod/activity_log/entry.ex
Normal file
@@ -0,0 +1,36 @@
|
||||
defmodule Berrypod.ActivityLog.Entry do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
@primary_key {:id, :binary_id, autogenerate: true}
|
||||
@foreign_key_type :binary_id
|
||||
|
||||
@levels ~w(info warning error)
|
||||
|
||||
schema "activity_log" do
|
||||
field :event_type, :string
|
||||
field :level, :string, default: "info"
|
||||
field :order_id, :binary_id
|
||||
field :payload, :map, default: %{}
|
||||
field :message, :string
|
||||
field :resolved_at, :utc_datetime
|
||||
field :occurred_at, :utc_datetime
|
||||
|
||||
timestamps(type: :utc_datetime)
|
||||
end
|
||||
|
||||
def changeset(entry, attrs) do
|
||||
entry
|
||||
|> cast(attrs, [
|
||||
:event_type,
|
||||
:level,
|
||||
:order_id,
|
||||
:payload,
|
||||
:message,
|
||||
:resolved_at,
|
||||
:occurred_at
|
||||
])
|
||||
|> validate_required([:event_type, :level, :message, :occurred_at])
|
||||
|> validate_inclusion(:level, @levels)
|
||||
end
|
||||
end
|
||||
35
lib/berrypod/activity_log/oban_telemetry_handler.ex
Normal file
35
lib/berrypod/activity_log/oban_telemetry_handler.ex
Normal file
@@ -0,0 +1,35 @@
|
||||
defmodule Berrypod.ActivityLog.ObanTelemetryHandler do
|
||||
@moduledoc """
|
||||
Captures Oban job exhaustion (all retries failed) as activity log entries.
|
||||
Attached in Application.start/2.
|
||||
"""
|
||||
|
||||
def attach do
|
||||
:telemetry.attach(
|
||||
"activity-log-oban-exhausted",
|
||||
[:oban, :job, :exception],
|
||||
&__MODULE__.handle_event/4,
|
||||
[]
|
||||
)
|
||||
end
|
||||
|
||||
def handle_event([:oban, :job, :exception], _measurements, metadata, _config) do
|
||||
# Only log when job is being discarded (exhausted all retries)
|
||||
if metadata.state == :discard do
|
||||
worker = metadata.job.worker
|
||||
queue = metadata.job.queue
|
||||
error = Exception.message(metadata.reason)
|
||||
|
||||
Berrypod.ActivityLog.log_event(
|
||||
"job.exhausted",
|
||||
"Background job exhausted all retries: #{worker}",
|
||||
level: "error",
|
||||
payload: %{
|
||||
worker: worker,
|
||||
queue: queue,
|
||||
error: String.slice(error, 0, 500)
|
||||
}
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
26
lib/berrypod/activity_log/prune_worker.ex
Normal file
26
lib/berrypod/activity_log/prune_worker.ex
Normal file
@@ -0,0 +1,26 @@
|
||||
defmodule Berrypod.ActivityLog.PruneWorker do
|
||||
@moduledoc """
|
||||
Nightly Oban cron job that prunes activity log entries older than 90 days.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: :default, max_attempts: 1
|
||||
|
||||
import Ecto.Query
|
||||
|
||||
alias Berrypod.Repo
|
||||
alias Berrypod.ActivityLog.Entry
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(_job) do
|
||||
cutoff = DateTime.utc_now() |> DateTime.add(-90, :day)
|
||||
|
||||
{count, _} = Repo.delete_all(from(a in Entry, where: a.inserted_at < ^cutoff))
|
||||
|
||||
if count > 0 do
|
||||
require Logger
|
||||
Logger.info("Pruned #{count} activity log entries older than 90 days")
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
||||
@@ -10,6 +10,7 @@ defmodule Berrypod.Application do
|
||||
# Create ETS table here so the supervisor process owns it (lives forever).
|
||||
# The Task below only warms it with data from the DB.
|
||||
Berrypod.Redirects.create_table()
|
||||
Berrypod.ActivityLog.ObanTelemetryHandler.attach()
|
||||
|
||||
children = [
|
||||
BerrypodWeb.Telemetry,
|
||||
|
||||
@@ -9,7 +9,7 @@ defmodule Berrypod.Orders.FulfilmentStatusWorker do
|
||||
|
||||
use Oban.Worker, queue: :sync, max_attempts: 1
|
||||
|
||||
alias Berrypod.Orders
|
||||
alias Berrypod.{ActivityLog, Orders}
|
||||
|
||||
require Logger
|
||||
|
||||
@@ -38,6 +38,8 @@ defmodule Berrypod.Orders.FulfilmentStatusWorker do
|
||||
Logger.info(
|
||||
"Order #{order.order_number} status: #{order.fulfilment_status} → #{updated.fulfilment_status}"
|
||||
)
|
||||
|
||||
log_status_change(order, updated)
|
||||
end
|
||||
|
||||
{:error, reason} ->
|
||||
@@ -46,4 +48,42 @@ defmodule Berrypod.Orders.FulfilmentStatusWorker do
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
defp log_status_change(order, updated) do
|
||||
{event_type, message, level, payload} =
|
||||
case updated.fulfilment_status do
|
||||
"processing" ->
|
||||
{"order.production", "In production", "info", %{}}
|
||||
|
||||
"shipped" ->
|
||||
{"order.shipped", "Shipped#{tracking_suffix(updated)}", "info",
|
||||
%{tracking_number: updated.tracking_number, tracking_url: updated.tracking_url}}
|
||||
|
||||
"delivered" ->
|
||||
{"order.delivered", "Delivered", "info", %{}}
|
||||
|
||||
"failed" ->
|
||||
{"order.submission_failed",
|
||||
"Fulfilment failed: #{updated.fulfilment_error || "unknown error"}", "error",
|
||||
%{error: updated.fulfilment_error}}
|
||||
|
||||
"cancelled" ->
|
||||
{"order.cancelled", "Order cancelled", "warning", %{}}
|
||||
|
||||
other ->
|
||||
{"order.status_changed", "Status changed to #{other}", "info", %{}}
|
||||
end
|
||||
|
||||
ActivityLog.log_event(event_type, message,
|
||||
level: level,
|
||||
order_id: order.id,
|
||||
payload: payload
|
||||
)
|
||||
end
|
||||
|
||||
defp tracking_suffix(%{tracking_number: num}) when is_binary(num) and num != "" do
|
||||
" — tracking: #{num}"
|
||||
end
|
||||
|
||||
defp tracking_suffix(_), do: ""
|
||||
end
|
||||
|
||||
@@ -7,7 +7,7 @@ defmodule Berrypod.Orders.OrderNotifier do
|
||||
|
||||
import Swoosh.Email
|
||||
|
||||
alias Berrypod.Cart
|
||||
alias Berrypod.{ActivityLog, Cart}
|
||||
alias Berrypod.Mailer
|
||||
|
||||
require Logger
|
||||
@@ -39,7 +39,21 @@ defmodule Berrypod.Orders.OrderNotifier do
|
||||
==============================
|
||||
"""
|
||||
|
||||
deliver(order.customer_email, subject, body)
|
||||
result = deliver(order.customer_email, subject, body)
|
||||
|
||||
case result do
|
||||
{:ok, _} ->
|
||||
ActivityLog.log_event(
|
||||
"order.email.confirmation_sent",
|
||||
"Order confirmation sent to #{order.customer_email}",
|
||||
order_id: order.id
|
||||
)
|
||||
|
||||
_ ->
|
||||
:ok
|
||||
end
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
@doc """
|
||||
@@ -89,7 +103,22 @@ defmodule Berrypod.Orders.OrderNotifier do
|
||||
==============================
|
||||
"""
|
||||
|
||||
deliver(order.customer_email, subject, body)
|
||||
result = deliver(order.customer_email, subject, body)
|
||||
|
||||
case result do
|
||||
{:ok, _} ->
|
||||
ActivityLog.log_event(
|
||||
"order.email.shipping_sent",
|
||||
"Shipping notification sent to #{order.customer_email}",
|
||||
order_id: order.id,
|
||||
payload: %{tracking_number: order.tracking_number}
|
||||
)
|
||||
|
||||
_ ->
|
||||
:ok
|
||||
end
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
@doc """
|
||||
@@ -116,7 +145,21 @@ defmodule Berrypod.Orders.OrderNotifier do
|
||||
==============================
|
||||
"""
|
||||
|
||||
deliver(cart.customer_email, "You left something behind", body)
|
||||
result = deliver(cart.customer_email, "You left something behind", body)
|
||||
|
||||
case result do
|
||||
{:ok, _} ->
|
||||
ActivityLog.log_event(
|
||||
"abandoned_cart.email_sent",
|
||||
"Recovery email sent to #{cart.customer_email}",
|
||||
payload: %{email: cart.customer_email}
|
||||
)
|
||||
|
||||
_ ->
|
||||
:ok
|
||||
end
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
# --- Private ---
|
||||
|
||||
@@ -7,14 +7,14 @@ defmodule Berrypod.Orders.OrderSubmissionWorker do
|
||||
Retries up to 3 times with backoff for transient failures.
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: :checkout, max_attempts: 3
|
||||
use Oban.Worker, queue: :checkout, max_attempts: 3, unique: [period: 60]
|
||||
|
||||
alias Berrypod.Orders
|
||||
alias Berrypod.{ActivityLog, Orders}
|
||||
|
||||
require Logger
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%Oban.Job{args: %{"order_id" => order_id}}) do
|
||||
def perform(%Oban.Job{args: %{"order_id" => order_id}} = job) do
|
||||
case Orders.get_order(order_id) do
|
||||
nil ->
|
||||
Logger.warning("Order submission: order #{order_id} not found")
|
||||
@@ -39,10 +39,30 @@ defmodule Berrypod.Orders.OrderSubmissionWorker do
|
||||
"Order #{updated.order_number} submitted to provider (#{updated.provider_order_id})"
|
||||
)
|
||||
|
||||
ActivityLog.log_event(
|
||||
"order.submitted",
|
||||
"Submitted to provider (#{updated.provider_order_id})",
|
||||
order_id: order.id,
|
||||
payload: %{provider_order_id: updated.provider_order_id}
|
||||
)
|
||||
|
||||
:ok
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("Order #{order.order_number} submission failed: #{inspect(reason)}")
|
||||
|
||||
ActivityLog.log_event(
|
||||
"order.submission_failed",
|
||||
"Submission failed: #{inspect(reason)}",
|
||||
level: "error",
|
||||
order_id: order.id,
|
||||
payload: %{
|
||||
error: inspect(reason),
|
||||
attempt: job.attempt,
|
||||
max_attempts: job.max_attempts
|
||||
}
|
||||
)
|
||||
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
@@ -15,9 +15,9 @@ defmodule Berrypod.Sync.ProductSyncWorker do
|
||||
* `provider_connection_id` - The ID of the provider connection to sync
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: :sync, max_attempts: 3
|
||||
use Oban.Worker, queue: :sync, max_attempts: 3, unique: [period: 60]
|
||||
|
||||
alias Berrypod.Products
|
||||
alias Berrypod.{ActivityLog, Products}
|
||||
alias Berrypod.Products.ProviderConnection
|
||||
alias Berrypod.Providers.Provider
|
||||
alias Berrypod.Sync.ImageDownloadWorker
|
||||
@@ -66,6 +66,11 @@ defmodule Berrypod.Sync.ProductSyncWorker do
|
||||
|
||||
defp sync_products(conn) do
|
||||
Logger.info("Starting product sync for #{conn.provider_type} (#{conn.id})")
|
||||
|
||||
ActivityLog.log_event("sync.started", "Product sync started (#{conn.provider_type})",
|
||||
payload: %{provider_type: conn.provider_type, connection_id: conn.id}
|
||||
)
|
||||
|
||||
Products.update_sync_status(conn, "syncing")
|
||||
broadcast_sync(conn.id, {:sync_status, "syncing"})
|
||||
|
||||
@@ -97,6 +102,18 @@ defmodule Berrypod.Sync.ProductSyncWorker do
|
||||
"#{created} created, #{updated} updated, #{unchanged} unchanged, #{errors} errors"
|
||||
)
|
||||
|
||||
ActivityLog.log_event(
|
||||
"sync.completed",
|
||||
"Product sync complete — #{created + updated + unchanged} products, #{created} created, #{updated} updated",
|
||||
payload: %{
|
||||
provider_type: conn.provider_type,
|
||||
created: created,
|
||||
updated: updated,
|
||||
unchanged: unchanged,
|
||||
errors: errors
|
||||
}
|
||||
)
|
||||
|
||||
# Enqueue mockup enrichment for Printful products (extra angle images)
|
||||
if conn.provider_type == "printful" do
|
||||
enqueue_mockup_enrichment(conn, results)
|
||||
@@ -116,6 +133,16 @@ defmodule Berrypod.Sync.ProductSyncWorker do
|
||||
else
|
||||
{:error, reason} = error ->
|
||||
Logger.error("Product sync failed for #{conn.provider_type}: #{inspect(reason)}")
|
||||
|
||||
ActivityLog.log_event("sync.failed", "Product sync failed: #{inspect(reason)}",
|
||||
level: "error",
|
||||
payload: %{
|
||||
provider_type: conn.provider_type,
|
||||
connection_id: conn.id,
|
||||
error: inspect(reason)
|
||||
}
|
||||
)
|
||||
|
||||
Products.update_sync_status(conn, "failed")
|
||||
broadcast_sync(conn.id, {:sync_status, "failed"})
|
||||
error
|
||||
|
||||
Reference in New Issue
Block a user