支付服务器补单机制

支付业务通常分订单服务器(OrderServer)和支付服务器(BillingServer)。OrderServer用于生成订单,BillingServer用于
接收第三方(微信、支付宝等)用户支付完成的回调,并通知其他业务服务器。本文仅介绍BillingServer的补单机制,补单演进分如下几个阶段。

阶段一 先天

无补单机制。BillingServer接收第三方回调后,记录入库并打印日志,通知业务服务器,根据响应的成功或失败状态,更新订单记录。
若因网络等问题通知业务服务器失败,只能通过查库和日志的方式手动补单。

阶段二 紫府

三次重试。接收第三方回调后,通知业务服务器,若返回失败,则重试通知。

阶段三 万象

自动补单机制应运而生。通过延时队列,九次补单,实现自动补单机制。简单实现如下:
延时队列因子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayedElement<T> implements Delayed {
private T element;
/**
* 延迟时间
*/
private long delay;
/**
* 到期时间
*/
private long expire;
public DelayedElement(T t, long delay) {
this.element = t;
this.delay = delay;
this.expire = System.currentTimeMillis() + delay;
}
/**
* 获取剩余时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 优先级规则
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
public T getElement() {
return element;
}
}

订单类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public class PushOrderInfo {
/**
* 订单Id
*/
private String orderId;
/**
* 补单次数
*/
private int pushNum;
/**
* 用户Id
*/
private int userId;
/**
* 商品id
*/
private int itemId;
/**
* 应用Id
*/
private int appId;
/**
* 购买类型
*/
private int buyType;
public PushOrderInfo(String orderId, int pushNum, int userId, int itemId, int appId, int buyType) {
this.orderId = orderId;
this.pushNum = pushNum;
this.userId = userId;
this.itemId = itemId;
this.appId = appId;
this.buyType = buyType;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public int getPushNum() {
return pushNum;
}
public void setPushNum(int pushNum) {
this.pushNum = pushNum;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getItemId() {
return itemId;
}
public void setItemId(int itemId) {
this.itemId = itemId;
}
public int getAppId() {
return appId;
}
public void setAppId(int appId) {
this.appId = appId;
}
public int getBuyType() {
return buyType;
}
public void setBuyType(int buyType) {
this.buyType = buyType;
}
}

补单主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@Component
public class PushManager {
private static final Logger logger = LoggerFactory.getLogger(PushManager.class);
/**
* 订单状态
* 未完成为0; 成功为1; 失败订单(达到最大补单次数仍未成功)为2
*/
private static final int ORDER_UNFINISHED = 0;
private static final int ORDER_SUCCESS = 1;
private static final int ORDER_FAILED = 2;
/**
* 补单时间及次数
*/
private static final int MAX_PUSH_NUM = 9;
private static final long SECOND = 1000L;
private static final long MINUTE = 60 * SECOND;
private static final long HOUR = 60 * MINUTE;
private Map<Integer, Long> delayTimes = new HashMap<Integer, Long>() {
{
put(1, 15 * SECOND); //第一次15s
put(2, 3 * MINUTE); //第二次3m
put(3, 10 * MINUTE); //第三次10m
put(4, 30 * MINUTE); //第四次30m
put(5, 30 * MINUTE); //第五次30m
put(6, HOUR); //第六次1h
put(7, 2 * HOUR); //第七次2h
put(8, 6 * HOUR); //第八次6h
put(9, 15 * HOUR); //第九次15h
}
};
private DbDoService dbDoService;
private DelayQueue<DelayedElement<PushOrderInfo>> delayQueue = new DelayQueue();
@Autowired
public PushManager(DbDoService dbDoService, AppConfigs appConfigs) {
this.dbDoService = dbDoService;
for (Integer appId : appConfigs.getAppConfigs().keySet()) {
//启动时先将之前未完成的订单推送,数据库数据加载到内存
List<HashMap<String, Object>> unFinishedOrder = dbDoService.queryUnFinishedOrder(appId);
if (unFinishedOrder != null) {
for (HashMap<String, Object> order : unFinishedOrder) {
String orderId = order.get("order_id").toString();
int pushNum = Integer.parseInt(order.get("push_num").toString());
int userId = Integer.parseInt(order.get("user_id").toString());
int itemId = Integer.parseInt(order.get("item_id").toString());
int buyType = Integer.parseInt(order.get("buy_type").toString());
try {
pushOrder(new PushOrderInfo(orderId, pushNum, userId, itemId, appId, buyType));
} catch (Exception e) {
logger.error("push order failed, orderId: {}, exception: {}", orderId, e);
}
}
}
}
new Thread(() -> {
while (true) {
try {
DelayedElement<PushOrderInfo> orderDelayedElement = delayQueue.take();
PushOrderInfo order = orderDelayedElement.getElement();
if (order == null) {
logger.warn("push order is null");
}
pushOrder(order);
} catch (Exception e) {
logger.error("push order failed exception: {}", e);
}
}
}).start();
logger.info("-------------push thread start-----------------");
}
/**
* 加入补单队列
*/
@SuppressWarnings("unchecked")
private void addOrder(PushOrderInfo order) {
Long delay = delayTimes.get(order.getPushNum());
if (delay == null) {
logger.warn("can not find delay by pushNum: {}", order.getPushNum());
return;
}
if(order.getPushNum() > 3){
logger.warn("order pushNum is more than 3, orderId: {}, appId: {}, userId: {}, pushNum: {}", order.getOrderId(), order.getAppId(), order.getUserId(), order.getPushNum());
}
delayQueue.add(new DelayedElement(order, delay));
}
/**
* 失败订单处理
*/
private void failedOrder(PushOrderInfo order){
order.setPushNum(order.getPushNum() + 1);
if (order.getPushNum() <= MAX_PUSH_NUM) {
//加入补单队列
addOrder(order);
dbDoService.updateOrderRecord(order.getAppId(), order.getOrderId(), ORDER_UNFINISHED, order.getPushNum());
} else {
//已达最大补单次数,仍补单失败
dbDoService.updateOrderRecord(order.getAppId(), order.getOrderId(), ORDER_FAILED, order.getPushNum());
}
}
/**
* 推送订单
*/
public void pushOrder(PushOrderInfo order) throws Exception {
//判断业务服务器是否提供服务
if (PublicService.otherServerInfo.get(ConstValue.MAIN_SERVER_TYPE) == null) {
logger.warn("pushOrder, main server isn't running");
failedOrder(order);
return;
}
//生成推送的url
String url = generatePushUrl(order);
logger.info("pushOrder, push url: {}", url);
HttpClient.doGetAsync(url, new Callback() {
@Override
public void onFailure(Call call, IOException e) {
failedOrder(order);
}
@Override
public void onResponse(Call call, Response response) throws IOException {
try {
if (response != null && response.isSuccessful()) {
String responseStr = response.body().string();
logger.info("order push response: {}", responseStr);
JSONObject jsonObject = JSONObject.parseObject(responseStr);
Integer result = jsonObject.getInteger("result");
if (result != null && result == ConstValue.SUCCESS) { //通信正常
Integer data = jsonObject.getInteger("data");
if (data != null && data == ConstValue.SUCCESS) { //合法
logger.info("orderId: {} success", order.getOrderId());
dbDoService.updateOrderRecord(order.getAppId(), order.getOrderId(), ORDER_SUCCESS, order.getPushNum());
} else {
logger.error("orderId: {} failed, response data error, data is: {}", order.getOrderId(), jsonObject.getInteger("data"));
dbDoService.updateOrderRecord(order.getAppId(), order.getOrderId(), ORDER_FAILED, order.getPushNum());
}
return;
}
}
} catch (Exception e) {
Util.closeQuietly(response);
logger.error("server response error: {}", e);
}
failedOrder(order);
}
});
}
}

阶段四 元神

多点部署。保证各节点数据一致性,引入Redis,将补单队列由本地内存移到Redis缓存中。引入jesque依赖。

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/net.greghaines/jesque -->
<dependency>
<groupId>net.greghaines</groupId>
<artifactId>jesque</artifactId>
<version>2.1.2</version>
</dependency>

redis连接配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.jedis.Protocol;
import redis.clients.util.Pool;
import java.util.Set;
@Configuration
public class JesqueConfig {
@Bean
public Pool<Jedis> jedisPool(RedisConf redisConf, JedisPoolConfig jedisPoolConfig) {
Iterable<String> sentinelParts = Splitter.on(',').trimResults().omitEmptyStrings().split(redisConf.getSentinelAddress());
final Set<String> sentinels = Sets.newHashSet(sentinelParts);
return new JedisSentinelPool(redisConf.getMasterName(), sentinels
, jedisPoolConfig, Protocol.DEFAULT_TIMEOUT, redisConf.getPassword(), redisConf.getDbIndex());
}
@Bean
public JedisPoolConfig jedisPoolConfig(RedisConf redisConf) {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxWaitMillis(redisConf.getMaxWaitTime());
return jedisPoolConfig;
}
}

延时队列具体使用查看jesque(Github地址)

阶段五 返虚

高可靠消息队列, 如Apache Kafka(GitHub地址), 生产者与消费者约定group、topic等,第三方服务器回调后,BillingServer将订单成功的消息放到队列中,业务服务器取出。

阶段六 天仙

http与消息队列并存。通过热更新配置切换,结合ELK日志监控报警和Grafana应用监控,防止某种方式不可用。