提交 49c3c231 作者: guofeng

12

上级 afcbbc9e
target/ target/
!.mvn/wrapper/maven-wrapper.jar !.mvn/wrapper/maven-wrapper.jar
### STS ### ### STS ###
.apt_generated .apt_generated
.classpath .classpath
.factorypath .factorypath
.project .project
.settings .settings
.springBeans .springBeans
### IntelliJ IDEA ### ### IntelliJ IDEA ###
.idea .idea
*.iws *.iws
*.iml *.iml
*.ipr *.ipr
### NetBeans ### ### NetBeans ###
nbproject/private/ nbproject/private/
build/ build/
nbbuild/ nbbuild/
dist/ dist/
nbdist/ nbdist/
.nb-gradle/ .nb-gradle/
# ----------------------------------------------+ # ----------------------------------------------+
# OFF 0 | WARN 3 | TRACE 6 | # OFF 0 | WARN 3 | TRACE 6 |
# FATAL 1 | INFO 4 | ALL 7 | # FATAL 1 | INFO 4 | ALL 7 |
# ERROR 2 | DEBUG 5 | | # ERROR 2 | DEBUG 5 | |
# ----------------------------------------------+ # ----------------------------------------------+
log4j.rootLogger=INFO, console log4j.rootLogger=INFO, console
log4j.logger.com.mengdee=INFO log4j.logger.com.mengdee=INFO
# ============================== console ===================================== # ============================== console =====================================
log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m %n log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m %n
#log4j.appender.file=org.apache.log4j.DailyRollingFileAppender #log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.file.layout=org.apache.log4j.PatternLayout #log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m %n #log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %5p %c{1}:%L - %m %n
#log4j.appender.file.file=/usr/local/spark-app/logs/spark-area-security-manage-data.log #log4j.appender.file.file=/usr/local/spark-app/logs/spark-area-security-manage-data.log
#log4j.appender.file.Threshold=WARN #log4j.appender.file.Threshold=WARN
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<Configuration package="log4j.test" <Configuration package="log4j.test"
status="WARN"> status="WARN">
<Appenders> <Appenders>
<Console name="Console" target="SYSTEM_OUT"> <Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console> </Console>
</Appenders> </Appenders>
<Loggers> <Loggers>
<Logger name="log4j.test.Log4jTest" level="trace"> <Logger name="log4j.test.Log4jTest" level="trace">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Logger> </Logger>
<Root level="info"> <Root level="info">
<AppenderRef ref="Console"/> <AppenderRef ref="Console"/>
</Root> </Root>
</Loggers> </Loggers>
</Configuration> </Configuration>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="com.huazheng.project.stream.service" /> <context:component-scan base-package="com.huazheng.project.stream.service" />
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations"> <property name="locations">
<list> <list>
<value>classpath:application.properties</value> <value>classpath:application.properties</value>
</list> </list>
</property> </property>
</bean> </bean>
<bean id="proper" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <bean id="proper" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations"> <property name="locations">
<array> <array>
<value>classpath:application.properties</value> <value>classpath:application.properties</value>
</array> </array>
</property> </property>
</bean> </bean>
<!-- 定义连接池 --> <!-- 定义连接池 -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxTotal" value="${redis.maxTotal}" /> <property name="maxTotal" value="${redis.maxTotal}" />
<property name="maxIdle" value="${redis.maxIdle}" /> <property name="maxIdle" value="${redis.maxIdle}" />
<property name="MaxWaitMillis" value="${redis.maxWaitMillis}" /> <property name="MaxWaitMillis" value="${redis.maxWaitMillis}" />
<property name="testOnBorrow" value="true" /> <property name="testOnBorrow" value="true" />
</bean> </bean>
<!-- 定义连接工厂 --> <!-- 定义连接工厂 -->
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="${redis.hostName}" /> <property name="hostName" value="${redis.hostName}" />
<property name="port" value="${redis.port}" /> <property name="port" value="${redis.port}" />
<property name="database" value="${redis.database}" /> <property name="database" value="${redis.database}" />
<property name="usePool" value="true" /> <property name="usePool" value="true" />
<property name="poolConfig" ref="jedisPoolConfig" /> <property name="poolConfig" ref="jedisPoolConfig" />
</bean> </bean>
<!-- 定义模板 --> <!-- 定义模板 -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="connectionFactory" ref="jedisConnectionFactory" />
<property name="keySerializer"> <property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property> </property>
<property name="valueSerializer"> <property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property> </property>
<property name="hashKeySerializer"> <property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property> </property>
<property name="hashValueSerializer"> <property name="hashValueSerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" /> <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property> </property>
</bean> </bean>
</beans> </beans>
package com.huazheng.project.greenplum.flatmap; package com.huazheng.project.greenplum.flatmap;
import java.io.Serializable; import java.io.Serializable;
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector; import org.apache.flink.util.Collector;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.service.impl.GPServiceImpl; import com.huazheng.project.greenplum.service.impl.GPServiceImpl;
import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.ReflectUtil;
@Service @Service
public class GreenPlumFlatMapFunction<T, O> implements FlatMapFunction<T, O>, ApplicationContextAware, Serializable { public class GreenPlumFlatMapFunction<T, O> implements FlatMapFunction<T, O>, ApplicationContextAware, Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static GPServiceImpl gpserviceImpl; private static GPServiceImpl gpserviceImpl;
@Override @Override
public void flatMap(T value, Collector<O> out) throws Exception { public void flatMap(T value, Collector<O> out) throws Exception {
String simpleName = value.getClass().getSimpleName(); String simpleName = value.getClass().getSimpleName();
ReflectUtil.invoke(gpserviceImpl, "process" + simpleName, value, out); ReflectUtil.invoke(gpserviceImpl, "process" + simpleName, value, out);
} }
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
gpserviceImpl = (GPServiceImpl) applicationContext.getBean("GPServiceImpl"); gpserviceImpl = (GPServiceImpl) applicationContext.getBean("GPServiceImpl");
} }
} }
package com.huazheng.project.greenplum.richsink; package com.huazheng.project.greenplum.richsink;
import java.io.Serializable; import java.io.Serializable;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.greenplum.service.impl.GPServiceImpl; import com.huazheng.project.greenplum.service.impl.GPServiceImpl;
import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.ReflectUtil;
@Service @Service
public class GreenPlumRichSinkFunction<T> extends RichSinkFunction<T> implements ApplicationContextAware, Serializable { public class GreenPlumRichSinkFunction<T> extends RichSinkFunction<T> implements ApplicationContextAware, Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private static GPServiceImpl gpserviceImpl; private static GPServiceImpl gpserviceImpl;
@Override @Override
public void invoke(T value) throws Exception { public void invoke(T value) throws Exception {
String simpleName = value.getClass().getSimpleName(); String simpleName = value.getClass().getSimpleName();
ReflectUtil.invoke(gpserviceImpl, "sink" + simpleName, value); ReflectUtil.invoke(gpserviceImpl, "sink" + simpleName, value);
} }
@Override @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
gpserviceImpl = (GPServiceImpl) applicationContext.getBean("GPServiceImpl"); gpserviceImpl = (GPServiceImpl) applicationContext.getBean("GPServiceImpl");
} }
} }
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class AfkoSource implements SourceFunction<String> { public class AfkoSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Afko:sendcount", "huazheng:Afko:id", "huazheng:list:Afko", "huazheng:Afko:receivecount"); List<String> keys = Arrays.asList("huazheng:Afko:sendcount", "huazheng:Afko:id", "huazheng:list:Afko", "huazheng:Afko:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Afko:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Afko:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class AfpoSource implements SourceFunction<String> { public class AfpoSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Afpo:sendcount", "huazheng:Afpo:id", "huazheng:list:Afpo", "huazheng:Afpo:receivecount"); List<String> keys = Arrays.asList("huazheng:Afpo:sendcount", "huazheng:Afpo:id", "huazheng:list:Afpo", "huazheng:Afpo:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Afpo:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Afpo:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class AufkSource implements SourceFunction<String> { public class AufkSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Aufk:sendcount", "huazheng:Aufk:id", "huazheng:list:Aufk", "huazheng:Aufk:receivecount"); List<String> keys = Arrays.asList("huazheng:Aufk:sendcount", "huazheng:Aufk:id", "huazheng:list:Aufk", "huazheng:Aufk:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Aufk:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Aufk:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class AufmSource implements SourceFunction<String> { public class AufmSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Aufm:sendcount", "huazheng:Aufm:id", "huazheng:list:Aufm", "huazheng:Aufm:receivecount"); List<String> keys = Arrays.asList("huazheng:Aufm:sendcount", "huazheng:Aufm:id", "huazheng:list:Aufm", "huazheng:Aufm:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Aufm:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Aufm:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class BsadSource implements SourceFunction<String> { public class BsadSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:bsad:sendcount", "huazheng:bsad:id", "huazheng:list:bsad", "huazheng:bsad:receivecount"); List<String> keys = Arrays.asList("huazheng:bsad:sendcount", "huazheng:bsad:id", "huazheng:list:bsad", "huazheng:bsad:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:bsad:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:bsad:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class BsidSource implements SourceFunction<String> { public class BsidSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:bsid:sendcount", "huazheng:bsid:id", "huazheng:list:bsid", "huazheng:bsid:receivecount"); List<String> keys = Arrays.asList("huazheng:bsid:sendcount", "huazheng:bsid:id", "huazheng:list:bsid", "huazheng:bsid:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:bsid:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:bsid:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class Kna1Source implements SourceFunction<String> { public class Kna1Source implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Kna1:sendcount", "huazheng:Kna1:id", "huazheng:list:Kna1", "huazheng:Kna1:receivecount"); List<String> keys = Arrays.asList("huazheng:Kna1:sendcount", "huazheng:Kna1:id", "huazheng:list:Kna1", "huazheng:Kna1:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Kna1:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Kna1:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class KnkkSource implements SourceFunction<String> { public class KnkkSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Knkk:sendcount", "huazheng:Knkk:id", "huazheng:list:Knkk", "huazheng:Knkk:receivecount"); List<String> keys = Arrays.asList("huazheng:Knkk:sendcount", "huazheng:Knkk:id", "huazheng:list:Knkk", "huazheng:Knkk:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Knkk:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Knkk:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class KnvvSource implements SourceFunction<String> { public class KnvvSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Knvv:sendcount", "huazheng:Knvv:id", "huazheng:list:Knvv", "huazheng:Knvv:receivecount"); List<String> keys = Arrays.asList("huazheng:Knvv:sendcount", "huazheng:Knvv:id", "huazheng:list:Knvv", "huazheng:Knvv:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Knvv:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Knvv:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class KonvSource implements SourceFunction<String> { public class KonvSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Konv:sendcount", "huazheng:Konv:id", "huazheng:list:Konv", "huazheng:Konv:receivecount"); List<String> keys = Arrays.asList("huazheng:Konv:sendcount", "huazheng:Konv:id", "huazheng:list:Konv", "huazheng:Konv:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Konv:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Konv:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class LikpSource implements SourceFunction<String> { public class LikpSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Likp:sendcount", "huazheng:Likp:id", "huazheng:list:Likp", "huazheng:Likp:receivecount"); List<String> keys = Arrays.asList("huazheng:Likp:sendcount", "huazheng:Likp:id", "huazheng:list:Likp", "huazheng:Likp:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Likp:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Likp:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class LipsSource implements SourceFunction<String> { public class LipsSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Lips:sendcount", "huazheng:Lips:id", "huazheng:list:Lips", "huazheng:Lips:receivecount"); List<String> keys = Arrays.asList("huazheng:Lips:sendcount", "huazheng:Lips:id", "huazheng:list:Lips", "huazheng:Lips:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Lips:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Lips:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class MaktSource implements SourceFunction<String> { public class MaktSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Makt:sendcount", "huazheng:Makt:id", "huazheng:list:Makt", "huazheng:Makt:receivecount"); List<String> keys = Arrays.asList("huazheng:Makt:sendcount", "huazheng:Makt:id", "huazheng:list:Makt", "huazheng:Makt:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Makt:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Makt:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class MaraSource implements SourceFunction<String> { public class MaraSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Mara:sendcount", "huazheng:Mara:id", "huazheng:list:Mara", "huazheng:Mara:receivecount"); List<String> keys = Arrays.asList("huazheng:Mara:sendcount", "huazheng:Mara:id", "huazheng:list:Mara", "huazheng:Mara:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Mara:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Mara:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class MskaSource implements SourceFunction<String> { public class MskaSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Mska:sendcount", "huazheng:Mska:id", "huazheng:list:Mska", "huazheng:Mska:receivecount"); List<String> keys = Arrays.asList("huazheng:Mska:sendcount", "huazheng:Mska:id", "huazheng:list:Mska", "huazheng:Mska:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Mska:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Mska:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class Pa0002Source implements SourceFunction<String> { public class Pa0002Source implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Pa0002:sendcount", "huazheng:Pa0002:id", "huazheng:list:Pa0002", "huazheng:Pa0002:receivecount"); List<String> keys = Arrays.asList("huazheng:Pa0002:sendcount", "huazheng:Pa0002:id", "huazheng:list:Pa0002", "huazheng:Pa0002:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Pa0002:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Pa0002:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class S066Source implements SourceFunction<String> { public class S066Source implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:S066:sendcount", "huazheng:S066:id", "huazheng:list:S066", "huazheng:S066:receivecount"); List<String> keys = Arrays.asList("huazheng:S066:sendcount", "huazheng:S066:id", "huazheng:list:S066", "huazheng:S066:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:S066:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:S066:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class S067Source implements SourceFunction<String> { public class S067Source implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:S067:sendcount", "huazheng:S067:id", "huazheng:list:S067", "huazheng:S067:receivecount"); List<String> keys = Arrays.asList("huazheng:S067:sendcount", "huazheng:S067:id", "huazheng:list:S067", "huazheng:S067:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:S067:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:S067:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class T001wSource implements SourceFunction<String> { public class T001wSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:T001w:sendcount", "huazheng:T001w:id", "huazheng:list:T001w", "huazheng:T001w:receivecount"); List<String> keys = Arrays.asList("huazheng:T001w:sendcount", "huazheng:T001w:id", "huazheng:list:T001w", "huazheng:T001w:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:T001w:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:T001w:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class T023tSource implements SourceFunction<String> { public class T023tSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:T023t:sendcount", "huazheng:T023t:id", "huazheng:list:T023t", "huazheng:T023t:receivecount"); List<String> keys = Arrays.asList("huazheng:T023t:sendcount", "huazheng:T023t:id", "huazheng:list:T023t", "huazheng:T023t:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:T023t:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:T023t:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class TvkbtSource implements SourceFunction<String> { public class TvkbtSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Tvkbt:sendcount", "huazheng:Tvkbt:id", "huazheng:list:Tvkbt", "huazheng:Tvkbt:receivecount"); List<String> keys = Arrays.asList("huazheng:Tvkbt:sendcount", "huazheng:Tvkbt:id", "huazheng:list:Tvkbt", "huazheng:Tvkbt:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Tvkbt:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Tvkbt:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Vbak; import com.huazheng.project.hana.model.Vbak;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbakSource implements SourceFunction<Vbak> { public class VbakSource implements SourceFunction<Vbak> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<Vbak> ctx) throws Exception { public void run(SourceContext<Vbak> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Vbak:sendcount", "huazheng:Vbak:id", "huazheng:list:Vbak", "huazheng:Vbak:receivecount"); List<String> keys = Arrays.asList("huazheng:Vbak:sendcount", "huazheng:Vbak:id", "huazheng:list:Vbak", "huazheng:Vbak:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
Vbak data = JSONUtil.toBean(values[1], Vbak.class); Vbak data = JSONUtil.toBean(values[1], Vbak.class);
ctx.collect(data); ctx.collect(data);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Vbak:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Vbak:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbapSource implements SourceFunction<String> { public class VbapSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:vbap:sendcount", "huazheng:vbap:flagDate", "huazheng:list:vbap", "huazheng:vbap:receivecount"); List<String> keys = Arrays.asList("huazheng:vbap:sendcount", "huazheng:vbap:flagDate", "huazheng:list:vbap", "huazheng:vbap:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:vbap:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:vbap:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbepSource implements SourceFunction<String> { public class VbepSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Vbep:sendcount", "huazheng:Vbep:id", "huazheng:list:Vbep", "huazheng:Vbep:receivecount"); List<String> keys = Arrays.asList("huazheng:Vbep:sendcount", "huazheng:Vbep:id", "huazheng:list:Vbep", "huazheng:Vbep:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Vbep:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Vbep:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbpaSource implements SourceFunction<String> { public class VbpaSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Vbpa:sendcount", "huazheng:Vbpa:id", "huazheng:list:Vbpa", "huazheng:Vbpa:receivecount"); List<String> keys = Arrays.asList("huazheng:Vbpa:sendcount", "huazheng:Vbpa:id", "huazheng:list:Vbpa", "huazheng:Vbpa:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Vbpa:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Vbpa:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbrkSource implements SourceFunction<String> { public class VbrkSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Vbrk:sendcount", "huazheng:Vbrk:id", "huazheng:list:Vbrk", "huazheng:Vbrk:receivecount"); List<String> keys = Arrays.asList("huazheng:Vbrk:sendcount", "huazheng:Vbrk:id", "huazheng:list:Vbrk", "huazheng:Vbrk:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Vbrk:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Vbrk:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbrpSource implements SourceFunction<String> { public class VbrpSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Vbrp:sendcount", "huazheng:Vbrp:id", "huazheng:list:Vbrp", "huazheng:Vbrp:receivecount"); List<String> keys = Arrays.asList("huazheng:Vbrp:sendcount", "huazheng:Vbrp:id", "huazheng:list:Vbrp", "huazheng:Vbrp:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Vbrp:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Vbrp:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class VbukSource implements SourceFunction<String> { public class VbukSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Vbuk:sendcount", "huazheng:Vbuk:id", "huazheng:list:Vbuk", "huazheng:Vbuk:receivecount"); List<String> keys = Arrays.asList("huazheng:Vbuk:sendcount", "huazheng:Vbuk:id", "huazheng:list:Vbuk", "huazheng:Vbuk:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Vbuk:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Vbuk:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class ZmdpcSource implements SourceFunction<String> { public class ZmdpcSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Zmdpc:sendcount", "huazheng:Zmdpc:id", "huazheng:list:Zmdpc", "huazheng:Zmdpc:receivecount"); List<String> keys = Arrays.asList("huazheng:Zmdpc:sendcount", "huazheng:Zmdpc:id", "huazheng:list:Zmdpc", "huazheng:Zmdpc:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Zmdpc:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Zmdpc:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class Zsd06Source implements SourceFunction<String> { public class Zsd06Source implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Zsd06:sendcount", "huazheng:Zsd06:id", "huazheng:list:Zsd06", "huazheng:Zsd06:receivecount"); List<String> keys = Arrays.asList("huazheng:Zsd06:sendcount", "huazheng:Zsd06:id", "huazheng:list:Zsd06", "huazheng:Zsd06:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Zsd06:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Zsd06:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.hana; package com.huazheng.project.greenplum.source.hana;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import com.huazheng.project.hana.model.Zsdfhzl; import com.huazheng.project.hana.model.Zsdfhzl;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class ZsdfhzlSource implements SourceFunction<Zsdfhzl> { public class ZsdfhzlSource implements SourceFunction<Zsdfhzl> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<Zsdfhzl> ctx) throws Exception { public void run(SourceContext<Zsdfhzl> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:Zsdfhzl:sendcount", "huazheng:Zsdfhzl:id", "huazheng:list:Zsdfhzl", "huazheng:Zsdfhzl:receivecount"); List<String> keys = Arrays.asList("huazheng:Zsdfhzl:sendcount", "huazheng:Zsdfhzl:id", "huazheng:list:Zsdfhzl", "huazheng:Zsdfhzl:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
Zsdfhzl data = JSONUtil.toBean(values[1], Zsdfhzl.class); Zsdfhzl data = JSONUtil.toBean(values[1], Zsdfhzl.class);
ctx.collect(data); ctx.collect(data);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:Zsdfhzl:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:Zsdfhzl:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
package com.huazheng.project.greenplum.source.mssql; package com.huazheng.project.greenplum.source.mssql;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.huazheng.project.HZDataStream; import com.huazheng.project.HZDataStream;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@Log4j2 @Log4j2
@Service @Service
public class SalesContractProcessSource implements SourceFunction<String> { public class SalesContractProcessSource implements SourceFunction<String> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public String getErrorInfoFromException(Exception e) { public String getErrorInfoFromException(Exception e) {
try { try {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw); e.printStackTrace(pw);
return "\r\n" + sw.toString() + "\r\n"; return "\r\n" + sw.toString() + "\r\n";
} catch (Exception e2) { } catch (Exception e2) {
return "bad getErrorInfoFromException"; return "bad getErrorInfoFromException";
} }
} }
@Override @Override
public void run(SourceContext<String> ctx) throws Exception { public void run(SourceContext<String> ctx) throws Exception {
DefaultRedisScript<String> script = new DefaultRedisScript<String>(); DefaultRedisScript<String> script = new DefaultRedisScript<String>();
script.setResultType(String.class); script.setResultType(String.class);
script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua"))); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("luascript/vbap.lua")));
List<String> keys = Arrays.asList("huazheng:SalesContractProcess:sendcount", "huazheng:SalesContractProcess:id", "huazheng:list:SalesContractProcess", "huazheng:SalesContractProcess:receivecount"); List<String> keys = Arrays.asList("huazheng:SalesContractProcess:sendcount", "huazheng:SalesContractProcess:id", "huazheng:list:SalesContractProcess", "huazheng:SalesContractProcess:receivecount");
while (true) { while (true) {
try { try {
String value = HZDataStream.redis1Template.execute(script, keys, ""); String value = HZDataStream.redis1Template.execute(script, keys, "");
String[] values = value.toString().split("=========="); String[] values = value.toString().split("==========");
String checkString = values[0]; String checkString = values[0];
String[] split = checkString.split(", "); String[] split = checkString.split(", ");
boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]); boolean check = split[0].split(":")[1].equals(split[1].split(":")[1]);
if (values.length > 1) { // 有数据字符串 if (values.length > 1) { // 有数据字符串
// log.info(msg + " " + value.toString() + " " + check); // log.info(msg + " " + value.toString() + " " + check);
log.info(checkString + " " + check); log.info(checkString + " " + check);
ctx.collect(values[1]); ctx.collect(values[1]);
} else { // 没有数据字符串 } else { // 没有数据字符串
ThreadUtil.sleep(1000); // 没有数据了,休眠一下 ThreadUtil.sleep(1000); // 没有数据了,休眠一下
} }
} catch (Exception e) { } catch (Exception e) {
HZDataStream.redis1Template.opsForHash().put("huazheng:SalesContractProcess:error", "receivecount_elseerror", getErrorInfoFromException(e)); HZDataStream.redis1Template.opsForHash().put("huazheng:SalesContractProcess:error", "receivecount_elseerror", getErrorInfoFromException(e));
} }
} }
} }
@Override @Override
public void cancel() { public void cancel() {
} }
} }
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论