提交 50f630ee 作者: guofeng

KNVV更新

上级 ada585d5
......@@ -13,8 +13,10 @@ import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bkpf;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Mara;
......@@ -25,6 +27,8 @@ import com.huazheng.project.hana.model.Zsd06;
import com.huazheng.project.hana.model.Zsdfhzl;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2;
@Log4j2
......@@ -425,5 +429,79 @@ public class CheckDeleteServiceImpl {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Mara:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectAuspCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Ausp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Ausp:rowNum");
Ausp build = Ausp.builder().rowNum(rowNum).build();
List<Ausp> list = gpMapper.selectAuspCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Ausp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Ausp source = sapMapper.selectAuspById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteAusp(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Ausp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectAuspcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Ausp:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectKnvvCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Knvv:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Knvv:rowNum");
Knvv build = Knvv.builder().rowNum(rowNum).build();
List<Knvv> list = gpMapper.selectKnvvCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:checkDelete:Knvv:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Knvv source = sapMapper.selectKnvvById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteKnvv(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateKnvv(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkDelete:Knvv:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectKnvvcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Knvv:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
}
......@@ -17,6 +17,7 @@ import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bkpf;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Mara;
......@@ -273,6 +274,12 @@ public class CheckUpdateServiceImpl {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
// 级联更新业务
Knvv sKnvv = sapMapper.cascadeKnvvByKna1(source); // 级联查询源库afko表
Knvv tKnvv = gpMapper.selectKnvv(sKnvv); // 查询目标库中afko表
cascadeKnvvCheckByUpdate(sKnvv, tKnvv); // 级联更新afko表
ThreadUtil.safeSleep(500);
}
}
......@@ -286,6 +293,28 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Kna1:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectKna1CheckByUpdate的级联
public void cascadeKnvvCheckByUpdate(Knvv source, Knvv target) {
if (target != null) { // 目标库有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
// ===============================
while (true) {
try {
gpMapper.updateKnvv(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
}
public void selectLikpCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -727,6 +756,7 @@ public class CheckUpdateServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Mara:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectMaraCheckUpdate的级联
private void cascadeAuspCheckByUpdate(Ausp source, Ausp target) {
if (target != null) { // 目标库有数据
......
......@@ -18,14 +18,12 @@ import com.huazheng.project.greenplum.mapper.GPMapper;
import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afvc;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Ausp;
import com.huazheng.project.hana.model.Bsad;
import com.huazheng.project.hana.model.Bsid;
//import com.huazheng.project.hana.model.Bsid2Bsad;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Knkk;
import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Makt;
......@@ -917,7 +915,6 @@ public class DeleteUpdateJobServiceImpl {
// selectKonvCheck(); // 21 注释掉
selectKnkkCheck(); // 19
selectKnvvCheck(); // 20
selectMaktCheck(); // 24
selectMskaCheck(); // 26
selectS066Check(); // 28
......@@ -933,7 +930,6 @@ public class DeleteUpdateJobServiceImpl {
selectAfvcCheck();
selectTspatCheck();
selectZpoeditCheck();
selectAuspCheck();
selectKnvpCheck();
}
......@@ -1030,48 +1026,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Knkk:rowids", getErrorInfoFromException(e));
}
}
private void selectKnvvCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Knvv:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Knvv:rowNum");
Knvv build = Knvv.builder().rowNum(rowNum).build();
List<Knvv> list = gpMapper.selectKnvvCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Knvv:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Knvv source = sapMapper.selectKnvvById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteKnvv(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateKnvv(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Knvv:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectKnvvCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Knvv:rowNum", getErrorInfoFromException(e));
}
}
private void selectAfvcCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1114,7 +1068,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Afvc:rowNum", getErrorInfoFromException(e));
}
}
private void selectKnvpCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1157,49 +1110,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Knvp:rowNum", getErrorInfoFromException(e));
}
}
private void selectAuspCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Ausp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Ausp:rowNum");
Ausp build = Ausp.builder().rowNum(rowNum).build();
List<Ausp> list = gpMapper.selectAuspCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) {
redis1Template.opsForValue().set("huazheng:check:Ausp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Ausp source = sapMapper.selectAuspById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteAusp(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
while (true) {
try {
gpMapper.updateAusp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Ausp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectAuspCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Ausp:rowNum", getErrorInfoFromException(e));
}
}
private void selectTspatCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1329,7 +1239,6 @@ public class DeleteUpdateJobServiceImpl {
}
}
private void selectMaktCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1429,7 +1338,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Mska:rowNum", getErrorInfoFromException(e));
}
}
private void selectS066Check() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1636,7 +1544,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:T023t:rowNum", getErrorInfoFromException(e));
}
}
private void selectVbepCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1730,7 +1637,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Vbpa:rowNum", getErrorInfoFromException(e));
}
}
private void selectVbrpCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1863,15 +1769,6 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Zmdpc:rowNum", getErrorInfoFromException(e));
}
}
private void selectTvkbtCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -2184,15 +2081,6 @@ public class DeleteUpdateJobServiceImpl {
// 有更新时间的表的删除计数器需要拆分出来,避免数据不能删除
// 把这些方法的删除部分从原方法种剥离出来
public void checkJob6() {
// selectKnkkCheckUpdate();
// selectLikpCheckUpdate();
// selectLipsCheckUpdate();
// selectPa0002CheckUpdate();
// selectVbakCheckUpdate();
// selectVbrkCheckUpdate();
// selectZsd06CheckUpdate();
// selectZsdfhzlCheckUpdate();
selectVbapCheckDelete();
}
......
......@@ -88,16 +88,12 @@ public class JobServiceImpl {
@Autowired
private SapMapper sapMapper;
@Autowired
private CrmMapper crmMapper;
@Autowired
private TmsMapper tmsMapper;
@Autowired
private HzcrmMapper hzcrmMapper;
@Autowired
private UltimusDBMapper ultimusDBMapper;
......@@ -588,7 +584,6 @@ public class JobServiceImpl {
}
}
}
public void selectBkpfNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -630,7 +625,6 @@ public class JobServiceImpl {
}
}
}
private void selectKnvpNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -656,7 +650,6 @@ public class JobServiceImpl {
}
}
}
private void selectAfruNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -682,7 +675,6 @@ public class JobServiceImpl {
}
}
}
private void selectTspatNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -708,7 +700,6 @@ public class JobServiceImpl {
}
}
}
private void selectZpoeditNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -734,7 +725,6 @@ public class JobServiceImpl {
}
}
}
private void selectMkpfNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -760,7 +750,6 @@ public class JobServiceImpl {
}
}
}
private void selectMsegNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -786,7 +775,6 @@ public class JobServiceImpl {
}
}
}
private void selectAuspNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......@@ -812,7 +800,6 @@ public class JobServiceImpl {
}
}
}
private void selectAfpoNew() {
DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class);
......
......@@ -95,6 +95,7 @@ public interface SapMapper {
public Afpo cascadeAfpoByAufk(Aufk aufk);
public List<Bkpf> selectBkpfCheckByUpdate(Bkpf bkpf);
public List<Kna1> selectKna1CheckUpdate(Kna1 kna1);
public Knvv cascadeKnvvByKna1(Kna1 kna1);
public List<Likp> selectLikpCheckByUpdate(Likp likp);
public List<Lips> selectLipsCheckByUpdate(Lips lips);
public List<Pa0002> selectPa0002CheckByUpdate(Pa0002 pa0002);
......
......@@ -368,6 +368,12 @@
where "$rowid$" &gt; #{rowids} and updat != '00000000' and updat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="cascadeKnvvByKna1" parameterType="Kna1" resultType="Knvv">
select
mandt, kunnr, vkorg, vtweg, spart, waers, klabc
from ${hana_user}.Knvv
where kunnr = #{kunnr} ${hana_mandt}
</select>
<select id="selectLikpCheckByUpdate" parameterType="Likp" resultType="Likp">
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr
......
......@@ -925,6 +925,24 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectAuspCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectKnvvCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
</list>
</constructor-arg>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论