From 3f6f29f4dae83d68d508220178cd6ba9bf523423 Mon Sep 17 00:00:00 2001 From: wangshilong Date: Sun, 22 Dec 2024 15:20:42 +0800 Subject: [PATCH] =?UTF-8?q?BUG=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../data/Redis2DBPersistenceService.java | 87 ++++++++++++------- .../impl/LocalDateTimeDecodeHandler.java | 3 + .../SimpleLocalDateTimeDecodeHandler.java | 9 +- .../data/impl/AbstractPersistenceHandler.java | 3 +- .../data/impl/ScssPersistenceHandler.java | 2 +- .../service/impl/DeviceOptLogServiceImpl.java | 5 +- 6 files changed, 73 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index 33d88fd..b11bd5a 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -12,10 +12,8 @@ import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.core.ParameterizedTypeReference; -import org.springframework.data.redis.core.Cursor; -import org.springframework.data.redis.core.HashOperations; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.ScanOptions; +import org.springframework.dao.DataAccessException; +import org.springframework.data.redis.core.*; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; import org.springframework.jdbc.core.JdbcTemplate; @@ -23,7 +21,10 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -71,7 +72,7 @@ public class Redis2DBPersistenceService { @Scheduled(cron = "20/30 * * * * ? ") public void write() { - Map idGatewayMappingMap = writeOnlineGateway(); + writeOnlineGateway(); HashOperations operations = this.redisTemplate.opsForHash(); try (Cursor cursor = this.redisTemplate.scan(ScanOptions.scanOptions() .match(PersistenceHandler.DEVICE_DATA_CACHE + "*").build())) { @@ -89,7 +90,7 @@ public class Redis2DBPersistenceService { if (Objects.isNull(deviceId)) { continue; } - this.persistenceThreadPool.execute(new Worker(deviceId, cacheKey, operations, deviceMap, idGatewayMappingMap)); + this.persistenceThreadPool.execute(new Worker(deviceId, cacheKey, operations, deviceMap)); } } } @@ -97,16 +98,12 @@ public class Redis2DBPersistenceService { private void persistenceDeviceData(Long deviceId, String cacheKey, HashOperations operations, - Map deviceMap, - Map idGatewayMappingMap) { + Map deviceMap) { String tableName; String queueName; PersistenceHandler persistenceHandler; List> existsTableList; try { - if (Objects.nonNull(idGatewayMappingMap)) { - updateDeviceOnlineStatus(operations, deviceId, String.valueOf(idGatewayMappingMap.containsKey(deviceId))); - } String modbusDeviceProductCode = (String) operations.get( PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); if (StringUtils.isEmpty(modbusDeviceProductCode)) { @@ -146,43 +143,72 @@ public class Redis2DBPersistenceService { } } - private Map writeOnlineGateway() { + private void writeOnlineGateway() { try { RequestEntity request = RequestEntity.get("http://localhost:9999/modbus-tcp/online").build(); ResponseEntity>> response = this.restTemplate.exchange(request, new ParameterizedTypeReference>>() { }); if (Objects.isNull(response) || Objects.isNull(response.getBody())) { - return null; + return; } if (ObjectUtils.isEmpty(response.getBody().getData())) { - return Collections.emptyMap(); + allOffline(); + return; } - List> idGatewayMappingList = this.jdbcTemplate.queryForList("select id, gateway_sn" - + " from device where gateway_sn in (" - + response.getBody() + String onlineGatewaySns = response.getBody() .getData() .stream() + .filter(Objects::nonNull) .map(value -> new String(HexUtil.hexStringToBytes(value))) + .filter(StringUtils::isNotBlank) .map(value -> "'" + value + "'") - .collect(Collectors.joining(",")) + ")"); + .collect(Collectors.joining(", ")); + if (StringUtils.isBlank(onlineGatewaySns)) { + onlineGatewaySns = "''"; + } + List> idGatewayMappingList = this.jdbcTemplate.queryForList("SELECT a.id, " + + "IF(b.gateway_sn IS NOT NULL, 'true', 'false') AS status FROM device a LEFT JOIN (" + + "SELECT id, gateway_sn FROM device WHERE gateway_sn IN (" + + onlineGatewaySns + ")) b ON a.id = b.id ORDER BY a.id"); if (ObjectUtils.isEmpty(idGatewayMappingList)) { - return null; + allOffline(); + return; } - Long deviceId; - HashOperations operations = this.redisTemplate.opsForHash(); - Map idGatewayMappingMap = new HashMap(); - for (Map map : idGatewayMappingList) { - idGatewayMappingMap.put(deviceId = MapUtil.getLong(map, "id"), MapUtil.getStr(map, "gateway_sn")); - updateDeviceOnlineStatus(operations, deviceId, String.valueOf(idGatewayMappingMap.containsKey(deviceId))); - } - return idGatewayMappingMap; + this.redisTemplate.executePipelined(new SessionCallback() { + @Override + public Object execute(RedisOperations operations) throws DataAccessException { + String deviceId, status; + HashOperations hashOperations = operations.opsForHash(); + for (Map map : idGatewayMappingList) { + deviceId = MapUtil.getStr(map, "id"); + status = MapUtil.getStr(map, "status"); + hashOperations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, deviceId, status); + hashOperations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", status); + } + return null; + } + }); } catch (Exception e) { log.warn("Get online devices list fail", e); - return null; } } + private void allOffline() { + HashOperations hashOperations = this.redisTemplate.opsForHash(); + Map onlineDeviceStatusMap = hashOperations.entries(PersistenceHandler.ONLINE_DEVICE_CACHE); + this.redisTemplate.executePipelined(new SessionCallback() { + @Override + public Object execute(RedisOperations operations) throws DataAccessException { + onlineDeviceStatusMap.forEach((deviceId, status) -> { + hashOperations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, deviceId, "false"); + hashOperations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", "false"); + }); + return null; + } + }); + } + private void updateDeviceOnlineStatus(HashOperations operations, Long deviceId, String onlineStatus) { operations.put(PersistenceHandler.ONLINE_DEVICE_CACHE, String.valueOf(deviceId), onlineStatus); operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", onlineStatus); @@ -194,11 +220,10 @@ public class Redis2DBPersistenceService { private final String cacheKey; private final HashOperations operations; private final Map deviceMap; - private final Map idGatewayMappingMap; @Override public void run() { - Redis2DBPersistenceService.this.persistenceDeviceData(deviceId, cacheKey, operations, deviceMap, idGatewayMappingMap); + Redis2DBPersistenceService.this.persistenceDeviceData(deviceId, cacheKey, operations, deviceMap); } } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java index f807564..54d4a07 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/LocalDateTimeDecodeHandler.java @@ -28,6 +28,9 @@ public class LocalDateTimeDecodeHandler implements DecodeHandler { if (StringUtils.isBlank(value)) { return value; } + if (StringUtils.startsWith(value, "0-0-0 ")) { + return ""; + } return LocalDateTime.parse(value, IN_FORMATTER).format(OUT_FORMATTER); } diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java index 5c961e2..fc08c55 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/decode/impl/SimpleLocalDateTimeDecodeHandler.java @@ -28,7 +28,14 @@ public class SimpleLocalDateTimeDecodeHandler implements DecodeHandler { if (StringUtils.isBlank(value)) { return value; } - return LocalDateTime.parse(value, IN_FORMATTER).format(OUT_FORMATTER); + if (StringUtils.startsWith(value, "0-0-0 ")) { + return ""; + } + try { + return LocalDateTime.parse(value, IN_FORMATTER).format(OUT_FORMATTER); + } catch (Exception e) { + return ""; + } } @Override diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java index 83d64db..44c0f55 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/AbstractPersistenceHandler.java @@ -17,7 +17,6 @@ import java.nio.charset.StandardCharsets; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Map; -import java.util.Objects; /** * @author 王仕龙 @@ -58,7 +57,7 @@ public abstract class AbstractPersistenceHandler implements PersistenceHandler { protected void setValue(PreparedStatement ps, Map row, int index, String key, int sqlType) throws SQLException { String value = MapUtil.getStr(row, key); - if (Objects.isNull(value)) { + if (StringUtils.isBlank(value)) { ps.setNull(index, sqlType); } else { ps.setObject(index, value); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java index 79fee62..ccf9f24 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java @@ -135,7 +135,7 @@ public class ScssPersistenceHandler extends AbstractPersistenceHandler { ScssPersistenceHandler.this.setValue(ps, newRow, 12, "prePressure", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 13, "internetTraffic", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 14, "loadFactor", Types.INTEGER); - ScssPersistenceHandler.this.setValue(ps, newRow, 15, "dataTime", Types.TIMESTAMP); + ScssPersistenceHandler.this.setValue(ps, newRow, 15, "dataTime", Types.DATE); ScssPersistenceHandler.this.setValue(ps, newRow, 16, "showDelay", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 17, "openWellSamplingInterval", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 18, "closeWellSamplingInterval", Types.INTEGER); diff --git a/src/main/java/com/isu/gaswellwatch/service/impl/DeviceOptLogServiceImpl.java b/src/main/java/com/isu/gaswellwatch/service/impl/DeviceOptLogServiceImpl.java index 2df0d00..e1c0ad0 100644 --- a/src/main/java/com/isu/gaswellwatch/service/impl/DeviceOptLogServiceImpl.java +++ b/src/main/java/com/isu/gaswellwatch/service/impl/DeviceOptLogServiceImpl.java @@ -48,7 +48,10 @@ public class DeviceOptLogServiceImpl extends ServiceImpl