diff --git a/order-service/src/main/java/cn/mayiming/Consumer/PaySuccessConsumer.java b/order-service/src/main/java/cn/mayiming/Consumer/PaySuccessConsumer.java new file mode 100644 index 0000000..82baa1f --- /dev/null +++ b/order-service/src/main/java/cn/mayiming/Consumer/PaySuccessConsumer.java @@ -0,0 +1,47 @@ +package cn.mayiming.Consumer; + +import cn.mayiming.Mapper.OrderMapper; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Map; + +@Component +@RocketMQMessageListener( + topic = "pay_success_topic", + consumerGroup = "order-pay-success-consumer" +) +public class PaySuccessConsumer implements RocketMQListener> { + + @Autowired + private OrderMapper orderMapper; + + @Transactional(rollbackFor = Exception.class) + @Override + public void onMessage(Map msg) { + try { + String orderNo = msg.get("orderNo").toString(); + System.out.println("========== 订单服务接收支付成功消息 =========="); + System.out.println("订单号:" + orderNo); + System.out.println("支付金额:" + msg.get("payAmount")); + System.out.println("==========================================="); + + // 核心:更新订单状态为「已支付」(假设1=已支付) + int updateResult = orderMapper.updateOrderStatus(orderNo, 0, 1); + if (updateResult == 0) { + throw new RuntimeException("更新订单状态失败,订单号:" + orderNo); + } + System.out.println("订单" + orderNo + "已更新为「已支付」状态"); + + } catch (Exception e) { + e.printStackTrace(); + System.err.println("消费支付成功消息失败:" + e.getMessage()); + // 生产环境:抛出异常触发MQ重试,确保订单状态最终一致 + throw new RuntimeException("消费失败,触发重试", e); + } + } +} + diff --git a/order-service/src/main/java/cn/mayiming/Mapper/OrderMapper.java b/order-service/src/main/java/cn/mayiming/Mapper/OrderMapper.java index a0601fa..0747af5 100644 --- a/order-service/src/main/java/cn/mayiming/Mapper/OrderMapper.java +++ b/order-service/src/main/java/cn/mayiming/Mapper/OrderMapper.java @@ -78,4 +78,8 @@ public interface OrderMapper { // 简单生成规则:时间戳 + 6位随机数 return System.currentTimeMillis() + "" + (int)(Math.random() * 900000 + 100000); } + + // OrderMapper补充更新订单状态方法 + @Update("UPDATE t_order SET order_status = #{newStatus}, update_time = CURRENT_TIMESTAMP WHERE order_no = #{orderNo} AND order_status = #{oldStatus}") + int updateOrderStatus(@Param("orderNo") String orderNo, @Param("oldStatus") Integer oldStatus, @Param("newStatus") Integer newStatus); } diff --git a/pay-service/pom.xml b/pay-service/pom.xml index f6c3c42..8d796bc 100644 --- a/pay-service/pom.xml +++ b/pay-service/pom.xml @@ -82,6 +82,10 @@ org.apache.rocketmq rocketmq-spring-boot-starter - + + org.apache.rocketmq + rocketmq-client + 5.3.2 + diff --git a/pay-service/src/main/java/cn/mayiming/App.java b/pay-service/src/main/java/cn/mayiming/App.java index bd10567..4041a93 100644 --- a/pay-service/src/main/java/cn/mayiming/App.java +++ b/pay-service/src/main/java/cn/mayiming/App.java @@ -1,13 +1,17 @@ package cn.mayiming; +import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.transaction.annotation.EnableTransactionManagement; /** * Hello world! * */ @SpringBootApplication +@EnableTransactionManagement +@MapperScan("cn.mayiming.Mapper") public class App { public static void main( String[] args ) diff --git a/pay-service/src/main/java/cn/mayiming/Consumer/payConsumer.java b/pay-service/src/main/java/cn/mayiming/Consumer/payConsumer.java index b97855a..e1a2ca8 100644 --- a/pay-service/src/main/java/cn/mayiming/Consumer/payConsumer.java +++ b/pay-service/src/main/java/cn/mayiming/Consumer/payConsumer.java @@ -1,6 +1,7 @@ package cn.mayiming.Consumer; import cn.mayiming.Entity.PayTriggerMsgDTO; +import cn.mayiming.Service.PayService; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; @@ -14,6 +15,9 @@ import org.springframework.stereotype.Component; ) public class payConsumer implements RocketMQListener { + @Autowired + private PayService payService; + @Override public void onMessage(PayTriggerMsgDTO msg) { // 消费者接收到消息,模拟支付流程(本地仅生成支付链接,记录到日志/订单) @@ -25,6 +29,13 @@ public class payConsumer implements RocketMQListener { System.out.println("模拟支付链接:http://localhost:8080/order/mock/pay?orderNo=" + msg.getOrderNo()); System.out.println("======================================="); + // 核心:调用PayService处理(写入幂等记录+支付记录) + payService.handlePayTrigger(msg); + + // 生成模拟支付链接(包含订单号+幂等ID,供前端调用) + String mockPayUrl = "http://localhost:8080/pay/mock/success?orderNo=" + msg.getOrderNo() + "&businessId=" + msg.getBusinessId(); + System.out.println("模拟支付链接:" + mockPayUrl); + System.out.println("============================================="); } catch (Exception e) { e.printStackTrace(); diff --git a/pay-service/src/main/java/cn/mayiming/Controller/payController.java b/pay-service/src/main/java/cn/mayiming/Controller/payController.java new file mode 100644 index 0000000..51c47d0 --- /dev/null +++ b/pay-service/src/main/java/cn/mayiming/Controller/payController.java @@ -0,0 +1,33 @@ +package cn.mayiming.Controller; + +import cn.mayiming.Entity.checkpayDTO; +import cn.mayiming.Service.PayService; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletRequest; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.HashMap; +import java.util.Map; + +@RestController +@RequestMapping("/pay") +public class payController { + @Autowired + PayService payService; + + @PostMapping("/paycheck") + public Map checkpay(@RequestBody checkpayDTO checkpayDTO) { + Map result = new HashMap<>(); + boolean isSuccess = payService.isPaySuccess(checkpayDTO.getOrderno()); + result.put("success", true); + result.put("orderNo", checkpayDTO.getOrderno()); + result.put("isPaySuccess", isSuccess); + result.put("msg", isSuccess ? "该订单已支付成功" : "该订单未支付或不存在"); + return result; + } +} diff --git a/pay-service/src/main/java/cn/mayiming/Entity/IdempotentRecord.java b/pay-service/src/main/java/cn/mayiming/Entity/IdempotentRecord.java new file mode 100644 index 0000000..34a8f3a --- /dev/null +++ b/pay-service/src/main/java/cn/mayiming/Entity/IdempotentRecord.java @@ -0,0 +1,14 @@ +package cn.mayiming.Entity; + +import lombok.Data; + +import java.util.Date; + +@Data +public class IdempotentRecord { + private Long id; + private String requestId; // 对应表的request_id(前端唯一请求ID/订单号) + private Integer status; // 0-处理中 1-处理成功 2-处理失败 + private Date createTime; + private Date updateTime; +} \ No newline at end of file diff --git a/pay-service/src/main/java/cn/mayiming/Entity/PayRecord.java b/pay-service/src/main/java/cn/mayiming/Entity/PayRecord.java new file mode 100644 index 0000000..a0b3782 --- /dev/null +++ b/pay-service/src/main/java/cn/mayiming/Entity/PayRecord.java @@ -0,0 +1,15 @@ +package cn.mayiming.Entity; + +import lombok.Data; + +import java.math.BigDecimal; +import java.util.Date; + +@Data +public class PayRecord { + private Long id; + private String orderNo; // 关联订单号 + private BigDecimal payAmount; // 支付金额 + private Integer payStatus; // 0-未支付 1-支付成功 + private Date createTime; +} \ No newline at end of file diff --git a/pay-service/src/main/java/cn/mayiming/Entity/checkpayDTO.java b/pay-service/src/main/java/cn/mayiming/Entity/checkpayDTO.java new file mode 100644 index 0000000..884e299 --- /dev/null +++ b/pay-service/src/main/java/cn/mayiming/Entity/checkpayDTO.java @@ -0,0 +1,10 @@ +package cn.mayiming.Entity; + +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +@Data +public class checkpayDTO { + @NotNull + private String orderno; +} diff --git a/pay-service/src/main/java/cn/mayiming/Mapper/payMapper.java b/pay-service/src/main/java/cn/mayiming/Mapper/payMapper.java index 5c44fb7..6294c9d 100644 --- a/pay-service/src/main/java/cn/mayiming/Mapper/payMapper.java +++ b/pay-service/src/main/java/cn/mayiming/Mapper/payMapper.java @@ -1,5 +1,82 @@ package cn.mayiming.Mapper; +import cn.mayiming.Entity.IdempotentRecord; +import cn.mayiming.Entity.PayRecord; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.Param; +import org.springframework.stereotype.Repository; + +import java.math.BigDecimal; + +@Repository public interface payMapper { -} + // ==================== 幂等记录(idempotent_record)相关操作 ==================== + /** + * 根据requestId查询幂等记录 + * @param requestId 前端唯一请求ID(对应businessId) + * @return 幂等记录对象 + */ + @Select("SELECT id, request_id AS requestId, status, create_time AS createTime, update_time AS updateTime " + + "FROM idempotent_record WHERE request_id = #{requestId}") + IdempotentRecord selectIdempotentByRequestId(@Param("requestId") String requestId); + + /** + * 插入幂等记录(初始状态:0-处理中) + * @param requestId 唯一请求ID + * @param status 处理状态 + * @return 插入成功返回1,失败返回0 + */ + @Insert("INSERT INTO idempotent_record (request_id, status, create_time, update_time) " + + "VALUES (#{requestId}, #{status}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)") + int insertIdempotentRecord(@Param("requestId") String requestId, @Param("status") Integer status); + + /** + * 更新幂等记录状态 + * @param requestId 唯一请求ID + * @param status 新状态(1-成功/2-失败) + * @return 更新成功返回1,无匹配记录返回0 + */ + @Update("UPDATE idempotent_record SET status = #{status}, update_time = CURRENT_TIMESTAMP " + + "WHERE request_id = #{requestId}") + int updateIdempotentStatus(@Param("requestId") String requestId, @Param("status") Integer status); + + // ==================== 支付记录(t_pay_record)相关操作 ==================== + /** + * 根据订单号查询支付记录 + * @param orderNo 订单号 + * @return 支付记录对象 + */ + @Select("SELECT id, order_no AS orderNo, pay_amount AS payAmount, pay_status AS payStatus, create_time AS createTime " + + "FROM t_pay_record WHERE order_no = #{orderNo}") + PayRecord selectPayRecordByOrderNo(@Param("orderNo") String orderNo); + + /** + * 插入支付记录(初始状态:0-未支付) + * @param orderNo 订单号 + * @param payAmount 支付金额 + * @param payStatus 支付状态 + * @return 插入成功返回1,失败返回0 + */ + @Insert("INSERT INTO t_pay_record (order_no, pay_amount, pay_status, create_time) " + + "VALUES (#{orderNo}, #{payAmount}, #{payStatus}, CURRENT_TIMESTAMP)") + int insertPayRecord(@Param("orderNo") String orderNo, + @Param("payAmount") BigDecimal payAmount, + @Param("payStatus") Integer payStatus); + + /** + * 更新支付记录状态(模拟支付成功时调用) + * @param orderNo 订单号 + * @param payStatus 新状态(1-支付成功) + * @return 更新成功返回1,无匹配记录返回0 + */ + @Update("UPDATE t_pay_record SET pay_status = #{payStatus}, create_time = CURRENT_TIMESTAMP " + + "WHERE order_no = #{orderNo} AND pay_status = 0") + int updatePayRecordStatus(@Param("orderNo") String orderNo, @Param("payStatus") Integer payStatus); + + + @Select("SELECT COUNT(1) FROM t_pay_record WHERE order_no = #{orderNo} AND pay_status = 1") + int checkPaySuccessByOrderNo(@Param("orderNo") String orderNo); +} \ No newline at end of file diff --git a/pay-service/src/main/java/cn/mayiming/Service/PayService.java b/pay-service/src/main/java/cn/mayiming/Service/PayService.java new file mode 100644 index 0000000..074b782 --- /dev/null +++ b/pay-service/src/main/java/cn/mayiming/Service/PayService.java @@ -0,0 +1,139 @@ +package cn.mayiming.Service; + +import cn.mayiming.Entity.IdempotentRecord; +import cn.mayiming.Entity.PayRecord; +import cn.mayiming.Entity.PayTriggerMsgDTO; +import cn.mayiming.Mapper.payMapper; +import jakarta.annotation.Resource; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +@Service +public class PayService { + // 替换为你实际的 payMapper(整合了幂等+支付记录操作) + @Autowired + private payMapper payMapper; + + + @Resource + private RocketMQTemplate rocketMQTemplate; + + private static final String PAY_SUCCESS_TOPIC = "pay_success_topic"; + + + /** + * 处理订单支付触发消息(核心:事务+幂等) + * @param msg 订单服务发送的支付触发消息 + */ + @Transactional(rollbackFor = Exception.class) + public void handlePayTrigger(PayTriggerMsgDTO msg) { + // 1. 幂等校验:用businessId作为requestId(唯一标识) + String requestId = msg.getBusinessId(); + IdempotentRecord existRecord = payMapper.selectIdempotentByRequestId(requestId); + if (existRecord != null) { + // 已存在幂等记录,直接返回(避免重复处理) + System.out.println("幂等记录已存在,requestId:" + requestId + ",无需重复处理"); + return; + } + + try { + // 2. 写入幂等记录(初始状态:0-处理中) + int insertIdempotentResult = payMapper.insertIdempotentRecord(requestId, 0); + if (insertIdempotentResult == 0) { + throw new RuntimeException("幂等记录插入失败,requestId:" + requestId); + } + + // 3. 校验支付记录是否已存在(双重幂等保障) + PayRecord existPayRecord = payMapper.selectPayRecordByOrderNo(msg.getOrderNo()); + if (existPayRecord != null) { + // 更新幂等记录为成功,返回 + payMapper.updateIdempotentStatus(requestId, 1); + System.out.println("支付记录已存在,订单号:" + msg.getOrderNo()); + return; + } + + // 4. 写入支付记录(初始状态:0-未支付) + BigDecimal payAmount = msg.getPayAmount() == null ? new BigDecimal("0.00") : msg.getPayAmount(); + int insertPayResult = payMapper.insertPayRecord(msg.getOrderNo(), payAmount, 0); + if (insertPayResult == 0) { + throw new RuntimeException("支付记录插入失败,订单号:" + msg.getOrderNo()); + } + + // 5. 所有操作成功,更新幂等记录为处理成功 + payMapper.updateIdempotentStatus(requestId, 1); + System.out.println("支付记录创建成功,订单号:" + msg.getOrderNo() + ",幂等ID:" + requestId); + + } catch (Exception e) { + e.printStackTrace(); + // 6. 操作失败,更新幂等记录为处理失败 + try { + payMapper.updateIdempotentStatus(requestId, 2); + } catch (Exception ex) { + System.err.println("更新幂等记录为失败状态失败,requestId:" + requestId); + } + throw new RuntimeException("处理支付触发消息失败,订单号:" + msg.getOrderNo(), e); + } + } + + /** + * 模拟支付成功(更新支付记录状态) + * @param orderNo 订单号 + * @param businessId 幂等ID + * @return 操作结果 + */ + @Transactional(rollbackFor = Exception.class) + public boolean mockPaySuccess(String orderNo, String businessId) { + // 1. 原有校验逻辑 + IdempotentRecord idempotentRecord = payMapper.selectIdempotentByRequestId(businessId); + if (idempotentRecord == null || idempotentRecord.getStatus() != 1) { + System.err.println("幂等记录异常,requestId:" + businessId); + return false; + } + PayRecord payRecord = payMapper.selectPayRecordByOrderNo(orderNo); + if (payRecord == null || payRecord.getPayStatus() != 0) { + System.err.println("支付记录异常,订单号:" + orderNo); + return false; + } + + // 2. 更新支付记录为成功 + int updateResult = payMapper.updatePayRecordStatus(orderNo, 1); + if (updateResult == 0) { + throw new RuntimeException("更新支付状态失败,订单号:" + orderNo); + } + + // ========== 新增:支付成功后发送MQ消息给订单服务 ========== + try { + // 构建支付成功消息体 + Map paySuccessMsg = new HashMap<>(); + paySuccessMsg.put("orderNo", orderNo); + paySuccessMsg.put("payAmount", payRecord.getPayAmount()); + paySuccessMsg.put("payTime", new Date()); + paySuccessMsg.put("businessId", businessId); + + // 发送消息(同步发送,确保订单服务能收到) + rocketMQTemplate.convertAndSend(PAY_SUCCESS_TOPIC, paySuccessMsg); + System.out.println("支付成功消息已发送,订单号:" + orderNo); + } catch (Exception e) { + e.printStackTrace(); + // 生产环境:消息发送失败需重试/告警,避免订单状态不一致 + throw new RuntimeException("发送支付成功消息失败,订单号:" + orderNo, e); + } + + return true; + } + + + public boolean isPaySuccess(String orderNo) { + // 调用新增的mapper方法,返回1则表示支付成功 + int count = payMapper.checkPaySuccessByOrderNo(orderNo); + return count > 0; + } + + +} \ No newline at end of file diff --git a/pay-service/src/main/resources/application.yml b/pay-service/src/main/resources/application.yml index 18009b2..9c5ff52 100644 --- a/pay-service/src/main/resources/application.yml +++ b/pay-service/src/main/resources/application.yml @@ -43,7 +43,7 @@ rocketmq: name-server: 127.0.0.1:9876 consumer: # 消费者组名(必须唯一,建议按服务+用途命名) - group: order-service-consumer + group: pay-service-consumer # 消费模式:CONCURRENTLY(并发消费,默认)/ORDERLY(顺序消费) consume-mode: CONCURRENTLY # 批量消费最大条数(默认1,单条消费)