Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions __perf_gate__.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env bash
# Performance gate for the all-reduce benchmark.
#
# Runs ../examples/qwen3/bench_allreduce.py (path relative to the current
# working directory) with --shape decode for world sizes 2 and 8, takes the
# larger of the two reported ark_ms values and compares it with the
# _DECODE_TARGET_MS constant defined in the benchmark module.
#
# Output: one "PERF_GATE name=allreduce ..." summary line on stdout.
# Exit status: non-zero when a benchmark run failed, a result line was
# missing, or the measured time is not strictly below the target.

# `-e` is not enabled: benchmark failures are recorded in $status so the
# summary line is still printed before the gate fails.
set -uo pipefail

: "${ARK_ROOT:=$PWD}"
export ARK_ROOT
export PYTHONPATH="${PYTHONPATH:-$ARK_ROOT/python}"

# Load the benchmark module by file path and read its target constant.
target_ms=$(python3 - <<'PY'
import importlib.util
import pathlib

path = pathlib.Path("../examples/qwen3/bench_allreduce.py")
spec = importlib.util.spec_from_file_location("bench_allreduce", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print(f"{module._DECODE_TARGET_MS:.4f}")
PY
)
# Without a target there is nothing to gate against; stop with a clear
# message instead of failing later with a float('') traceback.
if [[ -z "$target_ms" ]]; then
    echo "perf gate: could not read _DECODE_TARGET_MS from ../examples/qwen3/bench_allreduce.py" >&2
    exit 1
fi

tmpdir=$(mktemp -d) || exit 1
trap 'rm -rf "$tmpdir"' EXIT
status=0

# Runs the decode benchmark for one world size, capturing stdout/stderr under
# $tmpdir. On failure, sets status=1 and replays the captured output on
# stderr, since $tmpdir is removed when the script exits.
run_bench() {
    local world_size=$1
    local prefix="$tmpdir/tp${world_size}"
    if ! python3 ../examples/qwen3/bench_allreduce.py --world-size "$world_size" --shape decode \
            >"$prefix.log" 2>"$prefix.err"; then
        status=1
        {
            echo "perf gate: bench_allreduce.py --world-size $world_size failed; captured output follows"
            cat "$prefix.log" "$prefix.err"
        } >&2
    fi
}

run_bench 2
run_bench 8

# Extract ark_ms from both logs; report the slower one, or a sentinel value
# when a run failed or a result line is missing.
ark_ms=$(python3 - "$tmpdir/tp2.log" "$tmpdir/tp8.log" "$status" <<'PY'
import re
import sys

values = []
for name in sys.argv[1:3]:
    with open(name, encoding="utf-8") as log:
        text = log.read()
    match = re.search(r"PERF_GATE name=allreduce\s+ark_ms=([0-9.]+)", text)
    if match:
        values.append(float(match.group(1)))
if int(sys.argv[3]) or len(values) != 2:
    print("999999.0000")
else:
    print(f"{max(values):.4f}")
PY
)
# A target that formats to zero would divide by zero; report "inf" and let
# the final comparison below fail the gate.
ratio=$(python3 - "$ark_ms" "$target_ms" <<'PY'
import sys

ark_ms = float(sys.argv[1])
target_ms = float(sys.argv[2])
print(f"{ark_ms / target_ms:.4f}" if target_ms > 0 else "inf")
PY
)
printf 'PERF_GATE name=allreduce ark_ms=%s sglang_ms=%s ratio=%s\n' "$ark_ms" "$target_ms" "$ratio"
# The exit status of this last command is the exit status of the script.
python3 - "$ark_ms" "$target_ms" "$status" <<'PY'
import sys

ark_ms = float(sys.argv[1])
target_ms = float(sys.argv[2])
status = int(sys.argv[3])
if status or ark_ms >= target_ms:
    raise SystemExit(1)
PY
6 changes: 4 additions & 2 deletions ark/codegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,20 @@ std::string CodeGenerator::Impl::def_task(const Json &task_json) {
ss_desc << "(" << tns->data_type()->type_str() << "*)_"
<< ptr_idx;
} else if (arg.type_name() == "OFFSET") {
// OFFSET args are offsets into ARK registered memory; external
// placeholders must be passed through TENSOR-pointer kernels or
// staged into internal tensors before OFFSET-only kernels.
auto moff = arg.value<ModelOffset>();
size_t buffer_id = moff.buffer_id();
auto buf_info = buf_reg.get(buffer_id);
if (buf_info && buf_info->is_external) {
ERR(InternalError, "cannot offset external buffer");
}
size_t buffer_offset;
auto it = buffer_id_to_offset_.find(buffer_id);
if (it == buffer_id_to_offset_.end()) {
ERR(InternalError, "buffer ID not found: ", buffer_id);
}
buffer_offset = it->second;
size_t buffer_offset = it->second;
size_t offset = buffer_offset + moff.value();
ss_desc << offset;
} else {
Expand Down
265 changes: 265 additions & 0 deletions ark/codegen_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include "codegen.hpp"

#include <map>
#include <nlohmann/json.hpp>
#include <set>
#include <utility>

#include "ark/model.hpp"
#include "ark/planner.hpp"
#include "buffer_registry.hpp"
#include "model/model_buffer.hpp"
#include "model/model_node.hpp"
#include "model/model_op.hpp"
#include "model/model_op_arg.hpp"
#include "model/model_tensor.hpp"
#include "unittest/unittest_utils.h"

// Returns the set of buffer IDs that appear in OFFSET-typed implementation
// arguments of any op listed under the plan's "TaskInfos".
static std::set<size_t> collect_offset_buffer_ids(const ark::Json &plan) {
    std::set<size_t> buffer_ids;
    for (auto &task_info : plan.at("TaskInfos")) {
        for (auto &op_entry : task_info.at("Ops")) {
            // Rebuild the op from its JSON form so its implementation
            // arguments can be queried for the op's own config.
            auto model_op = ark::ModelOp::deserialize(op_entry);
            auto impl_args = model_op->impl_args(op_entry.at("Config"));
            for (auto &impl_arg : impl_args) {
                if (impl_arg.type_name() == "OFFSET") {
                    auto offset = impl_arg.value<ark::ModelOffset>();
                    buffer_ids.insert(offset.buffer_id());
                }
            }
        }
    }
    return buffer_ids;
}

// Returns the set of buffer IDs backing TENSOR-typed implementation arguments
// of any op listed under the plan's "TaskInfos".
static std::set<size_t> collect_tensor_buffer_ids(const ark::Json &plan) {
    std::set<size_t> buffer_ids;
    for (auto &task_info : plan.at("TaskInfos")) {
        for (auto &op_entry : task_info.at("Ops")) {
            // Rebuild the op from its JSON form so its implementation
            // arguments can be queried for the op's own config.
            auto model_op = ark::ModelOp::deserialize(op_entry);
            auto impl_args = model_op->impl_args(op_entry.at("Config"));
            for (auto &impl_arg : impl_args) {
                if (impl_arg.type_name() == "TENSOR") {
                    auto tensor_ref = impl_arg.value<ark::ModelTensorRef>();
                    buffer_ids.insert(tensor_ref->buffer()->id());
                }
            }
        }
    }
    return buffer_ids;
}

ark::unittest::State test_codegen_external_buffer_offset_rejected() {
    // OFFSET args of send_packet kernels are offsets into registered ARK
    // memory rather than external base pointers. Once the referenced buffers
    // are flagged external, code generation has to raise an error instead of
    // emitting offsets computed against the wrong base.
    ark::Model model(0, 2);
    ark::Tensor payload = model.tensor({1024}, ark::FP16);
    model.send_packet(payload, 1, /*tag=*/0, /*flag=*/1);

    ark::Planner planner(model, 0);
    auto plan = ark::Json::parse(planner.plan(false));
    UNITTEST_TRUE(plan.contains("TaskInfos"));
    UNITTEST_TRUE(plan["TaskInfos"].size() > 0);

    auto offset_buf_ids = collect_offset_buffer_ids(plan);
    auto tensor_buf_ids = collect_tensor_buffer_ids(plan);
    UNITTEST_TRUE(offset_buf_ids.size() > 0);

    // Flag every OFFSET-referenced buffer as external, using a dummy non-null
    // address as its data pointer.
    void *const fake_base = reinterpret_cast<void *>(0x1);
    auto &registry = ark::BufferRegistry::get_instance();
    for (size_t buffer_id : offset_buf_ids) {
        registry.set(buffer_id, fake_base, 0, /*is_external=*/true);
    }

    // Third constructor argument: union of all buffer IDs seen in the plan.
    std::set<size_t> extra(offset_buf_ids);
    extra.insert(tensor_buf_ids.begin(), tensor_buf_ids.end());

    ark::PlanJson pj(plan);
    UNITTEST_THROW(ark::CodeGenerator(pj, /*buffer_id_to_offset=*/{}, extra),
                   ark::InternalError);

    return ark::unittest::SUCCESS;
}

ark::unittest::State test_codegen_normal_offset() {
    // When no buffer is external, OFFSET args resolve through the
    // buffer-ID-to-offset map handed to the code generator.
    ark::Model model(0, 2);
    ark::Tensor payload = model.tensor({1024}, ark::FP16);
    model.all_reduce_packet(payload, 0, 2);

    ark::Planner planner(model, 0);
    auto plan = ark::Json::parse(planner.plan(false));
    UNITTEST_TRUE(plan.contains("TaskInfos"));
    UNITTEST_TRUE(plan["TaskInfos"].size() > 0);

    auto offset_buf_ids = collect_offset_buffer_ids(plan);
    auto tensor_buf_ids = collect_tensor_buffer_ids(plan);
    UNITTEST_TRUE(offset_buf_ids.size() > 0);

    // Give every buffer referenced by the plan a base offset of zero.
    std::map<size_t, size_t> buf_id_to_offset;
    auto place_at_zero = [&buf_id_to_offset](const std::set<size_t> &ids) {
        for (size_t buffer_id : ids) {
            buf_id_to_offset[buffer_id] = 0;
        }
    };
    place_at_zero(offset_buf_ids);
    place_at_zero(tensor_buf_ids);

    ark::PlanJson pj(plan);
    ark::CodeGenerator codegen(pj, buf_id_to_offset, {});

    // Generation must succeed and yield non-empty source text.
    std::string code = codegen.code();
    UNITTEST_TRUE(code.size() > 0);

    return ark::unittest::SUCCESS;
}

ark::unittest::State test_all_reduce_packet_external_input_is_staged() {
    // Feeding an external placeholder to all_reduce_packet must add a Copy
    // that reads the placeholder, and the fused all-reduce op must read the
    // output of a Copy (a non-external buffer), not the placeholder itself.
    ark::Model model(0, 2);
    ark::Tensor input = model.placeholder({1024}, ark::FP16, {}, {}, {}, -1,
                                          reinterpret_cast<void *>(0x1));
    model.all_reduce_packet(input, 0, 2);

    auto &registry = ark::BufferRegistry::get_instance();
    size_t placeholder_id = input.ref()->buffer()->id();
    auto placeholder_info = registry.get(placeholder_id);
    UNITTEST_TRUE(placeholder_info && placeholder_info->is_external);

    // True when `op` has the op type registered under `type_name`.
    auto op_has_type = [](const auto &op, const char *type_name) {
        return op->type() == ark::ModelOpT::from_name(type_name);
    };

    std::set<size_t> copy_output_ids;
    bool found_copy_from_placeholder = false;
    bool found_fused = false;
    for (auto &node : model.nodes()) {
        auto &op = node->op;
        if (op->is_virtual()) {
            continue;
        }
        if (op_has_type(op, "Copy")) {
            // Remember every Copy output so the fused op's input can be
            // matched against it below.
            auto reads = op->read_tensors();
            auto results = op->result_tensors();
            UNITTEST_TRUE(reads.size() > 0);
            UNITTEST_TRUE(results.size() > 0);
            copy_output_ids.insert(results[0]->buffer()->id());
            if (reads[0]->buffer()->id() == placeholder_id) {
                found_copy_from_placeholder = true;
            }
            continue;
        }
        if (op_has_type(op, "AllReducePacketFused")) {
            found_fused = true;
            auto reads = op->read_tensors();
            UNITTEST_TRUE(reads.size() > 0);
            size_t fused_input_id = reads[0]->buffer()->id();
            UNITTEST_TRUE(fused_input_id != placeholder_id);
            UNITTEST_TRUE(copy_output_ids.count(fused_input_id) > 0);
            auto fused_info = registry.get(fused_input_id);
            UNITTEST_FALSE(fused_info && fused_info->is_external);
        }
    }
    UNITTEST_TRUE(found_copy_from_placeholder);
    UNITTEST_TRUE(found_fused);

    return ark::unittest::SUCCESS;
}

ark::unittest::State
test_all_reduce_packet_invalid_external_input_does_not_mutate_graph() {
    // all_reduce_packet is expected to reject this 1025-element external
    // placeholder with a ModelError, and the rejected call must not leave any
    // extra Copy or AllReducePacketFused op behind in the model.
    ark::Model model(0, 2);
    ark::Tensor input = model.placeholder({1025}, ark::FP16, {}, {}, {}, -1,
                                          reinterpret_cast<void *>(0x1));

    // Counts non-virtual ops in the model: {Copy, AllReducePacketFused}.
    auto count_ops = [&model]() {
        size_t num_copy = 0;
        size_t num_fused = 0;
        for (auto &node : model.nodes()) {
            auto &op = node->op;
            if (op->is_virtual()) {
                continue;
            }
            if (op->type() == ark::ModelOpT::from_name("Copy")) {
                ++num_copy;
            } else if (op->type() ==
                       ark::ModelOpT::from_name("AllReducePacketFused")) {
                ++num_fused;
            }
        }
        return std::pair<size_t, size_t>{num_copy, num_fused};
    };

    auto before = count_ops();
    UNITTEST_THROW(model.all_reduce_packet(input, 0, 2), ark::ModelError);
    auto after = count_ops();
    UNITTEST_EQ(before.first, after.first);
    UNITTEST_EQ(before.second, after.second);

    return ark::unittest::SUCCESS;
}

ark::unittest::State test_all_reduce_packet_internal_input_is_not_staged() {
    // A regular model tensor needs no staging: no Copy may read its buffer,
    // and the fused all-reduce op must read that buffer directly.
    ark::Model model(0, 2);
    ark::Tensor input = model.tensor({1024}, ark::FP16);
    model.all_reduce_packet(input, 0, 2);

    // True when `op` has the op type registered under `type_name`.
    auto op_has_type = [](const auto &op, const char *type_name) {
        return op->type() == ark::ModelOpT::from_name(type_name);
    };

    size_t input_id = input.ref()->buffer()->id();
    bool found_copy_from_input = false;
    bool found_fused = false;
    for (auto &node : model.nodes()) {
        auto &op = node->op;
        if (op->is_virtual()) {
            continue;
        }
        if (op_has_type(op, "Copy")) {
            auto reads = op->read_tensors();
            UNITTEST_TRUE(reads.size() > 0);
            if (reads[0]->buffer()->id() == input_id) {
                found_copy_from_input = true;
            }
            continue;
        }
        if (op_has_type(op, "AllReducePacketFused")) {
            found_fused = true;
            auto reads = op->read_tensors();
            UNITTEST_TRUE(reads.size() > 0);
            UNITTEST_TRUE(reads[0]->buffer()->id() == input_id);
        }
    }
    UNITTEST_FALSE(found_copy_from_input);
    UNITTEST_TRUE(found_fused);

    return ark::unittest::SUCCESS;
}

ark::unittest::State test_codegen_missing_buffer_id() {
    // A new model is built here so that its buffers are distinct from those
    // earlier tests registered as external; the OFFSET lookup below must then
    // fail purely because the offset map is empty.
    ark::Model model(0, 2);
    ark::Tensor payload = model.tensor({512}, ark::FP16);
    model.send_packet(payload, 1, /*tag=*/0, /*flag=*/1);

    ark::Planner planner(model, 0);
    auto plan = ark::Json::parse(planner.plan(false));
    auto offset_buf_ids = collect_offset_buffer_ids(plan);
    UNITTEST_TRUE(offset_buf_ids.size() > 0);

    // Precondition: none of the OFFSET buffers is registered as external.
    auto &registry = ark::BufferRegistry::get_instance();
    for (size_t buffer_id : offset_buf_ids) {
        auto info = registry.get(buffer_id);
        UNITTEST_FALSE(info && info->is_external);
    }

    // With no offsets supplied, construction must throw.
    ark::PlanJson pj(plan);
    UNITTEST_THROW(ark::CodeGenerator(pj, {}, {}), ark::InternalError);

    return ark::unittest::SUCCESS;
}

// Test driver: runs every codegen / all_reduce_packet test case in sequence.
int main() {
// NOTE(review): the first test marks entries of
// ark::BufferRegistry::get_instance() as external and nothing here undoes
// that, so later tests presumably observe those registrations; keep this in
// mind when reordering or adding cases.
UNITTEST(test_codegen_external_buffer_offset_rejected);
UNITTEST(test_codegen_normal_offset);
UNITTEST(test_all_reduce_packet_external_input_is_staged);
UNITTEST(
test_all_reduce_packet_invalid_external_input_does_not_mutate_graph);
UNITTEST(test_all_reduce_packet_internal_input_is_not_staged);
UNITTEST(test_codegen_missing_buffer_id);
return 0;
}
22 changes: 16 additions & 6 deletions ark/ops/ops_all_reduce.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include "buffer_registry.hpp"
#include "ops_common.hpp"
#include "ops_communication.hpp"

Expand Down Expand Up @@ -50,19 +51,28 @@ Tensor Model::all_reduce_packet(Tensor input, int rank, int rank_num,
ERR(ModelError, "all_reduce_packet requires rank_num >= 2");
}

size_t nelems = input.shape().nelems();
size_t elems_per_uint32 = sizeof(uint32_t) / input.data_type().bytes();
if (nelems % (elems_per_uint32 * 2 * rank_num) != 0) {
ERR(ModelError, "all_reduce_packet: nelems (", nelems,
") must be divisible by ", elems_per_uint32 * 2 * rank_num);
}

// Copy external input into an internal buffer so it resides in mscclpp
// registered memory. Internal ARK tensors are already registered.
auto input_info =
BufferRegistry::get_instance().get(input.ref()->buffer()->id());
if (input.is_external() || (input_info && input_info->is_external)) {
input = this->copy(input);
}

if (output.is_null()) {
output = this->tensor(input.shape(), input.data_type());
}

// Scratch layout: [input_section | result_section]
// Each section holds NPkts = nelems_int32 / 2 packets of 16 bytes each.
// Total: 2 × NPkts × 16 = nelems_int32 × 16 = nelems_fp16 × 8 bytes.
size_t nelems = input.shape().nelems();
size_t elems_per_uint32 = sizeof(uint32_t) / input.data_type().bytes();
if (nelems % (elems_per_uint32 * 2 * rank_num) != 0) {
ERR(ModelError, "all_reduce_packet: nelems (", nelems,
") must be divisible by ", elems_per_uint32 * 2 * rank_num);
}
size_t nelems_int32 = nelems / elems_per_uint32;
size_t n_pkts = nelems_int32 / 2; // each packet carries uint2 = 2×u32
size_t packet_size = 16; // sizeof(mscclpp::LL16Packet)
Expand Down
Loading
Loading