diff --git a/bindings/elixir/lib/fluss/admin.ex b/bindings/elixir/lib/fluss/admin.ex index 8a8ae9c4..b9d26f70 100644 --- a/bindings/elixir/lib/fluss/admin.ex +++ b/bindings/elixir/lib/fluss/admin.ex @@ -24,6 +24,12 @@ defmodule Fluss.Admin do admin = Fluss.Admin.new!(conn) :ok = Fluss.Admin.create_database(admin, "my_db") + database_descriptor = + Fluss.DatabaseDescriptor.new() + |> Fluss.DatabaseDescriptor.comment("App data") + + :ok = Fluss.Admin.create_database(admin, "another_db", database_descriptor) + schema = Fluss.Schema.new() |> Fluss.Schema.column("ts", :bigint) descriptor = Fluss.TableDescriptor.new!(schema) :ok = Fluss.Admin.create_table(admin, "my_db", "events", descriptor) @@ -65,10 +71,11 @@ defmodule Fluss.Admin do end end - @spec create_database(t(), String.t(), boolean()) :: :ok | {:error, Fluss.Error.t()} - def create_database(admin, name, ignore_if_exists \\ true) do + @spec create_database(t(), String.t(), Fluss.DatabaseDescriptor.t() | nil, boolean()) :: + :ok | {:error, Fluss.Error.t()} + def create_database(admin, name, descriptor \\ nil, ignore_if_exists \\ true) do admin - |> Native.admin_create_database(name, ignore_if_exists) + |> Native.admin_create_database(name, descriptor, ignore_if_exists) |> Native.await_nif() end diff --git a/bindings/elixir/lib/fluss/database_descriptor.ex b/bindings/elixir/lib/fluss/database_descriptor.ex index 71fdc963..43857e46 100644 --- a/bindings/elixir/lib/fluss/database_descriptor.ex +++ b/bindings/elixir/lib/fluss/database_descriptor.ex @@ -20,8 +20,19 @@ defmodule Fluss.DatabaseDescriptor do User-supplied configuration of a Fluss database — its comment and any custom properties. - Embedded as the `:descriptor` field of `Fluss.DatabaseInfo`; produced - indirectly via `Fluss.Admin.get_database_info/2`. + Constructed via the builder helpers (or as a struct directly) and passed to + `Fluss.Admin.create_database/3`. Also returned as the `:descriptor` field of + `Fluss.DatabaseInfo` from `Fluss.Admin.get_database_info/2`. + + ## Examples + + descriptor = + Fluss.DatabaseDescriptor.new() + |> Fluss.DatabaseDescriptor.comment("My database") + |> Fluss.DatabaseDescriptor.put_custom_property("owner", "team-x") + + :ok = Fluss.Admin.create_database(admin, "my_db", descriptor) + """ @enforce_keys [:comment, :custom_properties] @@ -31,4 +42,26 @@ defmodule Fluss.DatabaseDescriptor do comment: String.t() | nil, custom_properties: %{optional(String.t()) => String.t()} } + + @spec new() :: t() + def new, do: %__MODULE__{comment: nil, custom_properties: %{}} + + @spec comment(t(), String.t()) :: t() + def comment(%__MODULE__{} = desc, c) when is_binary(c), do: %{desc | comment: c} + + @spec put_custom_property(t(), String.t(), String.t()) :: t() + def put_custom_property(%__MODULE__{} = desc, k, v) when is_binary(k) and is_binary(v), + do: %{desc | custom_properties: Map.put(desc.custom_properties, k, v)} + + @spec put_custom_properties(t(), %{optional(String.t()) => String.t()}) :: t() + def put_custom_properties(%__MODULE__{} = desc, props) when is_map(props) do + Enum.each(props, fn {k, v} -> + unless is_binary(k) and is_binary(v) do + raise ArgumentError, + "put_custom_properties/2 requires String keys and values, got #{inspect({k, v})}" + end + end) + + %{desc | custom_properties: Map.merge(desc.custom_properties, props)} + end end diff --git a/bindings/elixir/lib/fluss/native.ex b/bindings/elixir/lib/fluss/native.ex index 20f193f8..4d4eea8f 100644 --- a/bindings/elixir/lib/fluss/native.ex +++ b/bindings/elixir/lib/fluss/native.ex @@ -27,7 +27,7 @@ defmodule Fluss.Native do def admin_get_server_nodes(_admin), do: :erlang.nif_error(:nif_not_loaded) - def admin_create_database(_admin, _name, _ignore_if_exists), + def admin_create_database(_admin, _name, _descriptor, _ignore_if_exists), do: :erlang.nif_error(:nif_not_loaded) def admin_get_database_info(_admin, _database_name), do: :erlang.nif_error(:nif_not_loaded) diff --git a/bindings/elixir/native/fluss_nif/src/admin.rs b/bindings/elixir/native/fluss_nif/src/admin.rs index b765725a..7a206a5d 100644 --- a/bindings/elixir/native/fluss_nif/src/admin.rs +++ b/bindings/elixir/native/fluss_nif/src/admin.rs @@ -71,6 +71,16 @@ impl NifDatabaseDescriptor { custom_properties: desc.custom_properties().clone(), } } + + pub fn to_core(&self) -> DatabaseDescriptor { + let mut desc = DatabaseDescriptor::builder(); + if let Some(comment) = &self.comment { + desc = desc.comment(comment); + } + + desc.custom_properties(self.custom_properties.clone()) + .build() + } } #[derive(NifStruct)] @@ -124,12 +134,14 @@ fn admin_create_database<'a>( env: Env<'a>, admin: ResourceArc, database_name: String, + descriptor: Option, ignore_if_exists: bool, ) -> Term<'a> { async_nif::spawn_task(env, async move { + let core_descriptor = descriptor.as_ref().map(NifDatabaseDescriptor::to_core); admin .inner - .create_database(&database_name, None, ignore_if_exists) + .create_database(&database_name, core_descriptor.as_ref(), ignore_if_exists) .await }) } diff --git a/bindings/elixir/test/database_descriptor_test.exs b/bindings/elixir/test/database_descriptor_test.exs new file mode 100644 index 00000000..aa860195 --- /dev/null +++ b/bindings/elixir/test/database_descriptor_test.exs @@ -0,0 +1,169 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +defmodule Fluss.DatabaseDescriptorTest do + use ExUnit.Case, async: true + + alias Fluss.DatabaseDescriptor + + describe "new/0" do + test "returns a descriptor with no comment and no custom properties" do + assert DatabaseDescriptor.new() == + %DatabaseDescriptor{comment: nil, custom_properties: %{}} + end + end + + describe "comment/2" do + test "sets the comment field" do + desc = DatabaseDescriptor.new() |> DatabaseDescriptor.comment("hello") + assert desc.comment == "hello" + end + + test "overwrites a previous comment" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.comment("first") + |> DatabaseDescriptor.comment("second") + + assert desc.comment == "second" + end + + test "preserves existing custom_properties" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.put_custom_property("k", "v") + |> DatabaseDescriptor.comment("c") + + assert desc.comment == "c" + assert desc.custom_properties == %{"k" => "v"} + end + + test "raises on non-binary input" do + assert_raise FunctionClauseError, fn -> + DatabaseDescriptor.comment(DatabaseDescriptor.new(), 42) + end + end + end + + describe "put_custom_property/3" do + test "adds a property" do + desc = DatabaseDescriptor.new() |> DatabaseDescriptor.put_custom_property("k", "v") + assert desc.custom_properties == %{"k" => "v"} + end + + test "accumulates multiple properties" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.put_custom_property("k1", "v1") + |> DatabaseDescriptor.put_custom_property("k2", "v2") + + assert desc.custom_properties == %{"k1" => "v1", "k2" => "v2"} + end + + test "overwrites a previous value for the same key" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.put_custom_property("k", "v1") + |> DatabaseDescriptor.put_custom_property("k", "v2") + + assert desc.custom_properties == %{"k" => "v2"} + end + + test "preserves existing comment" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.comment("c") + |> DatabaseDescriptor.put_custom_property("k", "v") + + assert desc.comment == "c" + assert desc.custom_properties == %{"k" => "v"} + end + + test "raises on non-binary key" do + assert_raise FunctionClauseError, fn -> + DatabaseDescriptor.put_custom_property(DatabaseDescriptor.new(), :atom_key, "v") + end + end + + test "raises on non-binary value" do + assert_raise FunctionClauseError, fn -> + DatabaseDescriptor.put_custom_property(DatabaseDescriptor.new(), "k", 42) + end + end + end + + describe "put_custom_properties/2" do + test "adds entries from a map" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.put_custom_properties(%{"k1" => "v1", "k2" => "v2"}) + + assert desc.custom_properties == %{"k1" => "v1", "k2" => "v2"} + end + + test "merges with existing entries" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.put_custom_property("k0", "v0") + |> DatabaseDescriptor.put_custom_properties(%{"k1" => "v1"}) + + assert desc.custom_properties == %{"k0" => "v0", "k1" => "v1"} + end + + test "overwrites on key collision" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.put_custom_property("k", "v1") + |> DatabaseDescriptor.put_custom_properties(%{"k" => "v2"}) + + assert desc.custom_properties == %{"k" => "v2"} + end + + test "accepts an empty map" do + desc = DatabaseDescriptor.new() |> DatabaseDescriptor.put_custom_properties(%{}) + assert desc.custom_properties == %{} + end + + test "preserves existing comment" do + desc = + DatabaseDescriptor.new() + |> DatabaseDescriptor.comment("c") + |> DatabaseDescriptor.put_custom_properties(%{"k" => "v"}) + + assert desc.comment == "c" + assert desc.custom_properties == %{"k" => "v"} + end + + test "raises FunctionClauseError on non-map input" do + assert_raise FunctionClauseError, fn -> + DatabaseDescriptor.put_custom_properties(DatabaseDescriptor.new(), []) + end + end + + test "raises ArgumentError on non-binary key in the map" do + assert_raise ArgumentError, fn -> + DatabaseDescriptor.put_custom_properties(DatabaseDescriptor.new(), %{:atom_key => "v"}) + end + end + + test "raises ArgumentError on non-binary value in the map" do + assert_raise ArgumentError, fn -> + DatabaseDescriptor.put_custom_properties(DatabaseDescriptor.new(), %{"k" => 42}) + end + end + end +end diff --git a/bindings/elixir/test/integration/admin_test.exs b/bindings/elixir/test/integration/admin_test.exs index 7942c5e3..8d732734 100644 --- a/bindings/elixir/test/integration/admin_test.exs +++ b/bindings/elixir/test/integration/admin_test.exs @@ -53,10 +53,68 @@ defmodule Fluss.Integration.AdminTest do end end + describe "create_database/3 with a descriptor" do + test "accepts a comment-only descriptor", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + descriptor = Fluss.DatabaseDescriptor.new() |> Fluss.DatabaseDescriptor.comment("hello") + :ok = Fluss.Admin.create_database(admin, db, descriptor) + on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) + + assert {:ok, info} = Fluss.Admin.get_database_info(admin, db) + assert info.descriptor.comment == "hello" + assert info.descriptor.custom_properties == %{} + end + + test "accepts a properties-only descriptor", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + + descriptor = + Fluss.DatabaseDescriptor.new() + |> Fluss.DatabaseDescriptor.put_custom_property("region", "eu-west-1") + + :ok = Fluss.Admin.create_database(admin, db, descriptor) + on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) + + assert {:ok, info} = Fluss.Admin.get_database_info(admin, db) + assert info.descriptor.custom_properties == %{"region" => "eu-west-1"} + end + + test "accepts a struct constructed directly without builders", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + descriptor = %Fluss.DatabaseDescriptor{comment: "direct", custom_properties: %{"k" => "v"}} + :ok = Fluss.Admin.create_database(admin, db, descriptor) + on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) + + assert {:ok, info} = Fluss.Admin.get_database_info(admin, db) + assert info.descriptor.comment == "direct" + assert info.descriptor.custom_properties == %{"k" => "v"} + end + end + describe "get_database_info/2" do - test "returns DatabaseInfo for an existing database", %{admin: admin} do + test "returns DatabaseInfo with the descriptor used at creation", %{admin: admin} do db = "fluss_data_sources_#{:rand.uniform(100_000)}" - :ok = Fluss.Admin.create_database(admin, db, true) + + descriptor = + Fluss.DatabaseDescriptor.new() + |> Fluss.DatabaseDescriptor.comment("Integration test database") + |> Fluss.DatabaseDescriptor.put_custom_property("owner", "ci") + |> Fluss.DatabaseDescriptor.put_custom_property("env", "test") + + :ok = Fluss.Admin.create_database(admin, db, descriptor) + on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) + + assert {:ok, %Fluss.DatabaseInfo{} = info} = Fluss.Admin.get_database_info(admin, db) + assert info.database_name == db + assert info.descriptor.comment == "Integration test database" + assert info.descriptor.custom_properties == %{"owner" => "ci", "env" => "test"} + assert is_integer(info.created_time) + assert is_integer(info.modified_time) + end + + test "returns DatabaseInfo for a database created with no descriptor", %{admin: admin} do + db = "fluss_data_sources_#{:rand.uniform(100_000)}" + :ok = Fluss.Admin.create_database(admin, db) on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) assert {:ok, %Fluss.DatabaseInfo{} = info} = Fluss.Admin.get_database_info(admin, db) @@ -76,7 +134,7 @@ defmodule Fluss.Integration.AdminTest do test "returns {:ok, true} for an existing database", %{admin: admin} do db = "fluss_data_sources_#{:rand.uniform(100_000)}" - :ok = Fluss.Admin.create_database(admin, db, true) + :ok = Fluss.Admin.create_database(admin, db) on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end) assert {:ok, true} = Fluss.Admin.database_exists(admin, db) diff --git a/bindings/elixir/test/integration/log_table_test.exs b/bindings/elixir/test/integration/log_table_test.exs index 16a670fe..d3eb55e4 100644 --- a/bindings/elixir/test/integration/log_table_test.exs +++ b/bindings/elixir/test/integration/log_table_test.exs @@ -213,7 +213,7 @@ defmodule Fluss.Integration.LogTableTest do describe "admin operations" do test "create and drop database", %{admin: admin} do db_name = "ex_test_db_#{:rand.uniform(100_000)}" - :ok = Fluss.Admin.create_database(admin, db_name, true) + :ok = Fluss.Admin.create_database(admin, db_name) {:ok, databases} = Fluss.Admin.list_databases(admin) assert db_name in databases