跳到主要内容

FIFO算法详解

📋 算法概述

FIFO(First In First Out)算法是算力点系统的核心消费策略,确保用户优先使用即将过期的点数,最大化点数利用率,减少用户损失。

🎯 设计目标

用户体验优化

  • 减少点数过期浪费
  • 透明的消费优先级
  • 合理的资源利用

系统性能优化

  • 高效的消费计算
  • 最小化数据库查询
  • 保证数据一致性

🔧 算法原理

基本思路

  1. 获取账本: 查询用户所有活跃账本
  2. 时间排序: 按过期时间升序排列
  3. 逐个扣减: 从最早账本开始消费
  4. 状态更新: 更新账本余额和状态

核心流程

FIFO 消费算法流程:

  1. 接收消费请求:接收点数消费请求
  2. 查询用户活跃账本:查询用户所有活跃状态的点数账本
  3. 按过期时间排序:将账本按过期时间升序排列
  4. 计算总余额:计算所有账本的总余额
  5. 余额充足判断
    • 余额不足 → 返回余额不足错误
    • 余额充足 → 开始 FIFO 消费
  6. FIFO 消费
    • 从最早过期的账本开始扣减
    • 判断是否还需继续消费
    • 如果还需消费 → 继续下一个账本
    • 如果消费完成 → 创建消费记录
  7. 创建消费记录:记录本次消费的详细信息
  8. 更新账本状态:更新相关账本的余额和状态
  9. 返回消费结果:返回消费成功的结果

💻 实现细节

数据结构

type UserPointLedger struct {
ID string
UserID string
Balance int64
ExpiresAt time.Time
Status LedgerStatus
}

type LedgerDeduction struct {
LedgerID string
Amount int64
}

核心算法

func (uc *PointUseCase) ConsumePoints(
ctx context.Context,
userID, tenantID, productID, description string,
requiredAmount int64,
) (*PointConsumptionRecord, *PointTransaction, error) {
// 1. 获取活跃账本(按过期时间排序)
ledgers, err := uc.userPointLedgerRepo.ListActiveUserPointLedgers(ctx, userID)
if err != nil {
return nil, nil, err
}

// 2. 计算总余额
var totalBalance int64
for _, ledger := range ledgers {
totalBalance += ledger.Balance
}

// 3. 检查余额是否充足
if totalBalance < requiredAmount {
return nil, nil, ErrorInsufficientBalance("余额不足")
}

// 4. FIFO消费逻辑
remainingAmount := requiredAmount
var deductions []LedgerDeduction

for _, ledger := range ledgers {
if remainingAmount <= 0 {
break
}

// 计算本次扣减金额
deductionAmount := min(ledger.Balance, remainingAmount)
deductions = append(deductions, LedgerDeduction{
LedgerID: ledger.ID,
Amount: deductionAmount,
})

// 更新账本余额
newBalance := ledger.Balance - deductionAmount
status := UserPointLedgerStatusActive
if newBalance == 0 {
status = UserPointLedgerStatusFullyConsumed
}

err = uc.userPointLedgerRepo.UpdateBalance(ctx, ledger.ID, newBalance, status)
if err != nil {
return nil, nil, err
}

remainingAmount -= deductionAmount
}

// 5. 创建消费记录和交易记录
record := &PointConsumptionRecord{
UserID: userID,
TenantID: tenantID,
ProductID: productID,
ConsumedAmount: requiredAmount,
LedgerDeductions: marshalDeductions(deductions),
Description: description,
CreatedAt: time.Now(),
}

// ... 保存记录并返回结果
}

📊 性能优化

数据库优化

-- 优化查询索引
CREATE INDEX idx_user_ledger_expires ON user_point_ledgers(user_id, expires_at);
CREATE INDEX idx_user_ledger_status ON user_point_ledgers(user_id, status);

-- 高效的账本查询
SELECT id, balance, expires_at
FROM user_point_ledgers
WHERE user_id = $1 AND status = 'ACTIVE'
ORDER BY expires_at ASC;

缓存策略

  • 热点数据缓存: 用户总余额
  • 查询结果缓存: 账本列表
  • 失效策略: 账本变更时清除

批量处理

// 批量更新账本余额
func (r *UserPointLedgerRepo) BatchUpdateBalance(
ctx context.Context,
updates []BalanceUpdate,
) error {
// 使用事务批量更新
return r.client.Transaction(ctx, func(tx *ent.Tx) error {
for _, update := range updates {
err := tx.UserPointLedger.
UpdateOneID(update.LedgerID).
SetBalance(update.NewBalance).
SetStatus(update.Status).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
}

🔍 边界情况处理

余额不足

if totalBalance < requiredAmount {
return nil, nil, &InsufficientBalanceError{
Required: requiredAmount,
Available: totalBalance,
UserID: userID,
}
}

并发消费

// 使用乐观锁防止并发问题
func (r *UserPointLedgerRepo) UpdateBalanceWithVersion(
ctx context.Context,
ledgerID string,
newBalance int64,
expectedVersion int,
) error {
return r.client.UserPointLedger.
UpdateOneID(ledgerID).
SetBalance(newBalance).
Where(userpointledger.Version(expectedVersion)).
Exec(ctx)
}

异常恢复

// 消费失败时回滚
func (uc *PointUseCase) rollbackConsumption(
ctx context.Context,
deductions []LedgerDeduction,
) error {
for _, deduction := range deductions {
// 恢复账本余额
ledger, err := uc.userPointLedgerRepo.Get(ctx, deduction.LedgerID)
if err != nil {
continue
}

newBalance := ledger.Balance + deduction.Amount
status := UserPointLedgerStatusActive

err = uc.userPointLedgerRepo.UpdateBalance(
ctx, deduction.LedgerID, newBalance, status)
if err != nil {
uc.log.Errorf("回滚账本失败: %v", err)
}
}
return nil
}

📈 监控指标

业务指标

  • 消费成功率: 成功消费次数 / 总消费次数
  • 平均消费账本数: 每次消费涉及的平均账本数量
  • 点数利用率: 已消费点数 / 总发放点数

性能指标

  • 消费响应时间: 消费接口的平均响应时间
  • 数据库查询时间: 账本查询的平均耗时
  • 并发处理能力: 系统支持的最大并发消费数

异常指标

  • 余额不足错误: 余额不足的失败次数
  • 并发冲突: 乐观锁冲突的次数
  • 回滚操作: 消费失败回滚的次数

🧪 测试策略

单元测试

func TestFIFOConsumption(t *testing.T) {
tests := []struct {
name string
ledgers []UserPointLedger
requiredAmount int64
expectedDeductions []LedgerDeduction
expectError bool
}{
{
name: "单账本充足",
ledgers: []UserPointLedger{
{ID: "1", Balance: 100, ExpiresAt: time.Now().Add(24 * time.Hour)},
},
requiredAmount: 50,
expectedDeductions: []LedgerDeduction{
{LedgerID: "1", Amount: 50},
},
expectError: false,
},
{
name: "多账本组合",
ledgers: []UserPointLedger{
{ID: "1", Balance: 30, ExpiresAt: time.Now().Add(1 * time.Hour)},
{ID: "2", Balance: 40, ExpiresAt: time.Now().Add(2 * time.Hour)},
{ID: "3", Balance: 50, ExpiresAt: time.Now().Add(3 * time.Hour)},
},
requiredAmount: 70,
expectedDeductions: []LedgerDeduction{
{LedgerID: "1", Amount: 30},
{LedgerID: "2", Amount: 40},
},
expectError: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 测试实现
})
}
}

集成测试

  • 完整的消费流程测试
  • 并发消费场景测试
  • 异常情况恢复测试

🔮 未来优化

算法优化

  • 智能预测: 基于历史数据预测消费模式
  • 动态调整: 根据系统负载动态调整策略
  • 多级缓存: 分层缓存提升性能

功能扩展

  • 消费策略配置: 支持多种消费策略选择
  • 优先级标签: 支持账本优先级设置
  • 部分消费支持: 支持部分金额的消费

最后更新:2024年12月