feat: 初始化FTP中转双向同步工具并打通双端同步主链路

- 新增 Spring Boot 2.7.18 + JDK 1.8 工程骨架
- 引入 H2、JGit、Commons Net 等核心依赖并补充 Maven 配置
- 增加 application.properties 及 dev-agent/prod-agent 双 profile 配置
- 新增同步任务、检查点、ACK 的 H2 表结构与基础持久化服务
- 实现 FTP 上传、下载、列目录、移动、删除及原子上传能力
- 实现 Git clone/pull/checkout/export/commit/push 能力
- 实现同步包打包、解包、manifest 生成与内容哈希校验
- 实现生产 push/pull 接口调用基础能力
- 打通开发侧与生产侧协调流程及 ACK 回执处理
- 补充总体设计与详细设计文档
- 修正 .gitignore,忽略 target、.m2、work、data 等构建与运行目录
This commit is contained in:
dark 2026-04-15 16:25:12 +08:00
parent 8d46e629ad
commit bd1e4fa69a
23 changed files with 1761 additions and 2588 deletions

2551
.gitignore vendored

File diff suppressed because it is too large Load Diff

View File

@ -343,17 +343,23 @@ spring.jpa.hibernate.ddl-auto=none
- 更新检查点 - 更新检查点
- 记录 ack 回执 - 记录 ack 回执
### 8.2 当前实现的业务服务 ### 8.2 当前实现的业务服务
当前骨架还没有把以下真实能力写完 本轮代码已经补上以下真实能力
- FTP 上传、下载、列目录、重命名 - FTP 上传、下载、列目录、删除、移动、原子重命名上传
- Git clone / pull / checkout / commit / push - Git clone / pull / checkout / commit / push
- zip 打包与解包 - zip 打包与解包
- manifest 生成与校验 - manifest 生成与内容哈希校验
- 生产 `push` / `pull` 接口调用 - 生产 `push` / `pull` 接口调用骨架
这些是下一步真正要补的业务实现层。 当前对应实现文件包括:
- [FtpClientService.java](e:/AIcoding/FtpTool/src/main/java/com/ftptool/sync/service/FtpClientService.java)
- [GitClientService.java](e:/AIcoding/FtpTool/src/main/java/com/ftptool/sync/service/GitClientService.java)
- [PackageService.java](e:/AIcoding/FtpTool/src/main/java/com/ftptool/sync/service/PackageService.java)
- [ProdConfigApiService.java](e:/AIcoding/FtpTool/src/main/java/com/ftptool/sync/service/ProdConfigApiService.java)
- [SyncMetadataService.java](e:/AIcoding/FtpTool/src/main/java/com/ftptool/sync/service/SyncMetadataService.java)
## 9. 当前调度层设计 ## 9. 当前调度层设计
@ -368,7 +374,7 @@ spring.jpa.hibernate.ddl-auto=none
- 已按 `dev-agent` profile 进行隔离 - 已按 `dev-agent` profile 进行隔离
- 已绑定 cron 表达式 - 已绑定 cron 表达式
- 当前仅输出清晰日志和待办动作 - 已串联 Git 拉取、包构建、FTP 上传、FTP 消费、Git 提交和 ACK 上传
### 9.2 生产侧调度 ### 9.2 生产侧调度
@ -381,7 +387,19 @@ spring.jpa.hibernate.ddl-auto=none
- 已按 `prod-agent` profile 进行隔离 - 已按 `prod-agent` profile 进行隔离
- 已绑定 cron 表达式 - 已绑定 cron 表达式
- 当前仅输出清晰日志和待办动作 - 已串联 FTP 消费、生产 `push` 接口调用、生产 `pull` 接口调用、包构建和 ACK 上传
## 9.3 当前接口假设
由于你还没有给出生产 `push/pull` 接口的正式协议,本轮实现采用以下默认假设:
- 生产 `push` 接口使用 `multipart/form-data`
- 上传字段包含 `file``traceId``direction``sourceVersion``contentHash`
- 生产 `pull` 接口使用 `HTTP GET`
- `pull` 返回原始字节内容,当前默认保存为 `prod-config.json`
- 如果响应头里存在 `X-Config-Version``ETag`,优先用它作为来源版本号
后续如果你提供正式接口文档,再把这部分对齐为最终协议即可。
## 10. 当前目录结构 ## 10. 当前目录结构

View File

@ -19,7 +19,7 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<jgit.version>6.10.0.202406032230-r</jgit.version> <jgit.version>5.13.3.202401111512-r</jgit.version>
<commons-net.version>3.11.1</commons-net.version> <commons-net.version>3.11.1</commons-net.version>
</properties> </properties>

View File

@ -13,6 +13,12 @@ public class SyncProperties {
private String prodToDevStagingDir; private String prodToDevStagingDir;
private int maxRetryCount = 5; private int maxRetryCount = 5;
private int ackScanBatchSize = 50; private int ackScanBatchSize = 50;
private String remoteDevToProdOutDir;
private String remoteDevToProdAckDir;
private String remoteProdToDevOutDir;
private String remoteProdToDevAckDir;
private String remoteFailedDir;
private String pullResponseFileName;
public String getNodeId() { public String getNodeId() {
return nodeId; return nodeId;
@ -77,4 +83,52 @@ public class SyncProperties {
public void setAckScanBatchSize(int ackScanBatchSize) { public void setAckScanBatchSize(int ackScanBatchSize) {
this.ackScanBatchSize = ackScanBatchSize; this.ackScanBatchSize = ackScanBatchSize;
} }
public String getRemoteDevToProdOutDir() {
return remoteDevToProdOutDir;
}
public void setRemoteDevToProdOutDir(String remoteDevToProdOutDir) {
this.remoteDevToProdOutDir = remoteDevToProdOutDir;
}
public String getRemoteDevToProdAckDir() {
return remoteDevToProdAckDir;
}
public void setRemoteDevToProdAckDir(String remoteDevToProdAckDir) {
this.remoteDevToProdAckDir = remoteDevToProdAckDir;
}
public String getRemoteProdToDevOutDir() {
return remoteProdToDevOutDir;
}
public void setRemoteProdToDevOutDir(String remoteProdToDevOutDir) {
this.remoteProdToDevOutDir = remoteProdToDevOutDir;
}
public String getRemoteProdToDevAckDir() {
return remoteProdToDevAckDir;
}
public void setRemoteProdToDevAckDir(String remoteProdToDevAckDir) {
this.remoteProdToDevAckDir = remoteProdToDevAckDir;
}
public String getRemoteFailedDir() {
return remoteFailedDir;
}
public void setRemoteFailedDir(String remoteFailedDir) {
this.remoteFailedDir = remoteFailedDir;
}
public String getPullResponseFileName() {
return pullResponseFileName;
}
public void setPullResponseFileName(String pullResponseFileName) {
this.pullResponseFileName = pullResponseFileName;
}
} }

View File

@ -0,0 +1,28 @@
package com.ftptool.sync.model;
import java.nio.file.Path;
public class PackageBuildResult {
private final Path zipFile;
private final String packageName;
private final String contentHash;
public PackageBuildResult(Path zipFile, String packageName, String contentHash) {
this.zipFile = zipFile;
this.packageName = packageName;
this.contentHash = contentHash;
}
public Path getZipFile() {
return zipFile;
}
public String getPackageName() {
return packageName;
}
public String getContentHash() {
return contentHash;
}
}

View File

@ -0,0 +1,68 @@
package com.ftptool.sync.model;
public class PackageManifest {
private String traceId;
private SyncDirection direction;
private String sourceEnv;
private String sourceVersion;
private String contentHash;
private String createdAt;
private String packageName;
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public SyncDirection getDirection() {
return direction;
}
public void setDirection(SyncDirection direction) {
this.direction = direction;
}
public String getSourceEnv() {
return sourceEnv;
}
public void setSourceEnv(String sourceEnv) {
this.sourceEnv = sourceEnv;
}
public String getSourceVersion() {
return sourceVersion;
}
public void setSourceVersion(String sourceVersion) {
this.sourceVersion = sourceVersion;
}
public String getContentHash() {
return contentHash;
}
public void setContentHash(String contentHash) {
this.contentHash = contentHash;
}
public String getCreatedAt() {
return createdAt;
}
public void setCreatedAt(String createdAt) {
this.createdAt = createdAt;
}
public String getPackageName() {
return packageName;
}
public void setPackageName(String packageName) {
this.packageName = packageName;
}
}

View File

@ -0,0 +1,22 @@
package com.ftptool.sync.model;
import java.nio.file.Path;
public class PackageReadResult {
private final PackageManifest manifest;
private final Path configDirectory;
public PackageReadResult(PackageManifest manifest, Path configDirectory) {
this.manifest = manifest;
this.configDirectory = configDirectory;
}
public PackageManifest getManifest() {
return manifest;
}
public Path getConfigDirectory() {
return configDirectory;
}
}

View File

@ -0,0 +1,28 @@
package com.ftptool.sync.model;
import java.nio.file.Path;
public class ProdPullResult {
private final Path contentDirectory;
private final String sourceVersion;
private final String contentHash;
public ProdPullResult(Path contentDirectory, String sourceVersion, String contentHash) {
this.contentDirectory = contentDirectory;
this.sourceVersion = sourceVersion;
this.contentHash = contentHash;
}
public Path getContentDirectory() {
return contentDirectory;
}
public String getSourceVersion() {
return sourceVersion;
}
public String getContentHash() {
return contentHash;
}
}

View File

@ -0,0 +1,20 @@
package com.ftptool.sync.model;
public class RemoteFileInfo {
private final String name;
private final String path;
public RemoteFileInfo(String name, String path) {
this.name = name;
this.path = path;
}
public String getName() {
return name;
}
public String getPath() {
return path;
}
}

View File

@ -0,0 +1,68 @@
package com.ftptool.sync.model;
public class SyncAckFile {
private String traceId;
private SyncDirection direction;
private String sourceVersion;
private String ackSide;
private String ackStatus;
private String message;
private String processedAt;
public String getTraceId() {
return traceId;
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public SyncDirection getDirection() {
return direction;
}
public void setDirection(SyncDirection direction) {
this.direction = direction;
}
public String getSourceVersion() {
return sourceVersion;
}
public void setSourceVersion(String sourceVersion) {
this.sourceVersion = sourceVersion;
}
public String getAckSide() {
return ackSide;
}
public void setAckSide(String ackSide) {
this.ackSide = ackSide;
}
public String getAckStatus() {
return ackStatus;
}
public void setAckStatus(String ackStatus) {
this.ackStatus = ackStatus;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getProcessedAt() {
return processedAt;
}
public void setProcessedAt(String processedAt) {
this.processedAt = processedAt;
}
}

View File

@ -3,11 +3,32 @@ package com.ftptool.sync.orchestrator;
import com.ftptool.sync.config.FtpProperties; import com.ftptool.sync.config.FtpProperties;
import com.ftptool.sync.config.GitRepoProperties; import com.ftptool.sync.config.GitRepoProperties;
import com.ftptool.sync.config.SyncProperties; import com.ftptool.sync.config.SyncProperties;
import com.ftptool.sync.entity.SyncTask;
import com.ftptool.sync.model.PackageBuildResult;
import com.ftptool.sync.model.PackageManifest;
import com.ftptool.sync.model.PackageReadResult;
import com.ftptool.sync.model.RemoteFileInfo;
import com.ftptool.sync.model.SyncAckFile;
import com.ftptool.sync.model.SyncDirection;
import com.ftptool.sync.model.SyncStatus;
import com.ftptool.sync.service.AckFileService;
import com.ftptool.sync.service.AckService;
import com.ftptool.sync.service.CheckpointService;
import com.ftptool.sync.service.FtpClientService;
import com.ftptool.sync.service.GitClientService;
import com.ftptool.sync.service.PackageService;
import com.ftptool.sync.service.SyncMetadataService;
import com.ftptool.sync.service.SyncTaskService;
import com.ftptool.sync.service.WorkDirectoryService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
@Service @Service
@Profile("dev-agent") @Profile("dev-agent")
public class DevSyncCoordinator { public class DevSyncCoordinator {
@ -17,39 +38,239 @@ public class DevSyncCoordinator {
private final SyncProperties syncProperties; private final SyncProperties syncProperties;
private final GitRepoProperties gitRepoProperties; private final GitRepoProperties gitRepoProperties;
private final FtpProperties ftpProperties; private final FtpProperties ftpProperties;
private final WorkDirectoryService workDirectoryService;
private final GitClientService gitClientService;
private final PackageService packageService;
private final FtpClientService ftpClientService;
private final SyncTaskService syncTaskService;
private final CheckpointService checkpointService;
private final AckFileService ackFileService;
private final AckService ackService;
private final SyncMetadataService syncMetadataService;
public DevSyncCoordinator( public DevSyncCoordinator(
SyncProperties syncProperties, SyncProperties syncProperties,
GitRepoProperties gitRepoProperties, GitRepoProperties gitRepoProperties,
FtpProperties ftpProperties FtpProperties ftpProperties,
WorkDirectoryService workDirectoryService,
GitClientService gitClientService,
PackageService packageService,
FtpClientService ftpClientService,
SyncTaskService syncTaskService,
CheckpointService checkpointService,
AckFileService ackFileService,
AckService ackService,
SyncMetadataService syncMetadataService
) { ) {
this.syncProperties = syncProperties; this.syncProperties = syncProperties;
this.gitRepoProperties = gitRepoProperties; this.gitRepoProperties = gitRepoProperties;
this.ftpProperties = ftpProperties; this.ftpProperties = ftpProperties;
this.workDirectoryService = workDirectoryService;
this.gitClientService = gitClientService;
this.packageService = packageService;
this.ftpClientService = ftpClientService;
this.syncTaskService = syncTaskService;
this.checkpointService = checkpointService;
this.ackFileService = ackFileService;
this.ackService = ackService;
this.syncMetadataService = syncMetadataService;
} }
public void scanGitAndStagePackage() { public void scanGitAndStagePackage() {
log.info( try {
"DEV scan tick. nodeId={}, branch={}, localRepo={}, ftpBaseDir={}", log.info(
syncProperties.getNodeId(), "DEV scan tick. nodeId={}, branch={}, localRepo={}, ftpBaseDir={}",
gitRepoProperties.getScanBranch(), syncProperties.getNodeId(),
gitRepoProperties.getLocalPath(), gitRepoProperties.getScanBranch(),
ftpProperties.getBaseDir() gitRepoProperties.getLocalPath(),
); ftpProperties.getBaseDir()
log.info("TODO implement: Git pull -> package build -> upload to FTP dev-to-prod/out"); );
String branch = gitRepoProperties.getScanBranch();
String sourceVersion = gitClientService.prepareRepositoryAndGetHead(branch);
Path exportDirectory = workDirectoryService.getDevToProdStagingDir().resolve("git-" + sourceVersion);
gitClientService.exportBranchSnapshot(branch, exportDirectory);
String contentHash = packageService.calculateDirectoryHash(exportDirectory);
Optional<SyncTask> existing = syncTaskService.findByBusinessKey(
SyncDirection.DEV_TO_PROD,
sourceVersion,
contentHash
);
if (shouldSkipStage(existing)) {
log.info("DEV package already staged or finished. version={}, hash={}", sourceVersion, contentHash);
return;
}
String traceId = existing.map(SyncTask::getTraceId).orElse(syncMetadataService.newTraceId());
PackageManifest manifest = syncMetadataService.createManifest(
traceId,
SyncDirection.DEV_TO_PROD,
"DEV",
sourceVersion,
contentHash
);
if (existing.isPresent() && existing.get().getPackageName() != null) {
manifest.setPackageName(existing.get().getPackageName());
}
PackageBuildResult packageBuildResult = packageService.buildPackageFromDirectory(exportDirectory, manifest);
SyncTask task = syncTaskService.createOrLoadTask(
SyncDirection.DEV_TO_PROD,
sourceVersion,
packageBuildResult.getContentHash(),
packageBuildResult.getPackageName(),
traceId
);
ftpClientService.uploadAtomic(
packageBuildResult.getZipFile(),
syncProperties.getRemoteDevToProdOutDir(),
task.getPackageName()
);
syncTaskService.markStatus(task.getTraceId(), SyncStatus.UPLOADED, null);
log.info("DEV package uploaded. traceId={}, packageName={}", task.getTraceId(), task.getPackageName());
} catch (Exception e) {
log.error("DEV scan and stage failed", e);
}
} }
public void consumeProdPackages() { public void consumeProdPackages() {
log.info( try {
"DEV consume tick. snapshotBranch={}, stagingDir={}", log.info(
gitRepoProperties.getSnapshotBranch(), "DEV consume tick. snapshotBranch={}, stagingDir={}",
syncProperties.getProdToDevStagingDir() gitRepoProperties.getSnapshotBranch(),
); syncProperties.getProdToDevStagingDir()
log.info("TODO implement: download prod-to-dev package -> write Git -> commit/push"); );
List<RemoteFileInfo> remoteFiles = ftpClientService.listFiles(syncProperties.getRemoteProdToDevOutDir(), ".zip");
for (RemoteFileInfo remoteFile : remoteFiles) {
consumeSingleProdPackage(remoteFile);
}
} catch (Exception e) {
log.error("DEV consume prod packages failed", e);
}
} }
public void scanProdAcks() { public void scanProdAcks() {
log.info("DEV ack scan tick. batchSize={}", syncProperties.getAckScanBatchSize()); try {
log.info("TODO implement: read dev-to-prod/ack and update sync_task state"); log.info("DEV ack scan tick. batchSize={}", syncProperties.getAckScanBatchSize());
List<RemoteFileInfo> ackFiles = ftpClientService.listFiles(syncProperties.getRemoteDevToProdAckDir(), ".json");
for (RemoteFileInfo ackFile : ackFiles) {
Path localAck = ftpClientService.download(ackFile.getPath(), workDirectoryService.getPackageTempDir());
SyncAckFile syncAckFile = ackFileService.readAckFile(localAck);
ackService.recordAck(
syncAckFile.getTraceId(),
syncAckFile.getAckSide(),
syncAckFile.getAckStatus(),
syncAckFile.getMessage()
);
syncTaskService.findByTraceId(syncAckFile.getTraceId()).ifPresent(task -> {
SyncStatus status = "SUCCESS".equalsIgnoreCase(syncAckFile.getAckStatus())
? SyncStatus.SUCCESS : SyncStatus.FAILED;
syncTaskService.markStatus(task.getTraceId(), status, syncAckFile.getMessage());
if (status == SyncStatus.SUCCESS) {
checkpointService.saveCheckpoint(task.getDirection(), task.getSourceVersion(), task.getContentHash());
}
});
ftpClientService.deleteFile(ackFile.getPath());
}
} catch (Exception e) {
log.error("DEV ack scan failed", e);
}
}
private void consumeSingleProdPackage(RemoteFileInfo remoteFile) {
PackageManifest manifest = null;
try {
Path localZip = ftpClientService.download(remoteFile.getPath(), workDirectoryService.getProdToDevStagingDir());
PackageReadResult readResult = packageService.extractPackage(localZip);
manifest = readResult.getManifest();
if (manifest.getDirection() != SyncDirection.PROD_TO_DEV) {
log.warn("Ignored remote file with unexpected direction. file={}, direction={}", remoteFile.getName(), manifest.getDirection());
return;
}
SyncTask task = syncTaskService.createOrLoadTask(
manifest.getDirection(),
manifest.getSourceVersion(),
manifest.getContentHash(),
manifest.getPackageName(),
manifest.getTraceId()
);
if (task.getStatus() == SyncStatus.SUCCESS) {
ftpClientService.deleteFile(remoteFile.getPath());
return;
}
String commitMessage = gitRepoProperties.getCommitMessagePrefix()
+ ": traceId=" + manifest.getTraceId()
+ " version=" + manifest.getSourceVersion();
boolean pushed = gitClientService.syncDirectoryToBranch(
readResult.getConfigDirectory(),
gitRepoProperties.getSnapshotBranch(),
commitMessage
);
syncTaskService.markStatus(task.getTraceId(), SyncStatus.SUCCESS, null);
checkpointService.saveCheckpoint(manifest.getDirection(), manifest.getSourceVersion(), manifest.getContentHash());
SyncAckFile ack = syncMetadataService.createAck(
manifest.getTraceId(),
manifest.getDirection(),
manifest.getSourceVersion(),
"DEV",
"SUCCESS",
pushed ? "Snapshot committed to Git" : "No Git changes detected"
);
Path ackPath = ackFileService.writeAckFile(ack, manifest.getTraceId());
ftpClientService.uploadAtomic(
ackPath,
syncProperties.getRemoteProdToDevAckDir(),
syncMetadataService.buildAckFileName(manifest.getTraceId())
);
ackService.recordAck(manifest.getTraceId(), "DEV", "SUCCESS", ack.getMessage());
ftpClientService.deleteFile(remoteFile.getPath());
log.info("DEV consumed PROD package. traceId={}, packageName={}", manifest.getTraceId(), manifest.getPackageName());
} catch (Exception e) {
log.error("DEV failed to consume PROD package: {}", remoteFile.getName(), e);
if (manifest != null) {
syncTaskService.increaseRetryCount(manifest.getTraceId(), summarizeException(e));
syncTaskService.markStatus(manifest.getTraceId(), SyncStatus.FAILED, summarizeException(e));
uploadFailureAck(manifest, summarizeException(e));
}
}
}
private boolean shouldSkipStage(Optional<SyncTask> existing) {
return existing.isPresent()
&& (existing.get().getStatus() == SyncStatus.UPLOADED || existing.get().getStatus() == SyncStatus.SUCCESS);
}
private void uploadFailureAck(PackageManifest manifest, String message) {
try {
SyncAckFile ack = syncMetadataService.createAck(
manifest.getTraceId(),
manifest.getDirection(),
manifest.getSourceVersion(),
"DEV",
"FAILED",
message
);
Path ackPath = ackFileService.writeAckFile(ack, manifest.getTraceId());
ftpClientService.uploadAtomic(
ackPath,
syncProperties.getRemoteProdToDevAckDir(),
syncMetadataService.buildAckFileName(manifest.getTraceId())
);
ackService.recordAck(manifest.getTraceId(), "DEV", "FAILED", message);
} catch (Exception ex) {
log.error("DEV failed to upload failure ack. traceId={}", manifest.getTraceId(), ex);
}
}
private String summarizeException(Exception e) {
String message = e.getMessage();
if (message == null || message.trim().isEmpty()) {
return e.getClass().getSimpleName();
}
return message.length() > 400 ? message.substring(0, 400) : message;
} }
} }

View File

@ -3,11 +3,33 @@ package com.ftptool.sync.orchestrator;
import com.ftptool.sync.config.FtpProperties; import com.ftptool.sync.config.FtpProperties;
import com.ftptool.sync.config.ProdApiProperties; import com.ftptool.sync.config.ProdApiProperties;
import com.ftptool.sync.config.SyncProperties; import com.ftptool.sync.config.SyncProperties;
import com.ftptool.sync.entity.SyncTask;
import com.ftptool.sync.model.PackageBuildResult;
import com.ftptool.sync.model.PackageManifest;
import com.ftptool.sync.model.PackageReadResult;
import com.ftptool.sync.model.ProdPullResult;
import com.ftptool.sync.model.RemoteFileInfo;
import com.ftptool.sync.model.SyncAckFile;
import com.ftptool.sync.model.SyncDirection;
import com.ftptool.sync.model.SyncStatus;
import com.ftptool.sync.service.AckFileService;
import com.ftptool.sync.service.AckService;
import com.ftptool.sync.service.CheckpointService;
import com.ftptool.sync.service.FtpClientService;
import com.ftptool.sync.service.PackageService;
import com.ftptool.sync.service.ProdConfigApiService;
import com.ftptool.sync.service.SyncMetadataService;
import com.ftptool.sync.service.SyncTaskService;
import com.ftptool.sync.service.WorkDirectoryService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
@Service @Service
@Profile("prod-agent") @Profile("prod-agent")
public class ProdSyncCoordinator { public class ProdSyncCoordinator {
@ -17,39 +39,247 @@ public class ProdSyncCoordinator {
private final SyncProperties syncProperties; private final SyncProperties syncProperties;
private final FtpProperties ftpProperties; private final FtpProperties ftpProperties;
private final ProdApiProperties prodApiProperties; private final ProdApiProperties prodApiProperties;
private final WorkDirectoryService workDirectoryService;
private final FtpClientService ftpClientService;
private final PackageService packageService;
private final ProdConfigApiService prodConfigApiService;
private final SyncTaskService syncTaskService;
private final CheckpointService checkpointService;
private final AckFileService ackFileService;
private final AckService ackService;
private final SyncMetadataService syncMetadataService;
public ProdSyncCoordinator( public ProdSyncCoordinator(
SyncProperties syncProperties, SyncProperties syncProperties,
FtpProperties ftpProperties, FtpProperties ftpProperties,
ProdApiProperties prodApiProperties ProdApiProperties prodApiProperties,
WorkDirectoryService workDirectoryService,
FtpClientService ftpClientService,
PackageService packageService,
ProdConfigApiService prodConfigApiService,
SyncTaskService syncTaskService,
CheckpointService checkpointService,
AckFileService ackFileService,
AckService ackService,
SyncMetadataService syncMetadataService
) { ) {
this.syncProperties = syncProperties; this.syncProperties = syncProperties;
this.ftpProperties = ftpProperties; this.ftpProperties = ftpProperties;
this.prodApiProperties = prodApiProperties; this.prodApiProperties = prodApiProperties;
this.workDirectoryService = workDirectoryService;
this.ftpClientService = ftpClientService;
this.packageService = packageService;
this.prodConfigApiService = prodConfigApiService;
this.syncTaskService = syncTaskService;
this.checkpointService = checkpointService;
this.ackFileService = ackFileService;
this.ackService = ackService;
this.syncMetadataService = syncMetadataService;
} }
public void consumeDevPackages() { public void consumeDevPackages() {
log.info( try {
"PROD consume tick. nodeId={}, ftpBaseDir={}, pushPath={}", log.info(
syncProperties.getNodeId(), "PROD consume tick. nodeId={}, ftpBaseDir={}, pushPath={}",
ftpProperties.getBaseDir(), syncProperties.getNodeId(),
prodApiProperties.getPushPath() ftpProperties.getBaseDir(),
); prodApiProperties.getPushPath()
log.info("TODO implement: download dev-to-prod package -> validate -> call prod push API"); );
List<RemoteFileInfo> remoteFiles = ftpClientService.listFiles(syncProperties.getRemoteDevToProdOutDir(), ".zip");
for (RemoteFileInfo remoteFile : remoteFiles) {
consumeSingleDevPackage(remoteFile);
}
} catch (Exception e) {
log.error("PROD consume DEV packages failed", e);
}
} }
public void pullProdConfigAndStagePackage() { public void pullProdConfigAndStagePackage() {
log.info( try {
"PROD pull tick. apiBaseUrl={}, pullPath={}, stagingDir={}", log.info(
prodApiProperties.getBaseUrl(), "PROD pull tick. apiBaseUrl={}, pullPath={}, stagingDir={}",
prodApiProperties.getPullPath(), prodApiProperties.getBaseUrl(),
syncProperties.getProdToDevStagingDir() prodApiProperties.getPullPath(),
); syncProperties.getProdToDevStagingDir()
log.info("TODO implement: call prod pull API -> build package -> upload to FTP prod-to-dev/out"); );
ProdPullResult pullResult = prodConfigApiService.pullConfigSnapshot();
Optional<SyncTask> existing = syncTaskService.findByBusinessKey(
SyncDirection.PROD_TO_DEV,
pullResult.getSourceVersion(),
pullResult.getContentHash()
);
if (shouldSkipStage(existing)) {
log.info("PROD pull result already staged or finished. version={}, hash={}",
pullResult.getSourceVersion(), pullResult.getContentHash());
return;
}
String traceId = existing.map(SyncTask::getTraceId).orElse(syncMetadataService.newTraceId());
PackageManifest manifest = syncMetadataService.createManifest(
traceId,
SyncDirection.PROD_TO_DEV,
"PROD",
pullResult.getSourceVersion(),
pullResult.getContentHash()
);
if (existing.isPresent() && existing.get().getPackageName() != null) {
manifest.setPackageName(existing.get().getPackageName());
}
PackageBuildResult packageBuildResult = packageService.buildPackageFromDirectory(
pullResult.getContentDirectory(),
manifest
);
SyncTask task = syncTaskService.createOrLoadTask(
SyncDirection.PROD_TO_DEV,
pullResult.getSourceVersion(),
packageBuildResult.getContentHash(),
packageBuildResult.getPackageName(),
traceId
);
ftpClientService.uploadAtomic(
packageBuildResult.getZipFile(),
syncProperties.getRemoteProdToDevOutDir(),
task.getPackageName()
);
syncTaskService.markStatus(task.getTraceId(), SyncStatus.UPLOADED, null);
log.info("PROD package uploaded. traceId={}, packageName={}", task.getTraceId(), task.getPackageName());
} catch (Exception e) {
log.error("PROD pull and stage failed", e);
}
} }
public void scanDevAcks() { public void scanDevAcks() {
log.info("PROD ack scan tick. batchSize={}", syncProperties.getAckScanBatchSize()); try {
log.info("TODO implement: read prod-to-dev/ack and update sync_task state"); log.info("PROD ack scan tick. batchSize={}", syncProperties.getAckScanBatchSize());
List<RemoteFileInfo> ackFiles = ftpClientService.listFiles(syncProperties.getRemoteProdToDevAckDir(), ".json");
for (RemoteFileInfo ackFile : ackFiles) {
Path localAck = ftpClientService.download(ackFile.getPath(), workDirectoryService.getPackageTempDir());
SyncAckFile syncAckFile = ackFileService.readAckFile(localAck);
ackService.recordAck(
syncAckFile.getTraceId(),
syncAckFile.getAckSide(),
syncAckFile.getAckStatus(),
syncAckFile.getMessage()
);
syncTaskService.findByTraceId(syncAckFile.getTraceId()).ifPresent(task -> {
SyncStatus status = "SUCCESS".equalsIgnoreCase(syncAckFile.getAckStatus())
? SyncStatus.SUCCESS : SyncStatus.FAILED;
syncTaskService.markStatus(task.getTraceId(), status, syncAckFile.getMessage());
if (status == SyncStatus.SUCCESS) {
checkpointService.saveCheckpoint(task.getDirection(), task.getSourceVersion(), task.getContentHash());
}
});
ftpClientService.deleteFile(ackFile.getPath());
}
} catch (Exception e) {
log.error("PROD ack scan failed", e);
}
}
private void consumeSingleDevPackage(RemoteFileInfo remoteFile) {
PackageManifest manifest = null;
try {
Path localZip = ftpClientService.download(remoteFile.getPath(), workDirectoryService.getDevToProdStagingDir());
PackageReadResult readResult = packageService.extractPackage(localZip);
manifest = readResult.getManifest();
if (manifest.getDirection() != SyncDirection.DEV_TO_PROD) {
log.warn("Ignored remote file with unexpected direction. file={}, direction={}", remoteFile.getName(), manifest.getDirection());
return;
}
SyncTask task = syncTaskService.createOrLoadTask(
manifest.getDirection(),
manifest.getSourceVersion(),
manifest.getContentHash(),
manifest.getPackageName(),
manifest.getTraceId()
);
if (task.getStatus() == SyncStatus.SUCCESS) {
ftpClientService.deleteFile(remoteFile.getPath());
return;
}
prodConfigApiService.pushPackage(manifest, localZip);
syncTaskService.markStatus(task.getTraceId(), SyncStatus.SUCCESS, null);
checkpointService.saveCheckpoint(manifest.getDirection(), manifest.getSourceVersion(), manifest.getContentHash());
SyncAckFile ack = syncMetadataService.createAck(
manifest.getTraceId(),
manifest.getDirection(),
manifest.getSourceVersion(),
"PROD",
"SUCCESS",
"Package pushed to production API"
);
Path ackPath = ackFileService.writeAckFile(ack, manifest.getTraceId());
ftpClientService.uploadAtomic(
ackPath,
syncProperties.getRemoteDevToProdAckDir(),
syncMetadataService.buildAckFileName(manifest.getTraceId())
);
ackService.recordAck(manifest.getTraceId(), "PROD", "SUCCESS", ack.getMessage());
ftpClientService.deleteFile(remoteFile.getPath());
log.info("PROD consumed DEV package. traceId={}, packageName={}", manifest.getTraceId(), manifest.getPackageName());
} catch (Exception e) {
log.error("PROD failed to consume DEV package: {}", remoteFile.getName(), e);
if (manifest != null) {
syncTaskService.increaseRetryCount(manifest.getTraceId(), summarizeException(e));
Optional<SyncTask> task = syncTaskService.findByTraceId(manifest.getTraceId());
int retryCount = task.map(SyncTask::getRetryCount).orElse(0);
if (retryCount >= syncProperties.getMaxRetryCount()) {
syncTaskService.markStatus(manifest.getTraceId(), SyncStatus.FAILED, summarizeException(e));
uploadFailureAck(manifest, summarizeException(e));
moveToFailed(remoteFile, manifest);
}
}
}
}
private boolean shouldSkipStage(Optional<SyncTask> existing) {
return existing.isPresent()
&& (existing.get().getStatus() == SyncStatus.UPLOADED || existing.get().getStatus() == SyncStatus.SUCCESS);
}
private void uploadFailureAck(PackageManifest manifest, String message) {
try {
SyncAckFile ack = syncMetadataService.createAck(
manifest.getTraceId(),
manifest.getDirection(),
manifest.getSourceVersion(),
"PROD",
"FAILED",
message
);
Path ackPath = ackFileService.writeAckFile(ack, manifest.getTraceId());
ftpClientService.uploadAtomic(
ackPath,
syncProperties.getRemoteDevToProdAckDir(),
syncMetadataService.buildAckFileName(manifest.getTraceId())
);
ackService.recordAck(manifest.getTraceId(), "PROD", "FAILED", message);
} catch (Exception ex) {
log.error("PROD failed to upload failure ack. traceId={}", manifest.getTraceId(), ex);
}
}
private void moveToFailed(RemoteFileInfo remoteFile, PackageManifest manifest) {
try {
ftpClientService.moveFile(
remoteFile.getPath(),
syncProperties.getRemoteFailedDir(),
remoteFile.getName()
);
} catch (Exception e) {
log.error("PROD failed to move package to failed dir. traceId={}", manifest.getTraceId(), e);
}
}
private String summarizeException(Exception e) {
String message = e.getMessage();
if (message == null || message.trim().isEmpty()) {
return e.getClass().getSimpleName();
}
return message.length() > 400 ? message.substring(0, 400) : message;
} }
} }

View File

@ -0,0 +1,35 @@
package com.ftptool.sync.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ftptool.sync.model.SyncAckFile;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
@Service
public class AckFileService {
private final ObjectMapper objectMapper;
private final WorkDirectoryService workDirectoryService;
public AckFileService(ObjectMapper objectMapper, WorkDirectoryService workDirectoryService) {
this.objectMapper = objectMapper;
this.workDirectoryService = workDirectoryService;
}
public Path writeAckFile(SyncAckFile ackFile, String fileNamePrefix) throws IOException {
Path path = Files.createTempFile(workDirectoryService.getPackageTempDir(), fileNamePrefix + "-", ".ack.json");
if (ackFile.getProcessedAt() == null) {
ackFile.setProcessedAt(OffsetDateTime.now().toString());
}
objectMapper.writerWithDefaultPrettyPrinter().writeValue(path.toFile(), ackFile);
return path;
}
public SyncAckFile readAckFile(Path path) throws IOException {
return objectMapper.readValue(path.toFile(), SyncAckFile.class);
}
}

View File

@ -0,0 +1,189 @@
package com.ftptool.sync.service;
import com.ftptool.sync.config.FtpProperties;
import com.ftptool.sync.model.RemoteFileInfo;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
@Service
public class FtpClientService {
private static final Logger log = LoggerFactory.getLogger(FtpClientService.class);
private final FtpProperties ftpProperties;
public FtpClientService(FtpProperties ftpProperties) {
this.ftpProperties = ftpProperties;
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 2.0))
public List<RemoteFileInfo> listFiles(String remoteDirectory, String suffix) throws IOException {
return withClient(client -> {
String normalizedPath = normalizeRemotePath(remoteDirectory);
FTPFile[] files = client.listFiles(normalizedPath);
List<RemoteFileInfo> result = new ArrayList<RemoteFileInfo>();
for (FTPFile file : files) {
if (!file.isFile()) {
continue;
}
if (suffix != null && !file.getName().endsWith(suffix)) {
continue;
}
result.add(new RemoteFileInfo(file.getName(), appendPath(remoteDirectory, file.getName())));
}
result.sort(Comparator.comparing(RemoteFileInfo::getName));
return result;
});
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 2.0))
public Path download(String remotePath, Path localDirectory) throws IOException {
return withClient(client -> {
Files.createDirectories(localDirectory);
String fileName = remotePath.substring(remotePath.lastIndexOf('/') + 1);
Path localFile = localDirectory.resolve(fileName);
try (OutputStream outputStream = Files.newOutputStream(localFile)) {
if (!client.retrieveFile(normalizeRemotePath(remotePath), outputStream)) {
throw new IOException("Failed to download remote file: " + remotePath);
}
}
return localFile;
});
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 2.0))
public void uploadAtomic(Path localFile, String remoteDirectory, String remoteFileName) throws IOException {
withClient(client -> {
ensureDirectoryExists(client, remoteDirectory);
String tempName = remoteFileName + ".tmp";
String tempPath = appendPath(remoteDirectory, tempName);
String finalPath = appendPath(remoteDirectory, remoteFileName);
try (InputStream inputStream = Files.newInputStream(localFile)) {
if (!client.storeFile(tempPath, inputStream)) {
throw new IOException("Failed to upload remote file: " + tempPath);
}
}
if (!client.rename(tempPath, finalPath)) {
throw new IOException("Failed to rename remote file: " + tempPath + " -> " + finalPath);
}
return null;
});
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 2.0))
public void deleteFile(String remotePath) throws IOException {
withClient(client -> {
String normalized = normalizeRemotePath(remotePath);
if (!client.deleteFile(normalized)) {
log.warn("Remote file was not deleted: {}", normalized);
}
return null;
});
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 2.0))
public void moveFile(String remotePath, String targetDirectory, String targetFileName) throws IOException {
withClient(client -> {
ensureDirectoryExists(client, targetDirectory);
String source = normalizeRemotePath(remotePath);
String target = appendPath(targetDirectory, targetFileName);
if (!client.rename(source, target)) {
throw new IOException("Failed to move remote file: " + source + " -> " + target);
}
return null;
});
}
public String appendPath(String directory, String fileName) {
return normalizeRemotePath(normalizeSubPath(directory)) + "/" + fileName;
}
private <T> T withClient(FtpCallback<T> callback) throws IOException {
FTPClient client = new FTPClient();
try {
client.setConnectTimeout(ftpProperties.getConnectTimeoutMs());
client.setDataTimeout(ftpProperties.getDataTimeoutMs());
client.setBufferSize(ftpProperties.getBufferSize());
client.connect(ftpProperties.getHost(), ftpProperties.getPort());
if (!client.login(ftpProperties.getUsername(), ftpProperties.getPassword())) {
throw new IOException("FTP login failed for user " + ftpProperties.getUsername());
}
client.setFileType(FTP.BINARY_FILE_TYPE);
if (ftpProperties.isPassiveMode()) {
client.enterLocalPassiveMode();
}
return callback.doWithClient(client);
} finally {
disconnectQuietly(client);
}
}
private void ensureDirectoryExists(FTPClient client, String directory) throws IOException {
String[] segments = normalizeSubPath(directory).split("/");
StringBuilder current = new StringBuilder();
for (String segment : segments) {
if (segment == null || segment.trim().isEmpty()) {
continue;
}
current.append("/").append(segment);
client.makeDirectory(withBaseDir(current.toString()));
}
}
private String normalizeRemotePath(String path) {
return withBaseDir(path.startsWith("/") ? path : "/" + path);
}
private String withBaseDir(String path) {
String baseDir = ftpProperties.getBaseDir();
if (baseDir == null || baseDir.trim().isEmpty() || "/".equals(baseDir.trim())) {
return path;
}
String normalizedBase = baseDir.startsWith("/") ? baseDir : "/" + baseDir;
normalizedBase = normalizedBase.endsWith("/") ? normalizedBase.substring(0, normalizedBase.length() - 1) : normalizedBase;
return normalizedBase + path;
}
private String normalizeSubPath(String path) {
if (path == null || path.trim().isEmpty()) {
return "/";
}
String normalized = path.startsWith("/") ? path : "/" + path;
return normalized.endsWith("/") && normalized.length() > 1
? normalized.substring(0, normalized.length() - 1)
: normalized;
}
private void disconnectQuietly(FTPClient client) {
if (client == null) {
return;
}
try {
if (client.isConnected()) {
client.logout();
client.disconnect();
}
} catch (IOException e) {
log.warn("Failed to disconnect FTP client cleanly", e);
}
}
private interface FtpCallback<T> {
T doWithClient(FTPClient client) throws IOException;
}
}

View File

@ -0,0 +1,181 @@
package com.ftptool.sync.service;
import com.ftptool.sync.config.GitRepoProperties;
import com.ftptool.sync.util.FileTreeUtils;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.Status;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
import org.eclipse.jgit.transport.CredentialsProvider;
import org.eclipse.jgit.transport.RefSpec;
import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;
@Service
public class GitClientService {
private static final Logger log = LoggerFactory.getLogger(GitClientService.class);
private final GitRepoProperties gitRepoProperties;
private final Object lock = new Object();
public GitClientService(GitRepoProperties gitRepoProperties) {
this.gitRepoProperties = gitRepoProperties;
}
public String prepareRepositoryAndGetHead(String branch) throws IOException, GitAPIException {
synchronized (lock) {
try (Git git = openOrCloneRepository()) {
checkoutBranch(git, branch);
pullIfRemoteBranchExists(git, branch);
return git.getRepository().resolve("HEAD").name();
}
}
}
public Path getRepositoryPath() {
return new File(gitRepoProperties.getLocalPath()).toPath().toAbsolutePath().normalize();
}
public Path exportBranchSnapshot(String branch, Path targetDirectory) throws IOException, GitAPIException {
synchronized (lock) {
try (Git git = openOrCloneRepository()) {
checkoutBranch(git, branch);
pullIfRemoteBranchExists(git, branch);
FileTreeUtils.deleteRecursively(targetDirectory);
FileTreeUtils.ensureDirectory(targetDirectory);
copyWorkingTreeWithoutGit(getRepositoryPath(), targetDirectory);
}
return targetDirectory;
}
}
public boolean syncDirectoryToBranch(Path sourceDirectory, String branch, String message) throws IOException, GitAPIException {
synchronized (lock) {
try (Git git = openOrCloneRepository()) {
checkoutBranch(git, branch);
Path repositoryPath = getRepositoryPath();
if (!Files.exists(repositoryPath.resolve(".git"))) {
throw new IOException("Git repository does not exist: " + repositoryPath);
}
FileTreeUtils.deleteChildrenExcept(repositoryPath, ".git");
FileTreeUtils.copyDirectory(sourceDirectory, repositoryPath);
git.add().addFilepattern(".").call();
git.add().setUpdate(true).addFilepattern(".").call();
Status status = git.status().call();
if (status.isClean()) {
log.info("No Git changes detected on branch {}", branch);
return false;
}
PersonIdent personIdent = new PersonIdent(
gitRepoProperties.getCommitAuthorName(),
gitRepoProperties.getCommitAuthorEmail()
);
git.commit()
.setMessage(message)
.setAuthor(personIdent)
.setCommitter(personIdent)
.call();
git.push()
.setCredentialsProvider(credentialsProvider())
.setRemote("origin")
.setRefSpecs(new RefSpec("refs/heads/" + branch + ":refs/heads/" + branch))
.call();
return true;
}
}
}
private Git openOrCloneRepository() throws IOException, GitAPIException {
Path repositoryPath = getRepositoryPath();
if (Files.exists(repositoryPath.resolve(".git"))) {
return Git.open(repositoryPath.toFile());
}
FileTreeUtils.ensureDirectory(repositoryPath);
return Git.cloneRepository()
.setURI(gitRepoProperties.getRemoteUri())
.setDirectory(repositoryPath.toFile())
.setCredentialsProvider(credentialsProvider())
.call();
}
private void checkoutBranch(Git git, String branch) throws GitAPIException, IOException {
Repository repository = git.getRepository();
Ref localRef = repository.findRef(branch);
Ref remoteRef = repository.findRef("refs/remotes/origin/" + branch);
if (localRef == null) {
if (remoteRef != null) {
git.checkout()
.setCreateBranch(true)
.setName(branch)
.setStartPoint("origin/" + branch)
.setUpstreamMode(org.eclipse.jgit.api.CreateBranchCommand.SetupUpstreamMode.TRACK)
.call();
} else {
git.checkout()
.setCreateBranch(true)
.setName(branch)
.call();
}
} else {
git.checkout().setName(branch).call();
}
}
private void pullIfRemoteBranchExists(Git git, String branch) throws GitAPIException, IOException {
Repository repository = git.getRepository();
Ref remoteRef = repository.findRef("refs/remotes/origin/" + branch);
if (remoteRef == null) {
git.fetch()
.setCredentialsProvider(credentialsProvider())
.setRemote("origin")
.call();
remoteRef = repository.findRef("refs/remotes/origin/" + branch);
}
if (remoteRef != null) {
git.pull()
.setRemote("origin")
.setRemoteBranchName(branch)
.setRebase(gitRepoProperties.isPullRebase())
.setCredentialsProvider(credentialsProvider())
.call();
}
}
private CredentialsProvider credentialsProvider() {
return new UsernamePasswordCredentialsProvider(
gitRepoProperties.getUsername(),
gitRepoProperties.getPassword()
);
}
private void copyWorkingTreeWithoutGit(Path repositoryPath, Path targetDirectory) throws IOException {
try (Stream<Path> stream = Files.list(repositoryPath)) {
stream.filter(path -> !".git".equals(path.getFileName().toString()))
.forEach(path -> {
try {
Path target = targetDirectory.resolve(path.getFileName().toString());
if (Files.isDirectory(path)) {
FileTreeUtils.copyDirectory(path, target);
} else {
Files.copy(path, target, StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException e) {
throw new IllegalStateException("Failed to export repository snapshot", e);
}
});
}
}
}

View File

@ -0,0 +1,136 @@
package com.ftptool.sync.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ftptool.sync.model.PackageBuildResult;
import com.ftptool.sync.model.PackageManifest;
import com.ftptool.sync.model.PackageReadResult;
import com.ftptool.sync.util.FileHashUtils;
import com.ftptool.sync.util.FileTreeUtils;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.OffsetDateTime;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
@Service
public class PackageService {
private static final String CONFIG_DIR = "config";
private static final String MANIFEST_FILE = "manifest.json";
private static final String HASH_FILE = "sha256.txt";
private final ObjectMapper objectMapper;
private final WorkDirectoryService workDirectoryService;
public PackageService(ObjectMapper objectMapper, WorkDirectoryService workDirectoryService) {
this.objectMapper = objectMapper;
this.workDirectoryService = workDirectoryService;
}
public PackageBuildResult buildPackageFromDirectory(Path sourceDirectory, PackageManifest manifest) throws IOException {
String contentHash = calculateDirectoryHash(sourceDirectory);
manifest.setContentHash(contentHash);
if (manifest.getCreatedAt() == null) {
manifest.setCreatedAt(OffsetDateTime.now().toString());
}
Path zipFile = workDirectoryService.getPackageTempDir().resolve(manifest.getPackageName());
FileTreeUtils.ensureDirectory(zipFile.getParent());
try (OutputStream outputStream = Files.newOutputStream(zipFile);
ZipOutputStream zipOutputStream = new ZipOutputStream(outputStream, StandardCharsets.UTF_8)) {
addJsonEntry(zipOutputStream, MANIFEST_FILE, manifest);
addTextEntry(zipOutputStream, HASH_FILE, contentHash);
addDirectoryEntries(zipOutputStream, sourceDirectory, CONFIG_DIR);
}
return new PackageBuildResult(zipFile, manifest.getPackageName(), contentHash);
}
public String calculateDirectoryHash(Path sourceDirectory) throws IOException {
return FileHashUtils.sha256Directory(sourceDirectory);
}
public PackageReadResult extractPackage(Path zipFile) throws IOException {
Path extractDir = Files.createTempDirectory(workDirectoryService.getPackageTempDir(), "pkg-");
Path configDir = extractDir.resolve(CONFIG_DIR);
PackageManifest manifest = null;
try (InputStream inputStream = Files.newInputStream(zipFile);
ZipInputStream zipInputStream = new ZipInputStream(inputStream, StandardCharsets.UTF_8)) {
ZipEntry entry;
while ((entry = zipInputStream.getNextEntry()) != null) {
Path target = extractDir.resolve(entry.getName()).normalize();
if (!target.startsWith(extractDir)) {
throw new IOException("Zip entry escapes target directory: " + entry.getName());
}
if (entry.isDirectory()) {
Files.createDirectories(target);
continue;
}
Files.createDirectories(target.getParent());
Files.copy(zipInputStream, target, StandardCopyOption.REPLACE_EXISTING);
if (MANIFEST_FILE.equals(entry.getName())) {
manifest = objectMapper.readValue(target.toFile(), PackageManifest.class);
}
}
}
if (manifest == null) {
throw new IOException("Package manifest.json is missing");
}
if (Files.notExists(configDir) || !Files.isDirectory(configDir)) {
throw new IOException("Package config directory is missing");
}
String actualHash = calculateDirectoryHash(configDir);
if (manifest.getContentHash() != null
&& !manifest.getContentHash().trim().isEmpty()
&& !manifest.getContentHash().equals(actualHash)) {
throw new IOException("Package content hash mismatch");
}
return new PackageReadResult(manifest, configDir);
}
private void addDirectoryEntries(ZipOutputStream zipOutputStream, Path sourceDirectory, String rootName) throws IOException {
Path gitDirectory = sourceDirectory.resolve(".git");
try (Stream<Path> stream = Files.walk(sourceDirectory)) {
stream.filter(path -> !path.equals(sourceDirectory))
.filter(path -> !path.startsWith(gitDirectory))
.forEach(path -> {
Path relative = sourceDirectory.relativize(path);
String entryName = rootName + "/" + relative.toString().replace('\\', '/');
try {
if (Files.isDirectory(path)) {
zipOutputStream.putNextEntry(new ZipEntry(entryName + "/"));
zipOutputStream.closeEntry();
} else {
zipOutputStream.putNextEntry(new ZipEntry(entryName));
Files.copy(path, zipOutputStream);
zipOutputStream.closeEntry();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to package path: " + path, e);
}
});
}
}
private void addJsonEntry(ZipOutputStream zipOutputStream, String fileName, Object object) throws IOException {
zipOutputStream.putNextEntry(new ZipEntry(fileName));
zipOutputStream.write(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsBytes(object));
zipOutputStream.closeEntry();
}
private void addTextEntry(ZipOutputStream zipOutputStream, String fileName, String value) throws IOException {
zipOutputStream.putNextEntry(new ZipEntry(fileName));
zipOutputStream.write(value.getBytes(StandardCharsets.UTF_8));
zipOutputStream.closeEntry();
}
}

View File

@ -0,0 +1,126 @@
package com.ftptool.sync.service;
import com.ftptool.sync.config.ProdApiProperties;
import com.ftptool.sync.config.SyncProperties;
import com.ftptool.sync.model.PackageManifest;
import com.ftptool.sync.model.ProdPullResult;
import com.ftptool.sync.util.FileHashUtils;
import com.ftptool.sync.util.FileTreeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@Service
public class ProdConfigApiService {
private static final Logger log = LoggerFactory.getLogger(ProdConfigApiService.class);
private final ProdApiProperties prodApiProperties;
private final SyncProperties syncProperties;
private final RestTemplate restTemplate;
private final WorkDirectoryService workDirectoryService;
public ProdConfigApiService(
ProdApiProperties prodApiProperties,
SyncProperties syncProperties,
RestTemplate restTemplate,
WorkDirectoryService workDirectoryService
) {
this.prodApiProperties = prodApiProperties;
this.syncProperties = syncProperties;
this.restTemplate = restTemplate;
this.workDirectoryService = workDirectoryService;
}
public void pushPackage(PackageManifest manifest, Path zipFile) {
String url = buildUrl(prodApiProperties.getPushPath());
HttpHeaders headers = defaultHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
MultiValueMap<String, Object> body = new LinkedMultiValueMap<String, Object>();
body.add("file", new FileSystemResource(zipFile.toFile()));
body.add("traceId", manifest.getTraceId());
body.add("direction", manifest.getDirection().name());
body.add("sourceVersion", manifest.getSourceVersion());
body.add("contentHash", manifest.getContentHash());
ResponseEntity<String> response = restTemplate.postForEntity(url, new HttpEntity<MultiValueMap<String, Object>>(body, headers), String.class);
if (!response.getStatusCode().is2xxSuccessful()) {
throw new IllegalStateException("Prod push API failed with status " + response.getStatusCodeValue());
}
log.info("Prod push API finished. traceId={}, status={}", manifest.getTraceId(), response.getStatusCodeValue());
}
public ProdPullResult pullConfigSnapshot() throws IOException {
String url = buildUrl(prodApiProperties.getPullPath());
HttpHeaders headers = defaultHeaders();
ResponseEntity<byte[]> response = restTemplate.exchange(
url,
HttpMethod.GET,
new HttpEntity<Void>(headers),
byte[].class
);
if (!response.getStatusCode().is2xxSuccessful()) {
throw new IllegalStateException("Prod pull API failed with status " + response.getStatusCodeValue());
}
byte[] body = response.getBody();
if (body == null || body.length == 0) {
throw new IllegalStateException("Prod pull API returned empty content");
}
Path tempDir = Files.createTempDirectory(workDirectoryService.getProdToDevStagingDir(), "pull-");
FileTreeUtils.ensureDirectory(tempDir);
Path targetFile = tempDir.resolve(syncProperties.getPullResponseFileName());
Files.write(targetFile, body);
String contentHash = FileHashUtils.sha256(body);
String sourceVersion = firstNonBlank(
response.getHeaders().getFirst("X-Config-Version"),
response.getHeaders().getETag(),
contentHash
);
return new ProdPullResult(tempDir, sourceVersion, contentHash);
}
private HttpHeaders defaultHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.setAccept(java.util.Collections.singletonList(MediaType.APPLICATION_JSON));
if (prodApiProperties.getToken() != null && !prodApiProperties.getToken().trim().isEmpty()) {
headers.setBearerAuth(prodApiProperties.getToken().trim());
}
return headers;
}
private String buildUrl(String path) {
String base = prodApiProperties.getBaseUrl();
if (base.endsWith("/") && path.startsWith("/")) {
return base.substring(0, base.length() - 1) + path;
}
if (!base.endsWith("/") && !path.startsWith("/")) {
return base + "/" + path;
}
return base + path;
}
private String firstNonBlank(String... candidates) {
for (String candidate : candidates) {
if (candidate != null && !candidate.trim().isEmpty()) {
return candidate;
}
}
return null;
}
}

View File

@ -0,0 +1,69 @@
package com.ftptool.sync.service;
import com.ftptool.sync.model.PackageManifest;
import com.ftptool.sync.model.SyncAckFile;
import com.ftptool.sync.model.SyncDirection;
import org.springframework.stereotype.Service;
import java.time.OffsetDateTime;
import java.util.UUID;
@Service
public class SyncMetadataService {
public String newTraceId() {
return UUID.randomUUID().toString().replace("-", "");
}
public PackageManifest createManifest(
String traceId,
SyncDirection direction,
String sourceEnv,
String sourceVersion,
String contentHash
) {
PackageManifest manifest = new PackageManifest();
manifest.setTraceId(traceId);
manifest.setDirection(direction);
manifest.setSourceEnv(sourceEnv);
manifest.setSourceVersion(sourceVersion);
manifest.setContentHash(contentHash);
manifest.setCreatedAt(OffsetDateTime.now().toString());
manifest.setPackageName(buildPackageFileName(direction, sourceVersion, traceId));
return manifest;
}
public SyncAckFile createAck(
String traceId,
SyncDirection direction,
String sourceVersion,
String ackSide,
String ackStatus,
String message
) {
SyncAckFile ackFile = new SyncAckFile();
ackFile.setTraceId(traceId);
ackFile.setDirection(direction);
ackFile.setSourceVersion(sourceVersion);
ackFile.setAckSide(ackSide);
ackFile.setAckStatus(ackStatus);
ackFile.setMessage(message);
ackFile.setProcessedAt(OffsetDateTime.now().toString());
return ackFile;
}
public String buildPackageFileName(SyncDirection direction, String sourceVersion, String traceId) {
return direction.name().toLowerCase() + "-" + sanitize(sourceVersion) + "-" + sanitize(traceId) + ".zip";
}
public String buildAckFileName(String traceId) {
return "ack-" + sanitize(traceId) + ".json";
}
private String sanitize(String value) {
if (value == null || value.trim().isEmpty()) {
return "unknown";
}
return value.replaceAll("[^a-zA-Z0-9._-]", "_");
}
}

View File

@ -21,6 +21,17 @@ public class SyncTaskService {
@Transactional @Transactional
public SyncTask createOrLoadTask(SyncDirection direction, String sourceVersion, String contentHash, String packageName) { public SyncTask createOrLoadTask(SyncDirection direction, String sourceVersion, String contentHash, String packageName) {
return createOrLoadTask(direction, sourceVersion, contentHash, packageName, null);
}
@Transactional
public SyncTask createOrLoadTask(
SyncDirection direction,
String sourceVersion,
String contentHash,
String packageName,
String preferredTraceId
) {
Optional<SyncTask> existing = syncTaskRepository.findByDirectionAndSourceVersionAndContentHash( Optional<SyncTask> existing = syncTaskRepository.findByDirectionAndSourceVersionAndContentHash(
direction, sourceVersion, contentHash direction, sourceVersion, contentHash
); );
@ -29,7 +40,9 @@ public class SyncTaskService {
} }
SyncTask task = new SyncTask(); SyncTask task = new SyncTask();
task.setTraceId(UUID.randomUUID().toString().replace("-", "")); task.setTraceId(preferredTraceId == null || preferredTraceId.trim().isEmpty()
? UUID.randomUUID().toString().replace("-", "")
: preferredTraceId);
task.setDirection(direction); task.setDirection(direction);
task.setSourceVersion(sourceVersion); task.setSourceVersion(sourceVersion);
task.setContentHash(contentHash); task.setContentHash(contentHash);
@ -43,6 +56,11 @@ public class SyncTaskService {
return syncTaskRepository.findByTraceId(traceId); return syncTaskRepository.findByTraceId(traceId);
} }
@Transactional(readOnly = true)
public Optional<SyncTask> findByBusinessKey(SyncDirection direction, String sourceVersion, String contentHash) {
return syncTaskRepository.findByDirectionAndSourceVersionAndContentHash(direction, sourceVersion, contentHash);
}
@Transactional @Transactional
public void markStatus(String traceId, SyncStatus status, String errorMsg) { public void markStatus(String traceId, SyncStatus status, String errorMsg) {
syncTaskRepository.findByTraceId(traceId).ifPresent(task -> { syncTaskRepository.findByTraceId(traceId).ifPresent(task -> {

View File

@ -0,0 +1,44 @@
package com.ftptool.sync.service;
import com.ftptool.sync.config.SyncProperties;
import com.ftptool.sync.util.FileTreeUtils;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@Service
public class WorkDirectoryService {
private final SyncProperties syncProperties;
public WorkDirectoryService(SyncProperties syncProperties) {
this.syncProperties = syncProperties;
}
@PostConstruct
public void initialize() throws IOException {
FileTreeUtils.ensureDirectory(getWorkDir());
FileTreeUtils.ensureDirectory(getPackageTempDir());
FileTreeUtils.ensureDirectory(getDevToProdStagingDir());
FileTreeUtils.ensureDirectory(getProdToDevStagingDir());
}
public Path getWorkDir() {
return Paths.get(syncProperties.getWorkDir()).toAbsolutePath().normalize();
}
public Path getPackageTempDir() {
return Paths.get(syncProperties.getPackageTempDir()).toAbsolutePath().normalize();
}
public Path getDevToProdStagingDir() {
return Paths.get(syncProperties.getDevToProdStagingDir()).toAbsolutePath().normalize();
}
public Path getProdToDevStagingDir() {
return Paths.get(syncProperties.getProdToDevStagingDir()).toAbsolutePath().normalize();
}
}

View File

@ -0,0 +1,77 @@
package com.ftptool.sync.util;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public final class FileHashUtils {
private FileHashUtils() {
}
public static String sha256(Path file) throws IOException {
MessageDigest digest = newDigest();
try (InputStream inputStream = Files.newInputStream(file);
DigestInputStream digestInputStream = new DigestInputStream(inputStream, digest)) {
byte[] buffer = new byte[8192];
while (digestInputStream.read(buffer) != -1) {
// Consume stream for digest calculation.
}
}
return toHex(digest.digest());
}
public static String sha256(byte[] bytes) {
MessageDigest digest = newDigest();
digest.update(bytes);
return toHex(digest.digest());
}
public static String sha256Directory(Path directory) throws IOException {
MessageDigest digest = newDigest();
List<Path> files = listRegularFiles(directory);
for (Path file : files) {
Path relative = directory.relativize(file);
digest.update(relative.toString().replace('\\', '/').getBytes(StandardCharsets.UTF_8));
digest.update((byte) '\n');
digest.update(Files.readAllBytes(file));
digest.update((byte) '\n');
}
return toHex(digest.digest());
}
private static List<Path> listRegularFiles(Path directory) throws IOException {
try (Stream<Path> stream = Files.walk(directory)) {
return stream
.filter(Files::isRegularFile)
.sorted(Comparator.comparing(path -> directory.relativize(path).toString().replace('\\', '/')))
.collect(Collectors.toCollection(ArrayList::new));
}
}
private static MessageDigest newDigest() {
try {
return MessageDigest.getInstance("SHA-256");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("SHA-256 digest is unavailable", e);
}
}
private static String toHex(byte[] bytes) {
StringBuilder builder = new StringBuilder(bytes.length * 2);
for (byte aByte : bytes) {
builder.append(String.format("%02x", aByte));
}
return builder.toString();
}
}

View File

@ -0,0 +1,72 @@
package com.ftptool.sync.util;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Comparator;
import java.util.stream.Stream;
public final class FileTreeUtils {
private FileTreeUtils() {
}
public static void ensureDirectory(Path path) throws IOException {
if (path != null) {
Files.createDirectories(path);
}
}
public static void deleteChildrenExcept(Path directory, String reservedName) throws IOException {
try (Stream<Path> stream = Files.list(directory)) {
for (Path child : stream.sorted(Comparator.reverseOrder()).toArray(Path[]::new)) {
if (reservedName.equals(child.getFileName().toString())) {
continue;
}
deleteRecursively(child);
}
}
}
public static void deleteRecursively(Path path) throws IOException {
if (path == null || Files.notExists(path)) {
return;
}
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.deleteIfExists(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.deleteIfExists(dir);
return FileVisitResult.CONTINUE;
}
});
}
public static void copyDirectory(Path source, Path target) throws IOException {
ensureDirectory(target);
Files.walkFileTree(source, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
Path relative = source.relativize(dir);
Files.createDirectories(target.resolve(relative));
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Path relative = source.relativize(file);
Files.copy(file, target.resolve(relative), StandardCopyOption.REPLACE_EXISTING);
return FileVisitResult.CONTINUE;
}
});
}
}

View File

@ -27,6 +27,12 @@ sync.dev-to-prod-staging-dir=./work/staging/dev-to-prod
sync.prod-to-dev-staging-dir=./work/staging/prod-to-dev sync.prod-to-dev-staging-dir=./work/staging/prod-to-dev
sync.max-retry-count=5 sync.max-retry-count=5
sync.ack-scan-batch-size=50 sync.ack-scan-batch-size=50
sync.remote-dev-to-prod-out-dir=/dev-to-prod/out
sync.remote-dev-to-prod-ack-dir=/dev-to-prod/ack
sync.remote-prod-to-dev-out-dir=/prod-to-dev/out
sync.remote-prod-to-dev-ack-dir=/prod-to-dev/ack
sync.remote-failed-dir=/failed
sync.pull-response-file-name=prod-config.json
# FTP defaults # FTP defaults
ftp.host=127.0.0.1 ftp.host=127.0.0.1