cap image variant concurrency to CPU core count
All checks were successful
deploy / deploy (push) Successful in 1m36s

Replaces unbounded Task.async parallelism with Task.async_stream
capped at System.schedulers_online(). On shared-cpu-1x this prevents
CPU saturation and SQLite locking; on beefier machines it still
saturates all cores. Also releases the DB connection before starting
libvips processing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jamey 2026-03-02 10:59:55 +00:00
parent 9a27723b52
commit 2b333650fd

View File

@ -71,21 +71,25 @@ defmodule Berrypod.Images.Optimizer do
Called by Oban worker. Called by Oban worker.
""" """
def process_for_image(image_id) do def process_for_image(image_id) do
case Repo.get(ImageSchema, image_id) do # Load the image row and release the DB connection immediately,
# so libvips processing doesn't block other queries.
image = Repo.get(ImageSchema, image_id)
case image do
nil -> nil ->
{:error, :not_found} {:error, :not_found}
%{data: nil} -> %{data: nil} ->
{:error, :no_data} {:error, :no_data}
%{is_svg: true} = image -> %{is_svg: true} ->
Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"})) Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"}))
{:ok, :svg_skipped} {:ok, :svg_skipped}
%{data: data} when byte_size(data) < @min_image_bytes -> %{data: data} when byte_size(data) < @min_image_bytes ->
{:error, :too_small} {:error, :too_small}
%{data: data, source_width: width} = image -> %{data: data, source_width: width} ->
File.mkdir_p!(cache_dir()) File.mkdir_p!(cache_dir())
# Write source WebP to disk so it can be served by Plug.Static # Write source WebP to disk so it can be served by Plug.Static
@ -95,14 +99,19 @@ defmodule Berrypod.Images.Optimizer do
with {:ok, vips_image} <- Image.from_binary(data) do with {:ok, vips_image} <- Image.from_binary(data) do
widths = applicable_widths(width) widths = applicable_widths(width)
tasks = [ all_tasks =
Task.async(fn -> generate_thumbnail(vips_image, image_id) end) [fn -> generate_thumbnail(vips_image, image_id) end] ++
| for w <- widths, fmt <- @pregenerated_formats do for w <- widths, fmt <- @pregenerated_formats do
Task.async(fn -> generate_variant(vips_image, image_id, w, fmt) end) fn -> generate_variant(vips_image, image_id, w, fmt) end
end end
]
Task.await_many(tasks, :timer.seconds(120)) # Cap concurrency to the number of CPU cores — keeps small
# machines from choking while still saturating bigger ones.
Task.async_stream(all_tasks, & &1.(),
max_concurrency: System.schedulers_online(),
timeout: :timer.seconds(120)
)
|> Stream.run()
Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"})) Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"}))
{:ok, widths} {:ok, widths}