cap image variant concurrency to CPU core count
All checks were successful
deploy / deploy (push) Successful in 1m5s

Replaces unbounded Task.async parallelism with Task.async_stream
capped at System.schedulers_online(). On shared-cpu-1x this prevents
CPU saturation and SQLite locking; on beefier machines it still
saturates all cores. Also releases the DB connection before starting
libvips processing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jamey 2026-03-02 10:59:55 +00:00
parent 9a27723b52
commit e66d51a08b

View File

@ -71,21 +71,25 @@ defmodule Berrypod.Images.Optimizer do
Called by Oban worker. Called by Oban worker.
""" """
def process_for_image(image_id) do def process_for_image(image_id) do
case Repo.get(ImageSchema, image_id) do # Load the image row and release the DB connection immediately,
# so libvips processing doesn't block other queries.
image = Repo.get(ImageSchema, image_id)
case image do
nil -> nil ->
{:error, :not_found} {:error, :not_found}
%{data: nil} -> %{data: nil} ->
{:error, :no_data} {:error, :no_data}
%{is_svg: true} = image -> %{is_svg: true} ->
Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"})) Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"}))
{:ok, :svg_skipped} {:ok, :svg_skipped}
%{data: data} when byte_size(data) < @min_image_bytes -> %{data: data} when byte_size(data) < @min_image_bytes ->
{:error, :too_small} {:error, :too_small}
%{data: data, source_width: width} = image -> %{data: data, source_width: width} ->
File.mkdir_p!(cache_dir()) File.mkdir_p!(cache_dir())
# Write source WebP to disk so it can be served by Plug.Static # Write source WebP to disk so it can be served by Plug.Static
@ -95,14 +99,19 @@ defmodule Berrypod.Images.Optimizer do
with {:ok, vips_image} <- Image.from_binary(data) do with {:ok, vips_image} <- Image.from_binary(data) do
widths = applicable_widths(width) widths = applicable_widths(width)
tasks = [ all_tasks =
Task.async(fn -> generate_thumbnail(vips_image, image_id) end) [fn -> generate_thumbnail(vips_image, image_id) end] ++
| for w <- widths, fmt <- @pregenerated_formats do for w <- widths, fmt <- @pregenerated_formats do
Task.async(fn -> generate_variant(vips_image, image_id, w, fmt) end) fn -> generate_variant(vips_image, image_id, w, fmt) end
end end
]
Task.await_many(tasks, :timer.seconds(120)) # Cap concurrency to the number of CPU cores — keeps small
# machines from choking while still saturating bigger ones.
Task.async_stream(all_tasks, & &1.(),
max_concurrency: System.schedulers_online(),
timeout: :timer.seconds(120)
)
|> Stream.run()
Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"})) Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"}))
{:ok, widths} {:ok, widths}
@ -182,25 +191,17 @@ defmodule Berrypod.Images.Optimizer do
{:ok, vips_image} <- Image.from_binary(webp_data) do {:ok, vips_image} <- Image.from_binary(webp_data) do
widths = applicable_widths(source_width) widths = applicable_widths(source_width)
tasks = [ all_tasks =
Task.async(fn -> [fn -> generate_variant_to_dir(vips_image, output_basename, output_dir, "thumb", :jpg, @thumb_size) end] ++
generate_variant_to_dir( for w <- widths, fmt <- @pregenerated_formats do
vips_image, fn -> generate_variant_to_dir(vips_image, output_basename, output_dir, w, fmt, w) end
output_basename,
output_dir,
"thumb",
:jpg,
@thumb_size
)
end)
| for w <- widths, fmt <- @pregenerated_formats do
Task.async(fn ->
generate_variant_to_dir(vips_image, output_basename, output_dir, w, fmt, w)
end)
end end
]
Task.await_many(tasks, :timer.seconds(120)) Task.async_stream(all_tasks, & &1.(),
max_concurrency: System.schedulers_online(),
timeout: :timer.seconds(120)
)
|> Stream.run()
{:ok, source_width} {:ok, source_width}
end end
rescue rescue