Skip to content

Implement Apache Kafka transport for PubSub (Part 14 Annex B.2) with HA (#3942)#3949

Open
marcschier wants to merge 14 commits into
masterfrom
feature/3942-kafka-pubsub
Open

Implement Apache Kafka transport for PubSub (Part 14 Annex B.2) with HA (#3942)#3949
marcschier wants to merge 14 commits into
masterfrom
feature/3942-kafka-pubsub

Conversation

@marcschier

Copy link
Copy Markdown
Collaborator

Implements #3942 (Apache Kafka transport for PubSub, OPC UA Part 14 Annex B.2) with HA publishers & subscribers (#3924, Part 14 §9.1.6).

Kafka transport (Part 14 Annex B.2)

New Opc.Ua.PubSub.Kafka library mirroring the MQTT broker transport and the existing IPubSubTransport/IPubSubTransportFactory + injectable-adapter pattern:

  • Profiles pubsub-kafka-uadp / pubsub-kafka-json (library-local KafkaProfiles); kafka:// / kafkas:// addresses.
  • Reuses the existing Broker*TransportDataType types (QueueName = topic, MetaDataQueueName = metadata topic, ResourceUri/AuthenticationProfileUri = auth, RequestedDeliveryGuarantee = QoS) — no new DataTypes, matching the spec.
  • Normative content-type record header (application/opcua+uadp / application/json); QoS → producer acks/idempotence; SASL/TLS auth with passwords resolved via ISecretRegistry.
  • AddKafkaTransport() DI (JSON + UADP factories), KafkaConnectionOptions (bootstrap servers, GroupId, security, delivery guarantee, offset policy).

Dual-source client by TFM (AOT + broad TFM support)

Via the IKafkaClientAdapter seam:

  • net10.0 → Dekaf (pure-managed, no native dependency) ⇒ the library is IsAotCompatible and Kafka runs in the NativeAOT-published sample directly.
  • net472 / net48 / netstandard2.1 / net8.0 / net9.0 → Confluent.Kafka (mature, native librdkafka).

NativeAOT publish of ConsoleReferencePubSubClient (win-x64) was validated end-to-end: a 26.9 MB native binary with the Kafka/Dekaf transport and zero IL2xxx/IL3xxx trim/AOT warnings.

High availability (#3924, §9.1.6)

Transport-agnostic, additive, and behaviour-neutral by default:

  • IPubSubActivationCoordinator (per-component Active/Standby + RoleChanged) with the default no-op AlwaysActiveCoordinator.
  • IPubSubLeaseStore + InMemoryPubSubLeaseStore (monotonic fencing tokens) and LeaseActivationCoordinator (leader election with per-component renew/acquire loops).
  • Runtime wiring: PubSubApplication starts/stops the coordinator with the app; WriterGroup/ReaderGroup gate at the enable/pause boundary — Standby pauses publishing/dispatch, Active resumes. No-op under the default coordinator, so existing behaviour is unchanged.
  • Kafka HA: consumer GroupId drives Kafka-native subscriber failover; AtLeastOnce/ExactlyOnce enable idempotent producing for active-publisher handover.
  • DI: WithActivationCoordinator(...) / WithLeaseStore(...).

Samples & docs

Kafka profiles (KafkaUadp/KafkaJson) added to the publisher & subscriber samples; new Docs/PubSubKafka.md (config, DI, auth, QoS, AOT note) linked from Docs/README.md + the Docs/PubSub.md transport table.

Tests & validation

  • Opc.Ua.PubSub.Kafka.Tests: 82 tests, 80.28% line coverage on the Kafka source, using a FakeKafkaClientAdapter over an in-memory topic bus. A Testcontainers.Kafka integration test runs a real produce→consume round-trip and Assert.Ignore-skips when Docker is unavailable.
  • HA failover tests (PubSubActivationFailoverTests): leader election + failover, fencing-token monotonicity, writer/reader standby gating (deterministic, FakeTimeProvider).
  • Opc.Ua.PubSub.Tests: 1134 passed / 0 failed (1130 existing + 4 new).
  • Full UA.slnx net10 build: 0 errors; the only warnings are pre-existing CA warnings in unrelated test projects (new code is 0-warning). net48 (Confluent path) validated for the transport + test project.

Dependencies (approved)

  • Dekaf (MIT, pure-managed, net10-only) and Confluent.Kafka 2.15.0 (Apache-2.0) + librdkafka.redist (BSD-2).
  • Test-only Testcontainers.Kafka 4.13.0 (MIT).
  • Dekaf's net10 package required bumping the Microsoft.Extensions.* / System.* 10.0.8 servicing family to 10.0.9.

Closes #3942

Add the Apache Kafka PubSub transport library skeleton per Part 14 Annex B.2:
- Opc.Ua.PubSub.Kafka.csproj (LibxTargetFrameworks; deliberately no IsAotCompatible
  because Confluent.Kafka/native librdkafka is not NativeAOT/trim-safe).
- KafkaProfiles (pubsub-kafka-uadp / pubsub-kafka-json), kept library-local like EthProfiles.
- Confluent.Kafka 2.15.0 (Apache-2.0) pinned in Directory.Packages.props.
- NugetREADME + CLSCompliant assembly attribute; wired into UA.slnx.

Builds clean on net10 and net48 (net462 Confluent asset resolves under the existing
net4x SuppressTfmSupportBuildWarnings). Transport implementation follows.
Apache Kafka broker transport (Part 14 Annex B.2) mirroring the MQTT transport:
value types (options, endpoint kafka://kafkas://, content-type encoding, QoS mapping,
message envelope), IKafkaClientAdapter abstraction, KafkaBrokerTransport
(IPubSubTransport + IPubSubTopicProvider), KafkaPubSubTransportFactory (json+uadp
profiles, Broker*TransportDataType QueueName/MetaDataQueueName/auth, ISecretRegistry
password), and AddKafkaTransport DI.

Dual-source client by TFM via the adapter seam:
- net10.0 -> Dekaf (pure-managed, no native dep) -> IsAotCompatible=true.
- net472/net48/netstandard2.1/net8.0/net9.0 -> Confluent.Kafka (native librdkafka).
DI registers the default IKafkaClientFactory available per TFM (#if NET10_0_OR_GREATER).

Content-type record header ('application/opcua+uadp' / 'application/json') is normative
per Annex B.2. QoS BrokerTransportQualityOfService -> acks/idempotence. Auth reuses the
MQTT AuthenticationProfileUri/ResourceUri + secret-store pattern.

Bumps the Microsoft.Extensions.*/System.* 10.0.8 family to 10.0.9 (required by the Dekaf
net10 package). Adds Testcontainers 4.13.0 for the upcoming Docker Kafka integration test.

Builds clean (0 warnings) on net10.0 (Dekaf/AOT), net8.0 and net48 (Confluent).
Transport-agnostic HA foundation for Part 14 Sec 9.1.6, additive and behaviour-neutral:
- PubSubRedundancyMode (None/Cold/Warm/Hot), PubSubComponentRole (Active/Standby),
  PubSubRoleChangedEventArgs.
- IPubSubActivationCoordinator (per-component Active/Standby decision + RoleChanged) with
  the default AlwaysActiveCoordinator (no-op: every component active, so non-redundant
  deployments are unaffected).
- IPubSubLeaseStore + PubSubLease (monotonic fencing token) for leader election, with a
  single-process InMemoryPubSubLeaseStore.

The runtime wiring (pause standby / resume active in WriterGroup/ReaderGroup/PubSubConnection)
and the lease-based coordinator + DI follow. Builds clean (0 warnings) on net10.
LeaseActivationCoordinator elects the active instance of each component via
leadership leases in a shared IPubSubLeaseStore: the lease holder is Active, others
are Standby and take over automatically when renewal stops. Per-component background
renewal/acquire loops raise RoleChanged on transitions; fencing tokens guard against
superseded owners. Cross-TFM (uses the TimeProvider.Delay extension and Cancel()).
Builds clean (0 warnings) on net10 and net48.
Add IPubSubBuilder.WithActivationCoordinator / WithLeaseStore and register the default
AlwaysActiveCoordinator (no-op) so non-redundant deployments are unaffected. Completes the
transport-agnostic HA foundation; the runtime pause/resume wiring consumes
IPubSubActivationCoordinator from DI. Builds clean (0 warnings) on net10.
Add KafkaUadp/KafkaJson profiles to ConsoleReferencePubSubClient (publisher + subscriber),
AddKafkaTransport wiring, profile->URI mapping, kafka://localhost:9092 default, and the
Opc.Ua.PubSub.Kafka ProjectReference. Since net10 Kafka uses the pure-managed Dekaf client,
Kafka is in the AOT-published sample directly (no conditional hack).

Add Docs/PubSubKafka.md (transport config, DI, auth, QoS, AOT note) linked from Docs/README.md
and the Docs/PubSub.md transport table.

Also implement the two new IPubSubBuilder HA methods (WithActivationCoordinator/WithLeaseStore)
in the EthTransportBuilder/UdpTransportBuilder decorators.

Validated: NativeAOT publish (win-x64) of the sample succeeds and produces a 26.9 MB native
binary with the Kafka/Dekaf transport and ZERO IL2xxx/IL3xxx trim/AOT warnings.
…3942)

New Opc.Ua.PubSub.Kafka.Tests: a FakeKafkaClientAdapter over a shared in-memory topic bus
(publisher/subscriber fakes exchange preserved KafkaMessage records) plus endpoint-parser,
options, QoS-mapping, encoding/content-type, factory, lifecycle, edge, produce->consume
round-trip, DI, and guard tests linked to Part 14 Annex B.2. A Testcontainers.Kafka
integration test (net8.0+) runs a real produce->consume round-trip against a Kafka container
and Assert.Ignore-skips when Docker is unavailable.

82 non-integration tests pass on net10 (1 Docker test skipped without Docker); Kafka source
line coverage 80.28% (meets the 80% bar). Builds clean on net10 and net48.
…tests (#3942, #3924)

Consume IPubSubActivationCoordinator in the runtime so redundant publishers/subscribers
fail over (Part 14 Sec 9.1.6): PubSubApplication starts the coordinator before enabling
connections and stops it after disabling them; WriterGroup and ReaderGroup consult the
coordinator per component id (pubsub:writergroup:<conn>:<group> / pubsub:readergroup:...)
and gate at the enable/pause boundary — Standby pauses publishing/dispatch, Active resumes,
driven by RoleChanged. Strictly no-op under the default AlwaysActiveCoordinator, so
non-redundant behaviour is unchanged.

Kafka: AtLeastOnce delivery guarantee now also enables idempotent producing (alongside
ExactlyOnce); GroupId already drives Kafka-native consumer-group subscriber failover.

Tests: new PubSubActivationFailoverTests (leader election + failover, fencing-token
monotonicity, writer-group standby pause/resume, reader-group standby suppression) over the
in-memory lease store with a FakeTimeProvider. Opc.Ua.PubSub.Tests 1134 passed/0 failed
(1130 existing + 4 new); Kafka tests 82 passed; net10/net48 build 0 warnings.
Copilot AI review requested due to automatic review settings July 3, 2026 12:58

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot couldn't run its full agentic review because no GitHub Actions runner was available. Make sure your repository has a runner available to run Copilot's review, or add a copilot-setup-steps.yml file specifying one with the runs-on attribute. See the docs for more details.

Adds an OPC UA Part 14 Annex B.2 Apache Kafka broker transport plus transport-agnostic high-availability activation/lease coordination, and wires both into DI, samples, docs, and tests.

Changes:

  • Introduces new Opc.Ua.PubSub.Kafka library (DI registration, options, endpoint parsing, QoS mapping, and adapter seam).
  • Adds HA coordination primitives (IPubSubActivationCoordinator, lease store + coordinator) and gates WriterGroup/ReaderGroup runtime behavior based on Active/Standby.
  • Updates reference sample, documentation, solution/project files, dependencies, and adds a dedicated Kafka test suite (unit + Docker integration).

Reviewed changes

Copilot reviewed 65 out of 65 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
UA.slnx Adds Kafka library and Kafka test project to the solution.
Tests/Opc.Ua.PubSub.Kafka.Tests/Opc.Ua.PubSub.Kafka.Tests.csproj New Kafka test project (multi-TFM + conditional Testcontainers dependency).
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaTransportServiceCollectionExtensionsTests.cs Tests DI registration and configuration binding for Kafka transport.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaTestHelper.cs Shared test helpers to construct Kafka transports, factories, and connections.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaQosMappingTests.cs Tests QoS mapping to Kafka producer settings.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaPubSubTransportFactoryTests.cs Tests Kafka transport factory validation and option resolution.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaIntegrationDockerTests.cs Adds Docker/Testcontainers integration test for real broker round trip.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaEndpointParserTests.cs Tests Kafka endpoint parsing and validation.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaEncodingTests.cs Tests encoding helpers and profile URIs.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaConnectionOptionsTests.cs Tests Kafka option defaults and configuration binding.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaClientAdapterGuardTests.cs Tests guard/validation behavior for Dekaf adapter (net10+).
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaBrokerTransportRoundTripTests.cs In-memory bus producer→consumer transport round trips.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaBrokerTransportLifecycleTests.cs Lifecycle/state-event tests for Kafka broker transport.
Tests/Opc.Ua.PubSub.Kafka.Tests/KafkaBrokerTransportEdgeTests.cs Edge-case coverage for topic routing, headers, cancellation, dispose.
Tests/Opc.Ua.PubSub.Kafka.Tests/FakeKafkaClientAdapter.cs In-memory fake adapter/bus/factory used by Kafka tests.
Libraries/Opc.Ua.PubSub/Redundancy/PubSubRoleChangedEventArgs.cs New event args type for role change notifications.
Libraries/Opc.Ua.PubSub/Redundancy/PubSubRedundancyMode.cs Adds redundancy mode enum (None/Cold/Warm/Hot).
Libraries/Opc.Ua.PubSub/Redundancy/PubSubLease.cs Adds lease record struct (lease key/owner/fencing token/expiry).
Libraries/Opc.Ua.PubSub/Redundancy/PubSubComponentRole.cs Adds Active/Standby role enum.
Libraries/Opc.Ua.PubSub/Redundancy/LeaseActivationCoordinator.cs Adds lease-based activation coordinator implementation.
Libraries/Opc.Ua.PubSub/Redundancy/InMemoryPubSubLeaseStore.cs Adds in-memory lease store implementation.
Libraries/Opc.Ua.PubSub/Redundancy/IPubSubLeaseStore.cs Adds lease store abstraction used for HA activation.
Libraries/Opc.Ua.PubSub/Redundancy/IPubSubActivationCoordinator.cs Adds abstraction for per-component Active/Standby coordination.
Libraries/Opc.Ua.PubSub/Redundancy/AlwaysActiveCoordinator.cs Default coordinator that keeps behavior unchanged (always active).
Libraries/Opc.Ua.PubSub/Groups/WriterGroup.cs Adds HA role gating + role-change subscriptions for publishing.
Libraries/Opc.Ua.PubSub/Groups/ReaderGroup.cs Adds HA role gating + role-change subscriptions for dispatching.
Libraries/Opc.Ua.PubSub/DependencyInjection/PubSubBuilder.cs Adds builder extension points for activation coordinator + lease store.
Libraries/Opc.Ua.PubSub/DependencyInjection/OpcUaPubSubBuilderExtensions.cs Registers default activation coordinator and wires it into PubSubApplication DI.
Libraries/Opc.Ua.PubSub/DependencyInjection/IPubSubBuilder.cs Updates builder interface to expose HA wiring methods.
Libraries/Opc.Ua.PubSub/Connections/PubSubConnection.cs Propagates activation coordinator into reader/writer groups and applies roles on enable/start.
Libraries/Opc.Ua.PubSub/Application/PubSubApplication.cs Starts/stops coordinator with app lifecycle and applies activation roles after start.
Libraries/Opc.Ua.PubSub.Udp/DependencyInjection/UdpTransportBuilder.cs Plumbs new builder methods through UDP transport builder wrapper.
Libraries/Opc.Ua.PubSub.Eth/DependencyInjection/EthTransportBuilder.cs Plumbs new builder methods through Ethernet transport builder wrapper.
Libraries/Opc.Ua.PubSub.Kafka/Properties/AssemblyInfo.cs Adds assembly-level CLS compliance attribute for Kafka library.
Libraries/Opc.Ua.PubSub.Kafka/Opc.Ua.PubSub.Kafka.csproj New Kafka transport library project with dual client dependency by TFM.
Libraries/Opc.Ua.PubSub.Kafka/NugetREADME.md Package readme for Kafka transport (usage + limitations).
Libraries/Opc.Ua.PubSub.Kafka/KafkaTopicOptions.cs Adds topic-prefix options for fallback topic naming.
Libraries/Opc.Ua.PubSub.Kafka/KafkaTlsOptions.cs Adds TLS options bag for librdkafka/Dekaf client configuration.
Libraries/Opc.Ua.PubSub.Kafka/KafkaQualityOfService.cs Adds QoS enums + mapping helpers for acks/idempotence.
Libraries/Opc.Ua.PubSub.Kafka/KafkaPubSubTransportFactory.cs Adds Kafka transport factory resolving endpoint/auth/QoS/password secret.
Libraries/Opc.Ua.PubSub.Kafka/KafkaProfiles.cs Adds Kafka transport profile URI constants.
Libraries/Opc.Ua.PubSub.Kafka/KafkaMessage.cs Defines adapter message envelope (topic/key/value/content-type/headers).
Libraries/Opc.Ua.PubSub.Kafka/KafkaIncomingMessageEventArgs.cs Defines event args for adapter-delivered messages.
Libraries/Opc.Ua.PubSub.Kafka/KafkaEndpointParser.cs Adds parser for kafka/kafkas bootstrap lists and validation.
Libraries/Opc.Ua.PubSub.Kafka/KafkaEndpoint.cs Defines parsed endpoint struct (bootstrap servers + TLS flag).
Libraries/Opc.Ua.PubSub.Kafka/KafkaEncoding.cs Adds encoding enum and content-type/topic-segment mapping.
Libraries/Opc.Ua.PubSub.Kafka/KafkaConnectionStateChangedEventArgs.cs Defines event args for adapter connection state changes.
Libraries/Opc.Ua.PubSub.Kafka/KafkaConnectionOptions.cs Adds full Kafka connection option surface (security/QoS/offsets/timeouts).
Libraries/Opc.Ua.PubSub.Kafka/Internal/IKafkaClientFactory.cs Adds adapter factory interface used by transport (TFM-specific impls).
Libraries/Opc.Ua.PubSub.Kafka/Internal/IKafkaClientAdapter.cs Adds adapter interface for broker clients (Confluent/Dekaf/fakes).
Libraries/Opc.Ua.PubSub.Kafka/Internal/DekafKafkaClientFactory.cs net10+ factory using Dekaf managed client.
Libraries/Opc.Ua.PubSub.Kafka/Internal/ConfluentKafkaClientFactory.cs non-net10 factory using Confluent.Kafka (native librdkafka).
Libraries/Opc.Ua.PubSub.Kafka/DependencyInjection/KafkaTransportServiceCollectionExtensions.cs Adds AddKafkaTransport() DI extensions registering factories + options.
Docs/README.md Links to Kafka transport docs from docs index.
Docs/PubSubKafka.md Adds Kafka transport configuration/DI/topic/QoS/auth/AOT documentation.
Docs/PubSub.md Updates transport list/table and adds Kafka transport section.
Directory.Packages.props Adds Kafka/Testcontainers deps and bumps Microsoft.Extensions.*/System.* to 10.0.9 family.
Applications/ConsoleReferencePubSubClient/SubscriberConfigurationBuilder.cs Adds Kafka profiles/endpoints/topics to subscriber sample config builder.
Applications/ConsoleReferencePubSubClient/PublisherConfigurationBuilder.cs Adds Kafka profiles/endpoints/topics to publisher sample config builder.
Applications/ConsoleReferencePubSubClient/Program.cs Adds Kafka profiles to CLI and DI wiring in reference client.
Applications/ConsoleReferencePubSubClient/ConsoleReferencePubSubClient.csproj Adds project reference to Kafka transport library.
Comments suppressed due to low confidence (3)

Libraries/Opc.Ua.PubSub/Redundancy/InMemoryPubSubLeaseStore.cs:1

  • TryRenewAsync renews a lease even if it is already expired (current.ExpiresAt <= now). That breaks HA failover semantics (a stalled owner can 'revive' an expired lease with the same fencing token). Add an expiry check (e.g., fail renewal when current.ExpiresAt <= now) so an expired lease requires reacquisition (and fencing token can advance).
    Libraries/Opc.Ua.PubSub/Redundancy/LeaseActivationCoordinator.cs:1
  • When renewal fails, the loop immediately continues into a new acquire attempt without any delay, which can cause a tight retry loop (high CPU) if the lease store keeps rejecting renewals/acquisitions. Consider delaying by m_retryInterval (or a small backoff) after losing the lease (and/or after acquire failure) to avoid busy-spinning under persistent contention.
    Libraries/Opc.Ua.PubSub/Groups/WriterGroup.cs:1
  • The default componentId format uses a double-colon (pubsub:writergroup::...), which is inconsistent with the documented examples elsewhere in this PR (e.g., pubsub:writergroup:WriterGroup1 / pubsub:writergroup:<connection>:<group>). This increases the risk of mismatched IDs between coordinator events and group instances. Prefer a single-colon form (and keep it consistent with PubSubConnection.BuildWriterGroupComponentId).

Comment thread Libraries/Opc.Ua.PubSub.Kafka/Internal/IKafkaClientFactory.cs
Comment thread Libraries/Opc.Ua.PubSub.Kafka/KafkaPubSubTransportFactory.cs
Comment thread Libraries/Opc.Ua.PubSub.Kafka/KafkaQualityOfService.cs
Comment thread Docs/PubSubKafka.md Outdated
Comment thread Libraries/Opc.Ua.PubSub.Kafka/NugetREADME.md Outdated
@codecov

codecov Bot commented Jul 3, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 78.38828% with 295 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.55%. Comparing base (26ef554) to head (52d243a).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
...a.PubSub.Kafka/Internal/DekafKafkaClientAdapter.cs 75.44% 69 Missing and 14 partials ⚠️
...raries/Opc.Ua.PubSub.Kafka/KafkaBrokerTransport.cs 76.45% 57 Missing and 24 partials ⚠️
...Ua.PubSub/Redundancy/LeaseActivationCoordinator.cs 63.77% 31 Missing and 15 partials ⚠️
...Opc.Ua.PubSub.Kafka/KafkaPubSubTransportFactory.cs 91.76% 5 Missing and 9 partials ⚠️
...ction/KafkaTransportServiceCollectionExtensions.cs 67.56% 6 Missing and 6 partials ⚠️
Libraries/Opc.Ua.PubSub/Groups/WriterGroup.cs 83.05% 5 Missing and 5 partials ⚠️
Libraries/Opc.Ua.PubSub.Kafka/KafkaTlsOptions.cs 0.00% 8 Missing ⚠️
...ies/Opc.Ua.PubSub/Application/PubSubApplication.cs 61.90% 8 Missing ⚠️
...Opc.Ua.PubSub/DependencyInjection/PubSubBuilder.cs 0.00% 8 Missing ⚠️
Libraries/Opc.Ua.PubSub/Groups/ReaderGroup.cs 86.66% 4 Missing and 4 partials ⚠️
... and 6 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #3949      +/-   ##
==========================================
+ Coverage   73.52%   73.55%   +0.02%     
==========================================
  Files        1170     1187      +17     
  Lines      170163   171533    +1370     
  Branches    29361    29639     +278     
==========================================
+ Hits       125117   126164    +1047     
- Misses      34043    34280     +237     
- Partials    11003    11089      +86     
Files with missing lines Coverage Δ
...ries/Opc.Ua.PubSub.Kafka/KafkaConnectionOptions.cs 100.00% <100.00%> (ø)
...bSub.Kafka/KafkaConnectionStateChangedEventArgs.cs 100.00% <100.00%> (ø)
Libraries/Opc.Ua.PubSub.Kafka/KafkaEncoding.cs 100.00% <100.00%> (ø)
...c.Ua.PubSub.Kafka/KafkaIncomingMessageEventArgs.cs 100.00% <100.00%> (ø)
...aries/Opc.Ua.PubSub.Kafka/KafkaQualityOfService.cs 100.00% <100.00%> (ø)
Libraries/Opc.Ua.PubSub.Kafka/KafkaTopicOptions.cs 100.00% <100.00%> (ø)
...ries/Opc.Ua.PubSub/Connections/PubSubConnection.cs 67.84% <100.00%> (+0.53%) ⬆️
...ependencyInjection/OpcUaPubSubBuilderExtensions.cs 71.81% <100.00%> (+0.57%) ⬆️
...pc.Ua.PubSub/Redundancy/AlwaysActiveCoordinator.cs 100.00% <100.00%> (ø)
...Ua.PubSub/Redundancy/PubSubRoleChangedEventArgs.cs 66.66% <66.66%> (ø)
... and 15 more

... and 26 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@marcschier marcschier marked this pull request as draft July 3, 2026 13:50
Comment thread Docs/PubSubKafka.md Outdated
Comment thread Libraries/Opc.Ua.PubSub/Redundancy/PubSubRedundancyMode.cs
Inline review comments:
- KafkaPubSubTransportFactory: deep-clone Tls/Topics options per connection so
  the automatic kafkas TLS upgrade no longer mutates the shared default options
  (added KafkaTlsOptions.Clone / KafkaTopicOptions.Clone).
- KafkaQualityOfService: align AtLeastOnce enum XML-doc with the mapping, which
  enables the idempotent producer (acks=all + enable.idempotence=true).
- Docs/PubSubKafka.md: same AtLeastOnce idempotence correction.
- NugetREADME: rewrite the NativeAOT note to document the per-TFM client split
  (net10 uses managed Dekaf = AOT-compatible; other TFMs use Confluent/librdkafka
  = not AOT-compatible).
- IKafkaClientFactory internal member: no change (mirrors the shipped
  Opc.Ua.PubSub.Mqtt IMqttClientFactory pattern; builds clean on all TFMs).

Low-confidence findings also addressed:
- InMemoryPubSubLeaseStore.TryRenewAsync now rejects renewal of an expired lease
  so the owner must reacquire (advancing the fencing token). Added a test.
- LeaseActivationCoordinator waits retryInterval after losing a lease before
  re-acquiring, avoiding a tight retry loop.
- WriterGroup/ReaderGroup default componentId no longer emits a double colon
  (pubsub:writergroup:<name> instead of pubsub:writergroup::<name>).
Addresses PR #3949 review feedback:
- Merge Docs/PubSubKafka.md into Docs/PubSub.md as the last transport under
  "Transports" (Apache Kafka now follows the DTLS transport-status note),
  condensing the code snippet and pointing at the reference sample for full
  publisher/subscriber configurations. Delete PubSubKafka.md and repoint the
  Docs/README.md and NugetREADME.md links to PubSub.md#apache-kafka.
- Document PubSub high availability + redundancy in PubSub.md: renamed the HA
  section, added an "Active/standby activation" subsection covering
  IPubSubActivationCoordinator, IPubSubLeaseStore, PubSubComponentRole,
  fencing tokens, AlwaysActiveCoordinator/LeaseActivationCoordinator, and
  PubSubRedundancyMode.
- Align the redundancy docs to #3918's core abstractions
  (Opc.Ua.Redundancy.ISharedKeyValueStore CompareAndSwapAsync + ILeaderElection,
  ServiceLevel) and note the intended convergence of the PubSub lease store onto
  the shared compare-and-swap primitive; cross-reference Docs/HighAvailability.md.
- Update the Native AOT section for the Kafka per-TFM client split.
GetBootstrapAddress() returns a scheme-qualified URI (e.g.
PLAINTEXT://127.0.0.1:49158); the test prepended kafka:// verbatim, producing
a malformed kafka://PLAINTEXT://... endpoint that KafkaEndpointParser rejects
with 'empty port component'. This only surfaced on CI runners where Docker is
available (the test skips locally). Extract the host:port authority from the
bootstrap URI so the kafka:// endpoint is well-formed.
On net10.0 the transport uses the Dekaf client, whose consumer requires a
Kafka 4.0+ broker (KIP-848 ConsumerGroupHeartbeat API). The test used
confluentinc/cp-kafka:7.5.12 (Kafka 3.5), so the real-broker round trip threw
BrokerVersionException and the subscriber never received the frame. Switch to
apache/kafka:4.0.0 (KRaft, new consumer group protocol enabled by default);
the Confluent client on other TFMs uses the classic protocol, also supported.
@marcschier marcschier marked this pull request as ready for review July 4, 2026 09:36
…ce (CI)

With the apache/kafka:4.0.0 broker the Dekaf consumer now connects, but the
test produced a single record immediately after opening the subscriber, before
the consumer had joined the group and received its partition assignment, so the
subscriber never observed the record (frame was null after 20s). Resend the
same topic/payload every 2s until a record is observed or a 30s deadline
elapses; a genuine no-delivery still fails the assertion.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Kafka transport for PubSub

2 participants