Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d5285d8
ark-dev: examples/qwen3: TP all-reduce component — mscclpp fused-pack…
Jun 13, 2026
e0b1323
ark-dev: examples/qwen3: TP all-reduce component — mscclpp fused-pack…
Jun 14, 2026
a790cb0
ark: fix composed-graph cudaErrorMisalignedAddress crash at 4D produc…
chhwang Jun 15, 2026
c9c5872
ci: add --no-install-recommends to lcov install to prevent OOM
chhwang Jun 15, 2026
7bd4587
Merge remote-tracking branch 'origin/qwen3-qb-graphbug' into qwen3-q7…
chhwang Jun 15, 2026
fd1b79f
ci+qwen3: move Qwen3 tests before coverage, add world_size validation…
chhwang Jun 15, 2026
32d075a
fix(bench): remove sys.path hack that shadows ark with C++ source dir
chhwang Jun 15, 2026
27145c7
test(qwen3): xfail 4 multi-GPU allreduce tests blocked by codegen lim…
chhwang Jun 15, 2026
1e5592c
bench(qwen3): add PERF_GATE line to allreduce bench, handle worker fa…
chhwang Jun 15, 2026
5d12e68
fix(codegen+allreduce): support external-buffer OFFSET, internalize i…
chhwang Jun 16, 2026
1147e28
fix(bench): correct ark.init() ordering in bench_allreduce worker
chhwang Jun 16, 2026
c7fd8c5
Merge branch 'main' into qwen3-q7-allreduce
chhwang Jun 16, 2026
8a3993e
fix(allreduce): ARK_ROOT subprocess env + Executor.reset() teardown w…
chhwang Jun 16, 2026
3bb35d7
Fix PR #268 ( → ) perf-gate failure: bench subprocess workers fail wi…
chhwang Jun 16, 2026
5d56e48
Fix PR #268 ( → ) perf-gate failure: bench subprocess workers fail wi…
chhwang Jun 16, 2026
cb104f9
Fix PR #268 ( → ) perf-gate failure: bench subprocess workers fail wi…
chhwang Jun 16, 2026
d58aee9
Fix PR #268 ( → ) red check. Add real unit tests covering the new co…
chhwang Jun 16, 2026
d020080
Fix PR #268 ( → ) red check on commit . Read the CI log for the Unit…
chhwang Jun 16, 2026
c3b0aea
Fix PR #268 ( → ) red . Previous cycle diagnosed red as runner infra…
chhwang Jun 17, 2026
7b3845e
Fix PR #268 ( → ) red . Previous cycle diagnosed red as runner infra…
chhwang Jun 17, 2026
e414822
Fix PR #268 ( → ) red . Previous cycle diagnosed red as runner infra…
chhwang Jun 17, 2026
db7dc3a
Fix PR #268 ( → ) red . Attempt 22 () CI red on . Add real tests cove…
chhwang Jun 17, 2026
437824e
Continue Q7 (#268, branch , base ): run on available GPUs, record th…
chhwang Jun 17, 2026
dada256
Continue Q7 (#268, branch , base ): run on available GPUs, record th…
chhwang Jun 17, 2026
7ef8416
Continue Q7 (#268, branch , base ): run on available GPUs, record th…
chhwang Jun 17, 2026
997365b
Q7 allreduce: fix linters failure (clang-format at codegen_test.cpp:1…
chhwang Jun 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ignore:
- "*/dist-packages/*"
- "*/third_party/*"
- "*/ark/*_test.*"
- "*/examples/*"
- "examples/**"
- "*/python/unittest/*"
- "*/ark/unittest/*"
- "*/ark/ops/ops_test_common.*"
Expand Down
12 changes: 11 additions & 1 deletion .github/workflows/ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ jobs:
--verbose \
../python/unittest/

- name: Run Qwen3 Example Tests
if: github.event_name != 'schedule'
run: |
cd build
PYTHONPATH=$PWD/python ARK_ROOT=$PWD python3 -m pytest \
--cov=../examples/qwen3 \
--cov-report lcov:qwen3_coverage.info \
--verbose \
../examples/qwen3/

- name: C++ Coverage
if: github.event_name != 'schedule'
run: |
Expand All @@ -111,7 +121,7 @@ jobs:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
run: |
cd build
lcov -a cpp_coverage.info -a py_coverage.info -o coverage.info
lcov -a cpp_coverage.info -a py_coverage.info $([ -f qwen3_coverage.info ] && echo "-a qwen3_coverage.info") -o coverage.info
bash <(curl -s https://codecov.io/bash) -f coverage.info || echo "Codecov did not collect coverage reports"

- name: Install Python
Expand Down
19 changes: 10 additions & 9 deletions ark/codegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,17 @@ std::string CodeGenerator::Impl::def_task(const Json &task_json) {
size_t buffer_id = moff.buffer_id();
auto buf_info = buf_reg.get(buffer_id);
if (buf_info && buf_info->is_external) {
ERR(InternalError, "cannot offset external buffer");
}
size_t buffer_offset;
auto it = buffer_id_to_offset_.find(buffer_id);
if (it == buffer_id_to_offset_.end()) {
ERR(InternalError, "buffer ID not found: ", buffer_id);
// External buffer: offset relative to its own base.
ss_desc << moff.value();
} else {
auto it = buffer_id_to_offset_.find(buffer_id);
if (it == buffer_id_to_offset_.end()) {
ERR(InternalError, "buffer ID not found: ", buffer_id);
}
size_t buffer_offset = it->second;
size_t offset = buffer_offset + moff.value();
ss_desc << offset;
}
buffer_offset = it->second;
size_t offset = buffer_offset + moff.value();
ss_desc << offset;
} else {
ss_desc << arg.serialize().begin().value();
}
Expand Down
173 changes: 173 additions & 0 deletions ark/codegen_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include "codegen.hpp"

#include <map>
#include <nlohmann/json.hpp>
#include <set>

#include "ark/model.hpp"
#include "ark/planner.hpp"
#include "buffer_registry.hpp"
#include "model/model_buffer.hpp"
#include "model/model_node.hpp"
#include "model/model_op.hpp"
#include "model/model_op_arg.hpp"
#include "model/model_tensor.hpp"
#include "unittest/unittest_utils.h"

// Collect all buffer IDs referenced by OFFSET args in a plan's TaskInfos.
static std::set<size_t> collect_offset_buffer_ids(const ark::Json &plan) {
std::set<size_t> ids;
for (auto &ti : plan.at("TaskInfos")) {
for (auto &op_json : ti.at("Ops")) {
auto op = ark::ModelOp::deserialize(op_json);
auto args = op->impl_args(op_json.at("Config"));
for (auto &arg : args) {
if (arg.type_name() == "OFFSET") {
ids.insert(arg.value<ark::ModelOffset>().buffer_id());
}
}
}
}
return ids;
}

// Collect all buffer IDs referenced by TENSOR args in a plan's TaskInfos.
static std::set<size_t> collect_tensor_buffer_ids(const ark::Json &plan) {
std::set<size_t> ids;
for (auto &ti : plan.at("TaskInfos")) {
for (auto &op_json : ti.at("Ops")) {
auto op = ark::ModelOp::deserialize(op_json);
auto args = op->impl_args(op_json.at("Config"));
for (auto &arg : args) {
if (arg.type_name() == "TENSOR") {
ids.insert(
arg.value<ark::ModelTensorRef>()->buffer()->id());
}
}
}
}
return ids;
}

// Test 1: CodeGenerator exercises the external-buffer OFFSET path
// (codegen.cpp line 319: `ss_desc << moff.value();`).
ark::unittest::State test_codegen_external_buffer_offset() {
// Build a 2-rank model with a send_packet op on rank 0.
// send_packet's impl_args are two OFFSET args whose buffer IDs we will
// register as external in BufferRegistry before constructing CodeGenerator.
ark::Model model(0, 2);
ark::Tensor tns = model.tensor({1024}, ark::FP16);
model.send_packet(tns, 1, /*tag=*/0, /*flag=*/1);

// Plan on GPU 0.
ark::Planner planner(model, 0);
auto plan = ark::Json::parse(planner.plan(false));

// Verify the plan has TaskInfos.
UNITTEST_TRUE(plan.contains("TaskInfos"));
UNITTEST_TRUE(plan["TaskInfos"].size() > 0);

// Collect OFFSET and TENSOR buffer IDs from the plan.
auto offset_buf_ids = collect_offset_buffer_ids(plan);
auto tensor_buf_ids = collect_tensor_buffer_ids(plan);
UNITTEST_TRUE(offset_buf_ids.size() > 0);

// Register every OFFSET buffer as external in BufferRegistry.
// Use a dummy non-null pointer; CodeGenerator only checks is_external,
// it does not dereference the pointer.
auto &buf_reg = ark::BufferRegistry::get_instance();
for (size_t id : offset_buf_ids) {
buf_reg.set(id, reinterpret_cast<void *>(0x1), 0, /*is_external=*/true);
}

// All referenced buffer IDs go into extra_buffer_ids (external).
std::set<size_t> extra;
extra.insert(offset_buf_ids.begin(), offset_buf_ids.end());
extra.insert(tensor_buf_ids.begin(), tensor_buf_ids.end());

// Construct CodeGenerator — exercises the external OFFSET path.
ark::PlanJson pj(plan);
ark::CodeGenerator codegen(pj, /*buffer_id_to_offset=*/{}, extra);

// Verify non-empty generated code.
std::string code = codegen.code();
UNITTEST_TRUE(code.size() > 0);

return ark::unittest::State::SUCCESS;
}

// Test 2: CodeGenerator exercises the normal (non-external) OFFSET path
// (codegen.cpp lines 320-325: buffer_id_to_offset_ lookup).
// Also exercises Model::all_reduce_packet which covers the new
// `input = this->copy(input)` line in ops_all_reduce.cpp:57.
ark::unittest::State test_codegen_normal_offset() {
// Build a 2-rank model using all_reduce_packet (exercises
// ops_all_reduce.cpp line 57: `input = this->copy(input)`).
ark::Model model(0, 2);
ark::Tensor tns = model.tensor({1024}, ark::FP16);
model.all_reduce_packet(tns, 0, 2);

// Plan on GPU 0.
ark::Planner planner(model, 0);
auto plan = ark::Json::parse(planner.plan(false));
UNITTEST_TRUE(plan.contains("TaskInfos"));
UNITTEST_TRUE(plan["TaskInfos"].size() > 0);

// Collect ALL buffer IDs (OFFSET + TENSOR) from the plan.
auto offset_buf_ids = collect_offset_buffer_ids(plan);
auto tensor_buf_ids = collect_tensor_buffer_ids(plan);
UNITTEST_TRUE(offset_buf_ids.size() > 0);

// Put all buffer IDs in buffer_id_to_offset_ with offset 0.
// Do NOT register them as external in BufferRegistry.
std::map<size_t, size_t> buf_id_to_offset;
for (size_t id : offset_buf_ids) {
buf_id_to_offset[id] = 0;
}
for (size_t id : tensor_buf_ids) {
buf_id_to_offset[id] = 0;
}

// Construct CodeGenerator — exercises the normal OFFSET path
// (buffer_id_to_offset_ lookup, lines 320-325 of codegen.cpp).
ark::PlanJson pj(plan);
ark::CodeGenerator codegen(pj, buf_id_to_offset, {});

std::string code = codegen.code();
UNITTEST_TRUE(code.size() > 0);

return ark::unittest::State::SUCCESS;
}

// Test 3: CodeGenerator throws InternalError when an OFFSET arg's buffer ID
// is neither external nor in buffer_id_to_offset (codegen.cpp line 323).
ark::unittest::State test_codegen_missing_buffer_id() {
// Build a fresh model so its buffer IDs are new (not in BufferRegistry
// from test 1).
// Safe because ModelBuffer::curr_id is a monotonically-increasing static
// counter — IDs allocated here will never collide with test 1's entries.
ark::Model model(0, 2);
ark::Tensor tns = model.tensor({512}, ark::FP16);
model.send_packet(tns, 1, /*tag=*/0, /*flag=*/1);

ark::Planner planner(model, 0);
auto plan = ark::Json::parse(planner.plan(false));

// Do NOT register any buffer in BufferRegistry.
// Do NOT populate buffer_id_to_offset.
// CodeGenerator should throw InternalError on the OFFSET lookup.
ark::PlanJson pj(plan);
UNITTEST_THROW(ark::CodeGenerator(pj, {}, {}), ark::InternalError);

return ark::unittest::State::SUCCESS;
}

int main() {
UNITTEST(test_codegen_external_buffer_offset);
UNITTEST(test_codegen_normal_offset);
UNITTEST(test_codegen_missing_buffer_id);
return 0;
}
6 changes: 6 additions & 0 deletions ark/ops/ops_all_reduce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ Tensor Model::all_reduce_packet(Tensor input, int rank, int rank_num,
ERR(ModelError, "all_reduce_packet requires rank_num >= 2");
}

// Copy input into an internal buffer so it resides in mscclpp
// registered memory. External buffers (e.g. from torch tensors)
// are not part of the registered allocation, and putPackets needs
// to read from a registered source.
input = this->copy(input);

if (output.is_null()) {
output = this->tensor(input.shape(), input.data_type());
}
Expand Down
100 changes: 100 additions & 0 deletions ark/ops/ops_all_reduce_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include "ark/executor.hpp"
#include "gpu/gpu.hpp"
#include "model/model_buffer.hpp"
#include "model/model_node.hpp"
#include "model/model_op.hpp"
Expand Down Expand Up @@ -334,11 +336,109 @@ ark::unittest::State test_all_reduce_sm_8gpus() {
return ark::unittest::SUCCESS;
}

template <int NumGpus>
void test_all_reduce_packet_fused_internal(ark::DimType nelem) {
for (int gpu_id = 0; gpu_id < NumGpus; ++gpu_id) {
ark::unittest::spawn_process([gpu_id, nelem]() {
UNITTEST_SKIP(ark::unittest::get_gpu_count() < NumGpus);
// Each GPU's data is equal to its GPU ID + 1.
ark::Model m(gpu_id, NumGpus);
ark::Tensor ones = m.tensor({nelem}, ark::FP16);
ark::Tensor data = m.mul(ones, float(gpu_id + 1));
ark::Tensor output = m.all_reduce_packet(data, gpu_id, NumGpus);

std::vector<ark::half_t> ones_vec(ones.shape().nelems(),
ark::half_t(1.0f));
auto result = ark::op_test(
"all_reduce_packet_fused", m, {ones}, {output},
baseline_all_reduce<ark::half_t, NumGpus>, {ones_vec.data()});
UNITTEST_LOG(result);
UNITTEST_EQ(result.max_diff[0], 0.0f);
return ark::unittest::SUCCESS;
});
}
ark::unittest::wait_all_processes();
}

// Variant with external-buffer (placeholder) input — exercises the
// codegen external-buffer OFFSET path added for all_reduce_packet's
// internal copy. Cannot use op_test() because placeholders require
// pre-allocated GPU memory; drive the executor manually instead.
template <int NumGpus>
void test_all_reduce_packet_fused_ext_internal(ark::DimType nelem) {
for (int gpu_id = 0; gpu_id < NumGpus; ++gpu_id) {
ark::unittest::spawn_process([gpu_id, nelem]() {
UNITTEST_SKIP(ark::unittest::get_gpu_count() < NumGpus);

UNITTEST_EQ(ark::gpuSetDevice(gpu_id), ark::gpuSuccess);

// Allocate GPU memory and fill with (gpu_id + 1).
ark::half_t *d_input = nullptr;
size_t nbytes = nelem * sizeof(ark::half_t);
UNITTEST_EQ(ark::gpuMalloc(&d_input, nbytes), ark::gpuSuccess);
std::vector<ark::half_t> h_input(nelem,
ark::half_t(float(gpu_id + 1)));
UNITTEST_EQ(ark::gpuMemcpy(d_input, h_input.data(), nbytes,
ark::gpuMemcpyHostToDevice),
ark::gpuSuccess);

ark::Model m(gpu_id, NumGpus);
ark::Tensor input =
m.placeholder({nelem}, ark::FP16, {}, {}, {}, -1, d_input);
ark::Tensor output = m.all_reduce_packet(input, gpu_id, NumGpus);

ark::DefaultExecutor exe(m, gpu_id);
exe.launch();
exe.run(1);
exe.stop();

std::vector<ark::half_t> h_output(nelem);
exe.tensor_read(output, h_output);

float expected = float(NumGpus * (NumGpus + 1)) / 2.0f;
for (ark::DimType i = 0; i < nelem; ++i) {
UNITTEST_EQ(float(h_output[i]), expected);
}

UNITTEST_EQ(ark::gpuFree(d_input), ark::gpuSuccess);
return ark::unittest::SUCCESS;
});
}
ark::unittest::wait_all_processes();
}

ark::unittest::State test_all_reduce_packet_fused_ext_2gpus() {
test_all_reduce_packet_fused_ext_internal<2>(4096);
return ark::unittest::SUCCESS;
}

ark::unittest::State test_all_reduce_packet_fused_2gpus() {
test_all_reduce_packet_fused_internal<2>(4096);
test_all_reduce_packet_fused_internal<2>(8192);
return ark::unittest::SUCCESS;
}

ark::unittest::State test_all_reduce_packet_fused_4gpus() {
test_all_reduce_packet_fused_internal<4>(2048);
test_all_reduce_packet_fused_internal<4>(8192);
return ark::unittest::SUCCESS;
}

ark::unittest::State test_all_reduce_packet_fused_8gpus() {
test_all_reduce_packet_fused_internal<8>(2048);
test_all_reduce_packet_fused_internal<8>(8192);
return ark::unittest::SUCCESS;
}

int main() {
UNITTEST(test_all_reduce_4gpus);
UNITTEST(test_all_reduce_8gpus);
UNITTEST(test_all_reduce_packet_4gpus);
UNITTEST(test_all_reduce_packet_8gpus);
UNITTEST(test_all_reduce_packet_fused_ext_2gpus);
UNITTEST(test_all_reduce_packet_fused_2gpus);
UNITTEST(test_all_reduce_packet_fused_4gpus);
UNITTEST(test_all_reduce_packet_fused_8gpus);
UNITTEST(test_all_reduce_sm_4gpus);
UNITTEST(test_all_reduce_sm_8gpus);
UNITTEST(test_all_reduce_inplace_2gpus);
Expand Down
10 changes: 10 additions & 0 deletions examples/qwen3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

# Submodules:
# ark_allreduce - all-reduce wrapper (active)
# test_allreduce - tests (active)
# bench_allreduce - benchmarks (active)
# equiv - equivalence test helper (staged for inference pipeline)
# microbench - CUDA microbench harness (staged for profiling scripts)
# qwen3_config - model config dataclass (staged for inference pipeline)
Loading
Loading