Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/commands/adminapi/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def prepare(self):
)

layer0_repo = repositories.Layer0Repository(self.pg_storage, log)
designation_rules_repo = repositories.DesignationRulesRepository(self.pg_storage, log)
refresh = table_stats.make_table_stats_refresh(layer0_repo)
self.table_stats_cache = cache.BackgroundCache(
"table_stats",
Expand All @@ -64,6 +65,7 @@ def prepare(self):
layer0_repo=layer0_repo,
layer1_repo=repositories.Layer1Repository(self.pg_storage, log),
layer2_repo=repositories.Layer2Repository(self.pg_storage, log),
designation_rules_repo=designation_rules_repo,
authenticator=authenticator,
clients=clients.Clients(cfg.clients.ads_token),
table_stats_cache=self.table_stats_cache,
Expand Down
30 changes: 29 additions & 1 deletion app/commands/dataapi/command.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import threading
from datetime import timedelta
from pathlib import Path
from typing import Any, final

Expand All @@ -9,7 +11,8 @@
from app.data import repositories
from app.domain import dataapi as domain
from app.domain import responders
from app.lib import auth, commands, config, tracing
from app.domain.designation import DesignationFormatter
from app.lib import auth, cache, commands, config, tracing
from app.lib.storage import postgres
from app.lib.tracing import TracingConfig
from app.lib.web import server
Expand All @@ -27,6 +30,11 @@ class DataAPICommand(commands.Command):

def __init__(self, config_path: str) -> None:
self.config_path = config_path
self.pg_auth: postgres.PgStorage | None = None
self.pg_main: postgres.PgStorage | None = None
self.designation_rules_cache: cache.BackgroundCache | None = None
self._designation_rules_thread: threading.Thread | None = None
self.app: presentation.Server | None = None

def prepare(self):
self.config = parse_config(self.config_path)
Expand All @@ -43,10 +51,26 @@ def prepare(self):
self.pg_auth.connect()
self.pg_main.connect()

designation_rules_repo = repositories.DesignationRulesRepository(self.pg_main, log)
self.designation_rules_cache = cache.BackgroundCache(
"designation_rules",
designation_rules_repo.snapshot,
refresh_frequency=timedelta(seconds=30),
refresh_timeout=timedelta(seconds=10),
)
self._designation_rules_thread = threading.Thread(
target=self.designation_rules_cache.run,
daemon=True,
)
self._designation_rules_thread.start()

designation_formatter = DesignationFormatter(self.designation_rules_cache.get)

actions = domain.Actions(
layer2_repo=repositories.Layer2Repository(self.pg_main, log),
catalog_cfg=self.config.catalogs,
metadata_repo=repositories.MetadataRepository(self.pg_main),
designation_formatter=designation_formatter,
)

self.app = presentation.Server(
Expand All @@ -58,9 +82,13 @@ def prepare(self):
)

def run(self):
if self.app is None:
raise RuntimeError("prepare() was not called")
self.app.run()

def cleanup(self):
if self.designation_rules_cache is not None:
self.designation_rules_cache.stop()
if self.pg_auth:
self.pg_auth.disconnect()
if self.pg_main:
Expand Down
2 changes: 2 additions & 0 deletions app/data/repositories/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from app.data.repositories.common import CommonRepository
from app.data.repositories.designation_rules import DesignationRulesRepository
from app.data.repositories.layer0 import INTERNAL_ID_COLUMN_NAME, Layer0Repository
from app.data.repositories.layer1 import Layer1Repository
from app.data.repositories.layer2 import Layer2Repository
from app.data.repositories.metadata import MetadataRepository

__all__ = [
"CommonRepository",
"DesignationRulesRepository",
"Layer0Repository",
"INTERNAL_ID_COLUMN_NAME",
"Layer1Repository",
Expand Down
86 changes: 86 additions & 0 deletions app/data/repositories/designation_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import json
from typing import Any, final

import structlog

from app.domain.designation.formatter import RuleModel, RuleSetSnapshot
from app.lib.storage import postgres


def _row_to_model(row: dict[str, Any]) -> RuleModel:
transforms = row["transforms"]
if isinstance(transforms, str):
transforms = json.loads(transforms)
return RuleModel(
id=row["id"],
priority=int(row["priority"]),
pattern=row["pattern"],
template=row["template"],
transforms=transforms or {},
enabled=bool(row["enabled"]),
)


def _transforms_to_json(transforms: dict[str, list[dict[str, str | None]]]) -> str:
return json.dumps(transforms)


@final
class DesignationRulesRepository(postgres.TransactionalPGRepository):
def __init__(self, storage: postgres.PgStorage, logger: structlog.stdlib.BoundLogger) -> None:
self._logger = logger
super().__init__(storage)

def snapshot(self) -> RuleSetSnapshot:
rows = self._storage.query(
"""SELECT id, priority, pattern, template, transforms, enabled,
EXTRACT(EPOCH FROM MAX(modification_time) OVER ()) AS version_epoch
FROM designation.format_rules
WHERE enabled = true
ORDER BY priority ASC""",
)
version = int(rows[0]["version_epoch"]) if rows else 0
rules = [_row_to_model(r) for r in rows]
return RuleSetSnapshot(version=version, rules=rules)

def list_rules(self) -> list[RuleModel]:
rows = self._storage.query(
"""SELECT id, priority, pattern, template, transforms, enabled
FROM designation.format_rules
ORDER BY priority ASC""",
)
return [_row_to_model(r) for r in rows]

def get_rule(self, rule_id: str) -> RuleModel | None:
rows = self._storage.query(
"""SELECT id, priority, pattern, template, transforms, enabled
FROM designation.format_rules WHERE id = %s""",
params=[rule_id],
)
if not rows:
return None
return _row_to_model(rows[0])

def save_rule(
self,
rule_id: str,
priority: int,
pattern: str,
template: str,
transforms: dict[str, list[dict[str, str | None]]],
enabled: bool = True,
) -> RuleModel:
rows = self._storage.query(
"""INSERT INTO designation.format_rules (id, priority, pattern, template, transforms, enabled)
VALUES (%s, %s, %s, %s, %s::jsonb, %s)
ON CONFLICT (id) DO UPDATE SET
priority = EXCLUDED.priority,
pattern = EXCLUDED.pattern,
template = EXCLUDED.template,
transforms = EXCLUDED.transforms,
enabled = EXCLUDED.enabled,
modification_time = NOW()
RETURNING id, priority, pattern, template, transforms, enabled""",
params=[rule_id, priority, pattern, template, _transforms_to_json(transforms), enabled],
)
return _row_to_model(rows[0])
10 changes: 9 additions & 1 deletion app/domain/adminapi/actions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import final

from app.data import repositories
from app.domain.adminapi import crossmatch, layer1_write, login, sources, table_upload
from app.domain.adminapi import crossmatch, designation_rules, layer1_write, login, sources, table_upload
from app.lib import auth, cache, clients
from app.presentation import adminapi

Expand All @@ -14,6 +14,7 @@ def __init__(
layer0_repo: repositories.Layer0Repository,
layer1_repo: repositories.Layer1Repository,
layer2_repo: repositories.Layer2Repository,
designation_rules_repo: repositories.DesignationRulesRepository,
authenticator: auth.Authenticator,
clients: clients.Clients,
table_stats_cache: cache.BackgroundCache[adminapi.TableStatsSnapshot],
Expand All @@ -29,6 +30,7 @@ def __init__(
)
self.crossmatch_manager = crossmatch.CrossmatchManager(layer0_repo, layer1_repo, layer2_repo)
self.layer1_writer = layer1_write.Layer1Writer(layer1_repo)
self.designation_rules_manager = designation_rules.DesignationRulesManager(designation_rules_repo)

def create_source(self, r: adminapi.CreateSourceRequest) -> adminapi.CreateSourceResponse:
return self.source_manager.create_source(r)
Expand Down Expand Up @@ -68,3 +70,9 @@ def set_crossmatch_results(self, r: adminapi.SetCrossmatchResultsRequest) -> adm

def assign_record_pgcs(self, r: adminapi.AssignRecordPgcsRequest) -> adminapi.AssignRecordPgcsResponse:
return self.crossmatch_manager.assign_record_pgcs(r)

def list_designation_rules(self) -> adminapi.ListRulesResponse:
return self.designation_rules_manager.list_rules()

def save_designation_rule(self, r: adminapi.SaveRuleRequest) -> adminapi.SaveRuleResponse:
return self.designation_rules_manager.save_rule(r)
49 changes: 49 additions & 0 deletions app/domain/adminapi/designation_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import final

from app.data import repositories
from app.domain.designation.engine import Rule, RuleEngine
from app.domain.designation.formatter import RuleModel
from app.lib.web.errors import RuleValidationError
from app.presentation import adminapi


def _validate_rule(rule: RuleModel, examples: list[tuple[str, str]] | None = None) -> None:
engine_rule = Rule(
id=rule.id,
priority=rule.priority,
pattern=rule.pattern,
template=rule.template,
transforms=rule.to_engine_rule().transforms,
examples=examples or [],
)
try:
RuleEngine.compile([engine_rule]).validate_rule(engine_rule)
except ValueError as e:
raise RuleValidationError(str(e)) from e


@final
class DesignationRulesManager:
def __init__(self, rules_repo: repositories.DesignationRulesRepository) -> None:
self._repo = rules_repo

def list_rules(self) -> adminapi.ListRulesResponse:
rules = self._repo.list_rules()
return adminapi.ListRulesResponse(rules=[adminapi.RuleModel(**r.model_dump()) for r in rules])

def save_rule(self, request: adminapi.SaveRuleRequest) -> adminapi.SaveRuleResponse:
rule = RuleModel(**request.rule.model_dump())
if not rule.id.strip():
raise RuleValidationError("rule.id is required")
if rule.enabled:
examples = [(e.input, e.expected) for e in request.examples]
_validate_rule(rule, examples)
saved = self._repo.save_rule(
rule_id=rule.id,
priority=rule.priority,
pattern=rule.pattern,
template=rule.template,
transforms=rule.transforms,
enabled=rule.enabled,
)
return adminapi.SaveRuleResponse(rule=adminapi.RuleModel(**saved.model_dump()))
1 change: 1 addition & 0 deletions app/domain/adminapi/mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def get_mock_actions():
layer0_repo=mock.MagicMock(),
layer1_repo=mock.MagicMock(),
layer2_repo=mock.MagicMock(),
designation_rules_repo=mock.MagicMock(),
authenticator=auth.NoopAuthenticator(),
clients=c,
table_stats_cache=get_mock_table_stats_cache(),
Expand Down
14 changes: 14 additions & 0 deletions app/domain/dataapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from app.data import model, repositories
from app.domain import responders
from app.domain.dataapi import parameterized_query, search_parsers, tap_types
from app.domain.designation import DesignationFormatter
from app.presentation import dataapi

ENABLED_CATALOGS = [
Expand Down Expand Up @@ -41,10 +42,12 @@ def __init__(
layer2_repo: repositories.Layer2Repository,
catalog_cfg: responders.CatalogConfig,
metadata_repo: repositories.MetadataRepository,
designation_formatter: DesignationFormatter,
) -> None:
self.layer2_repo = layer2_repo
self.catalog_cfg = catalog_cfg
self.metadata_repo = metadata_repo
self.designation_formatter = designation_formatter
self.parameterized_query_manager = parameterized_query.ParameterizedQueryManager(
layer2_repo, ENABLED_CATALOGS, catalog_cfg
)
Expand All @@ -65,6 +68,17 @@ def query(self, query: dataapi.QueryRequest) -> dataapi.QueryResponse:
def query_fits(self, query: dataapi.FITSRequest) -> bytes:
return self.parameterized_query_manager.query_fits(query)

def format_designations(self, request: dataapi.FormatRequest) -> dataapi.FormatResponse:
batch = self.designation_formatter.format_batch(request.names)
results: list[dataapi.FormatResult] = []
for original, match in batch:
raw = original.strip() if original else ""
if match is None:
results.append(dataapi.FormatResult(formatted=raw, rule_id=None))
else:
results.append(dataapi.FormatResult(formatted=match.formatted, rule_id=match.rule_id))
return dataapi.FormatResponse(results=results)

def query_simple(self, query: dataapi.QuerySimpleRequest) -> dataapi.QuerySimpleResponse:
return self.parameterized_query_manager.query_simple(query)

Expand Down
23 changes: 23 additions & 0 deletions app/domain/designation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from app.domain.designation.engine import (
CompiledRule,
FormatMatch,
Rule,
RuleEngine,
TransformOp,
TransformSpec,
apply_transform,
)
from app.domain.designation.formatter import DesignationFormatter, RuleModel, RuleSetSnapshot

__all__ = [
"CompiledRule",
"DesignationFormatter",
"FormatMatch",
"Rule",
"RuleEngine",
"RuleModel",
"RuleSetSnapshot",
"TransformOp",
"TransformSpec",
"apply_transform",
]
Loading
Loading