feat:注释补充
This commit is contained in:
parent
114bcf33d8
commit
9bef786b21
@ -16,9 +16,9 @@ public class GitRepoProperties {
|
|||||||
private String username;
|
private String username;
|
||||||
/** Git 访问密码或 Token。 */
|
/** Git 访问密码或 Token。 */
|
||||||
private String password;
|
private String password;
|
||||||
/** 开发主配置分支,Git -> PROD 只读取此分支。 */
|
/** 当前待同步的版本分支,Git -> PROD 只读取此分支。 */
|
||||||
private String scanBranch;
|
private String scanBranch;
|
||||||
/** 生产快照分支,PROD -> Git 只写入此分支。 */
|
/** 生产快照分支前缀,PROD -> Git 会写入该前缀下的动态版本分支。 */
|
||||||
private String snapshotBranch;
|
private String snapshotBranch;
|
||||||
/** Git 机器人提交用户名。 */
|
/** Git 机器人提交用户名。 */
|
||||||
private String commitAuthorName;
|
private String commitAuthorName;
|
||||||
|
|||||||
@ -3,21 +3,39 @@ package com.ftptool.sync.config;
|
|||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
@ConfigurationProperties(prefix = "prod.api")
|
@ConfigurationProperties(prefix = "prod.api")
|
||||||
|
/**
|
||||||
|
* 生产接口配置。
|
||||||
|
* 当前同时覆盖 pushConfig / pullConfig / login 三类接口的访问参数。
|
||||||
|
*/
|
||||||
public class ProdApiProperties {
|
public class ProdApiProperties {
|
||||||
|
|
||||||
|
/** 生产接口基础地址。 */
|
||||||
private String baseUrl;
|
private String baseUrl;
|
||||||
|
/** pushConfig 路径。 */
|
||||||
private String pushPath;
|
private String pushPath;
|
||||||
|
/** pullConfig 路径。 */
|
||||||
private String pullPath;
|
private String pullPath;
|
||||||
|
/** login 路径。 */
|
||||||
private String loginPath;
|
private String loginPath;
|
||||||
|
/** 静态 token,可选。 */
|
||||||
private String token;
|
private String token;
|
||||||
|
/** token 请求头名称。 */
|
||||||
private String tokenHeaderName = "token";
|
private String tokenHeaderName = "token";
|
||||||
|
/** pullConfig 可选机场过滤。 */
|
||||||
private String airportId;
|
private String airportId;
|
||||||
|
/** pullConfig 可选模块过滤。 */
|
||||||
private String appName;
|
private String appName;
|
||||||
|
/** pullConfig 可选版本过滤。 */
|
||||||
private String pullConfigVersion;
|
private String pullConfigVersion;
|
||||||
|
/** pullConfig 可选文件过滤。 */
|
||||||
private String pullFileName;
|
private String pullFileName;
|
||||||
|
/** login 用户名。 */
|
||||||
private String loginName;
|
private String loginName;
|
||||||
|
/** login 密码。 */
|
||||||
private String loginPassword;
|
private String loginPassword;
|
||||||
|
/** HTTP 连接超时。 */
|
||||||
private int connectTimeoutMs = 10000;
|
private int connectTimeoutMs = 10000;
|
||||||
|
/** HTTP 读取超时。 */
|
||||||
private int readTimeoutMs = 30000;
|
private int readTimeoutMs = 30000;
|
||||||
|
|
||||||
public String getBaseUrl() {
|
public String getBaseUrl() {
|
||||||
|
|||||||
@ -17,7 +17,8 @@ import javax.persistence.UniqueConstraint;
|
|||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracks ackSuc/ackFail status and retry metadata for pulled production configs.
|
* pullConfig ACK 落库记录。
|
||||||
|
* 除了 remote id 和 ack 状态外,还保存定向重拉所需的业务上下文。
|
||||||
*/
|
*/
|
||||||
@Entity
|
@Entity
|
||||||
@Table(name = "prod_pull_ack", uniqueConstraints = {
|
@Table(name = "prod_pull_ack", uniqueConstraints = {
|
||||||
@ -51,12 +52,15 @@ public class ProdPullAckRecord {
|
|||||||
@Column(name = "file_name", length = 512)
|
@Column(name = "file_name", length = 512)
|
||||||
private String fileName;
|
private String fileName;
|
||||||
|
|
||||||
|
/** 已发生的 ACK 定向重拉次数。 */
|
||||||
@Column(name = "retry_count", nullable = false)
|
@Column(name = "retry_count", nullable = false)
|
||||||
private Integer retryCount;
|
private Integer retryCount;
|
||||||
|
|
||||||
|
/** 下一次允许重拉的时间点。 */
|
||||||
@Column(name = "next_retry_at")
|
@Column(name = "next_retry_at")
|
||||||
private LocalDateTime nextRetryAt;
|
private LocalDateTime nextRetryAt;
|
||||||
|
|
||||||
|
/** 最近一次失败摘要。 */
|
||||||
@Lob
|
@Lob
|
||||||
@Column(name = "last_error_msg")
|
@Column(name = "last_error_msg")
|
||||||
private String lastErrorMsg;
|
private String lastErrorMsg;
|
||||||
|
|||||||
@ -41,7 +41,7 @@ public class SyncTask {
|
|||||||
@Column(name = "direction", nullable = false, length = 32)
|
@Column(name = "direction", nullable = false, length = 32)
|
||||||
private SyncDirection direction;
|
private SyncDirection direction;
|
||||||
|
|
||||||
/** 来源版本号,例如 Git commit 或生产配置版本号。 */
|
/** 来源版本号,例如 Git 版本分支名或生产快照分组版本。 */
|
||||||
@Column(name = "source_version", nullable = false, length = 128)
|
@Column(name = "source_version", nullable = false, length = 128)
|
||||||
private String sourceVersion;
|
private String sourceVersion;
|
||||||
|
|
||||||
|
|||||||
@ -12,7 +12,7 @@ public class PackageManifest {
|
|||||||
private SyncDirection direction;
|
private SyncDirection direction;
|
||||||
/** 来源环境标识,例如 DEV、PROD。 */
|
/** 来源环境标识,例如 DEV、PROD。 */
|
||||||
private String sourceEnv;
|
private String sourceEnv;
|
||||||
/** 来源版本号,通常为 Git commit 或接口版本。 */
|
/** 来源版本号,当前通常为 Git 版本分支名或生产快照分组版本。 */
|
||||||
private String sourceVersion;
|
private String sourceVersion;
|
||||||
/** 配置内容哈希,用于幂等和校验。 */
|
/** 配置内容哈希,用于幂等和校验。 */
|
||||||
private String contentHash;
|
private String contentHash;
|
||||||
|
|||||||
@ -6,7 +6,8 @@ import java.util.Collections;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Local representation of one pulled production snapshot group.
|
* 一组生产侧 pull 结果在本地落盘后的封装。
|
||||||
|
* 当前一组数据对应一个 sourceVersion,后续会写入一个动态快照分支。
|
||||||
*/
|
*/
|
||||||
public class ProdPullResult {
|
public class ProdPullResult {
|
||||||
|
|
||||||
@ -72,6 +73,7 @@ public class ProdPullResult {
|
|||||||
|
|
||||||
public static class PulledConfigRef {
|
public static class PulledConfigRef {
|
||||||
|
|
||||||
|
/** 供 ACK 回传和定向重拉使用的最小业务上下文。 */
|
||||||
private final String remoteConfigId;
|
private final String remoteConfigId;
|
||||||
private final String airportId;
|
private final String airportId;
|
||||||
private final String appName;
|
private final String appName;
|
||||||
|
|||||||
@ -37,7 +37,8 @@ import java.util.stream.Collectors;
|
|||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Coordinates the two production-side sync flows:
|
* 生产侧主协调器。
|
||||||
|
* 负责串联两条正式同步链路:
|
||||||
* 1. Git -> PROD
|
* 1. Git -> PROD
|
||||||
* 2. PROD -> Git
|
* 2. PROD -> Git
|
||||||
*/
|
*/
|
||||||
@ -86,7 +87,8 @@ public class ProdSyncCoordinator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pull the configured Git version branch and push its config files to PROD.
|
* 拉取当前版本分支,并把配置文件推送到生产接口。
|
||||||
|
* 当前约定:分支名本身就是 configVersion。
|
||||||
*/
|
*/
|
||||||
public void syncLatestGitToProd() {
|
public void syncLatestGitToProd() {
|
||||||
String traceId = null;
|
String traceId = null;
|
||||||
@ -153,7 +155,10 @@ public class ProdSyncCoordinator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pull the current production snapshot and write it back to the Git snapshot branch.
|
* 拉取生产配置快照,并按版本写回 Git 快照分支。
|
||||||
|
* 执行顺序是:
|
||||||
|
* 1. 先尝试消费上轮失败的 ACK 定向重拉
|
||||||
|
* 2. 再执行本轮正常的 pullConfig 拉取
|
||||||
*/
|
*/
|
||||||
public void syncProdSnapshotToGit() {
|
public void syncProdSnapshotToGit() {
|
||||||
log.info(
|
log.info(
|
||||||
@ -179,6 +184,10 @@ public class ProdSyncCoordinator {
|
|||||||
return existing.isPresent() && existing.get().getStatus() == SyncStatus.SUCCESS;
|
return existing.isPresent() && existing.get().getStatus() == SyncStatus.SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理一组已经按 sourceVersion 切分好的生产快照。
|
||||||
|
* retryAttempt=true 表示该组来自 ACK 失败后的定向重拉。
|
||||||
|
*/
|
||||||
private void syncSingleProdSnapshotToGit(ProdPullResult pullResult, boolean retryAttempt) {
|
private void syncSingleProdSnapshotToGit(ProdPullResult pullResult, boolean retryAttempt) {
|
||||||
String traceId = null;
|
String traceId = null;
|
||||||
try {
|
try {
|
||||||
@ -204,6 +213,7 @@ public class ProdSyncCoordinator {
|
|||||||
);
|
);
|
||||||
syncTaskService.markStatus(task.getTraceId(), SyncStatus.CONSUMING, null);
|
syncTaskService.markStatus(task.getTraceId(), SyncStatus.CONSUMING, null);
|
||||||
|
|
||||||
|
// 生产快照按版本动态落到 snapshot 前缀下,避免不同版本互相覆盖。
|
||||||
String targetBranch = resolveSnapshotBranch(task.getSourceVersion());
|
String targetBranch = resolveSnapshotBranch(task.getSourceVersion());
|
||||||
String commitMessage = gitRepoProperties.getCommitMessagePrefix()
|
String commitMessage = gitRepoProperties.getCommitMessagePrefix()
|
||||||
+ ": traceId=" + task.getTraceId()
|
+ ": traceId=" + task.getTraceId()
|
||||||
@ -234,6 +244,11 @@ public class ProdSyncCoordinator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 按 ACK 表里记录的失败上下文做定向重拉。
|
||||||
|
* 当前重拉粒度是:
|
||||||
|
* airportId + appName + configVersion + fileName
|
||||||
|
*/
|
||||||
private void retryFailedProdPulls() {
|
private void retryFailedProdPulls() {
|
||||||
List<ProdPullAckService.RetryPullRequest> retryPullRequests =
|
List<ProdPullAckService.RetryPullRequest> retryPullRequests =
|
||||||
prodPullAckService.getRetryPullRequests(syncProperties.getMaxRetryCount());
|
prodPullAckService.getRetryPullRequests(syncProperties.getMaxRetryCount());
|
||||||
@ -263,10 +278,10 @@ public class ProdSyncCoordinator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generate the directory to push this round:
|
* 计算本轮 Git -> PROD 实际要推送的目录:
|
||||||
* 1. First push of a branch is full
|
* 1. 首次同步走全量
|
||||||
* 2. Deletions fall back to full push
|
* 2. 一旦发现删除,回退为全量
|
||||||
* 3. Otherwise only changed files are pushed
|
* 3. 其余场景只推送变更文件
|
||||||
*/
|
*/
|
||||||
private Path preparePushDirectory(Path exportDirectory, String branch, String stagingKey) throws IOException {
|
private Path preparePushDirectory(Path exportDirectory, String branch, String stagingKey) throws IOException {
|
||||||
Path baselineDirectory = workDirectoryService.getGitToProdBaselineDir(branch);
|
Path baselineDirectory = workDirectoryService.getGitToProdBaselineDir(branch);
|
||||||
@ -359,6 +374,10 @@ public class ProdSyncCoordinator {
|
|||||||
return sanitizePathToken(branch) + "-" + sanitizePathToken(sourceRevision);
|
return sanitizePathToken(branch) + "-" + sanitizePathToken(sourceRevision);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 把 snapshot 分支前缀和当前 sourceVersion 组装成最终回写分支。
|
||||||
|
* 例如:config-prod-snapshot/R_XXX_V3.0.3_XXX
|
||||||
|
*/
|
||||||
private String resolveSnapshotBranch(String sourceVersion) {
|
private String resolveSnapshotBranch(String sourceVersion) {
|
||||||
String baseBranch = gitRepoProperties.getSnapshotBranch();
|
String baseBranch = gitRepoProperties.getSnapshotBranch();
|
||||||
String versionSegment = sanitizePathToken(sourceVersion);
|
String versionSegment = sanitizePathToken(sourceVersion);
|
||||||
|
|||||||
@ -3,8 +3,9 @@ package com.ftptool.sync.service;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Central extension point for config content encryption and decryption.
|
* 配置内容加解密扩展点。
|
||||||
* The current implementation is intentionally a no-op placeholder.
|
* 当前默认实现是透传,占位的目的是把算法接入点固定在一个服务里,
|
||||||
|
* 后续替换正式算法时不需要再改 push/pull 主链路。
|
||||||
*/
|
*/
|
||||||
@Service
|
@Service
|
||||||
public class ConfigCryptoService {
|
public class ConfigCryptoService {
|
||||||
@ -16,7 +17,7 @@ public class ConfigCryptoService {
|
|||||||
String fileName,
|
String fileName,
|
||||||
String plainContent
|
String plainContent
|
||||||
) {
|
) {
|
||||||
// TODO: Replace the pass-through implementation with the production encryption algorithm.
|
// TODO: 在这里替换为正式的推送前加密算法。
|
||||||
return plainContent;
|
return plainContent;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -27,7 +28,7 @@ public class ConfigCryptoService {
|
|||||||
String fileName,
|
String fileName,
|
||||||
String encryptedContent
|
String encryptedContent
|
||||||
) {
|
) {
|
||||||
// TODO: Replace the pass-through implementation with the production decryption algorithm.
|
// TODO: 在这里替换为正式的拉取后解密算法。
|
||||||
return encryptedContent;
|
return encryptedContent;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -35,7 +35,8 @@ import java.util.stream.Collectors;
|
|||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulates HTTP calls to production pushConfig / pullConfig / login APIs.
|
* 生产接口访问服务。
|
||||||
|
* 统一封装 pushConfig / pullConfig / login 三类 HTTP 调用。
|
||||||
*/
|
*/
|
||||||
@Service
|
@Service
|
||||||
public class ProdConfigApiService {
|
public class ProdConfigApiService {
|
||||||
@ -71,7 +72,7 @@ public class ProdConfigApiService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Push the files in the given directory to production as a JSON array.
|
* 把目录中的配置文件按 JSON 数组推送到生产 pushConfig 接口。
|
||||||
*/
|
*/
|
||||||
public void pushPackage(PackageManifest manifest, Path sourceDirectory) throws IOException {
|
public void pushPackage(PackageManifest manifest, Path sourceDirectory) throws IOException {
|
||||||
String url = buildUrl(prodApiProperties.getPushPath());
|
String url = buildUrl(prodApiProperties.getPushPath());
|
||||||
@ -101,7 +102,7 @@ public class ProdConfigApiService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pull production config snapshots using optional configured filters.
|
* 按当前配置项里的过滤条件拉取生产配置。
|
||||||
*/
|
*/
|
||||||
public List<ProdPullResult> pullConfigSnapshots() throws IOException {
|
public List<ProdPullResult> pullConfigSnapshots() throws IOException {
|
||||||
return pullConfigSnapshots(
|
return pullConfigSnapshots(
|
||||||
@ -113,7 +114,8 @@ public class ProdConfigApiService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pull production config snapshots using optional configVersion/fileName filters.
|
* 按指定过滤条件拉取生产配置。
|
||||||
|
* 这里会把 ACK 待回传参数一并带上。
|
||||||
*/
|
*/
|
||||||
public List<ProdPullResult> pullConfigSnapshots(
|
public List<ProdPullResult> pullConfigSnapshots(
|
||||||
String airportIdFilter,
|
String airportIdFilter,
|
||||||
@ -127,7 +129,7 @@ public class ProdConfigApiService {
|
|||||||
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(url);
|
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(url);
|
||||||
ProdPullAckService.PendingAckSummary pendingAckSummary = prodPullAckService.getPendingAckSummary();
|
ProdPullAckService.PendingAckSummary pendingAckSummary = prodPullAckService.getPendingAckSummary();
|
||||||
|
|
||||||
// Optional filters: leave them empty to pull all approved unsynced configs.
|
// 过滤条件为空时,依赖生产端返回“已审核且未同步”的全量数据。
|
||||||
if (StringUtils.hasText(airportIdFilter) && !isPlaceholder(airportIdFilter)) {
|
if (StringUtils.hasText(airportIdFilter) && !isPlaceholder(airportIdFilter)) {
|
||||||
builder.queryParam("airportId", airportIdFilter.trim());
|
builder.queryParam("airportId", airportIdFilter.trim());
|
||||||
}
|
}
|
||||||
@ -230,8 +232,8 @@ public class ProdConfigApiService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a branch snapshot directory to the pushConfig JSON array.
|
* 把 Git 分支快照目录转换成 pushConfig 所需的 JSON 数组。
|
||||||
* Layout is expected to be: airportId/appName/fileName
|
* 当前目录约定必须是:airportId/appName/fileName
|
||||||
*/
|
*/
|
||||||
private List<ProdPushConfigItem> buildPushRequest(PackageManifest manifest, Path sourceDirectory) throws IOException {
|
private List<ProdPushConfigItem> buildPushRequest(PackageManifest manifest, Path sourceDirectory) throws IOException {
|
||||||
List<ProdPushConfigItem> result = new ArrayList<ProdPushConfigItem>();
|
List<ProdPushConfigItem> result = new ArrayList<ProdPushConfigItem>();
|
||||||
@ -248,6 +250,7 @@ public class ProdConfigApiService {
|
|||||||
item.setAppName(gitConfigPath.getAppName());
|
item.setAppName(gitConfigPath.getAppName());
|
||||||
item.setConfigVersion(manifest.getSourceVersion());
|
item.setConfigVersion(manifest.getSourceVersion());
|
||||||
String plainContent = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
|
String plainContent = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
|
||||||
|
// 加密扩展点统一收口在 ConfigCryptoService,主链路不再直接感知算法细节。
|
||||||
item.setConfigContent(configCryptoService.encryptForPush(
|
item.setConfigContent(configCryptoService.encryptForPush(
|
||||||
gitConfigPath.getAirportId(),
|
gitConfigPath.getAirportId(),
|
||||||
gitConfigPath.getAppName(),
|
gitConfigPath.getAppName(),
|
||||||
@ -267,7 +270,8 @@ public class ProdConfigApiService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restore one pulled config item under airportId/appName/fileName.
|
* 把 pullConfig 的单条结果恢复为本地文件。
|
||||||
|
* 当前落盘结构固定为:airportId/appName/fileName
|
||||||
*/
|
*/
|
||||||
private void writePulledConfigItem(Path baseDirectory, ProdPulledConfigItem item) throws IOException {
|
private void writePulledConfigItem(Path baseDirectory, ProdPulledConfigItem item) throws IOException {
|
||||||
String airportId = requireDirectorySegment(item.getAirportId(), "airportId");
|
String airportId = requireDirectorySegment(item.getAirportId(), "airportId");
|
||||||
@ -282,6 +286,7 @@ public class ProdConfigApiService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Files.createDirectories(targetFile.getParent());
|
Files.createDirectories(targetFile.getParent());
|
||||||
|
// 解密扩展点同样统一收口在 ConfigCryptoService。
|
||||||
String decryptedContent = configCryptoService.decryptAfterPull(
|
String decryptedContent = configCryptoService.decryptAfterPull(
|
||||||
airportId,
|
airportId,
|
||||||
appName,
|
appName,
|
||||||
@ -315,9 +320,14 @@ public class ProdConfigApiService {
|
|||||||
return ids;
|
return ids;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 按 configVersion 把 pullConfig 结果拆成多个结果组。
|
||||||
|
* 这样后续 PROD -> Git 可以按版本动态写入不同快照分支。
|
||||||
|
*/
|
||||||
private List<ProdPullResult> buildPullResults(List<ProdPulledConfigItem> items) throws IOException {
|
private List<ProdPullResult> buildPullResults(List<ProdPulledConfigItem> items) throws IOException {
|
||||||
Map<String, List<ProdPulledConfigItem>> itemsByVersion = new LinkedHashMap<String, List<ProdPulledConfigItem>>();
|
Map<String, List<ProdPulledConfigItem>> itemsByVersion = new LinkedHashMap<String, List<ProdPulledConfigItem>>();
|
||||||
for (ProdPulledConfigItem item : items) {
|
for (ProdPulledConfigItem item : items) {
|
||||||
|
// 没有显式版本号时,先归到一个占位组,后面再回退为 contentHash 版本。
|
||||||
String versionKey = StringUtils.hasText(item.getConfigVersion())
|
String versionKey = StringUtils.hasText(item.getConfigVersion())
|
||||||
? item.getConfigVersion().trim()
|
? item.getConfigVersion().trim()
|
||||||
: "__missing_version__";
|
: "__missing_version__";
|
||||||
@ -348,6 +358,9 @@ public class ProdConfigApiService {
|
|||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 为 ACK 重拉保存足够的上下文字段,便于后续按文件维度定向重拉。
|
||||||
|
*/
|
||||||
private List<ProdPullResult.PulledConfigRef> collectPulledRefs(List<ProdPulledConfigItem> items, String sourceVersion) {
|
private List<ProdPullResult.PulledConfigRef> collectPulledRefs(List<ProdPulledConfigItem> items, String sourceVersion) {
|
||||||
List<ProdPullResult.PulledConfigRef> refs = new ArrayList<ProdPullResult.PulledConfigRef>();
|
List<ProdPullResult.PulledConfigRef> refs = new ArrayList<ProdPullResult.PulledConfigRef>();
|
||||||
for (ProdPulledConfigItem item : items) {
|
for (ProdPulledConfigItem item : items) {
|
||||||
@ -408,6 +421,9 @@ public class ProdConfigApiService {
|
|||||||
return value == null ? null : value.trim();
|
return value == null ? null : value.trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从 Git 相对路径里解析 airportId / appName / fileName。
|
||||||
|
*/
|
||||||
private GitConfigPath parseGitConfigPath(Path sourceDirectory, Path file) {
|
private GitConfigPath parseGitConfigPath(Path sourceDirectory, Path file) {
|
||||||
Path relativePath = sourceDirectory.relativize(file);
|
Path relativePath = sourceDirectory.relativize(file);
|
||||||
if (relativePath.getNameCount() < 3) {
|
if (relativePath.getNameCount() < 3) {
|
||||||
|
|||||||
@ -16,11 +16,16 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores pullConfig ack states and retry plans.
|
* pullConfig ACK 状态与重试计划服务。
|
||||||
|
* 负责保存 ackSuc/ackFail、失败上下文以及定向重拉计划。
|
||||||
*/
|
*/
|
||||||
@Service
|
@Service
|
||||||
public class ProdPullAckService {
|
public class ProdPullAckService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ACK 定向重拉的基础退避时间。
|
||||||
|
* 实际延迟按 30s / 60s / 120s ... 指数增长。
|
||||||
|
*/
|
||||||
private static final int RETRY_DELAY_BASE_SECONDS = 30;
|
private static final int RETRY_DELAY_BASE_SECONDS = 30;
|
||||||
|
|
||||||
private final ProdPullAckRecordRepository prodPullAckRecordRepository;
|
private final ProdPullAckRecordRepository prodPullAckRecordRepository;
|
||||||
@ -40,6 +45,10 @@ public class ProdPullAckService {
|
|||||||
return new PendingAckSummary(successIds, failedIds);
|
return new PendingAckSummary(successIds, failedIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 本地处理成功后,把对应 remote id 记录为 ackSuc。
|
||||||
|
* 成功会清空失败重试状态。
|
||||||
|
*/
|
||||||
@Transactional
|
@Transactional
|
||||||
public void recordAckSuccess(Collection<ProdPullResult.PulledConfigRef> pulledConfigs) {
|
public void recordAckSuccess(Collection<ProdPullResult.PulledConfigRef> pulledConfigs) {
|
||||||
if (pulledConfigs == null) {
|
if (pulledConfigs == null) {
|
||||||
@ -64,6 +73,11 @@ public class ProdPullAckService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 本地处理失败后,把失败项记录为 ackFail。
|
||||||
|
* 首次失败会立刻允许下一轮进入定向重拉;
|
||||||
|
* 重拉再次失败则进入指数退避。
|
||||||
|
*/
|
||||||
@Transactional
|
@Transactional
|
||||||
public void recordPullFailure(
|
public void recordPullFailure(
|
||||||
Collection<ProdPullResult.PulledConfigRef> pulledConfigs,
|
Collection<ProdPullResult.PulledConfigRef> pulledConfigs,
|
||||||
@ -98,6 +112,10 @@ public class ProdPullAckService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 如果连“定向重拉请求”本身都失败了,就只更新退避时间和错误摘要,
|
||||||
|
* 等待下一轮继续尝试。
|
||||||
|
*/
|
||||||
@Transactional
|
@Transactional
|
||||||
public void markRetryAttemptFailed(RetryPullRequest retryPullRequest, String errorMsg) {
|
public void markRetryAttemptFailed(RetryPullRequest retryPullRequest, String errorMsg) {
|
||||||
if (retryPullRequest == null || retryPullRequest.getRemoteConfigIds().isEmpty()) {
|
if (retryPullRequest == null || retryPullRequest.getRemoteConfigIds().isEmpty()) {
|
||||||
@ -116,6 +134,10 @@ public class ProdPullAckService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 从 ACK 失败表里挑出当前可执行的定向重拉请求。
|
||||||
|
* 分组粒度:sourceVersion + airportId + appName + fileName
|
||||||
|
*/
|
||||||
@Transactional(readOnly = true)
|
@Transactional(readOnly = true)
|
||||||
public List<RetryPullRequest> getRetryPullRequests(int maxRetryCount) {
|
public List<RetryPullRequest> getRetryPullRequests(int maxRetryCount) {
|
||||||
List<ProdPullAckRecord> failedRecords = prodPullAckRecordRepository.findByAckStatusOrderByUpdatedAtAsc(ProdPullAckStatus.FAILED);
|
List<ProdPullAckRecord> failedRecords = prodPullAckRecordRepository.findByAckStatusOrderByUpdatedAtAsc(ProdPullAckStatus.FAILED);
|
||||||
@ -192,6 +214,9 @@ public class ProdPullAckService {
|
|||||||
return record.getRetryCount() == null ? 0 : record.getRetryCount().intValue();
|
return record.getRetryCount() == null ? 0 : record.getRetryCount().intValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 计算下一次允许重拉的时间点。
|
||||||
|
*/
|
||||||
private LocalDateTime calculateNextRetryAt(int retryCount) {
|
private LocalDateTime calculateNextRetryAt(int retryCount) {
|
||||||
long delaySeconds = RETRY_DELAY_BASE_SECONDS * (1L << Math.max(0, retryCount - 1));
|
long delaySeconds = RETRY_DELAY_BASE_SECONDS * (1L << Math.max(0, retryCount - 1));
|
||||||
return LocalDateTime.now().plusSeconds(delaySeconds);
|
return LocalDateTime.now().plusSeconds(delaySeconds);
|
||||||
@ -243,6 +268,9 @@ public class ProdPullAckService {
|
|||||||
private final String fileName;
|
private final String fileName;
|
||||||
private final List<String> remoteConfigIds = new ArrayList<String>();
|
private final List<String> remoteConfigIds = new ArrayList<String>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 一次定向重拉请求,对应一组相同过滤条件的失败项。
|
||||||
|
*/
|
||||||
public RetryPullRequest(String sourceVersion, String airportId, String appName, String fileName) {
|
public RetryPullRequest(String sourceVersion, String airportId, String appName, String fileName) {
|
||||||
this.sourceVersion = sourceVersion;
|
this.sourceVersion = sourceVersion;
|
||||||
this.airportId = airportId;
|
this.airportId = airportId;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user