diff --git a/__perf_gate__.sh b/__perf_gate__.sh new file mode 100755 index 00000000..8ad79ab5 --- /dev/null +++ b/__perf_gate__.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +set -uo pipefail + +: "${ARK_ROOT:=$PWD}" +export ARK_ROOT +export PYTHONPATH="${PYTHONPATH:-$ARK_ROOT/python}" + +target_ms=$(python3 - <<'PY' +import importlib.util +import pathlib + +path = pathlib.Path("../examples/qwen3/bench_allreduce.py") +spec = importlib.util.spec_from_file_location("bench_allreduce", path) +module = importlib.util.module_from_spec(spec) +spec.loader.exec_module(module) +print(f"{module._DECODE_TARGET_MS:.4f}") +PY +) + +tmpdir=$(mktemp -d) +trap 'rm -rf "$tmpdir"' EXIT +status=0 +python3 ../examples/qwen3/bench_allreduce.py --world-size 2 --shape decode >"$tmpdir/tp2.log" 2>"$tmpdir/tp2.err" || status=1 +python3 ../examples/qwen3/bench_allreduce.py --world-size 8 --shape decode >"$tmpdir/tp8.log" 2>"$tmpdir/tp8.err" || status=1 + +ark_ms=$(python3 - "$tmpdir/tp2.log" "$tmpdir/tp8.log" "$status" <<'PY' +import re +import sys + +values = [] +for name in sys.argv[1:3]: + text = open(name, encoding="utf-8").read() + match = re.search(r"PERF_GATE name=allreduce\s+ark_ms=([0-9.]+)", text) + if match: + values.append(float(match.group(1))) +if int(sys.argv[3]) or len(values) != 2: + print("999999.0000") +else: + print(f"{max(values):.4f}") +PY +) +ratio=$(python3 - "$ark_ms" "$target_ms" <<'PY' +import sys + +print(f"{float(sys.argv[1]) / float(sys.argv[2]):.4f}") +PY +) +printf 'PERF_GATE name=allreduce ark_ms=%s sglang_ms=%s ratio=%s\n' "$ark_ms" "$target_ms" "$ratio" +python3 - "$ark_ms" "$target_ms" "$status" <<'PY' +import sys + +ark_ms = float(sys.argv[1]) +target_ms = float(sys.argv[2]) +status = int(sys.argv[3]) +if status or ark_ms >= target_ms: + raise SystemExit(1) +PY diff --git a/ark/codegen.cpp b/ark/codegen.cpp index dc080d60..db49fea3 100644 --- a/ark/codegen.cpp +++ b/ark/codegen.cpp @@ -311,18 +311,20 @@ std::string CodeGenerator::Impl::def_task(const Json &task_json) { ss_desc << "(" << tns->data_type()->type_str() << "*)_" << ptr_idx; } else if (arg.type_name() == "OFFSET") { + // OFFSET args are offsets into ARK registered memory; external + // placeholders must be passed through TENSOR-pointer kernels or + // staged into internal tensors before OFFSET-only kernels. auto moff = arg.value(); size_t buffer_id = moff.buffer_id(); auto buf_info = buf_reg.get(buffer_id); if (buf_info && buf_info->is_external) { ERR(InternalError, "cannot offset external buffer"); } - size_t buffer_offset; auto it = buffer_id_to_offset_.find(buffer_id); if (it == buffer_id_to_offset_.end()) { ERR(InternalError, "buffer ID not found: ", buffer_id); } - buffer_offset = it->second; + size_t buffer_offset = it->second; size_t offset = buffer_offset + moff.value(); ss_desc << offset; } else { diff --git a/ark/codegen_test.cpp b/ark/codegen_test.cpp new file mode 100644 index 00000000..64a443a6 --- /dev/null +++ b/ark/codegen_test.cpp @@ -0,0 +1,265 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#include "codegen.hpp" + +#include +#include +#include +#include + +#include "ark/model.hpp" +#include "ark/planner.hpp" +#include "buffer_registry.hpp" +#include "model/model_buffer.hpp" +#include "model/model_node.hpp" +#include "model/model_op.hpp" +#include "model/model_op_arg.hpp" +#include "model/model_tensor.hpp" +#include "unittest/unittest_utils.h" + +// Collect all buffer IDs referenced by OFFSET args in a plan's TaskInfos. +static std::set collect_offset_buffer_ids(const ark::Json &plan) { + std::set ids; + for (auto &ti : plan.at("TaskInfos")) { + for (auto &op_json : ti.at("Ops")) { + auto op = ark::ModelOp::deserialize(op_json); + auto args = op->impl_args(op_json.at("Config")); + for (auto &arg : args) { + if (arg.type_name() == "OFFSET") { + ids.insert(arg.value().buffer_id()); + } + } + } + } + return ids; +} + +// Collect all buffer IDs referenced by TENSOR args in a plan's TaskInfos. +static std::set collect_tensor_buffer_ids(const ark::Json &plan) { + std::set ids; + for (auto &ti : plan.at("TaskInfos")) { + for (auto &op_json : ti.at("Ops")) { + auto op = ark::ModelOp::deserialize(op_json); + auto args = op->impl_args(op_json.at("Config")); + for (auto &arg : args) { + if (arg.type_name() == "TENSOR") { + ids.insert( + arg.value()->buffer()->id()); + } + } + } + } + return ids; +} + +ark::unittest::State test_codegen_external_buffer_offset_rejected() { + // send_packet kernels receive OFFSET args into registered ARK memory, not + // external base pointers. Marking those buffers external must fail instead + // of generating offsets relative to the wrong base. + ark::Model model(0, 2); + ark::Tensor tns = model.tensor({1024}, ark::FP16); + model.send_packet(tns, 1, /*tag=*/0, /*flag=*/1); + + ark::Planner planner(model, 0); + auto plan = ark::Json::parse(planner.plan(false)); + UNITTEST_TRUE(plan.contains("TaskInfos")); + UNITTEST_TRUE(plan["TaskInfos"].size() > 0); + + auto offset_buf_ids = collect_offset_buffer_ids(plan); + auto tensor_buf_ids = collect_tensor_buffer_ids(plan); + UNITTEST_TRUE(offset_buf_ids.size() > 0); + + auto &buf_reg = ark::BufferRegistry::get_instance(); + for (size_t id : offset_buf_ids) { + buf_reg.set(id, reinterpret_cast(0x1), 0, /*is_external=*/true); + } + + std::set extra; + extra.insert(offset_buf_ids.begin(), offset_buf_ids.end()); + extra.insert(tensor_buf_ids.begin(), tensor_buf_ids.end()); + + ark::PlanJson pj(plan); + UNITTEST_THROW(ark::CodeGenerator(pj, /*buffer_id_to_offset=*/{}, extra), + ark::InternalError); + + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_codegen_normal_offset() { + // Non-external OFFSET args are resolved through buffer_id_to_offset_. + ark::Model model(0, 2); + ark::Tensor tns = model.tensor({1024}, ark::FP16); + model.all_reduce_packet(tns, 0, 2); + + ark::Planner planner(model, 0); + auto plan = ark::Json::parse(planner.plan(false)); + UNITTEST_TRUE(plan.contains("TaskInfos")); + UNITTEST_TRUE(plan["TaskInfos"].size() > 0); + + auto offset_buf_ids = collect_offset_buffer_ids(plan); + auto tensor_buf_ids = collect_tensor_buffer_ids(plan); + UNITTEST_TRUE(offset_buf_ids.size() > 0); + + std::map buf_id_to_offset; + for (size_t id : offset_buf_ids) { + buf_id_to_offset[id] = 0; + } + for (size_t id : tensor_buf_ids) { + buf_id_to_offset[id] = 0; + } + + ark::PlanJson pj(plan); + ark::CodeGenerator codegen(pj, buf_id_to_offset, {}); + + std::string code = codegen.code(); + UNITTEST_TRUE(code.size() > 0); + + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_all_reduce_packet_external_input_is_staged() { + ark::Model model(0, 2); + ark::Tensor input = model.placeholder({1024}, ark::FP16, {}, {}, {}, -1, + reinterpret_cast(0x1)); + model.all_reduce_packet(input, 0, 2); + + size_t placeholder_id = input.ref()->buffer()->id(); + auto placeholder_info = + ark::BufferRegistry::get_instance().get(placeholder_id); + UNITTEST_TRUE(placeholder_info && placeholder_info->is_external); + + std::set copy_output_ids; + bool found_copy_from_placeholder = false; + bool found_fused = false; + for (auto &node : model.nodes()) { + auto &op = node->op; + if (op->is_virtual()) { + continue; + } + if (op->type() == ark::ModelOpT::from_name("Copy")) { + auto reads = op->read_tensors(); + auto results = op->result_tensors(); + UNITTEST_TRUE(reads.size() > 0); + UNITTEST_TRUE(results.size() > 0); + size_t output_id = results[0]->buffer()->id(); + copy_output_ids.insert(output_id); + if (reads[0]->buffer()->id() == placeholder_id) { + found_copy_from_placeholder = true; + } + } else if (op->type() == + ark::ModelOpT::from_name("AllReducePacketFused")) { + found_fused = true; + auto reads = op->read_tensors(); + UNITTEST_TRUE(reads.size() > 0); + size_t fused_input_id = reads[0]->buffer()->id(); + UNITTEST_TRUE(fused_input_id != placeholder_id); + UNITTEST_TRUE(copy_output_ids.count(fused_input_id) > 0); + auto fused_info = + ark::BufferRegistry::get_instance().get(fused_input_id); + UNITTEST_FALSE(fused_info && fused_info->is_external); + } + } + UNITTEST_TRUE(found_copy_from_placeholder); + UNITTEST_TRUE(found_fused); + + return ark::unittest::SUCCESS; +} + +ark::unittest::State +test_all_reduce_packet_invalid_external_input_does_not_mutate_graph() { + ark::Model model(0, 2); + ark::Tensor input = model.placeholder({1025}, ark::FP16, {}, {}, {}, -1, + reinterpret_cast(0x1)); + + auto count_ops = [&model]() { + std::pair counts{0, 0}; + for (auto &node : model.nodes()) { + auto &op = node->op; + if (op->is_virtual()) { + continue; + } + if (op->type() == ark::ModelOpT::from_name("Copy")) { + counts.first++; + } else if (op->type() == + ark::ModelOpT::from_name("AllReducePacketFused")) { + counts.second++; + } + } + return counts; + }; + + auto before = count_ops(); + UNITTEST_THROW(model.all_reduce_packet(input, 0, 2), ark::ModelError); + auto after = count_ops(); + UNITTEST_EQ(before.first, after.first); + UNITTEST_EQ(before.second, after.second); + + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_all_reduce_packet_internal_input_is_not_staged() { + ark::Model model(0, 2); + ark::Tensor input = model.tensor({1024}, ark::FP16); + model.all_reduce_packet(input, 0, 2); + + size_t input_id = input.ref()->buffer()->id(); + bool found_copy_from_input = false; + bool found_fused = false; + for (auto &node : model.nodes()) { + auto &op = node->op; + if (op->is_virtual()) { + continue; + } + if (op->type() == ark::ModelOpT::from_name("Copy")) { + auto reads = op->read_tensors(); + UNITTEST_TRUE(reads.size() > 0); + if (reads[0]->buffer()->id() == input_id) { + found_copy_from_input = true; + } + } else if (op->type() == + ark::ModelOpT::from_name("AllReducePacketFused")) { + found_fused = true; + auto reads = op->read_tensors(); + UNITTEST_TRUE(reads.size() > 0); + UNITTEST_TRUE(reads[0]->buffer()->id() == input_id); + } + } + UNITTEST_FALSE(found_copy_from_input); + UNITTEST_TRUE(found_fused); + + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_codegen_missing_buffer_id() { + // Use fresh model buffers so external registrations from earlier tests + // cannot satisfy this OFFSET lookup. + ark::Model model(0, 2); + ark::Tensor tns = model.tensor({512}, ark::FP16); + model.send_packet(tns, 1, /*tag=*/0, /*flag=*/1); + + ark::Planner planner(model, 0); + auto plan = ark::Json::parse(planner.plan(false)); + auto offset_buf_ids = collect_offset_buffer_ids(plan); + UNITTEST_TRUE(offset_buf_ids.size() > 0); + for (size_t id : offset_buf_ids) { + auto info = ark::BufferRegistry::get_instance().get(id); + UNITTEST_FALSE(info && info->is_external); + } + + ark::PlanJson pj(plan); + UNITTEST_THROW(ark::CodeGenerator(pj, {}, {}), ark::InternalError); + + return ark::unittest::SUCCESS; +} + +int main() { + UNITTEST(test_codegen_external_buffer_offset_rejected); + UNITTEST(test_codegen_normal_offset); + UNITTEST(test_all_reduce_packet_external_input_is_staged); + UNITTEST( + test_all_reduce_packet_invalid_external_input_does_not_mutate_graph); + UNITTEST(test_all_reduce_packet_internal_input_is_not_staged); + UNITTEST(test_codegen_missing_buffer_id); + return 0; +} diff --git a/ark/ops/ops_all_reduce.cpp b/ark/ops/ops_all_reduce.cpp index 320194e4..36d9b445 100644 --- a/ark/ops/ops_all_reduce.cpp +++ b/ark/ops/ops_all_reduce.cpp @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +#include "buffer_registry.hpp" #include "ops_common.hpp" #include "ops_communication.hpp" @@ -50,6 +51,21 @@ Tensor Model::all_reduce_packet(Tensor input, int rank, int rank_num, ERR(ModelError, "all_reduce_packet requires rank_num >= 2"); } + size_t nelems = input.shape().nelems(); + size_t elems_per_uint32 = sizeof(uint32_t) / input.data_type().bytes(); + if (nelems % (elems_per_uint32 * 2 * rank_num) != 0) { + ERR(ModelError, "all_reduce_packet: nelems (", nelems, + ") must be divisible by ", elems_per_uint32 * 2 * rank_num); + } + + // Copy external input into an internal buffer so it resides in mscclpp + // registered memory. Internal ARK tensors are already registered. + auto input_info = + BufferRegistry::get_instance().get(input.ref()->buffer()->id()); + if (input.is_external() || (input_info && input_info->is_external)) { + input = this->copy(input); + } + if (output.is_null()) { output = this->tensor(input.shape(), input.data_type()); } @@ -57,12 +73,6 @@ Tensor Model::all_reduce_packet(Tensor input, int rank, int rank_num, // Scratch layout: [input_section | result_section] // Each section holds NPkts = nelems_int32 / 2 packets of 16 bytes each. // Total: 2 × NPkts × 16 = nelems_int32 × 16 = nelems_fp16 × 8 bytes. - size_t nelems = input.shape().nelems(); - size_t elems_per_uint32 = sizeof(uint32_t) / input.data_type().bytes(); - if (nelems % (elems_per_uint32 * 2 * rank_num) != 0) { - ERR(ModelError, "all_reduce_packet: nelems (", nelems, - ") must be divisible by ", elems_per_uint32 * 2 * rank_num); - } size_t nelems_int32 = nelems / elems_per_uint32; size_t n_pkts = nelems_int32 / 2; // each packet carries uint2 = 2×u32 size_t packet_size = 16; // sizeof(mscclpp::LL16Packet) diff --git a/ark/ops/ops_all_reduce_test.cpp b/ark/ops/ops_all_reduce_test.cpp index 14b9835c..97e71461 100644 --- a/ark/ops/ops_all_reduce_test.cpp +++ b/ark/ops/ops_all_reduce_test.cpp @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +#include "ark/executor.hpp" +#include "gpu/gpu.hpp" #include "model/model_buffer.hpp" #include "model/model_node.hpp" #include "model/model_op.hpp" @@ -334,11 +336,94 @@ ark::unittest::State test_all_reduce_sm_8gpus() { return ark::unittest::SUCCESS; } +template +void test_all_reduce_packet_fused_internal(ark::DimType nelem) { + for (int gpu_id = 0; gpu_id < NumGpus; ++gpu_id) { + ark::unittest::spawn_process([gpu_id, nelem]() { + UNITTEST_SKIP(ark::unittest::get_gpu_count() < NumGpus); + // Each GPU's data is equal to its GPU ID + 1. + ark::Model m(gpu_id, NumGpus); + ark::Tensor ones = m.tensor({nelem}, ark::FP16); + ark::Tensor data = m.mul(ones, float(gpu_id + 1)); + ark::Tensor output = m.all_reduce_packet(data, gpu_id, NumGpus); + + std::vector ones_vec(ones.shape().nelems(), + ark::half_t(1.0f)); + auto result = ark::op_test( + "all_reduce_packet_fused", m, {ones}, {output}, + baseline_all_reduce, {ones_vec.data()}); + UNITTEST_LOG(result); + UNITTEST_EQ(result.max_diff[0], 0.0f); + return ark::unittest::SUCCESS; + }); + } + ark::unittest::wait_all_processes(); +} + +// Variant with external-buffer (placeholder) input — exercises staging the +// external input into registered memory before the fused packet collective. +// Cannot use op_test() because placeholders require pre-allocated GPU memory; +// drive the executor manually instead. +template +void test_all_reduce_packet_fused_ext_internal(ark::DimType nelem) { + for (int gpu_id = 0; gpu_id < NumGpus; ++gpu_id) { + ark::unittest::spawn_process([gpu_id, nelem]() { + UNITTEST_SKIP(ark::unittest::get_gpu_count() < NumGpus); + + UNITTEST_EQ(ark::gpuSetDevice(gpu_id), ark::gpuSuccess); + + // Allocate GPU memory and fill with (gpu_id + 1). + ark::half_t *d_input = nullptr; + size_t nbytes = nelem * sizeof(ark::half_t); + UNITTEST_EQ(ark::gpuMalloc(&d_input, nbytes), ark::gpuSuccess); + std::vector h_input(nelem, + ark::half_t(float(gpu_id + 1))); + UNITTEST_EQ(ark::gpuMemcpy(d_input, h_input.data(), nbytes, + ark::gpuMemcpyHostToDevice), + ark::gpuSuccess); + + ark::Model m(gpu_id, NumGpus); + ark::Tensor input = + m.placeholder({nelem}, ark::FP16, {}, {}, {}, -1, d_input); + ark::Tensor output = m.all_reduce_packet(input, gpu_id, NumGpus); + + ark::DefaultExecutor exe(m, gpu_id); + exe.launch(); + exe.run(1); + exe.stop(); + + std::vector h_output(nelem); + exe.tensor_read(output, h_output); + + float expected = float(NumGpus * (NumGpus + 1)) / 2.0f; + for (ark::DimType i = 0; i < nelem; ++i) { + UNITTEST_EQ(float(h_output[i]), expected); + } + + UNITTEST_EQ(ark::gpuFree(d_input), ark::gpuSuccess); + return ark::unittest::SUCCESS; + }); + } + ark::unittest::wait_all_processes(); +} + +ark::unittest::State test_all_reduce_packet_fused_ext_2gpus() { + test_all_reduce_packet_fused_ext_internal<2>(4096); + return ark::unittest::SUCCESS; +} + +ark::unittest::State test_all_reduce_packet_fused_2gpus() { + test_all_reduce_packet_fused_internal<2>(4096); + return ark::unittest::SUCCESS; +} + int main() { UNITTEST(test_all_reduce_4gpus); UNITTEST(test_all_reduce_8gpus); UNITTEST(test_all_reduce_packet_4gpus); UNITTEST(test_all_reduce_packet_8gpus); + UNITTEST(test_all_reduce_packet_fused_ext_2gpus); + UNITTEST(test_all_reduce_packet_fused_2gpus); UNITTEST(test_all_reduce_sm_4gpus); UNITTEST(test_all_reduce_sm_8gpus); UNITTEST(test_all_reduce_inplace_2gpus); diff --git a/examples/qwen3/__init__.py b/examples/qwen3/__init__.py new file mode 100644 index 00000000..9a045456 --- /dev/null +++ b/examples/qwen3/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. diff --git a/examples/qwen3/_env.py b/examples/qwen3/_env.py new file mode 100644 index 00000000..e9782bee --- /dev/null +++ b/examples/qwen3/_env.py @@ -0,0 +1,220 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Shared subprocess environment helpers for Qwen3 examples. + +Used by both ``bench_allreduce.py`` and ``test_allreduce.py`` to build +a consistent PYTHONPATH / CUDA_VISIBLE_DEVICES env for worker processes. +Workers are launched from ``cwd="/"``, so a simple relative path prepend is +not enough. Prefer the checkout/build under ``ARK_ROOT`` while also supporting +an already-imported or build-tree ``ark`` package, and synthesize +``LD_LIBRARY_PATH`` only when a build root can be inferred. + +This is intentionally not a general Python package resolver. It is constrained +to making rank subprocesses import the same compiled ``ark`` package as the +parent/CI build while preserving ``cwd="/"``; the fallbacks are kept because +workers cannot rely on the parent process's current directory, and source-only +``python/ark`` checkouts lack the compiled ``core`` extension. +""" + +import glob +import importlib.util +import json +import os +import sys + +# Repo root — used to locate the built ark Python package for subprocesses. +_REPO_ROOT = os.path.normpath( + os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..") +) + + +def _has_compiled_ark(parent_dir: str) -> bool: + """Return True if *parent_dir*/ark/ contains the compiled C++ extension. + + The source tree's ``python/ark/`` has ``__init__.py`` but no compiled + ``core.cpython-*.so``. Adding it to PYTHONPATH causes workers to fail + with ``ModuleNotFoundError: No module named 'ark.core'``. + """ + ark_pkg = os.path.join(parent_dir, "ark") + if not os.path.isfile(os.path.join(ark_pkg, "__init__.py")): + return False + # Check for compiled extension (Linux .so, Windows .pyd) + return bool( + glob.glob(os.path.join(ark_pkg, "core*.so")) + or glob.glob(os.path.join(ark_pkg, "core*.pyd")) + ) + + +def _build_root_from_python_parent(parent_dir: str): + """Infer the CMake build root from a build-tree Python package path.""" + if os.path.basename(parent_dir) != "python": + return None + build_root = os.path.dirname(parent_dir) + if os.path.isdir(build_root): + return build_root + return None + + +def _append_unique(paths, path): + """Append *path* to *paths* once when it is non-empty.""" + if path and path not in paths: + paths.append(path) + + +def _append_pythonpath(paths, value): + """Append PYTHONPATH entries individually, preserving first use.""" + for entry in value.split(os.pathsep): + if not entry: + continue + if not os.path.isabs(entry): + entry = os.path.abspath(entry) + else: + entry = os.path.normpath(entry) + _append_unique(paths, entry) + + +def _load_worker_result(stdout): + """Return the last JSON object from worker stdout, ignoring log lines.""" + for line in reversed(stdout.decode().splitlines()): + line = line.strip() + if not line: + continue + try: + return json.loads(line) + except json.JSONDecodeError: + continue + return None + + +def _subprocess_env(world_size: int) -> dict: + """Build env dict for worker subprocesses. + + Resolution order for the ``ark`` package path: + 1. ``$ARK_ROOT/python`` (CI sets ``ARK_ROOT=$PWD``). + 2. ``importlib.util.find_spec("ark")`` — wherever the parent already + resolved ark (handles build-tree, install, and namespace packages). + 3. ``sys.path`` entries for any other compiled ``ark`` package. + 4. ``/build/python`` or ``/python``. + 5. inherited ``PYTHONPATH``. + + Also propagates the repo root for package imports in workers and sets + ``ARK_ROOT`` / ``LD_LIBRARY_PATH`` when a build-tree package is found. + """ + extra = [] # type: list[str] + compiled_ark_parent = None + resolved_ark_root = None + ark_root = os.environ.get("ARK_ROOT", "") + + # --- Primary: $ARK_ROOT/python --- + # Prefer the checkout under test over inherited PYTHONPATH entries. + if ark_root: + ark_root = os.path.abspath(ark_root) + ark_root_py = os.path.join(ark_root, "python") + if _has_compiled_ark(ark_root_py): + _append_unique(extra, ark_root_py) + compiled_ark_parent = ark_root_py + resolved_ark_root = ark_root + + # --- Secondary: resolve from the running interpreter's import state --- + try: + spec = importlib.util.find_spec("ark") + if spec is not None: + if spec.submodule_search_locations: + # Regular package: parent of the package directory. + ark_pkg_dir = next(iter(spec.submodule_search_locations)) + ark_parent = os.path.dirname(ark_pkg_dir) + elif spec.origin: + # Single-file or namespace with origin. + ark_parent = os.path.dirname(os.path.dirname(spec.origin)) + else: + ark_parent = None + if ark_parent and _has_compiled_ark(ark_parent): + _append_unique(extra, ark_parent) + if compiled_ark_parent is None: + compiled_ark_parent = ark_parent + if resolved_ark_root is None: + resolved_ark_root = _build_root_from_python_parent( + ark_parent + ) + except (ModuleNotFoundError, ValueError, TypeError): + pass + + # --- Tertiary: scan sys.path for a compiled ark package --- + # When PYTHONPATH points at the source tree (e.g., /w/python), + # find_spec("ark") resolves to source-only ark/ (no core*.so). + # Keep searching for an installed/built ark with compiled extension. + for entry in sys.path: + if not entry: + continue + if not os.path.isabs(entry): + entry = os.path.abspath(entry) + else: + entry = os.path.normpath(entry) + if _has_compiled_ark(entry): + _append_unique(extra, entry) + if compiled_ark_parent is None: + compiled_ark_parent = entry + if resolved_ark_root is None: + resolved_ark_root = _build_root_from_python_parent(entry) + break + + # --- Fallback: repo build/python or python --- + for subdir in ("build/python", "python"): + candidate = os.path.join(_REPO_ROOT, subdir) + if _has_compiled_ark(candidate): + _append_unique(extra, candidate) + if compiled_ark_parent is None: + compiled_ark_parent = candidate + if resolved_ark_root is None: + resolved_ark_root = _build_root_from_python_parent(candidate) + + if compiled_ark_parent is None: + raise RuntimeError( + "no compiled ark package found for worker PYTHONPATH; " + "expected ark/core*.so under $ARK_ROOT/python or a build path" + ) + + # --- Propagate repo root for examples.qwen3 package imports --- + _append_unique(extra, _REPO_ROOT) + + # --- Inherited PYTHONPATH --- + existing = os.environ.get("PYTHONPATH", "") + if existing: + _append_pythonpath(extra, existing) + + if "CUDA_VISIBLE_DEVICES" in os.environ: + visible = os.environ["CUDA_VISIBLE_DEVICES"] + devices = [d.strip() for d in visible.split(",") if d.strip()] + if len(devices) < world_size: + raise RuntimeError( + "CUDA_VISIBLE_DEVICES exposes fewer devices " + f"({len(devices)}) than world_size ({world_size})" + ) + cuda_visible_devices = ",".join(devices[:world_size]) + else: + cuda_visible_devices = ",".join(str(i) for i in range(world_size)) + + env = { + **os.environ, + "CUDA_VISIBLE_DEVICES": cuda_visible_devices, + } + if extra: + env["PYTHONPATH"] = os.pathsep.join(extra) + if resolved_ark_root is not None: + env["ARK_ROOT"] = resolved_ark_root + + ld_paths = [] + for root in (resolved_ark_root, ark_root): + if not root: + continue + for subdir in ("", "lib", "ark", os.path.join("python", "ark")): + candidate = os.path.join(root, subdir) + if os.path.isdir(candidate): + _append_unique(ld_paths, candidate) + existing_ld = os.environ.get("LD_LIBRARY_PATH", "") + if existing_ld: + _append_unique(ld_paths, existing_ld) + if ld_paths: + env["LD_LIBRARY_PATH"] = os.pathsep.join(ld_paths) + return env diff --git a/examples/qwen3/bench_allreduce.py b/examples/qwen3/bench_allreduce.py new file mode 100644 index 00000000..70ef76bf --- /dev/null +++ b/examples/qwen3/bench_allreduce.py @@ -0,0 +1,244 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Benchmark end-to-end ``ark.all_reduce_packet`` latency on torch input. + +Measures single-iteration latency for Qwen3 TP decode (1, 4096) and prefill +(2048, 4096) shapes, including registered-memory staging when needed. Each +rank runs as its own process and the parent reports max-rank latency. + + python -m examples.qwen3.bench_allreduce --world-size 8 + +TIMING METHOD (critical): ARK runs a PERSISTENT loop kernel that owns all SMs +between ``rt.launch()`` and ``rt.stop()`` — by design. Torch synchronization is +safe before ``rt.launch()``, but any torch GPU op issued while the runtime is +live (``torch.cuda.synchronize``, ``torch.cuda.Event``, ``torch.allclose``, ...) +can never be scheduled and deadlocks. So we time with plain host wall-clock +around a single ``rt.run(iter=1)`` (which host-blocks on ARK's own completion +flags, not ``cudaDeviceSynchronize``) and align ranks with ``rt.barrier()``. +NO torch device sync while launched, NO CUDA events. +""" + +import argparse +import os +import subprocess +import sys + +try: + from ._env import _load_worker_result, _subprocess_env +except ImportError: + sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + from _env import _load_worker_result, _subprocess_env + +_WORKER_SCRIPT = ''' +"""Worker: time torch-input ARK all-reduce without torch ops while launched.""" +import json +import os +import sys +import time + +import torch +import ark +from ark.executor import Executor + +rank = int(sys.argv[1]) +world_size = int(sys.argv[2]) +n_elements = int(sys.argv[3]) +label = sys.argv[4] + +ark.init() +ark.set_rank(rank) +ark.set_world_size(world_size) + +# Input is created and synchronized BEFORE launch, while no ARK loop kernel is +# live (safe). The benchmark includes any staging done by ark.all_reduce_packet. +x = torch.randn(n_elements, dtype=torch.float16, device=f"cuda:{rank}") +torch.cuda.synchronize(rank) +result = ark.all_reduce_packet(x, rank, world_size) + +with ark.Runtime() as rt: + rt.launch(device_id=rank) + + if world_size > 1: + rt.barrier() + + t0 = time.perf_counter() + rt.run(iter=1) + host_s = time.perf_counter() - t0 + if world_size > 1: + rt.barrier() + + rt.stop() + +latency_us = host_s * 1e6 + +print(json.dumps({ + "rank": rank, + "label": label, + "world_size": world_size, + "n_elements": n_elements, + "latency_us": round(latency_us, 3), +})) +sys.stdout.flush() + +# Workaround: mscclpp's UnixSocketServer destructor races during normal +# Python shutdown (static destruction order is undefined across TUs), +# causing SIGABRT. Executor.reset() forces orderly mscclpp teardown, +# then os._exit() skips Python's atexit / gc finalizers entirely. +Executor.reset() +os._exit(0) +''' + +# SGLang PROFILE.md Q7 nccl / comm target: 214.69 ms over 657 calls +# on 8xA100 TP=8, batch=1 decode-dominated Qwen3-8B. +_DECODE_TARGET_MS = 214.69 / 657.0 + +SHAPES = { + "decode": ("decode (1, 4096)", 4096), + "prefill": ("prefill (2048, 4096)", 2048 * 4096), +} + + +def run_bench(world_size, timeout, shape): + results = [] + any_failed = False + shapes = SHAPES.values() if shape == "all" else [SHAPES[shape]] + for label, n_elements in shapes: + procs = [] + for rank in range(world_size): + procs.append( + subprocess.Popen( + [ + sys.executable, + "-c", + _WORKER_SCRIPT, + str(rank), + str(world_size), + str(n_elements), + label, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd="/", + env=_subprocess_env(world_size), + ) + ) + shape_failed = False + rank_results = [] + try: + for rank, p in enumerate(procs): + try: + out, err = p.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + shape_failed = True + print( + f"ERROR rank={rank} {label}: timed out after " + f"{timeout}s", + file=sys.stderr, + ) + break + if p.returncode != 0: + shape_failed = True + print( + f"ERROR rank={rank} {label}: " + f"{err.decode().strip()[-500:]}", + file=sys.stderr, + ) + result = _load_worker_result(out) + if result is None: + shape_failed = True + print( + f"ERROR rank={rank} {label}: no result", + file=sys.stderr, + ) + else: + rank_results.append(result) + if not shape_failed and len(rank_results) == world_size: + rank_results.sort(key=lambda d: d["rank"]) + max_result = max(rank_results, key=lambda d: d["latency_us"]) + results.append( + { + "label": max_result["label"], + "world_size": world_size, + "n_elements": max_result["n_elements"], + "max_rank": max_result["rank"], + "latency_us": max_result["latency_us"], + "rank_latencies_us": [ + d["latency_us"] for d in rank_results + ], + } + ) + else: + any_failed = True + if not shape_failed: + print( + f"ERROR {label}: expected {world_size} rank results, " + f"got {len(rank_results)}", + file=sys.stderr, + ) + finally: + for p in procs: + p.kill() + p.wait() + + print(f"\n{'=' * 72}") + print( + f"ARK all_reduce_packet torch-input latency | TP={world_size} " + f"(single iteration, max rank, includes staging)" + ) + print(f"{'=' * 72}") + print(f"{'Shape':<24}{'Elements':>12}{'Max rank':>10}{'ARK us':>10}") + print(f"{'-' * 72}") + for d in results: + print( + f"{d['label']:<24}{d['n_elements']:>12,}" + f"{d['max_rank']:>10}{d['latency_us']:>10.2f}" + ) + print(f"{'=' * 72}\n") + return results, any_failed + + +def main(): + ap = argparse.ArgumentParser( + description=( + "Benchmark end-to-end ark.all_reduce_packet latency on torch input " + "at Qwen3 TP shapes, including registered-memory staging " + "when needed" + ) + ) + ap.add_argument("--world-size", type=int, default=2) + ap.add_argument("--timeout", type=int, default=120) + ap.add_argument( + "--shape", + choices=("decode", "prefill", "all"), + default="all", + help="Qwen3 shape to benchmark; the perf gate uses decode", + ) + args = ap.parse_args() + + # Repeated-iteration timing is intentionally unsupported until packet flag + # rotation/reset exists. + results, any_failed = run_bench(args.world_size, args.timeout, args.shape) + + decode = [r for r in results if r["n_elements"] == SHAPES["decode"][1]] + if decode: + ark_ms = decode[0]["latency_us"] / 1000.0 + else: + ark_ms = 999999.0 + ratio = ark_ms / _DECODE_TARGET_MS + print( + f"PERF_GATE name=allreduce" + f" ark_ms={ark_ms:.4f}" + f" sglang_ms={_DECODE_TARGET_MS:.4f}" + f" ratio={ratio:.4f}" + ) + if any_failed: + print("ERROR: one or more benchmark workers failed", file=sys.stderr) + raise SystemExit(1) + if not decode: + print("ERROR: decode benchmark produced no result", file=sys.stderr) + raise SystemExit(1) + + +if __name__ == "__main__": + main() diff --git a/examples/qwen3/test_allreduce.py b/examples/qwen3/test_allreduce.py new file mode 100644 index 00000000..bf23433c --- /dev/null +++ b/examples/qwen3/test_allreduce.py @@ -0,0 +1,290 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +"""Equivalence tests for ARK fused-packet all-reduce at Qwen3 TP shapes. + +Verifies that ``ark.all_reduce_packet`` produces the same result as a +torch all-reduce (sum) across ranks. Tests are d2h-safe: each worker +copies the result to CPU (``result.to_torch().cpu()``) AFTER stopping +the ARK runtime, then asserts on the host with ``torch.allclose``. + +No torch GPU kernel is issued while the ARK runtime is launched. + +Requires ≥2 GPUs; skips gracefully on single-GPU machines. Large TP=8 and +prefill cases are opt-in with ``ARK_QWEN3_LARGE_TESTS=1``. +""" + +import os +import subprocess +import sys + +import pytest + +try: + import torch +except ImportError: + pytest.skip("torch is not installed", allow_module_level=True) + +try: + from . import _env as qwen3_env + from ._env import _load_worker_result, _subprocess_env +except ImportError: + sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + import _env as qwen3_env + from _env import _load_worker_result, _subprocess_env + + +def _gpu_count() -> int: + """Return available CUDA device count (0 if CUDA unavailable).""" + if not torch.cuda.is_available(): + return 0 + return torch.cuda.device_count() + + +def _large_tests_enabled() -> bool: + """Return True when expensive Qwen3 all-reduce cases are requested.""" + return os.environ.get("ARK_QWEN3_LARGE_TESTS") == "1" + + +def _fake_ark_package(parent_dir, compiled): + """Create a minimal ark package tree for _subprocess_env tests.""" + ark_pkg = parent_dir / "ark" + ark_pkg.mkdir(parents=True) + (ark_pkg / "__init__.py").write_text("# fake ark\n", encoding="utf-8") + if compiled: + (ark_pkg / "core.cpython-310-x86_64-linux-gnu.so").write_text( + "", encoding="utf-8" + ) + + +def test_subprocess_env_prefers_ark_root_python(monkeypatch, tmp_path): + """Worker PYTHONPATH starts with the built ark package under ARK_ROOT.""" + build_root = tmp_path / "build" + build_python = build_root / "python" + source_python = tmp_path / "source" / "python" + repo_root = tmp_path / "repo" + _fake_ark_package(build_python, compiled=True) + _fake_ark_package(source_python, compiled=False) + repo_root.mkdir() + + monkeypatch.setattr(qwen3_env, "_REPO_ROOT", str(repo_root)) + monkeypatch.setattr( + qwen3_env.importlib.util, "find_spec", lambda name: None + ) + monkeypatch.setattr(sys, "path", [str(source_python)]) + monkeypatch.setenv("ARK_ROOT", str(build_root)) + monkeypatch.setenv("PYTHONPATH", str(source_python)) + monkeypatch.delenv("CUDA_VISIBLE_DEVICES", raising=False) + + env = _subprocess_env(world_size=2) + paths = env["PYTHONPATH"].split(os.pathsep) + + assert paths[:2] == [str(build_python), str(repo_root)] + assert paths.count(str(build_python)) == 1 + assert paths.count(str(source_python)) == 1 + assert env["ARK_ROOT"] == str(build_root) + assert env["CUDA_VISIBLE_DEVICES"] == "0,1" + + +def test_subprocess_env_skips_source_only_inherited_path(monkeypatch, tmp_path): + """A source-only inherited ark package cannot shadow compiled ark.core.""" + source_python = tmp_path / "source" / "python" + build_python = tmp_path / "other-build" / "python" + repo_root = tmp_path / "repo" + _fake_ark_package(source_python, compiled=False) + _fake_ark_package(build_python, compiled=True) + repo_root.mkdir() + + monkeypatch.setattr(qwen3_env, "_REPO_ROOT", str(repo_root)) + monkeypatch.setattr( + qwen3_env.importlib.util, "find_spec", lambda name: None + ) + monkeypatch.setattr(sys, "path", [str(source_python), str(build_python)]) + monkeypatch.delenv("ARK_ROOT", raising=False) + monkeypatch.setenv( + "PYTHONPATH", + os.pathsep.join([str(source_python), str(build_python)]), + ) + + env = _subprocess_env(world_size=1) + paths = env["PYTHONPATH"].split(os.pathsep) + + assert paths[0] == str(build_python) + assert paths.count(str(build_python)) == 1 + assert paths.count(str(source_python)) == 1 + assert paths.index(str(build_python)) < paths.index(str(source_python)) + + +# Worker script executed in each subprocess rank. +# Uses a deterministic seed per rank so the expected sum is reproducible. +_WORKER_SCRIPT = ''' +"""Worker: run ARK all-reduce and verify result on CPU.""" +import json +import os +import sys + +import torch +import ark +from ark.executor import Executor + +rank = int(sys.argv[1]) +world_size = int(sys.argv[2]) +n_elements = int(sys.argv[3]) + +ark.init() +ark.set_rank(rank) +ark.set_world_size(world_size) + +# --- Input: deterministic per-rank values (BEFORE ARK launch) --- +# Generate on CPU first so the host reference uses the exact same values. +torch.manual_seed(42 + rank) +x_cpu = torch.randn(n_elements, dtype=torch.float16) +x = x_cpu.to(device=f"cuda:{rank}") +# Safe: ARK has not launched yet, so the GPU copy can be synchronized. +torch.cuda.synchronize(rank) + +# Build ARK graph (no GPU kernel launched yet). +result = ark.all_reduce_packet(x, rank, world_size) + +with ark.Runtime() as rt: + rt.launch(device_id=rank) + if world_size > 1: + rt.barrier() + # Single iteration — correctness, not throughput. + rt.run(iter=1) + if world_size > 1: + rt.barrier() + rt.stop() + +# --- D2H transfer AFTER runtime stopped (safe: no ARK loop kernel live) --- +result_cpu = result.to_torch().cpu() + +# --- Expected: sum of all ranks' inputs --- +# Regenerate all ranks' CPU inputs and sum them. +expected = torch.zeros(n_elements, dtype=torch.float16) +for r in range(world_size): + torch.manual_seed(42 + r) + expected += torch.randn(n_elements, dtype=torch.float16) + +# FP16 all-reduce may accumulate rounding; use relaxed tolerance. +close = torch.allclose(result_cpu, expected, rtol=1e-2, atol=1e-2) + +# Report result as JSON on stdout (only rank 0 for simplicity). +if rank == 0: + max_diff = (result_cpu - expected).abs().max().item() + print(json.dumps({ + "rank": rank, + "world_size": world_size, + "n_elements": n_elements, + "pass": close, + "max_diff": max_diff, + })) + sys.stdout.flush() + +# Workaround: mscclpp's UnixSocketServer destructor races during normal +# Python shutdown (static destruction order is undefined across TUs), +# causing SIGABRT. Executor.reset() forces orderly mscclpp teardown, +# then os._exit() skips Python's atexit / gc finalizers entirely. +Executor.reset() +os._exit(0 if close else 1) +''' + + +def _tail(data, limit=500): + """Return a short decoded tail for subprocess diagnostics.""" + return data.decode(errors="replace").strip()[-limit:] + + +def _run_allreduce_test(world_size: int, n_elements: int, timeout: int = 120): + """Spawn *world_size* workers and assert all-reduce correctness.""" + procs = [] + for rank in range(world_size): + procs.append( + subprocess.Popen( + [ + sys.executable, + "-c", + _WORKER_SCRIPT, + str(rank), + str(world_size), + str(n_elements), + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd="/", + env=_subprocess_env(world_size), + ) + ) + + errors = [] + result_json = None + try: + for rank, p in enumerate(procs): + try: + out, err = p.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + p.kill() + p.wait() + errors.append(f"rank {rank}: timed out after {timeout}s") + continue + if p.returncode != 0: + errors.append( + f"rank {rank}: exit={p.returncode} " + f"stderr={_tail(err, 300)}" + ) + if rank == 0 and out.strip(): + result_json = _load_worker_result(out) + if result_json is None: + errors.append( + "rank 0: stdout contained no JSON result " + f"stdout_tail={_tail(out)} stderr_tail={_tail(err)}" + ) + finally: + for p in procs: + p.kill() + p.wait() + + assert not errors, "\n".join(errors) + assert result_json is not None, "rank 0 produced no output" + assert result_json[ + "pass" + ], f"allclose failed: max_diff={result_json['max_diff']}" + + +# ---------- Decode shape (1, 4096) = 4096 elements ---------- + + +@pytest.mark.skipif(_gpu_count() < 2, reason="need ≥2 GPUs") +def test_allreduce_decode_tp2(): + """Decode (1,4096) all-reduce at TP=2.""" + _run_allreduce_test(world_size=2, n_elements=4096) + + +@pytest.mark.skipif( + not _large_tests_enabled(), reason="set ARK_QWEN3_LARGE_TESTS=1" +) +@pytest.mark.skipif(_gpu_count() < 8, reason="need ≥8 GPUs") +def test_allreduce_decode_tp8(): + """Decode (1,4096) all-reduce at TP=8.""" + _run_allreduce_test(world_size=8, n_elements=4096) + + +# ---------- Prefill shape (2048, 4096) = 8388608 elements ---------- + + +@pytest.mark.skipif( + not _large_tests_enabled(), reason="set ARK_QWEN3_LARGE_TESTS=1" +) +@pytest.mark.skipif(_gpu_count() < 2, reason="need ≥2 GPUs") +def test_allreduce_prefill_tp2(): + """Prefill (2048,4096) all-reduce at TP=2.""" + _run_allreduce_test(world_size=2, n_elements=2048 * 4096) + + +@pytest.mark.skipif( + not _large_tests_enabled(), reason="set ARK_QWEN3_LARGE_TESTS=1" +) +@pytest.mark.skipif(_gpu_count() < 8, reason="need ≥8 GPUs") +def test_allreduce_prefill_tp8(): + """Prefill (2048,4096) all-reduce at TP=8.""" + _run_allreduce_test(world_size=8, n_elements=2048 * 4096)