From e66d51a08b3c2f3ab9aaaaf9b78d8022dbeecab5 Mon Sep 17 00:00:00 2001 From: jamey Date: Mon, 2 Mar 2026 10:59:55 +0000 Subject: [PATCH] cap image variant concurrency to CPU core count Replaces unbounded Task.async parallelism with Task.async_stream capped at System.schedulers_online(). On shared-cpu-1x this prevents CPU saturation and SQLite locking; on beefier machines it still saturates all cores. Also releases the DB connection before starting libvips processing. Co-Authored-By: Claude Opus 4.6 --- lib/berrypod/images/optimizer.ex | 53 ++++++++++++++++---------------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/lib/berrypod/images/optimizer.ex b/lib/berrypod/images/optimizer.ex index 5443de5..ef91372 100644 --- a/lib/berrypod/images/optimizer.ex +++ b/lib/berrypod/images/optimizer.ex @@ -71,21 +71,25 @@ defmodule Berrypod.Images.Optimizer do Called by Oban worker. """ def process_for_image(image_id) do - case Repo.get(ImageSchema, image_id) do + # Load the image row and release the DB connection immediately, + # so libvips processing doesn't block other queries. + image = Repo.get(ImageSchema, image_id) + + case image do nil -> {:error, :not_found} %{data: nil} -> {:error, :no_data} - %{is_svg: true} = image -> + %{is_svg: true} -> Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"})) {:ok, :svg_skipped} %{data: data} when byte_size(data) < @min_image_bytes -> {:error, :too_small} - %{data: data, source_width: width} = image -> + %{data: data, source_width: width} -> File.mkdir_p!(cache_dir()) # Write source WebP to disk so it can be served by Plug.Static @@ -95,14 +99,19 @@ defmodule Berrypod.Images.Optimizer do with {:ok, vips_image} <- Image.from_binary(data) do widths = applicable_widths(width) - tasks = [ - Task.async(fn -> generate_thumbnail(vips_image, image_id) end) - | for w <- widths, fmt <- @pregenerated_formats do - Task.async(fn -> generate_variant(vips_image, image_id, w, fmt) end) + all_tasks = + [fn -> generate_thumbnail(vips_image, image_id) end] ++ + for w <- widths, fmt <- @pregenerated_formats do + fn -> generate_variant(vips_image, image_id, w, fmt) end end - ] - Task.await_many(tasks, :timer.seconds(120)) + # Cap concurrency to the number of CPU cores — keeps small + # machines from choking while still saturating bigger ones. + Task.async_stream(all_tasks, & &1.(), + max_concurrency: System.schedulers_online(), + timeout: :timer.seconds(120) + ) + |> Stream.run() Repo.update!(ImageSchema.changeset(image, %{variants_status: "complete"})) {:ok, widths} @@ -182,25 +191,17 @@ defmodule Berrypod.Images.Optimizer do {:ok, vips_image} <- Image.from_binary(webp_data) do widths = applicable_widths(source_width) - tasks = [ - Task.async(fn -> - generate_variant_to_dir( - vips_image, - output_basename, - output_dir, - "thumb", - :jpg, - @thumb_size - ) - end) - | for w <- widths, fmt <- @pregenerated_formats do - Task.async(fn -> - generate_variant_to_dir(vips_image, output_basename, output_dir, w, fmt, w) - end) + all_tasks = + [fn -> generate_variant_to_dir(vips_image, output_basename, output_dir, "thumb", :jpg, @thumb_size) end] ++ + for w <- widths, fmt <- @pregenerated_formats do + fn -> generate_variant_to_dir(vips_image, output_basename, output_dir, w, fmt, w) end end - ] - Task.await_many(tasks, :timer.seconds(120)) + Task.async_stream(all_tasks, & &1.(), + max_concurrency: System.schedulers_online(), + timeout: :timer.seconds(120) + ) + |> Stream.run() {:ok, source_width} end rescue