核心内容摘要
17c在线播放:点亮你的娱乐视界,尽享视听盛宴
动态形状处理深度指南
1 动态形状基础与配置# dynamic_shape_basics.py import mindspore as ms import mindspore.nn as nn import mindspore.ops as ops import numpy as np from typing import Tuple, Optional, Union class DynamicShapeConfig: 动态形状配置管理器 def __init__(self, enable_dynamic_shape: bool True, max_dynamic_memory: str 80%, # 最大动态形状内存 min_dynamic_memory: str 2GB, # 最小动态形状内存 enable_shape_cache: bool True, cache_capacity: int 50, enable_auto_padding: bool True): self.enable_dynamic_shape enable_dynamic_shape self.max_dynamic_memory max_dynamic_memory self.min_dynamic_memory min_dynamic_memory self.enable_shape_cache enable_shape_cache self.cache_capacity cache_capacity self.enable_auto_padding enable_auto_padding # 动态形状策略 self.strategies { padding: self._padding_strategy, reshape: self._reshape_strategy, slice: self._slice_strategy, batch_aware: self._batch_aware_strategy } # 形状历史记录 self.shape_history {} self.cache_hits 0 self.cache_misses 0 def configure_context(self): 配置动态形状相关上下文 if not self.enable_dynamic_shape: return # 设置昇腾动态形状配置 ascend_config { dynamic_shape_enable: True, dynamic_shape_mem_limit: self.max_dynamic_memory, dynamic_shape_min_mem: self.min_dynamic_memory, dynamic_shape_cache_enable: self.enable_shape_cache, dynamic_shape_cache_capacity: self.cache_capacity, dynamic_inputs_shape_range: {} # 动态形状范围 } # 设置MindSpore上下文 ms.set_context( modems.GRAPH_MODE, device_targetAscend, ascend_configascend_config, enable_dynamic_shapeTrue, max_device_memoryself.max_dynamic_memory, graph_kernel_flags--enable_dynamic_shape_fusionTrue ) print(f动态形状已启用内存限制: {self.max_dynamic_memory}) if self.enable_shape_cache: print(f形状缓存容量: {self.cache_capacity}) def set_dynamic_range(self, model, input_shapes_ranges): 设置动态形状范围 Args: model: 模型实例 input_shapes_ranges: 输入形状范围字典 Example: { input1: [(None, 3, 224,
, # 动态batch (32, 3, 224,
, # 最小batch (256, 3, 224,
] # 最大batch } if not self.enable_dynamic_shape: return # 设置动态输入 dynamic_inputs [] for name, shape_range in input_shapes_ranges.items(): min_shape, opt_shape, max_shape shape_range # 创建动态张量 dynamic_tensor ms.Tensor( shape[s if s is not None else -1 for s in min_shape], dtypems.float32 ) dynamic_inputs.append(dynamic_tensor) # 记录形状范围 self.shape_history[name] { min: min_shape, opt: opt_shape, max: max_shape, current: None } # 编译模型时设置动态输入 model.set_inputs(*dynamic_inputs) print(f设置动态形状范围: {input_shapes_ranges}) def _padding_strategy(self, tensor, target_shape): 填充策略 - 处理可变长度 current_shape tensor.shape pad_widths [] for curr, target in zip(current_shape, target_shape): if curr target: pad_widths.append((0, target - curr)) else: pad_widths.append((0,
) return ops.Pad(pad_widths)(tensor) def _reshape_strategy(self, tensor, target_shape): 重塑策略 - 重新排列数据 # 确保总元素数不变 current_elements np.prod(tensor.shape) target_elements np.prod(target_shape) if current_elements ! target_elements: raise ValueError(f元素数不匹配: {current_elements} ! {target_elements}) return ops.Reshape()(tensor, target_shape) def _slice_strategy(self, tensor, target_shape): 切片策略 - 截断多余部分 slices [] for curr, target in zip(tensor.shape, target_shape): if curr target: slices.append(slice(0, target)) else: slices.append(slice(None)) return tensor[tuple(slices)] def _batch_aware_strategy(self, tensor, target_shape): 批处理感知策略 - 智能处理批维度 # 分离批维度和特征维度 batch_dim 0 batch_size tensor.shape[batch_dim] target_batch target_shape[batch_dim] if batch_size target_batch: return tensor if batch_size target_batch: # 需要填充 return self._batch_padding(tensor, target_shape) else: # 需要切片 return self._batch_slicing(tensor, target_shape) def _batch_padding(self, tensor, target_shape): 批维度填充 pad_config [(0, target_shape[0] - tensor.shape[0])] pad_config.extend([(0,
] * (len(tensor.shape) -
) return ops.Pad(pad_config)(tensor) def _batch_slicing(self, tensor, target_shape): 批维度切片 slices [slice(0, target_shape[0])] slices.extend([slice(None)] * (len(tensor.shape) -
) return tensor[tuple(slices)] def adapt_shape(self, tensor, target_shape, strategybatch_aware): 自适应形状调整 if strategy not in self.strategies: raise ValueError(f未知策略: {strategy}) # 检查是否已经匹配 if tensor.shape target_shape: return tensor # 应用策略 adapted self.strategies[strategy](tensor, target_shape) # 记录形状变化 self._log_shape_adaptation(tensor.shape, target_shape, strategy) return adapted def _log_shape_adaptation(self, src_shape, dst_shape, strategy): 记录形状适配日志 key f{src_shape}-{dst_shape}:{strategy} if key in self.shape_history: self.shape_history[key][count] 1 self.cache_hits 1 else: self.shape_history[key] { count: 1, strategy: strategy, timestamp: time.time() } self.cache_misses 1 # 打印重要形状变化 if src_shape[0] ! dst_shape[0]: # 批维度变化 print(f批维度变化: {src_shape[0]} - {dst_shape[0]} f(策略: {strategy}))
2 动态形状模型设计# dynamic_shape_models.py class DynamicConv2D(nn.Cell): 动态卷积层 - 支持可变输入尺寸 def __init__(self, in_channels: int, out_channels: int, kernel_size: Union[int, Tuple[int, int]] 3, stride: Union[int, Tuple[int, int]] 1, padding: Union[int, Tuple[int, int]] 0, dilation: Union[int, Tuple[int, int]] 1, groups: int 1, dynamic_kernel: bool False): super().__init__() # 基本卷积参数 self.in_channels in_channels self.out_channels out_channels self.kernel_size kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size) self.stride stride if isinstance(stride, tuple) else (stride, stride) self.padding padding if isinstance(padding, tuple) else (padding, padding) self.dilation dilation if isinstance(dilation, tuple) else (dilation, dilation) self.groups groups # 动态特性 self.dynamic_kernel dynamic_kernel self.shape_adapter DynamicShapeConfig() # 卷积核 if dynamic_kernel: # 动态卷积核 - 可适应不同输入尺寸 self.kernel_generator self._create_kernel_generator() self.current_kernel None else: # 固定卷积核 self.conv nn.Conv2d( in_channels, out_channels, kernel_size, stridestride, pad_modepad, paddingpadding, dilationdilation, groupgroups, has_biasTrue, weight_initHeUniform ) def _create_kernel_generator(self): 创建动态卷积核生成器 class KernelGenerator(nn.Cell): def __init__(self, base_channels, base_kernel): super().__init__() self.base_weight ms.Parameter( ms.Tensor( np.random.randn(*base_kernel) *
01, dtypems.float32 ) ) self.scale_factors nn.Dense(base_channels, base_channels) def construct(self, input_shape): # 根据输入形状调整卷积核 _, _, h, w input_shape # 计算缩放因子 scale_h h / 224 # 假设224是基准高度 scale_w w / 224 # 假设224是基准宽度 # 调整卷积核 # 这里简化为插值实际可能需要更复杂的调整 if scale_h !
0 or scale_w !
0: # 使用双线性插值调整卷积核大小 weight ops.ResizeBilinear( size(int(self.base_weight.shape[2] * scale_h), int(self.base_weight.shape[3] * scale_w)), align_cornersFalse )(self.base_weight) else: weight self.base_weight return weight base_kernel (self.out_channels, self.in_channels // self.groups, self.kernel_size[0], self.kernel_size[1]) return KernelGenerator(self.in_channels, base_kernel) def construct(self, x): 前向传播 - 动态形状处理 batch_size, channels, height, width x.shape # 动态调整卷积核如果需要 if self.dynamic_kernel: # 生成适应当前形状的卷积核 kernel self.kernel_generator(x.shape) # 动态卷积 output self._dynamic_conv2d(x, kernel) else: # 标准卷积 output self.conv(x) # 动态调整输出形状如果需要 output self._adapt_output_shape(output, x.shape) return output def _dynamic_conv2d(self, x, weight): 动态卷积实现 # 使用自定义卷积实现支持动态形状 # 这里简化实现实际可能需要更复杂的处理 # 计算输出形状 out_h (x.shape[2] 2 * self.padding[0] - self.dilation[0] * (weight.shape[2] -
-
// self.stride[0] 1 out_w (x.shape[3] 2 * self.padding[1] - self.dilation[1] * (weight.shape[3] -
-
// self.stride[1] 1 # 实现卷积简化版 # 实际生产环境应使用优化实现 output ops.Conv2D( out_channelself.out_channels, kernel_sizeself.kernel_size, mode1, pad_modepad, padself.padding, strideself.stride, dilationself.dilation, groupself.groups )(x, weight) return output def _adapt_output_shape(self, output, input_shape): 调整输出形状 # 这里可以添加后处理逻辑 # 例如动态批归一化、动态激活等 return output class DynamicSequenceModel(nn.Cell): 动态序列模型 - 支持可变序列长度 def __init__(self, input_dim: int, hidden_dim: int, num_layers: int 2, bidirectional: bool True, dynamic_length: bool True): super().__init__() self.input_dim input_dim self.hidden_dim hidden_dim self.num_layers num_layers self.bidirectional bidirectional self.dynamic_length dynamic_length # RNN层支持动态序列长度 self.rnn nn.LSTM( input_sizeinput_dim, hidden_sizehidden_dim, num_layersnum_layers, has_biasTrue, bidirectionalbidirectional, dropout
0, batch_firstTrue ) # 动态形状处理器 self.shape_processor DynamicSequenceProcessor( hidden_dim * (2 if bidirectional else
) def construct(self, x, seq_lengthsNone): 前向传播 Args: x: 输入张量 [batch, seq_len, features] seq_lengths: 每个序列的实际长度 [batch] batch_size, seq_len, features x.shape # 处理动态序列长度 if self.dynamic_length and seq_lengths is not None: # 使用pack_padded_sequence处理变长序列 x_packed self._pack_sequences(x, seq_lengths) # RNN处理 output_packed, (h_n, c_n) self.rnn(x_packed) # 解包 output, output_lengths self._unpack_sequences( output_packed, seq_lengths, batch_size, seq_len ) else: # 固定长度处理 output, (h_n, c_n) self.rnn(x) output_lengths None # 动态后处理 output self.shape_processor(output, seq_lengths) return output, h_n def _pack_sequences(self, x, lengths): 打包变长序列 # 按长度降序排序 sorted_lengths, sorted_indices ops.Sort(descendingTrue)(lengths) sorted_x x[sorted_indices] # 打包序列 packed nn.PackSequence(sorted_x, sorted_lengths) return packed def _unpack_sequences(self, packed_output, lengths, batch_size, max_len): 解包序列 # 恢复原始顺序 _, original_indices ops.Sort()(lengths) original_indices ops.Argsort()(original_indices) # 解包 output, output_lengths nn.UnpackSequence(packed_output, batch_size, max_len) # 恢复原始顺序 output output[original_indices] return output, output_lengths class DynamicSequenceProcessor(nn.Cell): 动态序列处理器 def __init__(self, hidden_dim): super().__init__() # 动态注意力机制 self.attention DynamicAttention(hidden_dim) # 动态层归一化 self.layer_norm DynamicLayerNorm(hidden_dim) # 动态dropout self.dropout nn.Dropout(keep_prob
0.
def construct(self, x, lengthsNone): 处理序列 # 动态掩码如果提供了长度 if lengths is not None: mask self._create_mask(x.shape[:2], lengths) x x * mask.unsqueeze(-
# 动态注意力 x self.attention(x, mask if lengths is not None else None) # 动态层归一化 x self.layer_norm(x) # dropout x self.dropout(x) return x def _create_mask(self, shape, lengths): 创建序列掩码 batch_size, max_len shape # 创建范围张量 range_tensor ops.arange(max_len).broadcast_to((batch_size, max_len)) # 创建掩码 lengths_expanded lengths.view(-1,
mask range_tensor lengths_expanded return mask.astype(ms.float
32)
3 动态形状训练循环# dynamic_training.py class DynamicShapeTrainingLoop: 动态形状训练循环 def __init__(self, model: nn.Cell, optimizer: nn.Optimizer, loss_fn: nn.Cell, dynamic_config: DynamicShapeConfig, enable_gradient_accumulation: bool True, accumulation_steps: int
: self.model model self.optimizer optimizer self.loss_fn loss_fn self.dynamic_config dynamic_config self.enable_gradient_accumulation enable_gradient_accumulation self.accumulation_steps accumulation_steps # 梯度累积 self.accumulated_gradients None self.accumulation_counter 0 # 形状统计 self.shape_statistics { batch_sizes: [], sequence_lengths: [], image_sizes: [] } # 性能监控 self.recompilation_count 0 self.cache_hit_rate
0 def train_step(self, data, labels, sample_infoNone): 动态形状训练步骤 # 记录输入形状 self._record_input_shapes(data, sample_info) # 动态调整模型如果需要 if self._needs_recompilation(data.shape): self._recompile_model(data.shape) # 前向传播 outputs self.model(data) # 计算损失 loss self.loss_fn(outputs, labels) # 反向传播 grads self._compute_gradients(loss) # 梯度累积 if self.enable_gradient_accumulation: grads self._accumulate_gradients(grads) # 优化器步骤 if not self.enable_gradient_accumulation or self.accumulation_counter self.accumulation_steps: self.optimizer(grads) self._reset_accumulation() return loss, outputs def _record_input_shapes(self, data, sample_info): 记录输入形状统计 shape data.shape # 记录批大小 self.shape_statistics[batch_sizes].append(shape[0]) # 记录序列长度或图像尺寸 if len(shape) 4: # 图像 [B, C, H, W] self.shape_statistics[image_sizes].append((shape[2], shape[3])) elif len(shape) 3: # 序列 [B, T, F] self.shape_statistics[sequence_lengths].append(shape[1]) # 限制历史大小 for key in self.shape_statistics: if len(self.shape_statistics[key]) 1000: self.shape_statistics[key] self.shape_statistics[key][-500:] def _needs_recompilation(self, new_shape): 检查是否需要重新编译 if not hasattr(self.model, last_compiled_shape): return True last_shape self.model.last_compiled_shape # 检查批维度变化 if new_shape[0] ! last_shape[0]: return True # 检查序列长度变化对于序列模型 if len(new_shape) 3 and new_shape[1] ! last_shape[1]: return True # 检查图像尺寸变化对于视觉模型 if len(new_shape) 4 and (new_shape[2] ! last_shape[2] or new_shape[3] ! last_shape[3]): return True return False def _recompile_model(self, new_shape): 重新编译模型以适应新形状 print(f重新编译模型以适应形状: {new_shape}) # 设置动态输入 dynamic_input ms.Tensor(shapenew_shape, dtypems.float
self.model.set_inputs(dynamic_input) # 编译模型 self.model.compile() # 记录编译形状 self.model.last_compiled_shape new_shape # 更新统计 self.recompilation_count 1 def _compute_gradients(self, loss): 计算梯度 # 使用MindSpore的自动微分 grads ms.grad(self._forward_and_loss, None, self.optimizer.parameters)(loss) return grads def _forward_and_loss(self, data, labels): 前向传播和损失计算 outputs self.model(data) loss self.loss_fn(outputs, labels) return loss def _accumulate_gradients(self, grads): 梯度累积 if self.accumulated_gradients is None: self.accumulated_gradients [ ms.ops.zeros_like(g) if g is not None else None for g in grads ] # 累积梯度 for i, grad in enumerate(grads): if grad is not None and self.accumulated_gradients[i] is not None: self.accumulated_gradients[i] grad / self.accumulation_steps self.accumulation_counter 1 return self.accumulated_gradients if self.accumulation_counter self.accumulation_steps else None def _reset_accumulation(self): 重置梯度累积 self.accumulated_gradients None self.accumulation_counter 0 def get_shape_statistics(self): 获取形状统计信息 stats {} for key, values in self.shape_statistics.items(): if values: stats[f{key}_mean] np.mean(values) stats[f{key}_std] np.std(values) stats[f{key}_min] np.min(values) stats[f{key}_max] np.max(values) stats[f{key}_unique] len(np.unique(values)) stats[recompilation_count] self.recompilation_count stats[cache_hit_rate] self.cache_hit_rate return stats class DynamicBatchSampler: 动态批采样器 - 根据序列长度动态调整批大小 def __init__(self, dataset_lengths, # 每个样本的长度 max_tokens_per_batch: int 4096, max_sequences_per_batch: int 32, shuffle: bool True): self.dataset_lengths dataset_lengths self.max_tokens_per_batch max_tokens_per_batch self.max_sequences_per_batch max_sequences_per_batch self.shuffle shuffle # 索引数组 self.indices np.arange(len(dataset_lengths)) # 当前批次 self.current_batch [] self.current_tokens 0 def __iter__(self): 迭代器 if self.shuffle: np.random.shuffle(self.indices) self.current_batch [] self.current_tokens 0 for idx in self.indices: sample_length self.dataset_lengths[idx] # 检查是否可以添加到当前批次 if (len(self.current_batch) self.max_sequences_per_batch and self.current_tokens sample_length self.max_tokens_per_batch): self.current_batch.append(idx) self.current_tokens sample_length else: # 返回当前批次 if self.current_batch: yield self.current_batch # 开始新批次 self.current_batch [idx] self.current_tokens sample_length # 返回最后一批 if self.current_batch: yield self.current_batch def __len__(self): 估计批次数 # 这里简化为固定估计实际需要动态计算 return len(self.indices) // self.max_sequences_per_batch
稀疏计算高级特性
1 稀疏张量基础# sparse_tensor_basics.py import mindspore as ms from mindspore import Tensor, CSRTensor, COOTensor import numpy as np from scipy import sparse class SparseTensorFactory: 稀疏张量工厂 staticmethod def dense_to_csr(dense_tensor: Tensor, threshold: float
0.
: 稠密张量转CSR格式 dense_np dense_tensor.asnumpy() # 创建稀疏矩阵 sparse_matrix sparse.csr_matrix(dense_np) # 应用阈值可选 if threshold 0: sparse_matrix.data[np.abs(sparse_matrix.data) threshold] 0 sparse_matrix.eliminate_zeros() # 转换为MindSpore CSRTensor indptr Tensor(sparse_matrix.indptr, dtypems.int
indices Tensor(sparse_matrix.indices, dtypems.int
values Tensor(sparse_matrix.data, dtypedense_tensor.dtype) shape dense_tensor.shape return CSRTensor(indptr, indices, values, shape) staticmethod def dense_to_coo(dense_tensor: Tensor, threshold: float
0.