BenchRepo for isolated concurrency testing against temp DB files. Correctness tests prove WAL concurrent reads, IMMEDIATE transaction mode vs DEFERRED upgrade failures, and PRAGMA application. Benchmark tests (tagged :benchmark, excluded by default) measure throughput. mix bench.sqlite runs HTTP load scenarios against the full Phoenix stack with --prod, --scale, --pool-size, and --busy-timeout options. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
390 lines
11 KiB
Elixir
390 lines
11 KiB
Elixir
defmodule Berrypod.SQLiteConcurrencyTest do
|
|
@moduledoc """
|
|
Tests and benchmarks for SQLite concurrent read/write behaviour.
|
|
|
|
Correctness tests run as part of the normal suite. Benchmark tests are
|
|
tagged :benchmark and excluded by default — run them with:
|
|
|
|
mix test test/berrypod/sqlite_concurrency_test.exs --include benchmark
|
|
"""
|
|
use ExUnit.Case, async: false
|
|
|
|
@tmp_dir System.tmp_dir!()
|
|
|
|
# -------------------------------------------------------------------
|
|
# Helpers
|
|
# -------------------------------------------------------------------
|
|
|
|
# Starts Berrypod.BenchRepo against a throwaway SQLite file and creates the
# `bench` table used by every test in this module.
#
# `opts` is a keyword list merged over the defaults below (WAL journal,
# pool of 5, 5s busy timeout); any key, including `:database`, may be
# overridden.
#
# Returns `{pid, db_path}` where `db_path` is the database path actually used,
# suitable for passing to `stop_bench_repo/2`.
defp start_bench_repo(opts) do
  default_path = Path.join(@tmp_dir, "bench_#{System.unique_integer([:positive])}.db")

  defaults = [
    database: default_path,
    pool_size: 5,
    journal_mode: :wal,
    busy_timeout: 5000
  ]

  config = Keyword.merge(defaults, opts)

  # Report the path that is really in use, even when the caller overrides it.
  db_path = Keyword.fetch!(config, :database)

  {:ok, pid} = Berrypod.BenchRepo.start_link(config)

  # Safety net: tests call stop_bench_repo/2 as their last step, which is
  # skipped when an assertion raises first. This callback runs after the test
  # exits regardless of outcome, so the repo and its files never outlive it.
  # On the normal path the repo is already stopped and the files are already
  # gone, making every step below a no-op.
  ExUnit.Callbacks.on_exit(fn ->
    if Process.alive?(pid) do
      try do
        GenServer.stop(pid)
      catch
        # The repo may already be shutting down for another reason (for
        # example because the test process it was linked to has exited).
        :exit, _reason -> :ok
      end
    end

    for suffix <- ["", "-wal", "-shm"], do: File.rm(db_path <> suffix)
  end)

  Berrypod.BenchRepo.query!("""
  CREATE TABLE bench (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    value TEXT NOT NULL,
    payload TEXT DEFAULT ''
  )
  """)

  {pid, db_path}
end
|
|
|
|
# Stops the repo process identified by `pid`, then deletes the database file
# at `db_path` along with its "-wal" and "-shm" companions.
#
# Deletion is best-effort: the result of each File.rm/1 is not checked, and
# the result of the last one is what this function returns.
defp stop_bench_repo(pid, db_path) do
  GenServer.stop(pid)

  [db_path, db_path <> "-wal", db_path <> "-shm"]
  |> Enum.map(&File.rm/1)
  |> List.last()
end
|
|
|
|
# Inserts `n` rows into `bench`, with values "row_1" .. "row_n" and a
# 200-byte payload each. Returns the list of insert results.
#
# `n` must be a non-negative integer. Zero is handled explicitly because the
# range `1..0` counts downwards in Elixir and would otherwise insert two rows
# ("row_1" and "row_0").
defp seed_rows(0), do: []

defp seed_rows(n) when is_integer(n) and n > 0 do
  # The payload is identical for every row, so build it once.
  payload = String.duplicate("x", 200)

  for i <- 1..n do
    Berrypod.BenchRepo.query!(
      "INSERT INTO bench (value, payload) VALUES (?, ?)",
      ["row_#{i}", payload]
    )
  end
end
|
|
|
|
# Runs `fun` in `n` concurrent tasks, passing each its 1-based index, and
# waits up to 30 seconds for all of them.
#
# Returns a list of `{elapsed_microseconds, result}` tuples in task order.
# A task that raises crashes the caller, since tasks are linked.
defp run_concurrent(n, fun) do
  1..n
  |> Enum.map(fn index ->
    Task.async(fn -> :timer.tc(fn -> fun.(index) end) end)
  end)
  |> Task.await_many(30_000)
end
|
|
|
|
# Returns the current number of rows in the `bench` table.
defp count_rows do
  result = Berrypod.BenchRepo.query!("SELECT COUNT(*) FROM bench")

  # COUNT(*) yields exactly one row with one column.
  %{rows: [[total]]} = result
  total
end
|
|
|
|
# -------------------------------------------------------------------
|
|
# Correctness tests (always run)
|
|
# -------------------------------------------------------------------
|
|
|
|
describe "pragma configuration" do
  test "PRAGMAs are applied correctly on connection" do
    {pid, db_path} =
      start_bench_repo(
        journal_mode: :wal,
        cache_size: -20_000,
        journal_size_limit: 67_108_864,
        custom_pragmas: [mmap_size: 134_217_728]
      )

    # try/after guarantees the repo is stopped and its files are removed even
    # when one of the assertions below fails.
    try do
      assert_pragma("journal_mode", "wal")

      # These three are not passed above, so they are presumably defaults
      # applied by the repo/adapter (verify against BenchRepo's config).
      # In SQLite, synchronous 1 is NORMAL and temp_store 2 is MEMORY.
      assert_pragma("synchronous", 1)
      assert_pragma("foreign_keys", 1)
      assert_pragma("temp_store", 2)

      assert_pragma("cache_size", -20_000)
      assert_pragma("mmap_size", 134_217_728)
      assert_pragma("journal_size_limit", 67_108_864)

      # busy_timeout isn't queryable via PRAGMA in all SQLite builds,
      # but we can verify it was accepted by the absence of errors above
    after
      stop_bench_repo(pid, db_path)
    end
  end
end
|
|
|
|
describe "concurrent reads" do
  test "multiple readers don't block each other under WAL" do
    {pid, db_path} = start_bench_repo(pool_size: 5)

    # try/after keeps teardown from being skipped when an assertion fails.
    try do
      seed_rows(500)

      results =
        run_concurrent(20, fn _i ->
          # Note: `_` is a single-character wildcard in LIKE, so this matches
          # any value with a "5" preceded by at least one character.
          %{rows: rows} =
            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_5%'")

          {:ok, length(rows)}
        end)

      # All 20 should succeed
      outcomes = Enum.map(results, fn {_us, result} -> result end)
      assert Enum.all?(outcomes, &match?({:ok, _}, &1))

      # All should find the same number of matching rows
      counts = Enum.map(outcomes, fn {:ok, n} -> n end)
      assert [match_count] = Enum.uniq(counts)

      # Guard against a vacuous pass where every reader saw an empty table;
      # the seed data always contains matches (e.g. "row_5").
      assert match_count > 0
    after
      stop_bench_repo(pid, db_path)
    end
  end
end
|
|
|
|
describe "concurrent writes" do
  test "bare inserts all succeed with busy_timeout" do
    {pid, db_path} = start_bench_repo(pool_size: 5, busy_timeout: 5000)

    # try/after keeps teardown from being skipped when an assertion fails.
    try do
      results =
        run_concurrent(20, fn i ->
          Berrypod.BenchRepo.query!(
            "INSERT INTO bench (value) VALUES (?)",
            ["writer_#{i}"]
          )

          :ok
        end)

      outcomes = Enum.map(results, fn {_us, result} -> result end)
      assert Enum.all?(outcomes, &(&1 == :ok))
      assert count_rows() == 20
    after
      stop_bench_repo(pid, db_path)
    end
  end

  test "transactions succeed with IMMEDIATE mode" do
    {pid, db_path} =
      start_bench_repo(
        pool_size: 5,
        busy_timeout: 5000,
        default_transaction_mode: :immediate
      )

    # try/after keeps teardown from being skipped when an assertion fails.
    try do
      results =
        run_concurrent(20, fn i ->
          Berrypod.BenchRepo.transaction(fn ->
            Berrypod.BenchRepo.query!(
              "INSERT INTO bench (value) VALUES (?)",
              ["tx_writer_#{i}"]
            )
          end)
        end)

      outcomes = Enum.map(results, fn {_us, result} -> result end)
      assert Enum.all?(outcomes, &match?({:ok, _}, &1))
      assert count_rows() == 20
    after
      stop_bench_repo(pid, db_path)
    end
  end
end
|
|
|
|
describe "transaction upgrade behaviour" do
  test "DEFERRED mode may fail when upgrading read to write" do
    # This test demonstrates the transaction upgrade problem.
    # Under DEFERRED, a transaction that starts with a read and then
    # tries to write can get SQLITE_BUSY if another writer holds the lock.
    # It's timing-dependent so we run many attempts to increase the chance.
    {pid, db_path} = start_bench_repo(pool_size: 5, busy_timeout: 50)

    # try/after keeps teardown from being skipped when something below raises.
    try do
      seed_rows(100)

      results =
        run_concurrent(30, fn i ->
          # Rescuing here is deliberate: a failed upgrade is an expected
          # outcome that gets counted rather than crashing the task.
          try do
            Berrypod.BenchRepo.transaction(fn ->
              # Start with a read (deferred transaction stays as reader)
              Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE id = ?", [rem(i, 100) + 1])
              # Small delay to increase contention window
              Process.sleep(1)
              # Now try to write (requires upgrade from reader to writer)
              Berrypod.BenchRepo.query!(
                "INSERT INTO bench (value) VALUES (?)",
                ["upgrade_#{i}"]
              )
            end)
          rescue
            e -> {:error, e}
          end
        end)

      errors =
        Enum.count(results, fn
          {_us, {:error, _}} -> true
          _ -> false
        end)

      # We can't guarantee failures (timing-dependent), but log what happened
      successes = 30 - errors
      IO.puts("\n DEFERRED upgrade: #{successes}/30 succeeded, #{errors}/30 failed")

      # What must hold regardless of timing: every failed transaction was
      # rolled back, so only the successful ones added a row to the 100 seeds.
      assert count_rows() == 100 + successes
    after
      stop_bench_repo(pid, db_path)
    end
  end

  test "IMMEDIATE mode prevents transaction upgrade failures" do
    {pid, db_path} =
      start_bench_repo(
        pool_size: 5,
        busy_timeout: 5000,
        default_transaction_mode: :immediate
      )

    # try/after keeps teardown from being skipped when an assertion fails.
    try do
      seed_rows(100)

      results =
        run_concurrent(30, fn i ->
          Berrypod.BenchRepo.transaction(fn ->
            # Read then write — no upgrade problem because lock taken upfront
            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE id = ?", [rem(i, 100) + 1])
            Process.sleep(1)

            Berrypod.BenchRepo.query!(
              "INSERT INTO bench (value) VALUES (?)",
              ["immediate_#{i}"]
            )
          end)
        end)

      outcomes = Enum.map(results, fn {_us, result} -> result end)
      successes = Enum.count(outcomes, &match?({:ok, _}, &1))

      errors =
        Enum.count(outcomes, fn
          {:error, _} -> true
          _ -> false
        end)

      IO.puts("\n IMMEDIATE mode: #{successes}/30 succeeded, #{errors}/30 failed")
      assert errors == 0, "IMMEDIATE mode should prevent all upgrade failures"

      # Each successful transaction adds exactly one row to the 100 seeds.
      assert count_rows() == 100 + successes
    after
      stop_bench_repo(pid, db_path)
    end
  end
end
|
|
|
|
# -------------------------------------------------------------------
|
|
# Benchmarks (tagged, skipped by default)
|
|
# -------------------------------------------------------------------
|
|
|
|
describe "benchmark: read throughput vs pool size" do
  @describetag :benchmark

  test "concurrent reads scale with pool size" do
    # Starts a fresh repo with the given pool size, seeds 5000 rows, times 50
    # concurrent LIKE queries, stops the repo, and returns the elapsed
    # microseconds for the whole batch.
    measure = fn pool_size ->
      {pid, path} = start_bench_repo(pool_size: pool_size)
      seed_rows(5000)

      {elapsed_us, _results} =
        :timer.tc(fn ->
          run_concurrent(50, fn _i ->
            Berrypod.BenchRepo.query!("SELECT * FROM bench WHERE value LIKE '%_42%'")
          end)
        end)

      stop_bench_repo(pid, path)
      elapsed_us
    end

    # Pool size 1 — all reads serialized
    time_pool1 = measure.(1)

    # Pool size 5 — reads can run concurrently under WAL
    time_pool5 = measure.(5)

    IO.puts("""
    \n Read throughput (50 concurrent reads, 5000 rows):
    pool_size=1: #{div(time_pool1, 1000)}ms
    pool_size=5: #{div(time_pool5, 1000)}ms
    speedup: #{Float.round(time_pool1 / max(time_pool5, 1), 2)}x
    """)

    assert time_pool5 < time_pool1,
           "Pool size 5 should be faster than 1 for concurrent reads"
  end
end
|
|
|
|
describe "benchmark: write throughput vs pool size" do
  @describetag :benchmark

  test "concurrent writes with different pool sizes" do
    # One fresh repo per pool size; each run fires 100 concurrent inserts and
    # prints the elapsed time, the number of successful tasks, and the final
    # row count. Nothing is asserted — the output is the result.
    Enum.each([1, 3, 5], fn pool_size ->
      {pid, db_path} =
        start_bench_repo(
          pool_size: pool_size,
          busy_timeout: 5000,
          default_transaction_mode: :immediate
        )

      {elapsed_us, timed_results} =
        :timer.tc(fn ->
          run_concurrent(100, fn i ->
            Berrypod.BenchRepo.query!(
              "INSERT INTO bench (value) VALUES (?)",
              ["pool#{pool_size}_#{i}"]
            )

            :ok
          end)
        end)

      ok_count = Enum.count(timed_results, fn {_us, outcome} -> outcome == :ok end)
      row_total = count_rows()

      IO.puts(
        " pool_size=#{pool_size}: #{div(elapsed_us, 1000)}ms, " <>
          "#{ok_count}/100 succeeded, #{row_total} rows"
      )

      stop_bench_repo(pid, db_path)
    end)
  end
end
|
|
|
|
describe "benchmark: mmap_size impact" do
  @describetag :benchmark

  test "read performance with and without mmap" do
    # Starts a fresh pool-of-5 repo with the given mmap_size pragma, seeds
    # 10_000 rows, times 30 concurrent aggregate queries, stops the repo, and
    # returns the elapsed microseconds for the whole batch.
    measure = fn mmap_bytes ->
      {pid, path} = start_bench_repo(pool_size: 5, custom_pragmas: [mmap_size: mmap_bytes])
      seed_rows(10_000)

      {elapsed_us, _results} =
        :timer.tc(fn ->
          run_concurrent(30, fn _i ->
            Berrypod.BenchRepo.query!("SELECT COUNT(*), AVG(length(payload)) FROM bench")
          end)
        end)

      stop_bench_repo(pid, path)
      elapsed_us
    end

    # Without mmap
    time_no_mmap = measure.(0)

    # With 128MB mmap
    time_mmap = measure.(134_217_728)

    IO.puts("""
    \n mmap impact (30 concurrent reads, 10000 rows):
    mmap_size=0: #{div(time_no_mmap, 1000)}ms
    mmap_size=128M: #{div(time_mmap, 1000)}ms
    speedup: #{Float.round(time_no_mmap / max(time_mmap, 1), 2)}x
    """)
  end
end
|
|
|
|
# -------------------------------------------------------------------
|
|
# Private
|
|
# -------------------------------------------------------------------
|
|
|
|
# Queries `PRAGMA <name>` on the bench repo and asserts that the single value
# it returns equals `expected`.
#
# Raises a MatchError if the pragma does not return exactly one row with one
# column. `name` is interpolated into the SQL, so it must be a trusted
# literal; every caller in this file passes one.
defp assert_pragma(name, expected) do
  %{rows: [[value]]} = Berrypod.BenchRepo.query!("PRAGMA #{name}")

  # inspect/1 keeps the failure message unambiguous: a string "1" and an
  # integer 1 would otherwise print identically.
  assert value == expected,
         "PRAGMA #{name}: expected #{inspect(expected)}, got #{inspect(value)}"
end
|
|
end
|