Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pool_count #636

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,16 @@ jobs:
- "11.11-alpine"
- "9.6-alpine"
- "9.5-alpine"
include:
- elixirbase: "1.11.4-erlang-23.3.4.9-alpine-3.16.9"
postgres: "16.2-alpine"
pool_count: "4"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was not picked up? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@greg-rychlewski do you know if this was meant to be picked up? Or only after we merge?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@josevalim sorry I missed this. do you still want me to look at it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do! I tried investigating this but I could not come up with anything. It seems the whole include is ignored. :(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I think I know what it is. I think I need a default pool_count 1. Let me give it a try.

steps:
- uses: earthly/actions-setup@v1
- uses: actions/checkout@v3
- name: test ecto_sql
env:
POOL_COUNT: ${{ matrix.pool_count || '1' }}
run: earthly -P --ci --build-arg ELIXIR_BASE=${{matrix.elixirbase}} --build-arg POSTGRES=${{matrix.postgres}} +integration-test-postgres

test-mysql:
Expand Down
15 changes: 9 additions & 6 deletions integration_test/myxql/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ Application.put_env(:ecto, :primary_key_type, :id)
Application.put_env(:ecto, :async_integration_tests, false)
Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE")

Code.require_file "../support/repo.exs", __DIR__
Code.require_file("../support/repo.exs", __DIR__)

# Configure MySQL connection
Application.put_env(:ecto_sql, :mysql_test_url,
Application.put_env(
:ecto_sql,
:mysql_test_url,
"ecto://" <> (System.get_env("MYSQL_URL") || "root@127.0.0.1")
)

Expand Down Expand Up @@ -53,7 +55,8 @@ alias Ecto.Integration.PoolRepo
Application.put_env(:ecto_sql, PoolRepo,
adapter: Ecto.Adapters.MyXQL,
url: Application.get_env(:ecto_sql, :mysql_test_url) <> "/ecto_test",
pool_size: 10,
pool_size: 5,
pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")),
show_sensitive_data_on_connection_error: true
)

Expand All @@ -63,8 +66,8 @@ end

# Load support files
ecto = Mix.Project.deps_paths()[:ecto]
Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__
Code.require_file "../support/migration.exs", __DIR__
Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__)
Code.require_file("../support/migration.exs", __DIR__)

defmodule Ecto.Integration.Case do
use ExUnit.CaseTemplate
Expand All @@ -77,7 +80,7 @@ end
{:ok, _} = Ecto.Adapters.MyXQL.ensure_all_started(TestRepo.config(), :temporary)

# Load up the repository, start it, and run migrations
_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config())
_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config())
:ok = Ecto.Adapters.MyXQL.storage_up(TestRepo.config())

{:ok, _pid} = TestRepo.start_link()
Expand Down
33 changes: 22 additions & 11 deletions integration_test/pg/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ Application.put_env(:ecto, :async_integration_tests, true)
Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE")

# Configure PG connection
Application.put_env(:ecto_sql, :pg_test_url,
Application.put_env(
:ecto_sql,
:pg_test_url,
"ecto://" <> (System.get_env("PG_URL") || "postgres:postgres@127.0.0.1")
)

Code.require_file "../support/repo.exs", __DIR__
Code.require_file("../support/repo.exs", __DIR__)

# Define type module
opts = if Code.ensure_loaded?(Duration), do: [interval_decode_type: Duration], else: []
Expand Down Expand Up @@ -59,21 +61,28 @@ end

pool_repo_config = [
url: Application.get_env(:ecto_sql, :pg_test_url) <> "/ecto_test",
pool_size: 10,
pool_size: 5,
pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")),
max_restarts: 20,
max_seconds: 10
]

Application.put_env(:ecto_sql, PoolRepo, pool_repo_config)
Application.put_env(:ecto_sql, AdvisoryLockPoolRepo, pool_repo_config ++ [
migration_source: "advisory_lock_schema_migrations",
migration_lock: :pg_advisory_lock
])

Application.put_env(
:ecto_sql,
AdvisoryLockPoolRepo,
pool_repo_config ++
[
migration_source: "advisory_lock_schema_migrations",
migration_lock: :pg_advisory_lock
]
)

# Load support files
ecto = Mix.Project.deps_paths()[:ecto]
Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__
Code.require_file "../support/migration.exs", __DIR__
Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__)
Code.require_file("../support/migration.exs", __DIR__)

defmodule Ecto.Integration.Case do
use ExUnit.CaseTemplate
Expand All @@ -86,7 +95,7 @@ end
{:ok, _} = Ecto.Adapters.Postgres.ensure_all_started(TestRepo.config(), :temporary)

# Load up the repository, start it, and run migrations
_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config())
_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config())
:ok = Ecto.Adapters.Postgres.storage_up(TestRepo.config())

{:ok, _pid} = TestRepo.start_link()
Expand All @@ -112,7 +121,9 @@ exclude_list = excludes ++ excludes_above_9_5

cond do
Version.match?(version, "< 9.6.0") ->
ExUnit.configure(exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0)
ExUnit.configure(
exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0
)

Version.match?(version, "< 12.0.0") ->
ExUnit.configure(exclude: exclude_list ++ excludes_below_12_0 ++ excludes_below_15_0)
Expand Down
115 changes: 71 additions & 44 deletions lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,22 @@ defmodule Ecto.Adapters.SQL do
disconnect_all(Ecto.Adapter.lookup_meta(repo), interval, opts)
end

def disconnect_all(%{pid: pid} = _adapter_meta, interval, opts) do
DBConnection.disconnect_all(pid, interval, opts)
def disconnect_all(adapter_meta, interval, opts) do
case adapter_meta do
%{partition_supervisor: {name, count}} ->
1..count
|> Enum.map(fn i ->
Task.async(fn ->
DBConnection.disconnect_all({:via, PartitionSupervisor, {name, i}}, interval, opts)
end)
end)
|> Task.await_many(:infinity)

:ok

%{pid: pool} ->
DBConnection.disconnect_all(pool, interval, opts)
end
end

@doc """
Expand Down Expand Up @@ -646,7 +660,7 @@ defmodule Ecto.Adapters.SQL do

defp sql_call(adapter_meta, callback, args, params, opts) do
%{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
conn = get_conn_or_pool(pool)
conn = get_conn_or_pool(pool, adapter_meta)
opts = with_log(telemetry, params, opts ++ default_opts)
args = args ++ [params, opts]
apply(sql, callback, [conn | args])
Expand All @@ -662,7 +676,7 @@ defmodule Ecto.Adapters.SQL do
end

@doc """
Check if the given `table` exists.
Checks if the given `table` exists.

Returns `true` if the `table` exists in the `repo`, otherwise `false`.
The table is checked against the current database/schema in the connection.
Expand Down Expand Up @@ -702,7 +716,7 @@ defmodule Ecto.Adapters.SQL do
def format_table(%{columns: columns, rows: rows}) do
column_widths =
[columns | rows]
|> List.zip()
|> Enum.zip()
|> Enum.map(&Tuple.to_list/1)
|> Enum.map(fn column_with_rows ->
column_with_rows |> Enum.map(&binary_length/1) |> Enum.max()
Expand Down Expand Up @@ -733,7 +747,7 @@ defmodule Ecto.Adapters.SQL do
defp cells(items, widths) do
cell =
[items, widths]
|> List.zip()
|> Enum.zip()
|> Enum.map(fn {item, width} -> [?|, " ", format_item(item, width), " "] end)

[cell | [?|]]
Expand Down Expand Up @@ -827,6 +841,8 @@ defmodule Ecto.Adapters.SQL do
@pool_opts [:timeout, :pool, :pool_size] ++
[:queue_target, :queue_interval, :ownership_timeout, :repo]

@valid_log_levels ~w(false debug info notice warning error critical alert emergency)a

@doc false
def init(connection, driver, config) do
unless Code.ensure_loaded?(connection) do
Expand All @@ -845,24 +861,12 @@ defmodule Ecto.Adapters.SQL do

log = Keyword.get(config, :log, :debug)

valid_log_levels = [
false,
:debug,
:info,
:notice,
:warning,
:error,
:critical,
:alert,
:emergency
]

if log not in valid_log_levels do
if log not in @valid_log_levels do
raise """
invalid value for :log option in Repo config

The accepted values for the :log option are:
#{Enum.map_join(valid_log_levels, ", ", &inspect/1)}
#{Enum.map_join(@valid_log_levels, ", ", &inspect/1)}

See https://hexdocs.pm/ecto/Ecto.Repo.html for more information.
"""
Expand All @@ -872,35 +876,49 @@ defmodule Ecto.Adapters.SQL do
telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}

config = adapter_config(config)
opts = Keyword.take(config, @pool_opts)
meta = %{telemetry: telemetry, sql: connection, stacktrace: stacktrace, opts: opts}
{:ok, connection.child_spec(config), meta}
end
{name, config} = Keyword.pop(config, :name, config[:repo])
{pool_count, config} = Keyword.pop(config, :pool_count, 1)
{pool, config} = pool_config(config)
child_spec = connection.child_spec(config)

defp adapter_config(config) do
if Keyword.has_key?(config, :pool_timeout) do
message = """
:pool_timeout option no longer has an effect and has been replaced with an improved queuing system.
See \"Queue config\" in DBConnection.start_link/2 documentation for more information.
"""
meta = %{
telemetry: telemetry,
sql: connection,
stacktrace: stacktrace,
opts: Keyword.take(config, @pool_opts)
}

IO.warn(message)
end
if pool_count > 1 do
if name == nil do
raise ArgumentError, "the option :pool_count requires a :name"
end

config
|> Keyword.delete(:name)
|> Keyword.update(:pool, DBConnection.ConnectionPool, &normalize_pool/1)
end
if pool == DBConnection.Ownership do
raise ArgumentError, "the option :pool_count does not work with the SQL sandbox"
end

defp normalize_pool(pool) do
if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
DBConnection.Ownership
name = Module.concat(name, PartitionSupervisor)
partition_opts = [name: name, child_spec: child_spec, partitions: pool_count]
child_spec = Supervisor.child_spec({PartitionSupervisor, partition_opts}, [])
{:ok, child_spec, Map.put(meta, :partition_supervisor, {name, pool_count})}
else
pool
{:ok, child_spec, meta}
end
end

defp pool_config(config) do
{pool, config} = Keyword.pop(config, :pool, DBConnection.ConnectionPool)

pool =
if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
DBConnection.Ownership
else
pool
end

{pool, [pool: pool] ++ config}
end

@doc false
def checkout(adapter_meta, opts, callback) do
checkout_or_transaction(:run, adapter_meta, opts, callback)
Expand Down Expand Up @@ -1385,11 +1403,20 @@ defmodule Ecto.Adapters.SQL do
end
end

apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts])
apply(DBConnection, fun, [get_conn_or_pool(pool, adapter_meta), callback, opts])
end

defp get_conn_or_pool(pool) do
Process.get(key(pool), pool)
defp get_conn_or_pool(pool, adapter_meta) do
case :erlang.get(key(pool)) do
:undefined ->
case adapter_meta do
%{partition_supervisor: {name, _}} -> {:via, PartitionSupervisor, {name, self()}}
_ -> pool
end

conn ->
conn
end
end

defp get_conn(pool) do
Expand Down
Loading