提交 a5252897 作者: guofeng

增量更新逻辑

上级 e3c0e580
package com.huazheng.project.greenplum.service.impl;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -10,12 +8,16 @@ import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.mapper.GPMapper;
import com.huazheng.project.greenplum.utils.SomeUtils;
import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Bkpf;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Pa0002;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2;
......@@ -26,24 +28,11 @@ public class CheckDeleteServiceImpl {
@Autowired
private RedisTemplate<String, String> redis1Template;
@Autowired
private GPMapper gpMapper;
@Autowired
private SapMapper sapMapper;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
public void selectAufkCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -70,10 +59,9 @@ public class CheckDeleteServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Aufk:rowNum", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkDeleteError:Aufk:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectAfkoCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -100,10 +88,9 @@ public class CheckDeleteServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Afko:rowNum", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkDeleteError:Afko:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectAfpoCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -131,10 +118,9 @@ public class CheckDeleteServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Afpo:rowNum", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkDeleteError:Afpo:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectBkpfCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -162,10 +148,9 @@ public class CheckDeleteServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Bkpf:rowNum", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkDeleteError:Bkpf:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectKna1CheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -193,7 +178,97 @@ public class CheckDeleteServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Kna1:rowNum", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkDeleteError:Kna1:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectLikpCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Likp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Likp:rowNum");
Likp build = Likp.builder().rowNum(rowNum).build();
List<Likp> list = gpMapper.selectLikpCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Likp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Likp source = sapMapper.selectLikpById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteLikp(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Likp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectLikpcheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Likp:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectLipsCheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Lips:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Lips:rowNum");
Lips build = Lips.builder().rowNum(rowNum).build();
List<Lips> list = gpMapper.selectLipsCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Lips:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Lips source = sapMapper.selectLipsById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteLips(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Lips:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectLipscheckDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Lips:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectPa0002CheckByDelete() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkDelete:Pa0002:rowNum", "0");
String rowNum = opsForValue.get("huazheng:checkDelete:Pa0002:rowNum");
Pa0002 build = Pa0002.builder().rowNum(rowNum).build();
List<Pa0002> list = gpMapper.selectPa0002Check(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkDelete:Pa0002:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Pa0002 source = sapMapper.selectPa0002ById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deletePa0002(target); // 删除数仓中的数据
operator = "delete";
}
redis1Template.opsForValue().set("huazheng:checkDelete:Pa0002:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectPa0002checkDelete --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkDeleteError:Pa0002:rowNum", SomeUtils.getErrorInfoFromException(e));
}
}
......
package com.huazheng.project.greenplum.service.impl;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.List;
......@@ -11,12 +9,16 @@ import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.mapper.GPMapper;
import com.huazheng.project.greenplum.utils.SomeUtils;
import com.huazheng.project.hana.mapper.SapMapper;
import com.huazheng.project.hana.model.Afko;
import com.huazheng.project.hana.model.Afpo;
import com.huazheng.project.hana.model.Aufk;
import com.huazheng.project.hana.model.Bkpf;
import com.huazheng.project.hana.model.Kna1;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Pa0002;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
......@@ -30,24 +32,11 @@ public class CheckUpdateServiceImpl {
@Autowired
private RedisTemplate<String, String> redis1Template;
@Autowired
private GPMapper gpMapper;
@Autowired
private SapMapper sapMapper;
public String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
public void selectAufkCheckByUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -92,6 +81,7 @@ public class CheckUpdateServiceImpl {
while (true) {
try {
gpMapper.updateAufk(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
......@@ -118,10 +108,9 @@ public class CheckUpdateServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Aufk:rowids", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkUpdateError:Aufk:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
// 所属selectAufkCheckByUpdate的级联
private void cascadeAfkoCheckByUpdate(Afko source, Afko target) {
if (target != null) { // 目标库有数据
......@@ -153,7 +142,6 @@ public class CheckUpdateServiceImpl {
}
}
}
// 所属selectAufkCheckByUpdate的级联
private void cascadeAfpoCheckByUpdate(Afpo source, Afpo target) {
if (target != null) {
......@@ -175,7 +163,6 @@ public class CheckUpdateServiceImpl {
}
}
}
public void selectBkpfCheckByUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -220,6 +207,7 @@ public class CheckUpdateServiceImpl {
while (true) {
try {
gpMapper.updateBkpf(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
......@@ -234,10 +222,9 @@ public class CheckUpdateServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Bkpf:rowids", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkUpdateError:Bkpf:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectKna1CheckByUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -274,6 +261,7 @@ public class CheckUpdateServiceImpl {
while (true) {
try {
gpMapper.updateKna1(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
......@@ -289,9 +277,173 @@ public class CheckUpdateServiceImpl {
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Kna1:rowids", getErrorInfoFromException(e));
redis1Template.opsForValue().set("huazheng:checkUpdateError:Kna1:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectLikpCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Likp:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Likp:rowids"));
Likp build = Likp.builder().rowids(rowids).build();
List<Likp> slist = sapMapper.selectLikpCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Likp:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Likp target = gpMapper.selectLikp(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
String wadat_ist = source.getWadat_ist();
String lfuhr = source.getLfuhr();
if ("00000000".equals(wadat_ist)) {
source.setWadat_ist2lfuhr(null);
} else {
Date date = DateUtil.parse(wadat_ist+lfuhr, "yyyyMMddHHmmss");
source.setWadat_ist2lfuhr(date);
}
source.setErdat1(SomeUtils.caDate(source.getErdat())); // 日期00000000格式转换,已处理异常
source.setWadat_ist1(SomeUtils.caDate(source.getWadat_ist())); // 日期00000000格式转换,已处理异常
// ===============================
while (true) {
try {
gpMapper.updateLikp(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Likp:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectLikpcheckUpdate --> rowNum:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Likp:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectLipsCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Lips:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Lips:rowids"));
Lips build = Lips.builder().rowids(rowids).build();
List<Lips> slist = sapMapper.selectLipsCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Lips:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Lips target = gpMapper.selectLips(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getErdat() != null && source.getErzet() != null) {
String erdat2 = new StringBuffer(source.getErdat()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getErzet()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setErdat1(date);
source.setErdat2(date);
}
// ===============================
while (true) {
try {
gpMapper.updateLips(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Lips:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectLipscheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Lips:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
public void selectPa0002CheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Pa0002:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Pa0002:rowids"));
Pa0002 build = Pa0002.builder().rowids(rowids).build();
List<Pa0002> slist = sapMapper.selectPa0002CheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Pa0002:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Pa0002 target = gpMapper.selectPa0002(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getEndda() != null) {
String erdat2 = new StringBuffer(source.getEndda()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setEndda1(date);
}
source.setYeWuYMC(source.getNachn() + source.getVorna());
// ===============================
while (true) {
try {
gpMapper.updatePa0002(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Pa0002:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectPa0002checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Pa0002:rowids", SomeUtils.getErrorInfoFromException(e));
}
}
}
......@@ -28,13 +28,11 @@ import com.huazheng.project.hana.model.Knvp;
import com.huazheng.project.hana.model.Knvv;
import com.huazheng.project.hana.model.Konv;
import com.huazheng.project.hana.model.Likp;
import com.huazheng.project.hana.model.Lips;
import com.huazheng.project.hana.model.Makt;
import com.huazheng.project.hana.model.Mara;
import com.huazheng.project.hana.model.Mkpf;
import com.huazheng.project.hana.model.Mseg;
import com.huazheng.project.hana.model.Mska;
import com.huazheng.project.hana.model.Pa0002;
import com.huazheng.project.hana.model.S066;
import com.huazheng.project.hana.model.S067;
import com.huazheng.project.hana.model.T001w;
......@@ -936,11 +934,9 @@ public class DeleteUpdateJobServiceImpl {
selectKnkkCheck(); // 19
selectKnvvCheck(); // 20
selectLipsCheck(); // 23
selectMaktCheck(); // 24
selectMaraCheck(); // 25
selectMskaCheck(); // 26
selectPa0002Check(); // 27
selectS066Check(); // 28
selectS067Check(); // 29
selectT001wCheck(); // 30
......@@ -1438,232 +1434,11 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Konv:rowNum", getErrorInfoFromException(e));
}
}
// 计数器复位删除2
private void selectLikpCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Likp:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Likp:rowNum");
Likp build = Likp.builder().rowNum(rowNum).build();
List<Likp> list = gpMapper.selectLikpCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:check:Likp:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Likp source = sapMapper.selectLikpById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteLikp(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
String wadat_ist = source.getWadat_ist();
String lfuhr = source.getLfuhr();
if ("00000000".equals(wadat_ist)) {
source.setWadat_ist2lfuhr(null);
} else {
Date date = DateUtil.parse(wadat_ist+lfuhr, "yyyyMMddHHmmss");
source.setWadat_ist2lfuhr(date);
}
source.setErdat1(caDate(source.getErdat())); // 日期00000000格式转换,已处理异常
source.setWadat_ist1(caDate(source.getWadat_ist())); // 日期00000000格式转换,已处理异常
// ===============================
while (true) {
try {
gpMapper.updateLikp(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Likp:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectLikpCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Likp:rowNum", getErrorInfoFromException(e));
}
}
public void selectLikpCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Likp:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Likp:rowids"));
Likp build = Likp.builder().rowids(rowids).build();
List<Likp> slist = sapMapper.selectLikpCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Likp:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Likp target = gpMapper.selectLikp(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
String wadat_ist = source.getWadat_ist();
String lfuhr = source.getLfuhr();
if ("00000000".equals(wadat_ist)) {
source.setWadat_ist2lfuhr(null);
} else {
Date date = DateUtil.parse(wadat_ist+lfuhr, "yyyyMMddHHmmss");
source.setWadat_ist2lfuhr(date);
}
source.setErdat1(caDate(source.getErdat())); // 日期00000000格式转换,已处理异常
source.setWadat_ist1(caDate(source.getWadat_ist())); // 日期00000000格式转换,已处理异常
// ===============================
while (true) {
try {
gpMapper.updateLikp(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Likp:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectLikpCheckUpdate --> rowNum:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Likp:rowids", getErrorInfoFromException(e));
}
}
// 计数器复位删除3
private void selectLipsCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Lips:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Lips:rowNum");
Lips build = Lips.builder().rowNum(rowNum).build();
List<Lips> list = gpMapper.selectLipsCheck(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:check:Lips:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Lips source = sapMapper.selectLipsById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deleteLips(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getErdat() != null && source.getErzet() != null) {
String erdat2 = new StringBuffer(source.getErdat()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getErzet()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setErdat1(date);
source.setErdat2(date);
}
// ===============================
while (true) {
try {
gpMapper.updateLips(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Lips:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectLipsCheck --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Lips:rowNum", getErrorInfoFromException(e));
}
}
public void selectLipsCheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Lips:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Lips:rowids"));
Lips build = Lips.builder().rowids(rowids).build();
List<Lips> slist = sapMapper.selectLipsCheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Lips:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Lips target = gpMapper.selectLips(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getErdat() != null && source.getErzet() != null) {
String erdat2 = new StringBuffer(source.getErdat()).insert(4, "-").insert(7, "-").toString();
String erzet2 = new StringBuffer(source.getErzet()).insert(2, ":").insert(5, ":").toString();
String dateStr = erdat2 + " " + erzet2;
Date date = DateUtil.parse(dateStr);
source.setErdat1(date);
source.setErdat2(date);
}
// ===============================
while (true) {
try {
gpMapper.updateLips(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Lips:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectLipscheckUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Lips:rowids", getErrorInfoFromException(e));
}
}
private void selectMaktCheck() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -1817,113 +1592,29 @@ public class DeleteUpdateJobServiceImpl {
redis1Template.opsForValue().set("huazheng:checkError:Mska:rowNum", getErrorInfoFromException(e));
}
}
// 计数器复位删除
private void selectPa0002Check() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:check:Pa0002:rowNum", "0");
String rowNum = opsForValue.get("huazheng:check:Pa0002:rowNum");
Pa0002 build = Pa0002.builder().rowNum(rowNum).build();
List<Pa0002> list = gpMapper.selectPa0002Check(build); // 从数仓中查询一组数据
if (list.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:check:Pa0002:rowNum", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
list.forEach(target -> { // 遍历要检查的数据
Pa0002 source = sapMapper.selectPa0002ById(target); // 根据主键查询源库中的数据
String operator = "none";
if (source == null) { // 如果源库中没有数据
gpMapper.deletePa0002(target); // 删除数仓中的数据
operator = "delete";
} else { // 源库中有数据
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getEndda() != null) {
String erdat2 = new StringBuffer(source.getEndda()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setEndda1(date);
}
source.setYeWuYMC(source.getNachn() + source.getVorna());
// ===============================
while (true) {
try {
gpMapper.updatePa0002(source); // 更新数据到数仓中
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:check:Pa0002:rowNum", target.getRowNum());
if (!operator.equals("none")) {
log.info(String.format("selectPa0002Check --> rowNum:%s, operator:%s", target.getRowNum(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkError:Pa0002:rowNum", getErrorInfoFromException(e));
}
}
public void selectPa0002CheckUpdate() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
opsForValue.setIfAbsent("huazheng:checkUpdate:Pa0002:rowids", "0");
Long rowids = Long.parseLong(opsForValue.get("huazheng:checkUpdate:Pa0002:rowids"));
Pa0002 build = Pa0002.builder().rowids(rowids).build();
List<Pa0002> slist = sapMapper.selectPa0002CheckByUpdate(build); // 从数仓中查询一组数据
if (slist.size() == 0) { // rowid已完成,则转为由更新时间字段开始同步
redis1Template.opsForValue().set("huazheng:checkUpdate:Pa0002:rowids", "0"); // 计数器复位
ThreadUtil.sleep(1000); // 没有数据了,休眠一下
}
slist.forEach(source -> { // 遍历要检查的数据
Pa0002 target = gpMapper.selectPa0002(source); // 根据主键查询源库中的数据
String operator = "none";
Long srowids = source.getRowids();
if (target != null) {
source.setRowids(null);
String shash = SecureUtil.md5(JSONUtil.toJsonStr(source)); // 源库中数据的hash结果
String thash = target.getHashResult(); // 数仓中数据的hash结果
if (!shash.equals(thash)) { // 如果hash结果不一致
source.setHashResult(shash);
// ===============================
if (source.getEndda() != null) {
String erdat2 = new StringBuffer(source.getEndda()).insert(4, "-").insert(7, "-").toString();
Date date = DateUtil.parse(erdat2);
source.setEndda1(date);
}
source.setYeWuYMC(source.getNachn() + source.getVorna());
// ===============================
while (true) {
try {
gpMapper.updatePa0002(source); // 更新数据到数仓中
operator = "update";
break;
} catch (RuntimeException e) {
log.error(e.getMessage());ThreadUtil.safeSleep(500);
}
}
ThreadUtil.safeSleep(500);
}
}
redis1Template.opsForValue().set("huazheng:checkUpdate:Pa0002:rowids", srowids.toString());
if (!operator.equals("none")) {
log.info(String.format("selectPa0002checkUpdate --> rowids:%s, operator:%s", srowids.toString(), operator));
}
});
} catch (Exception e) {
redis1Template.opsForValue().set("huazheng:checkUpdateError:Pa0002:rowids", getErrorInfoFromException(e));
}
}
private void selectS066Check() {
try {
ValueOperations<String, String> opsForValue = redis1Template.opsForValue();
......@@ -3394,7 +3085,6 @@ public class DeleteUpdateJobServiceImpl {
// 执行紧急任务
public void checkBusy() {
selectLikpCheck(); // 22
selectVbakCheck(); // 32
}
......
package com.huazheng.project.greenplum.utils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
public class SomeUtils {
public static String getErrorInfoFromException(Exception e) {
try {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) {
return "bad getErrorInfoFromException";
}
}
public static Date caDate(String datestr) {
try {
if (datestr == null || datestr.equals("00000000") || StrUtil.isBlank(datestr)) {
return null;
} else {
Date date = DateUtil.parse(datestr, "yyyyMMdd");
return date;
}
} catch (Exception e) {
return null;
}
}
}
......@@ -329,28 +329,9 @@
where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectLikpCheckByUpdate" parameterType="Likp" resultType="Likp">
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr
from ${hana_user}.likp
where "$rowid$" &gt;#{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectLipsCheckByUpdate" parameterType="Lips" resultType="Lips">
select top 20 "$rowid$" as rowids,
vbeln, posnr, vgbel, vgpos, mandt, matnr, matkl, arktx, werks, lgort,
charg, lfimg, uebto, untto, erdat, erzet, aedat
from ${hana_user}.lips
where "$rowid$" &gt; #{rowids} and aedat != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectPa0002CheckByUpdate" parameterType="Pa0002" resultType="Pa0002">
select top 20 "$rowid$" as rowids,
mandt,pernr,subty,objps,sprps,endda,begda,seqnr,nachn,vorna,aedtm
from ${hana_user}.Pa0002
where "$rowid$" &gt; #{rowids} and aedtm != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectVbakCheckByUpdate" parameterType="Vbak" resultType="Vbak">
select top 20 a."$rowid$" as rowids,
a.vbeln, a.mandt, a.vbtyp, a.audat, a.waerk, a.kalsm, a.ctlpc, a.kunnr, a.bukrs_vf, a.bstnk,
......@@ -430,9 +411,28 @@
where "$rowid$" &gt; #{rowids} and updat != '00000000' and updat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectLikpCheckByUpdate" parameterType="Likp" resultType="Likp">
select top 20 "$rowid$" as rowids,
vbeln, mandt, erdat, wadat_ist, ctlpc, kunnr, aedat,lfuhr
from ${hana_user}.likp
where "$rowid$" &gt;#{rowids} and aedat != '00000000' and aedat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectLipsCheckByUpdate" parameterType="Lips" resultType="Lips">
select top 20 "$rowid$" as rowids,
vbeln, posnr, vgbel, vgpos, mandt, matnr, matkl, arktx, werks, lgort,
charg, lfimg, uebto, untto, erdat, erzet, aedat
from ${hana_user}.lips
where "$rowid$" &gt; #{rowids} and aedat != '00000000' and aedat = CURRENT_DATE ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectPa0002CheckByUpdate" parameterType="Pa0002" resultType="Pa0002">
select top 20 "$rowid$" as rowids,
mandt,pernr,subty,objps,sprps,endda,begda,seqnr,nachn,vorna,aedtm
from ${hana_user}.Pa0002
where "$rowid$" &gt; #{rowids} and aedtm != '00000000' ${hana_mandt}
order by "$rowid$"
</select>
<select id="selectAfkoById" parameterType="Afko" resultType="Afko">
......
......@@ -705,24 +705,8 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectLikpCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="deleteUpdateJobServiceImpl" />
<property name="targetMethod" value="selectLipsCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
......@@ -809,6 +793,33 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectLikpCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectLipsCheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkUpdateServiceImpl" />
<property name="targetMethod" value="selectPa0002CheckUpdate" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<!-- 删除流程 -->
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
......@@ -856,6 +867,33 @@
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectLikpCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectLipsCheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
<bean class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail">
<bean parent="methodJobDetail">
<property name="targetObject" ref="checkDeleteServiceImpl" />
<property name="targetMethod" value="selectPa0002CheckByDelete" />
</bean>
</property>
<property name="cronExpression" value="* * * * * ?" />
</bean>
</list>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论