feat: add Products context with provider integration (Phase 1)
Implement the schema foundation for syncing products from POD providers like Printify. This includes encrypted credential storage, product/variant schemas, and an Oban worker for background sync. New modules: - Vault: AES-256-GCM encryption for API keys - Products context: CRUD and sync operations for products - Provider behaviour: abstraction for POD provider implementations - ProductSyncWorker: Oban job for async product sync Schemas: ProviderConnection, Product, ProductImage, ProductVariant Also reorganizes Printify client to lib/simpleshop_theme/clients/ and mockup generator to lib/simpleshop_theme/mockups/ for better structure. 134 tests added covering all new functionality. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
148
lib/simpleshop_theme/sync/product_sync_worker.ex
Normal file
148
lib/simpleshop_theme/sync/product_sync_worker.ex
Normal file
@@ -0,0 +1,148 @@
|
||||
defmodule SimpleshopTheme.Sync.ProductSyncWorker do
|
||||
@moduledoc """
|
||||
Oban worker for syncing products from POD providers.
|
||||
|
||||
This worker fetches products from a provider, normalizes them,
|
||||
and upserts them into the local database.
|
||||
|
||||
## Usage
|
||||
|
||||
# Enqueue a sync for a provider connection
|
||||
ProductSyncWorker.enqueue(provider_connection_id)
|
||||
|
||||
## Job Args
|
||||
|
||||
* `provider_connection_id` - The ID of the provider connection to sync
|
||||
"""
|
||||
|
||||
use Oban.Worker, queue: :sync, max_attempts: 3
|
||||
|
||||
alias SimpleshopTheme.Products
|
||||
alias SimpleshopTheme.Products.ProviderConnection
|
||||
alias SimpleshopTheme.Providers.Provider
|
||||
|
||||
require Logger
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%Oban.Job{args: %{"provider_connection_id" => conn_id}}) do
|
||||
case Products.get_provider_connection(conn_id) do
|
||||
nil ->
|
||||
{:cancel, :connection_not_found}
|
||||
|
||||
%ProviderConnection{enabled: false} ->
|
||||
{:cancel, :connection_disabled}
|
||||
|
||||
conn ->
|
||||
sync_products(conn)
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Enqueue a product sync for a provider connection.
|
||||
"""
|
||||
def enqueue(provider_connection_id) do
|
||||
%{provider_connection_id: provider_connection_id}
|
||||
|> new()
|
||||
|> Oban.insert()
|
||||
end
|
||||
|
||||
@doc """
|
||||
Enqueue a product sync with a delay.
|
||||
"""
|
||||
def enqueue(provider_connection_id, delay_seconds) when is_integer(delay_seconds) do
|
||||
%{provider_connection_id: provider_connection_id}
|
||||
|> new(scheduled_at: DateTime.add(DateTime.utc_now(), delay_seconds, :second))
|
||||
|> Oban.insert()
|
||||
end
|
||||
|
||||
# =============================================================================
|
||||
# Private
|
||||
# =============================================================================
|
||||
|
||||
defp sync_products(conn) do
|
||||
Logger.info("Starting product sync for #{conn.provider_type} (#{conn.id})")
|
||||
|
||||
Products.update_sync_status(conn, "syncing")
|
||||
|
||||
with {:ok, provider} <- Provider.for_connection(conn),
|
||||
{:ok, products} <- provider.fetch_products(conn) do
|
||||
results = sync_all_products(conn, products)
|
||||
|
||||
created = Enum.count(results, fn {_, _, status} -> status == :created end)
|
||||
updated = Enum.count(results, fn {_, _, status} -> status == :updated end)
|
||||
unchanged = Enum.count(results, fn {_, _, status} -> status == :unchanged end)
|
||||
errors = Enum.count(results, fn result -> match?({:error, _}, result) end)
|
||||
|
||||
Logger.info(
|
||||
"Product sync complete for #{conn.provider_type}: " <>
|
||||
"#{created} created, #{updated} updated, #{unchanged} unchanged, #{errors} errors"
|
||||
)
|
||||
|
||||
Products.update_sync_status(conn, "completed", DateTime.utc_now())
|
||||
:ok
|
||||
else
|
||||
{:error, reason} = error ->
|
||||
Logger.error("Product sync failed for #{conn.provider_type}: #{inspect(reason)}")
|
||||
Products.update_sync_status(conn, "failed")
|
||||
error
|
||||
end
|
||||
end
|
||||
|
||||
defp sync_all_products(conn, products) do
|
||||
Enum.map(products, fn product_data ->
|
||||
case sync_product(conn, product_data) do
|
||||
{:ok, product, status} ->
|
||||
sync_product_associations(product, product_data)
|
||||
{:ok, product, status}
|
||||
|
||||
error ->
|
||||
error
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
defp sync_product(conn, product_data) do
|
||||
attrs = %{
|
||||
provider_product_id: product_data[:provider_product_id],
|
||||
title: product_data[:title],
|
||||
description: product_data[:description],
|
||||
category: product_data[:category],
|
||||
provider_data: product_data[:provider_data]
|
||||
}
|
||||
|
||||
Products.upsert_product(conn, attrs)
|
||||
end
|
||||
|
||||
defp sync_product_associations(product, product_data) do
|
||||
# Sync images
|
||||
images =
|
||||
(product_data[:images] || [])
|
||||
|> Enum.map(fn img ->
|
||||
%{
|
||||
src: img[:src],
|
||||
position: img[:position],
|
||||
alt: img[:alt]
|
||||
}
|
||||
end)
|
||||
|
||||
Products.sync_product_images(product, images)
|
||||
|
||||
# Sync variants
|
||||
variants =
|
||||
(product_data[:variants] || [])
|
||||
|> Enum.map(fn var ->
|
||||
%{
|
||||
provider_variant_id: var[:provider_variant_id],
|
||||
title: var[:title],
|
||||
sku: var[:sku],
|
||||
price: var[:price],
|
||||
cost: var[:cost],
|
||||
options: var[:options],
|
||||
is_enabled: var[:is_enabled],
|
||||
is_available: var[:is_available]
|
||||
}
|
||||
end)
|
||||
|
||||
Products.sync_product_variants(product, variants)
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user