Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ All notable changes to this project will be documented in this file.
- BREAKING: `configOverrides` now only accepts the known config file `opensearch.yml`.
Previously, arbitrary file names were silently accepted and ignored ([#137]).
- Bump `stackable-operator` to 0.110.1 ([#137]).
- test: Bump vector-aggregator to 0.55.0, replace /graphql call with gRPC call ([#146]).

[#129]: https://github.com/stackabletech/opensearch-operator/pull/129
[#130]: https://github.com/stackabletech/opensearch-operator/pull/130
[#137]: https://github.com/stackabletech/opensearch-operator/pull/137
[#141]: https://github.com/stackabletech/opensearch-operator/pull/141
[#146]: https://github.com/stackabletech/opensearch-operator/pull/146

## [26.3.0] - 2026-03-16

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ commands:
- script: >-
helm install opensearch-vector-aggregator vector
--namespace $NAMESPACE
--version 0.49.0
--version 0.52.0 `# app version 0.55.0`
--repo https://helm.vector.dev
--values 10_opensearch-vector-aggregator-values.yaml
---
Expand Down
125 changes: 56 additions & 69 deletions tests/templates/kuttl/logging/30-test-opensearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,94 +45,81 @@ metadata:
name: test-log-aggregation
data:
test.py: |
import requests
import json
import subprocess


def check_sent_events():
response = requests.post(
'http://opensearch-vector-aggregator:8686/graphql',
json={
'query': """
{
transforms(first:100) {
nodes {
componentId
metrics {
sentEventsTotal {
sentEventsTotal
}
}
}
}
}
"""
}
def grpc_get_components(host):
"""Call the Vector gRPC observability API and return the parsed result."""
response = subprocess.run(
[
"grpcurl",
"-plaintext",
"-d",
'{"limit": 100}',
host,
"vector.observability.v1.ObservabilityService/GetComponents",
],
capture_output=True,
text=True,
check=True,
timeout=20,
)
result = json.loads(response.stdout)
# TODO: Remove this debug dump once the gRPC response shape is confirmed
print(f"gRPC response from {host}:")
print(json.dumps(result, indent=2))
return result


assert response.status_code == 200, \
'Cannot access the API of the vector aggregator.'
def check_sent_events():
result = grpc_get_components("opensearch-vector-aggregator:8686")
components = result.get("components", [])
transforms = [
c for c in components if c.get("componentType") == "COMPONENT_TYPE_TRANSFORM"
]

result = response.json()
assert len(transforms) > 0, "No transform components found"

transforms = result['data']['transforms']['nodes']
for transform in transforms:
sentEvents = transform['metrics']['sentEventsTotal']
componentId = transform['componentId']
sentEvents = transform["metrics"]["sentEventsTotal"]
componentId = transform["componentId"]

if componentId == 'filteredInvalidEvents':
assert sentEvents is None or \
sentEvents['sentEventsTotal'] == 0, \
'Invalid log events were sent.'
if componentId == "filteredInvalidEvents":
assert sentEvents is None or int(sentEvents) == 0, (
"Invalid log events were sent."
)
else:
assert sentEvents is not None and \
sentEvents['sentEventsTotal'] > 0, \
f'No events were sent in "{componentId}".'
assert sentEvents is not None and int(sentEvents) > 0, (
f'No events were sent in "{componentId}".'
)


def check_log_file_rollover():
response = requests.post(
'http://opensearch-nodes-automatic-vector:8686/graphql',
json={
'query': """
{
sources {
nodes {
componentId
metrics {
receivedBytesTotal {
receivedBytesTotal
}
}
}
}
}
"""
}
)

assert response.status_code == 200, \
'Cannot access the API of the vector agent.'

result = response.json()
result = grpc_get_components("opensearch-nodes-automatic-vector:8686")
components = result.get("components", [])
sources = [
c for c in components if c.get("componentType") == "COMPONENT_TYPE_SOURCE"
]

sources = result['data']['sources']['nodes']
for source in sources:
receivedBytes = source['metrics']['receivedBytesTotal']
componentId = source['componentId']
receivedBytes = source["metrics"]["receivedBytesTotal"]
componentId = source["componentId"]

if componentId == 'files_opensearch_server':
if componentId == "files_opensearch_server":
assert receivedBytes is not None
receivedBytes = receivedBytes['receivedBytesTotal']
receivedBytes = int(receivedBytes)
MAX_LOG_FILE_SIZE = 5_500_000
expectedBytes = 2 * MAX_LOG_FILE_SIZE
assert receivedBytes >= expectedBytes, \
'Log file rollover did not yet happen twice ' \
f'({receivedBytes:,.0f} Bytes of {expectedBytes:,d} Bytes received). ' \
'The first rollover requires write permission to rename the log file, ' \
'the second rollover additionally requires delete permission to remove the old log file.'
assert receivedBytes >= expectedBytes, (
"Log file rollover did not yet happen twice "
f"({receivedBytes:,.0f} Bytes of {expectedBytes:,d} Bytes received). "
"The first rollover requires write permission to rename the log file, "
"the second rollover additionally requires delete permission to remove the old log file."
)


if __name__ == '__main__':
if __name__ == "__main__":
check_sent_events()
check_log_file_rollover()
print('Test successful!')
print("Test successful!")
Loading