From c0e73fccfc9396d99407b21d49fd866d79b5065f Mon Sep 17 00:00:00 2001 From: wangshilong Date: Tue, 10 Dec 2024 23:06:28 +0800 Subject: [PATCH] =?UTF-8?q?ETC=E6=96=B0=E5=A2=9E=E7=82=B9=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modbus/data/CacheService.java | 1 - .../data/Redis2DBPersistenceService.java | 11 ++-- .../data/impl/EtcPersistenceHandler.java | 60 +++++++++++++++---- .../data/impl/ScssPersistenceHandler.java | 2 +- .../ModbusMessagePersistListener.java | 2 +- .../vo/command/scss/ControlMode.java | 2 +- src/main/resources/sql/CREATE_ETC.sql | 47 ++++++++++----- src/main/resources/sql/CREATE_SCSS.sql | 2 +- src/main/resources/sql/INSERT_ETC.sql | 29 ++++++++- 9 files changed, 117 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/CacheService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/CacheService.java index d6993a6..2450931 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/CacheService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/CacheService.java @@ -27,7 +27,6 @@ public class CacheService { public String execute(RedisOperations operations) throws DataAccessException { operations.delete(PersistenceHandler.DEVICE_INFO_CACHE + deviceId); operations.delete(PersistenceHandler.DEVICE_DATA_CACHE + deviceId); - operations.opsForHash().delete(PersistenceHandler.ONLINE_DEVICE_CACHE, deviceId); return null; } }); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java index f20b2fc..1df1416 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/Redis2DBPersistenceService.java @@ -72,9 +72,6 @@ public class Redis2DBPersistenceService { if (Objects.isNull(deviceId)) { continue; } - if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { - continue; - } if (Objects.nonNull(idGatewayMappingMap)) { operations.put(PersistenceHandler.DEVICE_DATA_CACHE + deviceId, "online", String.valueOf(idGatewayMappingMap.containsKey(deviceId))); @@ -84,14 +81,18 @@ public class Redis2DBPersistenceService { String modbusDeviceProductCode = (String) operations.get( PersistenceHandler.DEVICE_INFO_CACHE + deviceId, "modbus_device_product_code"); if (StringUtils.isEmpty(modbusDeviceProductCode)) { - Map deviceInfo = this.jdbcTemplate.queryForMap(DEVICE_INFO_SQL + deviceId); - if (ObjectUtils.isNotEmpty(deviceInfo)) { + List> deviceInfoList = this.jdbcTemplate.queryForList(DEVICE_INFO_SQL + deviceId); + if (ObjectUtils.isNotEmpty(deviceInfoList)) { + Map deviceInfo = deviceInfoList.get(0); Map cacheDeviceInfo = new HashMap<>(deviceInfo.size()); deviceInfo.forEach((key, value) -> cacheDeviceInfo.put(key, Objects.isNull(value) ? null : String.valueOf(value))); modbusDeviceProductCode = (String) cacheDeviceInfo.get("modbus_device_product_code"); operations.putAll(PersistenceHandler.DEVICE_INFO_CACHE + deviceId, cacheDeviceInfo); } } + if (!this.redisTemplate.hasKey(PersistenceHandler.DEVICE_INFO_CACHE + deviceId)) { + continue; + } tableName = DEFAULT_DATA_TABLE + deviceId; queueName = Queues.getDeviceEventsQueue(deviceId); persistenceHandler = persistenceHandlerMap.get(modbusDeviceProductCode); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java index 58e93e9..8650225 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/EtcPersistenceHandler.java @@ -8,6 +8,7 @@ import org.springframework.stereotype.Component; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Types; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -18,17 +19,37 @@ import java.util.Objects; @Component(PersistenceHandler.ETC_MODBUS_TYPE) public class EtcPersistenceHandler extends AbstractPersistenceHandler { - private static final Map ETC_COLUMN_MAPPING_MAP = Map.of( - "solenoid_valve_status", "solenoidValveStatus", - "controller_current_status", "controllerCurrentStatus", - "current_status_remaining_time", "currentStatusRemainingTime", - "current_status_start_time", "currentStatusStartTime", - "cas_pressure", "casPressure", - "oil_pressure", "oilPressure", - "plunger_rise_time", "plungerRiseTime", - "arrival_sensor_delay_time", "arrivalSensorDelayTime", - "well_shut_in_time", "wellShutInTime", - "after_flow_time", "afterFlowTime"); + private static final Map ETC_COLUMN_MAPPING_MAP = new HashMap<>(); + + static { + ETC_COLUMN_MAPPING_MAP.put("solenoid_valve_status", "solenoidValveStatus"); + ETC_COLUMN_MAPPING_MAP.put("controller_current_status", "controllerCurrentStatus"); + ETC_COLUMN_MAPPING_MAP.put("current_status_remaining_time", "currentStatusRemainingTime"); + ETC_COLUMN_MAPPING_MAP.put("current_status_start_time", "currentStatusStartTime"); + ETC_COLUMN_MAPPING_MAP.put("cas_pressure", "casPressure"); + ETC_COLUMN_MAPPING_MAP.put("oil_pressure", "oilPressure"); + ETC_COLUMN_MAPPING_MAP.put("plunger_rise_time", "plungerRiseTime"); + ETC_COLUMN_MAPPING_MAP.put("arrival_sensor_delay_time", "arrivalSensorDelayTime"); + ETC_COLUMN_MAPPING_MAP.put("well_shut_in_time", "wellShutInTime"); + ETC_COLUMN_MAPPING_MAP.put("after_flow_time", "afterFlowTime"); + ETC_COLUMN_MAPPING_MAP.put("min_close_well_time", "minCloseWellTime"); + ETC_COLUMN_MAPPING_MAP.put("min_open_well_time", "minOpenWellTime"); + ETC_COLUMN_MAPPING_MAP.put("max_open_well_time", "maxOpenWellTime"); + ETC_COLUMN_MAPPING_MAP.put("max_close_well_time", "maxCloseWellTime"); + ETC_COLUMN_MAPPING_MAP.put("cas_pressure_switch", "casPressureSwitch"); + ETC_COLUMN_MAPPING_MAP.put("cas_pressure_sensor_range", "casPressureSensorRange"); + ETC_COLUMN_MAPPING_MAP.put("open_cas_pressure", "openCasPressure"); + ETC_COLUMN_MAPPING_MAP.put("open_reset_cas_pressure", "openResetCasPressure"); + ETC_COLUMN_MAPPING_MAP.put("open_cas_pressure_stable_time", "openCasPressureStableTime"); + ETC_COLUMN_MAPPING_MAP.put("close_cas_pressure", "closeCasPressure"); + ETC_COLUMN_MAPPING_MAP.put("close_trip_cas_pressure", "closeTripCasPressure"); + ETC_COLUMN_MAPPING_MAP.put("close_cas_pressure_stable_time", "closeCasPressureStableTime"); + ETC_COLUMN_MAPPING_MAP.put("tub_pressure_switch", "tubPressureSwitch"); + ETC_COLUMN_MAPPING_MAP.put("tub_pressure_sensor_range", "tubPressureSensorRange"); + ETC_COLUMN_MAPPING_MAP.put("open_tub_pressure", "openTubPressure"); + ETC_COLUMN_MAPPING_MAP.put("open_reset_tub_pressure", "openResetTubPressure"); + ETC_COLUMN_MAPPING_MAP.put("open_tub_pressure_stable_time", "openTubPressureStableTime"); + } @Override public void createTable(String tableName, Long deviceId) { @@ -60,6 +81,23 @@ public class EtcPersistenceHandler extends AbstractPersistenceHandler { EtcPersistenceHandler.this.setValue(ps, newRow, 12, "arrivalSensorDelayTime", Types.VARCHAR); EtcPersistenceHandler.this.setValue(ps, newRow, 13, "wellShutInTime", Types.VARCHAR); EtcPersistenceHandler.this.setValue(ps, newRow, 14, "afterFlowTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 15, "minCloseWellTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 16, "minOpenWellTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 17, "maxOpenWellTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 18, "maxCloseWellTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 19, "casPressureSwitch", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 20, "casPressureSensorRange", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 21, "openCasPressure", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 22, "openResetCasPressure", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 23, "openCasPressureStableTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 24, "closeCasPressure", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 25, "closeTripCasPressure", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 26, "closeCasPressureStableTime", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 27, "tubPressureSwitch", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 28, "tubPressureSensorRange", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 29, "openTubPressure", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 30, "openResetTubPressure", Types.VARCHAR); + EtcPersistenceHandler.this.setValue(ps, newRow, 31, "openTubPressureStableTime", Types.VARCHAR); return ps.executeUpdate(); } }); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java index 4ac26bf..221aca5 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/impl/ScssPersistenceHandler.java @@ -157,7 +157,7 @@ public class ScssPersistenceHandler extends AbstractPersistenceHandler { ScssPersistenceHandler.this.setValue(ps, newRow, 34, "flowVoltageMinValue", Types.DECIMAL); ScssPersistenceHandler.this.setValue(ps, newRow, 35, "flowVoltageMaxValue", Types.DECIMAL); ScssPersistenceHandler.this.setValue(ps, newRow, 36, "continuousSamplingIntervalDuration", Types.INTEGER); - ScssPersistenceHandler.this.setValue(ps, newRow, 37, "sensorSignalEffectiveLevel", Types.DECIMAL); + ScssPersistenceHandler.this.setValue(ps, newRow, 37, "sensorSignalEffectiveLevel", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 38, "pressureCompensationPolarityFlag", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 39, "pressureCompensationValueSetting", Types.INTEGER); ScssPersistenceHandler.this.setValue(ps, newRow, 40, "oilPressureCompensationPolarityFlag", Types.INTEGER); diff --git a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java index a1e2080..18a3936 100644 --- a/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java +++ b/src/main/java/com/isu/gaswellwatch/modbus/data/listener/ModbusMessagePersistListener.java @@ -159,7 +159,7 @@ public class ModbusMessagePersistListener implements BatchMessageListener { .build()); } } catch (Exception e) { - log.error("初始数据解析异常: {}, {}", message.getMessageProperties().getConsumerQueue(), messageString, e); + log.error("数据解析异常: {}, {}", message.getMessageProperties().getConsumerQueue(), messageString, e); } } } diff --git a/src/main/java/com/isu/gaswellwatch/vo/command/scss/ControlMode.java b/src/main/java/com/isu/gaswellwatch/vo/command/scss/ControlMode.java index f65cb67..3af840a 100644 --- a/src/main/java/com/isu/gaswellwatch/vo/command/scss/ControlMode.java +++ b/src/main/java/com/isu/gaswellwatch/vo/command/scss/ControlMode.java @@ -161,7 +161,7 @@ public class ControlMode extends Command { protected Collection builderModbusCommand() { StringBuilder command = new StringBuilder(250); // 地址码 功能码 起始地址 连续长度 连续字长 - // 01 10 0054 0036 6C + // 01 10 0032 0036 6C command.append("0110003200366C"); command.append(StringUtils.leftPad(Integer.toHexString(this.ctlModel), 8, "0")); command.append(StringUtils.leftPad(Integer.toHexString(this.minPressure.multiply(ONE_HUNDRED).intValue()), 8, "0")); diff --git a/src/main/resources/sql/CREATE_ETC.sql b/src/main/resources/sql/CREATE_ETC.sql index 96aeda2..bfff4ae 100644 --- a/src/main/resources/sql/CREATE_ETC.sql +++ b/src/main/resources/sql/CREATE_ETC.sql @@ -1,20 +1,37 @@ CREATE TABLE `$TableName$` ( - `id` bigint NOT NULL COMMENT '主键', - `device_id` int NOT NULL COMMENT '设备标识', - `created_time` datetime NOT NULL COMMENT '数据落库时间', - `collection_time` datetime NOT NULL COMMENT '采集指令下发时间', - `receive_time` datetime NOT NULL COMMENT '接收到数据时间', - `solenoid_valve_status` int NULL DEFAULT NULL COMMENT '电磁阀状态', - `controller_current_status` int NULL DEFAULT NULL COMMENT '控制器当前状态', - `current_status_remaining_time` varchar(10) NULL DEFAULT NULL COMMENT '当前状态剩余时间', - `current_status_start_time` varchar(30) NULL DEFAULT NULL COMMENT '当前状态开始时间', - `cas_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '套压', - `oil_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '油压', - `plunger_rise_time` varchar(10) NULL DEFAULT NULL COMMENT '柱塞上升时间', - `arrival_sensor_delay_time` varchar(10) NULL DEFAULT NULL COMMENT '到达传感器延时时间', - `well_shut_in_time` varchar(10) NULL DEFAULT NULL COMMENT '关井时间', - `after_flow_time` varchar(10) NULL DEFAULT NULL COMMENT '续流时间', + `id` bigint NOT NULL COMMENT '主键', + `device_id` int NOT NULL COMMENT '设备标识', + `created_time` datetime NOT NULL COMMENT '数据落库时间', + `collection_time` datetime NOT NULL COMMENT '采集指令下发时间', + `receive_time` datetime NOT NULL COMMENT '接收到数据时间', + `solenoid_valve_status` int NULL DEFAULT NULL COMMENT '电磁阀状态', + `controller_current_status` int NULL DEFAULT NULL COMMENT '控制器当前状态', + `current_status_remaining_time` varchar(10) NULL DEFAULT NULL COMMENT '当前状态剩余时间', + `current_status_start_time` varchar(30) NULL DEFAULT NULL COMMENT '当前状态开始时间', + `cas_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '套压', + `oil_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '油压', + `plunger_rise_time` varchar(10) NULL DEFAULT NULL COMMENT '柱塞上升时间', + `arrival_sensor_delay_time` varchar(10) NULL DEFAULT NULL COMMENT '到达传感器延时时间', + `well_shut_in_time` varchar(10) NULL DEFAULT NULL COMMENT '关井时间', + `after_flow_time` varchar(10) NULL DEFAULT NULL COMMENT '续流时间', + `min_close_well_time` varchar(10) NULL DEFAULT NULL COMMENT '最小关井时间', + `min_open_well_time` varchar(10) NULL DEFAULT NULL COMMENT '最小开井时间', + `max_open_well_time` varchar(10) NULL DEFAULT NULL COMMENT '最大开井时间', + `max_close_well_time` varchar(10) NULL DEFAULT NULL COMMENT '最大关井时间', + `cas_pressure_switch` int NULL DEFAULT NULL COMMENT '套压开关', + `cas_pressure_sensor_range` decimal(10, 1) NULL DEFAULT NULL COMMENT '套压PSI比例', + `open_cas_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '开井套压', + `open_reset_cas_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '复位套压', + `open_cas_pressure_stable_time` varchar(10) NULL DEFAULT NULL COMMENT '稳定时间', + `close_cas_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '关井套压', + `close_trip_cas_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '跳闸套压', + `close_cas_pressure_stable_time` varchar(10) NULL DEFAULT NULL COMMENT '稳定时长', + `tub_pressure_switch` int NULL DEFAULT NULL COMMENT '油压开关', + `tub_pressure_sensor_range` decimal(10, 1) NULL DEFAULT NULL COMMENT '油压PSI比例', + `open_tub_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '开井油压', + `open_reset_tub_pressure` decimal(10, 1) NULL DEFAULT NULL COMMENT '复位油压', + `open_tub_pressure_stable_time` varchar(10) NULL DEFAULT NULL COMMENT '油压稳定时间', PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `udx_device_create_time` (`device_id` ASC, `collection_time` ASC) USING BTREE COMMENT '设备采集数据唯一键' ) ENGINE = InnoDB COMMENT = '设备ID:$DeviceId$的采集数据' \ No newline at end of file diff --git a/src/main/resources/sql/CREATE_SCSS.sql b/src/main/resources/sql/CREATE_SCSS.sql index 0d96fde..bb4a0db 100644 --- a/src/main/resources/sql/CREATE_SCSS.sql +++ b/src/main/resources/sql/CREATE_SCSS.sql @@ -37,7 +37,7 @@ CREATE TABLE `$TableName$` `flow_voltage_min_value` decimal(10, 2) NULL DEFAULT NULL COMMENT '流量最小电压', `flow_voltage_max_value` decimal(10, 2) NULL DEFAULT NULL COMMENT '流量最大电压', `continuous_sampling_interval_duration` int NULL DEFAULT NULL COMMENT '连续采样间隔', - `sensor_signal_effective_level` decimal(10, 2) NULL DEFAULT NULL COMMENT '到达传感器有效电平', + `sensor_signal_effective_level` int NULL DEFAULT NULL COMMENT '到达传感器有效电平', `pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '套压补偿极性', `pressure_compensation_value_setting` int NULL DEFAULT NULL COMMENT '套压补偿值', `oil_pressure_compensation_polarity_flag` int NULL DEFAULT NULL COMMENT '油压补偿极性', diff --git a/src/main/resources/sql/INSERT_ETC.sql b/src/main/resources/sql/INSERT_ETC.sql index d792910..ade586e 100644 --- a/src/main/resources/sql/INSERT_ETC.sql +++ b/src/main/resources/sql/INSERT_ETC.sql @@ -2,8 +2,14 @@ INSERT INTO `$TableName$` (`id`, `device_id`, `created_time`, `collection_time`, `solenoid_valve_status`, `controller_current_status`, `current_status_remaining_time`, `current_status_start_time`, `cas_pressure`, `oil_pressure`, `plunger_rise_time`, `arrival_sensor_delay_time`, - `well_shut_in_time`, `after_flow_time`) -VALUES (?, ?, NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `well_shut_in_time`, `after_flow_time`, `min_close_well_time`, `min_open_well_time`, + `max_open_well_time`, `max_close_well_time`, `cas_pressure_switch`, + `cas_pressure_sensor_range`, + `open_cas_pressure`, `open_reset_cas_pressure`, `open_cas_pressure_stable_time`, + `close_cas_pressure`, `close_trip_cas_pressure`, `close_cas_pressure_stable_time`, + `tub_pressure_switch`, `tub_pressure_sensor_range`, `open_tub_pressure`, + `open_reset_tub_pressure`, `open_tub_pressure_stable_time`) +VALUES (?, ?, NOW(), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time), solenoid_valve_status=VALUES(solenoid_valve_status), controller_current_status=VALUES(controller_current_status), @@ -14,4 +20,21 @@ ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time), plunger_rise_time=VALUES(plunger_rise_time), arrival_sensor_delay_time=VALUES(arrival_sensor_delay_time), well_shut_in_time=VALUES(well_shut_in_time), - after_flow_time=VALUES(after_flow_time) \ No newline at end of file + after_flow_time=VALUES(after_flow_time), + min_close_well_time=VALUES(min_close_well_time), + min_open_well_time=VALUES(min_open_well_time), + max_open_well_time=VALUES(max_open_well_time), + max_close_well_time=VALUES(max_close_well_time), + cas_pressure_switch=VALUES(cas_pressure_switch), + cas_pressure_sensor_range=VALUES(cas_pressure_sensor_range), + open_cas_pressure=VALUES(open_cas_pressure), + open_reset_cas_pressure=VALUES(open_reset_cas_pressure), + open_cas_pressure_stable_time=VALUES(open_cas_pressure_stable_time), + close_cas_pressure=VALUES(close_cas_pressure), + close_trip_cas_pressure=VALUES(close_trip_cas_pressure), + close_cas_pressure_stable_time=VALUES(close_cas_pressure_stable_time), + tub_pressure_switch=VALUES(tub_pressure_switch), + tub_pressure_sensor_range=VALUES(tub_pressure_sensor_range), + open_tub_pressure=VALUES(open_tub_pressure), + open_reset_tub_pressure=VALUES(open_reset_tub_pressure), + open_tub_pressure_stable_time=VALUES(open_tub_pressure_stable_time) \ No newline at end of file