add SQLite concurrency tests and bench task
BenchRepo for isolated concurrency testing against temp DB files. Correctness tests prove WAL concurrent reads, IMMEDIATE transaction mode vs DEFERRED upgrade failures, and PRAGMA application. Benchmark tests (tagged :benchmark, excluded by default) measure throughput. mix bench.sqlite runs HTTP load scenarios against the full Phoenix stack with --prod, --scale, --pool-size, and --busy-timeout options. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
04cdb62a8d
commit
162bf4ca53
377
lib/mix/tasks/bench_sqlite.ex
Normal file
377
lib/mix/tasks/bench_sqlite.ex
Normal file
@ -0,0 +1,377 @@
|
|||||||
|
defmodule Mix.Tasks.Bench.Sqlite do
|
||||||
|
@shortdoc "Benchmark SQLite concurrency against the running Phoenix stack"
|
||||||
|
@moduledoc """
|
||||||
|
Runs concurrent HTTP read and DB write benchmarks against the full
|
||||||
|
Phoenix stack to measure SQLite performance under realistic load.
|
||||||
|
|
||||||
|
Starts the server on port 4098, seeds temporary product data, runs
|
||||||
|
three scenarios at increasing concurrency, then cleans up.
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
mix bench.sqlite
|
||||||
|
mix bench.sqlite --prod
|
||||||
|
|
||||||
|
## Options
|
||||||
|
|
||||||
|
* `--port PORT` - Port to start the server on (default: 4098)
|
||||||
|
* `--prod` - Re-run the benchmark compiled in prod mode (compiled
|
||||||
|
templates, no code reloader, no debug annotations). Automatically
|
||||||
|
sets `MIX_ENV=prod` and supplies the required env vars.
|
||||||
|
* `--pool-size N` - Override the DB connection pool size (default: 5).
|
||||||
|
Higher values allow more concurrent DB connections.
|
||||||
|
* `--scale N` - Multiply all concurrency levels by N (default: 1).
|
||||||
|
E.g. `--scale 4` runs 200/240/400/800 concurrent tasks.
|
||||||
|
* `--busy-timeout MS` - Override SQLite busy_timeout in milliseconds
|
||||||
|
(default: 5000). Higher values let writers wait longer for the lock.
|
||||||
|
"""
|
||||||
|
|
||||||
|
use Mix.Task
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@default_port 4098
|
||||||
|
|
||||||
|
@scenarios [
|
||||||
|
{"reads only", [readers: 50, writers: 0]},
|
||||||
|
{"mixed", [readers: 45, writers: 15]},
|
||||||
|
{"heavy mixed", [readers: 67, writers: 33]},
|
||||||
|
{"stress", [readers: 134, writers: 66]}
|
||||||
|
]
|
||||||
|
|
||||||
|
@impl Mix.Task
|
||||||
|
def run(args) do
|
||||||
|
{opts, _, _} =
|
||||||
|
OptionParser.parse(args,
|
||||||
|
strict: [
|
||||||
|
port: :integer,
|
||||||
|
prod: :boolean,
|
||||||
|
pool_size: :integer,
|
||||||
|
scale: :integer,
|
||||||
|
busy_timeout: :integer
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
bench_opts = %{
|
||||||
|
port: opts[:port] || @default_port,
|
||||||
|
pool_size: opts[:pool_size],
|
||||||
|
scale: opts[:scale] || 1,
|
||||||
|
busy_timeout: opts[:busy_timeout]
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts[:prod] do
|
||||||
|
run_in_prod(bench_opts)
|
||||||
|
else
|
||||||
|
run_bench(bench_opts)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp run_in_prod(%{port: port, pool_size: pool_size, scale: scale, busy_timeout: busy_timeout}) do
|
||||||
|
if Mix.env() == :prod do
|
||||||
|
run_bench(%{port: port, pool_size: pool_size, scale: scale, busy_timeout: busy_timeout})
|
||||||
|
else
|
||||||
|
Mix.shell().info("Re-launching in MIX_ENV=prod...")
|
||||||
|
|
||||||
|
db_path = Path.expand("berrypod_dev.db")
|
||||||
|
pool_env = if pool_size, do: "#{pool_size}", else: "5"
|
||||||
|
|
||||||
|
env = [
|
||||||
|
{"MIX_ENV", "prod"},
|
||||||
|
{"DATABASE_PATH", db_path},
|
||||||
|
{"SECRET_KEY_BASE", "bench_only_not_real_" <> String.duplicate("x", 44)},
|
||||||
|
{"PHX_HOST", "localhost"},
|
||||||
|
{"PORT", "#{port}"},
|
||||||
|
{"POOL_SIZE", pool_env}
|
||||||
|
]
|
||||||
|
|
||||||
|
args =
|
||||||
|
["bench.sqlite", "--port", "#{port}", "--scale", "#{scale}"] ++
|
||||||
|
if(pool_size, do: ["--pool-size", "#{pool_size}"], else: []) ++
|
||||||
|
if(busy_timeout, do: ["--busy-timeout", "#{busy_timeout}"], else: [])
|
||||||
|
|
||||||
|
System.cmd("mix", args,
|
||||||
|
env: env,
|
||||||
|
into: IO.stream(:stdio, :line),
|
||||||
|
stderr_to_stdout: true
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp run_bench(%{port: port, pool_size: pool_size, scale: scale, busy_timeout: busy_timeout}) do
|
||||||
|
base_url = start_server!(port)
|
||||||
|
|
||||||
|
# Apply repo overrides after app.start (runtime.exs runs during app.start
|
||||||
|
# and would overwrite anything we set before). Bounce the repo to pick up
|
||||||
|
# the new config.
|
||||||
|
if pool_size || busy_timeout do
|
||||||
|
repo_config = Application.get_env(:berrypod, Berrypod.Repo, [])
|
||||||
|
|
||||||
|
repo_config =
|
||||||
|
if pool_size, do: Keyword.put(repo_config, :pool_size, pool_size), else: repo_config
|
||||||
|
|
||||||
|
repo_config =
|
||||||
|
if busy_timeout,
|
||||||
|
do: Keyword.put(repo_config, :busy_timeout, busy_timeout),
|
||||||
|
else: repo_config
|
||||||
|
|
||||||
|
Application.put_env(:berrypod, Berrypod.Repo, repo_config)
|
||||||
|
|
||||||
|
Supervisor.terminate_child(Berrypod.Supervisor, Berrypod.Repo)
|
||||||
|
Supervisor.restart_child(Berrypod.Supervisor, Berrypod.Repo)
|
||||||
|
end
|
||||||
|
|
||||||
|
pause_oban!()
|
||||||
|
{conn, slugs} = seed_data!()
|
||||||
|
|
||||||
|
repo_conf = Application.get_env(:berrypod, Berrypod.Repo, [])
|
||||||
|
actual_pool = repo_conf[:pool_size] || 5
|
||||||
|
actual_bt = repo_conf[:busy_timeout] || 5000
|
||||||
|
mode = if Mix.env() == :prod, do: "prod", else: "dev"
|
||||||
|
|
||||||
|
try do
|
||||||
|
Mix.shell().info(
|
||||||
|
"\n--- SQLite bench (#{mode}, pool=#{actual_pool}, busy_timeout=#{actual_bt}ms, scale=#{scale}x) ---\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
for {label, opts} <- @scenarios do
|
||||||
|
readers = opts[:readers] * scale
|
||||||
|
writers = opts[:writers] * scale
|
||||||
|
run_scenario(label, base_url, slugs, readers: readers, writers: writers)
|
||||||
|
end
|
||||||
|
after
|
||||||
|
resume_oban!()
|
||||||
|
cleanup!(conn)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# -- Server lifecycle --
|
||||||
|
|
||||||
|
defp start_server!(port) do
|
||||||
|
config = Application.get_env(:berrypod, BerrypodWeb.Endpoint, [])
|
||||||
|
|
||||||
|
config =
|
||||||
|
Keyword.merge(config,
|
||||||
|
http: [ip: {127, 0, 0, 1}, port: port],
|
||||||
|
server: true,
|
||||||
|
watchers: []
|
||||||
|
)
|
||||||
|
|
||||||
|
Application.put_env(:berrypod, BerrypodWeb.Endpoint, config)
|
||||||
|
Application.put_env(:logger, :level, :warning)
|
||||||
|
|
||||||
|
Mix.Task.run("app.start")
|
||||||
|
|
||||||
|
Logger.configure(level: :warning)
|
||||||
|
|
||||||
|
base_url = "http://localhost:#{port}"
|
||||||
|
wait_for_server(base_url, 20)
|
||||||
|
Mix.shell().info("Server started on #{base_url}")
|
||||||
|
base_url
|
||||||
|
end
|
||||||
|
|
||||||
|
defp wait_for_server(_url, 0) do
|
||||||
|
Mix.raise("Server failed to start")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp wait_for_server(url, attempts) do
|
||||||
|
Application.ensure_all_started(:inets)
|
||||||
|
Application.ensure_all_started(:ssl)
|
||||||
|
|
||||||
|
case :httpc.request(:get, {~c"#{url}", []}, [timeout: 2_000], []) do
|
||||||
|
{:ok, {{_, status, _}, _, _}} when status in 200..399 ->
|
||||||
|
:ok
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
Process.sleep(500)
|
||||||
|
wait_for_server(url, attempts - 1)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp pause_oban! do
|
||||||
|
Oban.pause_all_queues(Oban)
|
||||||
|
Mix.shell().info("Oban queues paused for benchmark")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp resume_oban! do
|
||||||
|
Oban.resume_all_queues(Oban)
|
||||||
|
Mix.shell().info("Oban queues resumed")
|
||||||
|
end
|
||||||
|
|
||||||
|
# -- Data seeding --
|
||||||
|
|
||||||
|
defp seed_data! do
|
||||||
|
alias Berrypod.{Products, Repo}
|
||||||
|
|
||||||
|
Mix.shell().info("Seeding bench data...")
|
||||||
|
|
||||||
|
{:ok, conn} =
|
||||||
|
Products.create_provider_connection(%{
|
||||||
|
provider_type: "printify",
|
||||||
|
name: "Bench Shop",
|
||||||
|
api_key: "bench_fake_key_12345",
|
||||||
|
config: %{"shop_id" => "99999"}
|
||||||
|
})
|
||||||
|
|
||||||
|
for i <- 1..20 do
|
||||||
|
{:ok, product, _} =
|
||||||
|
Products.upsert_product(conn, %{
|
||||||
|
provider_product_id: "bench-prov-#{i}",
|
||||||
|
title: "Bench Product #{i}",
|
||||||
|
description: "A benchmarking product, number #{i}."
|
||||||
|
})
|
||||||
|
|
||||||
|
for v <- 1..3 do
|
||||||
|
Products.create_product_variant(%{
|
||||||
|
product_id: product.id,
|
||||||
|
provider_variant_id: "bvar-#{i}-#{v}",
|
||||||
|
title: Enum.at(["S", "M", "L"], v - 1),
|
||||||
|
price: 1999 + v * 500,
|
||||||
|
is_enabled: true,
|
||||||
|
is_available: true
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Berrypod.Search.rebuild_index()
|
||||||
|
|
||||||
|
%{rows: slug_rows} = Repo.query!("SELECT slug FROM products WHERE slug LIKE 'bench-%'")
|
||||||
|
slugs = List.flatten(slug_rows)
|
||||||
|
Mix.shell().info(" Seeded #{length(slugs)} products with variants")
|
||||||
|
{conn, slugs}
|
||||||
|
end
|
||||||
|
|
||||||
|
# -- Cleanup --
|
||||||
|
|
||||||
|
defp cleanup!(conn) do
|
||||||
|
alias Berrypod.{Products, Repo}
|
||||||
|
|
||||||
|
Mix.shell().info("\nCleaning up bench data...")
|
||||||
|
|
||||||
|
Repo.query!(
|
||||||
|
"DELETE FROM order_items WHERE order_id IN (SELECT id FROM orders WHERE customer_email LIKE 'bench_%')"
|
||||||
|
)
|
||||||
|
|
||||||
|
Repo.query!("DELETE FROM orders WHERE customer_email LIKE 'bench_%'")
|
||||||
|
|
||||||
|
Repo.query!(
|
||||||
|
"DELETE FROM product_variants WHERE product_id IN (SELECT id FROM products WHERE slug LIKE 'bench-%')"
|
||||||
|
)
|
||||||
|
|
||||||
|
Repo.query!(
|
||||||
|
"DELETE FROM product_images WHERE product_id IN (SELECT id FROM products WHERE slug LIKE 'bench-%')"
|
||||||
|
)
|
||||||
|
|
||||||
|
Repo.query!("DELETE FROM products WHERE slug LIKE 'bench-%'")
|
||||||
|
Products.delete_provider_connection(conn)
|
||||||
|
Berrypod.Search.rebuild_index()
|
||||||
|
|
||||||
|
Mix.shell().info(" Done.")
|
||||||
|
end
|
||||||
|
|
||||||
|
# -- Benchmark runner --
|
||||||
|
|
||||||
|
defp run_scenario(label, base_url, slugs, opts) do
|
||||||
|
readers = opts[:readers]
|
||||||
|
writers = opts[:writers]
|
||||||
|
total = readers + writers
|
||||||
|
|
||||||
|
read_urls = [base_url <> "/"] ++ Enum.map(slugs, &(base_url <> "/products/" <> &1))
|
||||||
|
|
||||||
|
# Interleave readers and writers for realistic ordering
|
||||||
|
task_types =
|
||||||
|
(List.duplicate(:read, readers) ++ List.duplicate(:write, writers))
|
||||||
|
|> Enum.shuffle()
|
||||||
|
|
||||||
|
{wall_time, results} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
task_types
|
||||||
|
|> Enum.with_index(1)
|
||||||
|
|> Enum.map(fn {type, i} ->
|
||||||
|
Task.async(fn -> run_task(type, i, read_urls) end)
|
||||||
|
end)
|
||||||
|
|> Task.await_many(120_000)
|
||||||
|
end)
|
||||||
|
|
||||||
|
print_results(label, total, readers, writers, wall_time, results)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp run_task(:read, _i, read_urls) do
|
||||||
|
url = Enum.random(read_urls)
|
||||||
|
|
||||||
|
{time, result} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
try do
|
||||||
|
%{status: status} = Req.get!(url)
|
||||||
|
{:ok, status}
|
||||||
|
rescue
|
||||||
|
e -> {:error, Exception.message(e)}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|
||||||
|
{time, :read, result}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp run_task(:write, i, _read_urls) do
|
||||||
|
{time, result} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
try do
|
||||||
|
Berrypod.Orders.create_order(%{
|
||||||
|
customer_email: "bench_#{i}@example.com",
|
||||||
|
currency: "gbp",
|
||||||
|
items: [
|
||||||
|
%{
|
||||||
|
variant_id: "bench-var",
|
||||||
|
name: "Bench Item",
|
||||||
|
variant: "M",
|
||||||
|
price: 2499,
|
||||||
|
quantity: 1
|
||||||
|
}
|
||||||
|
]
|
||||||
|
})
|
||||||
|
rescue
|
||||||
|
e -> {:error, Exception.message(e)}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|
||||||
|
{time, :write, result}
|
||||||
|
end
|
||||||
|
|
||||||
|
# -- Reporting --
|
||||||
|
|
||||||
|
defp print_results(label, total, readers, writers, wall_time, results) do
|
||||||
|
read_results = Enum.filter(results, fn {_, type, _} -> type == :read end)
|
||||||
|
write_results = Enum.filter(results, fn {_, type, _} -> type == :write end)
|
||||||
|
|
||||||
|
read_ok = Enum.count(read_results, fn {_, _, r} -> match?({:ok, 200}, r) end)
|
||||||
|
write_ok = Enum.count(write_results, fn {_, _, r} -> match?({:ok, _}, r) end)
|
||||||
|
write_err = length(write_results) - write_ok
|
||||||
|
|
||||||
|
read_lats = latencies(read_results)
|
||||||
|
write_lats = latencies(write_results)
|
||||||
|
|
||||||
|
Mix.shell().info("""
|
||||||
|
#{label} (#{total} concurrent: #{readers} reads + #{writers} writes)
|
||||||
|
wall time: #{div(wall_time, 1000)}ms
|
||||||
|
reads: #{read_ok}/#{length(read_results)} ok#{format_latencies(read_lats)}
|
||||||
|
writes: #{write_ok}/#{length(write_results)} ok, #{write_err} errors#{format_latencies(write_lats)}
|
||||||
|
""")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp latencies([]), do: []
|
||||||
|
|
||||||
|
defp latencies(results) do
|
||||||
|
results
|
||||||
|
|> Enum.map(fn {us, _, _} -> div(us, 1000) end)
|
||||||
|
|> Enum.sort()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp format_latencies([]), do: ""
|
||||||
|
|
||||||
|
defp format_latencies(sorted) do
|
||||||
|
n = length(sorted)
|
||||||
|
p50 = Enum.at(sorted, div(n, 2))
|
||||||
|
p95 = Enum.at(sorted, round(n * 0.95) - 1)
|
||||||
|
|
||||||
|
"\n p50: #{p50}ms p95: #{p95}ms max: #{List.last(sorted)}ms"
|
||||||
|
end
|
||||||
|
end
|
||||||
389
test/berrypod/sqlite_concurrency_test.exs
Normal file
389
test/berrypod/sqlite_concurrency_test.exs
Normal file
@ -0,0 +1,389 @@
|
|||||||
|
defmodule Berrypod.SQLiteConcurrencyTest do
|
||||||
|
@moduledoc """
|
||||||
|
Tests and benchmarks for SQLite concurrent read/write behaviour.
|
||||||
|
|
||||||
|
Correctness tests run as part of the normal suite. Benchmark tests are
|
||||||
|
tagged :benchmark and excluded by default — run them with:
|
||||||
|
|
||||||
|
mix test test/berrypod/sqlite_concurrency_test.exs --include benchmark
|
||||||
|
"""
|
||||||
|
use ExUnit.Case, async: false
|
||||||
|
|
||||||
|
@tmp_dir System.tmp_dir!()
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
|
||||||
|
defp start_bench_repo(opts) do
|
||||||
|
db_path = Path.join(@tmp_dir, "bench_#{System.unique_integer([:positive])}.db")
|
||||||
|
|
||||||
|
defaults = [
|
||||||
|
database: db_path,
|
||||||
|
pool_size: 5,
|
||||||
|
journal_mode: :wal,
|
||||||
|
busy_timeout: 5000
|
||||||
|
]
|
||||||
|
|
||||||
|
config = Keyword.merge(defaults, opts)
|
||||||
|
{:ok, pid} = Berrypod.BenchRepo.start_link(config)
|
||||||
|
|
||||||
|
Berrypod.BenchRepo.query!("""
|
||||||
|
CREATE TABLE bench (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
value TEXT NOT NULL,
|
||||||
|
payload TEXT DEFAULT ''
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
|
||||||
|
{pid, db_path}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp stop_bench_repo(pid, db_path) do
|
||||||
|
GenServer.stop(pid)
|
||||||
|
File.rm(db_path)
|
||||||
|
File.rm(db_path <> "-wal")
|
||||||
|
File.rm(db_path <> "-shm")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp seed_rows(n) do
|
||||||
|
for i <- 1..n do
|
||||||
|
Berrypod.BenchRepo.query!(
|
||||||
|
"INSERT INTO bench (value, payload) VALUES (?, ?)",
|
||||||
|
["row_#{i}", String.duplicate("x", 200)]
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp run_concurrent(n, fun) do
|
||||||
|
tasks =
|
||||||
|
for i <- 1..n do
|
||||||
|
Task.async(fn ->
|
||||||
|
{time_us, result} = :timer.tc(fn -> fun.(i) end)
|
||||||
|
{time_us, result}
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
Task.await_many(tasks, 30_000)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp count_rows do
|
||||||
|
%{rows: [[count]]} = Berrypod.BenchRepo.query!("SELECT COUNT(*) FROM bench")
|
||||||
|
count
|
||||||
|
end
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
# Correctness tests (always run)
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
|
||||||
|
describe "pragma configuration" do
|
||||||
|
test "PRAGMAs are applied correctly on connection" do
|
||||||
|
{pid, db_path} =
|
||||||
|
start_bench_repo(
|
||||||
|
journal_mode: :wal,
|
||||||
|
cache_size: -20_000,
|
||||||
|
journal_size_limit: 67_108_864,
|
||||||
|
custom_pragmas: [mmap_size: 134_217_728]
|
||||||
|
)
|
||||||
|
|
||||||
|
assert_pragma("journal_mode", "wal")
|
||||||
|
assert_pragma("synchronous", 1)
|
||||||
|
assert_pragma("foreign_keys", 1)
|
||||||
|
assert_pragma("temp_store", 2)
|
||||||
|
assert_pragma("cache_size", -20_000)
|
||||||
|
assert_pragma("mmap_size", 134_217_728)
|
||||||
|
assert_pragma("journal_size_limit", 67_108_864)
|
||||||
|
|
||||||
|
# busy_timeout isn't queryable via PRAGMA in all SQLite builds,
|
||||||
|
# but we can verify it was accepted by the absence of errors above
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "concurrent reads" do
|
||||||
|
test "multiple readers don't block each other under WAL" do
|
||||||
|
{pid, db_path} = start_bench_repo(pool_size: 5)
|
||||||
|
seed_rows(500)
|
||||||
|
|
||||||
|
results =
|
||||||
|
run_concurrent(20, fn _i ->
|
||||||
|
%{rows: rows} =
|
||||||
|
Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_5%'")
|
||||||
|
|
||||||
|
{:ok, length(rows)}
|
||||||
|
end)
|
||||||
|
|
||||||
|
# All 20 should succeed
|
||||||
|
outcomes = Enum.map(results, fn {_us, result} -> result end)
|
||||||
|
assert Enum.all?(outcomes, &match?({:ok, _}, &1))
|
||||||
|
|
||||||
|
# All should find the same number of matching rows
|
||||||
|
counts = Enum.map(outcomes, fn {:ok, n} -> n end)
|
||||||
|
assert length(Enum.uniq(counts)) == 1
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "concurrent writes" do
|
||||||
|
test "bare inserts all succeed with busy_timeout" do
|
||||||
|
{pid, db_path} = start_bench_repo(pool_size: 5, busy_timeout: 5000)
|
||||||
|
|
||||||
|
results =
|
||||||
|
run_concurrent(20, fn i ->
|
||||||
|
Berrypod.BenchRepo.query!(
|
||||||
|
"INSERT INTO bench (value) VALUES (?)",
|
||||||
|
["writer_#{i}"]
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end)
|
||||||
|
|
||||||
|
outcomes = Enum.map(results, fn {_us, result} -> result end)
|
||||||
|
assert Enum.all?(outcomes, &(&1 == :ok))
|
||||||
|
assert count_rows() == 20
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "transactions succeed with IMMEDIATE mode" do
|
||||||
|
{pid, db_path} =
|
||||||
|
start_bench_repo(
|
||||||
|
pool_size: 5,
|
||||||
|
busy_timeout: 5000,
|
||||||
|
default_transaction_mode: :immediate
|
||||||
|
)
|
||||||
|
|
||||||
|
results =
|
||||||
|
run_concurrent(20, fn i ->
|
||||||
|
Berrypod.BenchRepo.transaction(fn ->
|
||||||
|
Berrypod.BenchRepo.query!(
|
||||||
|
"INSERT INTO bench (value) VALUES (?)",
|
||||||
|
["tx_writer_#{i}"]
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
outcomes = Enum.map(results, fn {_us, result} -> result end)
|
||||||
|
assert Enum.all?(outcomes, &match?({:ok, _}, &1))
|
||||||
|
assert count_rows() == 20
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "transaction upgrade behaviour" do
|
||||||
|
test "DEFERRED mode may fail when upgrading read to write" do
|
||||||
|
# This test demonstrates the transaction upgrade problem.
|
||||||
|
# Under DEFERRED, a transaction that starts with a read and then
|
||||||
|
# tries to write can get SQLITE_BUSY if another writer holds the lock.
|
||||||
|
# It's timing-dependent so we run many attempts to increase the chance.
|
||||||
|
{pid, db_path} = start_bench_repo(pool_size: 5, busy_timeout: 50)
|
||||||
|
seed_rows(100)
|
||||||
|
|
||||||
|
results =
|
||||||
|
run_concurrent(30, fn i ->
|
||||||
|
try do
|
||||||
|
Berrypod.BenchRepo.transaction(fn ->
|
||||||
|
# Start with a read (deferred transaction stays as reader)
|
||||||
|
Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE id = ?", [rem(i, 100) + 1])
|
||||||
|
# Small delay to increase contention window
|
||||||
|
Process.sleep(1)
|
||||||
|
# Now try to write (requires upgrade from reader to writer)
|
||||||
|
Berrypod.BenchRepo.query!(
|
||||||
|
"INSERT INTO bench (value) VALUES (?)",
|
||||||
|
["upgrade_#{i}"]
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
rescue
|
||||||
|
e -> {:error, e}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|
||||||
|
errors =
|
||||||
|
Enum.count(results, fn
|
||||||
|
{_us, {:error, _}} -> true
|
||||||
|
_ -> false
|
||||||
|
end)
|
||||||
|
|
||||||
|
# We can't guarantee failures (timing-dependent), but log what happened
|
||||||
|
successes = 30 - errors
|
||||||
|
IO.puts("\n DEFERRED upgrade: #{successes}/30 succeeded, #{errors}/30 failed")
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "IMMEDIATE mode prevents transaction upgrade failures" do
|
||||||
|
{pid, db_path} =
|
||||||
|
start_bench_repo(
|
||||||
|
pool_size: 5,
|
||||||
|
busy_timeout: 5000,
|
||||||
|
default_transaction_mode: :immediate
|
||||||
|
)
|
||||||
|
|
||||||
|
seed_rows(100)
|
||||||
|
|
||||||
|
results =
|
||||||
|
run_concurrent(30, fn i ->
|
||||||
|
Berrypod.BenchRepo.transaction(fn ->
|
||||||
|
# Read then write — no upgrade problem because lock taken upfront
|
||||||
|
Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE id = ?", [rem(i, 100) + 1])
|
||||||
|
Process.sleep(1)
|
||||||
|
|
||||||
|
Berrypod.BenchRepo.query!(
|
||||||
|
"INSERT INTO bench (value) VALUES (?)",
|
||||||
|
["immediate_#{i}"]
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
outcomes = Enum.map(results, fn {_us, result} -> result end)
|
||||||
|
successes = Enum.count(outcomes, &match?({:ok, _}, &1))
|
||||||
|
|
||||||
|
errors =
|
||||||
|
Enum.count(outcomes, fn
|
||||||
|
{:error, _} -> true
|
||||||
|
_ -> false
|
||||||
|
end)
|
||||||
|
|
||||||
|
IO.puts("\n IMMEDIATE mode: #{successes}/30 succeeded, #{errors}/30 failed")
|
||||||
|
assert errors == 0, "IMMEDIATE mode should prevent all upgrade failures"
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
# Benchmarks (tagged, skipped by default)
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
|
||||||
|
describe "benchmark: read throughput vs pool size" do
|
||||||
|
@describetag :benchmark
|
||||||
|
|
||||||
|
test "concurrent reads scale with pool size" do
|
||||||
|
# Pool size 1 — all reads serialized
|
||||||
|
{pid1, path1} = start_bench_repo(pool_size: 1)
|
||||||
|
seed_rows(5000)
|
||||||
|
|
||||||
|
{time_pool1, _} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
run_concurrent(50, fn _i ->
|
||||||
|
Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_42%'")
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
stop_bench_repo(pid1, path1)
|
||||||
|
|
||||||
|
# Pool size 5 — reads can run concurrently under WAL
|
||||||
|
{pid5, path5} = start_bench_repo(pool_size: 5)
|
||||||
|
seed_rows(5000)
|
||||||
|
|
||||||
|
{time_pool5, _} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
run_concurrent(50, fn _i ->
|
||||||
|
Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_42%'")
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
stop_bench_repo(pid5, path5)
|
||||||
|
|
||||||
|
IO.puts("""
|
||||||
|
\n Read throughput (50 concurrent reads, 5000 rows):
|
||||||
|
pool_size=1: #{div(time_pool1, 1000)}ms
|
||||||
|
pool_size=5: #{div(time_pool5, 1000)}ms
|
||||||
|
speedup: #{Float.round(time_pool1 / max(time_pool5, 1), 2)}x
|
||||||
|
""")
|
||||||
|
|
||||||
|
assert time_pool5 < time_pool1,
|
||||||
|
"Pool size 5 should be faster than 1 for concurrent reads"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "benchmark: write throughput vs pool size" do
|
||||||
|
@describetag :benchmark
|
||||||
|
|
||||||
|
test "concurrent writes with different pool sizes" do
|
||||||
|
for pool_size <- [1, 3, 5] do
|
||||||
|
{pid, db_path} =
|
||||||
|
start_bench_repo(
|
||||||
|
pool_size: pool_size,
|
||||||
|
busy_timeout: 5000,
|
||||||
|
default_transaction_mode: :immediate
|
||||||
|
)
|
||||||
|
|
||||||
|
{time_us, results} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
run_concurrent(100, fn i ->
|
||||||
|
Berrypod.BenchRepo.query!(
|
||||||
|
"INSERT INTO bench (value) VALUES (?)",
|
||||||
|
["pool#{pool_size}_#{i}"]
|
||||||
|
)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
successes = Enum.count(results, fn {_us, r} -> r == :ok end)
|
||||||
|
rows = count_rows()
|
||||||
|
|
||||||
|
IO.puts(
|
||||||
|
" pool_size=#{pool_size}: #{div(time_us, 1000)}ms, " <>
|
||||||
|
"#{successes}/100 succeeded, #{rows} rows"
|
||||||
|
)
|
||||||
|
|
||||||
|
stop_bench_repo(pid, db_path)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "benchmark: mmap_size impact" do
|
||||||
|
@describetag :benchmark
|
||||||
|
|
||||||
|
test "read performance with and without mmap" do
|
||||||
|
# Without mmap
|
||||||
|
{pid0, path0} = start_bench_repo(pool_size: 5, custom_pragmas: [mmap_size: 0])
|
||||||
|
seed_rows(10_000)
|
||||||
|
|
||||||
|
{time_no_mmap, _} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
run_concurrent(30, fn _i ->
|
||||||
|
Berrypod.BenchRepo.query!("SELECT COUNT(*), AVG(length(payload)) FROM bench")
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
stop_bench_repo(pid0, path0)
|
||||||
|
|
||||||
|
# With 128MB mmap
|
||||||
|
{pid128, path128} =
|
||||||
|
start_bench_repo(pool_size: 5, custom_pragmas: [mmap_size: 134_217_728])
|
||||||
|
|
||||||
|
seed_rows(10_000)
|
||||||
|
|
||||||
|
{time_mmap, _} =
|
||||||
|
:timer.tc(fn ->
|
||||||
|
run_concurrent(30, fn _i ->
|
||||||
|
Berrypod.BenchRepo.query!("SELECT COUNT(*), AVG(length(payload)) FROM bench")
|
||||||
|
end)
|
||||||
|
end)
|
||||||
|
|
||||||
|
stop_bench_repo(pid128, path128)
|
||||||
|
|
||||||
|
IO.puts("""
|
||||||
|
\n mmap impact (30 concurrent reads, 10000 rows):
|
||||||
|
mmap_size=0: #{div(time_no_mmap, 1000)}ms
|
||||||
|
mmap_size=128M: #{div(time_mmap, 1000)}ms
|
||||||
|
speedup: #{Float.round(time_no_mmap / max(time_mmap, 1), 2)}x
|
||||||
|
""")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
# Private
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
|
||||||
|
defp assert_pragma(name, expected) do
|
||||||
|
%{rows: [[value]]} = Berrypod.BenchRepo.query!("PRAGMA #{name}")
|
||||||
|
assert value == expected, "PRAGMA #{name}: expected #{expected}, got #{value}"
|
||||||
|
end
|
||||||
|
end
|
||||||
12
test/support/bench_repo.ex
Normal file
12
test/support/bench_repo.ex
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
defmodule Berrypod.BenchRepo do
|
||||||
|
@moduledoc """
|
||||||
|
Standalone Ecto Repo for SQLite concurrency tests.
|
||||||
|
|
||||||
|
Started dynamically against temp DB files — no Sandbox, no Application
|
||||||
|
config dependency. Each test controls pool size, PRAGMAs, and transaction
|
||||||
|
mode independently.
|
||||||
|
"""
|
||||||
|
use Ecto.Repo,
|
||||||
|
otp_app: :berrypod,
|
||||||
|
adapter: Ecto.Adapters.SQLite3
|
||||||
|
end
|
||||||
@ -1,2 +1,2 @@
|
|||||||
ExUnit.start()
|
ExUnit.start(exclude: [:benchmark])
|
||||||
Ecto.Adapters.SQL.Sandbox.mode(Berrypod.Repo, :manual)
|
Ecto.Adapters.SQL.Sandbox.mode(Berrypod.Repo, :manual)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user