Commit 9c7d310f authored by zhoushiguang's avatar zhoushiguang

路口状态数据同步

parent c335a8d1
package com.wanji.indicators.entity;
import lombok.Data;
/**
* @author fengyi
* @date 2023/10/16
* @description
*/
@Data
public class CrossStateData {
private String crossId;
private double trafficIndex;
private long startTime;
private long endTime;
private int isUnbalance;
private int isSpillover;
private int isCongestion;
private double unbalanceIndex;
private double spilloverIndex;
private double congestionIndex;
private double clearRate;
private int notClearCarNums;
}
package com.wanji.indicators.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wanji.indicators.entity.CrossStateData;
import java.util.List;
public interface CrossStateDataMapper extends BaseMapper<CrossStateData>{
/**
* 批量保存
*/
void batchInsert(List<CrossStateData> list);
}
package com.wanji.indicators.service;
import com.wanji.indicators.entity.CrossStateData;
import java.util.List;
/**
* @author fengyi
* @date 2024/1/23
* @description
*/
public interface CrossStateDataService {
public void save(List<CrossStateData> list);
}
package com.wanji.indicators.service.impl;
import com.wanji.indicators.entity.CrossStateData;
import com.wanji.indicators.mapper.CrossStateDataMapper;
import com.wanji.indicators.service.CrossStateDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* @author fengyi
* @date 2024/1/23
* @description
*/
@Service
@Slf4j
public class CrossStateDataServiceImpl implements CrossStateDataService {
@Resource
CrossStateDataMapper crossStateDataMapper;
@Override
public void save(List<CrossStateData> list) {
crossStateDataMapper.batchInsert(list);
}
}
package com.wanji.indicators.task.crossstate.service.func;
import com.alibaba.fastjson.JSONArray;
import com.wanji.indicators.entity.CrossStateData;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 帧数据转换
*/
public class CrossStateFromJsonFlatMap implements FlatMapFunction<String, List<CrossStateData>> {
private static final Logger log = LoggerFactory.getLogger(CrossStateFromJsonFlatMap.class);
@Override
public void flatMap(String trackJson, Collector<List<CrossStateData>> collector) throws Exception {
List<CrossStateData> list = JSONArray.parseArray(trackJson, CrossStateData.class);
collector.collect(list);
}
}
package com.wanji.indicators.task.crossstate.service.sink;
import com.wanji.indicators.entity.CrossStateData;
import com.wanji.indicators.service.CrossStateDataService;
import com.wanji.indicators.service.impl.CrossStateDataServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.List;
/**
* @author fengyi
* @date 2024/1/23
* @description
*/
@Slf4j
public class CrossStateSinkFunction extends RichSinkFunction<List<CrossStateData>> {
CrossStateDataService crossStateDataService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ApplicationContext beanConf = new ClassPathXmlApplicationContext("spring-container.xml");
crossStateDataService = beanConf.getBean(CrossStateDataServiceImpl.class);
}
@Override
public void invoke(List<CrossStateData> values, Context context) throws Exception {
try {
crossStateDataService.save(values);
} catch (Exception e) {
log.error(this.getClass().getName() + ".invoke()执行失败", e);
}
}
}
package com.wanji.indicators.task.crossstate.stream;
import com.wanji.indicators.entity.CrossStateData;
import com.wanji.indicators.model.CarTrackModel;
import com.wanji.indicators.task.crossstate.service.func.CrossStateFromJsonFlatMap;
import com.wanji.indicators.task.crossstate.service.sink.CrossStateSinkFunction;
import com.wanji.indicators.task.track.service.func.CarTrackFlatMap;
import com.wanji.indicators.util.PropertiesHelper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;
import java.util.Properties;
@Slf4j
public class CrossStateDataSynchronizeMain {
//private static final Logger log = LoggerFactory.getLogger(TrafficEventMain.class);
private static final String GROUP_ID = "CrossStateDataSynchronizeMain";
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
PropertiesHelper instance = PropertiesHelper.getInstance();
Properties properties = instance.getProperties();
String topic = "cross_real_time_data";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setProperties(instance.getConsumerProperties())
.setProperty("enable.auto.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("auto.offset.reset", "earliest")
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setTopics(topic)
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
try {
DataStream<String> stream = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "CrossStateDataSynchronizeMain");
SingleOutputStreamOperator<List<CrossStateData>> frameModelStream =
stream
.flatMap(new CrossStateFromJsonFlatMap())
.setParallelism(1)
.name("路口状态数据-JsonToObject");
frameModelStream.addSink(new CrossStateSinkFunction());
env.execute("路口状态数据同步到Mysql-工作流");
} catch (Exception e) {
log.error("交通指标计算任务异常 : " + e);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wanji.indicators.mapper.CrossStateDataMapper">
<!--批量插入-->
<select id="batchInsert" parameterType="com.wanji.indicators.entity.CrossStateData">
INSERT INTO os_top_rank
(
cross_id,
congest_index,
granularity,
window_start_time,
window_end_time,
red_no_clear_rate,
green_use_rate,
green_empty_rate,
green_interval_clear_rate,
slow_drive_clear_rate,
unbalance_index,
overflow_index,
is_unbalance,
is_overflow,
dt
)
VALUES
<foreach collection="list" item="item" separator=",">
(
#{crossId}
,#{congestIndex}
,#{granularity}
,#{windowStartTime}
,#{windowEndTime}
,#{redNoClearRate}
,#{greenUseRate}
,#{greenEmptyRate}
,#{greenIntervalClearRate}
,#{slowDriveClearRate}
,#{unbalanceIndex}
,#{overflowIndex}
,#{isUnbalance}
,#{isOverflow}
,#{dt}
)
</foreach>
ON DUPLICATE KEY UPDATE
cross_id=values(cross_id),
congest_index=values(congest_index),
granularity=values(granularity),
window_start_time=values(window_start_time),
window_end_time=values(window_end_time),
red_no_clear_rate=values(red_no_clear_rate),
green_use_rate=values(green_use_rate),
green_empty_rate=values(green_empty_rate),
green_interval_clear_rate=values(green_interval_clear_rate),
slow_drive_clear_rate=values(slow_drive_clear_rate),
unbalance_index=values(unbalance_index),
overflow_index=values(overflow_index),
is_unbalance=values(is_unbalance),
is_overflow=values(is_overflow),
dt=values(dt)
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment