diff --git a/lib/mix/tasks/bench_sqlite.ex b/lib/mix/tasks/bench_sqlite.ex
new file mode 100644
index 0000000..01cdea4
--- /dev/null
+++ b/lib/mix/tasks/bench_sqlite.ex
@@ -0,0 +1,377 @@
+defmodule Mix.Tasks.Bench.Sqlite do
+  @shortdoc "Benchmark SQLite concurrency against the running Phoenix stack"
+  @moduledoc """
+  Runs concurrent HTTP read and DB write benchmarks against the full
+  Phoenix stack to measure SQLite performance under realistic load.
+
+  Starts the server on port 4098, seeds temporary product data, runs
+  four scenarios at increasing concurrency, then cleans up.
+
+  ## Usage
+
+      mix bench.sqlite
+      mix bench.sqlite --prod
+
+  ## Options
+
+    * `--port PORT` - Port to start the server on (default: 4098)
+    * `--prod` - Re-run the benchmark compiled in prod mode (compiled
+      templates, no code reloader, no debug annotations). Automatically
+      sets `MIX_ENV=prod` and supplies the required env vars.
+    * `--pool-size N` - Override the DB connection pool size (default: 5).
+      Higher values allow more concurrent DB connections.
+    * `--scale N` - Multiply all concurrency levels by N (default: 1).
+      E.g. `--scale 4` runs 200/240/400/800 concurrent tasks.
+    * `--busy-timeout MS` - Override SQLite busy_timeout in milliseconds
+      (default: 5000). Higher values let writers wait longer for the lock.
+  """
+
+  use Mix.Task
+
+  require Logger
+
+  @default_port 4098
+
+  @scenarios [
+    {"reads only", [readers: 50, writers: 0]},
+    {"mixed", [readers: 45, writers: 15]},
+    {"heavy mixed", [readers: 67, writers: 33]},
+    {"stress", [readers: 134, writers: 66]}
+  ]
+
+  @impl Mix.Task
+  def run(args) do
+    {opts, _, _} =
+      OptionParser.parse(args,
+        strict: [
+          port: :integer,
+          prod: :boolean,
+          pool_size: :integer,
+          scale: :integer,
+          busy_timeout: :integer
+        ]
+      )
+
+    bench_opts = %{
+      port: opts[:port] || @default_port,
+      pool_size: opts[:pool_size],
+      scale: opts[:scale] || 1,
+      busy_timeout: opts[:busy_timeout]
+    }
+
+    if opts[:prod] do
+      run_in_prod(bench_opts)
+    else
+      run_bench(bench_opts)
+    end
+  end
+
+  defp run_in_prod(%{port: port, pool_size: pool_size, scale: scale, busy_timeout: busy_timeout}) do
+    if Mix.env() == :prod do
+      run_bench(%{port: port, pool_size: pool_size, scale: scale, busy_timeout: busy_timeout})
+    else
+      Mix.shell().info("Re-launching in MIX_ENV=prod...")
+
+      db_path = Path.expand("berrypod_dev.db")
+      pool_env = if pool_size, do: "#{pool_size}", else: "5"
+
+      env = [
+        {"MIX_ENV", "prod"},
+        {"DATABASE_PATH", db_path},
+        {"SECRET_KEY_BASE", "bench_only_not_real_" <> String.duplicate("x", 44)},
+        {"PHX_HOST", "localhost"},
+        {"PORT", "#{port}"},
+        {"POOL_SIZE", pool_env}
+      ]
+
+      args =
+        ["bench.sqlite", "--port", "#{port}", "--scale", "#{scale}"] ++
+          if(pool_size, do: ["--pool-size", "#{pool_size}"], else: []) ++
+          if(busy_timeout, do: ["--busy-timeout", "#{busy_timeout}"], else: [])
+
+      System.cmd("mix", args,
+        env: env,
+        into: IO.stream(:stdio, :line),
+        stderr_to_stdout: true
+      )
+    end
+  end
+
+  defp run_bench(%{port: port, pool_size: pool_size, scale: scale, busy_timeout: busy_timeout}) do
+    base_url = start_server!(port)
+
+    # Apply repo overrides after app.start (runtime.exs runs during app.start
+    # and would overwrite anything we set before). Bounce the repo to pick up
+    # the new config.
+    if pool_size || busy_timeout do
+      repo_config = Application.get_env(:berrypod, Berrypod.Repo, [])
+
+      repo_config =
+        if pool_size, do: Keyword.put(repo_config, :pool_size, pool_size), else: repo_config
+
+      repo_config =
+        if busy_timeout,
+          do: Keyword.put(repo_config, :busy_timeout, busy_timeout),
+          else: repo_config
+
+      Application.put_env(:berrypod, Berrypod.Repo, repo_config)
+
+      Supervisor.terminate_child(Berrypod.Supervisor, Berrypod.Repo)
+      Supervisor.restart_child(Berrypod.Supervisor, Berrypod.Repo)
+    end
+
+    pause_oban!()
+    {conn, slugs} = seed_data!()
+
+    repo_conf = Application.get_env(:berrypod, Berrypod.Repo, [])
+    actual_pool = repo_conf[:pool_size] || 5
+    actual_bt = repo_conf[:busy_timeout] || 5000
+    mode = if Mix.env() == :prod, do: "prod", else: "dev"
+
+    try do
+      Mix.shell().info(
+        "\n--- SQLite bench (#{mode}, pool=#{actual_pool}, busy_timeout=#{actual_bt}ms, scale=#{scale}x) ---\n"
+      )
+
+      for {label, opts} <- @scenarios do
+        readers = opts[:readers] * scale
+        writers = opts[:writers] * scale
+        run_scenario(label, base_url, slugs, readers: readers, writers: writers)
+      end
+    after
+      resume_oban!()
+      cleanup!(conn)
+    end
+  end
+
+  # -- Server lifecycle --
+
+  defp start_server!(port) do
+    config = Application.get_env(:berrypod, BerrypodWeb.Endpoint, [])
+
+    config =
+      Keyword.merge(config,
+        http: [ip: {127, 0, 0, 1}, port: port],
+        server: true,
+        watchers: []
+      )
+
+    Application.put_env(:berrypod, BerrypodWeb.Endpoint, config)
+    Application.put_env(:logger, :level, :warning)
+
+    Mix.Task.run("app.start")
+
+    Logger.configure(level: :warning)
+
+    base_url = "http://localhost:#{port}"
+    wait_for_server(base_url, 20)
+    Mix.shell().info("Server started on #{base_url}")
+    base_url
+  end
+
+  defp wait_for_server(_url, 0) do
+    Mix.raise("Server failed to start")
+  end
+
+  defp wait_for_server(url, attempts) do
+    Application.ensure_all_started(:inets)
+    Application.ensure_all_started(:ssl)
+
+    case :httpc.request(:get, {~c"#{url}", []}, [timeout: 2_000], []) do
+      {:ok, {{_, status, _}, _, _}} when status in 200..399 ->
+        :ok
+
+      _ ->
+        Process.sleep(500)
+        wait_for_server(url, attempts - 1)
+    end
+  end
+
+  defp pause_oban! do
+    Oban.pause_all_queues(Oban)
+    Mix.shell().info("Oban queues paused for benchmark")
+  end
+
+  defp resume_oban! do
+    Oban.resume_all_queues(Oban)
+    Mix.shell().info("Oban queues resumed")
+  end
+
+  # -- Data seeding --
+
+  defp seed_data! do
+    alias Berrypod.{Products, Repo}
+
+    Mix.shell().info("Seeding bench data...")
+
+    {:ok, conn} =
+      Products.create_provider_connection(%{
+        provider_type: "printify",
+        name: "Bench Shop",
+        api_key: "bench_fake_key_12345",
+        config: %{"shop_id" => "99999"}
+      })
+
+    for i <- 1..20 do
+      {:ok, product, _} =
+        Products.upsert_product(conn, %{
+          provider_product_id: "bench-prov-#{i}",
+          title: "Bench Product #{i}",
+          description: "A benchmarking product, number #{i}."
+        })
+
+      for v <- 1..3 do
+        Products.create_product_variant(%{
+          product_id: product.id,
+          provider_variant_id: "bvar-#{i}-#{v}",
+          title: Enum.at(["S", "M", "L"], v - 1),
+          price: 1999 + v * 500,
+          is_enabled: true,
+          is_available: true
+        })
+      end
+    end
+
+    Berrypod.Search.rebuild_index()
+
+    %{rows: slug_rows} = Repo.query!("SELECT slug FROM products WHERE slug LIKE 'bench-%'")
+    slugs = List.flatten(slug_rows)
+    Mix.shell().info(" Seeded #{length(slugs)} products with variants")
+    {conn, slugs}
+  end
+
+  # -- Cleanup --
+
+  defp cleanup!(conn) do
+    alias Berrypod.{Products, Repo}
+
+    Mix.shell().info("\nCleaning up bench data...")
+
+    Repo.query!(
+      "DELETE FROM order_items WHERE order_id IN (SELECT id FROM orders WHERE customer_email LIKE 'bench_%')"
+    )
+
+    Repo.query!("DELETE FROM orders WHERE customer_email LIKE 'bench_%'")
+
+    Repo.query!(
+      "DELETE FROM product_variants WHERE product_id IN (SELECT id FROM products WHERE slug LIKE 'bench-%')"
+    )
+
+    Repo.query!(
+      "DELETE FROM product_images WHERE product_id IN (SELECT id FROM products WHERE slug LIKE 'bench-%')"
+    )
+
+    Repo.query!("DELETE FROM products WHERE slug LIKE 'bench-%'")
+    Products.delete_provider_connection(conn)
+    Berrypod.Search.rebuild_index()
+
+    Mix.shell().info(" Done.")
+  end
+
+  # -- Benchmark runner --
+
+  defp run_scenario(label, base_url, slugs, opts) do
+    readers = opts[:readers]
+    writers = opts[:writers]
+    total = readers + writers
+
+    read_urls = [base_url <> "/"] ++ Enum.map(slugs, &(base_url <> "/products/" <> &1))
+
+    # Interleave readers and writers for realistic ordering
+    task_types =
+      (List.duplicate(:read, readers) ++ List.duplicate(:write, writers))
+      |> Enum.shuffle()
+
+    {wall_time, results} =
+      :timer.tc(fn ->
+        task_types
+        |> Enum.with_index(1)
+        |> Enum.map(fn {type, i} ->
+          Task.async(fn -> run_task(type, i, read_urls) end)
+        end)
+        |> Task.await_many(120_000)
+      end)
+
+    print_results(label, total, readers, writers, wall_time, results)
+  end
+
+  defp run_task(:read, _i, read_urls) do
+    url = Enum.random(read_urls)
+
+    {time, result} =
+      :timer.tc(fn ->
+        try do
+          %{status: status} = Req.get!(url)
+          {:ok, status}
+        rescue
+          e -> {:error, Exception.message(e)}
+        end
+      end)
+
+    {time, :read, result}
+  end
+
+  defp run_task(:write, i, _read_urls) do
+    {time, result} =
+      :timer.tc(fn ->
+        try do
+          Berrypod.Orders.create_order(%{
+            customer_email: "bench_#{i}@example.com",
+            currency: "gbp",
+            items: [
+              %{
+                variant_id: "bench-var",
+                name: "Bench Item",
+                variant: "M",
+                price: 2499,
+                quantity: 1
+              }
+            ]
+          })
+        rescue
+          e -> {:error, Exception.message(e)}
+        end
+      end)
+
+    {time, :write, result}
+  end
+
+  # -- Reporting --
+
+  defp print_results(label, total, readers, writers, wall_time, results) do
+    read_results = Enum.filter(results, fn {_, type, _} -> type == :read end)
+    write_results = Enum.filter(results, fn {_, type, _} -> type == :write end)
+
+    read_ok = Enum.count(read_results, fn {_, _, r} -> match?({:ok, 200}, r) end)
+    write_ok = Enum.count(write_results, fn {_, _, r} -> match?({:ok, _}, r) end)
+    write_err = length(write_results) - write_ok
+
+    read_lats = latencies(read_results)
+    write_lats = latencies(write_results)
+
+    Mix.shell().info("""
+    #{label} (#{total} concurrent: #{readers} reads + #{writers} writes)
+      wall time: #{div(wall_time, 1000)}ms
+      reads: #{read_ok}/#{length(read_results)} ok#{format_latencies(read_lats)}
+      writes: #{write_ok}/#{length(write_results)} ok, #{write_err} errors#{format_latencies(write_lats)}
+    """)
+  end
+
+  defp latencies([]), do: []
+
+  defp latencies(results) do
+    results
+    |> Enum.map(fn {us, _, _} -> div(us, 1000) end)
+    |> Enum.sort()
+  end
+
+  defp format_latencies([]), do: ""
+
+  defp format_latencies(sorted) do
+    n = length(sorted)
+    p50 = Enum.at(sorted, div(n, 2))
+    p95 = Enum.at(sorted, round(n * 0.95) - 1)
+
+    "\n p50: #{p50}ms p95: #{p95}ms max: #{List.last(sorted)}ms"
+  end
+end
diff --git a/test/berrypod/sqlite_concurrency_test.exs b/test/berrypod/sqlite_concurrency_test.exs
new file mode 100644
index 0000000..76a6a1c
--- /dev/null
+++ b/test/berrypod/sqlite_concurrency_test.exs
@@ -0,0 +1,389 @@
+defmodule Berrypod.SQLiteConcurrencyTest do
+  @moduledoc """
+  Tests and benchmarks for SQLite concurrent read/write behaviour.
+
+  Correctness tests run as part of the normal suite. Benchmark tests are
+  tagged :benchmark and excluded by default — run them with:
+
+      mix test test/berrypod/sqlite_concurrency_test.exs --include benchmark
+  """
+  use ExUnit.Case, async: false
+
+  @tmp_dir System.tmp_dir!()
+
+  # -------------------------------------------------------------------
+  # Helpers
+  # -------------------------------------------------------------------
+
+  defp start_bench_repo(opts) do
+    db_path = Path.join(@tmp_dir, "bench_#{System.unique_integer([:positive])}.db")
+
+    defaults = [
+      database: db_path,
+      pool_size: 5,
+      journal_mode: :wal,
+      busy_timeout: 5000
+    ]
+
+    config = Keyword.merge(defaults, opts)
+    {:ok, pid} = Berrypod.BenchRepo.start_link(config)
+
+    Berrypod.BenchRepo.query!("""
+    CREATE TABLE bench (
+      id INTEGER PRIMARY KEY AUTOINCREMENT,
+      value TEXT NOT NULL,
+      payload TEXT DEFAULT ''
+    )
+    """)
+
+    {pid, db_path}
+  end
+
+  defp stop_bench_repo(pid, db_path) do
+    GenServer.stop(pid)
+    File.rm(db_path)
+    File.rm(db_path <> "-wal")
+    File.rm(db_path <> "-shm")
+  end
+
+  defp seed_rows(n) do
+    for i <- 1..n do
+      Berrypod.BenchRepo.query!(
+        "INSERT INTO bench (value, payload) VALUES (?, ?)",
+        ["row_#{i}", String.duplicate("x", 200)]
+      )
+    end
+  end
+
+  defp run_concurrent(n, fun) do
+    tasks =
+      for i <- 1..n do
+        Task.async(fn ->
+          {time_us, result} = :timer.tc(fn -> fun.(i) end)
+          {time_us, result}
+        end)
+      end
+
+    Task.await_many(tasks, 30_000)
+  end
+
+  defp count_rows do
+    %{rows: [[count]]} = Berrypod.BenchRepo.query!("SELECT COUNT(*) FROM bench")
+    count
+  end
+
+  # -------------------------------------------------------------------
+  # Correctness tests (always run)
+  # -------------------------------------------------------------------
+
+  describe "pragma configuration" do
+    test "PRAGMAs are applied correctly on connection" do
+      {pid, db_path} =
+        start_bench_repo(
+          journal_mode: :wal,
+          cache_size: -20_000,
+          journal_size_limit: 67_108_864,
+          custom_pragmas: [mmap_size: 134_217_728]
+        )
+
+      assert_pragma("journal_mode", "wal")
+      assert_pragma("synchronous", 1)
+      assert_pragma("foreign_keys", 1)
+      assert_pragma("temp_store", 2)
+      assert_pragma("cache_size", -20_000)
+      assert_pragma("mmap_size", 134_217_728)
+      assert_pragma("journal_size_limit", 67_108_864)
+
+      # busy_timeout isn't queryable via PRAGMA in all SQLite builds,
+      # but we can verify it was accepted by the absence of errors above
+
+      stop_bench_repo(pid, db_path)
+    end
+  end
+
+  describe "concurrent reads" do
+    test "multiple readers don't block each other under WAL" do
+      {pid, db_path} = start_bench_repo(pool_size: 5)
+      seed_rows(500)
+
+      results =
+        run_concurrent(20, fn _i ->
+          %{rows: rows} =
+            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_5%'")
+
+          {:ok, length(rows)}
+        end)
+
+      # All 20 should succeed
+      outcomes = Enum.map(results, fn {_us, result} -> result end)
+      assert Enum.all?(outcomes, &match?({:ok, _}, &1))
+
+      # All should find the same number of matching rows
+      counts = Enum.map(outcomes, fn {:ok, n} -> n end)
+      assert length(Enum.uniq(counts)) == 1
+
+      stop_bench_repo(pid, db_path)
+    end
+  end
+
+  describe "concurrent writes" do
+    test "bare inserts all succeed with busy_timeout" do
+      {pid, db_path} = start_bench_repo(pool_size: 5, busy_timeout: 5000)
+
+      results =
+        run_concurrent(20, fn i ->
+          Berrypod.BenchRepo.query!(
+            "INSERT INTO bench (value) VALUES (?)",
+            ["writer_#{i}"]
+          )
+
+          :ok
+        end)
+
+      outcomes = Enum.map(results, fn {_us, result} -> result end)
+      assert Enum.all?(outcomes, &(&1 == :ok))
+      assert count_rows() == 20
+
+      stop_bench_repo(pid, db_path)
+    end
+
+    test "transactions succeed with IMMEDIATE mode" do
+      {pid, db_path} =
+        start_bench_repo(
+          pool_size: 5,
+          busy_timeout: 5000,
+          default_transaction_mode: :immediate
+        )
+
+      results =
+        run_concurrent(20, fn i ->
+          Berrypod.BenchRepo.transaction(fn ->
+            Berrypod.BenchRepo.query!(
+              "INSERT INTO bench (value) VALUES (?)",
+              ["tx_writer_#{i}"]
+            )
+          end)
+        end)
+
+      outcomes = Enum.map(results, fn {_us, result} -> result end)
+      assert Enum.all?(outcomes, &match?({:ok, _}, &1))
+      assert count_rows() == 20
+
+      stop_bench_repo(pid, db_path)
+    end
+  end
+
+  describe "transaction upgrade behaviour" do
+    test "DEFERRED mode may fail when upgrading read to write" do
+      # This test demonstrates the transaction upgrade problem.
+      # Under DEFERRED, a transaction that starts with a read and then
+      # tries to write can get SQLITE_BUSY if another writer holds the lock.
+      # It's timing-dependent so we run many attempts to increase the chance.
+      {pid, db_path} = start_bench_repo(pool_size: 5, busy_timeout: 50)
+      seed_rows(100)
+
+      results =
+        run_concurrent(30, fn i ->
+          try do
+            Berrypod.BenchRepo.transaction(fn ->
+              # Start with a read (deferred transaction stays as reader)
+              Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE id = ?", [rem(i, 100) + 1])
+              # Small delay to increase contention window
+              Process.sleep(1)
+              # Now try to write (requires upgrade from reader to writer)
+              Berrypod.BenchRepo.query!(
+                "INSERT INTO bench (value) VALUES (?)",
+                ["upgrade_#{i}"]
+              )
+            end)
+          rescue
+            e -> {:error, e}
+          end
+        end)
+
+      errors =
+        Enum.count(results, fn
+          {_us, {:error, _}} -> true
+          _ -> false
+        end)
+
+      # We can't guarantee failures (timing-dependent), but log what happened
+      successes = 30 - errors
+      IO.puts("\n DEFERRED upgrade: #{successes}/30 succeeded, #{errors}/30 failed")
+
+      stop_bench_repo(pid, db_path)
+    end
+
+    test "IMMEDIATE mode prevents transaction upgrade failures" do
+      {pid, db_path} =
+        start_bench_repo(
+          pool_size: 5,
+          busy_timeout: 5000,
+          default_transaction_mode: :immediate
+        )
+
+      seed_rows(100)
+
+      results =
+        run_concurrent(30, fn i ->
+          Berrypod.BenchRepo.transaction(fn ->
+            # Read then write — no upgrade problem because lock taken upfront
+            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE id = ?", [rem(i, 100) + 1])
+            Process.sleep(1)
+
+            Berrypod.BenchRepo.query!(
+              "INSERT INTO bench (value) VALUES (?)",
+              ["immediate_#{i}"]
+            )
+          end)
+        end)
+
+      outcomes = Enum.map(results, fn {_us, result} -> result end)
+      successes = Enum.count(outcomes, &match?({:ok, _}, &1))
+
+      errors =
+        Enum.count(outcomes, fn
+          {:error, _} -> true
+          _ -> false
+        end)
+
+      IO.puts("\n IMMEDIATE mode: #{successes}/30 succeeded, #{errors}/30 failed")
+      assert errors == 0, "IMMEDIATE mode should prevent all upgrade failures"
+
+      stop_bench_repo(pid, db_path)
+    end
+  end
+
+  # -------------------------------------------------------------------
+  # Benchmarks (tagged, skipped by default)
+  # -------------------------------------------------------------------
+
+  describe "benchmark: read throughput vs pool size" do
+    @describetag :benchmark
+
+    test "concurrent reads scale with pool size" do
+      # Pool size 1 — all reads serialized
+      {pid1, path1} = start_bench_repo(pool_size: 1)
+      seed_rows(5000)
+
+      {time_pool1, _} =
+        :timer.tc(fn ->
+          run_concurrent(50, fn _i ->
+            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_42%'")
+          end)
+        end)
+
+      stop_bench_repo(pid1, path1)
+
+      # Pool size 5 — reads can run concurrently under WAL
+      {pid5, path5} = start_bench_repo(pool_size: 5)
+      seed_rows(5000)
+
+      {time_pool5, _} =
+        :timer.tc(fn ->
+          run_concurrent(50, fn _i ->
+            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_42%'")
+          end)
+        end)
+
+      stop_bench_repo(pid5, path5)
+
+      IO.puts("""
+      \n Read throughput (50 concurrent reads, 5000 rows):
+      pool_size=1: #{div(time_pool1, 1000)}ms
+      pool_size=5: #{div(time_pool5, 1000)}ms
+      speedup: #{Float.round(time_pool1 / max(time_pool5, 1), 2)}x
+      """)
+
+      assert time_pool5 < time_pool1,
+             "Pool size 5 should be faster than 1 for concurrent reads"
+    end
+  end
+
+  describe "benchmark: write throughput vs pool size" do
+    @describetag :benchmark
+
+    test "concurrent writes with different pool sizes" do
+      for pool_size <- [1, 3, 5] do
+        {pid, db_path} =
+          start_bench_repo(
+            pool_size: pool_size,
+            busy_timeout: 5000,
+            default_transaction_mode: :immediate
+          )
+
+        {time_us, results} =
+          :timer.tc(fn ->
+            run_concurrent(100, fn i ->
+              Berrypod.BenchRepo.query!(
+                "INSERT INTO bench (value) VALUES (?)",
+                ["pool#{pool_size}_#{i}"]
+              )
+
+              :ok
+            end)
+          end)
+
+        successes = Enum.count(results, fn {_us, r} -> r == :ok end)
+        rows = count_rows()
+
+        IO.puts(
+          " pool_size=#{pool_size}: #{div(time_us, 1000)}ms, " <>
+            "#{successes}/100 succeeded, #{rows} rows"
+        )
+
+        stop_bench_repo(pid, db_path)
+      end
+    end
+  end
+
+  describe "benchmark: mmap_size impact" do
+    @describetag :benchmark
+
+    test "read performance with and without mmap" do
+      # Without mmap
+      {pid0, path0} = start_bench_repo(pool_size: 5, custom_pragmas: [mmap_size: 0])
+      seed_rows(10_000)
+
+      {time_no_mmap, _} =
+        :timer.tc(fn ->
+          run_concurrent(30, fn _i ->
+            Berrypod.BenchRepo.query!("SELECT COUNT(*), AVG(length(payload)) FROM bench")
+          end)
+        end)
+
+      stop_bench_repo(pid0, path0)
+
+      # With 128MB mmap
+      {pid128, path128} =
+        start_bench_repo(pool_size: 5, custom_pragmas: [mmap_size: 134_217_728])
+
+      seed_rows(10_000)
+
+      {time_mmap, _} =
+        :timer.tc(fn ->
+          run_concurrent(30, fn _i ->
+            Berrypod.BenchRepo.query!("SELECT COUNT(*), AVG(length(payload)) FROM bench")
+          end)
+        end)
+
+      stop_bench_repo(pid128, path128)
+
+      IO.puts("""
+      \n mmap impact (30 concurrent reads, 10000 rows):
+      mmap_size=0: #{div(time_no_mmap, 1000)}ms
+      mmap_size=128M: #{div(time_mmap, 1000)}ms
+      speedup: #{Float.round(time_no_mmap / max(time_mmap, 1), 2)}x
+      """)
+    end
+  end
+
+  # -------------------------------------------------------------------
+  # Private
+  # -------------------------------------------------------------------
+
+  defp assert_pragma(name, expected) do
+    %{rows: [[value]]} = Berrypod.BenchRepo.query!("PRAGMA #{name}")
+    assert value == expected, "PRAGMA #{name}: expected #{expected}, got #{value}"
+  end
+end
diff --git a/test/support/bench_repo.ex b/test/support/bench_repo.ex
new file mode 100644
index 0000000..9d8a907
--- /dev/null
+++ b/test/support/bench_repo.ex
@@ -0,0 +1,12 @@
+defmodule Berrypod.BenchRepo do
+  @moduledoc """
+  Standalone Ecto Repo for SQLite concurrency tests.
+
+  Started dynamically against temp DB files — no Sandbox, no Application
+  config dependency. Each test controls pool size, PRAGMAs, and transaction
+  mode independently.
+  """
+  use Ecto.Repo,
+    otp_app: :berrypod,
+    adapter: Ecto.Adapters.SQLite3
+end
diff --git a/test/test_helper.exs b/test/test_helper.exs
index a1965ad..8971e31 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -1,2 +1,2 @@
-ExUnit.start()
+ExUnit.start(exclude: [:benchmark])
 Ecto.Adapters.SQL.Sandbox.mode(Berrypod.Repo, :manual)