Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions bindings/elixir/lib/fluss/admin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ defmodule Fluss.Admin do
admin = Fluss.Admin.new!(conn)
:ok = Fluss.Admin.create_database(admin, "my_db")

database_descriptor =
Fluss.DatabaseDescriptor.new()
|> Fluss.DatabaseDescriptor.comment("App data")

:ok = Fluss.Admin.create_database(admin, "another_db", database_descriptor)

schema = Fluss.Schema.new() |> Fluss.Schema.column("ts", :bigint)
descriptor = Fluss.TableDescriptor.new!(schema)
:ok = Fluss.Admin.create_table(admin, "my_db", "events", descriptor)
Expand Down Expand Up @@ -65,10 +71,11 @@ defmodule Fluss.Admin do
end
end

@spec create_database(t(), String.t(), boolean()) :: :ok | {:error, Fluss.Error.t()}
def create_database(admin, name, ignore_if_exists \\ true) do
@spec create_database(t(), String.t(), Fluss.DatabaseDescriptor.t() | nil, boolean()) ::
:ok | {:error, Fluss.Error.t()}
def create_database(admin, name, descriptor \\ nil, ignore_if_exists \\ true) do
admin
|> Native.admin_create_database(name, ignore_if_exists)
|> Native.admin_create_database(name, descriptor, ignore_if_exists)
|> Native.await_nif()
end

Expand Down
37 changes: 35 additions & 2 deletions bindings/elixir/lib/fluss/database_descriptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,19 @@ defmodule Fluss.DatabaseDescriptor do
User-supplied configuration of a Fluss database — its comment and any custom
properties.

Embedded as the `:descriptor` field of `Fluss.DatabaseInfo`; produced
indirectly via `Fluss.Admin.get_database_info/2`.
Constructed via the builder helpers (or as a struct directly) and passed to
`Fluss.Admin.create_database/3`. Also returned as the `:descriptor` field of
`Fluss.DatabaseInfo` from `Fluss.Admin.get_database_info/2`.

## Examples

descriptor =
Fluss.DatabaseDescriptor.new()
|> Fluss.DatabaseDescriptor.comment("My database")
|> Fluss.DatabaseDescriptor.put_custom_property("owner", "team-x")

:ok = Fluss.Admin.create_database(admin, "my_db", descriptor)

"""

@enforce_keys [:comment, :custom_properties]
Expand All @@ -31,4 +42,26 @@ defmodule Fluss.DatabaseDescriptor do
comment: String.t() | nil,
custom_properties: %{optional(String.t()) => String.t()}
}

@spec new() :: t()
def new, do: %__MODULE__{comment: nil, custom_properties: %{}}

@spec comment(t(), String.t()) :: t()
def comment(%__MODULE__{} = desc, c) when is_binary(c), do: %{desc | comment: c}

@spec put_custom_property(t(), String.t(), String.t()) :: t()
def put_custom_property(%__MODULE__{} = desc, k, v) when is_binary(k) and is_binary(v),
do: %{desc | custom_properties: Map.put(desc.custom_properties, k, v)}

@spec put_custom_properties(t(), %{optional(String.t()) => String.t()}) :: t()
def put_custom_properties(%__MODULE__{} = desc, props) when is_map(props) do
Enum.each(props, fn {k, v} ->
unless is_binary(k) and is_binary(v) do
raise ArgumentError,
"put_custom_properties/2 requires String keys and values, got #{inspect({k, v})}"
end
end)

%{desc | custom_properties: Map.merge(desc.custom_properties, props)}
end
end
2 changes: 1 addition & 1 deletion bindings/elixir/lib/fluss/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Fluss.Native do

def admin_get_server_nodes(_admin), do: :erlang.nif_error(:nif_not_loaded)

def admin_create_database(_admin, _name, _ignore_if_exists),
def admin_create_database(_admin, _name, _descriptor, _ignore_if_exists),
do: :erlang.nif_error(:nif_not_loaded)

def admin_get_database_info(_admin, _database_name), do: :erlang.nif_error(:nif_not_loaded)
Expand Down
14 changes: 13 additions & 1 deletion bindings/elixir/native/fluss_nif/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ impl NifDatabaseDescriptor {
custom_properties: desc.custom_properties().clone(),
}
}

pub fn to_core(&self) -> DatabaseDescriptor {
let mut desc = DatabaseDescriptor::builder();
if let Some(comment) = &self.comment {
desc = desc.comment(comment);
}

desc.custom_properties(self.custom_properties.clone())
.build()
}
}

#[derive(NifStruct)]
Expand Down Expand Up @@ -124,12 +134,14 @@ fn admin_create_database<'a>(
env: Env<'a>,
admin: ResourceArc<AdminResource>,
database_name: String,
descriptor: Option<NifDatabaseDescriptor>,
ignore_if_exists: bool,
) -> Term<'a> {
async_nif::spawn_task(env, async move {
let core_descriptor = descriptor.as_ref().map(NifDatabaseDescriptor::to_core);
admin
.inner
.create_database(&database_name, None, ignore_if_exists)
.create_database(&database_name, core_descriptor.as_ref(), ignore_if_exists)
.await
})
}
Expand Down
169 changes: 169 additions & 0 deletions bindings/elixir/test/database_descriptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

defmodule Fluss.DatabaseDescriptorTest do
use ExUnit.Case, async: true

alias Fluss.DatabaseDescriptor

describe "new/0" do
test "returns a descriptor with no comment and no custom properties" do
assert DatabaseDescriptor.new() ==
%DatabaseDescriptor{comment: nil, custom_properties: %{}}
end
end

describe "comment/2" do
test "sets the comment field" do
desc = DatabaseDescriptor.new() |> DatabaseDescriptor.comment("hello")
assert desc.comment == "hello"
end

test "overwrites a previous comment" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.comment("first")
|> DatabaseDescriptor.comment("second")

assert desc.comment == "second"
end

test "preserves existing custom_properties" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.put_custom_property("k", "v")
|> DatabaseDescriptor.comment("c")

assert desc.comment == "c"
assert desc.custom_properties == %{"k" => "v"}
end

test "raises on non-binary input" do
assert_raise FunctionClauseError, fn ->
DatabaseDescriptor.comment(DatabaseDescriptor.new(), 42)
end
end
end

describe "put_custom_property/3" do
test "adds a property" do
desc = DatabaseDescriptor.new() |> DatabaseDescriptor.put_custom_property("k", "v")
assert desc.custom_properties == %{"k" => "v"}
end

test "accumulates multiple properties" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.put_custom_property("k1", "v1")
|> DatabaseDescriptor.put_custom_property("k2", "v2")

assert desc.custom_properties == %{"k1" => "v1", "k2" => "v2"}
end

test "overwrites a previous value for the same key" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.put_custom_property("k", "v1")
|> DatabaseDescriptor.put_custom_property("k", "v2")

assert desc.custom_properties == %{"k" => "v2"}
end

test "preserves existing comment" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.comment("c")
|> DatabaseDescriptor.put_custom_property("k", "v")

assert desc.comment == "c"
assert desc.custom_properties == %{"k" => "v"}
end

test "raises on non-binary key" do
assert_raise FunctionClauseError, fn ->
DatabaseDescriptor.put_custom_property(DatabaseDescriptor.new(), :atom_key, "v")
end
end

test "raises on non-binary value" do
assert_raise FunctionClauseError, fn ->
DatabaseDescriptor.put_custom_property(DatabaseDescriptor.new(), "k", 42)
end
end
end

describe "put_custom_properties/2" do
test "adds entries from a map" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.put_custom_properties(%{"k1" => "v1", "k2" => "v2"})

assert desc.custom_properties == %{"k1" => "v1", "k2" => "v2"}
end

test "merges with existing entries" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.put_custom_property("k0", "v0")
|> DatabaseDescriptor.put_custom_properties(%{"k1" => "v1"})

assert desc.custom_properties == %{"k0" => "v0", "k1" => "v1"}
end

test "overwrites on key collision" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.put_custom_property("k", "v1")
|> DatabaseDescriptor.put_custom_properties(%{"k" => "v2"})

assert desc.custom_properties == %{"k" => "v2"}
end

test "accepts an empty map" do
desc = DatabaseDescriptor.new() |> DatabaseDescriptor.put_custom_properties(%{})
assert desc.custom_properties == %{}
end

test "preserves existing comment" do
desc =
DatabaseDescriptor.new()
|> DatabaseDescriptor.comment("c")
|> DatabaseDescriptor.put_custom_properties(%{"k" => "v"})

assert desc.comment == "c"
assert desc.custom_properties == %{"k" => "v"}
end

test "raises FunctionClauseError on non-map input" do
assert_raise FunctionClauseError, fn ->
DatabaseDescriptor.put_custom_properties(DatabaseDescriptor.new(), [])
end
end

test "raises ArgumentError on non-binary key in the map" do
assert_raise ArgumentError, fn ->
DatabaseDescriptor.put_custom_properties(DatabaseDescriptor.new(), %{:atom_key => "v"})
end
end

test "raises ArgumentError on non-binary value in the map" do
assert_raise ArgumentError, fn ->
DatabaseDescriptor.put_custom_properties(DatabaseDescriptor.new(), %{"k" => 42})
end
end
end
end
64 changes: 61 additions & 3 deletions bindings/elixir/test/integration/admin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,68 @@ defmodule Fluss.Integration.AdminTest do
end
end

describe "create_database/3 with a descriptor" do
test "accepts a comment-only descriptor", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"
descriptor = Fluss.DatabaseDescriptor.new() |> Fluss.DatabaseDescriptor.comment("hello")
:ok = Fluss.Admin.create_database(admin, db, descriptor)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, info} = Fluss.Admin.get_database_info(admin, db)
assert info.descriptor.comment == "hello"
assert info.descriptor.custom_properties == %{}
end

test "accepts a properties-only descriptor", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"

descriptor =
Fluss.DatabaseDescriptor.new()
|> Fluss.DatabaseDescriptor.put_custom_property("region", "eu-west-1")

:ok = Fluss.Admin.create_database(admin, db, descriptor)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, info} = Fluss.Admin.get_database_info(admin, db)
assert info.descriptor.custom_properties == %{"region" => "eu-west-1"}
end

test "accepts a struct constructed directly without builders", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"
descriptor = %Fluss.DatabaseDescriptor{comment: "direct", custom_properties: %{"k" => "v"}}
:ok = Fluss.Admin.create_database(admin, db, descriptor)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, info} = Fluss.Admin.get_database_info(admin, db)
assert info.descriptor.comment == "direct"
assert info.descriptor.custom_properties == %{"k" => "v"}
end
end

describe "get_database_info/2" do
test "returns DatabaseInfo for an existing database", %{admin: admin} do
test "returns DatabaseInfo with the descriptor used at creation", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"
:ok = Fluss.Admin.create_database(admin, db, true)

descriptor =
Fluss.DatabaseDescriptor.new()
|> Fluss.DatabaseDescriptor.comment("Integration test database")
|> Fluss.DatabaseDescriptor.put_custom_property("owner", "ci")
|> Fluss.DatabaseDescriptor.put_custom_property("env", "test")

:ok = Fluss.Admin.create_database(admin, db, descriptor)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, %Fluss.DatabaseInfo{} = info} = Fluss.Admin.get_database_info(admin, db)
assert info.database_name == db
assert info.descriptor.comment == "Integration test database"
assert info.descriptor.custom_properties == %{"owner" => "ci", "env" => "test"}
assert is_integer(info.created_time)
assert is_integer(info.modified_time)
end

test "returns DatabaseInfo for a database created with no descriptor", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"
:ok = Fluss.Admin.create_database(admin, db)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, %Fluss.DatabaseInfo{} = info} = Fluss.Admin.get_database_info(admin, db)
Expand All @@ -76,7 +134,7 @@ defmodule Fluss.Integration.AdminTest do
test "returns {:ok, true} for an existing database", %{admin: admin} do
db = "fluss_data_sources_#{:rand.uniform(100_000)}"

:ok = Fluss.Admin.create_database(admin, db, true)
:ok = Fluss.Admin.create_database(admin, db)
on_exit(fn -> Fluss.Admin.drop_database(admin, db, true) end)

assert {:ok, true} = Fluss.Admin.database_exists(admin, db)
Expand Down
2 changes: 1 addition & 1 deletion bindings/elixir/test/integration/log_table_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ defmodule Fluss.Integration.LogTableTest do
describe "admin operations" do
test "create and drop database", %{admin: admin} do
db_name = "ex_test_db_#{:rand.uniform(100_000)}"
:ok = Fluss.Admin.create_database(admin, db_name, true)
:ok = Fluss.Admin.create_database(admin, db_name)

{:ok, databases} = Fluss.Admin.list_databases(admin)
assert db_name in databases
Expand Down