waliwali777 commented Aug 25, 2025

Summary by CodeRabbit

  • New Features

    • Dynamic neighbor/edge/angle selection for descriptors and graph-indexed updates.
    • Optional exponential neighbor weighting and option to initialize edge features from distance.
    • Distributed execution support across model forward, loss, and training paths.
    • Default local mapping enabled for descriptors.
    • Runtime flag to toggle a JIT-accelerated activation path.
    • Data-loading simplified to fixed batch sizing (sampler behavior changed).
  • Chores

    • Disabled selected pre-commit hooks.
    • Updated example: increased batch sizes and reduced training steps; added distributed run script.

coderabbitai bot commented Aug 25, 2025

📝 Walkthrough

The PR adds distributed execution and dynamic graph-indexed descriptor paths, integrates a JIT-switchable SiLUT autograd path, introduces an exponential neighbor switch and preprocessing changes, modifies data loading and training for distributed/CINN runs, reshards loss tensors for distributed training, adjusts environment flags and device handling, disables two pre-commit hooks, and updates example configs/scripts.

Changes

Grouped by cohort (files in parentheses):

  • Pre-commit config (./.pre-commit-config.yaml): Commented out the mirrors-prettier and mirrors-bibtex-tidy hook blocks.
  • Env flags & entrypoint (deepmd/pd/utils/env.py, deepmd/pd/entrypoints/main.py): Added CUSTOM_OP_USE_JIT (exported) and derived LOCAL_RANK from PADDLE_LOCAL_RANK; main sets env.CUSTOM_OP_USE_JIT = False during train.
  • SiLUT JIT/autograd & activation (deepmd/pd/utils/utils.py): Added silut_forward/backward/double-backward and a SiLUTScript PyLayer/JIT autograd path; ActivationFn selects the script path when env.CUSTOM_OP_USE_JIT is true.
  • Distributed loss resharding (deepmd/pd/loss/ener.py): Uses paddle.distributed.reshard to replicate the energy/force/virial difference tensors before the L2/L1/Huber loss computations.
  • Env mat & preprocessing (deepmd/pd/model/descriptor/env_mat.py, deepmd/pd/utils/preprocess.py): Added a use_exp_switch flag and a new compute_exp_sw, refactored the smooth weight and coordinate gathering, and threaded the flag through prod_env_mat.
  • RepFlow dynamic selection & helpers (deepmd/pd/model/descriptor/repflow_layer.py, deepmd/pd/model/network/utils.py): Added dynamic index-guided methods (_cal_hg_dynamic, symmetrization_op_dynamic, optim_*_dynamic); forward now accepts edge_index/angle_index; new aggregate and get_graph_index utilities.
  • Repflows block & DPA3 defaults (deepmd/pd/model/descriptor/repflows.py, deepmd/pd/model/descriptor/dpa3.py): New options (edge_init_use_dist, use_exp_switch, use_dynamic_sel, sel_reduce_factor); use_loc_mapping default changed to True; parallel_mode gating, flag propagation, and a dynamic index flow (see the sketch after this list).
  • Model distributed wrapping (deepmd/pd/model/model/make_model.py): Wraps the neighbor-list builder and lower forward with dist.local_map over the fleet mesh (distributed execution/resharding).
  • Training & dataloader (deepmd/pd/train/training.py, deepmd/pd/utils/dataloader.py): Fleet mesh init in Trainer; DataLoader simplified to batch_size (samplers commented out); CINN to_static scaffolding and sharded inputs; sampler-based logic and profiler/validation blocks removed.
  • Minor device & spin changes (deepmd/pd/model/descriptor/repformers.py, deepmd/pd/utils/spin.py): Simplified .to(env.DEVICE) calls; removed explicit device placement when allocating zeros in the spin extension.
  • Examples & scripts (examples/water/dpa3/input_torch.json, examples/water/dpa3/run.sh): Increased example batch sizes and reduced training steps; updated run.sh to prepare a distributed paddle.distributed.launch invocation and environment template.
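
For reference, a hypothetical summary of the new repflow/DPA3 knobs as a Python dict. The option names are taken from the summary above; the defaults marked "assumed" are guesses and should be checked against dpa3.py/repflows.py and the argument docs.

# Hypothetical sketch of the new descriptor options introduced by this PR.
# Only use_loc_mapping's default (True) is stated above; the others are assumed.
new_descriptor_options = {
    "edge_init_use_dist": False,  # assumed default; initialize edge features from the pair distance
    "use_exp_switch": False,      # assumed default; use the exponential neighbor switching function
    "use_dynamic_sel": False,     # assumed default; route updates through the dynamic, graph-indexed path
    "sel_reduce_factor": 10.0,    # assumed value; scales down the selection size in dynamic mode
    "use_loc_mapping": True,      # default changed to True in this PR
}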

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Trainer
  participant Fleet as FleetMesh
  participant Model
  participant Dist as dist.local_map
  participant Desc as Descriptor

  Trainer->>Fleet: create_mesh() / init()
  Trainer->>Model: forward(input_dict, label_dict)
  Model->>Dist: wrap(extend_input_and_build_neighbor_list) with in_placements
  Dist-->>Model: extended_coord, extended_atype, mapping, nlist
  Model->>Dist: wrap(partial(forward_common_lower,...), reshard_inputs=True)
  Dist-->>Model: model_predict_lower (distributed)
  Model-->>Trainer: model_predict (casted)
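The distributed forward above and the loss resharding noted in the changes both lean on Paddle's auto-parallel API. Below is a minimal, illustrative sketch (not the PR's code) of replicating a batch-sharded difference tensor before an L2 reduction, assuming dist.shard_tensor / dist.reshard / dist.Replicate and a two-rank data-parallel launch.

import paddle
import paddle.distributed as dist

# Illustrative only; run under e.g.:
#   python -m paddle.distributed.launch --nproc_per_node=2 this_script.py
mesh = dist.ProcessMesh([0, 1], dim_names=["dp"])  # assumes 2 data-parallel ranks

pred = paddle.rand([8, 3])
label = paddle.rand([8, 3])
# Shard along the batch dimension, as the training loop does for coord/atype/box.
pred = dist.shard_tensor(pred, mesh=mesh, placements=[dist.Shard(0)])
label = dist.shard_tensor(label, mesh=mesh, placements=[dist.Shard(0)])

diff = pred - label
# Replicate the difference so the subsequent reduction sees the full batch,
# mirroring the reshard step described for deepmd/pd/loss/ener.py.
diff = dist.reshard(diff, mesh=mesh, placements=[dist.Replicate()])
l2 = paddle.mean(paddle.square(diff))
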
sequenceDiagram
  autonumber
  participant Desc as DescrptBlockRepflows
  participant Utils as network.utils
  participant Layer as RepFlowLayer

  Note over Desc: when use_dynamic_sel == True
  Desc->>Utils: get_graph_index(nlist, masks, nall)
  Utils-->>Desc: edge_index, angle_index
  Desc->>Layer: forward(..., edge_index, angle_index)
  alt Dynamic selection
    Layer->>Layer: _cal_hg_dynamic / symmetrization_op_dynamic
    Layer->>Layer: optim_edge_update_dynamic / optim_angle_update_dynamic
  else Static selection
    Layer->>Layer: static _cal_hg / optim_* (existing)
  end
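In the dynamic path above, get_graph_index turns the padded neighbor list into flat edge/angle indices that the *_dynamic methods gather with. The sketch below is only a rough illustration of that conversion for edges; it is not the PR's implementation and ignores angle_index and local mapping.

import paddle

def flat_edge_index(nlist: paddle.Tensor) -> paddle.Tensor:
    """Rough illustration: padded nlist [nf, nloc, nnei] (padded with -1) -> flat edges."""
    mask = nlist != -1                            # True for real neighbors
    idx = paddle.nonzero(mask)                    # [n_edge, 3]: (frame, center atom, neighbor slot)
    frame, center = idx[:, 0], idx[:, 1]
    neighbor = paddle.masked_select(nlist, mask)  # extended-atom index of each real neighbor
    # One row per edge: (frame, center atom, neighbor atom).
    return paddle.stack([frame, center, neighbor], axis=1)

nlist = paddle.to_tensor([[[1, 2, -1], [0, -1, -1]]])  # 1 frame, 2 atoms, up to 3 neighbors
print(flat_edge_index(nlist))  # [[0, 0, 1], [0, 0, 2], [0, 1, 0]]
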
sequenceDiagram
  autonumber
  participant Env
  participant Act as ActivationFn
  participant SiLUT
  participant Script as SiLUTScript

  Env-->>Act: CUSTOM_OP_USE_JIT flag
  alt CUSTOM_OP_USE_JIT == False
    Act-->>SiLUT: use native SiLUT implementation
  else CUSTOM_OP_USE_JIT == True
    Act-->>Script: construct SiLUTScript (JIT/PyLayer path)
  end
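A minimal, self-contained sketch of the flag-gated selection shown in this diagram. Both branches are stubbed with plain SiLU because only the env.CUSTOM_OP_USE_JIT gating is being illustrated; the real SiLUT/SiLUTScript live in deepmd/pd/utils/utils.py.

import paddle
import paddle.nn.functional as F

CUSTOM_OP_USE_JIT = False  # stands in for env.CUSTOM_OP_USE_JIT

class SiLUTSketch(paddle.nn.Layer):        # stand-in for the eager SiLUT
    def forward(self, x):
        return F.silu(x)

class SiLUTScriptSketch(paddle.nn.Layer):  # stand-in for the JIT/PyLayer SiLUTScript
    def forward(self, x):
        return F.silu(x)

class ActivationFnSketch(paddle.nn.Layer):
    def __init__(self):
        super().__init__()
        # The PR picks the scripted path only when the env flag is set.
        self.act = SiLUTScriptSketch() if CUSTOM_OP_USE_JIT else SiLUTSketch()

    def forward(self, x):
        return self.act(x)

print(ActivationFnSketch()(paddle.rand([2, 3])).shape)  # [2, 3]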

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested reviewers

  • njzjz
  • iProzd
  • wanghan-iapcm

coderabbitai bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
deepmd/pd/train/training.py (4)

167-179: DataLoader: batch_size and collate_fn set for debugging; make them configurable

  • For real training, batch_size=1 with collate_fn=lambda batch: batch[0] kills throughput and bypasses batching semantics.
  • Respect training_params["training_data"]["batch_size"] and optionally NUM_WORKERS.

Apply this diff:

-                _dataloader = DataLoader(
-                    _data,
-                    batch_size=1,
-                    num_workers=NUM_WORKERS
-                    if dist.is_available()
-                    else 0,  # setting to 0 diverges the behavior of its iterator; should be >=1
-                    collate_fn=lambda batch: batch[0],  # prevent extra conversion
-                )
+                bs = _params.get("batch_size", 32)
+                nw = _params.get(
+                    "num_workers",
+                    NUM_WORKERS if (dist.is_available() and dist.is_initialized()) else 0,
+                )
+                _dataloader = DataLoader(
+                    _data,
+                    batch_size=bs,
+                    num_workers=nw,
+                    # Use default collate_fn to enable batching unless a custom one is explicitly provided.
+                    collate_fn=_params.get("collate_fn", None),
+                )

616-647: CINN to_static input_spec: ensure deterministic structure, cover all label keys, and match dtypes

  • Building label_dict_spec from a single sample risks mismatch at runtime (e.g., missing “natoms”/“numb_copy” that step() later provides).
  • Dtypes are hard-coded to "float64" for labels/inputs, while cur_lr elsewhere uses DEFAULT_PRECISION; mismatch leads to to_static/type errors.

Apply this diff to (1) always provide the full, known label spec and (2) align dtypes with DEFAULT_PRECISION:

-            # NOTE: This is a trick to decide the right input_spec for wrapper.forward
-            _, label_dict, _ = self.get_data(is_train=True)
-
-            # Define specification templates
-            spec_templates = {
-                "find_box": np.float32(1.0),
-                "find_coord": np.float32(1.0),
-                "find_numb_copy": np.float32(0.0),
-                "numb_copy": static.InputSpec([1, 1], "int64", name="numb_copy"),
-                "find_energy": np.float32(1.0),
-                "energy": static.InputSpec([1, 1], "float64", name="energy"),
-                "find_force": np.float32(1.0),
-                "force": static.InputSpec([1, -1, 3], "float64", name="force"),
-                "find_virial": np.float32(0.0),
-                "virial": static.InputSpec([1, 9], "float64", name="virial"),
-                "natoms": static.InputSpec([1, -1], "int32", name="natoms"),
-            }
-            # Build spec only for keys present in sample data
-            label_dict_spec = {
-                k: spec_templates[k] for k in label_dict.keys() if k in spec_templates
-            }
+            # Define a stable, full label specification
+            float_dtype = DEFAULT_PRECISION  # align with training precision
+            spec_templates = {
+                "find_box": np.float32(1.0),
+                "find_coord": np.float32(1.0),
+                "find_numb_copy": np.float32(0.0),
+                "find_energy": np.float32(1.0),
+                "find_force": np.float32(1.0),
+                "find_virial": np.float32(0.0),
+                "numb_copy": static.InputSpec([1, 1], "int64", name="numb_copy"),
+                "energy": static.InputSpec([1, 1], float_dtype, name="energy"),
+                "force": static.InputSpec([1, -1, 3], float_dtype, name="force"),
+                "virial": static.InputSpec([1, 9], float_dtype, name="virial"),
+                "natoms": static.InputSpec([1, -1], "int32", name="natoms"),
+            }
+            # Always provide the full set; unused keys are harmless but ensure stable schema
+            label_dict_spec = spec_templates
@@
-                    static.InputSpec([], "float64", name="cur_lr"),  # cur_lr
+                    static.InputSpec([], float_dtype, name="cur_lr"),  # cur_lr

Also, prefer if k in label_dict instead of if k in label_dict.keys() to satisfy Ruff SIM118.


727-729: Profiler push/pop imbalance

You call core.nvprof_nvtx_push() but the matching pop is commented out. This leaves NVTX ranges unbalanced.

Apply this diff to restore the pop:

-            # if enable_profiling:
-            #     core.nvprof_nvtx_pop()
+            if enable_profiling:
+                core.nvprof_nvtx_pop()

Alternatively, remove the push if you don’t want per-step ranges.

Also applies to: 976-978


1124-1132: Replace invalid to(..., blocking=False) calls with copy_to(..., blocking=False)

Paddle’s Tensor.to method does not accept a blocking (or non_blocking) argument. To perform an asynchronous GPU transfer, use Tensor.copy_to(place, blocking=False) (or paddle.to_tensor(..., place=..., blocking=False)).

• Location: deepmd/pd/train/training.py, lines 1124–1132
• Change all

- tensor.to(DEVICE, blocking=False)
+ tensor.copy_to(DEVICE, blocking=False)

– for both single tensors and list comprehensions.

Example diff:

@@ -1124,7 +1124,7 @@ for key in batch_data:
             if key == "sid" or key == "fid" or key == "box" or "find_" in key:
                 continue
             elif not isinstance(batch_data[key], list):
-                batch_data[key] = batch_data[key].to(DEVICE, blocking=False)
+                batch_data[key] = batch_data[key].copy_to(DEVICE, blocking=False)
             else:
-                batch_data[key] = [
-                    item.to(DEVICE, blocking=False) for item in batch_data[key]
-                ]
+                batch_data[key] = [
+                    item.copy_to(DEVICE, blocking=False) for item in batch_data[key]
+                ]

This ensures the code uses the correct PaddlePaddle API for asynchronous tensor transfers.

🧹 Nitpick comments (5)
examples/water/dpa3/run.sh (1)

1-8: Prune or gate commented environment lines

The commented env unsets/exports are noise in examples. Either remove them or gate with a flag (e.g., set DEBUG_LAUNCHER=1 to toggle).

If you want to keep them for reference, add a short header comment explaining when to use them. Otherwise, delete.

deepmd/pd/train/training.py (4)

707-707: Remove leftover “xxx” from rank log

Stray marker in log message.

-            log.info(f"xxx Rank: {dist.get_rank()}/{dist.get_world_size()}")
+            log.info(f"Rank: {dist.get_rank()}/{dist.get_world_size()}")

953-964: Periodic checkpoint saving is disabled; at least gate it by config

Disabling all intermediate checkpoints increases risk of data loss. Make it conditional (e.g., training_params["save_intermediate"]=True) or restore the original behavior.

I can wire this to save every save_freq steps on rank 0 only.


966-975: TensorBoard logging disabled; consider gating by training_params["tensorboard"]

You already have enable_tensorboard flag; keep the block under that condition rather than fully commenting it out.

No diff provided to avoid churn; happy to propose one if requested.


636-637: Minor: avoid .keys() in membership checks (SIM118)

Use if k in label_dict for clarity and speed.

-            label_dict_spec = {
-                k: spec_templates[k] for k in label_dict.keys() if k in spec_templates
-            }
+            label_dict_spec = {k: spec_templates[k] for k in label_dict if k in spec_templates}

Note: superseded by the earlier refactor that sets a full, stable spec.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between eeabe19 and e79ab47.

📒 Files selected for processing (5)
  • deepmd/pd/loss/ener.py (4 hunks)
  • deepmd/pd/train/training.py (12 hunks)
  • deepmd/pd/utils/dataloader.py (1 hunks)
  • examples/water/dpa3/input_torch.json (1 hunks)
  • examples/water/dpa3/run.sh (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • examples/water/dpa3/input_torch.json
  • deepmd/pd/utils/dataloader.py
  • deepmd/pd/loss/ener.py
🧰 Additional context used
🧬 Code graph analysis (1)
deepmd/pd/train/training.py (3)
deepmd/pt/train/training.py (1)
  • get_data (1096-1138)
source/tests/pd/model/test_saveload_dpa1.py (1)
  • get_data (117-134)
deepmd/pd/model/network/network.py (1)
  • Tensor (30-33)
🪛 Ruff (0.12.2)
deepmd/pd/train/training.py

22-22: paddle.distributed.fleet.utils.hybrid_parallel_util imported but unused

Remove unused import: paddle.distributed.fleet.utils.hybrid_parallel_util

(F401)


29-29: Redefinition of unused dist from line 18

Remove definition: dist

(F811)


30-30: Redefinition of unused fleet from line 20

Remove definition: fleet

(F811)


30-30: paddle.distributed.fleet imported but unused

Remove unused import: paddle.distributed.fleet

(F401)


31-31: Redefinition of unused functools from line 3

Remove definition: functools

(F811)


106-106: Redefinition of unused fleet from line 30

Remove definition: fleet

(F811)


636-636: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


751-751: Local variable sync_context is assigned to but never used

Remove assignment to unused variable sync_context

(F841)

🪛 Shellcheck (0.10.0)
examples/water/dpa3/run.sh

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)


[warning] 13-13: nsys_args is referenced but not assigned.

(SC2154)

🔇 Additional comments (2)
examples/water/dpa3/run.sh (1)

18-23: I’m currently unable to fetch live PaddlePaddle docs. Would you like me to:

  • Proceed with details based on my March 2025 knowledge (PaddlePaddle 2.6/2.7)?
  • Or provide your own link or enable web access so I can cite the latest official migration guide?

Let me know which you’d prefer.

deepmd/pd/train/training.py (1)

661-668: DDP wrapping looks fine

fleet.distributed_model and distributed_optimizer are correctly applied after JIT/static transforms. No action needed.

Comment on lines +22 to +31
from paddle.distributed.fleet.utils import hybrid_parallel_util as hpu
from paddle.framework import (
core,
)
from paddle.io import (
DataLoader,
)

import paddle.distributed as dist
from paddle.distributed import fleet
import functools

⚠️ Potential issue

Fix duplicate/unused imports; these will fail Ruff (F401/F811)

  • hybrid_parallel_util (hpu) is unused (F401).
  • dist, fleet, functools are re-imported (F811) and unused in some places.

Apply this diff:

-from paddle.distributed.fleet.utils import hybrid_parallel_util as hpu
+# from paddle.distributed.fleet.utils import hybrid_parallel_util as hpu  # unused; remove if not needed
@@
-import paddle.distributed as dist
-from paddle.distributed import fleet
-import functools
+# Remove duplicate imports; they already appear above

Alternatively, fully dedupe the imports at the top to a single:

-import paddle.distributed as dist
-from paddle.distributed import (
-    fleet,
-)
+import paddle.distributed as dist
+from paddle.distributed import fleet

and ensure there are no second copies later in the file.

📝 Committable suggestion


Suggested change
from paddle.distributed.fleet.utils import hybrid_parallel_util as hpu
from paddle.framework import (
core,
)
from paddle.io import (
DataLoader,
)
import paddle.distributed as dist
from paddle.distributed import fleet
import functools
# from paddle.distributed.fleet.utils import hybrid_parallel_util as hpu # unused; remove if not needed
from paddle.framework import (
core,
)
from paddle.io import (
DataLoader,
)
# Remove duplicate imports; they already appear above
🧰 Tools
🪛 Ruff (0.12.2)

22-22: paddle.distributed.fleet.utils.hybrid_parallel_util imported but unused

Remove unused import: paddle.distributed.fleet.utils.hybrid_parallel_util

(F401)


29-29: Redefinition of unused dist from line 18

Remove definition: dist

(F811)


30-30: Redefinition of unused fleet from line 20

Remove definition: fleet

(F811)


30-30: paddle.distributed.fleet imported but unused

Remove unused import: paddle.distributed.fleet

(F401)


31-31: Redefinition of unused functools from line 3

Remove definition: functools

(F811)

🤖 Prompt for AI Agents
In deepmd/pd/train/training.py around lines 22 to 31, the import block contains
duplicate and unused imports (hybrid_parallel_util as hpu is unused; dist,
fleet, and functools are re-imported elsewhere), which will trigger Ruff errors
F401/F811; remove the unused hpu import and eliminate duplicate/re-imports so
every symbol is imported exactly once at the top of the file, keeping only the
needed imports (paddle.framework.core, paddle.io.DataLoader, and any single
imports of paddle.distributed/fleet/functools if actually used), ensure no later
re-imports exist in the file, and run ruff to confirm the warnings are resolved.

Comment on lines +106 to +110
from paddle.distributed import fleet
mesh_dims = [("dp", 32)]
fleet.auto.create_mesh(mesh_dims)
fleet.init(is_collective=True)


⚠️ Potential issue

Avoid hard-coding dp=32; initialize Fleet and ProcessMesh from actual world size

Creating a 32-way mesh unconditionally will break single-GPU or smaller multi-GPU runs and can desync with the launcher’s world size.

Apply this safer initialization:

-        from paddle.distributed import fleet
-        mesh_dims = [("dp", 32)]
-        fleet.auto.create_mesh(mesh_dims)
-        fleet.init(is_collective=True)
+        # Initialize distributed and mesh based on actual world size.
+        # Order: init fleet first, then create a mesh that matches the launched topology.
+        from paddle.distributed import fleet
+        if not dist.is_initialized():
+            fleet.init(is_collective=True)
+        world_size = dist.get_world_size() if dist.is_initialized() else 1
+        mesh_dims = [("dp", world_size)]
+        fleet.auto.create_mesh(mesh_dims)

Optional: store the mesh for later use instead of calling dist.get_mesh() repeatedly.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

106-106: Redefinition of unused fleet from line 30

Remove definition: fleet

(F811)

🤖 Prompt for AI Agents
In deepmd/pd/train/training.py around lines 106 to 110, the code hard-codes
mesh_dims = [("dp", 32)] which will break runs with different world sizes;
replace the hard-coded 32 by querying the actual distributed world size (e.g.,
via paddle.distributed / fleet APIs), compute mesh_dims = [("dp", world_size)]
(or a safe partitioning based on world_size), create the mesh with that value,
call fleet.init(is_collective=True) as before, and (optionally) assign the
returned mesh to a variable for reuse instead of repeatedly calling
dist.get_mesh().

Comment on lines +751 to +755
sync_context = (
self.wrapper.no_sync
if self.world_size > 1
else contextlib.nullcontext
)

⚠️ Potential issue

Remove unused sync_context variable (Ruff F841)

The variable is assigned but never used. Either remove it or use it to gate no_sync sections for gradient accumulation.

-                sync_context = (
-                    self.wrapper.no_sync
-                    if self.world_size > 1
-                    else contextlib.nullcontext
-                )
+                # no gradient accumulation; no need for no_sync here

If you intended to accumulate gradients, wrap the forward/backward in a with-block and step the optimizer every N steps.

📝 Committable suggestion


Suggested change
sync_context = (
self.wrapper.no_sync
if self.world_size > 1
else contextlib.nullcontext
)
# no gradient accumulation; no need for no_sync here
🧰 Tools
🪛 Ruff (0.12.2)

751-751: Local variable sync_context is assigned to but never used

Remove assignment to unused variable sync_context

(F841)

Comment on lines +758 to +789
log_dict = {}

input_dict = {
"spin": None,
"fparam": None,
"aparam": None,
}
label_dict = {
"find_box": 1.0,
"find_coord": 1.0,
"find_numb_copy": 0.0,
"find_energy": 1.0,
"find_force": 1.0,
"find_virial": 0.0,
}
for k in ["atype", "box", "coord"]:
input_dict[k] = paddle.load(f"./input_{k}.pd")
for k in ["energy", "force", "natoms", "numb_copy", "virial"]:
label_dict[k] = paddle.load(f"./label_{k}.pd")

for __key in ('coord', 'atype', 'box'):
input_dict[__key] = dist.shard_tensor(input_dict[__key], mesh=dist.get_mesh(), placements=[dist.Shard(0)])
for __key, _ in label_dict.items():
if isinstance(label_dict[__key], paddle.Tensor):
label_dict[__key] = dist.shard_tensor(label_dict[__key], mesh=dist.get_mesh(), placements=[dist.Shard(0)])

model_pred, loss, more_loss = self.wrapper(
**input_dict,
cur_lr=paddle.full([], pref_lr, DEFAULT_PRECISION),
label=label_dict,
task_key=task_key,
)

🛠️ Refactor suggestion

⚠️ Potential issue

Forward path bypasses DataLoader and hardcodes on-disk I/O — gate this behind a config flag

Overriding input/labels with paddle.load("./input_*.pd") every step defeats streaming, kills performance, and couples training to external files. Keep it only for debug/regression and make it opt-in.

Apply this diff to make “offline_io” optional and configurable:

-                # with nvprof_context(enable_profiling, "Forward pass"):
-                log_dict = {}
-
-                input_dict = {
-                    "spin": None,
-                    "fparam": None,
-                    "aparam": None,
-                }
-                label_dict = {
-                    "find_box": 1.0,
-                    "find_coord": 1.0,
-                    "find_numb_copy": 0.0,
-                    "find_energy": 1.0,
-                    "find_force": 1.0,
-                    "find_virial": 0.0,
-                }
-                for k in ["atype", "box", "coord"]:
-                    input_dict[k] = paddle.load(f"./input_{k}.pd")
-                for k in ["energy", "force", "natoms", "numb_copy", "virial"]:
-                    label_dict[k] = paddle.load(f"./label_{k}.pd")
-
-                for __key in ('coord', 'atype', 'box'):
-                    input_dict[__key] = dist.shard_tensor(input_dict[__key], mesh=dist.get_mesh(), placements=[dist.Shard(0)])
-                for __key, _ in label_dict.items():
-                    if isinstance(label_dict[__key], paddle.Tensor):
-                        label_dict[__key] = dist.shard_tensor(label_dict[__key], mesh=dist.get_mesh(), placements=[dist.Shard(0)])
-                
+                log_dict = {}
+                offline_io = training_params.get("offline_io", False)
+                if offline_io:
+                    offline_dir = Path(training_params.get("offline_inputs_dir", "."))
+                    input_dict = {
+                        "spin": None,
+                        "fparam": None,
+                        "aparam": None,
+                    }
+                    label_dict = {
+                        "find_box": 1.0,
+                        "find_coord": 1.0,
+                        "find_numb_copy": 0.0,
+                        "find_energy": 1.0,
+                        "find_force": 1.0,
+                        "find_virial": 0.0,
+                    }
+                    for k in ["atype", "box", "coord"]:
+                        input_dict[k] = paddle.load(str(offline_dir / f"input_{k}.pd"))
+                    for k in ["energy", "force", "natoms", "numb_copy", "virial"]:
+                        label_dict[k] = paddle.load(str(offline_dir / f"label_{k}.pd"))
+                    mesh = dist.get_mesh()
+                    for __key in ("coord", "atype", "box"):
+                        input_dict[__key] = dist.shard_tensor(
+                            input_dict[__key], mesh=mesh, placements=[dist.Shard(0)]
+                        )
+                    for __key, __val in list(label_dict.items()):
+                        if isinstance(__val, paddle.Tensor):
+                            label_dict[__key] = dist.shard_tensor(
+                                __val, mesh=mesh, placements=[dist.Shard(0)]
+                            )
+                else:
+                    # Use streaming data
+                    input_dict, label_dict, log_dict = self.get_data(
+                        is_train=True, task_key=task_key
+                    )

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In deepmd/pd/train/training.py around lines 758 to 789, the current code
unconditionally calls paddle.load("./input_*.pd") and "./label_*.pd" each step
which bypasses the DataLoader and forces on-disk I/O; make this offline I/O
behavior optional by adding a configurable flag (e.g., offline_io defaulting to
False) in the training config, and wrap the paddle.load calls and subsequent
dist.shard_tensor calls in an if offline_io: block so they only run when the
flag is enabled; when offline_io is False, continue to use the existing
DataLoader path (do not override input_dict/label_dict), and when offline_io is
True, load files once before the training loop (or cache the loaded tensors)
rather than loading each step to avoid repeated disk access and then perform
sharding as currently done.

@@ -0,0 +1,23 @@
# unset PADDLE_ELASTIC_JOB_ID

⚠️ Potential issue

Add a shebang and strict mode to satisfy ShellCheck and harden the script

ShellCheck flags the missing shebang (SC2148). Also, enabling strict mode will catch unset vars and abort on errors.

Apply this diff at the top of the file:

+#!/usr/bin/env bash
+set -Eeuo pipefail
+IFS=$'\n\t'
🧰 Tools
🪛 Shellcheck (0.10.0)

[error] 1-1: Tips depend on target shell and yours is unknown. Add a shebang or a 'shell' directive.

(SC2148)

🤖 Prompt for AI Agents
In examples/water/dpa3/run.sh at line 1, the script lacks a shebang and strict
shell options; add a POSIX-compatible bash shebang (e.g. #!/usr/bin/env bash) as
the first line and enable strict mode by setting -euo pipefail (and optionally
IFS=$'\n\t') so the script aborts on errors, treats unset variables as failures,
and propagates pipe failures; ensure this is placed before any other commands or
comments.

Comment on lines +13 to +16
${nsys_args} python -m paddle.distributed.launch \
--log_dir output \
--run_mode=collective \
dp --pd train input_torch.json

🛠️ Refactor suggestion

Guard optional nsys_args and ensure output dir exists

  • SC2154 warns nsys_args may be unset. Use a safe default.
  • Make sure the log dir exists even if launcher doesn’t create it.

Apply this diff:

-rm -rf output/
+rm -rf output/
+mkdir -p output
-
-${nsys_args} python -m paddle.distributed.launch \
+${nsys_args:-} python -m paddle.distributed.launch \
     --log_dir output \
     --run_mode=collective \
     dp --pd train input_torch.json
📝 Committable suggestion


Suggested change
${nsys_args} python -m paddle.distributed.launch \
--log_dir output \
--run_mode=collective \
dp --pd train input_torch.json
rm -rf output/
mkdir -p output
${nsys_args:-} python -m paddle.distributed.launch \
--log_dir output \
--run_mode=collective \
dp --pd train input_torch.json
🧰 Tools
🪛 Shellcheck (0.10.0)

[warning] 13-13: nsys_args is referenced but not assigned.

(SC2154)

🤖 Prompt for AI Agents
In examples/water/dpa3/run.sh around lines 13 to 16, guard against an unset
nsys_args and ensure the log directory exists: initialize or reference nsys_args
with a safe default using the parameter expansion form ${nsys_args:-} when
invoking the launcher, and add a mkdir -p output (or the appropriate log_dir
path) before running the python command so the log directory is created
regardless of the launcher behavior.
